streamHttpRequest function

Future<Stream<HttpStreamOutput?>> streamHttpRequest(
  String requestId,
  APIType apiType,
  HttpRequestModel httpRequestModel, {
  SupportedUriSchemes defaultUriScheme = kDefaultUriScheme,
  bool noSSL = false,
})

Implementation

/// Sends [httpRequestModel] and reports the outcome through the returned
/// stream.
///
/// Each event is a record of `(isStreaming, response, elapsed, error)`:
///
/// * When the response content type is listed in [kStreamingResponseTypes],
///   every received chunk is emitted immediately as
///   `(true, response, elapsed, null)`, followed by a final
///   `(false, emptyResponse, elapsed, null)` event when the body ends.
/// * Otherwise the chunks are buffered and a single
///   `(false, response, elapsed, null)` event carrying the whole body is
///   emitted once the body ends.
/// * A failure in auth handling, URI validation or the request itself is
///   emitted as `(null, null, null, message)`. If [httpClientManager] reports
///   that [requestId] was cancelled, the message is [kMsgRequestCancelled].
///
/// After the final event the client registered for [requestId] is closed and
/// the stream is closed. Cancelling the returned stream's subscription
/// cancels the underlying response subscription and calls
/// `httpClientManager.cancelRequest(requestId)`.
///
/// [defaultUriScheme] is forwarded to [getValidRequestUri] and [noSSL] is
/// forwarded to `httpClientManager.createClient`.
Future<Stream<HttpStreamOutput>> streamHttpRequest(
  String requestId,
  APIType apiType,
  HttpRequestModel httpRequestModel, {
  SupportedUriSchemes defaultUriScheme = kDefaultUriScheme,
  bool noSSL = false,
}) async {
  final authData = httpRequestModel.authModel;
  final controller = StreamController<HttpStreamOutput>();
  StreamSubscription<List<int>?>? subscription;
  final stopwatch = Stopwatch()..start();

  // Single gateway for events: adding to a closed controller throws a
  // StateError, and several callbacks below can run after the controller
  // has already been closed.
  void emit(HttpStreamOutput output) {
    if (!controller.isClosed) {
      controller.add(output);
    }
  }

  // Releases everything tied to this request and closes the output stream.
  Future<void> cleanup() async {
    stopwatch.stop();
    await subscription?.cancel();
    httpClientManager.closeClient(requestId);
    // Yield once so already-scheduled microtasks run before the close.
    await Future.microtask(() {});
    // Deliberately not awaited: the future returned by close() only completes
    // once the done event has been delivered (or the listener cancelled),
    // which may not happen before the caller starts listening.
    controller.close();
  }

  Future<void> addCancelledMessage() async {
    emit((null, null, null, kMsgRequestCancelled));
    httpClientManager.removeCancelledRequest(requestId);
    await cleanup();
  }

  // Reports [error] on the stream, or the cancellation message when the
  // failure was caused by the request being cancelled.
  Future<void> addErrorMessage(Object error) async {
    await Future.microtask(() {});
    if (httpClientManager.wasRequestCancelled(requestId)) {
      await addCancelledMessage();
    } else {
      emit((null, null, null, '$error'));
      await cleanup();
    }
  }

  controller.onCancel = () async {
    await subscription?.cancel();
    httpClientManager.cancelRequest(requestId);
  };

  // The request may have been cancelled before it was even started.
  if (httpClientManager.wasRequestCancelled(requestId)) {
    await addCancelledMessage();
    return controller.stream;
  }

  final client = httpClientManager.createClient(requestId, noSSL: noSSL);

  HttpRequestModel authenticatedHttpRequestModel = httpRequestModel.copyWith();

  try {
    if (authData != null && authData.type != APIAuthType.none) {
      authenticatedHttpRequestModel = await handleAuth(
        httpRequestModel,
        authData,
      );
    }
  } catch (e) {
    await addErrorMessage(e);
    return controller.stream;
  }

  final (uri, uriError) = getValidRequestUri(
    authenticatedHttpRequestModel.url,
    authenticatedHttpRequestModel.enabledParams,
    defaultUriScheme: defaultUriScheme,
  );

  if (uri == null) {
    await addErrorMessage(uriError ?? 'Invalid URL');
    return controller.stream;
  }

  try {
    final streamedResponse = await makeStreamedRequest(
      client: client,
      uri: uri,
      requestModel: authenticatedHttpRequestModel,
      apiType: apiType,
    );

    // Wraps [bytes] in a response that carries the metadata (status, headers,
    // ...) of the streamed response.
    HttpResponse createResponseFromBytes(List<int> bytes) {
      return HttpResponse.bytes(
        bytes,
        streamedResponse.statusCode,
        request: streamedResponse.request,
        headers: streamedResponse.headers,
        isRedirect: streamedResponse.isRedirect,
        persistentConnection: streamedResponse.persistentConnection,
        reasonPhrase: streamedResponse.reasonPhrase,
      );
    }

    final contentType =
        getMediaTypeFromHeaders(streamedResponse.headers)?.mimeType ?? '';
    // The content type is fixed for the whole response, so decide once
    // instead of on every chunk.
    final isStreaming = kStreamingResponseTypes.contains(contentType);
    final chunkList = <List<int>>[];

    subscription = streamedResponse.stream.listen(
      (bytes) {
        if (controller.isClosed) return;
        if (isStreaming) {
          emit((
            true,
            createResponseFromBytes(bytes),
            stopwatch.elapsed,
            null,
          ));
        } else {
          chunkList.add(bytes);
        }
      },
      onDone: () async {
        if (!controller.isClosed) {
          // Chunks are only buffered for non-streaming responses, so this is
          // either the complete body or, for streaming/empty responses, an
          // empty body marking the end. The flag is `false` in both cases.
          final allBytes = chunkList.expand((chunk) => chunk).toList();
          emit((
            false,
            createResponseFromBytes(allBytes),
            stopwatch.elapsed,
            null,
          ));
        }
        chunkList.clear();
        await cleanup();
      },
      onError: addErrorMessage,
    );
    return controller.stream;
  } catch (e) {
    await addErrorMessage(e);
    return controller.stream;
  }
}