asStream method

  1. @override
Stream<TetherClientReturn<TModel>> asStream()
override

Stream implementation

Implementation

@override
Stream<TetherClientReturn<TModel>> asStream() {
  if (type != SqlOperationType.select) {
    return Stream.error(
      UnsupportedError('Streaming is only supported for SELECT operations.'),
    );
  }
  if (localQuery == null) {
    return Stream.error(
      ArgumentError('localQuery must be provided for streaming.'),
    );
  }

  // 1. Build the local query.
  final aliasedQuery = localQuery!.copyWith(
    selectColumns: '${localQuery!.selectColumns} AS jsobjects',
  );
  final FinalSqlStatement builtQuery = aliasedQuery.build();

  // 2. Create a stream that watches the local database.
  // This stream will emit whenever the underlying data changes.
  final localDataStream = localDb
      .watch(builtQuery.sql, parameters: builtQuery.arguments)
      .map((rows) {
    final models = rows
        .where((row) => row['jsobjects'] != null)
        .map((row) => fromSqliteFactory(
            createRowFromMap(jsonDecode(row['jsobjects'] as String))))
        .toList();
    return TetherClientReturn<TModel>(data: models);
  });

  // 3. Define the remote sync operation as a Future.
  // This will fetch the remote count and upsert the data.
  Future<int?> syncRemoteData() async {
    if (isLocalOnly || !syncWithSupabase) return null;
    try {
      final remoteDataResponse =
          await _resetPreferForCount().count(CountOption.exact);
      await upsertSupabaseData(remoteDataResponse.data);
      return remoteDataResponse.count;
    } catch (e, s) {
      // Propagate the error through the stream.
      debugPrint('Error fetching remote data for stream: $e $s');
      rethrow;
    }
  }

  // 4. Use RxDart's `CombineLatestStream` to merge the local data stream
  //    with the result of the remote sync operation.
  return CombineLatestStream.combine2(
    localDataStream,
    // Convert the Future into a stream that emits once.
    // `startWith(null)` ensures the stream emits immediately with no count,
    // allowing the local data to be shown first.
    Stream.fromFuture(syncRemoteData()).startWith(null),
    (localData, remoteCount) {
      // The first emission will have remoteCount = null.
      // Subsequent emissions from localDataStream will be combined with the
      // now-completed remoteCount.
      // If the remote sync completes, it will trigger a new emission here.
      return TetherClientReturn(
        data: localData.data,
        count: remoteCount ?? localData.count, // Use remote count when available
        error: localData.error,
      );
    },
  ).handleError((error, stackTrace) {
    // Catch errors from either the local stream or the remote fetch.
    debugPrint('Error in asStream pipeline: $error');
    // It's better to let the UI layer handle the error state.
    // We rethrow it so providers like StreamProvider can catch it.
    throw error;
  });
}