MpmcSender<T> class
final
Multi-producer sender for MPMC channels with full cloning support.
Enables multiple producers to send concurrently to multiple competing consumers. Handles coordination for competitive consumption patterns.
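Competitive consumption can be illustrated with a small, self-contained sketch in plain Dart. It does not use or reproduce this library's implementation: `ToyQueue`, `consume`, and `runDemo` are hypothetical names introduced only for illustration. The point it shows is that when several consumers pull from one shared queue, each value goes to exactly one of them.

```dart
import 'dart:async';
import 'dart:collection';

/// Toy shared queue (hypothetical; NOT the library's implementation).
class ToyQueue<T extends Object> {
  final Queue<T> _items = Queue<T>();
  final Queue<Completer<T?>> _waiters = Queue<Completer<T?>>();
  bool _closed = false;

  /// Hands the value to the longest-waiting consumer, or buffers it.
  void send(T value) {
    if (_waiters.isNotEmpty) {
      _waiters.removeFirst().complete(value);
    } else {
      _items.add(value);
    }
  }

  /// Marks the queue closed and wakes any waiting consumers with null.
  void close() {
    _closed = true;
    while (_waiters.isNotEmpty) {
      _waiters.removeFirst().complete(null);
    }
  }

  /// Completes with the next value, or null once closed and drained.
  Future<T?> recv() {
    if (_items.isNotEmpty) return Future<T?>.value(_items.removeFirst());
    if (_closed) return Future<T?>.value(null);
    final waiter = Completer<T?>();
    _waiters.add(waiter);
    return waiter.future;
  }
}

/// One competing consumer: collects everything it manages to receive.
Future<List<int>> consume(ToyQueue<int> queue) async {
  final seen = <int>[];
  while (true) {
    final value = await queue.recv();
    if (value == null) return seen;
    seen.add(value);
  }
}

/// Three consumers compete for nine values.
Future<List<List<int>>> runDemo() async {
  final queue = ToyQueue<int>();
  final consumers = [consume(queue), consume(queue), consume(queue)];
  for (var i = 0; i < 9; i++) {
    queue.send(i);
  }
  queue.close();
  return Future.wait(consumers);
}

Future<void> main() async {
  final perConsumer = await runDemo();
  final all = perConsumer.expand((r) => r).toList()..sort();
  print(all); // [0, 1, 2, 3, 4, 5, 6, 7, 8]: every value delivered once
}
```

How the values are split between consumers depends on scheduling, so the sketch only checks the combined result.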
Key Features
- Multi-producer support: Multiple senders can send simultaneously
- Consumer coordination: Manages competition between multiple consumers
- Clone support: clone creates independent sender handles
- Reference counting: Channel remains open while any sender exists
- Load balancing: Each value goes to one of the competing consumers, which spreads work among them
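The clone and reference-counting features above can be pictured with a toy model. This is a sketch under assumptions, not the library's code: `ToySender`, `_Shared`, and `channelOpen` are hypothetical names, and throwing when a closed handle is cloned is a choice made for this toy only.

```dart
/// State shared by every handle of one channel (hypothetical toy model).
class _Shared {
  int liveSenders = 0;
  bool get open => liveSenders > 0;
}

/// Toy sender handle that only tracks how many handles are still live.
class ToySender {
  ToySender._(this._shared) {
    _shared.liveSenders++;
  }

  /// Creates the first handle for a fresh toy channel.
  factory ToySender.create() => ToySender._(_Shared());

  final _Shared _shared;
  bool _closed = false;

  /// Returns a new handle that shares the same channel state.
  ToySender clone() {
    if (_closed) throw StateError('cannot clone a closed handle');
    return ToySender._(_shared);
  }

  /// Releases this handle; closing twice has no further effect.
  void close() {
    if (_closed) return;
    _closed = true;
    _shared.liveSenders--;
  }

  /// True while at least one handle has not been closed.
  bool get channelOpen => _shared.open;
}

void main() {
  final original = ToySender.create();
  final a = original.clone();
  final b = original.clone();

  original.close();
  print(a.channelOpen); // true: a and b are still live

  a.close();
  print(b.channelOpen); // true: b is still live

  b.close();
  print(b.channelOpen); // false: the last handle is closed
}
```

This mirrors the pattern in the examples below, where the original handle is closed once every producer holds its own clone.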
Performance Notes
- Additional overhead: More coordination than MPSC due to multiple consumers
- Scalable design: Handles multiple producers reasonably well
- Consumer impact: Performance affected by number of competing consumers
- Batch support: Works with the sendAll and trySendAll batch extensions (SenderBatchX)
Example - Distributed request processing:
final (requestTx, requestRx) = Mpmc.bounded<WorkRequest>(capacity: 1000);

// Placeholders assumed to be defined by the application: clientThreads,
// clientActive, generateClientRequest, and Thread (a task runner; the Dart
// SDK has no Thread class).

// One sender handle per client (producer)
final clientSenders = <MpmcSender<WorkRequest>>[];
for (int i = 0; i < clientThreads; i++) {
  clientSenders.add(requestTx.clone());
}
requestTx.close(); // Close the original handle; the clones keep the channel open

// Each client sends requests
for (int i = 0; i < clientThreads; i++) {
  final clientId = i;
  final sender = clientSenders[i];
  Thread.run(() async {
    while (clientActive[clientId]) {
      final request = generateClientRequest(clientId);
      await sender.send(request); // Thread-safe concurrent sending
    }
    sender.close(); // Client finished
  });
}
Example - Multi-stage pipeline:
// Stage 1 → Stage 2 (multiple producers → multiple consumers)
// Placeholders assumed to be defined by the application: stage1Count,
// stage1Sources, processStage1, and Thread.
final (stage2Tx, stage2Rx) = Mpmc.unbounded<IntermediateData>();

// Multiple stage-1 processors become stage-2 producers
final stage1Processors = <Thread>[];
for (int i = 0; i < stage1Count; i++) {
  final producerTx = stage2Tx.clone();
  stage1Processors.add(Thread(() async {
    await for (final rawData in stage1Sources[i].stream()) {
      final processed = await processStage1(rawData);
      await producerTx.send(processed); // Feeds competing stage-2 consumers
    }
    producerTx.close(); // This producer is finished
  }));
}
stage2Tx.close(); // Close the original handle, as in the first example
Properties
- hashCode → int
  The hash code for this object. (no setter, inherited)
- isDisconnected → bool
  (no setter)
- runtimeType → Type
  A representation of the runtime type of the object. (no setter, inherited)
Methods
- clone() → MpmcSender<T>
- close() → void
- noSuchMethod(Invocation invocation) → dynamic
  Invoked when a nonexistent method or property is accessed. (inherited)
- send(T v) → Future<SendResult>
- sendAll(Iterable<T> it) → Future<void>
  Send all items with backpressure handling. Available on Sender<T>, provided by the SenderBatchX extension.
- sendTimeout(T v, Duration d) → Future<SendResult>
  Send a value with a timeout to prevent indefinite blocking. Available on Sender<T>, provided by the SenderTimeoutX extension.
- toString() → String
  A string representation of this object. (inherited)
- trySend(T v) → SendResult
- trySendAll(Iterable<T> it) → Future<void>
  Send all items using trySend without waiting (best-effort). Available on Sender<T>, provided by the SenderBatchX extension.
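The idea behind sendTimeout, bounding how long a caller waits on a pending send, can be sketched with Dart's standard `Future.timeout`. This is only an illustration of the general pattern, not how the SenderTimeoutX extension is implemented; `ToyOutcome` and `withDeadline` are hypothetical names, and the "sends" are stand-in futures rather than real channel operations.

```dart
import 'dart:async';

/// Hypothetical result type for this sketch (not the library's SendResult).
enum ToyOutcome { sent, timedOut }

/// Waits for [pendingSend] for at most [limit].
Future<ToyOutcome> withDeadline(Future<void> pendingSend, Duration limit) {
  return pendingSend
      .then((_) => ToyOutcome.sent)
      .timeout(limit, onTimeout: () => ToyOutcome.timedOut);
}

Future<void> main() async {
  // Stand-in for a send that completes quickly.
  final fast = Future<void>.delayed(const Duration(milliseconds: 10));
  print(await withDeadline(fast, const Duration(milliseconds: 200)));
  // ToyOutcome.sent

  // Stand-in for a send that never completes, such as a full bounded
  // channel with no active consumers.
  final stuck = Completer<void>().future;
  print(await withDeadline(stuck, const Duration(milliseconds: 50)));
  // ToyOutcome.timedOut
}
```

Note that `Future.timeout` stops the caller from waiting but does not cancel the underlying operation, so a real timed send needs its own way to withdraw the pending value.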
Operators
- operator ==(Object other) → bool
  The equality operator. (inherited)