[HUDI-7471] Use existing util method to get Spark conf in tests (#10802)
yihua committed May 14, 2024
1 parent 39135e7 commit 4f2d60d
Showing 9 changed files with 21 additions and 27 deletions.
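For context, the shared helper most of these tests now go through is only partially visible in the first hunk below. A minimal sketch of its shape after this commit, reconstructed from that hunk; any settings past "spark.sql.shuffle.partitions" and the plain return are assumptions, not part of the diff:

import org.apache.spark.SparkConf;

public class SparkConfForTestSketch {
  // Reconstructed from the HoodieClientTestUtils hunk below; the hunk ends
  // mid-chain, so the closing semicolon and return here are assumed.
  public static SparkConf getSparkConfForTest(String appName) {
    return new SparkConf().setAppName(appName)
        .setMaster("local[8]")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
        .set("spark.sql.shuffle.partitions", "4");
  }
}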

HoodieClientTestUtils.java
@@ -87,7 +87,7 @@ public class HoodieClientTestUtils {
    */
   public static SparkConf getSparkConfForTest(String appName) {
     SparkConf sparkConf = new SparkConf().setAppName(appName)
-        .setMaster("local[4]")
+        .setMaster("local[8]")
         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
         .set("spark.sql.shuffle.partitions", "4")

SparkProvider.java
@@ -38,7 +38,7 @@ public interface SparkProvider extends org.apache.hudi.testutils.providers.Hoodi
   default SparkConf conf(Map<String, String> overwritingConfigs) {
     SparkConf sparkConf = new SparkConf();
     sparkConf.set("spark.app.name", getClass().getName());
-    sparkConf.set("spark.master", "local[*]");
+    sparkConf.set("spark.master", "local[8]");
     sparkConf.set("spark.default.parallelism", "4");
     sparkConf.set("spark.sql.shuffle.partitions", "4");
     sparkConf.set("spark.driver.maxResultSize", "2g");
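SparkProvider#conf takes a map of per-test overrides that this hunk does not show being applied. A minimal sketch of the presumed layering, under the assumption that overrides are set after the defaults so they win (the forEach loop is an assumption, not code from this diff):

import org.apache.spark.SparkConf;

import java.util.Map;

public class SparkConfLayeringSketch {
  // Defaults first, then per-test overrides; SparkConf#set replaces any
  // earlier value for the same key, so the override takes effect.
  public static SparkConf confWithOverrides(Map<String, String> overwritingConfigs) {
    SparkConf sparkConf = new SparkConf();
    sparkConf.set("spark.master", "local[8]");
    sparkConf.set("spark.sql.shuffle.partitions", "4");
    overwritingConfigs.forEach(sparkConf::set);
    return sparkConf;
  }
}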

TestHoodieInMemoryFileIndex.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.execution.datasources
 
 import org.apache.hadoop.fs.Path
+import org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest
 import org.apache.spark.sql.SparkSession
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Test
@@ -31,9 +32,7 @@ class TestHoodieInMemoryFileIndex {
   @Test
   def testCreateInMemoryIndex(@TempDir tempDir: File): Unit = {
     val spark = SparkSession.builder
-      .appName("Hoodie Datasource test")
-      .master("local[2]")
-      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .config(getSparkConfForTest("Hoodie Datasource test"))
       .getOrCreate
 
     val folders: Seq[Path] = Seq(

TestHoodieSparkSqlWriter.scala
@@ -243,10 +243,15 @@ class TestHoodieSparkSqlWriter {
   @Test
   def testThrowExceptionInvalidSerializer(): Unit = {
     spark.stop()
-    val session = SparkSession.builder().appName("hoodie_test").master("local").getOrCreate()
+    val session = SparkSession.builder()
+      // Here we intentionally remove the "spark.serializer" config to test failure
+      .config(getSparkConfForTest("hoodie_test").remove("spark.serializer"))
+      .getOrCreate()
     try {
       val sqlContext = session.sqlContext
-      val options = Map("path" -> "hoodie/test/path", HoodieWriteConfig.TBL_NAME.key -> "hoodie_test_tbl")
+      val options = Map(
+        "path" -> (tempPath.toUri.toString + "/testThrowExceptionInvalidSerializer/basePath"),
+        HoodieWriteConfig.TBL_NAME.key -> "hoodie_test_tbl")
       val e = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.ErrorIfExists, options,
         session.emptyDataFrame))
       assert(e.getMessage.contains("spark.serializer"))
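The failure-path test above leans on SparkConf#remove(String), which deletes a key and returns the conf, so it chains inside the builder call. A minimal standalone sketch of the same pattern; the inline conf stands in for getSparkConfForTest and the app name is illustrative:

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class InvalidSerializerSketch {
  public static void main(String[] args) {
    // Build the usual test conf, then drop the serializer entry so Hudi's
    // validation has something to reject (Spark itself still starts, since
    // it falls back to its default serializer).
    SparkConf conf = new SparkConf()
        .setAppName("hoodie_test")
        .setMaster("local[8]")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .remove("spark.serializer"); // remove() returns the conf for chaining
    SparkSession session = SparkSession.builder().config(conf).getOrCreate();
    // A Hudi write attempted with this session should fail with a
    // HoodieException mentioning "spark.serializer".
    session.stop();
  }
}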

TestHoodieSparkUtils.scala
@@ -20,6 +20,7 @@ package org.apache.hudi
 
 import org.apache.avro.generic.GenericRecord
 import org.apache.hudi.testutils.DataSourceTestUtils
+import org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest
 import org.apache.spark.sql.types.{ArrayType, StructField, StructType}
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.junit.jupiter.api.Assertions._
@@ -88,11 +89,7 @@ class TestHoodieSparkUtils {
   @Test
   def testCreateRddSchemaEvol(): Unit = {
     val spark = SparkSession.builder
-      .appName("Hoodie Datasource test")
-      .master("local[2]")
-      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
-      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
+      .config(getSparkConfForTest("Hoodie Datasource test"))
       .getOrCreate
 
     val schema = DataSourceTestUtils.getStructTypeExampleSchema
@@ -126,11 +123,7 @@
   @Test
   def testCreateRddWithNestedSchemas(): Unit = {
     val spark = SparkSession.builder
-      .appName("Hoodie Datasource test")
-      .master("local[2]")
-      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
-      .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
-      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
+      .config(getSparkConfForTest("Hoodie Datasource test"))
       .getOrCreate
 
     val innerStruct1 = new StructType().add("innerKey","string",false).add("innerValue", "long", true)

TestSourceFormatAdapter.java
@@ -49,6 +49,7 @@
 
 import java.util.stream.Stream;
 
+import static org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -64,9 +65,7 @@ public class TestSourceFormatAdapter {
   public static void start() {
     spark = SparkSession
         .builder()
-        .master("local[*]")
-        .appName(TestSourceFormatAdapter.class.getName())
-        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+        .config(getSparkConfForTest(TestSourceFormatAdapter.class.getName()))
         .getOrCreate();
     jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
   }

TestSanitizationUtils.java
@@ -22,7 +22,6 @@
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
-import org.apache.hudi.utilities.deltastreamer.TestSourceFormatAdapter;
 import org.apache.hudi.utilities.testutils.SanitizationTestUtils;
 
 import org.apache.avro.Schema;
@@ -45,6 +44,7 @@
 import java.io.InputStream;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest;
 import static org.apache.hudi.utilities.testutils.SanitizationTestUtils.generateProperFormattedSchema;
 import static org.apache.hudi.utilities.testutils.SanitizationTestUtils.generateRenamedSchemaWithConfiguredReplacement;
 import static org.apache.hudi.utilities.testutils.SanitizationTestUtils.generateRenamedSchemaWithDefaultReplacement;
@@ -61,9 +61,7 @@ public class TestSanitizationUtils {
   public static void start() {
     spark = SparkSession
         .builder()
-        .master("local[*]")
-        .appName(TestSourceFormatAdapter.class.getName())
-        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+        .config(getSparkConfForTest(TestSanitizationUtils.class.getName()))
        .getOrCreate();
     jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
   }

UtilitiesTestBase.java
@@ -159,7 +159,7 @@ public static void initTestServices(boolean needsHdfs, boolean needsHive, boolea
       zookeeperTestService.start();
     }
 
-    jsc = UtilHelpers.buildSparkContext(UtilitiesTestBase.class.getName() + "-hoodie", "local[4]");
+    jsc = UtilHelpers.buildSparkContext(UtilitiesTestBase.class.getName() + "-hoodie", "local[8]");
     context = new HoodieSparkEngineContext(jsc);
     sqlContext = new SQLContext(jsc);
     sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();

TestSqlQueryBasedTransformer.java
@@ -29,6 +29,7 @@
 
 import java.util.Collections;
 
+import static org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
@@ -39,8 +40,7 @@ public void testSqlQuery() {
 
     SparkSession spark = SparkSession
         .builder()
-        .master("local[2]")
-        .appName(TestSqlQueryBasedTransformer.class.getName())
+        .config(getSparkConfForTest(TestSqlQueryBasedTransformer.class.getName()))
         .getOrCreate();
 
     JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
