/// Parses HTML content from [htmlStream] and extracts data matching
/// [selector] in a streaming fashion, yielding each unique value as soon
/// as it can be parsed out of the accumulated input.
///
/// [htmlStream] is the stream of raw HTML bytes to parse (decoded as
/// UTF-8). [selector] is the CSS selector to query. If [attribute] is
/// given, that attribute's value is extracted; otherwise the element's
/// text content is used when [asText] is true (the default), or its
/// outer HTML when false. [chunkSize] controls how much of the buffer is
/// retained when it is trimmed (default: 1024 * 1024 bytes).
///
/// Empty values are skipped, and each element is yielded at most once
/// even though the growing buffer is re-parsed on every chunk. Parse
/// errors on intermediate chunks are logged and ignored; a parse error
/// on the final pass throws a non-retryable [ScrapingException].
Stream<String> extractDataStream({
  required Stream<List<int>> htmlStream,
  required String selector,
  String? attribute,
  bool asText = true,
  int chunkSize = 1024 * 1024, // 1MB chunks
}) async* {
  _logger.info('Starting streaming extraction with selector: $selector');
  if (attribute != null) {
    _logger.info('Using attribute: $attribute');
  }
  // Buffer accumulating decoded HTML across chunks.
  final buffer = StringBuffer();
  // Whether the opening <body tag has been seen yet; parsing is deferred
  // until then so we don't repeatedly parse an incomplete document head.
  bool foundBody = false;
  // Keys of elements already yielded, so re-parsing the growing buffer
  // doesn't produce duplicates.
  final processedElements = <String>{};

  // Parses [html], queries [selector], and lazily yields every non-empty,
  // not-yet-seen value. Shared by the incremental loop and the final pass.
  Iterable<String> extractNewValues(String html) sync* {
    final document = html_parser.parse(html);
    for (final element in document.querySelectorAll(selector)) {
      final String value;
      if (attribute != null) {
        value = element.attributes[attribute] ?? '';
      } else if (asText) {
        value = element.text.trim();
      } else {
        value = element.outerHtml;
      }
      // Unique key for this element, used for de-duplication.
      final elementKey = _generateElementKey(element, value);
      // Set.add returns false if the key was already present.
      if (value.isNotEmpty && processedElements.add(elementKey)) {
        yield value;
      }
    }
  }

  await for (final chunk in htmlStream.transform(utf8.decoder)) {
    buffer.write(chunk);
    String html = buffer.toString();
    // Only start processing once we have the opening body tag.
    if (!foundBody && html.contains('<body')) {
      foundBody = true;
    }
    // If we haven't found the body yet, keep accumulating.
    if (!foundBody) {
      continue;
    }
    try {
      // Parse the accumulated HTML and yield any newly visible values.
      for (final value in extractNewValues(html)) {
        yield value;
      }
      // If the buffer is getting too large, trim it. Keep only the last
      // chunk so elements that span chunk boundaries aren't lost.
      if (buffer.length > chunkSize * 2) {
        html = html.substring(html.length - chunkSize);
        buffer
          ..clear()
          ..write(html);
      }
    } catch (e) {
      // A single malformed intermediate chunk shouldn't abort the stream.
      _logger.warning('Error parsing HTML chunk: $e');
    }
  }

  // Process any remaining HTML after the source stream completes.
  if (buffer.isNotEmpty) {
    try {
      for (final value in extractNewValues(buffer.toString())) {
        yield value;
      }
    } catch (e) {
      // Unlike intermediate chunks, a failure here means data was lost,
      // so surface it to the caller.
      _logger.error('Error parsing final HTML chunk: $e');
      throw ScrapingException.parsing(
        'Error parsing final HTML chunk',
        originalException: e,
        isRetryable: false,
      );
    }
  }
  _logger.info(
    'Completed streaming extraction, found ${processedElements.length} items',
  );
}