streamRecordsThroughFile function

Stream<ListRecord> streamRecordsThroughFile(
  1. FileOpener resourceFile,
  2. List<FieldDefinition> fieldDefinitions,
  3. LoadCriterion? criteria, {
  4. bool doValidationChecks = false,
  5. GtfsDataset? dataset,
  6. Completer<void>? cancellationSignal,
})

Implementation

Stream<ListRecord> streamRecordsThroughFile(
  FileOpener resourceFile,
  List<FieldDefinition> fieldDefinitions,
  LoadCriterion? criteria, {
  bool doValidationChecks = false,
  GtfsDataset? dataset,
  Completer<void>? cancellationSignal,
}) async* {
  if (doValidationChecks && dataset == null) {
    throw Exception('Wants to do validation checks but no dataset provided');
  }

  final (stream: fileStream, name: fileName) = resourceFile;

  List<FieldDefinition>? header;
  List<int>? criterionRequestedFieldsIndices;
  await for (final rawLine in fileStream()
      .transform(utf8.decoder)
      .transform(
        CsvToListConverter(
          shouldParseNumbers: false,
          csvSettingsDetector: FirstOccurrenceSettingsDetector(
            eols: ['\r\n', '\n'],
          ),
        ),
      )) {
    ListRecord record = ListRecord.from(rawLine);
    if (header == null) {
      header = record
          .map((e) {
            final field = fieldDefinitions.firstWhere(
              (element) => element.name == e,
              orElse: () {
                _logger.warning(
                  'Unknown field in $fileName, header = ${record.join(',')}',
                );
                return FieldDefinition<Object>(
                  e ?? 'unnamed_field',
                  (dataset, header, records) => null,
                  type: TextFieldType(),
                );
              },
            );

            return field;
          })
          .toList(growable: false);
      yield record;

      if (criteria != null) {
        criterionRequestedFieldsIndices = criteria.requestedFields
            .map((e) => rawLine.indexOf(e))
            .toList(growable: false);
      }

      continue;
    }

    final matchesCriteria =
        criteria?.criterion(
          criterionRequestedFieldsIndices!
              .map((e) => e == -1 ? null : record.elementAtOrNull(e))
              .toList(growable: false),
        ) ??
        true;

    if (matchesCriteria) {
      record = record
          .map((e) => (e?.isEmpty ?? true) ? null : e)
          .toList(growable: false);

      if (doValidationChecks) {
        final mapRecord = toMapRecord(header, record);
        await _checkForIndividualField(
          mapRecord,
          fieldDefinitions,
          header,
          dataset!,
        );
      }

      yield record;
    }

    if (cancellationSignal?.isCompleted ?? false) break;
  }

  // TODO: Check the field-checking stuff to (a) not require the full file and
  // TODO: (b) not give the record as a map record.
  //_checkForOverallField(file, fieldDefinitions, header!, dataset!);
}