From dbfd4d7444dac5f0a674495b98a89228774040da Mon Sep 17 00:00:00 2001 From: Shubham21k Date: Wed, 5 Feb 2025 12:07:40 +0530 Subject: [PATCH] [HUDI-8941] stop Spark context based on Spark version using SparkAdapter & add javadocs --- .../scala/org/apache/spark/sql/hudi/SparkAdapter.scala | 9 +++++++++ .../org/apache/spark/sql/adapter/BaseSpark3Adapter.scala | 9 ++++----- .../org/apache/spark/sql/adapter/Spark3_3Adapter.scala | 1 + .../org/apache/spark/sql/adapter/Spark3_4Adapter.scala | 3 ++- .../apache/hudi/utilities/streamer/HoodieStreamer.java | 1 - 5 files changed, 16 insertions(+), 7 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala index b1dc2cdfb4ac..febb7cd87a26 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala @@ -21,6 +21,7 @@ package org.apache.spark.sql.hudi import org.apache.hudi.client.utils.SparkRowSerDe import org.apache.hudi.common.table.HoodieTableMetaClient import org.apache.hudi.storage.StoragePath + import org.apache.avro.Schema import org.apache.hadoop.conf.Configuration import org.apache.spark.api.java.JavaSparkContext @@ -240,5 +241,13 @@ trait SparkAdapter extends Serializable { queryExecution: QueryExecution, name: Option[String] = None)(body: => T): T + + /** + * Stops the Spark context with the given exit code. + * + * @param jssc JavaSparkContext whose underlying SparkContext is stopped + * @param exitCode exit code with which the SparkContext is stopped + * + */ def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit } diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala index f10657c7207a..4534af3dd222 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala @@ -26,7 +26,7 @@ import org.apache.hudi.spark3.internal.ReflectUtil import org.apache.hudi.storage.StoragePath import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.internal.Logging -import org.apache.spark.sql.{HoodieSpark3CatalogUtils, SQLContext, SparkSession} +import org.apache.spark.sql.{HoodieSpark3CatalogUtils, SparkSession, SQLContext} import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters} import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate, Predicate} import org.apache.spark.sql.catalyst.util.DateFormatter @@ -35,12 +35,13 @@ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.hudi.SparkAdapter import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch} +import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} import org.apache.spark.storage.StorageLevel import java.time.ZoneId import java.util.TimeZone import java.util.concurrent.ConcurrentHashMap + import scala.collection.JavaConverters._ /** @@ -105,7 +106,5 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging { SQLExecution.withNewExecutionId(queryExecution, name)(body) } - override def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit = { - jssc.stop() - } + def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit } diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala index e811757d5e28..c926f2e8fd0b 100644 --- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.adapter import org.apache.hudi.Spark33HoodieFileScanRDD import org.apache.avro.Schema import org.apache.hadoop.conf.Configuration +import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql._ import org.apache.spark.sql.avro._ import org.apache.spark.sql.catalyst.InternalRow diff --git a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala index 9698126f5cae..61cb16906ccc 100644 --- a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala +++ b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.adapter import org.apache.avro.Schema import org.apache.hadoop.conf.Configuration import org.apache.hudi.Spark34HoodieFileScanRDD +import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql.avro._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, ResolvedTable} @@ -146,6 +147,6 @@ class Spark3_4Adapter extends BaseSpark3Adapter { } override def stopSparkContext(jssc: JavaSparkContext, exitCode: Int) = { - jssc.stop() + jssc.sc.stop(exitCode) } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java index 86ae8251168b..9636b0a3102f 
100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java @@ -78,7 +78,6 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.hudi.SparkAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory;