stem 0.1.0-alpha.4 copy "stem: ^0.1.0-alpha.4" to clipboard
stem: ^0.1.0-alpha.4 copied to clipboard

Stem is a Dart-native background job platform with Redis Streams, retries, scheduling, observability, and security tooling.

pub package Dart License Build Status Buy Me A Coffee

Stem #

Stem is a Dart-native background job platform. It gives you Celery-style task execution with a Dart-first API, Redis Streams integration, retries, scheduling, observability, and security tooling-all without leaving the Dart ecosystem.

Install #

dart pub add stem           # core runtime APIs
dart pub add stem_redis     # Redis broker + result backend
dart pub add stem_postgres  # (optional) Postgres broker + backend
dart pub global activate stem_cli

Add the pub-cache bin directory to your PATH so the stem_cli tool is available:

export PATH="$HOME/.pub-cache/bin:$PATH"
stem --help

Quick Start #

Direct enqueue (map-based) #

import 'dart:async';
import 'package:stem/stem.dart';
import 'package:stem_redis/stem_redis.dart';

class HelloTask implements TaskHandler<void> {
  @override
  String get name => 'demo.hello';

  @override
  TaskOptions get options => const TaskOptions(
        queue: 'default',
        maxRetries: 3,
        rateLimit: '10/s',
        visibilityTimeout: Duration(seconds: 60),
      );

  @override
  Future<void> call(TaskContext context, Map<String, Object?> args) async {
    final who = args['name'] as String? ?? 'world';
    print('Hello $who (attempt ${context.attempt})');
  }
}

Future<void> main() async {
  final registry = SimpleTaskRegistry()..register(HelloTask());
  final broker = await RedisStreamsBroker.connect('redis://localhost:6379');
  final backend = await RedisResultBackend.connect('redis://localhost:6379/1');

  final stem = Stem(broker: broker, registry: registry, backend: backend);
  final worker = Worker(broker: broker, registry: registry, backend: backend);

  unawaited(worker.start());
  await stem.enqueue('demo.hello', args: {'name': 'Stem'});
  await Future<void>.delayed(const Duration(seconds: 1));
  await worker.shutdown();
  await broker.close();
  await backend.close();
}

Typed helpers with TaskDefinition #

Use the new typed wrapper when you want compile-time checking and shared metadata:

class HelloTask implements TaskHandler<void> {
  static final definition = TaskDefinition<HelloArgs, void>(
    name: 'demo.hello',
    encodeArgs: (args) => {'name': args.name},
    metadata: TaskMetadata(description: 'Simple hello world example'),
  );

  @override
  String get name => 'demo.hello';

  @override
  TaskOptions get options => const TaskOptions(maxRetries: 3);

  @override
  TaskMetadata get metadata => definition.metadata;

  @override
  Future<void> call(TaskContext context, Map<String, Object?> args) async {
    final who = args['name'] as String? ?? 'world';
    print('Hello $who (attempt ${context.attempt})');
  }
}

class HelloArgs {
  const HelloArgs({required this.name});
  final String name;
}

Future<void> main() async {
  final registry = SimpleTaskRegistry()..register(HelloTask());
  final broker = await RedisStreamsBroker.connect('redis://localhost:6379');
  final backend = await RedisResultBackend.connect('redis://localhost:6379/1');

  final stem = Stem(broker: broker, registry: registry, backend: backend);
  final worker = Worker(broker: broker, registry: registry, backend: backend);

  unawaited(worker.start());
  await stem.enqueueCall(
    HelloTask.definition(const HelloArgs(name: 'Stem')),
  );
  await Future<void>.delayed(const Duration(seconds: 1));
  await worker.shutdown();
  await broker.close();
  await backend.close();
}

You can also build requests fluently with the TaskEnqueueBuilder:

final taskId = await TaskEnqueueBuilder(
  definition: HelloTask.definition,
  args: const HelloArgs(name: 'Tenant A'),
)
  ..header('x-tenant', 'tenant-a')
  ..priority(5)
  ..delay(const Duration(seconds: 30))
  .enqueueWith(stem);

Bootstrap helpers #

Spin up a full runtime in one call using the bootstrap APIs:

final app = await StemWorkflowApp.inMemory(
  flows: [
    Flow(
      name: 'demo.workflow',
      build: (flow) {
        flow.step('hello', (ctx) async => 'done');
      },
    ),
  ],
);

final runId = await app.startWorkflow('demo.workflow');
final state = await app.waitForCompletion(runId);
print(state?.status); // WorkflowStatus.completed

await app.shutdown();

Workflow script facade #

Prefer the high-level WorkflowScript facade when you want to author a workflow as a single async function. The facade wraps FlowBuilder so your code can await script.step, await step.sleep, and await step.awaitEvent while retaining the same durability semantics (checkpoints, resume payloads, auto-versioning) as the lower-level API:

final app = await StemWorkflowApp.inMemory(
  scripts: [
    WorkflowScript(
      name: 'orders.workflow',
      run: (script) async {
        final checkout = await script.step('checkout', (step) async {
          return await chargeCustomer(step.params['userId'] as String);
        });

        await script.step('poll-shipment', (step) async {
          final resume = step.takeResumeData();
          if (resume != true) {
            await step.sleep(const Duration(seconds: 30));
            return 'waiting';
          }
          final status = await fetchShipment(checkout.id);
          if (!status.isComplete) {
            await step.sleep(const Duration(seconds: 30));
            return 'waiting';
          }
          return status.value;
        }, autoVersion: true);

        final receipt = await script.step('notify', (step) async {
          await sendReceiptEmail(checkout);
          return 'emailed';
        });

        return receipt;
      },
    ),
  ],
);

Inside a script step you can access the same metadata as FlowContext:

  • step.previousResult contains the prior step’s persisted value.
  • step.iteration tracks the current auto-version suffix when autoVersion: true is set.
  • step.idempotencyKey('scope') builds stable outbound identifiers.
  • step.takeResumeData() surfaces payloads from sleeps or awaited events so you can branch on resume paths.

Durable workflow semantics #

  • Chords dispatch from workers. Once every branch completes, any worker may enqueue the callback, ensuring producer crashes do not block completion.

  • Steps may run multiple times. The runtime replays a step from the top after every suspension (sleep, awaited event, rewind) and after worker crashes, so handlers must be idempotent.

  • Event waits are durable watchers. When a step calls awaitEvent, the runtime registers the run in the store so the next emitted payload is persisted atomically and delivered exactly once on resume. Operators can inspect suspended runs via WorkflowStore.listWatchers or runsWaitingOn.

  • Checkpoints act as heartbeats. Every successful saveStep refreshes the run's updatedAt timestamp so operators (and future reclaim logic) can distinguish actively-owned runs from ones that need recovery.

  • Sleeps persist wake timestamps. When a resumed step calls sleep again, the runtime skips re-suspending once the stored resumeAt is reached so loop handlers can simply call sleep without extra guards.

  • Use ctx.takeResumeData() to detect whether a step is resuming. Call it at the start of the handler and branch accordingly.

  • When you suspend, provide a marker in the data payload so the resumed step can distinguish the wake-up path. For example:

    final resume = ctx.takeResumeData();
    if (resume != true) {
      ctx.sleep(const Duration(milliseconds: 200));
      return null;
    }
    
  • Awaited events behave the same way: the emitted payload is delivered via takeResumeData() when the run resumes.

  • Only return values you want persisted. If a handler returns null, the runtime treats it as "no result yet" and will run the step again on resume.

  • Derive outbound idempotency tokens with ctx.idempotencyKey('charge') so retries reuse the same stable identifier (workflow/run/scope).

  • Use autoVersion: true on steps that you plan to re-execute (e.g. after rewinding). Each completion stores a checkpoint like step#0, step#1, ... and the handler receives the current iteration via ctx.iteration.

  • Set an optional WorkflowCancellationPolicy when starting runs to auto-cancel workflows that exceed a wall-clock budget or stay suspended beyond an allowed duration. When a policy trips, the run transitions to cancelled and the reason is surfaced via stem wf show.

flow.step(
  'process-item',
  autoVersion: true,
  (ctx) async {
    final iteration = ctx.iteration;
    final item = items[iteration];
    return await process(item);
  },
);

final runId = await runtime.startWorkflow(
  'demo.workflow',
  params: const {'userId': '42'},
  cancellationPolicy: const WorkflowCancellationPolicy(
    maxRunDuration: Duration(minutes: 15),
    maxSuspendDuration: Duration(minutes: 5),
  ),
);

Adapter packages expose typed factories (e.g. redisBrokerFactory, postgresResultBackendFactory, sqliteWorkflowStoreFactory) so you can replace drivers by importing the adapter you need.

Features #

  • Task pipeline - enqueue with delays, priorities, idempotency helpers, and retries.
  • Workers - isolate pools with soft/hard time limits, autoscaling, and remote control (stem worker ping|revoke|shutdown).
  • Scheduling - Beat-style scheduler with interval/cron/solar/clocked entries and drift tracking.
  • Workflows - Durable Flow runtime with pluggable stores (in-memory, Redis, Postgres, SQLite) and CLI introspection via stem wf.
  • Observability - Dartastic OpenTelemetry metrics/traces, heartbeats, CLI inspection (stem observe, stem dlq).
  • Security - Payload signing (HMAC or Ed25519), TLS automation scripts, revocation persistence.
  • Adapters - In-memory drivers included here; Redis Streams and Postgres adapters ship via the stem_redis and stem_postgres packages.
  • Specs & tooling - OpenSpec change workflow, quality gates (tool/quality/run_quality_checks.sh), chaos/regression suites.

Documentation & Examples #

  • Full docs: Full docs (run npm install && npm start inside .site/).
  • Guided onboarding: Guided onboarding (install → infra → ops → production).
  • Examples (each has its own README):
  • workflows - end-to-end workflow samples (in-memory, sleep/event, SQLite, Redis). See versioned_rewind.dart for auto-versioned step rewinds.
  • cancellation_policy - demonstrates auto-cancelling long workflows using WorkflowCancellationPolicy.
  • rate_limit_delay - delayed enqueue, priority clamping, Redis rate limiter.
  • dlq_sandbox - dead-letter inspection and replay via CLI.
  • microservice, monolith_service, mixed_cluster - production-style topologies.
  • unique_tasks - enables TaskOptions.unique with a shared lock store.
  • security examples - payload signing + TLS profiles.
  • postgres_tls - Redis broker + Postgres backend secured via the shared STEM_TLS_* settings.
  • otel_metrics - OTLP collectors + Grafana dashboards.

Running Tests Locally #

Start the dockerised dependencies and export the integration variables before invoking the test suite:

source packages/stem_cli/_init_test_env
dart test

The helper script launches packages/stem_cli/docker/testing/docker-compose.yml (Redis + Postgres) and populates STEM_TEST_* environment variables needed by the integration suites.

Adapter Contract Tests #

Stem ships a reusable adapter contract suite in packages/stem_adapter_tests. Adapter packages (Redis broker/postgres backend, SQLite adapters, and any future integrations) add it as a dev_dependency and invoke runBrokerContractTests / runResultBackendContractTests from their integration tests. The harness exercises core behaviours-enqueue/ack/nack, dead-letter replay, lease extension, result persistence, group aggregation, and heartbeat storage-so all adapters stay aligned with the broker and result backend contracts. See test/integration/brokers/postgres_broker_integration_test.dart and test/integration/backends/postgres_backend_integration_test.dart for reference usage.

Testing helpers #

Use FakeStem from package:stem/stem.dart in unit tests when you want to record enqueued jobs without standing up brokers:

final fake = FakeStem();
await fake.enqueue('tasks.email', args: {'id': 1});
final recorded = fake.enqueues.single;
expect(recorded.name, 'tasks.email');
  • FakeWorkflowClock keeps workflow tests deterministic. Inject the same clock into your runtime and store, then advance it directly instead of sleeping:

    final clock = FakeWorkflowClock(DateTime.utc(2024, 1, 1));
    final store = InMemoryWorkflowStore(clock: clock);
    final runtime = WorkflowRuntime(
      stem: stem,
      store: store,
      eventBus: InMemoryEventBus(store),
      clock: clock,
    );
    
    clock.advance(const Duration(seconds: 5));
    final dueRuns = await store.dueRuns(clock.now());
    
0
likes
160
points
286
downloads

Publisher

unverified uploader

Weekly Downloads

Stem is a Dart-native background job platform with Redis Streams, retries, scheduling, observability, and security tooling.

Repository (GitHub)
View/report issues

Topics

#background-processing #message-queue #observability

Documentation

API reference

License

MIT (license)

Dependencies

collection, contextual, crypto, cryptography, dartastic_opentelemetry, dartastic_opentelemetry_api, glob, sqlite3, timezone, yaml

More

Packages that depend on stem