redirectToSender method

Future<void> redirectToSender(
  KeepAliveSender<T> tx, {
  bool dropWhenFull = false,
  bool closeSenderOnDone = true,
})

Redirect all stream data into a channel sender.

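Pipes every item of the stream into the given channel sender. Each item is first offered with a non-blocking trySend; if the channel is full, the method either waits for space or drops the item, depending on dropWhenFull. Redirection stops early if the sender becomes disconnected. Useful for feeding HTTP responses, file streams, or any other Stream-based API into channel processing.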

Parameters:

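  • tx: The channel sender that receives the stream data
  • dropWhenFull: If true, items that arrive while the channel is full are discarded; if false (the default), the method waits until space is available. Only relevant for bounded channels
  • closeSenderOnDone: If true (the default), the sender is closed when redirection ends, whether the stream completes or fails with an error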
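The effect of closeSenderOnDone can be sketched in isolation. The example below is a simplified illustration, not the library's code: RecordingSender, the free function redirect, and feedTwoStreams are hypothetical stand-ins introduced here, and the sender is unbounded so that only the closing behaviour is shown.

```dart
import 'dart:async';

/// Hypothetical, unbounded stand-in for a channel sender
/// (an assumption for this sketch, not the library's type).
class RecordingSender<T> {
  final List<T> items = <T>[];
  bool isClosed = false;

  void send(T value) {
    if (isClosed) throw StateError('send after close');
    items.add(value);
  }

  void close() {
    isClosed = true;
  }
}

/// Simplified redirect: forwards every item, then optionally closes the sender.
Future<void> redirect<T>(
  Stream<T> source,
  RecordingSender<T> tx, {
  bool closeSenderOnDone = true,
}) async {
  try {
    await for (final v in source) {
      tx.send(v);
    }
  } finally {
    // Runs on normal completion and also when the stream emits an error.
    if (closeSenderOnDone && !tx.isClosed) tx.close();
  }
}

Future<RecordingSender<String>> feedTwoStreams() async {
  final tx = RecordingSender<String>();
  // Keep the sender open after the first stream so the second can reuse it.
  await redirect(Stream.fromIterable(['a', 'b']), tx, closeSenderOnDone: false);
  // The last redirect uses the default and closes the sender when it is done.
  await redirect(Stream.fromIterable(['c']), tx);
  return tx;
}

Future<void> main() async {
  final tx = await feedTwoStreams();
  print(tx.items); // [a, b, c]
  print(tx.isClosed); // true
}
```

Passing closeSenderOnDone: false on all but the last redirect is one way to feed several streams through a single sender; the caller could equally pass false everywhere and call close() explicitly.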

HTTP Integration:

// Stream HTTP responses into channel processing
final (tx, rx) = XChannel.mpsc<HttpResponse>(capacity: 100);

// Redirect HTTP stream to channel
final responseStream = httpClient.get(url).asStream();
await responseStream.redirectToSender(tx);

// Process responses in channel consumer
await for (final response in rx.stream()) {
  final data = await response.readAsString();
  await processData(data);
}

File Processing:

// Stream file lines into batch processor
final (tx, rx) = XChannel.mpsc<String>(capacity: 1000);

final fileStream = File('large_file.txt')
  .openRead()
  .transform(utf8.decoder)
  .transform(LineSplitter());

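// Start the redirect without awaiting it: with more than 1000 lines,
// awaiting here would block on a full channel before any consumer runs
final redirecting = fileStream.redirectToSender(tx);

// Batch process lines (one batch shown; repeat until the channel is drained)
final batch = await rx.recvAll(max: 100);
await processBatch(batch.toList());

// Wait for the redirect to finish once the consumer has drained the channel
await redirecting;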

Backpressure Strategies:

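// Strategy 1: Wait for space (no items are lost; the stream is paused while the channel is full)
await stream.redirectToSender(tx, dropWhenFull: false);

// Strategy 2: Drop on full (the stream is never paused; items arriving while the channel is full are lost)
await stream.redirectToSender(tx, dropWhenFull: true);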
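To make the difference between the two strategies concrete, here is a minimal self-contained sketch. It does not use the library: ToySender, the free function redirect, runLossy, and runReliable are hypothetical names introduced for illustration, and the toy sender only mimics the try-then-wait behaviour of a bounded channel with capacity 2.

```dart
import 'dart:async';
import 'dart:collection';

/// Hypothetical bounded sender used only for this sketch
/// (an assumption, not the library's type).
class ToySender<T> {
  ToySender(this.capacity);

  final int capacity;
  final Queue<T> _buffer = Queue<T>();
  final Queue<Completer<void>> _waiters = Queue<Completer<void>>();

  /// Non-blocking send: returns false when the buffer is full.
  bool trySend(T value) {
    if (_buffer.length >= capacity) return false;
    _buffer.add(value);
    return true;
  }

  /// Waiting send: completes once the consumer has freed a slot.
  Future<void> send(T value) async {
    while (!trySend(value)) {
      final slot = Completer<void>();
      _waiters.add(slot);
      await slot.future;
    }
  }

  /// Consumer side: removes one item (if any) and wakes one waiting producer.
  T? take() {
    if (_buffer.isEmpty) return null;
    final value = _buffer.removeFirst();
    if (_waiters.isNotEmpty) _waiters.removeFirst().complete();
    return value;
  }
}

/// Mirrors the shape of the redirect loop: try first, then drop or wait.
Future<void> redirect<T>(Stream<T> source, ToySender<T> tx,
    {bool dropWhenFull = false}) async {
  await for (final v in source) {
    if (tx.trySend(v)) continue;
    if (dropWhenFull) continue; // the item is lost
    await tx.send(v); // the stream stays paused until space is available
  }
}

/// Drop on full: completes without any consumer, keeping only what fits.
Future<List<int>> runLossy() async {
  final tx = ToySender<int>(2);
  await redirect(Stream.fromIterable([0, 1, 2, 3, 4]), tx, dropWhenFull: true);
  final out = <int>[];
  while (true) {
    final v = tx.take();
    if (v == null) break;
    out.add(v);
  }
  return out;
}

/// Wait for space: needs a consumer running concurrently with the redirect.
Future<List<int>> runReliable() async {
  final tx = ToySender<int>(2);
  final received = <int>[];
  var finished = false;
  // Not awaited yet: awaiting here would wait forever on the third item.
  final pump = redirect(Stream.fromIterable([0, 1, 2, 3, 4]), tx)
      .whenComplete(() {
    finished = true;
  });
  while (true) {
    final v = tx.take();
    if (v != null) {
      received.add(v);
    } else if (finished) {
      break;
    } else {
      await Future<void>.delayed(Duration.zero); // yield to the producer
    }
  }
  await pump;
  return received;
}

Future<void> main() async {
  print(await runLossy()); // [0, 1]
  print(await runReliable()); // [0, 1, 2, 3, 4]
}
```

The lossy run finishes without a consumer but keeps only the items that fit in the buffer. The reliable run delivers every item in order, at the cost of requiring a consumer that drains the channel while the redirect is still in progress.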

Implementation

Future<void> redirectToSender(
  KeepAliveSender<T> tx, {
  bool dropWhenFull = false,
  bool closeSenderOnDone = true,
}) async {
  try {
    await for (final v in this) {
      final r = tx.trySend(v);
      if (r.hasSend) continue;
      if (r.isDisconnected) break;
      if (dropWhenFull) continue;

      final r1 = await tx.send(v);
      if (r1.isDisconnected) break;
    }
  } finally {
    if (closeSenderOnDone && !tx.isDisconnected) {
      tx.close();
    }
  }
}