extractStructuredDataStream method
Parses HTML content and extracts structured data using CSS selectors in a streaming fashion
htmlStream
is the stream of HTML content to parse
selectors
is a map of field names to CSS selectors
attributes
is a map of field names to attributes to extract (optional)
chunkSize
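is the number of trailing characters kept when the accumulated buffer is trimmed; trimming happens once the buffer grows beyond twice this value, measured on the decoded text rather than on raw bytes (default: 1024 * 1024)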
Implementation
/// Extracts structured records from a byte stream of HTML as it arrives.
///
/// Bytes from [htmlStream] are UTF-8 decoded and appended to an in-memory
/// buffer. Nothing is parsed until the buffer contains the text `<body`;
/// from then on the whole buffer is re-parsed after every decoded chunk and
/// each record that has not been emitted before is yielded. When the stream
/// ends, the remaining buffer is parsed one final time.
///
/// [selectors] maps each output field name to a CSS selector. Record `i` is
/// built from the `i`-th element matched by every selector; a field whose
/// selector matched fewer elements is set to the empty string. Records in
/// which every field is empty are dropped.
///
/// [attributes] optionally maps a field name to the attribute to read from
/// the matched element. Fields without an entry (or with a `null` entry) use
/// the element's trimmed text; a missing attribute yields the empty string.
///
/// [chunkSize] bounds the buffer: once the buffer holds more than
/// `chunkSize * 2` characters, only the last [chunkSize] characters are kept.
/// A negative value disables trimming.
///
/// Records are de-duplicated through the key returned by `_generateItemKey`.
///
/// Failures while processing an intermediate chunk are logged as warnings and
/// skipped. A failure during the final pass is logged and rethrown as a
/// `ScrapingException.parsing`. Errors raised by [htmlStream] itself
/// (including UTF-8 decoding errors) are not caught here.
///
/// NOTE(review): because the buffer is parsed before the document is
/// complete, a record may be built from an element that has only partially
/// arrived, and trimming the buffer changes the positional index of later
/// matches. Confirm callers tolerate this.
Stream<Map<String, String>> extractStructuredDataStream({
  required Stream<List<int>> htmlStream,
  required Map<String, String> selectors,
  Map<String, String?>? attributes,
  int chunkSize = 1024 * 1024, // 1MB chunks
}) async* {
  _logger.info(
    'Starting streaming structured extraction with selectors: $selectors',
  );
  if (attributes != null) {
    _logger.info('Using attributes: $attributes');
  }

  // Accumulates decoded HTML across chunks.
  final buffer = StringBuffer();
  // Becomes true once the opening body tag has been seen.
  var foundBody = false;
  // Keys of records already emitted, used to suppress duplicates.
  final processedItems = <String>{};

  await for (final chunk in htmlStream.transform(utf8.decoder)) {
    buffer.write(chunk);
    final html = buffer.toString();

    // Keep accumulating until the opening body tag shows up.
    if (!foundBody) {
      if (!html.contains('<body')) {
        continue;
      }
      foundBody = true;
    }

    try {
      // The helper is lazy, so parsing and per-record work happen while we
      // iterate here, inside this try block.
      for (final item in _extractNewItems(
        html,
        selectors,
        attributes,
        processedItems,
      )) {
        yield item;
      }

      // Trim an oversized buffer, keeping the tail so elements spanning the
      // cut are not lost entirely. The `>= 0` guard prevents a negative
      // chunkSize from asking substring() for a start past the end of the
      // string, which would throw on every chunk.
      if (chunkSize >= 0 && buffer.length > chunkSize * 2) {
        final tail = html.substring(html.length - chunkSize);
        buffer
          ..clear()
          ..write(tail);
      }
    } catch (e) {
      // Best effort: a single bad chunk must not abort the whole stream.
      _logger.warning('Error parsing HTML chunk: $e');
    }
  }

  // Final pass over whatever is left in the buffer. This is the only pass
  // that runs when the body tag was never seen.
  if (buffer.isNotEmpty) {
    try {
      for (final item in _extractNewItems(
        buffer.toString(),
        selectors,
        attributes,
        processedItems,
      )) {
        yield item;
      }
    } catch (e) {
      _logger.error('Error parsing final HTML chunk: $e');
      throw ScrapingException.parsing(
        'Error parsing final HTML chunk',
        originalException: e,
        isRetryable: false,
      );
    }
  }

  _logger.info(
    'Completed streaming structured extraction, found ${processedItems.length} items',
  );
}

/// Parses [html] and lazily yields every record whose key is not yet in
/// [processedItems], adding each yielded record's key to that set.
Iterable<Map<String, String>> _extractNewItems(
  String html,
  Map<String, String> selectors,
  Map<String, String?>? attributes,
  Set<String> processedItems,
) sync* {
  final document = html_parser.parse(html);

  // Matched elements per field, in the iteration order of [selectors].
  final elementsByField = {
    for (final entry in selectors.entries)
      entry.key: document.querySelectorAll(entry.value),
  };

  // The number of records is driven by the selector with the most matches.
  var maxItems = 0;
  for (final elements in elementsByField.values) {
    if (elements.length > maxItems) {
      maxItems = elements.length;
    }
  }

  for (var i = 0; i < maxItems; i++) {
    final item = <String, String>{};
    var hasData = false;

    for (final entry in elementsByField.entries) {
      final field = entry.key;
      final elements = entry.value;

      var value = '';
      if (i < elements.length) {
        final element = elements[i];
        final attribute = attributes?[field];
        value = attribute != null
            ? (element.attributes[attribute] ?? '')
            : element.text.trim();
      }

      item[field] = value;
      if (value.isNotEmpty) {
        hasData = true;
      }
    }

    // Skip records with no data at all.
    if (!hasData) {
      continue;
    }

    // Set.add returns false when the key was already present.
    if (processedItems.add(_generateItemKey(item))) {
      yield item;
    }
  }
}