subscribe<_Model extends OfflineFirstWithGraphqlModel> method

Stream<List<_Model>> subscribe<_Model extends OfflineFirstWithGraphqlModel>({
  1. OfflineFirstGetPolicy policy = OfflineFirstGetPolicy.awaitRemoteWhenNoneExist,
  2. Query? query,
})

Listen for streaming changes from the remoteProvider. Data is returned in complete batches. get is invoked on the memoryCacheProvider and sqliteProvider following an upsert invocation. For more, see notifySubscriptionsWithLocalData.

It is strongly recommended that this invocation be immediately .listened assigned with the assignment/subscription .cancel()'d as soon as the data is no longer needed. The stream will not close naturally.

Implementation

Stream<List<_Model>> subscribe<_Model extends OfflineFirstWithGraphqlModel>({
  OfflineFirstGetPolicy policy = OfflineFirstGetPolicy.awaitRemoteWhenNoneExist,
  Query? query,
}) {
  query ??= Query();
  if (subscriptions[_Model]?[query] != null) {
    return subscriptions[_Model]![query]!.stream as Stream<List<_Model>>;
  }

  final withPolicy = applyPolicyToQuery(query, get: policy);

  StreamSubscription<List<_Model>>? remoteSubscription;
  final adapter = remoteProvider.modelDictionary.adapterFor[_Model];
  if (adapter?.queryOperationTransformer != null &&
      adapter?.queryOperationTransformer!(query, null).subscribe != null) {
    remoteSubscription = remoteProvider
        .subscribe<_Model>(query: withPolicy, repository: this)
        .listen((modelsFromRemote) async {
      // Remote results are never returned directly;
      // after the remote results are fetched they're stored
      // and memory/SQLite is reported to the subscribers
      final modelsIntoSqlite =
          await storeRemoteResults<_Model>(modelsFromRemote, shouldNotify: false);
      memoryCacheProvider.hydrate<_Model>(modelsIntoSqlite);
    });
  }

  final controller = StreamController<List<_Model>>(
    onCancel: () async {
      remoteSubscription?.cancel();
      subscriptions[_Model]?[query]?.close();
      subscriptions[_Model]?.remove(query);
      if (subscriptions[_Model]?.isEmpty ?? false) {
        subscriptions.remove(_Model);
      }
    },
  );

  subscriptions[_Model] ??= {};
  subscriptions[_Model]?[query] = controller;

  // Seed initial data from local when opening a new subscription
  get<_Model>(query: query, policy: OfflineFirstGetPolicy.localOnly).then((results) {
    if (!controller.isClosed) controller.add(results);
  });

  return controller.stream;
}