bounded<T> static method
Creates a bounded MPMC channel with fixed capacity.
Producers may block when the buffer is full. Provides natural backpressure to prevent memory exhaustion in high-load scenarios.
Special cases:
capacity = 0
: Rendezvous channel (direct handoff)capacity > 0
: Fixed-size buffer with blocking on full
Parameters:
capacity
: Maximum number of messages to buffer (≥ 0)
Example - Task queue with backpressure:
// Limited work queue prevents memory exhaustion
final (taskTx, taskRx) = Mpmc.bounded<WorkItem>(capacity: 1000);
// Producers block when queue is full (natural backpressure)
Future<void> submitTask(WorkItem task) async {
final result = await taskTx.send(task);
if (result is SendErrorDisconnected) {
throw StateError('Worker pool shut down');
}
}
// Consumer processes at sustainable rate
await for (final task in taskRx.stream()) {
await processWorkItem(task);
}
Example - Rendezvous (capacity = 0):
// Direct producer-consumer handoff
final (tx, rx) = Mpmc.bounded<Message>(capacity: 0);
// Producer waits for consumer to be ready
await tx.send(message); // Blocks until consumer calls recv()
Implementation
static (MpmcSender<T>, MpmcReceiver<T>) bounded<T>(int capacity,
{String? metricsId}) {
if (capacity < 0) {
throw ArgumentError.value(capacity, 'capacity', 'Must be >= 0');
}
final buf = (capacity == 0)
? RendezvousBuffer<T>()
: BoundedBuffer<T>(capacity: capacity);
final core = _MpmcCore<T>(buf, metricsId: metricsId);
final tx = core.attachSender((c) => MpmcSender<T>._(c));
final rx = core.attachReceiver((c) => MpmcReceiver._(c));
return (tx, rx);
}