channel<T> static method
(MpmcSender<T> , MpmcReceiver<T> )
channel<T>({
- int? capacity,
- DropPolicy policy = DropPolicy.block,
- OnDrop<
T> ? onDrop, - bool chunked = true,
- String? metricsId,
Creates an MPMC channel with advanced drop policies for backpressure handling.
When the buffer fills up, different policies determine how to handle new messages:
DropPolicy.block
: Block producers until space available (default)DropPolicy.oldest
: Drop oldest message to make room for new oneDropPolicy.newest
: Drop incoming message (appears successful but discarded)
Parameters:
capacity
: Buffer size (null = unbounded, 0 = rendezvous)policy
: How to handle buffer overflowonDrop
: Optional callback when messages are droppedchunked
: Use chunked buffer for unbounded channels
Example - Reliable logging with overflow protection:
final (logTx, logRx) = Mpmc.channel<LogEntry>(
capacity: 10000,
policy: DropPolicy.oldest, // Keep recent logs
onDrop: (dropped) => print('Dropped log: ${dropped.message}'),
);
// High-volume logging won't block
void logEvent(LogEntry entry) {
logTx.trySend(entry); // Never blocks, may drop oldest
}
// Log processor gets most recent entries
await for (final entry in logRx.stream()) {
await persistLog(entry);
}
Example - Real-time data with newest-drop:
final (dataTx, dataRx) = Mpmc.channel<SensorReading>(
capacity: 100,
policy: DropPolicy.newest, // Preserve existing data
onDrop: (reading) => _droppedCount++,
);
// Sensor updates won't block or disrupt processing
void updateSensor(SensorReading reading) {
dataTx.trySend(reading); // May drop if consumer is slow
}
Implementation
static (MpmcSender<T>, MpmcReceiver<T>) channel<T>({
int? capacity,
DropPolicy policy = DropPolicy.block,
OnDrop<T>? onDrop,
bool chunked = true,
String? metricsId,
}) {
final inner = capacity == null
? chunked
? ChunkedBuffer<T>()
: UnboundedBuffer<T>()
: (capacity == 0)
? RendezvousBuffer<T>()
: BoundedBuffer<T>(capacity: capacity);
final bool usePolicy =
capacity != null && capacity > 0 && policy != DropPolicy.block;
final ChannelBuffer<T> buf = usePolicy
? PolicyBufferWrapper<T>(inner, policy: policy, onDrop: onDrop)
: inner;
final core = _MpmcCore<T>(buf, metricsId: metricsId);
final tx = core.attachSender((c) => MpmcSender<T>._(c));
final rx = core.attachReceiver((c) => MpmcReceiver<T>._(c));
return (tx, rx);
}