synchronize method
Future<DatumSyncResult<T> >
synchronize(
- String userId, {
- DatumSyncOptions<
DatumEntityInterface> ? options, - DatumSyncScope? scope,
Implementation
Future<DatumSyncResult<T>> synchronize(
String userId, {
DatumSyncOptions<DatumEntityInterface>? options,
DatumSyncScope? scope,
}) async {
_ensureInitialized();
_logger.info('🔄 [${T.toString()}] Starting sync for user: $userId');
return _syncRequestStrategy.execute(
() async {
if (_isSyncPaused) {
_logger.info('Sync for user $userId skipped: manager is paused.');
return DatumSyncResult.skipped(
userId,
await getPendingCount(userId),
reason: 'Sync is paused',
);
}
// Handle user switching logic before proceeding with synchronization.
if (_syncEngineInstance.lastActiveUserId != null && _syncEngineInstance.lastActiveUserId != userId) {
if (config.defaultUserSwitchStrategy == UserSwitchStrategy.promptIfUnsyncedData) {
final oldUserOps = await _queueManager.getPending(
_syncEngineInstance.lastActiveUserId ?? '',
);
if (oldUserOps.isNotEmpty) {
throw UserSwitchException(
oldUserId: _syncEngineInstance.lastActiveUserId,
newUserId: userId,
message: 'Cannot switch user while unsynced data exists for the previous user.',
);
}
}
// Other strategies like syncThenSwitch or clearAndFetch would be handled here.
}
try {
// Merge provided options with defaults from config
final mergedOptions = _mergeSyncOptions(options);
// Convert options to the correct type if needed.
// This handles cases where options might be passed with a different generic type from Datum.
var typedOptions = mergedOptions != null
? DatumSyncOptions<T>(
includeDeletes: mergedOptions.includeDeletes,
resolveConflicts: mergedOptions.resolveConflicts,
forceFullSync: mergedOptions.forceFullSync,
overrideBatchSize: mergedOptions.overrideBatchSize,
timeout: mergedOptions.timeout,
direction: mergedOptions.direction,
conflictResolver: mergedOptions.conflictResolver is DatumConflictResolver<T> ? mergedOptions.conflictResolver as DatumConflictResolver<T> : null,
query: mergedOptions.query,
)
: null;
// Allow custom sync direction resolution via callback
final pendingCount = await getPendingCount(userId);
final currentDirection = typedOptions?.direction ?? config.defaultSyncDirection;
if (config.syncDirectionResolver != null) {
final resolvedDirection = config.syncDirectionResolver!(pendingCount, currentDirection);
if (resolvedDirection != null && resolvedDirection != currentDirection) {
_logger.debug('Custom sync direction resolver changed direction from $currentDirection to $resolvedDirection for user $userId');
final optimizedOptions = typedOptions?.copyWith(direction: resolvedDirection) ?? DatumSyncOptions<T>(direction: resolvedDirection);
typedOptions = optimizedOptions;
}
}
// If no scope is provided but options contain a query, create a scope from the query
DatumSyncScope? effectiveScope = scope;
if (effectiveScope == null && typedOptions?.query != null && typedOptions!.query != const DatumQuery()) {
effectiveScope = DatumSyncScope(query: typedOptions.query);
}
// Check if sync should be skipped based on final direction and pending operations
final finalDirection = typedOptions?.direction ?? config.defaultSyncDirection;
if (finalDirection == SyncDirection.pushOnly && pendingCount == 0) {
// If the direction is pushOnly and there are no pending operations,
// we can skip the sync entirely for this manager.
_logger.info('Push-only sync for user $userId skipped: no pending operations.');
return DatumSyncResult.skipped(userId, 0);
}
final (result, events) = await _syncEngineInstance.synchronize(
userId,
options: typedOptions,
scope: effectiveScope,
);
_processSyncEvents(events);
// Persist the result of the sync operation.
if (!result.wasSkipped) {
await localAdapter.saveLastSyncResult(userId, result);
// Also update sync metadata in persistence
final metadata = await localAdapter.getSyncMetadata(userId);
if (metadata != null) {
await persistence?.saveSyncMetadata(userId, metadata);
}
}
return result;
} on Object catch (e, _) {
// If it's a SyncExceptionWithEvents, process events and throw the original error
if (e is SyncExceptionWithEvents<T>) {
_processSyncEvents(e.events);
throw e.originalError;
}
// Otherwise, re-throw the original error
rethrow;
}
},
isSyncInProgress: () => _syncEngineInstance.isSyncing,
onSkipped: () {
_logger.info('Sync for user $userId skipped: another sync is in progress.');
return DatumSyncResult.skipped(
userId,
0, // Can't reliably get pending count here without async, so default to 0.
reason: 'Sync in progress',
);
},
);
}