From f0c8f18ee09297c4c92b36d8ce7d0e0259f80178 Mon Sep 17 00:00:00 2001 From: Zhanghao Chen Date: Tue, 7 Jan 2025 10:29:46 +0800 Subject: [PATCH] [FLINK-22091][yarn] Make Flink on YARN honor env.java.home (#25877) --- .../generated/environment_configuration.html | 6 +++ .../flink/configuration/ConfigConstants.java | 2 + .../flink/configuration/CoreOptions.java | 11 ++++ .../ContaineredTaskManagerParameters.java | 7 +++ .../flink/yarn/YarnClusterDescriptor.java | 5 ++ .../java/org/apache/flink/yarn/UtilsTest.java | 54 ++++++++++++++----- .../flink/yarn/YarnClusterDescriptorTest.java | 19 ++++++- 7 files changed, 89 insertions(+), 15 deletions(-) diff --git a/docs/layouts/shortcodes/generated/environment_configuration.html b/docs/layouts/shortcodes/generated/environment_configuration.html index 6f520e5abff1d..84712c412cd82 100644 --- a/docs/layouts/shortcodes/generated/environment_configuration.html +++ b/docs/layouts/shortcodes/generated/environment_configuration.html @@ -38,6 +38,12 @@ String A string of default JVM options to prepend to env.java.opts.taskmanager. This is intended to be set by administrators. + +
env.java.home
+ (none) + String + Location where Java is installed. If not specified, Flink will use your default Java installation. +
env.java.opts.all
(none) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 653be48964feb..220a1ddcf7350 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -65,6 +65,8 @@ public final class ConfigConstants { // ----------------------------- Environment Variables ---------------------------- + public static final String ENV_JAVA_HOME = "JAVA_HOME"; + /** The environment variable name which contains the location of the configuration directory. */ public static final String ENV_FLINK_CONF_DIR = "FLINK_CONF_DIR"; diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java index 03b92b328c6c3..a0570fb1a6163 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java @@ -225,6 +225,17 @@ public static String[] mergeListsToArray(List base, List append) // process parameters // ------------------------------------------------------------------------ + public static final ConfigOption FLINK_JAVA_HOME = + ConfigOptions.key("env.java.home") + .stringType() + .noDefaultValue() + .withDescription( + Description.builder() + .text( + "Location where Java is installed. If not specified," + + " Flink will use your default Java installation.") + .build()); + public static final ConfigOption FLINK_JVM_OPTIONS = ConfigOptions.key("env.java.opts.all") .stringType() diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java index ba84f97cdb304..c6a34df24fd55 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java @@ -19,11 +19,14 @@ package org.apache.flink.runtime.clusterframework; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.ResourceManagerOptions; import java.util.HashMap; import java.util.Map; +import static org.apache.flink.configuration.ConfigConstants.ENV_JAVA_HOME; + /** This class describes the basic parameters for launching a TaskManager process. */ public class ContaineredTaskManagerParameters implements java.io.Serializable { @@ -90,6 +93,10 @@ public static ContaineredTaskManagerParameters create( } } + // set JAVA_HOME + config.getOptional(CoreOptions.FLINK_JAVA_HOME) + .ifPresent(javaHome -> envVars.put(ENV_JAVA_HOME, javaHome)); + // done return new ContaineredTaskManagerParameters(taskExecutorProcessSpec, envVars); } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index c1c77c197d0c2..8d2e88e253adc 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -137,6 +137,7 @@ import static org.apache.flink.configuration.ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR; import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR; import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_OPT_DIR; +import static org.apache.flink.configuration.ConfigConstants.ENV_JAVA_HOME; import static org.apache.flink.configuration.ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX; import static org.apache.flink.configuration.ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX; import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH; @@ -1969,6 +1970,10 @@ Map generateApplicationMasterEnv( ConfigurationUtils.getPrefixedKeyValuePairs( ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, this.flinkConfiguration)); + // set JAVA_HOME + this.flinkConfiguration + .getOptional(CoreOptions.FLINK_JAVA_HOME) + .ifPresent(javaHome -> env.put(ENV_JAVA_HOME, javaHome)); // set Flink app class path env.put(ENV_FLINK_CLASSPATH, classPathStr); // Set FLINK_LIB_DIR to `lib` folder under working dir in container diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java index 559071559d7e6..eb3a6c1c37fb3 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java @@ -44,8 +44,11 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Stream; +import static org.apache.flink.configuration.ConfigConstants.ENV_JAVA_HOME; +import static org.apache.flink.configuration.ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX; import static org.apache.flink.yarn.configuration.YarnConfigOptions.YARN_CONTAINER_START_COMMAND_TEMPLATE; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -56,6 +59,19 @@ class UtilsTest { private static final String YARN_RM_ARBITRARY_SCHEDULER_CLAZZ = "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler"; + private static final TaskExecutorProcessSpec TASK_EXECUTOR_PROCESS_SPEC = + new TaskExecutorProcessSpec( + new CPUResource(1.0), + new MemorySize(0), // frameworkHeapSize + new MemorySize(0), // frameworkOffHeapSize + new MemorySize(111), // taskHeapSize + new MemorySize(0), // taskOffHeapSize + new MemorySize(222), // networkMemSize + new MemorySize(0), // managedMemorySize + new MemorySize(333), // jvmMetaspaceSize + new MemorySize(0), // jvmOverheadSize + Collections.emptyList()); + @Test void testDeleteApplicationFiles(@TempDir Path tempDir) throws Exception { final Path applicationFilesDir = Files.createTempDirectory(tempDir, ".flink"); @@ -208,20 +224,8 @@ void testGetYarnConfiguration() { @Test void testGetTaskManagerShellCommand() { final Configuration cfg = new Configuration(); - final TaskExecutorProcessSpec taskExecutorProcessSpec = - new TaskExecutorProcessSpec( - new CPUResource(1.0), - new MemorySize(0), // frameworkHeapSize - new MemorySize(0), // frameworkOffHeapSize - new MemorySize(111), // taskHeapSize - new MemorySize(0), // taskOffHeapSize - new MemorySize(222), // networkMemSize - new MemorySize(0), // managedMemorySize - new MemorySize(333), // jvmMetaspaceSize - new MemorySize(0), // jvmOverheadSize - Collections.emptyList()); final ContaineredTaskManagerParameters containeredParams = - new ContaineredTaskManagerParameters(taskExecutorProcessSpec, new HashMap<>()); + new ContaineredTaskManagerParameters(TASK_EXECUTOR_PROCESS_SPEC, new HashMap<>()); // no logging, with/out krb5 final String java = "$JAVA_HOME/bin/java"; @@ -238,7 +242,8 @@ void testGetTaskManagerShellCommand() { + " -Dlog4j.configurationFile=file:./conf/log4j.properties"; // if set final String mainClass = "org.apache.flink.yarn.UtilsTest"; final String dynamicConfigs = - TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec).trim(); + TaskExecutorProcessUtils.generateDynamicConfigsStr(TASK_EXECUTOR_PROCESS_SPEC) + .trim(); final String basicArgs = "--configDir ./conf"; final String mainArgs = "-Djobmanager.rpc.address=host1 -Dkey.a=v1"; final String args = dynamicConfigs + " " + basicArgs + " " + mainArgs; @@ -674,6 +679,27 @@ void testGenerateJvmOptsString() { Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS)); } + @Test + void testGetTaskManagerEnvsWithJavaHomeSet() { + final Configuration cfg = new Configuration(); + cfg.set(CoreOptions.FLINK_JAVA_HOME, "/opt/jdk"); + cfg.setString(CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + "key", "val"); + final ContaineredTaskManagerParameters containeredParams = + ContaineredTaskManagerParameters.create(cfg, TASK_EXECUTOR_PROCESS_SPEC); + final Map envVars = containeredParams.taskManagerEnv(); + assertThat(envVars).containsEntry(ENV_JAVA_HOME, "/opt/jdk").containsEntry("key", "val"); + } + + @Test + void testGetTaskManagerEnvsWithoutJavaHomeSet() { + final Configuration cfg = new Configuration(); + cfg.setString(CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + "key", "val"); + final ContaineredTaskManagerParameters containeredParams = + ContaineredTaskManagerParameters.create(cfg, TASK_EXECUTOR_PROCESS_SPEC); + final Map envVars = containeredParams.taskManagerEnv(); + assertThat(envVars).doesNotContainKey(ENV_JAVA_HOME); + } + private static void verifyUnitResourceVariousSchedulers( YarnConfiguration yarnConfig, int minMem, int minVcore, int incMem, int incVcore) { yarnConfig.set(YarnConfiguration.RM_SCHEDULER, Utils.YARN_RM_FAIR_SCHEDULER_CLAZZ); diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java index 1710b1da22acd..7ff1b814d541c 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java @@ -921,11 +921,14 @@ public void testGenerateApplicationMasterEnv(@TempDir File flinkHomeDir) throws final String fakeLocalFlinkJar = "./lib/flink_dist.jar"; final String fakeClassPath = fakeLocalFlinkJar + ":./usrlib/user.jar"; final ApplicationId appId = ApplicationId.newInstance(0, 0); + final Configuration flinkConfig = new Configuration(); + flinkConfig.set(CoreOptions.FLINK_JAVA_HOME, "/opt/jdk"); final Map masterEnv = getTestMasterEnv( - new Configuration(), flinkHomeDir, fakeClassPath, fakeLocalFlinkJar, appId); + flinkConfig, flinkHomeDir, fakeClassPath, fakeLocalFlinkJar, appId); assertThat(masterEnv) + .containsEntry(ConfigConstants.ENV_JAVA_HOME, "/opt/jdk") .containsEntry(ConfigConstants.ENV_FLINK_LIB_DIR, "./lib") .containsEntry(YarnConfigKeys.ENV_APP_ID, appId.toString()) .containsEntry( @@ -940,6 +943,20 @@ public void testGenerateApplicationMasterEnv(@TempDir File flinkHomeDir) throws .containsEntry(YarnConfigKeys.ENV_CLIENT_HOME_DIR, flinkHomeDir.getPath()); } + @Test + public void testContainerEnvJavaHomeNotOverriddenByDefault(@TempDir File flinkHomeDir) + throws IOException { + final Configuration flinkConfig = new Configuration(); + final Map masterEnv = + getTestMasterEnv( + flinkConfig, + flinkHomeDir, + "", + "./lib/flink_dist.jar", + ApplicationId.newInstance(0, 0)); + assertThat(masterEnv).doesNotContainKey(ConfigConstants.ENV_JAVA_HOME); + } + @Test public void testEnvFlinkLibDirVarNotOverriddenByContainerEnv(@TempDir File tmpDir) throws IOException {