cross_channel 0.8.2
cross_channel: ^0.8.2 copied to clipboard
High-performance & flexible channels for Dart/Flutter.
cross_channel #
Fast & flexible channels for Dart/Flutter.
Rust-style concurrency primitives: MPSC, MPMC, SPSC, OneShot, plus drop policies, latest-only, select, stream/isolate/web adapters.
Bench-tested · ~1.6–1.9 Mops/s
✨ Features #
- Drop policies:
block
,oldest
,newest
- LatestOnly buffers (coalesce to last)
- Select over Futures, Streams, Receivers & Timers
- Backpressure with bounded queues; rendezvous (
capacity=0
) - Notify primitive (lightweight wakeups:
notifyOne
,notifyAll
,notified
) - Stream adapters:
toBroadcastStream
,redirectToSender
- Isolate/Web adapters: request/reply,
ReceivePort
/MessagePort
bridges - Battle-tested: unit + stress tests, micro-benchmarks included
📦 Install #
dart pub add cross_channel
🧭 API #
High-Level (XChannel) #
final (tx, rx) = XChannel.mpsc<T>(capacity: 1024, policy: DropPolicy.block);
final (tx, rx) = XChannel.mpmc<T>(capacity: 1024, policy: DropPolicy.oldest);
final (tx, rx) = XChannel.mpscLatest<T>(); // MPSC latest-only
final (tx, rx) = XChannel.mpmcLatest<T>(); // MPMC latest-only (competitive)
final (tx, rx) = XChannel.spsc<int>(capacity: 1024); // pow2 rounded internally
final (tx, rx) = XChannel.oneshot<T>(consumeOnce: false);
Low-Level #
Low-level flavors are available in mpsc.dart
, mpmc.dart
, spsc.dart
, oneshot.dart
.
import 'package:cross_channel/mpsc.dart';
final (tx, rx) = Mpsc.unbounded<T>(); // chunked=true (default)
final (tx, rx) = Mpsc.unbounded<T>(chunked: false); // simple unbounded
final (tx, rx) = Mpsc.bounded<T>( 1024);
final (tx, rx) = Mpsc.channel<T>(capacity: 1024, policy: DropPolicy.oldest, onDrop: (d) {});
final (tx, rx) = Mpsc.latest<T>();
import 'package:cross_channel/mpmc.dart';
final (tx, rx) = Mpmc.unbounded<T>(); // chunked=true (default)
final (tx, rx) = Mpmc.unbounded<T>(chunked: false); // simple unbounded
final (tx, rx) = Mpmc.bounded<T>(1024);
final (tx, rx) = Mpmc.channel<T>(capacity: 1024, policy: DropPolicy.oldest, onDrop: (d) {});
final (tx, rx) = Mpmc.latest<T>();
import 'package:cross_channel/spsc.dart';
final (tx, rx) = Spsc.channel<T>(1024); // pow2 rounded internally
import 'package:cross_channel/oneshot.dart';
final (tx, rx) = OneShot.channel<T>(consumeOnce: false);
Note #
- Unbounded channels use chunked buffer by default (hot ring and chunked overflow)
- Defaults: hot=8192, chunk=4096, rebalanceBatch=64, threshold=cap/16, gate=chunk/2.
- Streams are single-subscription. Clone receivers for parallel consumers on MPMC.
- Interop available in
isolate_extension.dart
,stream_extension.dart
,web_extension.dart
. - Core traits & buffers available in
src/*
.
🚦 Choosing a channel #
Channel | Producers | Consumers | Use case | Notes |
---|---|---|---|---|
MPSC | multi | single | Task queue, async pipeline | Backpressure & drop policies |
MPMC | multi | multi | Work-sharing worker pool | Consumers compete (no broadcast) |
SPSC | single | single | Ultra-low-latency hot path | Lock-free ring (pow2 capacity) |
OneShot | 1 | 1 (or many) | Request/response, once-only signal | consumeOnce option |
LatestOnly | multi | single (MPSC) / competitive (MPMC) | Progress, sensors, UI signals | Always coalesces to last value |
When to use Notify vs channels #
- Use Notify for control-plane wakeups without payloads: config-changed, flush, shutdown, “poke a waiter”, etc. (notifyOne / notifyAll; waiter calls notified() and awaits.)
- Use channels for data-plane messages with payloads and ordering: tasks, jobs, progress values, events to be processed, etc.
💡 Quick examples #
MPSC with backpressure #
import 'package:cross_channel/cross_channel.dart';
Future<void> producer(MpscSender<int> tx) async {
for (var i = 0; i < 100; i++) {
await tx.send(i); // waits if queue is full
}
tx.close();
}
Future<void> consumer(MpscReceiver<int> rx) async {
await for (final v in rx.stream()) {
// handle v
}
}
void main() {
final (tx, rx) = XChannel.mpsc<int>(capacity: 8);
Future.wait([producer(tx), consumer(rx)])
}
MPMC worker pool (competitive consumption) #
import 'package:cross_channel/cross_channel.dart';
Future<void> worker(int id, MpmcReceiver<String> rx) async {
await for (final task in rx.stream()) {
// process task
}
}
void main() async {
final (tx, rx0) = XChannel.mpmc<String>(capacity: 16);
final rx1 = rx0.clone();
final rx2 = rx0.clone();
final w0 = worker(0, rx0);
final w1 = worker(1, rx1);
final w2 = worker(2, rx2);
for (var i = 0; i < 20; i++) {
await tx.send('task $i');
}
tx.close();
await Future.wait([w0, w1, w2]);
}
LatestOnly signal (coalesced progress) #
import 'package:cross_channel/cross_channel.dart';
Future<void> ui(MpscReceiver<double> rx) async {
await for (final p in rx.stream()) {
// update progress bar with p in [0..1]
}
}
void main() async {
final (tx, rx) = XChannel.mpscLatest<double>();
final _ = ui(rx);
for (var i = 0; i <= 100; i++) {
tx.trySend(i / 100); // overwrites previous value
await Future.delayed(const Duration(milliseconds: 10));
}
tx.close();
}
Sliding queues (drop policies) #
final (tx, rx) = XChannel.mpsc<int>(
capacity: 1024,
policy: DropPolicy.oldest, // or DropPolicy.newest
onDrop: (d) => print('dropped $d'),
);
oldest
: evicts the oldest queued item to make room (keeps newest data flowing)newest
: drops the incoming item (send “looks ok” but value discarded)block
: default (producer waits when full)
OneShot (single vs multi observe) #
// consumeOnce = true: first receiver consumes, then disconnects
final (stx, srx) = XChannel.oneshot<String>(consumeOnce: true);
// consumeOnce = false: every receiver sees the same value (until higher-level teardown)
final (btx, brx) = XChannel.oneshot<String>(consumeOnce: false);
🔔 Notify (lightweight wakeups) #
A tiny synchronization primitive to signal tasks without passing data.
notified()
→ returns a(Future<void>, cancel)
pair.- If a permit is available, it completes immediately and consumes one permit.
- Otherwise it registers a waiter until notified or canceled.
notifyOne()
→ wakes one waiter or stores one permit if none is waiting.notifyAll()
→ wakes all current waiters (does not store permits).close()
→ wakes everyone withdisconnected
.- Integrates with
XSelect
viaonFuture
.
import 'package:cross_channel/cross_channel.dart';
final n = Notify();
// Waiter
final (f, cancel) = n.notified();
// ... later: cancel(); // optional
// Notifiers
n.notifyOne(); // or
n.notifyAll();
// With XSelect
final (fu, _) = n.notified();
await XSelect.run<void>((s) => s
..onFuture<void>(fu, (_) => null, tag: 'notify')
..onTick(Ticker.every(const Duration(seconds: 1)), () => null, tag: 'tick')
);
🧰 XSelect (futures/streams/receivers/timers) #
XSelect
lets you race multiple asynchronous branches and cancel the losers.
Cheat-sheet
-
Channels (Receiver)
-
onRecv(rx, (RecvResult
-
onRecvValue(rx, (T) -> R, {onDisconnected, tag})
-
onRecvError(rx, (Object, StackTrace?) -> R
-
-
Notify
- onNotify(Notify n, R Function() body, {Object? tag})
- onNotifyOnce(Notify n, R Function() body, {Object? tag})
-
Futures
-
onFuture(future, (T) -> R, {tag})
-
onFutureValue(future, (T) -> R, {tag})
-
onFutureError(future, (Object, StackTrace?) -> R, {tag})
-
-
Streams
-
onStream(stream, (T) -> R, {tag})
-
onStreamDone(stream, () -> R, {tag})
-
-
Timers
-
onDelay(Duration, () -> R, {tag}) (one-shot)
-
onTick(Duration, () -> R, {tag}) (every)
-
-
Sending
- onSend(sender, value, {tag}) (races a send; wins when the send completes)
-
Sync fast-path
- XSelect.syncRun(builder) → only immediate, non-blocking arms (e.g., tryRecv, already-due timers).
-
Guards
- if_(() => bool) to enable/disable a branch without rebuilding the selection.
-
Fairness vs order
- Fairness by rotation is default; call .ordered() to preserve declaration order
import 'package:cross_channel/cross_channel.dart';
Future<void> main() async {
final (tx, rx) = XChannel.mpsc<int>(capacity: 8);
// Example: first event wins among a stream, a channel, a timer, and a future.
final result = await XSelect.run<String>((s) => s
..onStream<int>(
Stream.periodic(const Duration(milliseconds: 50), (i) => i),
(i) => 'stream:$i',
tag: 'S',
)
// simple
..onRecvValue<int>(
rx,
(v) => 'recv:$v',
onDisconnected: () => 'disconnected',
tag: 'R',
)
// full control over RecvResult
..onRecv<int>(
rx,
(res) {
if (res is RecvOk<int>) return 'recv:${res.value}';
if (res is RecvErrorDisconnected) return 'disconnected';
return 'unexpected:$res';
},
tag: 'R'?
)
..onTick(
const Duration(milliseconds: 50),
() => 'timer',
tag: 'T',
)
..onFuture<String>(
Future<String>.delayed(const Duration(milliseconds: 80), () => 'future'),
(s) => s,
tag: 'F',
)
);
print('winner -> $result');
}
Notes:
- returns the first resolved branch and cancels the rest.
- ordered = true forces order, otherwise we use fairness rotation
syncRun
only uses immediate arms, aka non blocking- Use
if_
to conditionally include a branch without rebuilding the selection.
🔗 Interop #
Stream #
import 'package:cross_channel/stream_extension.dart';
// Receiver → broadcast Stream (pause/resume when no listeners)
final broadcast = rx.toBroadcastStream(
waitForListeners: true,
stopWhenNoListeners: true,
closeReceiverOnDone: false,
);
// Stream → Sender (optional drop on full; auto-close sender on done)
await someStream.redirectToSender(tx, dropWhenFull: true);
Isolates #
import 'dart:isolate';
import 'package:cross_channel/isolate_extension.dart';
// Typed request/reply
final reply = await someSendPort.request<Map<String, Object?>>(
'get_user',
data: {'id': 42},
timeout: const Duration(seconds: 3),
);
// Port → channel bridge
final rp = ReceivePort();
final (tx, rx) = rp.toMpsc<MyEvent>(capacity: 512, strict: true);
Web #
import 'package:web/web.dart';
import 'package:cross_channel/web_extension.dart';
final channel = MessageChannel();
// Typed request/reply
final res = await channel.port1.request<String>('ping');
// Port → channel bridge
final (tx, rx) = channel.port2.toMpmc<JsEvent>(capacity: 512, strict: true);
🧩 Results & helpers #
// SendResult: SendOk | SendErrorFull | SendErrorDisconnected
// RecvResult: RecvOk(value) | RecvErrorEmpty | RecvErrorDisconnected
// Extensions:
// SendResult
r.hasSend;
r.isFull;
r.isDisconnected;
r.isTimeout;
r.isFailed;
r.hasError;
// RecvResult
rr.hasValue;
rr.isEmpty;
rr.isDisconnected;
rr.isTimeout;
rr.isFailed;
rr.hasError;
rr.valueOrNull;
// Timeouts/batching/draining:
await tx.sendTimeout(v, const Duration(milliseconds: 10));
await tx.sendAll(iterable); // waits on full
tx.trySendAll(iterable); // best-effort
rx.tryRecvAll(max: 128); // burst non-blocking drain
await rx.recvAll(idle: Duration(milliseconds: 1), max: 1024);
// Cancelable receive (all receivers):
final (fut, cancel) = rx.recvCancelable();
cancel(); // attempts to abort the wait if still pending
📊 Benchmarks (Dart VM, i7-8550U, High priority, CPU affinity set) #
- Benches are single-isolate micro-benchmarks.
- Pinning affinity/priority helps stabilize latencies.
MPSC #
case | Mops/s (recv) | ns/op (recv) | p99 us (recv) |
---|---|---|---|
ping-pong cap=1 (1P/1C) | 1.87 | 535.7 | 7.5 |
pipeline cap=1024 (1P/1C) | 1.83 | 547.1 | 13.1 |
pipeline unbounded (1P/1C) | 1.77 | 563.0 | 12.3 |
pipeline unbounded (chunked) (1P/1C) | 1.83 | 546.5 | 10.1 |
multi-producers cap=1024 (4P/1C) | 1.72 | 565.6 | 10.6 |
pipeline rendezvous cap=0 (1P/1C) | 1.68 | 593.8 | 11.4 |
sliding=oldest cap=1024 (1P/1C) | 1.76 | 568.3 | 15.9 |
sliding=newest cap=1024 (1P/1C) | 1.74 | 574.6 | 11.6 |
latestOnly (coalesce) (1P/1C) | 1.60 | 626.3 | 14.9 |
MPMC #
case | Mops/s (recv) | ns/op (recv) | p99 us (recv) |
---|---|---|---|
ping-pong cap=1 (1P/1C) | 1.87 | 536.1 | 7.5 |
pipeline cap=1024 (1P/1C) | 1.90 | 525.0 | 14.2 |
pipeline unbounded (1P/1C) | 1.87 | 534.1 | 11.4 |
multi-producers cap=1024 (4P/1C) | 1.72 | 580.5 | 18.0 |
multi-producers cap=1024 (4P/4C) | 1.72 | 580.8 | 16.7 |
pipeline rendezvous cap=0 (1P/1C) | 1.62 | 619.0 | 13.0 |
sliding=oldest cap=1024 (1P/1C) | 1.76 | 569.5 | 14.9 |
sliding=newest cap=1024 (1P/1C) | 1.74 | 573.5 | 10.9 |
latestOnly (coalesce) (1P/1C) | 1.63 | 614.3 | 15.8 |
latestOnly (coalesce/competitive) (1P/4C) | 1.66 | 604.1 | 22.8 |
SPSC #
case | Mops/s (recv) | ns/op (recv) | p99 us (recv) |
---|---|---|---|
spsc ping-pong cap=1024 | 1.76 | 569.8 | 5.2 |
spsc pipeline cap=1024 | 1.78 | 562.8 | 7.1 |
spsc pipeline cap=4096 | 1.80 | 555.3 | 15.7 |
ONESHOT #
case | Mops/s (recv) | ns/op (recv) | p99 us (recv) |
---|---|---|---|
oneshot send+receive | 2.75 | 363.6 | 1.7 |
oneshot pipeline | 2.80 | 356.9 | 4.3 |
How to bench #
This repo ships with two PowerShell scripts to run the micro-benchmarks. They produce consistent, copy-pastable output and (optionally) a CSV for later analysis.
Windows or PowerShell Core (
pwsh
) on macOS/Linux is fine.
For non-PowerShell usage, see the manual commands at the end.
Lite — fast dev loop
Compiles and runs once per target. No CSV, no CPU pinning, no priority tweaks.
# All suites, 1e6 iterations per case
.\bench_lite.ps1 -Target all -Count 1000000
# Single suite
.\bench_lite.ps1 -Target mpsc -Count 1000000
Full — reproducible runs + CSV
Lets you set CPU affinity, process priority, repeat counts, and append results to a CSV.
# Compile & Run, MPMC only, 5 repeats, High priority, CPU0,
# append CSV lines to bench\out.csv
.\bench_full.ps1 `
-Target mpmc `
-Action cr `
-Count 1000000 `
-Repeat 5 `
-Priority High `
-Affinity 0x1 `
-Csv `
-OutCsv "bench\out.csv" `
-AppendCsv
Parameters (full mode):
- Target: spsc, mpsc, mpmc, oneshot, isolate, inter_isolate, all
- Action: compile (just build), run (run existing exes), cr (compile+run)
- Count: iterations per case (e.g., 1000000)
- Repeat: run the suite multiple times (e.g., -Repeat 5)
- Priority: Idle · BelowNormal · Normal · AboveNormal · High · RealTime
- Affinity: CPU bitmask (e.g., 0x1 = CPU0, 0x3 = CPU0–1)
- Csv: write results to CSV
- OutCsv: CSV path (default bench\out.csv)
- AppendCsv: append instead of overwriting the header
Stability tips: pin CPU with -Affinity, raise -Priority High, do a warm-up repeat, and keep the machine idle.
Benchmark CSV Format
Note: Benchmarks use the same metrics CSV format shown in the Metrics section.
When you run benchmarks with--csv
, they output detailed metrics for each test case.
The benchmark scripts automatically enable metrics collection and use StdExporter
or CsvExporter
to output comprehensive performance data including operation counts, latency percentiles, and throughput measurements.
You can aggregate across repeats (median/mean/p95) in your own tooling.
📊 Metrics (Advanced) #
Built-in metrics system for production monitoring and performance analysis.
import 'package:cross_channel/metrics.dart';
// Enable metrics globally
MetricsConfig.enabled = true;
MetricsConfig.sampleLatency = true;
MetricsConfig.sampleRate = 0.1; // 10% sampling
// Configure exporter
MetricsConfig.exporter = StdExporter(
useColor: true,
compact: false,
);
// OR export to CSV file
final csvFile = File('metrics.csv');
MetricsConfig.exporter = CsvExporter(sink: csvFile.openWrite());
// Channels automatically collect metrics
final (tx, rx) = XChannel.mpsc<String>(capacity: 1000);
Available exporters:
NoopExporter
- Default (discards metrics)StdExporter
- Formatted console output with colorsCsvExporter
- Export to CSV format (stdout or file)- Custom exporters by extending
MetricsExporter
Metrics collected:
- Operation counts:
sent
,recv
,dropped
,closed
- Latency percentiles: P50, P95, P99 (via P² algorithm)
- Performance: ops/second, ns/operation, drop rates
- Per-channel and global aggregation
Performance impact: unmeasured.
CSV Metrics Format #
The CsvExporter
produces runtime metrics. This is the same format used by the benchmark scripts:
ts,type,id,sent,recv,dropped,closed,trySendOk,trySendFail,tryRecvOk,tryRecvEmpty,send_p50_ns,send_p95_ns,send_p99_ns,recv_p50_ns,recv_p95_ns,recv_p99_ns,mops,ns_per_op,drop_rate,try_send_failure_rate,try_recv_empty_rate,channels_count
2025-09-12T20:26:00.000Z,global,,1000,950,50,0,800,200,900,50,,,,,,,0.95,1052,0.05,0.2,0.05,3
2025-09-12T20:26:00.000Z,channel,my-worker-queue,500,480,20,0,400,100,450,30,1200.5,2100.8,5000.2,950.1,1800.3,4200.1,0.48,2083,0.04,0.2,0.06,
Field Groups:
Metadata:
ts
- Timestamp (ISO8601)type
- 'global' or 'channel'id
- Channel identifier (empty for global)
Core Operations:
sent
,recv
- Total blocking operation countsdropped
,closed
- Loss and lifecycle events
Non-blocking Operations:
trySendOk
,trySendFail
- Non-blocking send resultstryRecvOk
,tryRecvEmpty
- Non-blocking receive results
Latency Percentiles (nanoseconds):
send_p50_ns
,send_p95_ns
,send_p99_ns
- Send operation latenciesrecv_p50_ns
,recv_p95_ns
,recv_p99_ns
- Receive operation latencies
Performance Metrics:
mops
- Million operations per secondns_per_op
- Nanoseconds per operation
Quality Metrics:
drop_rate
- Percentage of dropped messages (0.0-1.0)try_send_failure_rate
- Non-blocking send failure rate (0.0-1.0)try_recv_empty_rate
- Non-blocking receive empty rate (0.0-1.0)
System:
channels_count
- Total active channels (global snapshots only)
🧪 Testing #
This repo ships with comprehensive unit, stress and integration tests (isolate/web/stream).
dart test
License #
MIT