MpmcReceiver<T> class final

Multi-consumer receiver with competitive message consumption.

Competes with other receivers for messages in a fair, load-balancing manner. Each message is delivered to exactly one consumer, which enables natural work distribution but requires careful coordination between consumers.
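
A minimal sketch of the one-winner guarantee, using the same API as the worker-pool example below (the integer payload and print statements are purely illustrative):

final (tx, rx) = Mpmc.bounded<int>(capacity: 8);

// Two competing clones: every value sent is claimed by exactly one of them.
final rxA = rx.clone();
final rxB = rx.clone();
rx.close(); // Close the original handle; the clones keep competing.

Thread.run(() async {
  await for (final value in rxA.stream()) {
    print('A got $value'); // B never sees this value
  }
});
Thread.run(() async {
  await for (final value in rxB.stream()) {
    print('B got $value'); // A never sees this value
  }
});

for (var i = 0; i < 10; i++) {
  await tx.send(i); // Each value goes to either A or B, never both.
}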

Competitive Consumption Semantics

  • One winner per message: Each message received by exactly one consumer
  • Fair competition: No consumer starvation under normal conditions
  • Load balancing: Work automatically distributed among active consumers
  • Clone independence: Each cloned receiver competes independently
  • Stream single-use: Each receiver's stream can only be used once

Performance Notes

  • Coordination overhead: Per-message cost is higher than MPSC because consumers must coordinate
  • Fair competition: Designed to prevent any single consumer from monopolizing messages
  • Scaling: Throughput scales reasonably as consumers are added
  • Shared buffer: Consumers share a single buffer, keeping memory usage low

Usage Guidelines

  • Worker pools: Perfect for distributing work among multiple workers
  • Load balancing: Natural distribution without explicit load balancer
  • Parallel processing: Scale processing by adding more consumer instances
  • Resource pooling: Multiple consumers can serve different resource types

Example - Worker pool with competitive consumption:

final (workTx, workRx) = Mpmc.bounded<Task>(capacity: 1000);

// Create worker pool with competing consumers
final workers = <MpmcReceiver<Task>>[];
for (int i = 0; i < workerCount; i++) {
  workers.add(workRx.clone());
}
workRx.close(); // Close original handle

// Each worker competes for tasks
for (int workerId = 0; workerId < workerCount; workerId++) {
  final worker = workers[workerId];

  Thread.run(() async {
    await for (final task in worker.stream()) {
      // Only this worker processes this task
      final result = await processTask(task, workerId);
      await task.resultChannel.send(result);
    }
  });
}

// Producers send work - automatically load balanced
for (final task in taskList) {
  await workTx.send(task); // Goes to one available worker
}

Example - Multi-tier processing pipeline:

// Tier 2: Multiple consumers compete for processed data
final (tier2Tx, tier2Rx) = Mpmc.bounded<ProcessedData>(capacity: 500);

// Multiple tier-2 processors (competing consumers)
final tier2Consumers = <MpmcReceiver<ProcessedData>>[];
for (int i = 0; i < tier2ProcessorCount; i++) {
  tier2Consumers.add(tier2Rx.clone());
}

// Each tier-2 processor handles different data
for (int i = 0; i < tier2ProcessorCount; i++) {
  final consumer = tier2Consumers[i];
  final processorId = i;

  Thread.run(() async {
    await for (final data in consumer.stream()) {
      // Competitive consumption - each item goes to exactly one processor
      final result = await processTier2(data, processorId);
      await tier3Channel.send(result);
    }
  });
}

Example - Request handling with failover:

final (requestTx, requestRx) = Mpmc.unbounded<Request>();

// Multiple server instances (competing consumers)
final serverInstances = <ServerInstance>[];
for (int i = 0; i < serverCount; i++) {
  final serverRx = requestRx.clone();
  serverInstances.add(ServerInstance(serverRx, serverId: i));

  Thread.run(() async {
    try {
      await for (final request in serverRx.stream()) {
        // Only one server handles each request
        final response = await serverInstances[i].handle(request);
        await request.responseChannel.send(response);
      }
    } catch (e) {
      // This server failed - others continue competing
      logger.error('Server $i failed: $e');
      serverRx.close();
    }
  });
}
Available extensions

  • ReceiverDrainX
  • ReceiverTimeoutX
  • StreamReceiverX

Properties

hashCode → int
The hash code for this object.
no setter, inherited
isDisconnected → bool
no setter
runtimeType → Type
A representation of the runtime type of the object.
no setter, inherited

Methods

clone() → MpmcReceiver<T>
close() → void
noSuchMethod(Invocation invocation) → dynamic
Invoked when a nonexistent method or property is accessed.
inherited
recv() → Future<RecvResult<T>>
recvAll({Duration idle = Duration.zero, int? max}) → Future<Iterable<T>>

Available on Receiver<T>, provided by the ReceiverDrainX extension

Receive multiple values with batching and idle timeout.
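
A minimal sketch of batched receiving, assuming rx is an MpmcReceiver obtained as in the examples above (handle is a placeholder):

// Receive a batch: at most 100 values, with a 50 ms idle timeout.
final batch = await rx.recvAll(idle: Duration(milliseconds: 50), max: 100);
for (final value in batch) {
  handle(value); // placeholder for real processing
}
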
recvCancelable() → (Future<RecvResult<T>>, void Function())
recvTimeout(Duration d) → Future<RecvResult<T>>

Available on Receiver<T>, provided by the ReceiverTimeoutX extension

Receive a value with a timeout to prevent indefinite waiting.
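
A minimal sketch, assuming rx is an MpmcReceiver; inspection of the RecvResult is kept abstract since its members are not listed on this page:

// Wait at most two seconds for the next message.
final result = await rx.recvTimeout(Duration(seconds: 2));
// Inspect `result` (a RecvResult<T>) to distinguish a delivered value
// from a timeout or a disconnected channel.
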
stream() → Stream<T>
toBroadcastStream({bool waitForListeners = false, bool stopWhenNoListeners = true, bool closeReceiverOnDone = false, bool sync = false}) → Stream<T>

Available on KeepAliveReceiver<T>, provided by the StreamReceiverX extension

Convert a channel receiver to a broadcast stream for multiple listeners.
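
A minimal sketch, assuming rx is an MpmcReceiver and that each broadcast listener should observe the values this receiver wins:

// Fan the values this receiver wins out to several in-isolate listeners.
final broadcast = rx.toBroadcastStream(closeReceiverOnDone: true);
broadcast.listen((value) => print('listener 1: $value'));
broadcast.listen((value) => print('listener 2: $value'));
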
toString() → String
A string representation of this object.
inherited
tryRecv() → RecvResult<T>
tryRecvAll({int? max}) → Iterable<T>

Available on Receiver<T>, provided by the ReceiverDrainX extension

Drain all immediately available values without waiting.
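
A minimal non-blocking drain, assuming rx is an MpmcReceiver (process is a placeholder):

// Take whatever is already buffered, without waiting for more.
final pending = rx.tryRecvAll(max: 64);
for (final value in pending) {
  process(value); // placeholder for real processing
}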

Operators

operator ==(Object other) → bool
The equality operator.
inherited