synchronize method

Future<DatumSyncResult<T>> synchronize(
  String userId, {
  DatumSyncOptions<DatumEntityInterface>? options,
  DatumSyncScope? scope,
})

Implementation

/// Runs a synchronization for [userId] and returns the resulting
/// [DatumSyncResult].
///
/// The work is submitted through `_syncRequestStrategy`, which receives the
/// engine's `isSyncing` flag as its in-progress probe and an `onSkipped`
/// callback that yields a skipped result with reason `'Sync in progress'`
/// (presumably invoked when the strategy declines to start a second sync).
///
/// A skipped result is returned without calling the sync engine when:
///
/// * the manager is paused (reason `'Sync is paused'`), or
/// * the final direction is [SyncDirection.pushOnly] and there are no pending
///   operations for [userId].
///
/// [options] are merged via `_mergeSyncOptions` and re-typed to
/// `DatumSyncOptions<T>`; a conflict resolver that is not a
/// `DatumConflictResolver<T>` is dropped. If `config.syncDirectionResolver` is
/// set and returns a different, non-null direction, that direction is used.
///
/// When [scope] is omitted and the effective options carry a query other than
/// `const DatumQuery()`, a [DatumSyncScope] is created from that query.
///
/// For a result that was not skipped, the result is saved through
/// `localAdapter.saveLastSyncResult`, and any sync metadata returned by
/// `localAdapter.getSyncMetadata` is forwarded to `persistence` when present.
///
/// Throws a [UserSwitchException] when the engine's last active user differs
/// from [userId], the configured strategy is
/// [UserSwitchStrategy.promptIfUnsyncedData], and the previous user still has
/// pending operations.
///
/// When the engine throws a `SyncExceptionWithEvents<T>`, its events are
/// processed and its `originalError` is thrown in its place. Any other error
/// propagates unchanged.
Future<DatumSyncResult<T>> synchronize(
  String userId, {
  DatumSyncOptions<DatumEntityInterface>? options,
  DatumSyncScope? scope,
}) async {
  _ensureInitialized();

  _logger.info('🔄 [$T] Starting sync for user: $userId');

  return _syncRequestStrategy.execute(
    () async {
      if (_isSyncPaused) {
        _logger.info('Sync for user $userId skipped: manager is paused.');
        return DatumSyncResult.skipped(
          userId,
          await getPendingCount(userId),
          reason: 'Sync is paused',
        );
      }

      // Handle user switching logic before proceeding with synchronization.
      // Snapshot the previous user once: the engine's value is read from
      // another object and must stay consistent across the `await` below.
      final previousUserId = _syncEngineInstance.lastActiveUserId;
      if (previousUserId != null && previousUserId != userId) {
        if (config.defaultUserSwitchStrategy == UserSwitchStrategy.promptIfUnsyncedData) {
          final oldUserOps = await _queueManager.getPending(previousUserId);
          if (oldUserOps.isNotEmpty) {
            throw UserSwitchException(
              oldUserId: previousUserId,
              newUserId: userId,
              message: 'Cannot switch user while unsynced data exists for the previous user.',
            );
          }
        }
        // Other strategies like syncThenSwitch or clearAndFetch would be handled here.
      }

      try {
        // Merge provided options with defaults from config.
        final mergedOptions = _mergeSyncOptions(options);

        // Re-type the options for this manager's entity type. Options may
        // arrive with a different generic argument, so a conflict resolver is
        // only carried over when it is actually a DatumConflictResolver<T>.
        var typedOptions = mergedOptions != null
            ? DatumSyncOptions<T>(
                includeDeletes: mergedOptions.includeDeletes,
                resolveConflicts: mergedOptions.resolveConflicts,
                forceFullSync: mergedOptions.forceFullSync,
                overrideBatchSize: mergedOptions.overrideBatchSize,
                timeout: mergedOptions.timeout,
                direction: mergedOptions.direction,
                conflictResolver: switch (mergedOptions.conflictResolver) {
                  final DatumConflictResolver<T> resolver => resolver,
                  _ => null,
                },
                query: mergedOptions.query,
              )
            : null;

        // Allow custom sync direction resolution via callback.
        final pendingCount = await getPendingCount(userId);
        final currentDirection = typedOptions?.direction ?? config.defaultSyncDirection;

        final directionResolver = config.syncDirectionResolver;
        if (directionResolver != null) {
          final resolvedDirection = directionResolver(pendingCount, currentDirection);
          if (resolvedDirection != null && resolvedDirection != currentDirection) {
            _logger.debug('Custom sync direction resolver changed direction from $currentDirection to $resolvedDirection for user $userId');
            typedOptions = typedOptions?.copyWith(direction: resolvedDirection) ?? DatumSyncOptions<T>(direction: resolvedDirection);
          }
        }

        // If no scope is provided but the options contain a non-default
        // query, create a scope from that query.
        var effectiveScope = scope;
        final optionsQuery = typedOptions?.query;
        if (effectiveScope == null && optionsQuery != null && optionsQuery != const DatumQuery()) {
          effectiveScope = DatumSyncScope(query: optionsQuery);
        }

        // Check if sync should be skipped based on final direction and pending operations.
        final finalDirection = typedOptions?.direction ?? config.defaultSyncDirection;
        if (finalDirection == SyncDirection.pushOnly && pendingCount == 0) {
          // If the direction is pushOnly and there are no pending operations,
          // we can skip the sync entirely for this manager.
          _logger.info('Push-only sync for user $userId skipped: no pending operations.');
          return DatumSyncResult.skipped(userId, 0);
        }

        final (result, events) = await _syncEngineInstance.synchronize(
          userId,
          options: typedOptions,
          scope: effectiveScope,
        );
        _processSyncEvents(events);

        // Persist the result of the sync operation.
        if (!result.wasSkipped) {
          await localAdapter.saveLastSyncResult(userId, result);
          // Also update sync metadata in persistence.
          final metadata = await localAdapter.getSyncMetadata(userId);
          if (metadata != null) {
            await persistence?.saveSyncMetadata(userId, metadata);
          }
        }
        return result;
      } on SyncExceptionWithEvents<T> catch (e, stackTrace) {
        // Surface the events collected before the failure, then unwrap the
        // underlying error. The stack trace captured with the wrapper is kept
        // so the failure does not appear to originate from this handler.
        _processSyncEvents(e.events);
        Error.throwWithStackTrace(e.originalError, stackTrace);
      }
    },
    isSyncInProgress: () => _syncEngineInstance.isSyncing,
    onSkipped: () {
      _logger.info('Sync for user $userId skipped: another sync is in progress.');
      return DatumSyncResult.skipped(
        userId,
        0, // Can't reliably get pending count here without async, so default to 0.
        reason: 'Sync in progress',
      );
    },
  );
}