gossip_crdts 1.0.4
gossip_crdts: ^1.0.4 copied to clipboard
CRDT (Conflict-free Replicated Data Types) extensions for the gossip protocol library with support for distributed data structures
CRDT Extensions for Gossip Protocol Library #
A Dart library that extends the gossip protocol library with Conflict-free Replicated Data Types (CRDTs), enabling automatic conflict resolution and convergent state synchronization across distributed nodes.
Features #
- 🔀 Seamless Integration: Extends existing GossipNode instances with CRDT capabilities
- 🧮 Multiple CRDT Types: Counters, Sets, Registers, and Maps with conflict-free semantics
- 🔄 Automatic Synchronization: CRDTs sync automatically through the gossip protocol
- 🛡️ Type Safety: Strongly typed CRDT operations with compile-time guarantees
- 💾 Pluggable Storage: Abstract storage interface for different persistence backends
- 📊 Event Streams: Real-time notifications for CRDT operations and updates
Installation #
Add this to your pubspec.yaml:
dependencies:
gossip:
git: https://github.com/da1nerd/gossip.git
gossip_crdts:
git: https://github.com/da1nerd/gossip-crdts.git
Then run:
dart pub get
Quick Start #
Basic Usage with GossipNode #
import 'package:gossip/gossip.dart';
import 'package:gossip_crdts/gossip_crdts.dart';
void main() async {
// Create a regular gossip node
final node = GossipNode(
config: GossipConfig(nodeId: 'node1'),
eventStore: MemoryEventStore(),
transport: MyTransport(),
);
await node.start();
// Enable CRDT support
final crdtManager = await node.enableCRDTSupport();
// Register and use a counter CRDT
final counter = GCounter('page-views');
await crdtManager.register(counter);
await crdtManager.performOperation('page-views', 'increment', {'amount': 5});
print('Counter value: ${counter.value}'); // Counter value: 5
await node.stop();
}
Using the Unified Interface #
import 'package:gossip/gossip.dart';
import 'package:gossip_crdts/gossip_crdts.dart';
void main() async {
// Create a gossip node with CRDT support
final gossipNode = GossipNode(/* ... */);
final crdtNode = await gossipNode.withCRDTSupport();
await crdtNode.start();
// Use both gossip and CRDT functionality
await crdtNode.createEvent({'type': 'message', 'content': 'hello'});
final counter = GCounter('likes');
await crdtNode.registerCRDT(counter);
await crdtNode.performCRDTOperation('likes', 'increment', {'amount': 1});
await crdtNode.close();
}
Simple Gossip Node Integration #
import 'package:gossip/gossip.dart';
import 'package:gossip_crdts/gossip_crdts.dart';
void main() async {
final gossipNode = GossipNode(/* ... */);
final crdtNode = await gossipNode.withCRDTSupport();
await crdtNode.initialize();
await crdtNode.startGossiping();
// Use CRDTs with gossip protocol
final set = GSet<String>('user-tags');
await crdtNode.registerCRDT(set);
await crdtNode.performCRDTOperation('user-tags', 'add', {'element': 'music'});
await crdtNode.close();
}
Supported CRDT Types #
Counters #
G-Counter (Grow-only Counter)
Perfect for metrics that only increase:
final counter = GCounter('page-views');
await crdtManager.register(counter);
// Only increment operations are supported
await crdtManager.performOperation('page-views', 'increment', {'amount': 1});
print(counter.value); // Always positive, monotonically increasing
PN-Counter (Increment/Decrement Counter)
Supports both increment and decrement operations:
final votes = PNCounter('vote-tally');
await crdtManager.register(votes);
// Both increment and decrement are supported
await crdtManager.performOperation('vote-tally', 'increment', {'amount': 5});
await crdtManager.performOperation('vote-tally', 'decrement', {'amount': 2});
print(votes.value); // 3
print('Upvotes: ${votes.totalPositive}, Downvotes: ${votes.totalNegative}');
Sets #
G-Set (Grow-only Set)
A set that can only have elements added:
final tags = GSet<String>('user-tags');
await crdtManager.register(tags);
await crdtManager.performOperation('user-tags', 'add', {'element': 'music'});
await crdtManager.performOperation('user-tags', 'add', {'element': 'sports'});
print(tags.value); // {'music', 'sports'}
print('Contains music: ${tags.contains('music')}'); // true
OR-Set (Observed-Remove Set)
A set that supports both add and remove operations:
final items = ORSet<String>('shopping-cart');
await crdtManager.register(items);
await crdtManager.performOperation('shopping-cart', 'add', {'element': 'apple'});
await crdtManager.performOperation('shopping-cart', 'add', {'element': 'banana'});
await crdtManager.performOperation('shopping-cart', 'remove', {'element': 'apple'});
print(items.value); // {'banana'}
Registers #
LWW-Register (Last-Writer-Wins Register)
A single-value register where the most recent update wins:
final status = LWWRegister<String>('user-status');
await crdtManager.register(status);
await crdtManager.performOperation('user-status', 'set', {
'value': 'online',
'timestamp': DateTime.now().millisecondsSinceEpoch,
});
print(status.value); // 'online'
MV-Register (Multi-Value Register)
A register that preserves concurrent updates:
final config = MVRegister<String>('app-theme');
await crdtManager.register(config);
// Concurrent updates are preserved
await crdtManager.performOperation('app-theme', 'set', {
'value': 'dark',
'vectorClock': {'node1': 1},
});
print(config.hasConflict); // true if concurrent updates exist
print(config.values); // Set of all concurrent values
Maps #
LWW-Map (Last-Writer-Wins Map)
A map where the most recent update wins for each key:
final settings = LWWMap<String, String>('user-settings');
await crdtManager.register(settings);
await crdtManager.performOperation('user-settings', 'put', {
'key': 'theme',
'value': 'dark',
'timestamp': DateTime.now().millisecondsSinceEpoch,
});
print(settings['theme']); // 'dark'
OR-Map (Observed-Remove Map)
A map that supports both add and remove operations with CRDT values:
final counters = ORMap<String, GCounter>('user-counters',
crdtFactory: (id, type) => GCounter(id));
await crdtManager.register(counters);
await crdtManager.performOperation('user-counters', 'add', {
'key': 'user1',
'crdtType': 'GCounter',
'crdtId': 'user1-counter'
});
Sequences #
RGA Array (Replicated Growable Array)
Perfect for collaborative text editing and ordered sequences:
final text = RGAArray<String>('shared-document');
await crdtManager.register(text);
// Insert characters for collaborative text editing
await crdtManager.performOperation('shared-document', 'insert', {
'index': 0,
'element': 'H',
});
await crdtManager.performOperation('shared-document', 'insert', {
'index': 1,
'element': 'i',
});
print(text.getText()); // 'Hi'
print(text.length); // 2
// Delete characters
await crdtManager.performOperation('shared-document', 'delete', {
'index': 1,
});
print(text.getText()); // 'H'
Flags #
Enable-Wins Flag
A boolean flag where enable operations always win over disable operations:
final feature = EnableWinsFlag('feature-x-enabled');
await crdtManager.register(feature);
await crdtManager.performOperation('feature-x-enabled', 'enable', {});
print(feature.isEnabled); // true
// Even if another node disables concurrently, enable wins
await crdtManager.performOperation('feature-x-enabled', 'disable', {});
// After merge, flag remains enabled if any replica enabled it
Event Streams and Monitoring #
Subscribe to CRDT events for real-time updates:
// Listen to all CRDT operations
crdtManager.onOperation.listen((operationEvent) {
print('Operation: ${operationEvent.operation.operation} '
'on ${operationEvent.operation.crdtId} '
'from ${operationEvent.source}');
});
// Listen to CRDT updates
crdtManager.onUpdate.listen((updateEvent) {
print('CRDT ${updateEvent.crdtId} updated: ${updateEvent.type}');
});
// Listen to synchronization events
crdtManager.onSync.listen((syncEvent) {
print('Sync ${syncEvent.type} with peer ${syncEvent.peerId}: '
'${syncEvent.crdtCount} CRDTs');
});
Custom Storage Backends #
Implement custom storage for production use:
class DatabaseCRDTStore implements CRDTStore {
final Database db;
DatabaseCRDTStore(this.db);
@override
Future<void> saveCRDT(CRDT crdt) async {
final state = crdt.getState();
await db.execute('''
INSERT OR REPLACE INTO crdts (id, type, state)
VALUES (?, ?, ?)
''', [crdt.id, crdt.type, jsonEncode(state)]);
}
@override
Future<Map<String, dynamic>?> loadCRDTState(String crdtId) async {
final result = await db.query('SELECT state FROM crdts WHERE id = ?', [crdtId]);
if (result.isEmpty) return null;
return jsonDecode(result.first['state']);
}
// Implement other methods...
}
// Use custom storage
final customStore = DatabaseCRDTStore(myDatabase);
final crdtManager = await node.enableCRDTSupport(crdtStore: customStore);
Distributed Counter Example #
Here's a complete example of multiple nodes collaborating on shared counters:
import 'package:gossip/gossip.dart';
import 'package:gossip_crdts/gossip_crdts.dart';
void main() async {
// Create multiple nodes
final nodes = <CRDTEnabledGossipNode>[];
for (int i = 1; i <= 3; i++) {
final gossipNode = GossipNode(
config: GossipConfig(nodeId: 'node-$i'),
eventStore: MemoryEventStore(),
transport: MyTransport(),
);
final crdtNode = await gossipNode.withCRDTSupport();
nodes.add(crdtNode);
await crdtNode.start();
}
// Register shared counter on all nodes
for (final node in nodes) {
final counter = GCounter('shared-counter');
await node.registerCRDT(counter);
}
// Each node increments the counter
for (int i = 0; i < nodes.length; i++) {
await nodes[i].performCRDTOperation(
'shared-counter',
'increment',
{'amount': (i + 1) * 10}
);
}
// Wait for synchronization
await Future.delayed(Duration(seconds: 2));
// All nodes should show the same total: 60 (10 + 20 + 30)
for (final node in nodes) {
final counter = node.getCRDT<GCounter>('shared-counter');
print('${node.config.nodeId}: ${counter?.value}'); // All show 60
}
// Clean up
for (final node in nodes) {
await node.close();
}
}
Architecture #
The CRDT extension follows a clean architecture that integrates seamlessly with the gossip protocol:
┌─────────────────────┐
│ Application │
├─────────────────────┤
│ CRDTManager / │
│ CRDTEnabledNode │ ← CRDT coordination layer
├─────────────────────┤
│ GossipNode │ ← Existing gossip protocol
├─────────────────────┤
│ CRDT Implementations │
│ - GCounter │
│ - PNCounter │ ← Individual CRDT types
│ - GSet │
│ - ... │
├─────────────────────┤
│ CRDTStore │ ← Pluggable storage
└─────────────────────┘
Best Practices #
Choosing CRDT Types #
- G-Counter: Use for metrics that only increase (views, downloads, likes)
- PN-Counter: Use for values that can go up or down (votes, scores, inventory)
- G-Set: Use for collections that only grow (tags, features, flags)
- OR-Set: Use for collections that need both add and remove (shopping carts, user lists)
- LWW-Register: Use for single values where latest update should win (user status, configuration)
- MV-Register: Use when you need to detect and handle concurrent updates manually
- LWW-Map: Use for key-value storage where latest update per key should win
- OR-Map: Use for maps with CRDT values that need add/remove operations
- RGA Array: Use for collaborative text editing and ordered sequences
- Enable-Wins Flag: Use for boolean flags where "enabled" should dominate
Performance Considerations #
- Operation Frequency: Higher operation frequency requires more network bandwidth
- CRDT Size: Large CRDTs take more time to synchronize
- Network Partitions: CRDTs handle partitions gracefully but may have temporary inconsistencies
Error Handling #
try {
await crdtManager.performOperation('my-counter', 'increment', {'amount': 5});
} on CRDTException catch (e) {
print('CRDT operation failed: ${e.message}');
// Handle CRDT-specific errors
} on GossipException catch (e) {
print('Gossip operation failed: ${e.message}');
// Handle gossip protocol errors
}
Storage Recommendations #
- Development: Use
MemoryCRDTStore(no persistence) - Testing: Use
MemoryCRDTStorefor clean test isolation - Production: Implement custom storage backend with proper persistence
Testing #
Run the test suite:
dart test
The test suite includes:
- Unit tests for all CRDT implementations
- Integration tests with the gossip protocol
- Storage backend tests
- Convergence property tests
Examples #
See the /example directory for complete working examples:
- Collaborative Counter: Multiple nodes incrementing shared counters
- Distributed Set: Nodes adding elements to shared sets
- Chat Application: Real-time chat using CRDTs for message ordering
Contributing #
- Fork the repository
- Create a feature branch (
git checkout -b feature/new-crdt-type) - Add tests for your changes
- Ensure all tests pass (
dart test) - Commit your changes (
git commit -am 'Add new CRDT type') - Push to the branch (
git push origin feature/new-crdt-type) - Create a Pull Request
Roadmap #
- ✅ OR-Set: Observed-Remove Set with add and remove operations ✓
- ✅ LWW-Register: Last-Writer-Wins Register for single values ✓
- ✅ MV-Register: Multi-Value Register preserving concurrent updates ✓
- ✅ OR-Map: Observed-Remove Map with CRDT values ✓
- ✅ LWW-Map: Last-Writer-Wins Map for key-value storage ✓
- ✅ RGA Array: Replicated Growable Array for collaborative text editing ✓
- ✅ Enable-Wins Flag: Boolean flag CRDT for feature toggles ✓
- ❌ Sequence CRDT: Generic ordered sequence operations
- ❌ Causal Tree: Alternative text editing CRDT
- ❌ File-based Storage: Built-in file system storage backend
- ❌ Compression: Optional compression for large CRDT states
- ❌ Delta Synchronization: Send only changes instead of full state
- ❌ CRDT Factory Registry: Automatic CRDT reconstruction from state
License #
This project is licensed under the MIT License - see the LICENSE file for details.