synchronize method
Future<DatumSyncResult<DatumEntityInterface>> synchronize(
  String userId, {
  DatumSyncOptions<DatumEntityInterface>? options,
})
A global sync that can coordinate across all managers.
Implementation
/// Runs a global synchronization for [userId], coordinating across all
/// managers.
///
/// If the user's snapshot already reports [DatumSyncStatus.syncing], nothing
/// is synchronized and a [DatumSyncResult.skipped] result carrying the
/// snapshot's pending operations is returned.
///
/// Otherwise the snapshot is marked as syncing, every entry in
/// `globalObservers` receives `onSyncStart`, and the push and/or pull phases
/// run in the order given by `options.direction` (falling back to
/// `config.defaultSyncDirection` when [options] or its direction is null).
///
/// Returns a single [DatumSyncResult] that aggregates all phase results:
/// * `syncedCount` and `failedCount` are summed over push and pull results,
/// * `conflictsResolved` is summed over pull results only,
/// * `pendingOperations` is the concatenation of every result's pending
///   operations, in phase order.
///
/// On success the snapshot is marked [DatumSyncStatus.completed] and every
/// global observer receives `onSyncEnd` with the aggregated result.
///
/// If anything throws after the snapshot was marked as syncing, the failure
/// is logged, the snapshot is marked [DatumSyncStatus.failed] with the error
/// recorded, and the original error is rethrown with its stack trace.
Future<DatumSyncResult<DatumEntityInterface>> synchronize(
  String userId, {
  DatumSyncOptions<DatumEntityInterface>? options,
}) async {
  final snapshot = _getSnapshot(userId);
  if (snapshot.status == DatumSyncStatus.syncing) {
    logger.info('[Global] Sync for user $userId skipped: another global sync is already in progress.');
    return DatumSyncResult.skipped(userId, snapshot.pendingOperations);
  }

  final stopwatch = Stopwatch()..start();
  _updateSnapshot(userId, (s) => s.copyWith(status: DatumSyncStatus.syncing));

  var totalSynced = 0;
  var totalFailed = 0;
  var totalConflicts = 0;
  final allPending = <DatumSyncOperation<DatumEntityInterface>>[];

  // Runs the push phase and folds its results into the running totals.
  Future<void> runPush() async {
    for (final res in await _pushChanges(userId, options)) {
      totalSynced += res.syncedCount;
      totalFailed += res.failedCount;
      allPending.addAll(res.pendingOperations);
    }
  }

  // Runs the pull phase and folds its results into the running totals.
  // Pull failures are counted for every direction, including pushThenPull.
  Future<void> runPull() async {
    for (final res in await _pullChanges(userId, options)) {
      totalSynced += res.syncedCount;
      totalFailed += res.failedCount;
      totalConflicts += res.conflictsResolved;
      allPending.addAll(res.pendingOperations);
    }
  }

  try {
    // Notify observers inside the try block: if one of them throws, the
    // snapshot must not be left in the `syncing` state, otherwise every
    // later call would be skipped by the guard above.
    for (final observer in globalObservers) {
      observer.onSyncStart();
    }

    final direction = options?.direction ?? config.defaultSyncDirection;
    switch (direction) {
      case SyncDirection.pushThenPull:
        await runPush();
        await runPull();
      case SyncDirection.pullThenPush:
        await runPull();
        await runPush();
      case SyncDirection.pushOnly:
        await runPush();
      case SyncDirection.pullOnly:
        await runPull();
    }

    final result = DatumSyncResult<DatumEntityInterface>(
      userId: userId,
      duration: stopwatch.elapsed,
      syncedCount: totalSynced,
      failedCount: totalFailed,
      conflictsResolved: totalConflicts,
      pendingOperations: allPending,
    );
    _updateSnapshot(
      userId,
      (s) => s.copyWith(
        status: DatumSyncStatus.completed,
        lastCompletedAt: DateTime.now(),
      ),
    );
    for (final observer in globalObservers) {
      observer.onSyncEnd(result);
    }
    return result;
  } catch (e, stack) {
    logger.error('Synchronization failed for user $userId', stack);
    _updateSnapshot(
      userId,
      (s) => s.copyWith(status: DatumSyncStatus.failed, errors: [e]),
    );
    // Propagate the original error together with its original stack trace.
    rethrow;
  } finally {
    stopwatch.stop();
  }
}