gossip_crdts 1.0.4  gossip_crdts: ^1.0.4 copied to clipboard
gossip_crdts: ^1.0.4 copied to clipboard
CRDT (Conflict-free Replicated Data Types) extensions for the gossip protocol library with support for distributed data structures
CRDT Extensions for Gossip Protocol Library #
A Dart library that extends the gossip protocol library with Conflict-free Replicated Data Types (CRDTs), enabling automatic conflict resolution and convergent state synchronization across distributed nodes.
Features #
- 🔀 Seamless Integration: Extends existing GossipNode instances with CRDT capabilities
- 🧮 Multiple CRDT Types: Counters, Sets, Registers, and Maps with conflict-free semantics
- 🔄 Automatic Synchronization: CRDTs sync automatically through the gossip protocol
- 🛡️ Type Safety: Strongly typed CRDT operations with compile-time guarantees
- 💾 Pluggable Storage: Abstract storage interface for different persistence backends
- 📊 Event Streams: Real-time notifications for CRDT operations and updates
Installation #
Add this to your pubspec.yaml:
dependencies:
  gossip:
    git: https://github.com/da1nerd/gossip.git
  gossip_crdts:
    git: https://github.com/da1nerd/gossip-crdts.git
Then run:
dart pub get
Quick Start #
Basic Usage with GossipNode #
import 'package:gossip/gossip.dart';
import 'package:gossip_crdts/gossip_crdts.dart';
void main() async {
  // Create a regular gossip node
  final node = GossipNode(
    config: GossipConfig(nodeId: 'node1'),
    eventStore: MemoryEventStore(),
    transport: MyTransport(),
  );
  await node.start();
  // Enable CRDT support
  final crdtManager = await node.enableCRDTSupport();
  // Register and use a counter CRDT
  final counter = GCounter('page-views');
  await crdtManager.register(counter);
  await crdtManager.performOperation('page-views', 'increment', {'amount': 5});
  print('Counter value: ${counter.value}'); // Counter value: 5
  await node.stop();
}
Using the Unified Interface #
import 'package:gossip/gossip.dart';
import 'package:gossip_crdts/gossip_crdts.dart';
void main() async {
  // Create a gossip node with CRDT support
  final gossipNode = GossipNode(/* ... */);
  final crdtNode = await gossipNode.withCRDTSupport();
  await crdtNode.start();
  // Use both gossip and CRDT functionality
  await crdtNode.createEvent({'type': 'message', 'content': 'hello'});
  
  final counter = GCounter('likes');
  await crdtNode.registerCRDT(counter);
  await crdtNode.performCRDTOperation('likes', 'increment', {'amount': 1});
  await crdtNode.close();
}
Simple Gossip Node Integration #
import 'package:gossip/gossip.dart';
import 'package:gossip_crdts/gossip_crdts.dart';
void main() async {
  final gossipNode = GossipNode(/* ... */);
  final crdtNode = await gossipNode.withCRDTSupport();
  await crdtNode.initialize();
  await crdtNode.startGossiping();
  // Use CRDTs with gossip protocol
  final set = GSet<String>('user-tags');
  await crdtNode.registerCRDT(set);
  await crdtNode.performCRDTOperation('user-tags', 'add', {'element': 'music'});
  await crdtNode.close();
}
Supported CRDT Types #
Counters #
G-Counter (Grow-only Counter)
Perfect for metrics that only increase:
final counter = GCounter('page-views');
await crdtManager.register(counter);
// Only increment operations are supported
await crdtManager.performOperation('page-views', 'increment', {'amount': 1});
print(counter.value); // Always positive, monotonically increasing
PN-Counter (Increment/Decrement Counter)
Supports both increment and decrement operations:
final votes = PNCounter('vote-tally');
await crdtManager.register(votes);
// Both increment and decrement are supported
await crdtManager.performOperation('vote-tally', 'increment', {'amount': 5});
await crdtManager.performOperation('vote-tally', 'decrement', {'amount': 2});
print(votes.value); // 3
print('Upvotes: ${votes.totalPositive}, Downvotes: ${votes.totalNegative}');
Sets #
G-Set (Grow-only Set)
A set that can only have elements added:
final tags = GSet<String>('user-tags');
await crdtManager.register(tags);
await crdtManager.performOperation('user-tags', 'add', {'element': 'music'});
await crdtManager.performOperation('user-tags', 'add', {'element': 'sports'});
print(tags.value); // {'music', 'sports'}
print('Contains music: ${tags.contains('music')}'); // true
OR-Set (Observed-Remove Set)
A set that supports both add and remove operations:
final items = ORSet<String>('shopping-cart');
await crdtManager.register(items);
await crdtManager.performOperation('shopping-cart', 'add', {'element': 'apple'});
await crdtManager.performOperation('shopping-cart', 'add', {'element': 'banana'});
await crdtManager.performOperation('shopping-cart', 'remove', {'element': 'apple'});
print(items.value); // {'banana'}
Registers #
LWW-Register (Last-Writer-Wins Register)
A single-value register where the most recent update wins:
final status = LWWRegister<String>('user-status');
await crdtManager.register(status);
await crdtManager.performOperation('user-status', 'set', {
  'value': 'online',
  'timestamp': DateTime.now().millisecondsSinceEpoch,
});
print(status.value); // 'online'
MV-Register (Multi-Value Register)
A register that preserves concurrent updates:
final config = MVRegister<String>('app-theme');
await crdtManager.register(config);
// Concurrent updates are preserved
await crdtManager.performOperation('app-theme', 'set', {
  'value': 'dark',
  'vectorClock': {'node1': 1},
});
print(config.hasConflict); // true if concurrent updates exist
print(config.values); // Set of all concurrent values
Maps #
LWW-Map (Last-Writer-Wins Map)
A map where the most recent update wins for each key:
final settings = LWWMap<String, String>('user-settings');
await crdtManager.register(settings);
await crdtManager.performOperation('user-settings', 'put', {
  'key': 'theme',
  'value': 'dark',
  'timestamp': DateTime.now().millisecondsSinceEpoch,
});
print(settings['theme']); // 'dark'
OR-Map (Observed-Remove Map)
A map that supports both add and remove operations with CRDT values:
final counters = ORMap<String, GCounter>('user-counters', 
  crdtFactory: (id, type) => GCounter(id));
await crdtManager.register(counters);
await crdtManager.performOperation('user-counters', 'add', {
  'key': 'user1',
  'crdtType': 'GCounter',
  'crdtId': 'user1-counter'
});
Sequences #
RGA Array (Replicated Growable Array)
Perfect for collaborative text editing and ordered sequences:
final text = RGAArray<String>('shared-document');
await crdtManager.register(text);
// Insert characters for collaborative text editing
await crdtManager.performOperation('shared-document', 'insert', {
  'index': 0,
  'element': 'H',
});
await crdtManager.performOperation('shared-document', 'insert', {
  'index': 1,
  'element': 'i',
});
print(text.getText()); // 'Hi'
print(text.length); // 2
// Delete characters
await crdtManager.performOperation('shared-document', 'delete', {
  'index': 1,
});
print(text.getText()); // 'H'
Flags #
Enable-Wins Flag
A boolean flag where enable operations always win over disable operations:
final feature = EnableWinsFlag('feature-x-enabled');
await crdtManager.register(feature);
await crdtManager.performOperation('feature-x-enabled', 'enable', {});
print(feature.isEnabled); // true
// Even if another node disables concurrently, enable wins
await crdtManager.performOperation('feature-x-enabled', 'disable', {});
// After merge, flag remains enabled if any replica enabled it
Event Streams and Monitoring #
Subscribe to CRDT events for real-time updates:
// Listen to all CRDT operations
crdtManager.onOperation.listen((operationEvent) {
  print('Operation: ${operationEvent.operation.operation} '
        'on ${operationEvent.operation.crdtId} '
        'from ${operationEvent.source}');
});
// Listen to CRDT updates
crdtManager.onUpdate.listen((updateEvent) {
  print('CRDT ${updateEvent.crdtId} updated: ${updateEvent.type}');
});
// Listen to synchronization events
crdtManager.onSync.listen((syncEvent) {
  print('Sync ${syncEvent.type} with peer ${syncEvent.peerId}: '
        '${syncEvent.crdtCount} CRDTs');
});
Custom Storage Backends #
Implement custom storage for production use:
class DatabaseCRDTStore implements CRDTStore {
  final Database db;
  
  DatabaseCRDTStore(this.db);
  
  @override
  Future<void> saveCRDT(CRDT crdt) async {
    final state = crdt.getState();
    await db.execute('''
      INSERT OR REPLACE INTO crdts (id, type, state) 
      VALUES (?, ?, ?)
    ''', [crdt.id, crdt.type, jsonEncode(state)]);
  }
  
  @override
  Future<Map<String, dynamic>?> loadCRDTState(String crdtId) async {
    final result = await db.query('SELECT state FROM crdts WHERE id = ?', [crdtId]);
    if (result.isEmpty) return null;
    return jsonDecode(result.first['state']);
  }
  
  // Implement other methods...
}
// Use custom storage
final customStore = DatabaseCRDTStore(myDatabase);
final crdtManager = await node.enableCRDTSupport(crdtStore: customStore);
Distributed Counter Example #
Here's a complete example of multiple nodes collaborating on shared counters:
import 'package:gossip/gossip.dart';
import 'package:gossip_crdts/gossip_crdts.dart';
void main() async {
  // Create multiple nodes
  final nodes = <CRDTEnabledGossipNode>[];
  
  for (int i = 1; i <= 3; i++) {
    final gossipNode = GossipNode(
      config: GossipConfig(nodeId: 'node-$i'),
      eventStore: MemoryEventStore(),
      transport: MyTransport(),
    );
    
    final crdtNode = await gossipNode.withCRDTSupport();
    nodes.add(crdtNode);
    await crdtNode.start();
  }
  // Register shared counter on all nodes
  for (final node in nodes) {
    final counter = GCounter('shared-counter');
    await node.registerCRDT(counter);
  }
  // Each node increments the counter
  for (int i = 0; i < nodes.length; i++) {
    await nodes[i].performCRDTOperation(
      'shared-counter', 
      'increment', 
      {'amount': (i + 1) * 10}
    );
  }
  // Wait for synchronization
  await Future.delayed(Duration(seconds: 2));
  // All nodes should show the same total: 60 (10 + 20 + 30)
  for (final node in nodes) {
    final counter = node.getCRDT<GCounter>('shared-counter');
    print('${node.config.nodeId}: ${counter?.value}'); // All show 60
  }
  // Clean up
  for (final node in nodes) {
    await node.close();
  }
}
Architecture #
The CRDT extension follows a clean architecture that integrates seamlessly with the gossip protocol:
┌─────────────────────┐
│    Application      │
├─────────────────────┤
│  CRDTManager /      │
│  CRDTEnabledNode    │  ← CRDT coordination layer
├─────────────────────┤
│   GossipNode        │  ← Existing gossip protocol
├─────────────────────┤
│  CRDT Implementations │
│  - GCounter         │
│  - PNCounter        │  ← Individual CRDT types
│  - GSet             │
│  - ...              │
├─────────────────────┤
│   CRDTStore         │  ← Pluggable storage
└─────────────────────┘
Best Practices #
Choosing CRDT Types #
- G-Counter: Use for metrics that only increase (views, downloads, likes)
- PN-Counter: Use for values that can go up or down (votes, scores, inventory)
- G-Set: Use for collections that only grow (tags, features, flags)
- OR-Set: Use for collections that need both add and remove (shopping carts, user lists)
- LWW-Register: Use for single values where latest update should win (user status, configuration)
- MV-Register: Use when you need to detect and handle concurrent updates manually
- LWW-Map: Use for key-value storage where latest update per key should win
- OR-Map: Use for maps with CRDT values that need add/remove operations
- RGA Array: Use for collaborative text editing and ordered sequences
- Enable-Wins Flag: Use for boolean flags where "enabled" should dominate
Performance Considerations #
- Operation Frequency: Higher operation frequency requires more network bandwidth
- CRDT Size: Large CRDTs take more time to synchronize
- Network Partitions: CRDTs handle partitions gracefully but may have temporary inconsistencies
Error Handling #
try {
  await crdtManager.performOperation('my-counter', 'increment', {'amount': 5});
} on CRDTException catch (e) {
  print('CRDT operation failed: ${e.message}');
  // Handle CRDT-specific errors
} on GossipException catch (e) {
  print('Gossip operation failed: ${e.message}');
  // Handle gossip protocol errors
}
Storage Recommendations #
- Development: Use MemoryCRDTStore(no persistence)
- Testing: Use MemoryCRDTStorefor clean test isolation
- Production: Implement custom storage backend with proper persistence
Testing #
Run the test suite:
dart test
The test suite includes:
- Unit tests for all CRDT implementations
- Integration tests with the gossip protocol
- Storage backend tests
- Convergence property tests
Examples #
See the /example directory for complete working examples:
- Collaborative Counter: Multiple nodes incrementing shared counters
- Distributed Set: Nodes adding elements to shared sets
- Chat Application: Real-time chat using CRDTs for message ordering
Contributing #
- Fork the repository
- Create a feature branch (git checkout -b feature/new-crdt-type)
- Add tests for your changes
- Ensure all tests pass (dart test)
- Commit your changes (git commit -am 'Add new CRDT type')
- Push to the branch (git push origin feature/new-crdt-type)
- Create a Pull Request
Roadmap #
- ✅ OR-Set: Observed-Remove Set with add and remove operations ✓
- ✅ LWW-Register: Last-Writer-Wins Register for single values ✓
- ✅ MV-Register: Multi-Value Register preserving concurrent updates ✓
- ✅ OR-Map: Observed-Remove Map with CRDT values ✓
- ✅ LWW-Map: Last-Writer-Wins Map for key-value storage ✓
- ✅ RGA Array: Replicated Growable Array for collaborative text editing ✓
- ✅ Enable-Wins Flag: Boolean flag CRDT for feature toggles ✓
- ❌ Sequence CRDT: Generic ordered sequence operations
- ❌ Causal Tree: Alternative text editing CRDT
- ❌ File-based Storage: Built-in file system storage backend
- ❌ Compression: Optional compression for large CRDT states
- ❌ Delta Synchronization: Send only changes instead of full state
- ❌ CRDT Factory Registry: Automatic CRDT reconstruction from state
License #
This project is licensed under the MIT License - see the LICENSE file for details.