MpmcReceiver<T> class
final
Multi-consumer receiver with competitive message consumption.
Competes with other receivers for messages in a fair, load-balancing manner. Each message goes to exactly one consumer - this enables natural work distribution but requires careful coordination between consumers.
Competitive Consumption Semantics
- One winner per message: Each message received by exactly one consumer
- Fair competition: No consumer starvation under normal conditions
- Load balancing: Work automatically distributed among active consumers
- Clone independence: Each cloned receiver competes independently
- Stream single-use: Each receiver's stream can only be used once
Performance Notes
- Coordination overhead: Additional overhead compared to MPSC
- Fair competition: Designed to prevent consumer monopolization
- Reasonable scaling: Performance scales reasonably with consumer count
- Shared buffer: Memory efficient with shared buffer design
Usage Guidelines
- Worker pools: Perfect for distributing work among multiple workers
- Load balancing: Natural distribution without explicit load balancer
- Parallel processing: Scale processing by adding more consumer instances
- Resource pooling: Multiple consumers can serve different resource types
Example - Worker pool with competitive consumption:
final (workTx, workRx) = Mpmc.bounded<Task>(capacity: 1000);
// Create worker pool with competing consumers
final workers = <MpmcReceiver<Task>>[];
for (int i = 0; i < workerCount; i++) {
workers.add(workRx.clone());
}
workRx.close(); // Close original handle
// Each worker competes for tasks
for (int workerId = 0; workerId < workerCount; workerId++) {
final worker = workers[workerId];
Thread.run(() async {
await for (final task in worker.stream()) {
// Only this worker processes this task
final result = await processTask(task, workerId);
await task.resultChannel.send(result);
}
});
}
// Producers send work - automatically load balanced
for (final task in taskList) {
await workTx.send(task); // Goes to one available worker
}
Example - Multi-tier processing pipeline:
// Tier 2: Multiple consumers compete for processed data
final (tier2Tx, tier2Rx) = Mpmc.channel<ProcessedData>(capacity: 500);
// Multiple tier-2 processors (competing consumers)
final tier2Consumers = <MpmcReceiver<ProcessedData>>[];
for (int i = 0; i < tier2ProcessorCount; i++) {
tier2Consumers.add(tier2Rx.clone());
}
// Each tier-2 processor handles different data
for (int i = 0; i < tier2ProcessorCount; i++) {
final consumer = tier2Consumers[i];
final processorId = i;
Thread.run(() async {
await for (final data in consumer.stream()) {
// Competitive consumption - each data goes to one processor
final result = await processTier2(data, processorId);
await tier3Channel.send(result);
}
});
}
Example - Request handling with failover:
final (requestTx, requestRx) = Mpmc.unbounded<Request>();
// Multiple server instances (competing consumers)
final serverInstances = <ServerInstance>[];
for (int i = 0; i < serverCount; i++) {
final serverRx = requestRx.clone();
serverInstances.add(ServerInstance(serverRx, serverId: i));
Thread.run(() async {
try {
await for (final request in serverRx.stream()) {
// Only one server handles each request
final response = await serverInstances[i].handle(request);
await request.responseChannel.send(response);
}
} catch (e) {
// This server failed - others continue competing
logger.error('Server $i failed: $e');
serverRx.close();
}
});
}
- Available extensions
Properties
- hashCode → int
-
The hash code for this object.
no setterinherited
- isDisconnected → bool
-
no setter
- runtimeType → Type
-
A representation of the runtime type of the object.
no setterinherited
Methods
-
clone(
) → MpmcReceiver< T> -
close(
) → void -
noSuchMethod(
Invocation invocation) → dynamic -
Invoked when a nonexistent method or property is accessed.
inherited
-
recv(
) → Future< RecvResult< T> > -
recvAll(
{Duration idle = Duration.zero, int? max}) → Future< Iterable< T> > -
Available on Receiver<
Receive multiple values with batching and idle timeout.T> , provided by the ReceiverDrainX extension -
recvCancelable(
) → (Future< RecvResult< , void Function())T> > -
recvTimeout(
Duration d) → Future< RecvResult< T> > -
Available on Receiver<
Receive a value with a timeout to prevent indefinite waiting.T> , provided by the ReceiverTimeoutX extension -
stream(
) → Stream< T> -
toBroadcastStream(
{bool waitForListeners = false, bool stopWhenNoListeners = true, bool closeReceiverOnDone = false, bool sync = false}) → Stream< T> -
Available on KeepAliveReceiver<
Convert a channel receiver to a broadcast stream for multiple listeners.T> , provided by the StreamReceiverX extension -
toString(
) → String -
A string representation of this object.
inherited
-
tryRecv(
) → RecvResult< T> -
tryRecvAll(
{int? max}) → Iterable< T> -
Available on Receiver<
Drain all immediately available values without waiting.T> , provided by the ReceiverDrainX extension
Operators
-
operator ==(
Object other) → bool -
The equality operator.
inherited