triggerJobRuns method

Future<int> triggerJobRuns()

Implementation

Future<int> triggerJobRuns() {
  if (_currentStream != null) {
    _logger.info('Already running jobs. Ignoring trigger.');
    return Future.value(0);
  }
  _logger.finest('Triggering Job Runs. ${_queue.length}');
  final completer = Completer<int>();
  var successfulJobs = 0;
  //    final job = _queue.removeFirst();
  _currentStream =
      (() async* {
        final copyQueue = _queue
            .map((job) async {
              await job.runner(job).drain(null);
              return job;
            })
            .toList(growable: false);
        for (final job in copyQueue) {
          yield await job;
        }
      })().listen(
        (successJob) {
          _queue.remove(successJob);
          successfulJobs++;
          _logger.finest(
            'Success job. remaining: ${_queue.length} - completed: $successfulJobs',
          );
        },
        onDone: () {
          _logger.finest('All jobs done.');
          _errorCount = 0;
          _lastError = null;

          _currentStream = null;
          completer.complete(successfulJobs);
        },
        onError: (Object error, StackTrace stackTrace) {
          _logger.warning('Error while executing job', error, stackTrace);
          _errorCount++;
          _lastError = DateTime.now();
          _currentStream!.cancel();
          _currentStream = null;
          completer.completeError(error, stackTrace);

          const errorWait = 10;
          final minWait = Duration(
            seconds: errorWait * (_errorCount * _errorCount + 1),
          );
          if (_lastError!
                  .difference(DateTime.now())
                  .abs()
                  .compareTo(minWait) <
              0) {
            _logger.finest('There was an error. waiting at least $minWait');
            if (_queue.length > maxQueueSize) {
              _logger.finest('clearing log buffer. ${_queue.length}');
              _queue.clear();
            }
          }
          return Future.value(null);
        },
      );

  return completer.future;
}