diff --git a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala index 2a67440cf0e..f9038010b42 100644 --- a/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala +++ b/file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala @@ -66,6 +66,12 @@ import scala.collection.mutable.ListBuffer import scala.jdk.CollectionConverters._ import scala.jdk.OptionConverters._ +import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSession.DATASET_UPLOAD_SESSION +import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSessionPart.DATASET_UPLOAD_SESSION_PART +import org.apache.texera.dao.jooq.generated.enums.UploadPartStatusEnum + +import java.util.UUID + object DatasetResource { private val context = SqlServer @@ -89,11 +95,11 @@ object DatasetResource { */ private def put(buf: Array[Byte], len: Int, url: String, partNum: Int): String = { val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection] - conn.setDoOutput(true); + conn.setDoOutput(true) conn.setRequestMethod("PUT") conn.setFixedLengthStreamingMode(len) val out = conn.getOutputStream - out.write(buf, 0, len); + out.write(buf, 0, len) out.close() val code = conn.getResponseCode @@ -401,7 +407,6 @@ class DatasetResource { e ) } - // delete the directory on S3 if ( S3StorageClient.directoryExists(StorageConfig.lakefsBucketName, dataset.getRepositoryName) @@ -640,138 +645,84 @@ class DatasetResource { @QueryParam("ownerEmail") ownerEmail: String, @QueryParam("datasetName") datasetName: String, @QueryParam("filePath") encodedUrl: String, - @QueryParam("uploadId") uploadId: Optional[String], @QueryParam("numParts") numParts: Optional[Integer], - payload: Map[ - String, - Any - ], // Expecting {"parts": [...], "physicalAddress": "s3://bucket/path"} + payload: Map[String, Any], // Expecting {"uploadToken": "..."} for abort and finish @Auth user: SessionUser ): Response = { val uid = user.getUid - withTransaction(context) { ctx => - val dataset = context - .select(DATASET.fields: _*) - .from(DATASET) - .leftJoin(USER) - .on(USER.UID.eq(DATASET.OWNER_UID)) - .where(USER.EMAIL.eq(ownerEmail)) - .and(DATASET.NAME.eq(datasetName)) - .fetchOneInto(classOf[Dataset]) - if (dataset == null || !userHasWriteAccess(ctx, dataset.getDid, uid)) { - throw new ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE) - } - - // Decode the file path - val repositoryName = dataset.getRepositoryName - val filePath = URLDecoder.decode(encodedUrl, StandardCharsets.UTF_8.name()) - - operationType.toLowerCase match { - case "init" => - val numPartsValue = numParts.toScala.getOrElse( - throw new BadRequestException("numParts is required for initialization") - ) + operationType.toLowerCase match { + case "init" => initMultipartUpload(ownerEmail, datasetName, encodedUrl, numParts, uid) + case "finish" => finishMultipartUpload(payload, uid) + case "abort" => abortMultipartUpload(payload, uid) + case _ => + throw new BadRequestException("Invalid type parameter. Use 'init', 'finish', or 'abort'.") + } + } - val presignedResponse = LakeFSStorageClient.initiatePresignedMultipartUploads( - repositoryName, - filePath, - numPartsValue - ) - Response - .ok( - Map( - "uploadId" -> presignedResponse.getUploadId, - "presignedUrls" -> presignedResponse.getPresignedUrls, - "physicalAddress" -> presignedResponse.getPhysicalAddress - ) - ) - .build() + @POST + @RolesAllowed(Array("REGULAR", "ADMIN")) + @Path("/multipart-upload/part") + @Consumes(Array(MediaType.APPLICATION_OCTET_STREAM)) + def uploadPart( + @QueryParam("token") uploadToken: String, + @QueryParam("partNumber") partNumber: Int, + partStream: InputStream, + @Context headers: HttpHeaders, + @Auth user: SessionUser + ): Response = { - case "finish" => - val uploadIdValue = uploadId.toScala.getOrElse( - throw new BadRequestException("uploadId is required for completion") - ) + if (uploadToken == null || uploadToken.isEmpty) + throw new BadRequestException("token is required") - // Extract parts from the payload - val partsList = payload.get("parts") match { - case Some(rawList: List[_]) => - try { - rawList.map { - case part: Map[_, _] => - val partMap = part.asInstanceOf[Map[String, Any]] - val partNumber = partMap.get("PartNumber") match { - case Some(i: Int) => i - case Some(s: String) => s.toInt - case _ => throw new BadRequestException("Invalid or missing PartNumber") - } - val eTag = partMap.get("ETag") match { - case Some(s: String) => s - case _ => throw new BadRequestException("Invalid or missing ETag") - } - (partNumber, eTag) - - case _ => - throw new BadRequestException("Each part must be a Map[String, Any]") - } - } catch { - case e: NumberFormatException => - throw new BadRequestException("PartNumber must be an integer", e) - } - - case _ => - throw new BadRequestException("Missing or invalid 'parts' list in payload") - } + if (partNumber < 1) + throw new BadRequestException("partNumber must be >= 1") - // Extract physical address from payload - val physicalAddress = payload.get("physicalAddress") match { - case Some(address: String) => address - case _ => throw new BadRequestException("Missing physicalAddress in payload") - } + val tokenUuid = parseUploadTokenOrBadRequest(uploadToken, "token") - // Complete the multipart upload with parts and physical address - val objectStats = LakeFSStorageClient.completePresignedMultipartUploads( - repositoryName, - filePath, - uploadIdValue, - partsList, - physicalAddress - ) + // -------- Step 1: lock the part row and move to UPLOADING -------- + val presignedUrl = withTransaction(context) { ctx => + val partRecord = lockPartForUploadOrFail(ctx, tokenUuid, partNumber, user.getUid) + partRecord.getPresignedUrl + } - Response - .ok( - Map( - "message" -> "Multipart upload completed successfully", - "filePath" -> objectStats.getPath + // -------- Step 2: stream bytes to S3 -------- + val (eTag, bytesSent) = + try uploadPartToPresignedUrl(presignedUrl, partStream, headers, partNumber) + catch { + case e: Exception => + // revert status back to PENDING on failure + withTransaction(context) { ctx => + ctx + .update(DATASET_UPLOAD_SESSION_PART) + .set(DATASET_UPLOAD_SESSION_PART.STATUS, UploadPartStatusEnum.PENDING) + .set(DATASET_UPLOAD_SESSION_PART.UPDATED_AT, java.time.OffsetDateTime.now()) + .where( + DATASET_UPLOAD_SESSION_PART.UPLOAD_TOKEN + .eq(tokenUuid) + .and(DATASET_UPLOAD_SESSION_PART.PART_NUMBER.eq(partNumber)) ) - ) - .build() - - case "abort" => - val uploadIdValue = uploadId.toScala.getOrElse( - throw new BadRequestException("uploadId is required for abortion") - ) - - // Extract physical address from payload - val physicalAddress = payload.get("physicalAddress") match { - case Some(address: String) => address - case _ => throw new BadRequestException("Missing physicalAddress in payload") + .execute() } - - // Abort the multipart upload - LakeFSStorageClient.abortPresignedMultipartUploads( - repositoryName, - filePath, - uploadIdValue, - physicalAddress - ) - - Response.ok(Map("message" -> "Multipart upload aborted successfully")).build() - - case _ => - throw new BadRequestException("Invalid type parameter. Use 'init', 'finish', or 'abort'.") + throw e } + + // -------- Step 3: mark as COMPLETED and store ETag-------- + withTransaction(context) { ctx => + ctx + .update(DATASET_UPLOAD_SESSION_PART) + .set(DATASET_UPLOAD_SESSION_PART.STATUS, UploadPartStatusEnum.COMPLETED) + .set(DATASET_UPLOAD_SESSION_PART.ETAG, eTag) + .set(DATASET_UPLOAD_SESSION_PART.UPDATED_AT, java.time.OffsetDateTime.now()) + .where( + DATASET_UPLOAD_SESSION_PART.UPLOAD_TOKEN + .eq(tokenUuid) + .and(DATASET_UPLOAD_SESSION_PART.PART_NUMBER.eq(partNumber)) + ) + .execute() } + + Response.ok().build() } @POST @@ -1014,9 +965,8 @@ class DatasetResource { val ownerNode = DatasetFileNode .fromLakeFSRepositoryCommittedObjects( Map( - (user.getEmail, dataset.getName, latestVersion.getName) -> - LakeFSStorageClient - .retrieveObjectsOfVersion(dataset.getRepositoryName, latestVersion.getVersionHash) + (user.getEmail, dataset.getName, latestVersion.getName) -> LakeFSStorageClient + .retrieveObjectsOfVersion(dataset.getRepositoryName, latestVersion.getVersionHash) ) ) .head @@ -1372,4 +1322,308 @@ class DatasetResource { Right(response) } } + + // === Multipart helpers === + + private def initMultipartUpload( + ownerEmail: String, + datasetName: String, + encodedUrl: String, + numParts: Optional[Integer], + uid: Int + ): Response = { + withTransaction(context) { ctx => + val dataset = ctx + .select(DATASET.fields: _*) + .from(DATASET) + .leftJoin(USER) + .on(USER.UID.eq(DATASET.OWNER_UID)) + .where(USER.EMAIL.eq(ownerEmail)) + .and(DATASET.NAME.eq(datasetName)) + .fetchOneInto(classOf[Dataset]) + + if (dataset == null || !userHasWriteAccess(ctx, dataset.getDid, uid)) { + throw new ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE) + } + + val repositoryName = dataset.getRepositoryName + val filePath = URLDecoder.decode(encodedUrl, StandardCharsets.UTF_8.name()) + + val numPartsValue = numParts.toScala.getOrElse( + throw new BadRequestException("numParts is required for initialization") + ) + + val presign = LakeFSStorageClient.initiatePresignedMultipartUploads( + repositoryName, + filePath, + numPartsValue + ) + val uploadIdStr = presign.getUploadId + val presignedUrls = presign.getPresignedUrls.asScala.toArray.map(_.toString) + val physicalAddr = presign.getPhysicalAddress + + val token = java.util.UUID.randomUUID() + + val sessionRecord = ctx.newRecord(DATASET_UPLOAD_SESSION) + sessionRecord.setUploadToken(token) + sessionRecord.setDid(dataset.getDid) + sessionRecord.setUid(uid) + sessionRecord.setFilePath(filePath) + sessionRecord.setUploadId(uploadIdStr) + sessionRecord.setPhysicalAddress(physicalAddr) + sessionRecord.store() + + presignedUrls.zipWithIndex.foreach { + case (url, idx) => + val partRecord = ctx.newRecord(DATASET_UPLOAD_SESSION_PART) + partRecord.setUploadToken(token) + partRecord.setPartNumber(idx + 1) + partRecord.setStatus(UploadPartStatusEnum.PENDING) + partRecord.setPresignedUrl(url) + partRecord.store() + } + + Response + .ok( + Map( + "uploadToken" -> token.toString + ) + ) + .build() + } + } + private def finishMultipartUpload( + payload: Map[String, Any], + uid: Int + ): Response = { + val tokenUuid = extractUploadTokenFromPayload(payload, "completion") + + withTransaction(context) { ctx => + val (session, dataset) = loadSessionAndDatasetOrFail(ctx, tokenUuid, uid) + + val partRecords = ctx + .selectFrom(DATASET_UPLOAD_SESSION_PART) + .where( + DATASET_UPLOAD_SESSION_PART.UPLOAD_TOKEN + .eq(tokenUuid) + .and(DATASET_UPLOAD_SESSION_PART.STATUS.eq(UploadPartStatusEnum.COMPLETED)) + ) + .orderBy(DATASET_UPLOAD_SESSION_PART.PART_NUMBER.asc()) + .fetch() + + if (partRecords.isEmpty) { + throw new BadRequestException("No completed parts for this upload") + } + + val partsList: List[(Int, String)] = + partRecords.asScala.toList.map { r => + val etag = Option(r.getEtag).getOrElse { + throw new WebApplicationException( + s"Missing ETag for part ${r.getPartNumber}", + Response.Status.INTERNAL_SERVER_ERROR + ) + } + (r.getPartNumber.intValue(), etag) + } + + // TODO: later enforce max total size here. + + val objectStats = LakeFSStorageClient.completePresignedMultipartUploads( + dataset.getRepositoryName, + session.getFilePath, + session.getUploadId, + partsList, + session.getPhysicalAddress + ) + + ctx + .deleteFrom(DATASET_UPLOAD_SESSION) + .where(DATASET_UPLOAD_SESSION.UPLOAD_TOKEN.eq(tokenUuid)) + .execute() + + Response + .ok( + Map( + "message" -> "Multipart upload completed successfully", + "filePath" -> objectStats.getPath + ) + ) + .build() + } + } + private def abortMultipartUpload( + payload: Map[String, Any], + uid: Int + ): Response = { + val tokenUuid = extractUploadTokenFromPayload(payload, "abortion") + + withTransaction(context) { ctx => + val (session, dataset) = loadSessionAndDatasetOrFail(ctx, tokenUuid, uid) + + LakeFSStorageClient.abortPresignedMultipartUploads( + dataset.getRepositoryName, + session.getFilePath, + session.getUploadId, + session.getPhysicalAddress + ) + + ctx + .deleteFrom(DATASET_UPLOAD_SESSION) + .where(DATASET_UPLOAD_SESSION.UPLOAD_TOKEN.eq(tokenUuid)) + .execute() + + Response.ok(Map("message" -> "Multipart upload aborted successfully")).build() + } + } + + private def parseUploadTokenOrBadRequest(raw: String, fieldName: String): UUID = { + try UUID.fromString(raw) + catch { + case _: IllegalArgumentException => + throw new BadRequestException(s"Invalid $fieldName format") + } + } + + private def extractUploadTokenFromPayload( + payload: Map[String, Any], + opName: String + ): UUID = { + val tokenValueStr = payload + .get("uploadToken") + .map(_.asInstanceOf[String]) + .getOrElse { + throw new BadRequestException(s"uploadToken is required for $opName") + } + + parseUploadTokenOrBadRequest(tokenValueStr, "uploadToken") + } + + private def loadSessionAndDatasetOrFail( + ctx: DSLContext, + tokenUuid: UUID, + uid: Int + ) = { + val session = ctx + .selectFrom(DATASET_UPLOAD_SESSION) + .where(DATASET_UPLOAD_SESSION.UPLOAD_TOKEN.eq(tokenUuid)) + .fetchOne() + + if (session == null) { + throw new NotFoundException("Upload session not found or already finalized") + } + + if (session.getUid != uid) { + throw new ForbiddenException("User has no access to this upload session") + } + + val dataset = ctx + .selectFrom(DATASET) + .where(DATASET.DID.eq(session.getDid)) + .fetchOne() + + if (dataset == null || !userHasWriteAccess(ctx, dataset.getDid, uid)) { + throw new ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE) + } + + (session, dataset) + } + + private def lockPartForUploadOrFail( + ctx: DSLContext, + tokenUuid: UUID, + partNumber: Int, + uid: Int + ) = { + val session = ctx + .selectFrom(DATASET_UPLOAD_SESSION) + .where(DATASET_UPLOAD_SESSION.UPLOAD_TOKEN.eq(tokenUuid)) + .fetchOne() + + if (session == null) { + throw new NotFoundException("Upload session not found or expired") + } + + if (session.getUid != uid) { + throw new ForbiddenException("User has no access to this upload session") + } + + val partRecord = ctx + .selectFrom(DATASET_UPLOAD_SESSION_PART) + .where( + DATASET_UPLOAD_SESSION_PART.UPLOAD_TOKEN + .eq(tokenUuid) + .and(DATASET_UPLOAD_SESSION_PART.PART_NUMBER.eq(partNumber)) + ) + .forUpdate() + .fetchOne() + + if (partRecord == null) { + throw new BadRequestException("Invalid partNumber") + } + + partRecord.getStatus match { + case UploadPartStatusEnum.COMPLETED => + throw new BadRequestException("This part has already been completed") + + case UploadPartStatusEnum.UPLOADING => + throw new WebApplicationException( + "This part is already being uploaded", + Response.Status.CONFLICT + ) + + case UploadPartStatusEnum.PENDING => + partRecord.setStatus(UploadPartStatusEnum.UPLOADING) + partRecord.setUpdatedAt(java.time.OffsetDateTime.now()) + partRecord.update() + } + + partRecord + } + + private def uploadPartToPresignedUrl( + presignedUrl: String, + partStream: InputStream, + headers: HttpHeaders, + partNumber: Int + ): (String, Long) = { + val conn = new URL(presignedUrl).openConnection().asInstanceOf[HttpURLConnection] + conn.setDoOutput(true) + conn.setRequestMethod("PUT") + + // Only a hint for streaming, not trust boundary + Option(headers.getHeaderString(HttpHeaders.CONTENT_LENGTH)) + .flatMap(s => scala.util.Try(s.toLong).toOption) + .foreach(len => conn.setFixedLengthStreamingMode(len)) + + conn.setRequestProperty("Content-Type", "application/octet-stream") + + val outStream = conn.getOutputStream + val buffer = new Array[Byte](8 * 1024) + var bytesRead = partStream.read(buffer) + var sent: Long = 0L + + try { + while (bytesRead != -1) { + outStream.write(buffer, 0, bytesRead) + sent += bytesRead + bytesRead = partStream.read(buffer) + } + } finally { + try outStream.close() + catch { case _: Exception => () } + try partStream.close() + catch { case _: Exception => () } + } + + val code = conn.getResponseCode + if (code != HttpURLConnection.HTTP_OK && code != HttpURLConnection.HTTP_CREATED) { + conn.disconnect() + throw new RuntimeException(s"Part $partNumber upload failed (HTTP $code)") + } + + val eTag = Option(conn.getHeaderField("ETag")).map(_.replace("\"", "")).getOrElse("") + conn.disconnect() + (eTag, sent) + } + } diff --git a/frontend/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.ts b/frontend/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.ts index b4d12f5a28e..fff40cdf414 100644 --- a/frontend/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.ts +++ b/frontend/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.ts @@ -104,8 +104,8 @@ export class DatasetDetailComponent implements OnInit { // List of upload tasks – each task tracked by its filePath public uploadTasks: Array< MultipartUploadProgress & { - filePath: string; - } + filePath: string; + } > = []; @Output() userMakeChanges = new EventEmitter(); @@ -416,8 +416,7 @@ export class DatasetDetailComponent implements OnInit { filePath: file.name, percentage: 0, status: "initializing", - uploadId: "", - physicalAddress: "", + uploadToken: "", }); // Start multipart upload const subscription = this.datasetService @@ -558,21 +557,24 @@ export class DatasetDetailComponent implements OnInit { this.onUploadComplete(); } + if (!task.uploadToken) { + this.uploadTasks = this.uploadTasks.filter(t => t.filePath !== task.filePath); + return; + } + this.datasetService .finalizeMultipartUpload( this.ownerEmail, this.datasetName, task.filePath, - task.uploadId, - [], - task.physicalAddress, + task.uploadToken, true // abort flag ) .pipe(untilDestroyed(this)) .subscribe(() => { this.notificationService.info(`${task.filePath} uploading has been terminated`); }); - // Remove the aborted task immediately + this.uploadTasks = this.uploadTasks.filter(t => t.filePath !== task.filePath); } diff --git a/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts b/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts index c09125d73b1..eb729f9b1ee 100644 --- a/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts +++ b/frontend/src/app/dashboard/service/user/dataset/dataset.service.ts @@ -27,6 +27,7 @@ import { DashboardDataset } from "../../../type/dashboard-dataset.interface"; import { DatasetFileNode } from "../../../../common/type/datasetVersionFileTree"; import { DatasetStagedObject } from "../../../../common/type/dataset-staged-object"; import { GuiConfigService } from "../../../../common/service/gui-config.service"; +import { AuthService } from "src/app/common/service/user/auth.service"; export const DATASET_BASE_URL = "dataset"; export const DATASET_CREATE_URL = DATASET_BASE_URL + "/create"; @@ -51,11 +52,10 @@ export interface MultipartUploadProgress { filePath: string; percentage: number; status: "initializing" | "uploading" | "finished" | "aborted"; - uploadId: string; - physicalAddress: string; - uploadSpeed?: number; // bytes per second - estimatedTimeRemaining?: number; // seconds - totalTime?: number; // total seconds taken + uploadToken: string; + uploadSpeed?: number; // bytes per second + estimatedTimeRemaining?: number; // seconds + totalTime?: number; // total seconds taken } @Injectable({ @@ -122,6 +122,7 @@ export class DatasetService { public retrieveAccessibleDatasets(): Observable { return this.http.get(`${AppSettings.getApiEndpoint()}/${DATASET_LIST_URL}`); } + public createDatasetVersion(did: number, newVersion: string): Observable { return this.http .post<{ @@ -141,6 +142,13 @@ export class DatasetService { /** * Handles multipart upload for large files using RxJS, * with a concurrency limit on how many parts we process in parallel. + * + * Backend flow: + * POST /dataset/multipart-upload?type=init&ownerEmail=...&datasetName=...&filePath=...&numParts=N + * -> { uploadToken } + * POST /dataset/multipart-upload/part?token=&partNumber= (body: raw chunk) + * POST /dataset/multipart-upload?type=finish (body: { uploadToken }) + * POST /dataset/multipart-upload?type=abort (body: { uploadToken }) */ public multipartUpload( ownerEmail: string, @@ -152,8 +160,8 @@ export class DatasetService { ): Observable { const partCount = Math.ceil(file.size / partSize); - return new Observable(observer => { - // Track upload progress for each part independently + return new Observable(observer => { + // Track upload progress (bytes) for each part independently const partProgress = new Map(); // Progress tracking state @@ -162,8 +170,15 @@ export class DatasetService { let lastETA = 0; let lastUpdateTime = 0; - // Calculate stats with smoothing + const lastStats = { + uploadSpeed: 0, + estimatedTimeRemaining: 0, + totalTime: 0, + }; + const getTotalTime = () => (startTime ? (Date.now() - startTime) / 1000 : 0); + + // Calculate stats with smoothing and simple throttling (~1s) const calculateStats = (totalUploaded: number) => { if (startTime === null) { startTime = Date.now(); @@ -172,25 +187,28 @@ export class DatasetService { const now = Date.now(); const elapsed = getTotalTime(); - // Throttle updates to every 1s const shouldUpdate = now - lastUpdateTime >= 1000; if (!shouldUpdate) { - return null; + // keep totalTime fresh even when throttled + lastStats.totalTime = elapsed; + return lastStats; } lastUpdateTime = now; - // Calculate speed with moving average const currentSpeed = elapsed > 0 ? totalUploaded / elapsed : 0; speedSamples.push(currentSpeed); - if (speedSamples.length > 5) speedSamples.shift(); - const avgSpeed = speedSamples.reduce((a, b) => a + b, 0) / speedSamples.length; + if (speedSamples.length > 5) { + speedSamples.shift(); + } + const avgSpeed = + speedSamples.length > 0 + ? speedSamples.reduce((a, b) => a + b, 0) / speedSamples.length + : 0; - // Calculate smooth ETA const remaining = file.size - totalUploaded; let eta = avgSpeed > 0 ? remaining / avgSpeed : 0; - eta = Math.min(eta, 24 * 60 * 60); // cap ETA at 24h, 86400 sec + eta = Math.min(eta, 24 * 60 * 60); // cap ETA at 24h - // Smooth ETA changes (limit to 30% change) if (lastETA > 0 && eta > 0) { const maxChange = lastETA * 0.3; const diff = Math.abs(eta - lastETA); @@ -200,229 +218,239 @@ export class DatasetService { } lastETA = eta; - // Near completion optimization const percentComplete = (totalUploaded / file.size) * 100; if (percentComplete > 95) { eta = Math.min(eta, 10); } - return { - uploadSpeed: avgSpeed, - estimatedTimeRemaining: Math.max(0, Math.round(eta)), - totalTime: elapsed, - }; + lastStats.uploadSpeed = avgSpeed; + lastStats.estimatedTimeRemaining = Math.max(0, Math.round(eta)); + lastStats.totalTime = elapsed; + + return lastStats; }; - const subscription = this.initiateMultipartUpload(ownerEmail, datasetName, filePath, partCount) + // 1. INIT: ask backend to create a LakeFS multipart upload session and get uploadToken + const initParams = new HttpParams() + .set("type", "init") + .set("ownerEmail", ownerEmail) + .set("datasetName", datasetName) + .set("filePath", encodeURIComponent(filePath)) + .set("numParts", partCount.toString()); + + const init$ = this.http.post<{ uploadToken: string }>( + `${AppSettings.getApiEndpoint()}/${DATASET_BASE_URL}/multipart-upload`, + {}, + { params: initParams } + ); + + const subscription = init$ .pipe( - switchMap(initiateResponse => { - const { uploadId, presignedUrls, physicalAddress } = initiateResponse; - if (!uploadId) { + switchMap(initResp => { + const uploadToken = initResp.uploadToken; + if (!uploadToken) { observer.error(new Error("Failed to initiate multipart upload")); return EMPTY; } + + // Notify UI that upload is starting observer.next({ - filePath: filePath, + filePath, percentage: 0, status: "initializing", - uploadId: uploadId, - physicalAddress: physicalAddress, + uploadToken, uploadSpeed: 0, estimatedTimeRemaining: 0, totalTime: 0, }); - // Keep track of all uploaded parts - const uploadedParts: { PartNumber: number; ETag: string }[] = []; - - // 1) Convert presignedUrls into a stream of URLs - return from(presignedUrls).pipe( - // 2) Use mergeMap with concurrency limit to upload chunk by chunk - mergeMap((url, index) => { - const partNumber = index + 1; - const start = index * partSize; - const end = Math.min(start + partSize, file.size); - const chunk = file.slice(start, end); - - // Upload the chunk - return new Observable(partObserver => { - const xhr = new XMLHttpRequest(); - - xhr.upload.addEventListener("progress", event => { - if (event.lengthComputable) { - // Update this specific part's progress - partProgress.set(partNumber, event.loaded); - - // Calculate total progress across all parts - let totalUploaded = 0; - partProgress.forEach(bytes => (totalUploaded += bytes)); - const percentage = Math.round((totalUploaded / file.size) * 100); - const stats = calculateStats(totalUploaded); - - observer.next({ - filePath, - percentage: Math.min(percentage, 99), // Cap at 99% until finalized - status: "uploading", - uploadId, - physicalAddress, - ...stats, - }); - } - }); - - xhr.addEventListener("load", () => { - if (xhr.status === 200 || xhr.status === 201) { - const etag = xhr.getResponseHeader("ETag")?.replace(/"/g, ""); - if (!etag) { - partObserver.error(new Error(`Missing ETag for part ${partNumber}`)); - return; + // 2. Upload each part to /multipart-upload/part using XMLHttpRequest + return from(Array.from({ length: partCount }, (_, i) => i)).pipe( + mergeMap( + index => { + const partNumber = index + 1; + const start = index * partSize; + const end = Math.min(start + partSize, file.size); + const chunk = file.slice(start, end); + + return new Observable(partObserver => { + const xhr = new XMLHttpRequest(); + + xhr.upload.addEventListener("progress", event => { + if (event.lengthComputable) { + partProgress.set(partNumber, event.loaded); + + let totalUploaded = 0; + partProgress.forEach(bytes => { + totalUploaded += bytes; + }); + + const percentage = Math.round((totalUploaded / file.size) * 100); + const stats = calculateStats(totalUploaded); + + observer.next({ + filePath, + percentage: Math.min(percentage, 99), + status: "uploading", + uploadToken, + ...stats, + }); + } + }); + + xhr.addEventListener("load", () => { + if (xhr.status === 200 || xhr.status === 204) { + // Mark part as fully uploaded + partProgress.set(partNumber, chunk.size); + + let totalUploaded = 0; + partProgress.forEach(bytes => { + totalUploaded += bytes; + }); + + // Force stats recompute on completion + lastUpdateTime = 0; + const percentage = Math.round((totalUploaded / file.size) * 100); + const stats = calculateStats(totalUploaded); + + observer.next({ + filePath, + percentage: Math.min(percentage, 99), + status: "uploading", + uploadToken, + ...stats, + }); + + partObserver.complete(); + } else { + partObserver.error( + new Error(`Failed to upload part ${partNumber} (HTTP ${xhr.status})`) + ); } + }); - // Mark this part as fully uploaded - partProgress.set(partNumber, chunk.size); - uploadedParts.push({ PartNumber: partNumber, ETag: etag }); - - // Recalculate progress - let totalUploaded = 0; - partProgress.forEach(bytes => (totalUploaded += bytes)); - const percentage = Math.round((totalUploaded / file.size) * 100); - lastUpdateTime = 0; - const stats = calculateStats(totalUploaded); - - observer.next({ - filePath, - percentage: Math.min(percentage, 99), - status: "uploading", - uploadId, - physicalAddress, - ...stats, - }); - partObserver.complete(); - } else { + xhr.addEventListener("error", () => { + // Remove failed part from progress + partProgress.delete(partNumber); partObserver.error(new Error(`Failed to upload part ${partNumber}`)); - } - }); + }); - xhr.addEventListener("error", () => { - // Remove failed part from progress - partProgress.delete(partNumber); - partObserver.error(new Error(`Failed to upload part ${partNumber}`)); - }); + const partUrl = + `${AppSettings.getApiEndpoint()}/${DATASET_BASE_URL}/multipart-upload/part` + + `?token=${encodeURIComponent(uploadToken)}&partNumber=${partNumber}`; - xhr.open("PUT", url); - xhr.send(chunk); - }); - }, concurrencyLimit), - - // 3) Collect results from all uploads (like forkJoin, but respects concurrency) - toArray(), - // 4) Finalize if all parts succeeded - switchMap(() => - this.finalizeMultipartUpload( - ownerEmail, - datasetName, - filePath, - uploadId, - uploadedParts, - physicalAddress, - false - ) + xhr.open("POST", partUrl); + xhr.setRequestHeader("Content-Type", "application/octet-stream"); + const token = AuthService.getAccessToken(); + if (token) { + xhr.setRequestHeader("Authorization", `Bearer ${token}`); + } + xhr.send(chunk); + return () => { + try { + xhr.abort(); + } catch {} + }; + }); + }, + concurrencyLimit ), + toArray(), // wait for all parts + // 3. FINISH: notify backend that all parts are done + switchMap(() => { + const finishParams = new HttpParams() + .set("type", "finish") + .set("ownerEmail", ownerEmail) + .set("datasetName", datasetName) + .set("filePath", encodeURIComponent(filePath)); + + const body = { uploadToken }; + + return this.http.post( + `${AppSettings.getApiEndpoint()}/${DATASET_BASE_URL}/multipart-upload`, + body, + { params: finishParams } + ); + }), tap(() => { + const totalTime = getTotalTime(); observer.next({ filePath, percentage: 100, status: "finished", - uploadId: uploadId, - physicalAddress: physicalAddress, + uploadToken, uploadSpeed: 0, estimatedTimeRemaining: 0, - totalTime: getTotalTime(), + totalTime, }); observer.complete(); }), - catchError((error: unknown) => { - // If an error occurred, abort the upload + catchError(error => { + // On error, compute best-effort percentage from bytes we've seen + let totalUploaded = 0; + partProgress.forEach(bytes => { + totalUploaded += bytes; + }); + const percentage = + file.size > 0 ? Math.round((totalUploaded / file.size) * 100) : 0; + observer.next({ filePath, - percentage: Math.round((uploadedParts.length / partCount) * 100), + percentage, status: "aborted", - uploadId: uploadId, - physicalAddress: physicalAddress, + uploadToken, uploadSpeed: 0, estimatedTimeRemaining: 0, totalTime: getTotalTime(), }); - return this.finalizeMultipartUpload( - ownerEmail, - datasetName, - filePath, - uploadId, - uploadedParts, - physicalAddress, - true - ).pipe(switchMap(() => throwError(() => error))); + // Abort on backend + const abortParams = new HttpParams() + .set("type", "abort") + .set("ownerEmail", ownerEmail) + .set("datasetName", datasetName) + .set("filePath", encodeURIComponent(filePath)); + + const body = { uploadToken }; + + return this.http + .post( + `${AppSettings.getApiEndpoint()}/${DATASET_BASE_URL}/multipart-upload`, + body, + { params: abortParams } + ) + .pipe( + switchMap(() => throwError(() => error)), + catchError(() => throwError(() => error)) + ); }) ); }) ) .subscribe({ - error: (err: unknown) => observer.error(err), + error: err => observer.error(err), }); + return () => subscription.unsubscribe(); }); } - /** - * Initiates a multipart upload and retrieves presigned URLs for each part. - * @param ownerEmail Owner's email - * @param datasetName Dataset Name - * @param filePath File path within the dataset - * @param numParts Number of parts for the multipart upload - */ - private initiateMultipartUpload( - ownerEmail: string, - datasetName: string, - filePath: string, - numParts: number - ): Observable<{ uploadId: string; presignedUrls: string[]; physicalAddress: string }> { - const params = new HttpParams() - .set("type", "init") - .set("ownerEmail", ownerEmail) - .set("datasetName", datasetName) - .set("filePath", encodeURIComponent(filePath)) - .set("numParts", numParts.toString()); - - return this.http.post<{ uploadId: string; presignedUrls: string[]; physicalAddress: string }>( - `${AppSettings.getApiEndpoint()}/${DATASET_BASE_URL}/multipart-upload`, - {}, - { params } - ); - } - - /** - * Completes or aborts a multipart upload, sending part numbers and ETags to the backend. - */ public finalizeMultipartUpload( ownerEmail: string, datasetName: string, filePath: string, - uploadId: string, - parts: { PartNumber: number; ETag: string }[], - physicalAddress: string, + uploadToken: string, isAbort: boolean ): Observable { const params = new HttpParams() .set("type", isAbort ? "abort" : "finish") .set("ownerEmail", ownerEmail) .set("datasetName", datasetName) - .set("filePath", encodeURIComponent(filePath)) - .set("uploadId", uploadId); + .set("filePath", encodeURIComponent(filePath)); return this.http.post( `${AppSettings.getApiEndpoint()}/${DATASET_BASE_URL}/multipart-upload`, - { parts, physicalAddress }, + { uploadToken }, { params } ); } diff --git a/sql/texera_ddl.sql b/sql/texera_ddl.sql index 7b0f9b9063d..b0ea0162f5e 100644 --- a/sql/texera_ddl.sql +++ b/sql/texera_ddl.sql @@ -58,6 +58,8 @@ DROP TABLE IF EXISTS workflow_version CASCADE; DROP TABLE IF EXISTS project CASCADE; DROP TABLE IF EXISTS workflow_of_project CASCADE; DROP TABLE IF EXISTS workflow_executions CASCADE; +DROP TABLE IF EXISTS dataset_upload_session_part CASCADE; +DROP TABLE IF EXISTS dataset_upload_session CASCADE; DROP TABLE IF EXISTS dataset CASCADE; DROP TABLE IF EXISTS dataset_user_access CASCADE; DROP TABLE IF EXISTS dataset_version CASCADE; @@ -79,11 +81,13 @@ DROP TABLE IF EXISTS computing_unit_user_access CASCADE; DROP TYPE IF EXISTS user_role_enum CASCADE; DROP TYPE IF EXISTS privilege_enum CASCADE; DROP TYPE IF EXISTS action_enum CASCADE; +DROP TYPE IF EXISTS upload_part_status_enum CASCADE; CREATE TYPE user_role_enum AS ENUM ('INACTIVE', 'RESTRICTED', 'REGULAR', 'ADMIN'); CREATE TYPE action_enum AS ENUM ('like', 'unlike', 'view', 'clone'); CREATE TYPE privilege_enum AS ENUM ('NONE', 'READ', 'WRITE'); CREATE TYPE workflow_computing_unit_type_enum AS ENUM ('local', 'kubernetes'); +CREATE TYPE upload_part_status_enum AS ENUM ('PENDING', 'UPLOADING', 'COMPLETED'); -- ============================================ -- 5. Create tables @@ -274,6 +278,37 @@ CREATE TABLE IF NOT EXISTS dataset_version FOREIGN KEY (did) REFERENCES dataset(did) ON DELETE CASCADE ); +-- dataset_upload_session: tracks one multipart upload session for a single file +CREATE TABLE IF NOT EXISTS dataset_upload_session +( + upload_token UUID PRIMARY KEY, + did INT NOT NULL, + uid INT NOT NULL, + file_path TEXT NOT NULL, + upload_id VARCHAR(256) NOT NULL, + physical_address TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + + FOREIGN KEY (did) REFERENCES dataset(did) ON DELETE CASCADE, + FOREIGN KEY (uid) REFERENCES "user"(uid) ON DELETE CASCADE +); + +-- dataset_upload_session_part: one row per (session, partNumber) +CREATE TABLE IF NOT EXISTS dataset_upload_session_part +( + upload_token UUID NOT NULL, + part_number INT NOT NULL, + status upload_part_status_enum NOT NULL DEFAULT 'PENDING', + etag VARCHAR(256), + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + presigned_url TEXT NOT NULL, + + PRIMARY KEY (upload_token, part_number), + FOREIGN KEY (upload_token) REFERENCES dataset_upload_session(upload_token) ON DELETE CASCADE +); + -- operator_executions (modified to match MySQL: no separate primary key; added console_messages_uri) CREATE TABLE IF NOT EXISTS operator_executions (