streamRecordsThroughFile function
Stream<ListRecord> streamRecordsThroughFile(
  FileOpener resourceFile,
  List<FieldDefinition> fieldDefinitions,
  LoadCriterion? criteria, {
  bool doValidationChecks = false,
  GtfsDataset? dataset,
  Completer<void>? cancellationSignal,
})
Implementation
/// Streams the rows of the CSV file opened by [resourceFile] as [ListRecord]s.
///
/// The file is decoded as UTF-8 and split on `\r\n` or `\n` line endings;
/// numeric-looking cells are kept as text.
///
/// The first event is the header row, emitted exactly as it was read. Each
/// header cell is matched by name against [fieldDefinitions]; a cell without
/// a matching definition is logged as a warning and given a placeholder text
/// field definition so that processing can continue.
///
/// Every later event is a data row whose empty cells have been replaced by
/// `null`. When [criteria] is non-null, only rows accepted by its criterion
/// are emitted. The criterion receives the raw cells (before empty cells are
/// turned into `null`) of the requested fields, with `null` standing in for
/// a requested field that is missing from the header or from a short row.
///
/// When [doValidationChecks] is `true`, each accepted data row is passed to
/// `_checkForIndividualField` together with [dataset] before it is emitted.
///
/// [cancellationSignal] is polled after every row, the header included; once
/// it has completed, the stream stops reading the file and closes.
///
/// The stream reports an [Exception] if [doValidationChecks] is `true` but
/// [dataset] is `null`.
Stream<ListRecord> streamRecordsThroughFile(
  FileOpener resourceFile,
  List<FieldDefinition> fieldDefinitions,
  LoadCriterion? criteria, {
  bool doValidationChecks = false,
  GtfsDataset? dataset,
  Completer<void>? cancellationSignal,
}) async* {
  if (doValidationChecks && dataset == null) {
    throw Exception('Wants to do validation checks but no dataset provided');
  }
  // Non-null exactly when validation was requested (guaranteed by the guard
  // above), so the loop can rely on promotion instead of a `!` assertion.
  final validationDataset = doValidationChecks ? dataset : null;

  final (stream: fileStream, name: fileName) = resourceFile;
  final rows = fileStream()
      .transform(utf8.decoder)
      .transform(
        CsvToListConverter(
          shouldParseNumbers: false,
          csvSettingsDetector: FirstOccurrenceSettingsDetector(
            eols: ['\r\n', '\n'],
          ),
        ),
      );

  List<FieldDefinition>? header;
  // Column index of each field requested by `criteria` (-1 when the header
  // lacks it). Filled in from the header row and only read when `criteria`
  // is non-null.
  var criterionRequestedFieldsIndices = const <int>[];

  await for (final rawLine in rows) {
    ListRecord record = ListRecord.from(rawLine);
    if (header == null) {
      // First row: resolve each column name to its field definition.
      header = record
          .map(
            (e) => fieldDefinitions.firstWhere(
              (element) => element.name == e,
              orElse: () {
                _logger.warning(
                  'Unknown field in $fileName, header = ${record.join(',')}',
                );
                return FieldDefinition<Object>(
                  e ?? 'unnamed_field',
                  (dataset, header, records) => null,
                  type: TextFieldType(),
                );
              },
            ),
          )
          .toList(growable: false);
      yield record;
      if (criteria != null) {
        criterionRequestedFieldsIndices = criteria.requestedFields
            .map((e) => rawLine.indexOf(e))
            .toList(growable: false);
      }
    } else {
      // Data row: filter on the raw cells first, then normalise and validate.
      final matchesCriteria =
          criteria?.criterion(
            criterionRequestedFieldsIndices
                .map((e) => e == -1 ? null : record.elementAtOrNull(e))
                .toList(growable: false),
          ) ??
          true;
      if (matchesCriteria) {
        record = record
            .map((e) => (e?.isEmpty ?? true) ? null : e)
            .toList(growable: false);
        if (validationDataset != null) {
          final mapRecord = toMapRecord(header, record);
          await _checkForIndividualField(
            mapRecord,
            fieldDefinitions,
            header,
            validationDataset,
          );
        }
        yield record;
      }
    }
    // Polled after every row, the header included, so a signal completed
    // while the header was being delivered stops the stream before any data
    // row is processed.
    if (cancellationSignal?.isCompleted ?? false) break;
  }
  // TODO: Check the field-checking stuff to (a) not require the full file and
  // TODO: (b) not give the record as a map record.
  //_checkForOverallField(file, fieldDefinitions, header!, dataset!);
}