toBroadcastStream method
Convert a channel receiver to a broadcast stream for multiple listeners.
Perfect for integrating channels with Flutter widgets that expect broadcast streams, or when multiple components need to listen to the same channel data.
Parameters:
waitForListeners
: Iftrue
, doesn't start consuming until first listenerstopWhenNoListeners
: Iftrue
, pauses consumption when no listenerscloseReceiverOnDone
: Iftrue
, closes the receiver when stream endssync
: Iftrue
, events are delivered synchronously
Flutter Integration:
// Progress updates in multiple widgets
final (tx, rx) = XChannel.mpscLatest<double>();
final broadcast = rx.toBroadcastStream();
// Multiple StreamBuilders can listen
StreamBuilder<double>(
stream: broadcast,
builder: (context, snap) => ProgressIndicator(value: snap.data),
)
StreamBuilder<double>(
stream: broadcast,
builder: (context, snap) => Text('Progress: ${(snap.data ?? 0) * 100}%'),
)
Reactive Programming:
// Event processing with multiple subscribers
final (tx, rx) = XChannel.mpsc<UserEvent>();
final broadcast = rx.toBroadcastStream();
// Analytics subscriber
broadcast.listen((event) => analytics.track(event));
// UI updates subscriber
broadcast.listen((event) => updateUI(event));
// Logging subscriber
broadcast.listen((event) => logger.info('Event: $event'));
Resource Management:
// Efficient resource usage - pause when no listeners
final broadcast = rx.toBroadcastStream(
waitForListeners: true, // Don't start until needed
stopWhenNoListeners: true, // Pause when unused
);
Implementation
Stream<T> toBroadcastStream({
bool waitForListeners = false,
bool stopWhenNoListeners = true,
bool closeReceiverOnDone = false,
bool sync = false,
}) {
final source = stream();
StreamSubscription<T>? sub;
late final StreamController<T> ctrl;
final bool shouldPause = waitForListeners && stopWhenNoListeners;
void startIfNeeded() {
if (sub != null) return;
sub = source.listen(
(v) => ctrl.add(v),
onError: ctrl.addError,
onDone: () async {
if (closeReceiverOnDone && !isDisconnected) close();
await ctrl.close();
},
cancelOnError: false,
);
if (shouldPause && !ctrl.hasListener) {
sub!.pause();
}
}
Future<void> pauseIfNeeded() async {
final s = sub;
if (s != null && !s.isPaused) s.pause();
}
Future<void> resumeIfNeeded() async {
final s = sub;
if (s != null && s.isPaused) s.resume();
}
ctrl = StreamController<T>.broadcast(
sync: sync,
onListen: () {
startIfNeeded();
if (shouldPause) resumeIfNeeded();
},
onCancel: () async {
if (shouldPause && !ctrl.hasListener) {
return pauseIfNeeded();
}
return;
},
);
if (!waitForListeners) {
startIfNeeded();
}
return ctrl.stream;
}