Mpsc class

MPSC (multiple-producer, single-consumer) channels: the workhorse of concurrent systems.

Use an MPSC channel for fan-in patterns, where multiple producers send data to a single consumer. It combines thread-safe sends from concurrent producers with single-consumer optimizations for high throughput.

Core Strengths

  • Thread-safe producers: Multiple threads can send concurrently
  • Single-consumer optimization: No consumer-side coordination overhead
  • Rich buffering strategies: Unbounded, bounded, rendezvous, latest-only
  • Advanced drop policies: Handle backpressure with oldest/newest dropping
  • Producer cloning: Create multiple sender handles from one channel
  • Flexible capacity: From 0 (rendezvous) to unlimited (unbounded)

When to Use MPSC

  • Event aggregation: Multiple event sources → single event loop
  • Task queues: Multiple workers → single dispatcher
  • Logging systems: Multiple threads → single log writer
  • UI updates: Multiple components → single UI thread
  • Data collection: Multiple sensors → single processor
  • Request handling: Multiple clients → single server

Performance Characteristics

  • Throughput: roughly 535-950 ns per operation (see benchmarks for details)
  • Latest-only mode: about 7 ns per operation for coalescing buffers
  • Memory efficient: Chunked buffers available for unbounded channels
  • Scalable: Designed to handle multiple concurrent producers
  • Efficient design: Optimized for producer-consumer scenarios

Usage Patterns

Event aggregation system:

// Multiple event sources feeding a single processor
final (eventTx, eventRx) = Mpsc.unbounded<AppEvent>();

// Multiple producers
final userTx = eventTx.clone();
final networkTx = eventTx.clone();
final systemTx = eventTx.clone();

// Producers
userTx.send(AppEvent.userClick(button));
networkTx.send(AppEvent.dataReceived(payload));
systemTx.send(AppEvent.lowMemory());

// Single event loop (consumer)
await for (final event in eventRx.stream()) {
  await handleEvent(event);
}
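
The snippet above relies on an AppEvent type that is not shown. As a minimal sketch, assuming Dart 3 sealed classes, one possible shape is given below. Every name here (UserClick, DataReceived, LowMemory, describeEvent, and the String and List<int> field types) is hypothetical and not part of the library.

```dart
// Hypothetical event hierarchy for illustration; not defined by the library.
sealed class AppEvent {
  const AppEvent();
  const factory AppEvent.userClick(String button) = UserClick;
  const factory AppEvent.dataReceived(List<int> payload) = DataReceived;
  const factory AppEvent.lowMemory() = LowMemory;
}

final class UserClick extends AppEvent {
  const UserClick(this.button);
  final String button;
}

final class DataReceived extends AppEvent {
  const DataReceived(this.payload);
  final List<int> payload;
}

final class LowMemory extends AppEvent {
  const LowMemory();
}

/// Describes an event. The switch is exhaustive because AppEvent is sealed,
/// so adding a new subtype without handling it is a compile-time error.
String describeEvent(AppEvent event) => switch (event) {
      UserClick(:final button) => 'click on $button',
      DataReceived(:final payload) => 'received ${payload.length} bytes',
      LowMemory() => 'low memory warning',
    };
```

A sealed hierarchy suits a single event loop because the consumer is the one place where every event kind has to be handled.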

High-throughput logging:

// Bounded with drop policy for reliability
final (logTx, logRx) = Mpsc.channel<LogEntry>(
  capacity: 10000,
  policy: DropPolicy.oldest, // Don't block on full buffer
  onDrop: (entry) => _droppedLogs++,
);

// Multiple threads logging
logTx.send(LogEntry.info('Processing started'));
logTx.send(LogEntry.error('Database connection failed'));

// Single log writer
await for (final entry in logRx.stream()) {
  await writeToFile(entry);
}
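
To make the effect of a drop policy concrete, here is a self-contained sketch of a fixed-capacity buffer that never blocks. It is not the library's implementation: SketchPolicy and SketchBuffer are hypothetical stand-ins, and the meaning given to "newest" (the incoming item is discarded) is an assumption.

```dart
import 'dart:collection';

// Hypothetical stand-ins for illustration; not the library's DropPolicy or buffer.
enum SketchPolicy { oldest, newest }

class SketchBuffer<T> {
  SketchBuffer(this.capacity, this.policy, {this.onDrop})
      : assert(capacity > 0);

  final int capacity;
  final SketchPolicy policy;
  final void Function(T dropped)? onDrop;
  final Queue<T> _items = ListQueue<T>();

  /// Adds [value] without blocking; reports any dropped item via [onDrop].
  void push(T value) {
    if (_items.length < capacity) {
      _items.addLast(value);
      return;
    }
    switch (policy) {
      case SketchPolicy.oldest:
        // Evict the head to make room, so the most recent data is kept.
        final dropped = _items.removeFirst();
        _items.addLast(value);
        onDrop?.call(dropped);
      case SketchPolicy.newest:
        // Assumed meaning: discard the incoming value, leave the queue as is.
        onDrop?.call(value);
    }
  }

  /// Items currently buffered, oldest first.
  List<T> snapshot() => _items.toList();
}
```

With a capacity of 2 and the oldest policy, pushing 1, 2, 3 leaves 2 and 3 buffered and reports 1 to onDrop. For a log pipeline this favors recent entries over older ones that have not yet been written.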

Real-time data processing:

// Latest-only for real-time updates
final (statusTx, statusRx) = Mpsc.latest<SystemStatus>();

// Multiple monitoring threads
statusTx.send(SystemStatus(cpu: 45, memory: 60));
statusTx.send(SystemStatus(cpu: 50, memory: 65)); // Overwrites previous

// UI updates (always gets latest state)
await for (final status in statusRx.stream()) {
  updateUI(status);
}
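
The overwrite behavior noted in the comment above can be pictured as a single slot that each send replaces. The sketch below is only an illustration of that coalescing idea, not the library's code. LatestSlot, tryTake, and the overwritten counter are hypothetical names.

```dart
// Hypothetical single-slot holder illustrating coalescing; not the library's code.
class LatestSlot<T> {
  T? _value;
  bool _hasValue = false;

  /// Number of values replaced before the consumer saw them.
  int overwritten = 0;

  /// Stores [value], replacing any value that has not been consumed yet.
  void send(T value) {
    if (_hasValue) overwritten++;
    _value = value;
    _hasValue = true;
  }

  /// Takes the pending value, or returns null when nothing is pending.
  T? tryTake() {
    if (!_hasValue) return null;
    final value = _value;
    _value = null;
    _hasValue = false;
    return value;
  }
}
```

Because a slow consumer only ever sees the most recent value, this mode fits state snapshots such as status readouts. It does not fit event streams where every item matters.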

Request-response pattern:

// Task queue with bounded capacity
final (taskTx, taskRx) = Mpsc.bounded<WorkItem>(1000); // capacity is positional

// Multiple client threads
for (int i = 0; i < clientCount; i++) {
  final tx = taskTx.clone();
  clients.add(Client(tx));
}

// Single worker thread
await for (final task in taskRx.stream()) {
  final result = await processTask(task);
  task.responseChannel.send(result);
}
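
The loop above replies through task.responseChannel, which the source does not define. One way to carry a reply handle inside each request, sketched here with a plain Completer from dart:async instead of a channel, is shown below. This WorkItem, its fields, and processAndReply are all hypothetical.

```dart
import 'dart:async';

// Hypothetical request type; a Completer stands in for the response channel.
class WorkItem {
  WorkItem(this.input);

  final int input;
  final Completer<int> _reply = Completer<int>();

  /// Completes once the worker has answered this request.
  Future<int> get response => _reply.future;

  /// Whether the worker has already replied.
  bool get isAnswered => _reply.isCompleted;

  /// Called by the single worker, exactly once per item.
  void respond(int result) => _reply.complete(result);
}

/// Stand-in for the worker body: doubles the input and replies.
void processAndReply(WorkItem item) => item.respond(item.input * 2);
```

A client would create a WorkItem, send it through its sender handle, and then await item.response. The single worker calls respond once per item.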

Constructors

Mpsc.new()

Properties

hashCode → int
The hash code for this object.
no setter, inherited
runtimeType → Type
A representation of the runtime type of the object.
no setter, inherited

Methods

noSuchMethod(Invocation invocation) → dynamic
Invoked when a nonexistent method or property is accessed.
inherited
toString() → String
A string representation of this object.
inherited

Operators

operator ==(Object other) → bool
The equality operator.
inherited

Static Methods

bounded<T>(int capacity, {String? metricsId}) → (MpscSender<T>, MpscReceiver<T>)
Creates a bounded MPSC channel with fixed capacity.
channel<T>({int? capacity, DropPolicy policy = DropPolicy.block, OnDrop<T>? onDrop, bool chunked = true, String? metricsId}) → (MpscSender<T>, MpscReceiver<T>)
Creates an MPSC channel with advanced drop policies for backpressure handling.
latest<T>({String? metricsId}) → (MpscSender<T>, MpscReceiver<T>)
Creates a latest-only MPSC channel for real-time state updates.
unbounded<T>({bool chunked = true, String? metricsId}) → (MpscSender<T>, MpscReceiver<T>)
Creates an unbounded MPSC channel with unlimited capacity.