bind method

  1. @override
Stream<Event> bind(
  1. Stream<List<int>> stream
)
override

Transforms the provided stream.

Returns a new stream with events that are computed from events of the provided stream.

The StreamTransformer interface is completely generic, so it cannot say what subclasses do. Each StreamTransformer should document clearly how it transforms the stream (on the class or variable used to access the transformer), as well as any differences from the following typical behavior:

  • When the returned stream is listened to, it starts listening to the input stream.
  • Subscriptions of the returned stream forward (in a reasonable time) a StreamSubscription.pause call to the subscription of the input stream.
  • Similarly, canceling a subscription of the returned stream eventually (in reasonable time) cancels the subscription of the input stream.

"Reasonable time" depends on the transformer and stream. Some transformers, like a "timeout" transformer, might make these operations depend on a duration. Others might not delay them at all, or just by a microtask.

Transformers are free to handle errors in any way. A transformer implementation may choose to propagate errors, or convert them to other events, or ignore them completely, but if errors are ignored, it should be documented explicitly.

Implementation

@override
Stream<Event> bind(Stream<List<int>> stream) {
  _controller = StreamController(onListen: () {
    // the event we are currently building
    var currentEvent = Event();
    // the regexes we will use later
    var lineRegex = RegExp(r'^([^:]*)(?::)?(?: )?(.*)?$');
    var removeEndingNewlineRegex = RegExp(r'^((?:.|\n)*)\n$');
    // This stream will receive chunks of data that is not necessarily a
    // single event. So we build events on the fly and broadcast the event as
    // soon as we encounter a double newline, then we start a new one.
    stream
        .transform(const Utf8Decoder())
        .transform(const LineSplitter())
        .listen((String line) {
      if (line.isEmpty) {
        // event is done
        // strip ending newline from data
        if (currentEvent.data != null) {
          var match =
              removeEndingNewlineRegex.firstMatch(currentEvent.data!)!;
          currentEvent.data = match.group(1);
        }
        _controller.add(currentEvent);
        currentEvent = Event();
        return;
      }
      // match the line prefix and the value using the regex
      Match match = lineRegex.firstMatch(line)!;
      var field = match.group(1)!;
      var value = match.group(2) ?? '';
      if (field.isEmpty) {
        // lines starting with a colon are to be ignored
        return;
      }
      switch (field) {
        case 'event':
          currentEvent.event = value;
          break;
        case 'data':
          currentEvent.data = '${currentEvent.data ?? ''}$value\n';
          break;
        case 'id':
          currentEvent.id = value;
          break;
        case 'retry':
          if (retryIndicator != null) {
            retryIndicator!(Duration(milliseconds: int.parse(value)));
          }
          break;
      }
    });
  });
  return _controller.stream;
}