durable_outbox 0.1.1
A reliable offline queue library with guaranteed delivery for Flutter/Dart applications.
durable_outbox #
durable_outbox is a robust, cross-platform offline queue library that ensures reliable delivery of events and commands to your backend, even when the network is unavailable. Built with the outbox pattern, it provides at-least-once delivery guarantees with idempotency support.
✨ Features #
- ✅ Offline mode: Requests accumulate locally when network is unavailable
- ✅ Guaranteed delivery: At-least-once delivery with idempotency support
- ✅ Automatic retry: Smart retry with decorrelated jitter backoff
- ✅ Deduplication: Idempotency keys prevent duplicate processing
- ✅ Pause/Resume: Control processing on network/account/token changes
- ✅ Plugin architecture: Built-in HTTP transport, plus custom transport adapters (for example gRPC) via the OutboxTransport interface
- ✅ Cross-platform: Works on mobile, desktop, web, and CLI
- ✅ Priority queues: Support for priority-based processing
- ✅ Delayed execution: Schedule entries for future processing
- ✅ Observability: Built-in metrics and state monitoring
📦 Installation #
Add durable_outbox to your pubspec.yaml:
dependencies:
  durable_outbox: ^0.1.1
Then run:
dart pub get
🚀 Quick Start #
Basic Usage #
import 'package:durable_outbox/durable_outbox.dart';
import 'package:path_provider/path_provider.dart' as path_provider;
import 'package:path/path.dart' as path;
// Get application documents directory
final appDir = await path_provider.getApplicationDocumentsDirectory();
final dbPath = path.join(appDir.path, 'outbox.db');
// Create outbox with SQLite store
final outbox = DurableOutbox(
  store: SqliteStore(dbPath: dbPath),
  transport: HttpTransport(
    endpoint: Uri.parse('https://api.example.com/outbox'),
    authHeaders: () async => {'Authorization': 'Bearer $token'},
    client: yourHttpClient, // Your HTTP client implementation
  ),
  config: const OutboxConfig(
    concurrency: 3,
    autoStart: true,
  ),
);
await outbox.init();
// Enqueue an entry
await outbox.enqueue(
  channel: 'orders',
  payload: {'action': 'create', 'orderId': 'o-123'},
  idempotencyKey: 'orders:o-123',
);
// Entries are automatically processed in the background
Simple HTTP Client Implementation #
HttpTransport needs a client that implements the package's HttpClient interface. A minimal implementation based on dart:io (which is not available on the web):
import 'dart:convert';
import 'dart:io' as io;
import 'package:durable_outbox/durable_outbox.dart';

class SimpleHttpClient implements HttpClient {
  @override
  Future<HttpResponse> request({
    required String method,
    required Uri uri,
    Map<String, String>? headers,
    Object? body,
    Duration? timeout,
  }) async {
    final client = io.HttpClient();
    try {
      final request = await client.openUrl(method, uri);
      headers?.forEach((key, value) {
        request.headers.set(key, value);
      });
      if (body != null) {
        // Writes body.toString(); this assumes the body is already an encoded string.
        request.write(body);
      }
      final response = await request.close().timeout(timeout ?? const Duration(seconds: 15));
      // Decode as UTF-8 rather than the platform's system encoding.
      final responseBody = await response.transform(utf8.decoder).join();
      final responseHeaders = <String, String>{};
      response.headers.forEach((key, values) {
        responseHeaders[key] = values.join(', ');
      });
      return HttpResponse(
        statusCode: response.statusCode,
        headers: responseHeaders,
        body: responseBody,
      );
    } finally {
      client.close();
    }
  }
}
🎯 Core Concepts #
Enqueueing Entries #
Add entries to the queue for processing:
// Basic enqueue
final id = await outbox.enqueue(
  channel: 'orders',
  payload: {'action': 'create', 'orderId': 'o-123'},
);
// With idempotency key
await outbox.enqueue(
  channel: 'orders',
  payload: {'action': 'create', 'orderId': 'o-123'},
  idempotencyKey: 'orders:o-123',
);
// With priority (higher = processed earlier)
await outbox.enqueue(
  channel: 'analytics',
  payload: {'event': 'page_view'},
  priority: -1, // Low priority
);
// Delayed execution
await outbox.enqueue(
  channel: 'notifications',
  payload: {'message': 'Reminder'},
  notBefore: DateTime.now().add(const Duration(hours: 1)),
);
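To make the interaction of priority and notBefore concrete, here is a minimal sketch of one possible selection rule: skip entries that are not yet due, then take higher priorities first and older entries first within a priority. QueuedItem and dueInOrder are hypothetical names for illustration only; the package's actual ordering beyond "higher = processed earlier" is not documented here, so treat the tie-breaking rule as an assumption.

```dart
/// Hypothetical stand-in for a queued entry; not the package's OutboxEntry.
class QueuedItem {
  QueuedItem(this.id, {this.priority = 0, this.notBefore, required this.createdAt});

  final String id;
  final int priority;
  final DateTime? notBefore;
  final DateTime createdAt;
}

/// Returns the items that are due at [now], highest priority first and
/// oldest first within the same priority (assumed tie-breaker).
List<QueuedItem> dueInOrder(List<QueuedItem> items, DateTime now) {
  final due = items
      .where((e) => e.notBefore == null || !e.notBefore!.isAfter(now))
      .toList();
  due.sort((a, b) {
    final byPriority = b.priority.compareTo(a.priority);
    return byPriority != 0 ? byPriority : a.createdAt.compareTo(b.createdAt);
  });
  return due;
}

void main() {
  final now = DateTime.utc(2024, 1, 1, 12);
  final items = [
    QueuedItem('analytics', priority: -1, createdAt: now.subtract(const Duration(minutes: 3))),
    QueuedItem('order', createdAt: now.subtract(const Duration(minutes: 2))),
    QueuedItem('reminder',
        notBefore: now.add(const Duration(hours: 1)),
        createdAt: now.subtract(const Duration(minutes: 1))),
  ];
  print(dueInOrder(items, now).map((e) => e.id).toList()); // [order, analytics]
}
```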
Retry Policy #
By default, durable_outbox uses Decorrelated Jitter Backoff for retry scheduling:
final outbox = DurableOutbox(
  store: SqliteStore(dbPath: dbPath),
  transport: HttpTransport(/* ... */),
  config: OutboxConfig(
    retry: RetryPolicy(
      baseDelay: const Duration(milliseconds: 500),
      maxDelay: const Duration(seconds: 60),
      maxAttempts: 8,
    ),
  ),
);
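For intuition about how these parameters shape the schedule, the following is a minimal sketch of the commonly described decorrelated jitter formula, where each delay is drawn uniformly between the base delay and three times the previous delay, then capped. nextDelay is a hypothetical helper and not part of the package; the library's exact formula may differ.

```dart
import 'dart:math';

/// Hypothetical helper: decorrelated jitter as commonly described,
/// delay = min(cap, random(base, previous * 3)).
Duration nextDelay(
  Duration previous, {
  Duration base = const Duration(milliseconds: 500),
  Duration cap = const Duration(seconds: 60),
  Random? random,
}) {
  final rng = random ?? Random();
  final baseMs = base.inMilliseconds;
  // Upper bound of the random range; never below the base delay.
  final upperMs = max(baseMs, previous.inMilliseconds * 3);
  final candidateMs = baseMs + rng.nextInt(upperMs - baseMs + 1);
  return Duration(milliseconds: min(cap.inMilliseconds, candidateMs));
}

void main() {
  var delay = const Duration(milliseconds: 500);
  for (var attempt = 1; attempt <= 8; attempt++) {
    delay = nextDelay(delay);
    print('attempt $attempt: wait ${delay.inMilliseconds} ms');
  }
}
```

Because each delay depends on the previous one rather than on the attempt number, clients that fail at the same moment tend to spread out instead of retrying in lockstep.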
Retry triggers:
- Network errors (SocketException, HttpException)
- Timeout errors (TimeoutException)
- HTTP status codes: 429 (Too Many Requests), 5xx (Server Errors), 408 (Request Timeout)
No retry on:
- 4xx client errors (except 429 and 408)
- Permanent failures
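The rules above can be sketched as two small predicates. isRetryableStatus and isRetryableError are hypothetical names used for illustration; they mirror the lists in this section and are not the package's internal API. The sketch relies on dart:io, so it does not apply to web builds.

```dart
import 'dart:async';
import 'dart:io';

/// True for the status codes listed as retry triggers: 408, 429 and 5xx.
bool isRetryableStatus(int statusCode) =>
    statusCode == 408 ||
    statusCode == 429 ||
    (statusCode >= 500 && statusCode <= 599);

/// True for the exception types listed as retry triggers.
bool isRetryableError(Object error) =>
    error is SocketException ||
    error is HttpException ||
    error is TimeoutException;

void main() {
  print(isRetryableStatus(503)); // true
  print(isRetryableStatus(404)); // false
  print(isRetryableError(TimeoutException('timed out'))); // true
  print(isRetryableError(const FormatException('bad payload'))); // false
}
```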
Idempotency #
Each entry can have an idempotencyKey to prevent duplicate processing:
await outbox.enqueue(
  channel: 'orders',
  payload: {'action': 'create', 'orderId': 'o-123'},
  idempotencyKey: 'orders:o-123',
);
The transport automatically adds the Idempotency-Key header. On 409 (conflict) or server indication of "already processed", the entry is marked as done without retrying.
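At-least-once delivery means the same entry can reach the backend more than once, so the receiving side has to recognize repeats. The sketch below is a toy in-memory receiver that answers 409 for a key it has already processed. IdempotentReceiver is a hypothetical class for illustration; a real backend would persist the keys and is outside the scope of this package.

```dart
/// Hypothetical in-memory receiver that deduplicates by idempotency key.
class IdempotentReceiver {
  final Set<String> _seenKeys = {};

  /// Number of payloads that were actually processed.
  int processed = 0;

  /// Returns 200 the first time a key is seen and 409 for any repeat.
  int handle(String idempotencyKey, Object? payload) {
    if (!_seenKeys.add(idempotencyKey)) {
      return 409; // Already processed; the sender can mark the entry as done.
    }
    processed++;
    return 200;
  }
}

void main() {
  final receiver = IdempotentReceiver();
  final payload = {'action': 'create', 'orderId': 'o-123'};
  print(receiver.handle('orders:o-123', payload)); // 200
  print(receiver.handle('orders:o-123', payload)); // 409
  print(receiver.processed); // 1
}
```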
Channels #
Organize entries into logical queues using channels:
// Orders channel
await outbox.enqueue(
  channel: 'orders',
  payload: orderData,
);
// Analytics channel
await outbox.enqueue(
  channel: 'analytics',
  payload: analyticsData,
);
// Uploads channel
await outbox.enqueue(
  channel: 'uploads',
  payload: uploadData,
);
You can monitor and clear entries by channel:
// Watch count for specific channel
outbox.store.watchCount(channel: 'orders').listen((count) {
  print('Orders queue: $count');
});
// Clear specific channel
await outbox.clear(channel: 'orders');
Pause and Resume #
Control processing based on network state or user actions:
// Pause processing (e.g., when network is unavailable)
outbox.pause();
// Resume processing (e.g., when network is restored)
outbox.resume();
// Manually trigger processing
await outbox.drain();
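One way to drive pause() and resume() from network state is to listen to a stream of online/offline events. The sketch below assumes such a Stream<bool> is available (for example, derived from a connectivity plugin) and uses a hypothetical Pausable interface so that it runs without the package; in an app, a small adapter would forward to outbox.pause() and outbox.resume().

```dart
import 'dart:async';

/// Hypothetical interface covering only the two calls used here.
abstract class Pausable {
  void pause();
  void resume();
}

/// Pauses [target] when [online] emits false and resumes it on true.
/// Consecutive duplicate events are ignored.
StreamSubscription<bool> bindToConnectivity(Stream<bool> online, Pausable target) {
  return online.distinct().listen((isOnline) {
    if (isOnline) {
      target.resume();
    } else {
      target.pause();
    }
  });
}

/// Test double that records the calls it receives.
class RecordingTarget implements Pausable {
  final List<String> calls = [];

  @override
  void pause() => calls.add('pause');

  @override
  void resume() => calls.add('resume');
}

Future<void> main() async {
  final target = RecordingTarget();
  final events = Stream.fromIterable([true, false, false, true]);
  final subscription = bindToConnectivity(events, target);
  await subscription.asFuture<void>(); // Completes when the stream is done.
  print(target.calls); // [resume, pause, resume]
}
```

Remember to cancel the subscription when the outbox is closed.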
Monitoring #
Watch queue state and counts:
// Watch overall state
outbox.watch().listen((state) {
  print('Paused: ${state.isPaused}');
  print('Running: ${state.isRunning}');
  print('Queued: ${state.queuedCount}');
  print('Processing: ${state.processingCount}');
  print('Failed: ${state.failedCount}');
});
// Watch queue count
outbox.store.watchCount(channel: 'orders').listen((count) {
  // Update UI badge
  setState(() {
    pendingOrdersCount = count;
  });
});
⚙️ Configuration #
Full Configuration Example #
final outbox = DurableOutbox(
  store: SqliteStore(dbPath: dbPath),
  transport: HttpTransport(
    endpoint: Uri.parse('https://api.example.com/outbox'),
    authHeaders: () async => {
      'Authorization': 'Bearer $token',
      'X-API-Key': apiKey,
    },
    client: yourHttpClient,
    timeout: const Duration(seconds: 15),
    method: 'POST',
    sendAsJson: true,
  ),
  config: OutboxConfig(
    retry: RetryPolicy(
      baseDelay: const Duration(milliseconds: 500),
      maxDelay: const Duration(seconds: 60),
      maxAttempts: 8,
    ),
    concurrency: 3, // Parallel processing tasks
    autoStart: true, // Start processing on enqueue
    lockTimeout: const Duration(minutes: 5), // Protection against hangs
    heartbeat: const Duration(seconds: 1), // Processing tick interval
    pauseOnNoNetwork: false, // Optional network monitoring
  ),
  metrics: ConsoleMetricsSink(), // Optional metrics
);
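To illustrate what a concurrency limit such as concurrency: 3 means in practice, here is a minimal sketch of a bounded worker pool: a fixed number of workers pull from a shared queue, so at most that many handlers are in flight at once. processAll is a hypothetical helper; how the package schedules work internally is not documented here.

```dart
import 'dart:async';
import 'dart:collection';

/// Runs [handler] for every item with at most [concurrency] calls in flight.
Future<void> processAll<T>(
  Iterable<T> items,
  Future<void> Function(T item) handler, {
  int concurrency = 3,
}) async {
  final pending = Queue<T>.of(items);

  // Each worker takes the next pending item until the queue is empty.
  Future<void> worker() async {
    while (pending.isNotEmpty) {
      await handler(pending.removeFirst());
    }
  }

  await Future.wait([for (var i = 0; i < concurrency; i++) worker()]);
}

Future<void> main() async {
  var active = 0;
  var peak = 0;
  await processAll<int>(List.generate(10, (i) => i), (item) async {
    active++;
    if (active > peak) peak = active;
    await Future<void>.delayed(const Duration(milliseconds: 10));
    active--;
  });
  print('peak concurrency: $peak'); // peak concurrency: 3
}
```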
Stores #
SQLite Store (Mobile/Desktop)
Persistent storage using SQLite:
final store = SqliteStore(dbPath: '/path/to/outbox.db');
await store.init();
Memory Store (Testing)
In-memory storage for testing:
final store = MemoryStore();
await store.init();
Transports #
HTTP Transport
Send entries via HTTP/HTTPS:
final transport = HttpTransport(
  endpoint: Uri.parse('https://api.example.com/outbox'),
  authHeaders: () async => {'Authorization': 'Bearer $token'},
  client: yourHttpClient,
  timeout: const Duration(seconds: 15),
  method: 'POST',
  sendAsJson: true,
);
Custom Transport
Create your own transport implementation:
class GrpcTransport implements OutboxTransport {
  @override
  Future<SendResult> send(OutboxEntry entry) async {
    try {
      // grpcClient is a placeholder for your own generated gRPC client.
      await grpcClient.send(entry.payload);
      return const SendResult(success: true);
    } catch (e) {
      // Report the failure together with the error message.
      return SendResult(
        success: false,
        error: e.toString(),
      );
    }
  }
}
📚 Examples #
See the example/ directory for complete examples:
- quick_start.dart - Basic usage with SQLite store
- http_orders.dart - Order processing with multiple entries
- analytics_lowprio.dart - Low-priority analytics events with delayed start
Running Examples #
# Quick start example
dart run example/quick_start.dart
# HTTP orders example
dart run example/http_orders.dart
# Analytics example
dart run example/analytics_lowprio.dart
🔧 Advanced Usage #
Custom Metrics #
Track outbox metrics with custom sinks:
class CustomMetricsSink implements MetricsSink {
  @override
  void counter(String name, {int value = 1, Map<String, String>? tags}) {
    // Send to your metrics service
    metricsService.increment(name, value, tags: tags);
  }

  @override
  void gauge(String name, double value, {Map<String, String>? tags}) {
    // Record gauge metric
    metricsService.gauge(name, value, tags: tags);
  }

  @override
  void timing(String name, Duration duration, {Map<String, String>? tags}) {
    // Record timing metric
    metricsService.timing(name, duration, tags: tags);
  }
}

final outbox = DurableOutbox(
  // ...
  metrics: CustomMetricsSink(),
);
Error Handling #
Detect failed entries through the state stream and decide how to react:
// Watch for failed entries
outbox.watch().listen((state) {
  if (state.failedCount > 0) {
    // Handle failed entries
    print('${state.failedCount} entries failed');
  }
});
// Clear the 'orders' channel, which also discards its failed entries
await outbox.clear(channel: 'orders');
Lifecycle Management #
Properly initialize and dispose:
final outbox = DurableOutbox(/* ... */);
// Initialize
await outbox.init();
// Use outbox
await outbox.enqueue(/* ... */);
// Cleanup when done
await outbox.close();
🛡️ Reliability #
- Idempotency: Every entry can have an idempotency key to prevent duplicates
- Transactions: Store operations use transactions for consistency
- Stuck Entry Protection: lockTimeout and heartbeat restart hung entries
- Error Logging: Last error is stored with each entry for debugging
- Retry Safety: Retries only on transient errors, not permanent failures
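As an illustration of stuck entry protection, the sketch below shows one way a lock timeout could work: on each tick, any item whose lock is older than the timeout is released so it can be picked up again. LockedItem and reclaimStuck are hypothetical names, and this is an assumed mechanism rather than the package's actual implementation.

```dart
/// Hypothetical stand-in for an entry that may be locked by a worker.
class LockedItem {
  LockedItem(this.id, {this.lockedAt});

  final String id;

  /// When a worker locked the item; null means it is not being processed.
  DateTime? lockedAt;
}

/// Releases items whose lock is older than [lockTimeout] and returns their ids.
List<String> reclaimStuck(List<LockedItem> items, DateTime now, Duration lockTimeout) {
  final reclaimed = <String>[];
  for (final item in items) {
    final lockedAt = item.lockedAt;
    if (lockedAt != null && now.difference(lockedAt) > lockTimeout) {
      item.lockedAt = null; // Make the item available for processing again.
      reclaimed.add(item.id);
    }
  }
  return reclaimed;
}

void main() {
  final now = DateTime.utc(2024, 1, 1, 12);
  final items = [
    LockedItem('fresh', lockedAt: now.subtract(const Duration(minutes: 1))),
    LockedItem('stuck', lockedAt: now.subtract(const Duration(minutes: 10))),
    LockedItem('idle'),
  ];
  print(reclaimStuck(items, now, const Duration(minutes: 5))); // [stuck]
}
```

Because a reclaimed entry may already have been sent before its worker hung, this mechanism pairs naturally with idempotency keys.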
🌐 Platform Support #
| Platform | Store | Status |
|---|---|---|
| Dart CLI / Server | SqliteStore (sqlite3) | ✅ Fully supported |
| Flutter Mobile | SqliteStore (sqlite3) | ✅ Fully supported |
| Flutter Desktop | SqliteStore (sqlite3) | ✅ Fully supported |
| Flutter Web | MemoryStore | ✅ Basic support |
| Testing | MemoryStore | ✅ Fully supported |
Note: For production web applications, consider implementing an IndexedDB store (planned for v0.2.0).
📖 API Reference #
DurableOutbox #
Main outbox facade class.
Methods:
- Future<void> init() - Initialize the outbox
- Future<String> enqueue({required String channel, required Object payload, Map<String, String>? headers, String? idempotencyKey, int priority = 0, DateTime? notBefore}) - Enqueue an entry
- Future<void> drain() - Manually trigger processing
- void pause() - Pause processing
- void resume() - Resume processing
- Future<void> clear({String? channel}) - Clear entries
- Stream<OutboxState> watch() - Watch outbox state
- Future<void> close() - Clean up resources
OutboxStore #
Storage abstraction interface.
Implementations:
- SqliteStore - SQLite-based persistent storage
- MemoryStore - In-memory storage for testing
OutboxTransport #
Transport abstraction interface.
Implementations:
- HttpTransport - HTTP/HTTPS transport
RetryPolicy #
Configuration for retry behavior.
Parameters:
- baseDelay (default: 500ms) - Base delay for backoff
- maxDelay (default: 60s) - Maximum delay between retries
- maxAttempts (default: 8) - Maximum number of retry attempts
🤝 Contributing #
Contributions are welcome! Please feel free to submit a Pull Request.
📄 License #
This project is licensed under the BSD 3-Clause License.
🙏 Acknowledgments #
Inspired by the outbox pattern and reliable queue systems. Built specifically for Dart/Flutter with focus on offline-first applications, guaranteed delivery, and cross-platform consistency.
Made with ❤️ for the Dart community