diff --git a/docs/layouts/shortcodes/generated/environment_configuration.html b/docs/layouts/shortcodes/generated/environment_configuration.html
index 6f520e5abff1d..84712c412cd82 100644
--- a/docs/layouts/shortcodes/generated/environment_configuration.html
+++ b/docs/layouts/shortcodes/generated/environment_configuration.html
@@ -38,6 +38,12 @@
String |
A string of default JVM options to prepend to env.java.opts.taskmanager . This is intended to be set by administrators. |
+
+ env.java.home |
+ (none) |
+ String |
+ Location where Java is installed. If not specified, Flink will use your default Java installation. |
+
env.java.opts.all |
(none) |
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 653be48964feb..220a1ddcf7350 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -65,6 +65,8 @@ public final class ConfigConstants {
// ----------------------------- Environment Variables ----------------------------
+ public static final String ENV_JAVA_HOME = "JAVA_HOME";
+
/** The environment variable name which contains the location of the configuration directory. */
public static final String ENV_FLINK_CONF_DIR = "FLINK_CONF_DIR";
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
index 03b92b328c6c3..a0570fb1a6163 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java
@@ -225,6 +225,17 @@ public static String[] mergeListsToArray(List base, List append)
// process parameters
// ------------------------------------------------------------------------
+ public static final ConfigOption FLINK_JAVA_HOME =
+ ConfigOptions.key("env.java.home")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ Description.builder()
+ .text(
+ "Location where Java is installed. If not specified,"
+ + " Flink will use your default Java installation.")
+ .build());
+
public static final ConfigOption FLINK_JVM_OPTIONS =
ConfigOptions.key("env.java.opts.all")
.stringType()
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
index ba84f97cdb304..c6a34df24fd55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/ContaineredTaskManagerParameters.java
@@ -19,11 +19,14 @@
package org.apache.flink.runtime.clusterframework;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import java.util.HashMap;
import java.util.Map;
+import static org.apache.flink.configuration.ConfigConstants.ENV_JAVA_HOME;
+
/** This class describes the basic parameters for launching a TaskManager process. */
public class ContaineredTaskManagerParameters implements java.io.Serializable {
@@ -90,6 +93,10 @@ public static ContaineredTaskManagerParameters create(
}
}
+ // set JAVA_HOME
+ config.getOptional(CoreOptions.FLINK_JAVA_HOME)
+ .ifPresent(javaHome -> envVars.put(ENV_JAVA_HOME, javaHome));
+
// done
return new ContaineredTaskManagerParameters(taskExecutorProcessSpec, envVars);
}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
index c1c77c197d0c2..8d2e88e253adc 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
@@ -137,6 +137,7 @@
import static org.apache.flink.configuration.ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR;
import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_LIB_DIR;
import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_OPT_DIR;
+import static org.apache.flink.configuration.ConfigConstants.ENV_JAVA_HOME;
import static org.apache.flink.configuration.ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX;
import static org.apache.flink.configuration.ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
import static org.apache.flink.runtime.entrypoint.component.FileJobGraphRetriever.JOB_GRAPH_FILE_PATH;
@@ -1969,6 +1970,10 @@ Map generateApplicationMasterEnv(
ConfigurationUtils.getPrefixedKeyValuePairs(
ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX,
this.flinkConfiguration));
+ // set JAVA_HOME
+ this.flinkConfiguration
+ .getOptional(CoreOptions.FLINK_JAVA_HOME)
+ .ifPresent(javaHome -> env.put(ENV_JAVA_HOME, javaHome));
// set Flink app class path
env.put(ENV_FLINK_CLASSPATH, classPathStr);
// Set FLINK_LIB_DIR to `lib` folder under working dir in container
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
index 559071559d7e6..eb3a6c1c37fb3 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -44,8 +44,11 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.stream.Stream;
+import static org.apache.flink.configuration.ConfigConstants.ENV_JAVA_HOME;
+import static org.apache.flink.configuration.ResourceManagerOptions.CONTAINERIZED_TASK_MANAGER_ENV_PREFIX;
import static org.apache.flink.yarn.configuration.YarnConfigOptions.YARN_CONTAINER_START_COMMAND_TEMPLATE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -56,6 +59,19 @@ class UtilsTest {
private static final String YARN_RM_ARBITRARY_SCHEDULER_CLAZZ =
"org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler";
+ private static final TaskExecutorProcessSpec TASK_EXECUTOR_PROCESS_SPEC =
+ new TaskExecutorProcessSpec(
+ new CPUResource(1.0),
+ new MemorySize(0), // frameworkHeapSize
+ new MemorySize(0), // frameworkOffHeapSize
+ new MemorySize(111), // taskHeapSize
+ new MemorySize(0), // taskOffHeapSize
+ new MemorySize(222), // networkMemSize
+ new MemorySize(0), // managedMemorySize
+ new MemorySize(333), // jvmMetaspaceSize
+ new MemorySize(0), // jvmOverheadSize
+ Collections.emptyList());
+
@Test
void testDeleteApplicationFiles(@TempDir Path tempDir) throws Exception {
final Path applicationFilesDir = Files.createTempDirectory(tempDir, ".flink");
@@ -208,20 +224,8 @@ void testGetYarnConfiguration() {
@Test
void testGetTaskManagerShellCommand() {
final Configuration cfg = new Configuration();
- final TaskExecutorProcessSpec taskExecutorProcessSpec =
- new TaskExecutorProcessSpec(
- new CPUResource(1.0),
- new MemorySize(0), // frameworkHeapSize
- new MemorySize(0), // frameworkOffHeapSize
- new MemorySize(111), // taskHeapSize
- new MemorySize(0), // taskOffHeapSize
- new MemorySize(222), // networkMemSize
- new MemorySize(0), // managedMemorySize
- new MemorySize(333), // jvmMetaspaceSize
- new MemorySize(0), // jvmOverheadSize
- Collections.emptyList());
final ContaineredTaskManagerParameters containeredParams =
- new ContaineredTaskManagerParameters(taskExecutorProcessSpec, new HashMap<>());
+ new ContaineredTaskManagerParameters(TASK_EXECUTOR_PROCESS_SPEC, new HashMap<>());
// no logging, with/out krb5
final String java = "$JAVA_HOME/bin/java";
@@ -238,7 +242,8 @@ void testGetTaskManagerShellCommand() {
+ " -Dlog4j.configurationFile=file:./conf/log4j.properties"; // if set
final String mainClass = "org.apache.flink.yarn.UtilsTest";
final String dynamicConfigs =
- TaskExecutorProcessUtils.generateDynamicConfigsStr(taskExecutorProcessSpec).trim();
+ TaskExecutorProcessUtils.generateDynamicConfigsStr(TASK_EXECUTOR_PROCESS_SPEC)
+ .trim();
final String basicArgs = "--configDir ./conf";
final String mainArgs = "-Djobmanager.rpc.address=host1 -Dkey.a=v1";
final String args = dynamicConfigs + " " + basicArgs + " " + mainArgs;
@@ -674,6 +679,27 @@ void testGenerateJvmOptsString() {
Utils.IGNORE_UNRECOGNIZED_VM_OPTIONS));
}
+ @Test
+ void testGetTaskManagerEnvsWithJavaHomeSet() {
+ final Configuration cfg = new Configuration();
+ cfg.set(CoreOptions.FLINK_JAVA_HOME, "/opt/jdk");
+ cfg.setString(CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + "key", "val");
+ final ContaineredTaskManagerParameters containeredParams =
+ ContaineredTaskManagerParameters.create(cfg, TASK_EXECUTOR_PROCESS_SPEC);
+ final Map envVars = containeredParams.taskManagerEnv();
+ assertThat(envVars).containsEntry(ENV_JAVA_HOME, "/opt/jdk").containsEntry("key", "val");
+ }
+
+ @Test
+ void testGetTaskManagerEnvsWithoutJavaHomeSet() {
+ final Configuration cfg = new Configuration();
+ cfg.setString(CONTAINERIZED_TASK_MANAGER_ENV_PREFIX + "key", "val");
+ final ContaineredTaskManagerParameters containeredParams =
+ ContaineredTaskManagerParameters.create(cfg, TASK_EXECUTOR_PROCESS_SPEC);
+ final Map envVars = containeredParams.taskManagerEnv();
+ assertThat(envVars).doesNotContainKey(ENV_JAVA_HOME);
+ }
+
private static void verifyUnitResourceVariousSchedulers(
YarnConfiguration yarnConfig, int minMem, int minVcore, int incMem, int incVcore) {
yarnConfig.set(YarnConfiguration.RM_SCHEDULER, Utils.YARN_RM_FAIR_SCHEDULER_CLAZZ);
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
index 1710b1da22acd..7ff1b814d541c 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterDescriptorTest.java
@@ -921,11 +921,14 @@ public void testGenerateApplicationMasterEnv(@TempDir File flinkHomeDir) throws
final String fakeLocalFlinkJar = "./lib/flink_dist.jar";
final String fakeClassPath = fakeLocalFlinkJar + ":./usrlib/user.jar";
final ApplicationId appId = ApplicationId.newInstance(0, 0);
+ final Configuration flinkConfig = new Configuration();
+ flinkConfig.set(CoreOptions.FLINK_JAVA_HOME, "/opt/jdk");
final Map masterEnv =
getTestMasterEnv(
- new Configuration(), flinkHomeDir, fakeClassPath, fakeLocalFlinkJar, appId);
+ flinkConfig, flinkHomeDir, fakeClassPath, fakeLocalFlinkJar, appId);
assertThat(masterEnv)
+ .containsEntry(ConfigConstants.ENV_JAVA_HOME, "/opt/jdk")
.containsEntry(ConfigConstants.ENV_FLINK_LIB_DIR, "./lib")
.containsEntry(YarnConfigKeys.ENV_APP_ID, appId.toString())
.containsEntry(
@@ -940,6 +943,20 @@ public void testGenerateApplicationMasterEnv(@TempDir File flinkHomeDir) throws
.containsEntry(YarnConfigKeys.ENV_CLIENT_HOME_DIR, flinkHomeDir.getPath());
}
+ @Test
+ public void testContainerEnvJavaHomeNotOverriddenByDefault(@TempDir File flinkHomeDir)
+ throws IOException {
+ final Configuration flinkConfig = new Configuration();
+ final Map masterEnv =
+ getTestMasterEnv(
+ flinkConfig,
+ flinkHomeDir,
+ "",
+ "./lib/flink_dist.jar",
+ ApplicationId.newInstance(0, 0));
+ assertThat(masterEnv).doesNotContainKey(ConfigConstants.ENV_JAVA_HOME);
+ }
+
@Test
public void testEnvFlinkLibDirVarNotOverriddenByContainerEnv(@TempDir File tmpDir)
throws IOException {