execute method

@override
Future execute(
  dynamic _params
)
override

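Streams the file described by the supplied UploadFileParams to the uploads API as a multipart upload, buffering the source stream into parts and reporting progress along the way.
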
Implementation

@override
Future execute(dynamic _params) async {
  print("Upload large file execute");
  this.params = _params as UploadFileParams;

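  // Use the file size (in bytes) as this task's total for progress reporting.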
  total = params.file.size.toDouble();

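  // Move into the processing phase and broadcast the new status.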
  workPhase = WorkPhase.processing;
  sendStatus();

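  // Report the file name and its human-readable size.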
  this.sendUpdate(
      message:
          "file: ${params.file.name} size ${params.file.size.formatBytes()}");
  var pfile = params.file;
  this.sendUpdate(message: "found file: $pfile");

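  // Build the uploads API client: Dio-based transport against the configured
  // base path, bearer-token authentication, and a composite API reader for
  // deserialization.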
  uploads = UploadsApi(
    ApiClient(
      transport: DioLibTransport(basePath: params.apiBasePath!),
      authentication: BearerAuthentication(params.apiToken!),
      serializer: AggregateApiReader(
        PrimitiveApiReader(),
        UploadsApiReader(),
        ApiReader.mmodel(),
      ),
    ),
  );
  // final _progress = progress ?? ProgressTracker<Uri>.ratio();

  String keyName;

  this.sendUpdate(
      message:
          "Starting upload... Total of ${params.file.size.formatBytes()}");
  keyName = params.keyName!;
  StartUpload upload;
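  // Ask the server to start the upload; media metadata is attached only when
  // both a media type and a media URL were supplied.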
  try {
    upload = await uploads.startUpload(
        body: UploadRequest.of(path: keyName),
        mediaParams: params.mediaType == null || params.mediaUrl == null
            ? null
            : {
                "mediaType": params.mediaType,
                "fileName": params.file.name,
                "fileSize": params.file.size,
                "mediaUrl": params.mediaUrl,
              });
  } catch (e, stack) {
    log.severe(e, stack);
    message = "Failed to start upload: $e";
    rethrow;
  }

  try {
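    // A returned mediaId means the server already has this content; report the
    // duplicate and stop.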
    if (upload.mediaId != null) {
      sendUpdate(message: "Found a duplicate!", progress: 100);
      return;
    }
    uploadId = upload.uploadId!;
    log.info("Started upload $uploadId");
    sendUpdate(message: "Starting to buffer: $uploadId", progress: 10);

    log.info("Starting upload of ${pfile.name}");

    /// Minimum part size: 5 MB (only the final part may be smaller).
    var uploadSize = 1024 * 1024 * 5;
    _currentPart = 1;

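    // _buf accumulates stream chunks; each future in _parts resolves to the
    // ETag of one uploaded part.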
    var _buf = BytesBuffer();
    final _parts = <Future<ETagResponse>>[];

    final locks = LocalSemaphore(3);
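    // Stream the file, flushing the buffer as an uploaded part (guarded by the
    // semaphore) whenever it reaches the minimum part size; bail out early if
    // the task is shutting down.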
    await for (var chunk in pfile.openStream()) {
      if (this.isShuttingDown) {
        return;
      }
      _buf.add(chunk);
      _seen += chunk.length;
      if (_buf.length >= uploadSize) {
        /// upload chunk
        _currentPart = _currentPart + 1;
        final me = _currentPart;
        var _b = _buf.toBytes();
        _buf = BytesBuffer();
        final _comp = Completer<ETagResponse>();
        _parts.add(_comp.future);
        await locks.acquire();
        try {
          log.info("Starting upload of part $me");
          sendUpdate(
            message: "Buffered enough: uploading part $me",
            state: {
              "uploadId": uploadId,
              "currentPart": _currentPart,
            },
          );
          final p = await uploadPart(me, _b);
          _comp.complete(p);
        } finally {
          locks.release();
        }
      }
    }

    /// Wait for any pending items before continuing
    final parts = [...(await Future.wait(_parts))];

    /// Take care of the last item
    if (_buf.length > 0) {
      _currentPart = _currentPart + 1;
      final me = _currentPart;
      parts.add(await uploadPart(me, _buf.toBytes()));
    }

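    // Finalize the upload by sending the collected part ETags back to the server.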
    sendUpdate(
        message: "Parts are done! Now we need to finalize!", progress: 95);
    log.info("Completed uploading ${_parts.length} parts for $uploadId");
    var resp = await uploads.completeUpload(uploadId,
        body: FinishUpload.of(
          keyName: keyName,
          parts: parts,
        ));

    log.info("Completed $uploadId with status $resp");
    sendUpdate(message: "Done!");
  } catch (e, s) {
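    // Log the failure and rethrow so the caller can handle it.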
    log.severe(e, s);

    rethrow;
  }
}