MpmcSender<T> class final

Multi-producer sender for MPMC channels with full cloning support.

Enables multiple producers to send concurrently to multiple competing consumers. Handles coordination for competitive consumption patterns.

Key Features

  • Multi-producer support: Multiple senders can send simultaneously
  • Consumer coordination: Manages competition between multiple consumers
  • Clone support: clone creates independent sender handles
  • Reference counting: Channel remains open while any sender exists
  • Load balancing: Helps distribute work among consumers

Performance Notes

  • Additional overhead: More coordination than MPSC due to multiple consumers
  • Scalable design: Handles multiple producers reasonably well
  • Consumer impact: Performance affected by number of competing consumers
  • Batch support: Compatible with batch sending extensions

Example - Distributed request processing:

final (requestTx, requestRx) = Mpmc.bounded<WorkRequest>(capacity: 1000);

// Multiple client threads (producers)
final clientSenders = <MpmcSender<WorkRequest>>[];
for (int i = 0; i < clientThreads; i++) {
  clientSenders.add(requestTx.clone());
}
requestTx.close(); // Close original handle

// Each client thread sends requests
for (int i = 0; i < clientThreads; i++) {
  final clientId = i;
  final sender = clientSenders[i];

  Thread.run(() async {
    while (clientActive[clientId]) {
      final request = generateClientRequest(clientId);
      await sender.send(request); // Thread-safe concurrent sending
    }
    sender.close(); // Client finished
  });
}

Example - Multi-stage pipeline:

// Stage 1 → Stage 2 (multiple producers → multiple consumers)
final (stage2Tx, stage2Rx) = Mpmc.unbounded<IntermediateData>();

// Multiple stage-1 processors become stage-2 producers
final stage1Processors = <Thread>[];
for (int i = 0; i < stage1Count; i++) {
  final producerTx = stage2Tx.clone();

  stage1Processors.add(Thread(() async {
    await for (final rawData in stage1Sources[i].stream()) {
      final processed = await processStage1(rawData);
      await producerTx.send(processed); // Feeds competing stage-2 consumers
    }
    producerTx.close();
  }));
}
Available extensions

Properties

hashCode int
The hash code for this object.
no setterinherited
isDisconnected bool
no setter
runtimeType Type
A representation of the runtime type of the object.
no setterinherited

Methods

clone() MpmcSender<T>
close() → void
noSuchMethod(Invocation invocation) → dynamic
Invoked when a nonexistent method or property is accessed.
inherited
send(T v) Future<SendResult>
sendAll(Iterable<T> it) Future<void>

Available on Sender<T>, provided by the SenderBatchX extension

Send all items with backpressure handling.
sendTimeout(T v, Duration d) Future<SendResult>

Available on Sender<T>, provided by the SenderTimeoutX extension

Send a value with a timeout to prevent indefinite blocking.
toString() String
A string representation of this object.
inherited
trySend(T v) SendResult
trySendAll(Iterable<T> it) Future<void>

Available on Sender<T>, provided by the SenderBatchX extension

Send all items using trySend without waiting (best-effort).

Operators

operator ==(Object other) bool
The equality operator.
inherited