extractDataStream method

Stream<String> extractDataStream({
  required Stream<List<int>> htmlStream,
  required String selector,
  String? attribute,
  bool asText = true,
  int chunkSize = 1024 * 1024,
})

Parses HTML content and extracts data using CSS selectors in a streaming fashion.

htmlStream is the stream of HTML content to parse.
selector is the CSS selector to use.
attribute is the attribute to extract (optional).
asText is whether to extract the text content (default: true).
chunkSize is the size of each chunk to process (default: 1024 * 1024 bytes).
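
For example, a caller might stream a large HTML file from disk and consume matching values as they are discovered. The following is a minimal usage sketch; the enclosing class name (StreamingScraper here) is an assumption, since this page only documents the method itself.

import 'dart:io';

Future<void> main() async {
  // Hypothetical host class; this page does not show the class that
  // declares extractDataStream.
  final scraper = StreamingScraper();

  // Stream a large HTML file from disk instead of loading it whole.
  final htmlStream = File('large_page.html').openRead();

  // Extract the text content of every matching element as it appears.
  await for (final title in scraper.extractDataStream(
    htmlStream: htmlStream,
    selector: '.product-title',
  )) {
    print(title);
  }

  // To extract an attribute instead of text content, pass attribute,
  // e.g. selector: 'a', attribute: 'href'.
}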

Implementation

Stream<String> extractDataStream({
  required Stream<List<int>> htmlStream,
  required String selector,
  String? attribute,
  bool asText = true,
  int chunkSize = 1024 * 1024, // 1MB chunks
}) async* {
  _logger.info('Starting streaming extraction with selector: $selector');
  if (attribute != null) {
    _logger.info('Using attribute: $attribute');
  }

  // Buffer to accumulate HTML chunks
  final buffer = StringBuffer();

  // Track if we've found the opening body tag
  bool foundBody = false;

  // Track elements we've already processed to avoid duplicates
  final processedElements = <String>{};

  await for (var chunk in htmlStream.transform(utf8.decoder)) {
    buffer.write(chunk);
    String html = buffer.toString();

    // Only start processing once we have the opening body tag
    if (!foundBody && html.contains('<body')) {
      foundBody = true;
    }

    // If we haven't found the body yet, continue accumulating
    if (!foundBody) {
      continue;
    }

    try {
      // Parse the accumulated HTML
      final document = html_parser.parse(html);

      // Query the elements
      final elements = document.querySelectorAll(selector);

      // Process each element
      for (var element in elements) {
        String value;

        if (attribute != null) {
          value = element.attributes[attribute] ?? '';
        } else if (asText) {
          value = element.text.trim();
        } else {
          value = element.outerHtml;
        }

        // Generate a unique key for this element to avoid duplicates
        final elementKey = _generateElementKey(element, value);

        // Only yield elements we haven't processed yet
        if (!processedElements.contains(elementKey) && value.isNotEmpty) {
          processedElements.add(elementKey);
          yield value;
        }
      }

      // If the buffer is getting too large, trim it
      if (buffer.length > chunkSize * 2) {
        // Keep only the last chunk to ensure we don't miss elements that span chunks
        html = html.substring(html.length - chunkSize);
        buffer.clear();
        buffer.write(html);
      }
    } catch (e) {
      _logger.warning('Error parsing HTML chunk: $e');
      // Continue processing - don't throw an exception for a single chunk
    }
  }

  // Process any remaining HTML
  if (buffer.isNotEmpty) {
    try {
      final document = html_parser.parse(buffer.toString());
      final elements = document.querySelectorAll(selector);

      for (var element in elements) {
        String value;

        if (attribute != null) {
          value = element.attributes[attribute] ?? '';
        } else if (asText) {
          value = element.text.trim();
        } else {
          value = element.outerHtml;
        }

        final elementKey = _generateElementKey(element, value);

        if (!processedElements.contains(elementKey) && value.isNotEmpty) {
          processedElements.add(elementKey);
          yield value;
        }
      }
    } catch (e) {
      _logger.error('Error parsing final HTML chunk: $e');
      throw ScrapingException.parsing(
        'Error parsing final HTML chunk',
        originalException: e,
        isRetryable: false,
      );
    }
  }

  _logger.info(
    'Completed streaming extraction, found ${processedElements.length} items',
  );
}
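
The deduplication above relies on a private helper, _generateElementKey, whose implementation is not shown on this page. A plausible sketch, assuming the key only needs to stay stable when the same element is re-parsed from a later, larger buffer, is to combine the element's tag name, its attributes, and the extracted value:

import 'package:html/dom.dart';

// Hypothetical sketch only; the real _generateElementKey is private to
// the class and is not documented here.
String _generateElementKey(Element element, String value) {
  // Tag name + serialized attribute map + extracted value stays stable
  // across re-parses of the growing buffer, so parsing the same
  // accumulated HTML repeatedly does not yield duplicates.
  return '${element.localName}|${element.attributes}|$value';
}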