channel<T> static method

(MpmcSender<T>, MpmcReceiver<T>) channel<T>({
  1. int? capacity,
  2. DropPolicy policy = DropPolicy.block,
  3. OnDrop<T>? onDrop,
  4. bool chunked = true,
  5. String? metricsId,
})

Creates an MPMC channel with advanced drop policies for backpressure handling.

When the buffer fills up, different policies determine how to handle new messages:

  • DropPolicy.block: Block producers until space is available (default)
  • DropPolicy.oldest: Drop the oldest buffered message to make room for the new one
  • DropPolicy.newest: Drop the incoming message (the send appears successful, but the message is discarded)

Parameters:

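  • capacity: Buffer size (null = unbounded, 0 = rendezvous)
  • policy: How to handle buffer overflow; only applied when capacity is greater than 0 (ignored for unbounded and rendezvous channels)
  • onDrop: Optional callback when messages are dropped; only used when a non-blocking policy is applied
  • chunked: Use chunked buffer for unbounded channels (ignored when capacity is set)
  • metricsId: Optional identifier passed to the channel core (presumably used to label metrics)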

Example - Reliable logging with overflow protection:

// capacity > 0 combined with a non-blocking policy is the combination that
// makes channel() wrap the bounded buffer with the drop policy and forward
// `onDrop` to it.
final (logTx, logRx) = Mpmc.channel<LogEntry>(
  capacity: 10000,
  policy: DropPolicy.oldest, // Keep recent logs
  onDrop: (dropped) => print('Dropped log: ${dropped.message}'),
);

// High-volume logging won't block
void logEvent(LogEntry entry) {
  logTx.trySend(entry); // Never blocks, may drop oldest
}

// Log processor gets most recent entries
// (`await for` must run inside an async function body.)
await for (final entry in logRx.stream()) {
  await persistLog(entry);
}

Example - Real-time data with newest-drop:

// Bounded channel (capacity > 0) with a non-blocking policy, so channel()
// wraps the buffer with the drop policy and forwards `onDrop` to it.
final (dataTx, dataRx) = Mpmc.channel<SensorReading>(
  capacity: 100,
  policy: DropPolicy.newest, // Preserve existing data
  onDrop: (reading) => _droppedCount++, // Count every discarded reading
);

// Sensor updates won't block or disrupt processing
void updateSensor(SensorReading reading) {
  dataTx.trySend(reading); // May drop if consumer is slow
}

Implementation

/// Creates a multi-producer, multi-consumer channel and returns its
/// `(sender, receiver)` pair.
///
/// The backing buffer is chosen from [capacity]:
///
/// * `null` — unbounded: a `ChunkedBuffer` when [chunked] is `true`,
///   otherwise an `UnboundedBuffer`.
/// * `0` — a `RendezvousBuffer`.
/// * any other value — a `BoundedBuffer` of that capacity.
///
/// [policy] and [onDrop] take effect only when [capacity] is greater than zero
/// and [policy] is not [DropPolicy.block]; in that case the buffer is wrapped
/// in a `PolicyBufferWrapper`. For every other combination they are ignored.
///
/// [metricsId] is handed to the channel core unchanged.
static (MpmcSender<T>, MpmcReceiver<T>) channel<T>({
  int? capacity,
  DropPolicy policy = DropPolicy.block,
  OnDrop<T>? onDrop,
  bool chunked = true,
  String? metricsId,
}) {
  // Pick the raw storage strategy from the requested capacity.
  // NOTE(review): a negative capacity is forwarded to BoundedBuffer as-is;
  // confirm that BoundedBuffer validates it.
  final storage = switch (capacity) {
    null when chunked => ChunkedBuffer<T>(),
    null => UnboundedBuffer<T>(),
    0 => RendezvousBuffer<T>(),
    final int size => BoundedBuffer<T>(capacity: size),
  };

  // A drop policy is only layered on top of a bounded buffer with real
  // capacity; unbounded and rendezvous buffers are used unwrapped.
  final wantsDropPolicy =
      policy != DropPolicy.block && capacity != null && capacity > 0;

  final ChannelBuffer<T> buffer;
  if (wantsDropPolicy) {
    buffer = PolicyBufferWrapper<T>(storage, policy: policy, onDrop: onDrop);
  } else {
    buffer = storage;
  }

  // Both endpoints share a single core built around the chosen buffer.
  final core = _MpmcCore<T>(buffer, metricsId: metricsId);
  final sender = core.attachSender((owner) => MpmcSender<T>._(owner));
  final receiver = core.attachReceiver((owner) => MpmcReceiver<T>._(owner));
  return (sender, receiver);
}