cross_channel 0.8.2

High-performance & flexible channels for Dart/Flutter.


cross_channel #

Fast & flexible channels for Dart/Flutter.
Rust-style concurrency primitives: MPSC, MPMC, SPSC, OneShot, plus drop policies, latest-only, select, stream/isolate/web adapters.

Bench-tested · ~1.6–1.9 Mops/s

✨ Features #

  • Drop policies: block, oldest, newest
  • LatestOnly buffers (coalesce to last)
  • Select over Futures, Streams, Receivers & Timers
  • Backpressure with bounded queues; rendezvous (capacity=0)
  • Notify primitive (lightweight wakeups: notifyOne, notifyAll, notified)
  • Stream adapters: toBroadcastStream, redirectToSender
  • Isolate/Web adapters: request/reply, ReceivePort / MessagePort bridges
  • Battle-tested: unit + stress tests, micro-benchmarks included

📦 Install #

dart pub add cross_channel

🧭 API #

High-Level (XChannel) #

final (tx, rx) = XChannel.mpsc<T>(capacity: 1024, policy: DropPolicy.block);
final (tx, rx) = XChannel.mpmc<T>(capacity: 1024, policy: DropPolicy.oldest);
final (tx, rx) = XChannel.mpscLatest<T>(); // MPSC latest-only
final (tx, rx) = XChannel.mpmcLatest<T>(); // MPMC latest-only (competitive)
final (tx, rx) = XChannel.spsc<int>(capacity: 1024); // pow2 rounded internally
final (tx, rx) = XChannel.oneshot<T>(consumeOnce: false);

Low-Level #

Low-level flavors are available in mpsc.dart, mpmc.dart, spsc.dart, oneshot.dart.

import 'package:cross_channel/mpsc.dart';
final (tx, rx) = Mpsc.unbounded<T>(); // chunked=true (default)
final (tx, rx) = Mpsc.unbounded<T>(chunked: false); // simple unbounded
final (tx, rx) = Mpsc.bounded<T>(1024);
final (tx, rx) = Mpsc.channel<T>(capacity: 1024, policy: DropPolicy.oldest, onDrop: (d) {});
final (tx, rx) = Mpsc.latest<T>();

import 'package:cross_channel/mpmc.dart';
final (tx, rx) = Mpmc.unbounded<T>(); // chunked=true (default)
final (tx, rx) = Mpmc.unbounded<T>(chunked: false); // simple unbounded
final (tx, rx) = Mpmc.bounded<T>(1024);
final (tx, rx) = Mpmc.channel<T>(capacity: 1024, policy: DropPolicy.oldest, onDrop: (d) {});
final (tx, rx) = Mpmc.latest<T>();

import 'package:cross_channel/spsc.dart';
final (tx, rx) = Spsc.channel<T>(1024); // pow2 rounded internally

import 'package:cross_channel/oneshot.dart';
final (tx, rx) = OneShot.channel<T>(consumeOnce: false);

Note #

  • Unbounded channels use a chunked buffer by default (a hot ring plus chunked overflow).
    • Defaults: hot=8192, chunk=4096, rebalanceBatch=64, threshold=cap/16, gate=chunk/2.
  • Streams are single-subscription. Clone receivers for parallel consumers on MPMC.
  • Interop available in isolate_extension.dart, stream_extension.dart, web_extension.dart.
  • Core traits & buffers available in src/*.

🚦 Choosing a channel #

| Channel | Producers | Consumers | Use case | Notes |
|---|---|---|---|---|
| MPSC | multi | single | Task queue, async pipeline | Backpressure & drop policies |
| MPMC | multi | multi | Work-sharing worker pool | Consumers compete (no broadcast) |
| SPSC | single | single | Ultra-low-latency hot path | Lock-free ring (pow2 capacity) |
| OneShot | 1 | 1 (or many) | Request/response, once-only signal | consumeOnce option |
| LatestOnly | multi | single (MPSC) / competitive (MPMC) | Progress, sensors, UI signals | Always coalesces to last value |
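
The SPSC row mentions a power-of-two capacity. The sketch below shows one common way such rounding and slot indexing can be done in plain Dart. The helper names `nextPow2` and `slotFor` are hypothetical and are not part of the package; how cross_channel rounds internally is not stated here.

```dart
/// Rounds [n] up to the next power of two (returns 1 for n <= 1).
/// Hypothetical helper, not the package's implementation.
int nextPow2(int n) {
  if (n <= 1) return 1;
  // (n - 1).bitLength is the number of bits needed for n - 1, so shifting
  // 1 by that amount yields the smallest power of two >= n.
  return 1 << (n - 1).bitLength;
}

/// Maps an ever-increasing position onto a slot of a ring whose capacity
/// is a power of two, using a bit mask instead of the `%` operator.
int slotFor(int position, int pow2Capacity) => position & (pow2Capacity - 1);

void main() {
  print(nextPow2(1000)); // 1024
  print(nextPow2(1024)); // 1024
  print(slotFor(1030, 1024)); // 6
}
```

A power-of-two capacity lets a ring buffer wrap its indices with a single AND, which is one reason this constraint is common in single-producer/single-consumer rings.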

When to use Notify vs channels #

  • Use Notify for control-plane wakeups without payloads: config-changed, flush, shutdown, “poke a waiter”, etc. (notifyOne / notifyAll; waiter calls notified() and awaits.)
  • Use channels for data-plane messages with payloads and ordering: tasks, jobs, progress values, events to be processed, etc.

💡 Quick examples #

MPSC with backpressure #

import 'package:cross_channel/cross_channel.dart';

Future<void> producer(MpscSender<int> tx) async {
  for (var i = 0; i < 100; i++) {
    await tx.send(i); // waits if queue is full
  }
  tx.close();
}

Future<void> consumer(MpscReceiver<int> rx) async {
  await for (final v in rx.stream()) {
    // handle v
  }
}

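Future<void> main() async {
  final (tx, rx) = XChannel.mpsc<int>(capacity: 8);
  // Run both sides concurrently and wait until the producer has closed
  // the channel and the consumer has drained it.
  await Future.wait([producer(tx), consumer(rx)]);
}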

MPMC worker pool (competitive consumption) #

import 'package:cross_channel/cross_channel.dart';

Future<void> worker(int id, MpmcReceiver<String> rx) async {
  await for (final task in rx.stream()) {
    // process task
  }
}

void main() async {
  final (tx, rx0) = XChannel.mpmc<String>(capacity: 16);
  final rx1 = rx0.clone();
  final rx2 = rx0.clone();

  final w0 = worker(0, rx0);
  final w1 = worker(1, rx1);
  final w2 = worker(2, rx2);

  for (var i = 0; i < 20; i++) {
    await tx.send('task $i');
  }
  tx.close();

  await Future.wait([w0, w1, w2]);
}

LatestOnly signal (coalesced progress) #

import 'package:cross_channel/cross_channel.dart';

Future<void> ui(MpscReceiver<double> rx) async {
  await for (final p in rx.stream()) {
    // update progress bar with p in [0..1]
  }
}

void main() async {
  final (tx, rx) = XChannel.mpscLatest<double>();

  final done = ui(rx);
  for (var i = 0; i <= 100; i++) {
    tx.trySend(i / 100); // overwrites previous value
    await Future.delayed(const Duration(milliseconds: 10));
  }
  tx.close();
  await done; // wait for the UI loop to see the close and exit
}

Sliding queues (drop policies) #

final (tx, rx) = XChannel.mpsc<int>(
  capacity: 1024,
  policy: DropPolicy.oldest, // or DropPolicy.newest
  onDrop: (d) => print('dropped $d'),
);

  • oldest: evicts the oldest queued item to make room (keeps the newest data flowing)
  • newest: drops the incoming item (the send still reports success, but the value is discarded)
  • block: the default; the producer waits while the queue is full
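
To make the three policies concrete, here is a minimal synchronous model in plain Dart. `SlidingBuffer`, `Policy` and `offer` are hypothetical names for illustration only; this is not how the package is implemented, and a real channel with the block policy suspends the sender instead of returning.

```dart
import 'dart:collection';

enum Policy { block, oldest, newest }

/// Hypothetical bounded buffer that models the drop policies listed above.
class SlidingBuffer<T> {
  SlidingBuffer(this.capacity, this.policy, {this.onDrop})
      : assert(capacity > 0);

  final int capacity;
  final Policy policy;
  final void Function(T dropped)? onDrop;
  final Queue<T> _items = Queue<T>();

  /// Returns true if [value] is now stored in the buffer.
  /// Note: per the list above, the real channel reports a `newest` drop
  /// as a successful send; this model returns false to make the drop visible.
  bool offer(T value) {
    if (_items.length < capacity) {
      _items.addLast(value);
      return true;
    }
    switch (policy) {
      case Policy.oldest:
        onDrop?.call(_items.removeFirst()); // evict the head
        _items.addLast(value);
        return true;
      case Policy.newest:
        onDrop?.call(value); // discard the incoming item
        return false;
      case Policy.block:
        return false; // a real sender would wait here
    }
  }

  List<T> snapshot() => _items.toList();
}

void main() {
  final dropped = <int>[];
  final oldest = SlidingBuffer<int>(3, Policy.oldest, onDrop: dropped.add);
  for (var i = 1; i <= 5; i++) {
    oldest.offer(i);
  }
  print(oldest.snapshot()); // [3, 4, 5]
  print(dropped); // [1, 2]

  final newest = SlidingBuffer<int>(3, Policy.newest);
  for (var i = 1; i <= 5; i++) {
    newest.offer(i);
  }
  print(newest.snapshot()); // [1, 2, 3]
}
```

With `oldest` the consumer always sees the most recent window of data; with `newest` it sees the earliest items that fit.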

OneShot (single vs multi observe) #

// consumeOnce = true: first receiver consumes, then disconnects
final (stx, srx) = XChannel.oneshot<String>(consumeOnce: true);

// consumeOnce = false: every receiver sees the same value (until higher-level teardown)
final (btx, brx) = XChannel.oneshot<String>(consumeOnce: false);

🔔 Notify (lightweight wakeups) #

A tiny synchronization primitive to signal tasks without passing data.

  • notified() → returns a (Future<void>, cancel) pair.
    • If a permit is available, it completes immediately and consumes one permit.
    • Otherwise it registers a waiter until notified or canceled.
  • notifyOne() → wakes one waiter or stores one permit if none is waiting.
  • notifyAll() → wakes all current waiters (does not store permits).
  • close() → wakes everyone with disconnected.
  • Integrates with XSelect via onFuture.

import 'package:cross_channel/cross_channel.dart';

final n = Notify();

// Waiter
final (f, cancel) = n.notified();
// ... later: cancel();  // optional

// Notifiers
n.notifyOne(); // or
n.notifyAll();

// With XSelect
final (fu, _) = n.notified();
await XSelect.run<void>((s) => s
  ..onFuture<void>(fu, (_) => null, tag: 'notify')
  ..onTick(const Duration(seconds: 1), () => null, tag: 'tick')
);
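
The permit rules above can be modeled in a few lines of plain Dart. `MiniNotify` is a hypothetical teaching model, not the package's class. It assumes at most one stored permit (as in Tokio's Notify), which this README does not confirm, and it leaves out close().

```dart
import 'dart:async';

/// Hypothetical model of the notified / notifyOne / notifyAll rules.
class MiniNotify {
  bool _permit = false; // assumption: at most one stored permit
  final List<Completer<void>> _waiters = [];

  /// Completes immediately if a permit is stored; otherwise registers a waiter.
  (Future<void>, void Function()) notified() {
    if (_permit) {
      _permit = false; // consume the permit
      return (Future<void>.value(), () {});
    }
    final c = Completer<void>();
    _waiters.add(c);
    return (c.future, () => _waiters.remove(c)); // cancel unregisters
  }

  /// Wakes one waiter, or stores a permit if nobody is waiting.
  void notifyOne() {
    if (_waiters.isEmpty) {
      _permit = true;
    } else {
      _waiters.removeAt(0).complete();
    }
  }

  /// Wakes all current waiters; never stores a permit.
  void notifyAll() {
    final current = List.of(_waiters);
    _waiters.clear();
    for (final c in current) {
      c.complete();
    }
  }
}

Future<void> main() async {
  final n = MiniNotify();

  n.notifyOne(); // no waiter yet: stored as a permit
  final (ready, _) = n.notified();
  await ready; // completes immediately, consuming the permit

  final (a, _) = n.notified();
  final (b, _) = n.notified();
  n.notifyAll(); // wakes both current waiters
  await Future.wait([a, b]);
  print('all waiters woke');
}
```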

🧰 XSelect (futures/streams/receivers/timers) #

XSelect lets you race multiple asynchronous branches and cancel the losers.

Cheat-sheet

  • Channels (Receiver)
    • onRecv(rx, (RecvResult<T>) -> R, {tag})
    • onRecvValue(rx, (T) -> R, {onDisconnected, tag})
    • onRecvError(rx, (Object, StackTrace?) -> R, {tag})
  • Notify
    • onNotify(Notify n, R Function() body, {Object? tag})
    • onNotifyOnce(Notify n, R Function() body, {Object? tag})
  • Futures
    • onFuture(future, (T) -> R, {tag})
    • onFutureValue(future, (T) -> R, {tag})
    • onFutureError(future, (Object, StackTrace?) -> R, {tag})
  • Streams
    • onStream(stream, (T) -> R, {tag})
    • onStreamDone(stream, () -> R, {tag})
  • Timers
    • onDelay(Duration, () -> R, {tag}) (one-shot)
    • onTick(Duration, () -> R, {tag}) (every)
  • Sending
    • onSend(sender, value, {tag}) (races a send; wins when the send completes)
  • Sync fast-path
    • XSelect.syncRun(builder) → only immediate, non-blocking arms (e.g., tryRecv, already-due timers).
  • Guards
    • if_(() => bool) to enable/disable a branch without rebuilding the selection.
  • Fairness vs order
    • Fairness by rotation is the default; call .ordered() to preserve declaration order.

import 'package:cross_channel/cross_channel.dart';

Future<void> main() async {
  final (tx, rx) = XChannel.mpsc<int>(capacity: 8);

  // Example: first event wins among a stream, a channel, a timer, and a future.
  final result = await XSelect.run<String>((s) => s
    ..onStream<int>(
      Stream.periodic(const Duration(milliseconds: 50), (i) => i),
      (i) => 'stream:$i',
      tag: 'S',
    )
    // simple
    ..onRecvValue<int>(
      rx,
      (v) => 'recv:$v',
      onDisconnected: () => 'disconnected',
      tag: 'R',
    )
    // full control over RecvResult
    ..onRecv<int>(
      rx,
      (res) {
        if (res is RecvOk<int>) return 'recv:${res.value}';
        if (res is RecvErrorDisconnected) return 'disconnected';
        return 'unexpected:$res';
      },
      tag: 'R',
    )
    ..onTick(
      const Duration(milliseconds: 50),
      () => 'timer',
      tag: 'T',
    )
    ..onFuture<String>(
      Future<String>.delayed(const Duration(milliseconds: 80), () => 'future'),
      (s) => s,
      tag: 'F',
    )
  );

  print('winner -> $result');
}

Notes:

  • XSelect.run returns the result of the first branch to resolve and cancels the rest.
  • Branches are picked with fairness rotation by default; ordered mode evaluates them in declaration order instead.
  • syncRun only considers immediate (non-blocking) arms.
  • Use if_ to conditionally include a branch without rebuilding the selection.
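
The difference between fairness rotation and ordered mode can be sketched in plain Dart. The sketch assumes that "rotation" means the scan for a ready branch starts just after the previous winner. `pickBranch` is a hypothetical helper, and the package may implement fairness differently.

```dart
/// Returns the index of the first ready branch, or -1 if none is ready.
/// With [ordered] the scan always starts at branch 0; otherwise it starts
/// at [start], which the caller rotates after every round.
int pickBranch(List<bool> ready, {int start = 0, bool ordered = false}) {
  final n = ready.length;
  if (n == 0) return -1;
  final first = ordered ? 0 : start % n;
  for (var k = 0; k < n; k++) {
    final i = (first + k) % n;
    if (ready[i]) return i;
  }
  return -1;
}

void main() {
  final ready = [true, true, true]; // all three branches are always ready
  var start = 0;
  final fair = <int>[];
  final inOrder = <int>[];
  for (var round = 0; round < 4; round++) {
    final winner = pickBranch(ready, start: start);
    fair.add(winner);
    start = winner + 1; // rotate: next round begins after the last winner
    inOrder.add(pickBranch(ready, ordered: true));
  }
  print(fair); // [0, 1, 2, 0]
  print(inOrder); // [0, 0, 0, 0]
}
```

When several branches are ready at once, rotation keeps a busy branch from starving the others, while ordered mode gives earlier branches strict priority.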

🔗 Interop #

Stream #

import 'package:cross_channel/stream_extension.dart';

// Receiver → broadcast Stream (pause/resume when no listeners)
final broadcast = rx.toBroadcastStream(
  waitForListeners: true,
  stopWhenNoListeners: true,
  closeReceiverOnDone: false,
);

// Stream → Sender (optional drop on full; auto-close sender on done)
await someStream.redirectToSender(tx, dropWhenFull: true);

Isolates #

import 'dart:isolate';
import 'package:cross_channel/isolate_extension.dart';

// Typed request/reply
final reply = await someSendPort.request<Map<String, Object?>>(
  'get_user',
  data: {'id': 42},
  timeout: const Duration(seconds: 3),
);

// Port → channel bridge
final rp = ReceivePort();
final (tx, rx) = rp.toMpsc<MyEvent>(capacity: 512, strict: true);
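
For context, this is roughly what a request/reply round trip looks like when written by hand with only dart:isolate. It is a sketch of the general pattern, not the package's implementation: the `[replyPort, command, data]` message shape, the `worker` entry point and the local `request` function are all hypothetical.

```dart
import 'dart:isolate';

/// Worker entry point. Assumed (hypothetical) message shape:
/// [SendPort replyTo, String command, Map data].
void worker(SendPort handshake) {
  final inbox = ReceivePort();
  handshake.send(inbox.sendPort); // tell the caller where to send requests
  inbox.listen((message) {
    final parts = message as List;
    final replyTo = parts[0] as SendPort;
    final command = parts[1] as String;
    final data = parts[2] as Map;
    if (command == 'get_user') {
      replyTo.send({'id': data['id'], 'name': 'user-${data['id']}'});
    } else {
      replyTo.send(null); // unknown command
    }
  });
}

/// Sends one request and waits for exactly one reply, with a timeout.
Future<Object?> request(
  SendPort to,
  String command,
  Object? data, {
  Duration timeout = const Duration(seconds: 3),
}) async {
  final reply = ReceivePort(); // fresh port per request
  try {
    to.send([reply.sendPort, command, data]);
    return await reply.first.timeout(timeout);
  } finally {
    reply.close(); // also releases the port if the timeout fired
  }
}

Future<void> main() async {
  final handshake = ReceivePort();
  final isolate = await Isolate.spawn(worker, handshake.sendPort);
  final workerPort = await handshake.first as SendPort;

  final user = await request(workerPort, 'get_user', {'id': 42});
  print(user); // {id: 42, name: user-42}

  isolate.kill();
}
```

A per-request reply port avoids having to match replies to requests by id, at the cost of allocating one port per call.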

Web #

import 'package:web/web.dart';
import 'package:cross_channel/web_extension.dart';


final channel = MessageChannel();

// Typed request/reply
final res = await channel.port1.request<String>('ping');

// Port → channel bridge
final (tx, rx) = channel.port2.toMpmc<JsEvent>(capacity: 512, strict: true);

🧩 Results & helpers #

// SendResult: SendOk | SendErrorFull | SendErrorDisconnected
// RecvResult: RecvOk(value) | RecvErrorEmpty | RecvErrorDisconnected

// Extensions:

// SendResult
r.hasSend;
r.isFull;
r.isDisconnected;
r.isTimeout;
r.isFailed;
r.hasError;

// RecvResult
rr.hasValue;
rr.isEmpty;
rr.isDisconnected;
rr.isTimeout;
rr.isFailed;
rr.hasError;
rr.valueOrNull;

// Timeouts/batching/draining:
await tx.sendTimeout(v, const Duration(milliseconds: 10));
await tx.sendAll(iterable);     // waits on full
tx.trySendAll(iterable);        // best-effort

rx.tryRecvAll(max: 128);        // burst non-blocking drain
await rx.recvAll(idle: Duration(milliseconds: 1), max: 1024);

// Cancelable receive (all receivers):
final (fut, cancel) = rx.recvCancelable();
cancel(); // attempts to abort the wait if still pending

📊 Benchmarks (Dart VM, i7-8550U, High priority, CPU affinity set) #

  • Benches are single-isolate micro-benchmarks.
  • Pinning affinity/priority helps stabilize latencies.

MPSC #

| case | Mops/s (recv) | ns/op (recv) | p99 µs (recv) |
|---|---|---|---|
| ping-pong cap=1 (1P/1C) | 1.87 | 535.7 | 7.5 |
| pipeline cap=1024 (1P/1C) | 1.83 | 547.1 | 13.1 |
| pipeline unbounded (1P/1C) | 1.77 | 563.0 | 12.3 |
| pipeline unbounded (chunked) (1P/1C) | 1.83 | 546.5 | 10.1 |
| multi-producers cap=1024 (4P/1C) | 1.72 | 565.6 | 10.6 |
| pipeline rendezvous cap=0 (1P/1C) | 1.68 | 593.8 | 11.4 |
| sliding=oldest cap=1024 (1P/1C) | 1.76 | 568.3 | 15.9 |
| sliding=newest cap=1024 (1P/1C) | 1.74 | 574.6 | 11.6 |
| latestOnly (coalesce) (1P/1C) | 1.60 | 626.3 | 14.9 |

MPMC #

| case | Mops/s (recv) | ns/op (recv) | p99 µs (recv) |
|---|---|---|---|
| ping-pong cap=1 (1P/1C) | 1.87 | 536.1 | 7.5 |
| pipeline cap=1024 (1P/1C) | 1.90 | 525.0 | 14.2 |
| pipeline unbounded (1P/1C) | 1.87 | 534.1 | 11.4 |
| multi-producers cap=1024 (4P/1C) | 1.72 | 580.5 | 18.0 |
| multi-producers cap=1024 (4P/4C) | 1.72 | 580.8 | 16.7 |
| pipeline rendezvous cap=0 (1P/1C) | 1.62 | 619.0 | 13.0 |
| sliding=oldest cap=1024 (1P/1C) | 1.76 | 569.5 | 14.9 |
| sliding=newest cap=1024 (1P/1C) | 1.74 | 573.5 | 10.9 |
| latestOnly (coalesce) (1P/1C) | 1.63 | 614.3 | 15.8 |
| latestOnly (coalesce/competitive) (1P/4C) | 1.66 | 604.1 | 22.8 |

SPSC #

| case | Mops/s (recv) | ns/op (recv) | p99 µs (recv) |
|---|---|---|---|
| spsc ping-pong cap=1024 | 1.76 | 569.8 | 5.2 |
| spsc pipeline cap=1024 | 1.78 | 562.8 | 7.1 |
| spsc pipeline cap=4096 | 1.80 | 555.3 | 15.7 |
ONESHOT #

| case | Mops/s (recv) | ns/op (recv) | p99 µs (recv) |
|---|---|---|---|
| oneshot send+receive | 2.75 | 363.6 | 1.7 |
| oneshot pipeline | 2.80 | 356.9 | 4.3 |

How to bench #

This repo ships with two PowerShell scripts to run the micro-benchmarks. They produce consistent, copy-pastable output and (optionally) a CSV for later analysis.

Windows or PowerShell Core (pwsh) on macOS/Linux is fine.
For non-PowerShell usage, see the manual commands at the end.

Lite — fast dev loop

Compiles and runs once per target. No CSV, no CPU pinning, no priority tweaks.

# All suites, 1e6 iterations per case
.\bench_lite.ps1 -Target all -Count 1000000

# Single suite
.\bench_lite.ps1 -Target mpsc -Count 1000000

Full — reproducible runs + CSV

Lets you set CPU affinity, process priority, repeat counts, and append results to a CSV.

# Compile & Run, MPMC only, 5 repeats, High priority, CPU0,
# append CSV lines to bench\out.csv
.\bench_full.ps1 `
  -Target mpmc `
  -Action cr `
  -Count 1000000 `
  -Repeat 5 `
  -Priority High `
  -Affinity 0x1 `
  -Csv `
  -OutCsv "bench\out.csv" `
  -AppendCsv

Parameters (full mode):

  • Target: spsc, mpsc, mpmc, oneshot, isolate, inter_isolate, all
  • Action: compile (just build), run (run existing exes), cr (compile+run)
  • Count: iterations per case (e.g., 1000000)
  • Repeat: run the suite multiple times (e.g., -Repeat 5)
  • Priority: Idle · BelowNormal · Normal · AboveNormal · High · RealTime
  • Affinity: CPU bitmask (e.g., 0x1 = CPU0, 0x3 = CPU0–1)
  • Csv: write results to CSV
  • OutCsv: CSV path (default bench\out.csv)
  • AppendCsv: append instead of overwriting the header
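
The -Affinity value is a bitmask with one bit per logical CPU. If you prefer to think in CPU indices, a small helper like the following (hypothetical, not shipped with the scripts) computes the mask to pass:

```dart
/// Builds an affinity bitmask with one bit set per CPU index.
int affinityMask(Iterable<int> cpus) =>
    cpus.fold<int>(0, (mask, cpu) => mask | (1 << cpu));

/// Formats the mask in the hex style used by the examples above.
String asHex(int mask) => '0x${mask.toRadixString(16)}';

void main() {
  print(asHex(affinityMask([0]))); // 0x1
  print(asHex(affinityMask([0, 1]))); // 0x3
  print(asHex(affinityMask([2, 3]))); // 0xc
}
```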

Stability tips: pin CPU with -Affinity, raise -Priority High, do a warm-up repeat, and keep the machine idle.

Benchmark CSV Format

Note: Benchmarks use the same metrics CSV format shown in the Metrics section.
When you run benchmarks with --csv, they output detailed metrics for each test case.

The benchmark scripts automatically enable metrics collection and use StdExporter or CsvExporter to output comprehensive performance data including operation counts, latency percentiles, and throughput measurements.

You can aggregate across repeats (median/mean/p95) in your own tooling.
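
As a starting point for that aggregation, here is a small plain-Dart sketch that computes the mean, median and a nearest-rank percentile over one metric. The input values are made-up readings for illustration, not measured results, and the helper names are hypothetical.

```dart
/// Arithmetic mean of [xs] (must be non-empty).
double mean(List<double> xs) => xs.reduce((a, b) => a + b) / xs.length;

/// Median of [xs] (must be non-empty); averages the two middle values
/// when the length is even.
double median(List<double> xs) {
  final sorted = [...xs]..sort();
  final mid = sorted.length ~/ 2;
  return sorted.length.isOdd
      ? sorted[mid]
      : (sorted[mid - 1] + sorted[mid]) / 2;
}

/// Nearest-rank percentile: the value at rank ceil(p / 100 * n)
/// in ascending order, with the rank clamped to 1..n.
double percentile(List<double> xs, double p) {
  final sorted = [...xs]..sort();
  final rank =
      (p / 100 * sorted.length).ceil().clamp(1, sorted.length).toInt();
  return sorted[rank - 1];
}

void main() {
  // Hypothetical Mops/s readings from five repeats of one case.
  final mops = [1.83, 1.79, 1.87, 1.72, 1.85];
  print(median(mops)); // 1.83
  print(percentile(mops, 95)); // 1.87
  print(mean(mops).toStringAsFixed(3)); // 1.812
}
```

The median is usually the more robust summary for micro-benchmarks, since a single noisy repeat can skew the mean.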

📊 Metrics (Advanced) #

Built-in metrics system for production monitoring and performance analysis.

import 'dart:io'; // File, used by the CSV exporter example below
import 'package:cross_channel/metrics.dart';

// Enable metrics globally
MetricsConfig.enabled = true;
MetricsConfig.sampleLatency = true;
MetricsConfig.sampleRate = 0.1; // 10% sampling

// Configure exporter
MetricsConfig.exporter = StdExporter(
  useColor: true,
  compact: false,
);

// OR export to CSV file
final csvFile = File('metrics.csv');
MetricsConfig.exporter = CsvExporter(sink: csvFile.openWrite());

// Channels automatically collect metrics
final (tx, rx) = XChannel.mpsc<String>(capacity: 1000);

Available exporters:

  • NoopExporter - Default (discards metrics)
  • StdExporter - Formatted console output with colors
  • CsvExporter - Export to CSV format (stdout or file)
  • Custom exporters by extending MetricsExporter

Metrics collected:

  • Operation counts: sent, recv, dropped, closed
  • Latency percentiles: P50, P95, P99 (via P² algorithm)
  • Performance: ops/second, ns/operation, drop rates
  • Per-channel and global aggregation

Performance impact: unmeasured.

CSV Metrics Format #

The CsvExporter produces runtime metrics. This is the same format used by the benchmark scripts:

ts,type,id,sent,recv,dropped,closed,trySendOk,trySendFail,tryRecvOk,tryRecvEmpty,send_p50_ns,send_p95_ns,send_p99_ns,recv_p50_ns,recv_p95_ns,recv_p99_ns,mops,ns_per_op,drop_rate,try_send_failure_rate,try_recv_empty_rate,channels_count
2025-09-12T20:26:00.000Z,global,,1000,950,50,0,800,200,900,50,,,,,,,0.95,1052,0.05,0.2,0.05,3
2025-09-12T20:26:00.000Z,channel,my-worker-queue,500,480,20,0,400,100,450,30,1200.5,2100.8,5000.2,950.1,1800.3,4200.1,0.48,2083,0.04,0.2,0.06,

Field Groups:

Metadata:

  • ts - Timestamp (ISO8601)
  • type - 'global' or 'channel'
  • id - Channel identifier (empty for global)

Core Operations:

  • sent, recv - Total blocking operation counts
  • dropped, closed - Loss and lifecycle events

Non-blocking Operations:

  • trySendOk, trySendFail - Non-blocking send results
  • tryRecvOk, tryRecvEmpty - Non-blocking receive results

Latency Percentiles (nanoseconds):

  • send_p50_ns, send_p95_ns, send_p99_ns - Send operation latencies
  • recv_p50_ns, recv_p95_ns, recv_p99_ns - Receive operation latencies

Performance Metrics:

  • mops - Million operations per second
  • ns_per_op - Nanoseconds per operation

Quality Metrics:

  • drop_rate - Fraction of messages that were dropped (0.0-1.0)
  • try_send_failure_rate - Non-blocking send failure rate (0.0-1.0)
  • try_recv_empty_rate - Non-blocking receive empty rate (0.0-1.0)

System:

  • channels_count - Total active channels (global snapshots only)
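
The sketch below parses the global sample row shown above and recomputes two of the rate columns. The formulas (dropped / sent, and trySendFail / (trySendOk + trySendFail)) are inferred from the sample values and are an assumption, not a documented definition. `parseRow` is a hypothetical helper.

```dart
const header =
    'ts,type,id,sent,recv,dropped,closed,trySendOk,trySendFail,tryRecvOk,'
    'tryRecvEmpty,send_p50_ns,send_p95_ns,send_p99_ns,recv_p50_ns,'
    'recv_p95_ns,recv_p99_ns,mops,ns_per_op,drop_rate,'
    'try_send_failure_rate,try_recv_empty_rate,channels_count';

/// Zips one CSV data line with the header names. A plain split is enough
/// here because the sample rows contain no quoted or comma-bearing fields.
Map<String, String> parseRow(String line) {
  final names = header.split(',');
  final cells = line.split(',');
  if (cells.length != names.length) {
    throw FormatException(
        'expected ${names.length} fields, got ${cells.length}');
  }
  return Map.fromIterables(names, cells);
}

void main() {
  final row = parseRow(
      '2025-09-12T20:26:00.000Z,global,,1000,950,50,0,800,200,900,50,,,,,,,'
      '0.95,1052,0.05,0.2,0.05,3');

  final sent = int.parse(row['sent']!);
  final dropped = int.parse(row['dropped']!);
  final ok = int.parse(row['trySendOk']!);
  final fail = int.parse(row['trySendFail']!);

  print(dropped / sent); // 0.05, matches drop_rate
  print(fail / (ok + fail)); // 0.2, matches try_send_failure_rate
  print(row['send_p50_ns']!.isEmpty); // true: no percentiles on global rows
}
```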

🧪 Testing #

This repo ships with comprehensive unit, stress and integration tests (isolate/web/stream).

dart test

License #

MIT

Publisher: mki.dev (verified publisher)