MpscReceiver<T> class final

Single-consumer receiving end of an MPSC (multi-producer, single-consumer) channel.

Receives messages from multiple producers in FIFO order. Intended for exactly one consumer at a time.

Key Features

  • Single consumer: Optimized for one consumer, multiple producers
  • FIFO ordering: Messages received in first-in, first-out order
  • Batching support: Extensions available for bulk operations
  • Stream integration: Supports async iteration patterns

Usage Guidelines

  • Single consumer only: Only one consumer should receive from this receiver
  • Single-subscription stream: stream() can only be called once
  • Graceful shutdown: Monitor isDisconnected for end-of-data detection
  • Error handling: Check RecvResult for different completion states
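
The single-subscription guideline follows general Dart stream semantics. The sketch below uses only dart:async and a plain StreamController rather than this receiver, so it shows how any single-subscription stream reacts to a second listener. Whether stream() fails in exactly the same way on a second call is an assumption. The helper name secondListenThrows is hypothetical.

```dart
import 'dart:async';

/// Returns true if a second listen() on a single-subscription stream throws.
/// Illustrates general Dart behavior, not this library's internals.
bool secondListenThrows() {
  // A non-broadcast StreamController exposes a single-subscription stream.
  final controller = StreamController<int>();
  final first = controller.stream.listen((_) {});
  try {
    // A second listener on the same stream is rejected with a StateError.
    controller.stream.listen((_) {});
    return false;
  } on StateError {
    return true;
  } finally {
    first.cancel();
    controller.close();
  }
}

void main() {
  print(secondListenThrows()); // true
}
```

If several listeners are needed, the toBroadcastStream extension listed under Methods is the documented route.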

Example - Event processing loop:

final (eventTx, eventRx) = Mpsc.unbounded<Event>();

// Two alternative consumption patterns. Use one per receiver:
// stream() can only be called once.
class EventProcessor {
  // Pattern 1: Stream-based (recommended)
  Future<void> processWithStream() async {
    await for (final event in eventRx.stream()) {
      await handleEvent(event);
    }
  }

  // Pattern 2: Manual loop with error handling
  Future<void> processWithRecv() async {
    while (true) {
      switch (await eventRx.recv()) {
        case RecvOk(value: final event):
          await handleEvent(event);
        case RecvErrorDisconnected():
          print('All producers finished');
          return;
        case RecvErrorTimeout():
          print('Timeout waiting for events');
          continue;
      }
    }
  }
}
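
The manual loop above switches on the result type. As a rough illustration of why that works well in Dart 3, the sketch below declares local stand-in classes that reuse the names from the example (RecvOk, RecvErrorDisconnected, RecvErrorTimeout). They are hypothetical stand-ins, not the library's definitions, and the real RecvResult hierarchy may contain more variants. The describe function is also hypothetical.

```dart
// Local stand-ins modeled on the names used in the example above.
// Assumption: the real RecvResult is a sealed hierarchy of a similar shape.
sealed class RecvResult<T> {}

class RecvOk<T> extends RecvResult<T> {
  final T value;
  RecvOk(this.value);
}

class RecvErrorDisconnected<T> extends RecvResult<T> {}

class RecvErrorTimeout<T> extends RecvResult<T> {}

/// Maps each completion state to a label. Because the base class is sealed,
/// the compiler rejects this switch if a variant is left unhandled.
String describe(RecvResult<int> result) => switch (result) {
      RecvOk(value: final v) => 'value $v',
      RecvErrorDisconnected() => 'disconnected',
      RecvErrorTimeout() => 'timeout',
    };

void main() {
  print(describe(RecvOk(1))); // value 1
  print(describe(RecvErrorDisconnected())); // disconnected
  print(describe(RecvErrorTimeout())); // timeout
}
```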

Example - Batch processing:

// Use the drain extensions for efficient batch processing
final batch = (await eventRx.recvAll(max: 100)).toList();
if (batch.isNotEmpty) {
  await processBatch(batch);
}

Properties

hashCode → int
The hash code for this object.
no setter, inherited

isDisconnected → bool
no setter

runtimeType → Type
A representation of the runtime type of the object.
no setter, inherited

Methods

close() → void
noSuchMethod(Invocation invocation) → dynamic
Invoked when a nonexistent method or property is accessed.
inherited
recv() → Future<RecvResult<T>>
recvAll({Duration idle = Duration.zero, int? max}) → Future<Iterable<T>>

Available on Receiver<T>, provided by the ReceiverDrainX extension

Receive multiple values with batching and idle timeout.
recvCancelable() → (Future<RecvResult<T>>, void Function())
recvTimeout(Duration d) → Future<RecvResult<T>>

Available on Receiver<T>, provided by the ReceiverTimeoutX extension

Receive a value with a timeout to prevent indefinite waiting.
stream() → Stream<T>
toBroadcastStream({bool waitForListeners = false, bool stopWhenNoListeners = true, bool closeReceiverOnDone = false, bool sync = false}) → Stream<T>

Available on KeepAliveReceiver<T>, provided by the StreamReceiverX extension

Convert a channel receiver to a broadcast stream for multiple listeners.
toString() → String
A string representation of this object.
inherited
tryRecv() → RecvResult<T>
tryRecvAll({int? max}) → Iterable<T>

Available on Receiver<T>, provided by the ReceiverDrainX extension

Drain all immediately available values without waiting.
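
To make the idea behind recvTimeout concrete, here is a minimal sketch of timeout-bounded waiting built on Future.timeout from dart:async. It is not the extension's implementation, which this page does not show. The helper waitWithTimeout is hypothetical and returns null on timeout, whereas the example earlier on this page suggests the library reports a timeout through RecvResult.

```dart
import 'dart:async';

/// Waits for [pending] but gives up after [limit], returning null instead.
/// Hypothetical helper; shows the general technique only.
Future<T?> waitWithTimeout<T>(Future<T> pending, Duration limit) {
  return pending
      .then<T?>((value) => value)
      .timeout(limit, onTimeout: () => null);
}

Future<void> main() async {
  // A future that never completes stands in for a channel with no messages.
  final never = Completer<int>().future;
  print(await waitWithTimeout(never, const Duration(milliseconds: 10))); // null

  // A future that completes in time is passed through unchanged.
  print(await waitWithTimeout(Future.value(7), const Duration(seconds: 1))); // 7
}
```

Bounding each wait this way keeps a consumer loop responsive, for example to re-check isDisconnected or a shutdown flag between attempts.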

Operators

operator ==(Object other) → bool
The equality operator.
inherited