synchronize method

Performs a global synchronization for the given user, coordinating the push and pull phases across all managers.

Implementation

/// Runs a global synchronization for [userId].
///
/// If the user's snapshot already reports [DatumSyncStatus.syncing], the call
/// is logged and a skipped result carrying the snapshot's pending operations
/// is returned without doing any further work.
///
/// Otherwise the snapshot is marked as syncing, every entry in
/// `globalObservers` receives `onSyncStart`, and the push and/or pull phases
/// run in the order given by `options.direction`, falling back to
/// `config.defaultSyncDirection` when no direction is supplied.
///
/// Synced counts, failed counts and pending operations are aggregated from
/// both phases; resolved conflicts are counted from the pull phase only.
///
/// On success the snapshot is marked completed, observers receive `onSyncEnd`
/// with the aggregated result, and that result is returned. If anything
/// throws, the failure is logged, the snapshot is marked failed with the
/// error recorded, and the error is rethrown with its original stack trace.
Future<DatumSyncResult<DatumEntityInterface>> synchronize(
  String userId, {
  DatumSyncOptions<DatumEntityInterface>? options,
}) async {
  // The check and the status update below run without an intervening `await`,
  // so two calls cannot both pass this guard.
  final snapshot = _getSnapshot(userId);
  if (snapshot.status == DatumSyncStatus.syncing) {
    logger.info('[Global] Sync for user $userId skipped: another global sync is already in progress.');
    return DatumSyncResult.skipped(userId, snapshot.pendingOperations);
  }

  final stopwatch = Stopwatch()..start();
  _updateSnapshot(userId, (s) => s.copyWith(status: DatumSyncStatus.syncing));

  var totalSynced = 0;
  var totalFailed = 0;
  var totalConflicts = 0;
  final allPending = <DatumSyncOperation<DatumEntityInterface>>[];

  // Runs the push phase and folds its results into the running totals.
  Future<void> runPush() async {
    for (final res in await _pushChanges(userId, options)) {
      totalSynced += res.syncedCount;
      totalFailed += res.failedCount;
      allPending.addAll(res.pendingOperations);
    }
  }

  // Runs the pull phase and folds its results into the running totals.
  // Failures are counted here for every direction, including pushThenPull.
  Future<void> runPull() async {
    for (final res in await _pullChanges(userId, options)) {
      totalSynced += res.syncedCount;
      totalFailed += res.failedCount;
      totalConflicts += res.conflictsResolved;
      allPending.addAll(res.pendingOperations);
    }
  }

  try {
    // Notify inside the try block: if an observer throws, the catch below
    // moves the snapshot out of `syncing` instead of leaving it stuck there
    // and causing every later call for this user to be skipped.
    for (final observer in globalObservers) {
      observer.onSyncStart();
    }

    final direction = options?.direction ?? config.defaultSyncDirection;
    switch (direction) {
      case SyncDirection.pushThenPull:
        await runPush();
        await runPull();
      case SyncDirection.pullThenPush:
        await runPull();
        await runPush();
      case SyncDirection.pushOnly:
        await runPush();
      case SyncDirection.pullOnly:
        await runPull();
    }

    stopwatch.stop();
    final result = DatumSyncResult<DatumEntityInterface>(
      userId: userId,
      duration: stopwatch.elapsed,
      syncedCount: totalSynced,
      failedCount: totalFailed,
      conflictsResolved: totalConflicts,
      pendingOperations: allPending,
    );

    _updateSnapshot(userId, (s) => s.copyWith(status: DatumSyncStatus.completed, lastCompletedAt: DateTime.now()));
    for (final observer in globalObservers) {
      observer.onSyncEnd(result);
    }

    return result;
  } catch (e, stack) {
    logger.error('Synchronization failed for user $userId', stack);
    _updateSnapshot(userId, (s) => s.copyWith(status: DatumSyncStatus.failed, errors: [e]));
    // Propagate the original error and stack trace to the caller.
    rethrow;
  }
}