Mpsc class
MPSC (Multiple-Producer Single-Consumer) channels - The workhorse of concurrent systems.
The most versatile channel type for fan-in patterns where multiple producers send data to a single consumer. Combines thread-safety for concurrent producers with single-consumer optimizations for maximum throughput.
Core Strengths
- Thread-safe producers: Multiple threads can send concurrently
- Single-consumer optimization: No consumer-side coordination overhead
- Rich buffering strategies: Unbounded, bounded, rendezvous, latest-only
- Advanced drop policies: Handle backpressure with oldest/newest dropping
- Producer cloning: Create multiple sender handles from one channel
- Flexible capacity: From 0 (rendezvous) to unlimited (unbounded)
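To make the drop policies concrete, here is a minimal sketch of a fixed-capacity buffer that never blocks its producer. It is not the library's implementation: `DroppingBuffer` and `SketchDropPolicy` are hypothetical names, and the sketch assumes that "oldest" evicts the head of the queue while "newest" discards the incoming value.

```dart
import 'dart:collection';

/// Hypothetical stand-in for the library's drop policies.
enum SketchDropPolicy { oldest, newest }

/// A fixed-capacity FIFO buffer that never blocks the producer.
class DroppingBuffer<T> {
  DroppingBuffer(this.capacity, this.policy, {this.onDrop})
      : assert(capacity > 0);

  final int capacity;
  final SketchDropPolicy policy;
  final void Function(T dropped)? onDrop;
  final Queue<T> _items = ListQueue<T>();

  /// Adds [value]; returns false if [value] itself was dropped.
  bool push(T value) {
    if (_items.length < capacity) {
      _items.addLast(value);
      return true;
    }
    if (policy == SketchDropPolicy.oldest) {
      // Evict the head to make room for the new value.
      onDrop?.call(_items.removeFirst());
      _items.addLast(value);
      return true;
    }
    // newest: keep the buffer as is and discard the incoming value.
    onDrop?.call(value);
    return false;
  }

  /// Removes and returns the oldest buffered value, or null when empty.
  T? pop() => _items.isEmpty ? null : _items.removeFirst();

  /// Copy of the current contents, oldest first.
  List<T> snapshot() => _items.toList();
}

void main() {
  final dropped = <int>[];
  final buf =
      DroppingBuffer<int>(3, SketchDropPolicy.oldest, onDrop: dropped.add);
  for (var i = 1; i <= 5; i++) {
    buf.push(i);
  }
  print(buf.snapshot()); // [3, 4, 5]
  print(dropped); // [1, 2]
}
```

With the oldest policy the consumer always sees the most recent window of values, which suits logs and telemetry. The newest policy preserves what was queued first and rejects late arrivals.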
When to Use MPSC
- Event aggregation: Multiple event sources → single event loop
- Task queues: Multiple submitters → single worker
- Logging systems: Multiple threads → single log writer
- UI updates: Multiple components → single UI thread
- Data collection: Multiple sensors → single processor
- Request handling: Multiple clients → single server
Performance Characteristics
- Low per-operation cost: ~535-950 ns per operation (see benchmarks for details)
- Latest-only mode: Extremely fast, ~7 ns per operation, because the buffer coalesces updates
- Memory efficient: Chunked buffers available for unbounded channels
- Scalable: Designed to handle multiple concurrent producers
- Efficient design: Optimized for producer-consumer scenarios
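A "ns per operation" figure is usually an average over many iterations taken after a warm-up phase. The sketch below shows that measurement shape only. It times a plain `ListQueue` from `dart:collection`, not this library's channels, so its output is unrelated to the numbers quoted above. `nanosPerOp` is a hypothetical helper.

```dart
import 'dart:collection';

/// Average cost in nanoseconds of one enqueue/dequeue pair over [iterations].
double nanosPerOp(int iterations) {
  final queue = ListQueue<int>();
  // Warm up so the JIT has compiled the loop before timing starts.
  for (var i = 0; i < 10000; i++) {
    queue.addLast(i);
    queue.removeFirst();
  }
  final watch = Stopwatch()..start();
  for (var i = 0; i < iterations; i++) {
    queue.addLast(i);
    queue.removeFirst();
  }
  watch.stop();
  // Microseconds to nanoseconds, then divide by the number of operations.
  return watch.elapsedMicroseconds * 1000 / iterations;
}

void main() {
  print('${nanosPerOp(1000000).toStringAsFixed(1)} ns per op');
}
```

Results vary with hardware, runtime mode (JIT or AOT) and payload type, so treat any single run as indicative only.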
Usage Patterns
Event aggregation system:
// Multiple event sources feeding a single processor
final (eventTx, eventRx) = Mpsc.unbounded<AppEvent>();
// Multiple producers
final userTx = eventTx.clone();
final networkTx = eventTx.clone();
final systemTx = eventTx.clone();
// Producers
userTx.send(AppEvent.userClick(button));
networkTx.send(AppEvent.dataReceived(payload));
systemTx.send(AppEvent.lowMemory());
// Single event loop (consumer)
await for (final event in eventRx.stream()) {
  await handleEvent(event);
}
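The same fan-in shape can be tried without this package by using a `StreamController` from `dart:async` as a stand-in for the channel. This is a sketch under that assumption, not the library's API: the `AppEvent` class here, `produce`, and `runFanIn` are hypothetical, and a `StreamController` has no cloneable sender handles, so every producer shares one controller.

```dart
import 'dart:async';

/// Hypothetical event type for illustration only.
class AppEvent {
  AppEvent(this.source, this.seq);
  final String source;
  final int seq;
}

/// Emits [count] events tagged with [source], yielding between sends.
Future<void> produce(
    StreamController<AppEvent> sink, String source, int count) async {
  for (var i = 0; i < count; i++) {
    sink.add(AppEvent(source, i));
    await Future<void>.delayed(Duration.zero); // let other producers run
  }
}

/// Runs three producers against one consumer loop and returns what it saw.
Future<List<AppEvent>> runFanIn() async {
  final controller = StreamController<AppEvent>();
  final seen = <AppEvent>[];

  // Close once all producers finish, which ends the consumer loop.
  final producers = Future.wait([
    produce(controller, 'user', 2),
    produce(controller, 'network', 2),
    produce(controller, 'system', 2),
  ]).then((_) => controller.close());

  // Single consumer: events from all sources arrive on one stream.
  await for (final event in controller.stream) {
    seen.add(event);
  }
  await producers;
  return seen;
}

Future<void> main() async {
  final seen = await runFanIn();
  print(seen.length); // 6
}
```

Events from different sources may interleave, but each source's events reach the consumer in the order they were sent.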
High-throughput logging:
// Bounded, dropping the oldest entries under load so producers never block
final (logTx, logRx) = Mpsc.channel<LogEntry>(
  capacity: 10000,
  policy: DropPolicy.oldest, // Don't block on full buffer
  onDrop: (entry) => _droppedLogs++, // Count what was discarded
);
// Multiple threads logging
logTx.send(LogEntry.info('Processing started'));
logTx.send(LogEntry.error('Database connection failed'));
// Single log writer
await for (final entry in logRx.stream()) {
  await writeToFile(entry);
}
Real-time data processing:
// Latest-only for real-time updates
final (statusTx, statusRx) = Mpsc.latest<SystemStatus>();
// Multiple monitoring threads
statusTx.send(SystemStatus(cpu: 45, memory: 60));
statusTx.send(SystemStatus(cpu: 50, memory: 65)); // Overwrites previous
// UI updates (always gets latest state)
await for (final status in statusRx.stream()) {
  updateUI(status);
}
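The coalescing behavior can be pictured as a single-slot mailbox in which a new value replaces any value that has not been read yet. The sketch below assumes that model. `LatestSlot` is a hypothetical class, not the library's implementation, and it supports only one pending reader at a time.

```dart
import 'dart:async';

/// Single-slot mailbox: a new value replaces any unread one.
class LatestSlot<T> {
  T? _value;
  bool _hasValue = false;
  Completer<T>? _waiter;

  /// Stores [value], overwriting an unread value, or wakes a waiting reader.
  void send(T value) {
    final waiter = _waiter;
    if (waiter != null) {
      _waiter = null;
      waiter.complete(value);
      return;
    }
    _value = value;
    _hasValue = true;
  }

  /// Completes with the most recent unread value, waiting if none exists.
  Future<T> recv() {
    if (_hasValue) {
      final value = _value as T;
      _value = null;
      _hasValue = false;
      return Future<T>.value(value);
    }
    // Single consumer assumed: at most one pending recv at a time.
    return (_waiter ??= Completer<T>()).future;
  }
}

Future<void> main() async {
  final slot = LatestSlot<int>();
  slot.send(45);
  slot.send(50); // replaces 45 before anyone read it
  print(await slot.recv()); // 50
}
```

Because intermediate values are discarded, this shape suits state snapshots such as status or position, and it is the wrong choice when every message must be processed.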
Request-response pattern:
// Task queue with bounded capacity (capacity is a positional argument)
final (taskTx, taskRx) = Mpsc.bounded<WorkItem>(1000);
// Multiple client threads
for (int i = 0; i < clientCount; i++) {
  final tx = taskTx.clone();
  clients.add(Client(tx));
}
// Single worker thread
await for (final task in taskRx.stream()) {
  final result = await processTask(task);
  task.responseChannel.send(result);
}
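The example above leaves the shape of `WorkItem` and its `responseChannel` unspecified. One way to sketch the reply path, assuming each request carries its own reply handle, is to use a `Completer` in place of a response channel and a `StreamController` in place of the task queue. All names below are hypothetical, and this `processTask` is a placeholder that doubles its input.

```dart
import 'dart:async';

/// Hypothetical work item that carries its own reply handle.
class WorkItem {
  WorkItem(this.input);
  final int input;
  final Completer<int> reply = Completer<int>();
}

/// Placeholder for real work.
Future<int> processTask(int input) async => input * 2;

/// Single worker: drains the queue in order and answers each request.
Future<void> runWorker(Stream<WorkItem> tasks) async {
  await for (final task in tasks) {
    try {
      final result = await processTask(task.input);
      task.reply.complete(result);
    } catch (error, stack) {
      // Report the failure to the caller instead of ending the worker loop.
      task.reply.completeError(error, stack);
    }
  }
}

/// Client side: enqueue a request and wait for its reply.
Future<int> request(StreamController<WorkItem> queue, int input) {
  final item = WorkItem(input);
  queue.add(item);
  return item.reply.future;
}

Future<void> main() async {
  final queue = StreamController<WorkItem>();
  final worker = runWorker(queue.stream);

  final results = await Future.wait([
    request(queue, 1),
    request(queue, 2),
    request(queue, 3),
  ]);
  print(results); // [2, 4, 6]

  await queue.close();
  await worker;
}
```

Routing failures through `completeError` keeps one bad task from stalling every client that is still waiting on the queue.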
Constructors
- Mpsc.new()
Properties
- hashCode → int
  The hash code for this object. (no setter, inherited)
- runtimeType → Type
  A representation of the runtime type of the object. (no setter, inherited)
Methods
- noSuchMethod(Invocation invocation) → dynamic
  Invoked when a nonexistent method or property is accessed. (inherited)
- toString() → String
  A string representation of this object. (inherited)
Operators
- operator ==(Object other) → bool
  The equality operator. (inherited)
Static Methods
- bounded<T>(int capacity, {String? metricsId}) → (MpscSender<T>, MpscReceiver<T>)
  Creates a bounded MPSC channel with fixed capacity.
- channel<T>({int? capacity, DropPolicy policy = DropPolicy.block, OnDrop<T>? onDrop, bool chunked = true, String? metricsId}) → (MpscSender<T>, MpscReceiver<T>)
  Creates an MPSC channel with advanced drop policies for backpressure handling.
- latest<T>({String? metricsId}) → (MpscSender<T>, MpscReceiver<T>)
  Creates a latest-only MPSC channel for real-time state updates.
- unbounded<T>({bool chunked = true, String? metricsId}) → (MpscSender<T>, MpscReceiver<T>)
  Creates an unbounded MPSC channel with unlimited capacity.