toBroadcastStream method

Stream<T> toBroadcastStream({
  bool waitForListeners = false,
  bool stopWhenNoListeners = true,
  bool closeReceiverOnDone = false,
  bool sync = false,
})

Converts a channel receiver into a broadcast stream that supports multiple listeners.

Perfect for integrating channels with Flutter widgets that expect broadcast streams, or when multiple components need to listen to the same channel data.

Parameters:

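  • waitForListeners: If true, the channel is not consumed until the first listener subscribes; if false, consumption starts immediately
  • stopWhenNoListeners: If true and waitForListeners is also true, consumption is paused while there are no listeners and resumed when one subscribes; it has no effect when waitForListeners is false
  • closeReceiverOnDone: If true, the receiver is closed when the stream ends
  • sync: If true, events are delivered to listeners synchronously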

Flutter Integration:

// Progress updates in multiple widgets
final (tx, rx) = XChannel.mpscLatest<double>();
final broadcast = rx.toBroadcastStream();

// Multiple StreamBuilders can listen
StreamBuilder<double>(
  stream: broadcast,
  builder: (context, snap) => ProgressIndicator(value: snap.data),
)

StreamBuilder<double>(
  stream: broadcast,
  builder: (context, snap) => Text('Progress: ${(snap.data ?? 0) * 100}%'),
)

Reactive Programming:

// Event processing with multiple subscribers
final (tx, rx) = XChannel.mpsc<UserEvent>();
final broadcast = rx.toBroadcastStream();

// Analytics subscriber
broadcast.listen((event) => analytics.track(event));

// UI updates subscriber
broadcast.listen((event) => updateUI(event));

// Logging subscriber
broadcast.listen((event) => logger.info('Event: $event'));

Resource Management:

// Efficient resource usage - pause when no listeners
final broadcast = rx.toBroadcastStream(
  waitForListeners: true,    // Don't start until needed
  stopWhenNoListeners: true, // Pause when unused
);

Implementation

/// Converts this channel receiver into a broadcast stream that supports
/// multiple listeners.
///
/// * [waitForListeners]: when `true`, the source stream is not subscribed to
///   until the first listener attaches; when `false`, it is subscribed to
///   immediately.
/// * [stopWhenNoListeners]: when `true` **and** [waitForListeners] is also
///   `true`, the source subscription is paused once the last listener
///   cancels and resumed when a listener attaches again. It has no effect
///   when [waitForListeners] is `false`.
/// * [closeReceiverOnDone]: when `true`, `close()` is called once the source
///   stream is done, unless `isDisconnected` is already `true`.
/// * [sync]: forwarded to [StreamController.broadcast].
///
/// Returns the stream of the broadcast controller. The controller is closed
/// when the source stream reports done.
Stream<T> toBroadcastStream({
  bool waitForListeners = false,
  bool stopWhenNoListeners = true,
  bool closeReceiverOnDone = false,
  bool sync = false,
}) {
  final source = stream();
  StreamSubscription<T>? sub;
  late final StreamController<T> ctrl;

  // Pausing is only enabled for lazily started streams: both flags must be
  // set, otherwise the source keeps flowing regardless of listener count.
  final shouldPause = waitForListeners && stopWhenNoListeners;

  // Subscribes to the source exactly once and forwards everything to [ctrl].
  void startIfNeeded() {
    if (sub != null) return;
    final started = source.listen(
      ctrl.add,
      onError: ctrl.addError,
      onDone: () async {
        try {
          if (closeReceiverOnDone && !isDisconnected) close();
        } finally {
          // Close the controller even if closing the receiver throws, so
          // listeners are never left waiting for a done event.
          await ctrl.close();
        }
      },
      cancelOnError: false,
    );
    sub = started;
    if (shouldPause && !ctrl.hasListener) {
      started.pause();
    }
  }

  // Pauses the source subscription unless it is missing or already paused.
  void pauseIfNeeded() {
    final s = sub;
    if (s != null && !s.isPaused) s.pause();
  }

  // Resumes the source subscription if it exists and is currently paused.
  void resumeIfNeeded() {
    final s = sub;
    if (s != null && s.isPaused) s.resume();
  }

  ctrl = StreamController<T>.broadcast(
    sync: sync,
    onListen: () {
      startIfNeeded();
      if (shouldPause) resumeIfNeeded();
    },
    onCancel: () {
      if (shouldPause && !ctrl.hasListener) pauseIfNeeded();
    },
  );

  // Eager mode: begin consuming right away, before any listener attaches.
  if (!waitForListeners) {
    startIfNeeded();
  }

  return ctrl.stream;
}