Mpmc class final
MPMC (Multiple-Producer, Multiple-Consumer) channels - maximum flexibility for complex concurrency patterns.
The most general channel type supporting concurrent producers AND consumers. Messages are distributed among consumers using competitive consumption - each message is received by exactly one consumer (no broadcasting).
Core Characteristics
- Multiple producers: Thread-safe concurrent sending
- Multiple consumers: Competitive message consumption (not broadcast)
- FIFO ordering: Messages maintain send order across all consumers
- Load balancing: Work automatically distributed among available consumers
- Full cloning: Both senders and receivers can be cloned
- Fair scheduling: Consumers compete fairly for messages
When to Use MPMC
- Worker pools: Multiple producers → multiple worker consumers
- Load balancing: Distribute work across multiple processors
- Pipeline stages: Multi-stage processing with multiple workers per stage
- Distributed systems: Multiple clients → multiple servers
- Parallel processing: Fan-out work distribution patterns
- Resource pools: Multiple requesters → multiple resource providers
Important: Competitive Consumption
Unlike broadcast channels, each message goes to exactly one consumer. This enables load balancing but means consumers compete for messages.
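To illustrate, here is a minimal sketch built only from the constructors documented below (note that `Mpmc.bounded` takes its capacity as a positional argument); the key point is that each sent value reaches exactly one of the cloned receivers:

```dart
// Sketch: two consumers compete for five messages.
final (tx, rx) = Mpmc.bounded<int>(8);

final rxA = rx;          // first consumer
final rxB = rx.clone();  // second consumer competes with the first

for (int i = 0; i < 5; i++) {
  await tx.send(i);
}

// Each value 0..4 is delivered to exactly one of rxA or rxB.
// Together they drain the channel, but neither sees a duplicate -
// this is competitive consumption, not broadcast.
```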
Performance Considerations
- Coordination overhead: Higher than MPSC due to consumer coordination
- Good throughput: ~525-937ns per operation (see benchmarks for details)
- Latest-only mode: very fast, ~7-8ns per operation for coalescing buffers
- Scalability: Performance may degrade with many consumers
- Memory efficiency: Shared buffer across all consumers
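The latest-only mode mentioned above can be sketched with the `Mpmc.latest` factory documented below. The consumer-side behavior described in the comments is an assumption based on the "coalescing buffer" description:

```dart
// Sketch: a coalescing buffer for a frequently updated value.
final (statsTx, statsRx) = Mpmc.latest<int>();

// A fast producer overwrites the buffered value rather than queueing;
// the absence of backpressure is what makes sends ~7-8ns in this mode.
for (int i = 0; i < 1000; i++) {
  await statsTx.send(i);
}

// A consumer that falls behind observes only the most recent value,
// not the full history of intermediate updates.
```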
Usage Patterns
Worker pool architecture:
// Create work distribution system
final (workTx, workRx) = Mpmc.bounded<Task>(1000);

// Multiple job producers
final producers = <MpmcSender<Task>>[];
for (int i = 0; i < clientCount; i++) {
  producers.add(workTx.clone());
}

// Multiple worker consumers
final workers = <MpmcReceiver<Task>>[];
for (int i = 0; i < workerCount; i++) {
  workers.add(workRx.clone());
}

// Each producer submits work
for (int i = 0; i < producers.length; i++) {
  await producers[i].send(Task('Process data $i'));
}

// Each worker competes for tasks
for (final worker in workers) {
  Thread.run(() async {
    await for (final task in worker.stream()) {
      await processTask(task); // Each task processed exactly once
    }
  });
}
Pipeline with parallel stages:
// Stage 1: Data ingestion (1 producer → multiple preprocessors)
final (dataTx, dataRx) = Mpmc.unbounded<RawData>();

// Stage 2: Preprocessing (multiple → multiple)
final (processedTx, processedRx) = Mpmc.bounded<ProcessedData>(500);

// Stage 3: Output (multiple → 1, latest-only)
final (outputTx, outputRx) = Mpmc.latest<Result>();

// Multiple preprocessors compete for raw data
for (int i = 0; i < preprocessorCount; i++) {
  final dataWorker = dataRx.clone();
  final processedProducer = processedTx.clone();
  Thread.run(() async {
    await for (final raw in dataWorker.stream()) {
      final processed = await preprocess(raw);
      await processedProducer.send(processed);
    }
  });
}

// Multiple final processors compete for processed data
for (int i = 0; i < finalizerCount; i++) {
  final processedWorker = processedRx.clone();
  final resultProducer = outputTx.clone();
  Thread.run(() async {
    await for (final processed in processedWorker.stream()) {
      final result = await finalize(processed);
      await resultProducer.send(result);
    }
  });
}
Distributed request processing:
// Request distribution system
final (requestTx, requestRx) = Mpmc.channel<Request>(
  capacity: 2000,
  policy: DropPolicy.oldest, // Handle overload gracefully
);

// Multiple clients sending requests
for (final client in clients) {
  final clientTx = requestTx.clone();
  client.setRequestSender(clientTx);
}

// Multiple server instances processing requests
for (int i = 0; i < serverCount; i++) {
  final serverRx = requestRx.clone();
  servers[i] = RequestServer(serverRx);
  Thread.run(() async {
    await for (final request in serverRx.stream()) {
      final response = await servers[i].handle(request);
      await request.responseChannel.send(response);
    }
  });
}
Constructors
- Mpmc.new()
Properties
- hashCode → int
  The hash code for this object.
  no setter; inherited
- runtimeType → Type
  A representation of the runtime type of the object.
  no setter; inherited
Methods
- noSuchMethod(Invocation invocation) → dynamic
  Invoked when a nonexistent method or property is accessed.
  inherited
- toString() → String
  A string representation of this object.
  inherited
Operators
- operator ==(Object other) → bool
  The equality operator.
  inherited
Static Methods
- bounded<T>(int capacity, {String? metricsId}) → (MpmcSender<T>, MpmcReceiver<T>)
  Creates a bounded MPMC channel with fixed capacity.
- channel<T>({int? capacity, DropPolicy policy = DropPolicy.block, OnDrop<T>? onDrop, bool chunked = true, String? metricsId}) → (MpmcSender<T>, MpmcReceiver<T>)
  Creates an MPMC channel with advanced drop policies for backpressure handling.
- latest<T>({String? metricsId}) → (MpmcSender<T>, MpmcReceiver<T>)
  Creates a latest-only MPMC channel with competitive consumption.
- unbounded<T>({bool chunked = true, String? metricsId}) → (MpmcSender<T>, MpmcReceiver<T>)
  Creates an unbounded MPMC channel with unlimited capacity.
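As a hedged sketch of the `channel` factory's backpressure parameters: the `policy` and `onDrop` names come from the signature above, but the exact `OnDrop<T>` callback shape (receiving the dropped message) is an assumption here:

```dart
// Sketch: bounded channel that sheds the oldest messages under load
// and counts what it drops.
int droppedCount = 0;

final (tx, rx) = Mpmc.channel<String>(
  capacity: 100,
  policy: DropPolicy.oldest,       // evict the oldest entry instead of blocking
  onDrop: (msg) => droppedCount++, // assumed OnDrop<T> callback shape
  metricsId: 'ingest-queue',       // optional label for metrics
);
```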