diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index 93e20031f3dc..febb7cd87a26 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -24,6 +24,7 @@ import org.apache.hudi.storage.StoragePath
 
 import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
+import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql._
 import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer}
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
@@ -239,4 +240,14 @@ trait SparkAdapter extends Serializable {
   def sqlExecutionWithNewExecutionId[T](sparkSession: SparkSession,
                                         queryExecution: QueryExecution,
                                         name: Option[String] = None)(body: => T): T
+
+
+  /**
+   * Stops the Spark context, reporting the given exit code.
+   *
+   * @param jssc     JavaSparkContext wrapping the Spark context to be stopped
+   * @param exitCode exit code to report when the Spark context is stopped
+   * @return
+   */
+  def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit
 }
diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index 539ce713c53f..4534af3dd222 100644
--- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -24,6 +24,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.util.JsonUtils
 import org.apache.hudi.spark3.internal.ReflectUtil
 import org.apache.hudi.storage.StoragePath
+import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{HoodieSpark3CatalogUtils, SparkSession, SQLContext}
 import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters}
@@ -104,4 +105,6 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging {
                                              name: Option[String])(body: => T): T = {
     SQLExecution.withNewExecutionId(queryExecution, name)(body)
   }
+
+  def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit
 }
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
index 35afd221abe9..d1b25b6c9762 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.adapter
 import org.apache.hudi.Spark33HoodieFileScanRDD
 import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
+import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql._
 import org.apache.spark.sql.avro._
 import org.apache.spark.sql.catalyst.InternalRow
@@ -144,4 +145,8 @@ class Spark3_3Adapter extends BaseSpark3Adapter {
                               hadoopConf: Configuration): SparkParquetReader = {
     Spark33ParquetReader.build(vectorized, sqlConf, options, hadoopConf)
   }
+
+  override def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit = {
+    jssc.stop()
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
index 1eb6c97f362f..a388eefb3b09 100644
--- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.adapter
 import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
 import org.apache.hudi.Spark34HoodieFileScanRDD
+import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.avro._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, ResolvedTable}
@@ -144,4 +145,8 @@ class Spark3_4Adapter extends BaseSpark3Adapter {
                               hadoopConf: Configuration): SparkParquetReader = {
     Spark34ParquetReader.build(vectorized, sqlConf, options, hadoopConf)
   }
+
+  override def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit = {
+    jssc.sc.stop(exitCode)
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
index 40019f3f28a4..9ba2b6ff1bc0 100644
--- a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.adapter
 import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
 import org.apache.hudi.Spark35HoodieFileScanRDD
+import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql._
 import org.apache.spark.sql.avro._
 import org.apache.spark.sql.catalyst.InternalRow
@@ -148,4 +149,8 @@ class Spark3_5Adapter extends BaseSpark3Adapter {
                               hadoopConf: Configuration): SparkParquetReader = {
     Spark35ParquetReader.build(vectorized, sqlConf, options, hadoopConf)
   }
+
+  override def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit = {
+    jssc.sc.stop(exitCode)
+  }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
index 05b902087a66..923f41d1d09a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.utilities;
 
+import org.apache.hudi.SparkAdapterSupport$;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -31,6 +32,7 @@
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.utilities.streamer.HoodieStreamer;
@@ -98,10 +100,14 @@ public static void main(String[] args) {
 
     HDFSParquetImporter dataImporter = new HDFSParquetImporter(cfg);
     JavaSparkContext jssc = UtilHelpers.buildSparkContext("data-importer-" + cfg.tableName, cfg.sparkMaster, cfg.sparkMemory);
+    int exitCode = 0;
     try {
       dataImporter.dataImport(jssc, cfg.retry);
+    } catch (Throwable throwable) {
+      exitCode = 1;
+      throw new HoodieException("Failed to run HDFSParquetImporter", throwable);
     } finally {
-      jssc.stop();
+      SparkAdapterSupport$.MODULE$.sparkAdapter().stopSparkContext(jssc, exitCode);
     }
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java
index b09c056c6fe5..8be5ede95377 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.utilities;
 
+import org.apache.hudi.SparkAdapterSupport$;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.config.TypedProperties;
@@ -110,12 +111,14 @@ public static void main(String[] args) {
 
     String dirName = new Path(cfg.basePath).getName();
     JavaSparkContext jssc = UtilHelpers.buildSparkContext("hoodie-cleaner-" + dirName, cfg.sparkMaster);
+    int exitCode = 0;
     try {
       new HoodieCleaner(cfg, jssc).run();
     } catch (Throwable throwable) {
+      exitCode = 1;
       throw new HoodieException("Failed to run cleaning for " + cfg.basePath, throwable);
     } finally {
-      jssc.stop();
+      SparkAdapterSupport$.MODULE$.sparkAdapter().stopSparkContext(jssc, exitCode);
     }
 
     LOG.info("Cleaner ran successfully");
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieTTLJob.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieTTLJob.java
index 7ab5b5747c6f..e938a5d7f016 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieTTLJob.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieTTLJob.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.utilities;
 
+import org.apache.hudi.SparkAdapterSupport$;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -117,12 +118,14 @@ public static void main(String[] args) {
 
     String dirName = new Path(cfg.basePath).getName();
     JavaSparkContext jssc = UtilHelpers.buildSparkContext("hoodie-ttl-job-" + dirName, cfg.sparkMaster);
+    int exitCode = 0;
     try {
       new HoodieTTLJob(jssc, cfg).run();
     } catch (Throwable throwable) {
+      exitCode = 1;
       throw new HoodieException("Failed to run ttl for " + cfg.basePath, throwable);
     } finally {
-      jssc.stop();
+      SparkAdapterSupport$.MODULE$.sparkAdapter().stopSparkContext(jssc, exitCode);
     }
 
     LOG.info("Hoodie TTL job ran successfully");
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieMultiTableStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieMultiTableStreamer.java
index 92c4a843c94a..06b829948657 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieMultiTableStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieMultiTableStreamer.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.utilities.streamer;
 
+import org.apache.hudi.SparkAdapterSupport$;
 import org.apache.hudi.client.utils.OperationConverter;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
@@ -279,10 +280,14 @@ public static void main(String[] args) throws IOException {
     }
 
     JavaSparkContext jssc = UtilHelpers.buildSparkContext("multi-table-streamer", Constants.LOCAL_SPARK_MASTER);
+    int exitCode = 0;
     try {
       new HoodieMultiTableStreamer(config, jssc).sync();
+    } catch (Throwable throwable) {
+      exitCode = 1;
+      throw new HoodieException("Failed to run HoodieMultiTableStreamer", throwable);
     } finally {
-      jssc.stop();
+      SparkAdapterSupport$.MODULE$.sparkAdapter().stopSparkContext(jssc, exitCode);
     }
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index cfe01dfde68a..9636b0a3102f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -21,6 +21,7 @@
 
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.HoodieWriterUtils;
+import org.apache.hudi.SparkAdapterSupport$;
 import org.apache.hudi.async.AsyncClusteringService;
 import org.apache.hudi.async.AsyncCompactService;
 import org.apache.hudi.async.SparkAsyncClusteringService;
@@ -640,10 +641,14 @@ public static void main(String[] args) throws Exception {
       LOG.warn("--enable-hive-sync will be deprecated in a future release; please use --enable-sync instead for Hive syncing");
     }
 
+    int exitCode = 0;
     try {
       new HoodieStreamer(cfg, jssc).sync();
+    } catch (Throwable throwable) {
+      exitCode = 1;
+      throw new HoodieException("Failed to run HoodieStreamer", throwable);
     } finally {
-      jssc.stop();
+      SparkAdapterSupport$.MODULE$.sparkAdapter().stopSparkContext(jssc, exitCode);
     }
   }
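
For context, the shutdown pattern that the utilities above converge on looks roughly like the minimal sketch below. `MyUtilityJob` and its `run()` method are hypothetical placeholders; `UtilHelpers.buildSparkContext`, `HoodieException`, and `SparkAdapterSupport$.MODULE$.sparkAdapter().stopSparkContext` are the APIs already used in the patch.

```java
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.UtilHelpers;

import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical utility, only to illustrate the shared main() pattern used by the jobs above.
public class MyUtilityJob {

  public static void main(String[] args) {
    JavaSparkContext jssc = UtilHelpers.buildSparkContext("my-utility-job", "local[2]");
    int exitCode = 0;
    try {
      new MyUtilityJob().run(jssc);
    } catch (Throwable throwable) {
      // Record the failure before rethrowing so the finally block can report it.
      exitCode = 1;
      throw new HoodieException("Failed to run MyUtilityJob", throwable);
    } finally {
      // Delegate shutdown to the version-specific adapter: the Spark 3.3 adapter calls a plain
      // stop() (exit code ignored), while the 3.4/3.5 adapters call SparkContext.stop(exitCode).
      SparkAdapterSupport$.MODULE$.sparkAdapter().stopSparkContext(jssc, exitCode);
    }
  }

  private void run(JavaSparkContext jssc) {
    // job body elided
  }
}
```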