Skip to content

Commit

Permalink
[HUDI-8941] stop spark context basis version using SparkAdaptor & add…
Browse files Browse the repository at this point in the history
… javadocs
  • Loading branch information
Shubham21k committed Feb 5, 2025
1 parent f1c9fc7 commit dbfd4d7
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package org.apache.spark.sql.hudi
import org.apache.hudi.client.utils.SparkRowSerDe
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.storage.StoragePath

import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.spark.api.java.JavaSparkContext
Expand Down Expand Up @@ -240,5 +241,13 @@ trait SparkAdapter extends Serializable {
queryExecution: QueryExecution,
name: Option[String] = None)(body: => T): T


/**
* Stop spark context with exit code
*
* @param jssc JavaSparkContext object to shutdown the spark context
* @param exitCode passed as a param to shutdown spark context with provided exit code
* @return
*/
def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.hudi.spark3.internal.ReflectUtil
import org.apache.hudi.storage.StoragePath
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{HoodieSpark3CatalogUtils, SQLContext, SparkSession}
import org.apache.spark.sql.{HoodieSpark3CatalogUtils, SparkSession, SQLContext}
import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, HoodieSparkAvroSchemaConverters}
import org.apache.spark.sql.catalyst.expressions.{Expression, InterpretedPredicate, Predicate}
import org.apache.spark.sql.catalyst.util.DateFormatter
Expand All @@ -35,12 +35,13 @@ import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.storage.StorageLevel

import java.time.ZoneId
import java.util.TimeZone
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

/**
Expand Down Expand Up @@ -105,7 +106,5 @@ abstract class BaseSpark3Adapter extends SparkAdapter with Logging {
SQLExecution.withNewExecutionId(queryExecution, name)(body)
}

override def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit = {
jssc.stop()
}
def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.adapter
import org.apache.hudi.Spark33HoodieFileScanRDD
import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.avro._
import org.apache.spark.sql.catalyst.InternalRow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.sql.adapter
import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.Spark34HoodieFileScanRDD
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.avro._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, ResolvedTable}
Expand Down Expand Up @@ -146,6 +147,6 @@ class Spark3_4Adapter extends BaseSpark3Adapter {
}

override def stopSparkContext(jssc: JavaSparkContext, exitCode: Int) = {
jssc.stop()
jssc.sc.stop(exitCode)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.hudi.SparkAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down

0 comments on commit dbfd4d7

Please sign in to comment.