From 895f912648d653e998dd739cba11bf340bb93c1b Mon Sep 17 00:00:00 2001 From: oshinongit Date: Mon, 30 Sep 2024 13:58:35 +0200 Subject: [PATCH] feat: check and log mediaconvert job status --- src/pipelines/aws/aws-pipeline.ts | 68 +++++++++++++++++++++++++++---- 1 file changed, 60 insertions(+), 8 deletions(-) diff --git a/src/pipelines/aws/aws-pipeline.ts b/src/pipelines/aws/aws-pipeline.ts index 5b7c9c9..add367a 100644 --- a/src/pipelines/aws/aws-pipeline.ts +++ b/src/pipelines/aws/aws-pipeline.ts @@ -1,6 +1,8 @@ import { ECSClient, RunTaskCommand } from '@aws-sdk/client-ecs'; import { CreateJobCommand, + CreateJobCommandOutput, + GetJobCommand, MediaConvertClient } from '@aws-sdk/client-mediaconvert'; import { @@ -40,6 +42,7 @@ export default class AWSPipeline implements Pipeline { private mediaConvert: MediaConvertClient; private ecs: ECSClient; private static readonly MAX_WAIT_TIME = 28800; //Max wait time for AWS resources is 28800 seconds (8 hours). + private static readonly MEDIACONVERT_CHECK_JOB_INTERVAL_MS = 15000; //Check status of mediaconvert job interval in seconds. constructor(configuration: AWSPipelineConfiguration) { this.configuration = configuration; @@ -84,12 +87,60 @@ export default class AWSPipeline implements Pipeline { } } - async waitForObjectInS3(S3Bucket: string, S3Key: string): Promise { + async mediaConvertJobStatus( + mediaConvert: MediaConvertClient, + jobId: string + ): Promise { try { - await waitUntilObjectExists( - { client: this.s3, maxWaitTime: AWSPipeline.MAX_WAIT_TIME }, - { Bucket: S3Bucket, Key: S3Key } - ); + const command = new GetJobCommand({ Id: jobId }); + const response = await mediaConvert.send(command); + if (response.Job?.Status === 'ERROR') { + logger.error(`Job ${jobId} error ${response.Job?.ErrorMessage}`); + } + return response.Job?.Status || undefined; + } catch (error) { + logger.error(`Error getting job ${jobId}: ${error}`); + return undefined; + } + } + + async repeatGetJobStatus( + mediaConvert: MediaConvertClient, + jobId: string, + interval_ms: number + ): Promise { + const status: string | undefined = await this.mediaConvertJobStatus( + mediaConvert, + jobId + ); + + if (status === 'COMPLETED' || status === 'ERROR' || status === 'CANCELED') { + return; + } else { + logger.debug(`Job ${jobId} status: ${status}. Waiting...`); + await new Promise((resolve) => setTimeout(resolve, interval_ms)); + return this.repeatGetJobStatus(mediaConvert, jobId, interval_ms); + } + } + + async waitForObjectInS3( + S3Bucket: string, + S3Key: string, + jobId?: string + ): Promise { + try { + if (jobId) { + await this.repeatGetJobStatus( + this.mediaConvert, + jobId, + AWSPipeline.MEDIACONVERT_CHECK_JOB_INTERVAL_MS + ); + } else { + await waitUntilObjectExists( + { client: this.s3, maxWaitTime: AWSPipeline.MAX_WAIT_TIME }, + { Bucket: S3Bucket, Key: S3Key } + ); + } return true; } catch (error) { logger.error( @@ -230,13 +281,14 @@ export default class AWSPipeline implements Pipeline { // Transcode logger.info('Transcoding ' + inputFilename + ' to ' + outputURI + '...'); + let createJobResponse: CreateJobCommandOutput; try { const accelerationSettings = this.configuration.accelerationMode ? { AccelerationSettings: { Mode: this.configuration.accelerationMode } } : {}; - await this.mediaConvert.send( + createJobResponse = await this.mediaConvert.send( new CreateJobCommand({ Role: this.configuration.mediaConvertRole, Settings: settings, @@ -249,10 +301,10 @@ export default class AWSPipeline implements Pipeline { ); throw error; } - const s3Status = await this.waitForObjectInS3( outputBucket, - `${outputFolder}/${outputObject}` + `${outputFolder}/${outputObject}`, + createJobResponse.Job?.Id ); if (!s3Status) return ''; await this.probeMetadata(outputBucket, outputFolder, outputObject);