changes method

Observable<Changes> changes()

Implementation

Observable<Changes> changes() {
  Map<SquidDocId, Map<String, dynamic>>? beforeDocMap;
  var beforeDocSet = <Map<String, dynamic>>{};
  return combineLatest([
    snapshots(),
    _documentIdentityService
        .observeChanges()
        .switchMap((idResolutionMap) {
          if (beforeDocMap != null) {
            for (final entry in idResolutionMap.entries) {
              replaceKeyInMap(beforeDocMap!, entry.key, entry.value);
            }
          }

          /// Avoid notifying subscribers when a document id changes. Subscribers
          /// should only be notified when the snapshots stream pushes a
          /// new value.
          return never();
        })
        .cast()
        .beginWith(just(<SquidDocId, SquidDocId>{}))
  ])
      .cast<Iterable>()
      .map((data) => data.first)
      .cast<Iterable<DocumentReference>>()
      .map((data) {
    var inserts = <DocumentReference>[];
    final updates = <DocumentReference>[];
    final deletes = <Map<String, dynamic>>[];
    if (beforeDocMap == null) {
      inserts = [...data];
    } else {
      for (final docAfter in data) {
        final squidDocId = docAfter.squidDocId;
        final docAfterData = docAfter.data;
        if (beforeDocSet.contains(docAfterData)) {
          beforeDocMap!.remove(squidDocId);
          beforeDocSet.remove(docAfterData);
          continue;
        }

        if (beforeDocMap!.containsKey(squidDocId)) {
          updates.add(docAfter);
          final beforeDoc = beforeDocMap![squidDocId]!;
          beforeDocMap!.remove(squidDocId);
          beforeDocSet.remove(beforeDoc);
        } else {
          inserts.add(docAfter);
        }
      }
      for (final beforeDocData in beforeDocSet) {
        deletes.add(beforeDocData);
      }
    }
    beforeDocMap = {};
    beforeDocSet = {};
    for (final afterDoc in data) {
      final afterDocData = afterDoc.data;
      beforeDocMap![afterDoc.squidDocId] = afterDocData;
      beforeDocSet.add(afterDocData);
    }
    return Changes._(inserts, updates, deletes);
  });
}