subscribe<_Model extends OfflineFirstWithGraphqlModel> method
Stream<List<_Model>>
subscribe<_Model extends OfflineFirstWithGraphqlModel>({
  OfflineFirstGetPolicy policy = OfflineFirstGetPolicy.awaitRemoteWhenNoneExist,
  Query? query,
})
Listens for streaming changes from the remoteProvider. Data is returned in complete batches.
`get` is invoked on the memoryCacheProvider and the sqliteProvider following an `upsert` invocation.
For more, see notifySubscriptionsWithLocalData.
It is strongly recommended that this invocation be immediately `.listen`ed and the resulting
subscription assigned, with that assignment/subscription `.cancel()`'d as soon as the data is
no longer needed.
The stream will not close naturally.
Implementation
/// Listens for streaming changes from the [remoteProvider] and reports local
/// data for [query] on the returned stream.
///
/// Remote batches are never emitted directly. Each batch is persisted through
/// [storeRemoteResults] (without notifying) and the stored models are then
/// hydrated into the [memoryCacheProvider].
///
/// A remote subscription is only opened when the adapter registered for
/// `_Model` has a `queryOperationTransformer` whose result for [query]
/// exposes a non-null `subscribe` operation. [policy] is applied to the query
/// sent to the remote provider via [applyPolicyToQuery].
///
/// [query] defaults to an empty [Query]. When a controller is already
/// registered in [subscriptions] for `_Model` and [query], its stream is
/// returned instead of creating a new one.
///
/// The returned stream is seeded once with the result of a
/// [OfflineFirstGetPolicy.localOnly] [get]. If that read fails, the error is
/// delivered on the returned stream.
///
/// Cancelling the stream's subscription cancels the remote subscription,
/// closes the controller and removes it from [subscriptions].
Stream<List<_Model>> subscribe<_Model extends OfflineFirstWithGraphqlModel>({
  OfflineFirstGetPolicy policy = OfflineFirstGetPolicy.awaitRemoteWhenNoneExist,
  Query? query,
}) {
  query ??= Query();

  // Reuse the controller already registered for this model type and query.
  if (subscriptions[_Model]?[query] != null) {
    return subscriptions[_Model]![query]!.stream as Stream<List<_Model>>;
  }

  final withPolicy = applyPolicyToQuery(query, get: policy);

  StreamSubscription<List<_Model>>? remoteSubscription;
  final adapter = remoteProvider.modelDictionary.adapterFor[_Model];
  if (adapter?.queryOperationTransformer != null &&
      adapter?.queryOperationTransformer!(query, null).subscribe != null) {
    remoteSubscription = remoteProvider
        .subscribe<_Model>(query: withPolicy, repository: this)
        .listen((modelsFromRemote) async {
      // Remote results are never returned directly;
      // after the remote results are fetched they're stored
      // and memory/SQLite is reported to the subscribers
      final modelsIntoSqlite =
          await storeRemoteResults<_Model>(modelsFromRemote, shouldNotify: false);
      memoryCacheProvider.hydrate<_Model>(modelsIntoSqlite);
    });
  }

  final controller = StreamController<List<_Model>>(
    onCancel: () async {
      // Await the remote teardown so the future handed back to the canceller
      // completes only once the remote subscription has been cancelled, and
      // so a failing cancel is reported instead of being dropped.
      await remoteSubscription?.cancel();

      // Intentionally not awaited.
      // NOTE(review): awaiting close() from inside onCancel may wait on a
      // done future that only completes after this callback returns when the
      // controller was closed elsewhere first - confirm before changing.
      subscriptions[_Model]?[query]?.close();
      subscriptions[_Model]?.remove(query);
      if (subscriptions[_Model]?.isEmpty ?? false) {
        subscriptions.remove(_Model);
      }
    },
  );

  subscriptions[_Model] ??= {};
  subscriptions[_Model]?[query] = controller;

  // Seed initial data from local when opening a new subscription
  get<_Model>(query: query, policy: OfflineFirstGetPolicy.localOnly).then(
    (results) {
      if (!controller.isClosed) controller.add(results);
    },
    // Surface a failed local read to subscribers rather than letting it
    // escape as an unhandled asynchronous error.
    onError: (Object error, StackTrace stackTrace) {
      if (!controller.isClosed) controller.addError(error, stackTrace);
    },
  );

  return controller.stream;
}