bounded<T> static method

(MpmcSender<T>, MpmcReceiver<T>) bounded<T>(
  1. int capacity, {
  2. String? metricsId,
})

Creates a bounded MPMC channel with fixed capacity.

Producers may block when the buffer is full. Provides natural backpressure to prevent memory exhaustion in high-load scenarios.

Special cases:

  • capacity = 0: Rendezvous channel (direct handoff)
  • capacity > 0: Fixed-size buffer with blocking on full

Parameters:

  • capacity: Maximum number of messages to buffer (≥ 0)

Example - Task queue with backpressure:

// Limited work queue prevents memory exhaustion
final (taskTx, taskRx) = Mpmc.bounded<WorkItem>(capacity: 1000);

// Producers block when queue is full (natural backpressure)
Future<void> submitTask(WorkItem task) async {
  final result = await taskTx.send(task);
  if (result is SendErrorDisconnected) {
    throw StateError('Worker pool shut down');
  }
}

// Consumer processes at sustainable rate
await for (final task in taskRx.stream()) {
  await processWorkItem(task);
}

Example - Rendezvous (capacity = 0):

// Direct producer-consumer handoff
final (tx, rx) = Mpmc.bounded<Message>(capacity: 0);

// Producer waits for consumer to be ready
await tx.send(message); // Blocks until consumer calls recv()

Implementation

static (MpmcSender<T>, MpmcReceiver<T>) bounded<T>(int capacity,
    {String? metricsId}) {
  if (capacity < 0) {
    throw ArgumentError.value(capacity, 'capacity', 'Must be >= 0');
  }

  final buf = (capacity == 0)
      ? RendezvousBuffer<T>()
      : BoundedBuffer<T>(capacity: capacity);
  final core = _MpmcCore<T>(buf, metricsId: metricsId);
  final tx = core.attachSender((c) => MpmcSender<T>._(c));
  final rx = core.attachReceiver((c) => MpmcReceiver._(c));
  return (tx, rx);
}