latest<T> static method

(MpmcSender<T>, MpmcReceiver<T>) latest<T>({
  String? metricsId,
})

Creates a latest-only MPMC channel with competitive consumption.

Retains only the most recent message, with MPMC semantics: each retained update is delivered to exactly one consumer, and an update that is overwritten before anyone receives it is dropped. This is competitive consumption, not broadcasting: multiple consumers compete for the latest value.

Important: This is NOT a broadcast cache! Each update goes to exactly one consumer. If you need broadcasting, use multiple MPSC channels or implement your own broadcast layer.
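
For contrast, here is a minimal sketch of what broadcast delivery looks like, using `StreamController.broadcast` from `dart:async` rather than this library's channels. The function name `demoBroadcast` is hypothetical, and wiring such a layer to a channel receiver is left out. The point is only that every listener sees every value, which `latest` does not do.

```dart
import 'dart:async';

/// Illustrative only: fan-out with dart:async, not part of this library.
/// Returns what each of two listeners observed.
List<List<int>> demoBroadcast() {
  // sync: true delivers each event to listeners during add().
  final controller = StreamController<int>.broadcast(sync: true);
  final seenA = <int>[];
  final seenB = <int>[];

  controller.stream.listen(seenA.add);
  controller.stream.listen(seenB.add);

  controller.add(45);
  controller.add(50);
  controller.close();

  // Both listeners received both values, in order.
  return [seenA, seenB];
}
```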

Characteristics:

  • Latest-only storage: Only the most recent value is kept
  • Competitive consumption: First consumer to recv() gets the value
  • No queuing: Rapid updates automatically coalesce
  • Multi-producer: Multiple threads can send concurrently
  • Multi-consumer: Multiple consumers compete for messages
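
The first three characteristics can be modelled with a single-slot buffer. The sketch below is an assumption-laden illustration, not the library's `LatestOnlyBuffer`: the names `LatestSlot`, `put`, `tryTake`, and `demoLatestSlot` are hypothetical, and it ignores waiting receivers and disconnection entirely.

```dart
/// Hypothetical single-slot buffer; NOT the library's LatestOnlyBuffer.
class LatestSlot<T> {
  T? _value;
  bool _full = false;

  /// Stores [value], replacing any value that has not been taken yet.
  void put(T value) {
    _value = value;
    _full = true;
  }

  /// Removes and returns the stored value, or null if the slot is empty.
  /// Taking empties the slot, so the next caller finds nothing.
  T? tryTake() {
    if (!_full) return null;
    final v = _value;
    _value = null;
    _full = false;
    return v;
  }
}

/// Two rapid writes coalesce; two takers compete for the one survivor.
List<int?> demoLatestSlot() {
  final slot = LatestSlot<int>();
  slot.put(45);
  slot.put(50); // overwrites 45
  final first = slot.tryTake(); // 50
  final second = slot.tryTake(); // null: already consumed
  return [first, second];
}
```

In a real channel the second taker would typically wait for the next update instead of returning immediately.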

Example - Distributed state updates:

final (statusTx, statusRx) = Mpmc.latest<SystemStatus>();

// Multiple monitoring threads update status
statusTx.send(SystemStatus(cpu: 45, memory: 60));
statusTx.send(SystemStatus(cpu: 50, memory: 65)); // Overwrites previous

// Multiple consumers compete for updates
final consumer1 = statusRx.clone();
final consumer2 = statusRx.clone();

// Only ONE of these will get the latest status
final status1Future = consumer1.recv(); // May get the value
final status2Future = consumer2.recv(); // May instead get no value (e.g. a disconnect)

Example - Load balancer with latest config:

final (configTx, configRx) = Mpmc.latest<LoadBalancerConfig>();

// Config updates from admin
configTx.send(LoadBalancerConfig(servers: newServerList));

// Multiple load balancer instances compete for config
for (int i = 0; i < balancerCount; i++) {
  final balancerRx = configRx.clone();
  Thread.run(() async {
    await for (final config in balancerRx.stream()) {
      // Only one balancer gets each config update
      await updateBalancerConfig(config);
    }
  });
}

Implementation

static (MpmcSender<T>, MpmcReceiver<T>) latest<T>({String? metricsId}) {
  final buf = LatestOnlyBuffer<T>();
  final core = _MpmcCore<T>(buf, metricsId: metricsId);
  final tx = core.attachSender((c) => MpmcSender<T>._(c));
  final rx = core.attachReceiver((c) => MpmcReceiver<T>._(c));
  return (tx, rx);
}