asStream method
Stream implementation
Implementation
@override
Stream<TetherClientReturn<TModel>> asStream() {
if (type != SqlOperationType.select) {
return Stream.error(
UnsupportedError('Streaming is only supported for SELECT operations.'),
);
}
if (localQuery == null) {
return Stream.error(
ArgumentError('localQuery must be provided for streaming.'),
);
}
// 1. Build the local query.
final aliasedQuery = localQuery!.copyWith(
selectColumns: '${localQuery!.selectColumns} AS jsobjects',
);
final FinalSqlStatement builtQuery = aliasedQuery.build();
// 2. Create a stream that watches the local database.
// This stream will emit whenever the underlying data changes.
final localDataStream = localDb
.watch(builtQuery.sql, parameters: builtQuery.arguments)
.map((rows) {
final models = rows
.where((row) => row['jsobjects'] != null)
.map((row) => fromSqliteFactory(
createRowFromMap(jsonDecode(row['jsobjects'] as String))))
.toList();
return TetherClientReturn<TModel>(data: models);
});
// 3. Define the remote sync operation as a Future.
// This will fetch the remote count and upsert the data.
Future<int?> syncRemoteData() async {
if (isLocalOnly || !syncWithSupabase) return null;
try {
final remoteDataResponse =
await _resetPreferForCount().count(CountOption.exact);
await upsertSupabaseData(remoteDataResponse.data);
return remoteDataResponse.count;
} catch (e, s) {
// Propagate the error through the stream.
debugPrint('Error fetching remote data for stream: $e $s');
rethrow;
}
}
// 4. Use RxDart's `CombineLatestStream` to merge the local data stream
// with the result of the remote sync operation.
return CombineLatestStream.combine2(
localDataStream,
// Convert the Future into a stream that emits once.
// `startWith(null)` ensures the stream emits immediately with no count,
// allowing the local data to be shown first.
Stream.fromFuture(syncRemoteData()).startWith(null),
(localData, remoteCount) {
// The first emission will have remoteCount = null.
// Subsequent emissions from localDataStream will be combined with the
// now-completed remoteCount.
// If the remote sync completes, it will trigger a new emission here.
return TetherClientReturn(
data: localData.data,
count: remoteCount ?? localData.count, // Use remote count when available
error: localData.error,
);
},
).handleError((error, stackTrace) {
// Catch errors from either the local stream or the remote fetch.
debugPrint('Error in asStream pipeline: $error');
// It's better to let the UI layer handle the error state.
// We rethrow it so providers like StreamProvider can catch it.
throw error;
});
}