redirectToSender method
Redirect all stream data into a channel sender.
Efficiently pipes stream data into channels with configurable backpressure handling. Essential for integrating HTTP responses, file streams, or any existing Stream-based API with channel processing.
Parameters:
tx
: The channel sender to receive stream datadropWhenFull
: Iftrue
, drop items when channel is full (for bounded channels)closeSenderOnDone
: Iftrue
, close sender when stream completes
HTTP Integration:
// Stream HTTP responses into channel processing
final (tx, rx) = XChannel.mpsc<HttpResponse>(capacity: 100);
// Redirect HTTP stream to channel
final responseStream = httpClient.get(url).asStream();
await responseStream.redirectToSender(tx);
// Process responses in channel consumer
await for (final response in rx.stream()) {
final data = await response.readAsString();
await processData(data);
}
File Processing:
// Stream file lines into batch processor
final (tx, rx) = XChannel.mpsc<String>(capacity: 1000);
final fileStream = File('large_file.txt')
.openRead()
.transform(utf8.decoder)
.transform(LineSplitter());
// Redirect file lines to channel
await fileStream.redirectToSender(tx);
// Batch process lines
final batch = await rx.recvAll(max: 100);
await processBatch(batch.toList());
Backpressure Strategies:
// Strategy 1: Wait for space (reliable)
await stream.redirectToSender(tx, dropWhenFull: false);
// Strategy 2: Drop on full (high throughput)
await stream.redirectToSender(tx, dropWhenFull: true);
Implementation
Future<void> redirectToSender(
KeepAliveSender<T> tx, {
bool dropWhenFull = false,
bool closeSenderOnDone = true,
}) async {
try {
await for (final v in this) {
final r = tx.trySend(v);
if (r.hasSend) continue;
if (r.isDisconnected) break;
if (dropWhenFull) continue;
final r1 = await tx.send(v);
if (r1.isDisconnected) break;
}
} finally {
if (closeSenderOnDone && !tx.isDisconnected) {
tx.close();
}
}
}