Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-8704] Instant time to completion time state upgrade migration for Flink StreamReadMonitoringFunction #12539

Closed
wants to merge 9 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.metrics.FlinkStreamReadMetrics;
import org.apache.hudi.source.prune.PartitionPruners;
import org.apache.hudi.state.upgrade.StateUpgradeHelper;
import org.apache.hudi.state.upgrade.StateUpgrader;
import org.apache.hudi.state.upgrade.StateVersion;
import org.apache.hudi.state.upgrade.source.StreamReadMonitoringStateUpgrader;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.StreamerUtil;

Expand Down Expand Up @@ -148,6 +152,10 @@ public void initializeState(FunctionInitializationContext context) throws Except
LOG.info("Restoring state for the class {} with table {} and base path {}.",
getClass().getSimpleName(), conf.getString(FlinkOptions.TABLE_NAME), path);

// Handle state upgrades
StateUpgrader<String> stateUpgrader = new StreamReadMonitoringStateUpgrader(metaClient, issuedInstant);
new StateUpgradeHelper<>(instantState, stateUpgrader, StateVersion.V2).upgradeState();

List<String> retrievedStates = new ArrayList<>();
for (String entry : this.instantState.get()) {
retrievedStates.add(entry);
Expand All @@ -156,27 +164,11 @@ public void initializeState(FunctionInitializationContext context) throws Except
ValidationUtils.checkArgument(retrievedStates.size() <= 2,
getClass().getSimpleName() + " retrieved invalid state.");

if (retrievedStates.size() == 1 && issuedInstant != null) {
// this is the case where we have both legacy and new state.
// the two should be mutually exclusive for the operator, thus we throw the exception.

throw new IllegalArgumentException(
"The " + getClass().getSimpleName() + " has already restored from a previous Flink version.");

} else if (retrievedStates.size() == 1) {
// for forward compatibility
this.issuedInstant = retrievedStates.get(0);
if (LOG.isDebugEnabled()) {
LOG.debug("{} retrieved an issued instant of time {} for table {} with path {}.",
getClass().getSimpleName(), issuedInstant, conf.get(FlinkOptions.TABLE_NAME), path);
}
} else if (retrievedStates.size() == 2) {
this.issuedInstant = retrievedStates.get(0);
this.issuedOffset = retrievedStates.get(1);
if (LOG.isDebugEnabled()) {
LOG.debug("{} retrieved an issued instant of time [{}, {}] for table {} with path {}.",
getClass().getSimpleName(), issuedInstant, issuedOffset, conf.get(FlinkOptions.TABLE_NAME), path);
}
this.issuedInstant = retrievedStates.get(0);
this.issuedOffset = retrievedStates.get(1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I expected this be a simple change, check the issuedOffset against the timeline for the first time of read, if the issuedOffset is an instant time, then just switch to it's completion time or modification time.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies for the misunderstanding, i do not quite understand what you mean, I was under the impression that this would be a backward compatibility improvement when migrating from a Hudi 0.x table to a Hudi 1.0 table.

From what i understand:

version attribute available in state?
0.x issuedInstant true
0.x issuedOffset false
1.x issuedInstant true
1.x issuedOffset true

Hence, i was under the impression that we needed a way to add a issuedOffset when upgrading from 0.x, where issuedOffset will be null in the state backend during first read after upgrade.

if (LOG.isDebugEnabled()) {
LOG.debug("{} retrieved an issued instant of time [{}, {}] for table {} with path {}.",
getClass().getSimpleName(), issuedInstant, issuedOffset, conf.get(FlinkOptions.TABLE_NAME), path);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.state.upgrade;

import org.apache.flink.api.common.state.ListState;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class StateUpgradeHelper<T> {

private final ListState<T> state;
private final StateUpgrader<T> upgrader;
private final StateVersion currentVersion;

public StateUpgradeHelper(ListState<T> state, StateUpgrader<T> upgrader, StateVersion currentVersion) {
this.state = state;
this.upgrader = upgrader;
this.currentVersion = currentVersion;
}

public void upgradeState() throws Exception {
List<T> currentState = StreamSupport
.stream(state.get().spliterator(), false)
.collect(Collectors.toList());

StateVersion detectedVersion = detectVersion(currentState);
if (upgrader.canUpgrade(detectedVersion, currentVersion)) {
List<T> upgradedState = upgrader.upgrade(currentState, detectedVersion, currentVersion);
state.clear();
state.addAll(upgradedState);
}
}

private StateVersion detectVersion(List<T> state) {
return state.size() == 1 ? StateVersion.V1 : StateVersion.V2;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.state.upgrade;

import java.util.List;

public interface StateUpgrader<T> {
List<T> upgrade(List<T> oldState, StateVersion fromVersion, StateVersion toVersion);

boolean canUpgrade(StateVersion fromVersion, StateVersion toVersion);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.state.upgrade;

// StateVersion.java
public enum StateVersion {
V1(1),
V2(2);

private final int version;

StateVersion(int version) {
this.version = version;
}

public int getValue() {
return version;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.state.upgrade.source;

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.state.upgrade.StateUpgrader;
import org.apache.hudi.state.upgrade.StateVersion;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;

public class StreamReadMonitoringStateUpgrader implements StateUpgrader<String> {

private static final Logger LOG = LoggerFactory.getLogger(StreamReadMonitoringStateUpgrader.class);

private final HoodieTableMetaClient metaClient;
private final String issuedInstant;

public StreamReadMonitoringStateUpgrader(HoodieTableMetaClient metaClient, String issuedInstant) {
this.metaClient = metaClient;
this.issuedInstant = issuedInstant;
}

@Override
public List<String> upgrade(List<String> oldState, StateVersion fromVersion, StateVersion toVersion) {
switch (fromVersion) {
case V1:
if (toVersion == StateVersion.V2) {
return upgradeV1ToV2(oldState);
}
throw new IllegalStateException("Unsupported version upgrade path");
case V2:
// Do nothing
return oldState;
default:
throw new IllegalStateException("Unsupported version upgrade path");
}
}

@Override
public boolean canUpgrade(StateVersion fromVersion, StateVersion toVersion) {
return fromVersion.getValue() < toVersion.getValue();
}

private List<String> upgradeV1ToV2(List<String> oldState) {
ValidationUtils.checkState(oldState.size() == 1, "Retrieved state must have a size of 1");

// this is the case where we have both legacy and new state.
// the two should be mutually exclusive for the operator, thus we throw the exception.
ValidationUtils.checkState(this.issuedInstant != null,
"The " + getClass().getSimpleName() + " has already restored from a previous Flink version.");

String issuedInstant = oldState.get(0);

// TODO 1: Find issuedOffset by querying for completion time using metaClient's active offset
// TODO 2: If issuedInstant (requestInstant) is in archive timeline, how do we handle that, should we throw an error?
voonhous marked this conversation as resolved.
Show resolved Hide resolved
String issuedOffset = "";
return Arrays.asList(issuedInstant, issuedOffset);
}
}
Loading