Mpmc class final

MPMC (Multiple-Producer Multiple-Consumer) channels - Maximum flexibility for complex patterns.

The most general channel type supporting concurrent producers AND consumers. Messages are distributed among consumers using competitive consumption - each message is received by exactly one consumer (no broadcasting).

Core Characteristics

  • Multiple producers: Thread-safe concurrent sending
  • Multiple consumers: Competitive message consumption (not broadcast)
  • FIFO ordering: Messages are dequeued in send order; because consumers run concurrently, processing may still finish out of order
  • Load balancing: Work automatically distributed among available consumers
  • Full cloning: Both senders and receivers can be cloned
  • Fair scheduling: Consumers compete fairly for messages

When to Use MPMC

  • Worker pools: Multiple producers → multiple worker consumers
  • Load balancing: Distribute work across multiple processors
  • Pipeline stages: Multi-stage processing with multiple workers per stage
  • Distributed systems: Multiple clients → multiple servers
  • Parallel processing: Fan-out work distribution patterns
  • Resource pools: Multiple requesters → multiple resource providers

Important: Competitive Consumption

Unlike broadcast channels, each message goes to exactly one consumer. This enables load balancing but means consumers compete for messages.
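
The competitive-consumption rule can be illustrated without the library. The following is a minimal sketch, not the package's implementation: SharedQueue, runDemo, and the consumer names are hypothetical, and null is used as an end-of-stream marker (so the sketch assumes a non-nullable item type). Two consumers pull from one shared FIFO queue, and each item ends up with exactly one of them.

```dart
import 'dart:async';
import 'dart:collection';

/// Hypothetical stand-in for a shared MPMC buffer (illustration only).
class SharedQueue<T> {
  final Queue<T> _items = Queue<T>();
  final Queue<Completer<T?>> _waiters = Queue<Completer<T?>>();
  bool _closed = false;

  /// Hands the value to the longest-waiting consumer, or buffers it.
  void send(T value) {
    if (_closed) throw StateError('queue closed');
    if (_waiters.isNotEmpty) {
      _waiters.removeFirst().complete(value);
    } else {
      _items.add(value);
    }
  }

  /// Completes with the next item, or null once closed and drained.
  Future<T?> recv() {
    if (_items.isNotEmpty) return Future.value(_items.removeFirst());
    if (_closed) return Future.value(null);
    final waiter = Completer<T?>();
    _waiters.add(waiter);
    return waiter.future;
  }

  /// Stops new sends and releases consumers that are still waiting.
  void close() {
    _closed = true;
    while (_waiters.isNotEmpty) {
      _waiters.removeFirst().complete(null);
    }
  }
}

/// Runs two competing consumers and reports which items each one received.
Future<Map<String, List<int>>> runDemo() async {
  final queue = SharedQueue<int>();
  final received = <String, List<int>>{'A': [], 'B': []};

  Future<void> consumer(String name) async {
    while (true) {
      final item = await queue.recv();
      if (item == null) break; // closed and drained
      received[name]!.add(item);
      await Future<void>.delayed(Duration.zero); // let the other consumer run
    }
  }

  final workers = [consumer('A'), consumer('B')];
  for (var i = 0; i < 6; i++) {
    queue.send(i);
  }
  queue.close();
  await Future.wait(workers);
  return received;
}

Future<void> main() async {
  final received = await runDemo();
  final all = [...received['A']!, ...received['B']!]..sort();
  print('A received ${received['A']}, B received ${received['B']}');
  print('items delivered in total: ${all.length}');
}
```

How the items split between A and B depends on scheduling; the only guarantee the sketch demonstrates is that no item is delivered twice and none is lost.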

Performance Considerations

  • Coordination overhead: Higher than MPSC due to consumer coordination
  • Good throughput: ~525-937ns per operation (see benchmarks for details)
  • Latest-only mode: Very fast ~7-8ns per operation for coalescing buffers
  • Scalability: Performance may degrade with many consumers
  • Memory efficiency: Shared buffer across all consumers
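
The speed of latest-only mode is plausible if one assumes it keeps a single slot that newer sends overwrite, so there is no queue to maintain. The sketch below shows that coalescing behavior under that assumption; LatestSlot, tryTake, and the overwritten counter are hypothetical names, not the library's API.

```dart
/// Hypothetical single-slot buffer illustrating latest-only coalescing.
class LatestSlot<T> {
  T? _value;
  bool _hasValue = false;

  /// Number of unread values that were replaced by a newer send.
  int overwritten = 0;

  /// Stores the value, replacing any value that has not been taken yet.
  void send(T value) {
    if (_hasValue) overwritten++;
    _value = value;
    _hasValue = true;
  }

  /// Takes the newest value, or returns null when the slot is empty.
  T? tryTake() {
    if (!_hasValue) return null;
    final value = _value;
    _value = null;
    _hasValue = false;
    return value;
  }
}

void main() {
  final slot = LatestSlot<int>();
  slot.send(1);
  slot.send(2);
  slot.send(3);
  print(slot.tryTake()); // 3
  print(slot.tryTake()); // null
  print(slot.overwritten); // 2
}
```

Because a taken value leaves the slot empty, only one consumer observes each update, which matches the competitive consumption described above.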

Usage Patterns

Worker pool architecture:

// Create work distribution system
final (workTx, workRx) = Mpmc.bounded<Task>(1000); // capacity is positional

// Multiple job producers
final producers = <MpmcSender<Task>>[];
for (int i = 0; i < clientCount; i++) {
  producers.add(workTx.clone());
}

// Multiple worker consumers
final workers = <MpmcReceiver<Task>>[];
for (int i = 0; i < workerCount; i++) {
  workers.add(workRx.clone());
}

// Each producer submits work
for (int i = 0; i < producers.length; i++) {
  await producers[i].send(Task('Process data $i'));
}

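// Each worker competes for tasks.
// Note: Thread.run is not part of dart:core; treat it here as a placeholder
// for however the application starts a concurrent task.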
for (int i = 0; i < workerCount; i++) {
  final worker = workers[i];
  Thread.run(() async {
    await for (final task in worker.stream()) {
      await processTask(task); // Each task processed once
    }
  });
}

Pipeline with parallel stages:

// Stage 1: Data ingestion (1 producer → multiple preprocessors)
final (dataTx, dataRx) = Mpmc.unbounded<RawData>();

// Stage 2: Preprocessing (multiple → multiple)
final (processedTx, processedRx) = Mpmc.bounded<ProcessedData>(500); // capacity is positional

// Stage 3: Output (multiple → 1)
final (outputTx, outputRx) = Mpmc.latest<Result>();

// Multiple preprocessors compete for raw data
for (int i = 0; i < preprocessorCount; i++) {
  final dataWorker = dataRx.clone();
  final processedProducer = processedTx.clone();

  Thread.run(() async {
    await for (final raw in dataWorker.stream()) {
      final processed = await preprocess(raw);
      await processedProducer.send(processed);
    }
  });
}

// Multiple final processors compete for processed data
for (int i = 0; i < finalizerCount; i++) {
  final processedWorker = processedRx.clone();
  final resultProducer = outputTx.clone();

  Thread.run(() async {
    await for (final processed in processedWorker.stream()) {
      final result = await finalize(processed);
      await resultProducer.send(result);
    }
  });
}

Distributed request processing:

// Request distribution system
final (requestTx, requestRx) = Mpmc.channel<Request>(
  capacity: 2000,
  policy: DropPolicy.oldest, // Handle overload gracefully
);

// Multiple clients sending requests
for (final client in clients) {
  final clientTx = requestTx.clone();
  client.setRequestSender(clientTx);
}

// Multiple server instances processing requests
for (int i = 0; i < serverCount; i++) {
  final serverRx = requestRx.clone();
  servers[i] = RequestServer(serverRx);

  Thread.run(() async {
    await for (final request in serverRx.stream()) {
      final response = await servers[i].handle(request);
      await request.responseChannel.send(response);
    }
  });
}
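
The DropPolicy.oldest setting used above can be pictured as a bounded buffer that evicts its oldest entry when a new one arrives at full capacity. The sketch below assumes that reading of the policy; DropOldestBuffer, push, and snapshot are hypothetical names, and the onDrop callback only mirrors the idea of the onDrop parameter in the channel signature.

```dart
import 'dart:collection';

/// Hypothetical bounded buffer that evicts the oldest item when full.
class DropOldestBuffer<T> {
  DropOldestBuffer(this.capacity, {this.onDrop}) : assert(capacity > 0);

  final int capacity;

  /// Optional callback invoked with each evicted item.
  final void Function(T dropped)? onDrop;

  final Queue<T> _items = Queue<T>();

  /// Adds a value; at capacity, the oldest buffered value is dropped first.
  void push(T value) {
    if (_items.length >= capacity) {
      final dropped = _items.removeFirst();
      onDrop?.call(dropped);
    }
    _items.addLast(value);
  }

  /// Returns the buffered items, oldest first.
  List<T> snapshot() => _items.toList();
}

void main() {
  final dropped = <int>[];
  final buffer = DropOldestBuffer<int>(3, onDrop: dropped.add);
  for (var i = 1; i <= 5; i++) {
    buffer.push(i);
  }
  print(buffer.snapshot()); // [3, 4, 5]
  print(dropped); // [1, 2]
}
```

Under overload the producers never block with this policy; the cost is that the oldest pending requests are lost, so it fits workloads where stale requests are worth less than fresh ones.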

Constructors

Mpmc.new()

Properties

hashCode → int
The hash code for this object.
no setter, inherited
runtimeType → Type
A representation of the runtime type of the object.
no setter, inherited

Methods

noSuchMethod(Invocation invocation) → dynamic
Invoked when a nonexistent method or property is accessed.
inherited
toString() → String
A string representation of this object.
inherited

Operators

operator ==(Object other) → bool
The equality operator.
inherited

Static Methods

bounded<T>(int capacity, {String? metricsId}) → (MpmcSender<T>, MpmcReceiver<T>)
Creates a bounded MPMC channel with fixed capacity.
channel<T>({int? capacity, DropPolicy policy = DropPolicy.block, OnDrop<T>? onDrop, bool chunked = true, String? metricsId}) → (MpmcSender<T>, MpmcReceiver<T>)
Creates an MPMC channel with advanced drop policies for backpressure handling.
latest<T>({String? metricsId}) → (MpmcSender<T>, MpmcReceiver<T>)
Creates a latest-only MPMC channel with competitive consumption.
unbounded<T>({bool chunked = true, String? metricsId}) → (MpmcSender<T>, MpmcReceiver<T>)
Creates an unbounded MPMC channel with unlimited capacity.