Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-8704] Instant time to completion time state upgrade migration for Flink StreamReadMonitoringFunction #12539

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.metrics.FlinkStreamReadMetrics;
import org.apache.hudi.source.prune.PartitionPruners;
import org.apache.hudi.state.upgrade.StateUpgradeHelper;
import org.apache.hudi.state.upgrade.StateUpgrader;
import org.apache.hudi.state.upgrade.StateVersion;
import org.apache.hudi.state.upgrade.source.StreamReadMonitoringStateUpgrader;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.StreamerUtil;

Expand Down Expand Up @@ -148,6 +152,10 @@ public void initializeState(FunctionInitializationContext context) throws Except
LOG.info("Restoring state for the class {} with table {} and base path {}.",
getClass().getSimpleName(), conf.getString(FlinkOptions.TABLE_NAME), path);

// Handle state upgrades
StateUpgrader<String> stateUpgrader = new StreamReadMonitoringStateUpgrader(metaClient, issuedInstant);
new StateUpgradeHelper<>(instantState, stateUpgrader, StateVersion.V1).upgradeState();

List<String> retrievedStates = new ArrayList<>();
for (String entry : this.instantState.get()) {
retrievedStates.add(entry);
Expand All @@ -156,27 +164,11 @@ public void initializeState(FunctionInitializationContext context) throws Except
ValidationUtils.checkArgument(retrievedStates.size() <= 2,
getClass().getSimpleName() + " retrieved invalid state.");

if (retrievedStates.size() == 1 && issuedInstant != null) {
// this is the case where we have both legacy and new state.
// the two should be mutually exclusive for the operator, thus we throw the exception.

throw new IllegalArgumentException(
"The " + getClass().getSimpleName() + " has already restored from a previous Flink version.");

} else if (retrievedStates.size() == 1) {
// for forward compatibility
this.issuedInstant = retrievedStates.get(0);
if (LOG.isDebugEnabled()) {
LOG.debug("{} retrieved an issued instant of time {} for table {} with path {}.",
getClass().getSimpleName(), issuedInstant, conf.get(FlinkOptions.TABLE_NAME), path);
}
} else if (retrievedStates.size() == 2) {
this.issuedInstant = retrievedStates.get(0);
this.issuedOffset = retrievedStates.get(1);
if (LOG.isDebugEnabled()) {
LOG.debug("{} retrieved an issued instant of time [{}, {}] for table {} with path {}.",
getClass().getSimpleName(), issuedInstant, issuedOffset, conf.get(FlinkOptions.TABLE_NAME), path);
}
this.issuedInstant = retrievedStates.get(0);
this.issuedOffset = retrievedStates.get(1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I expected this to be a simple change: check the issuedOffset against the timeline on the first read; if the issuedOffset is an instant time, then just switch to its completion time or modification time.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies for the misunderstanding — I do not quite understand what you mean. I was under the impression that this would be a backward-compatibility improvement when migrating from a Hudi 0.x table to a Hudi 1.0 table.

From what i understand:

version attribute available in state?
0.x issuedInstant true
0.x issuedOffset false
1.x issuedInstant true
1.x issuedOffset true

Hence, I was under the impression that we needed a way to add an issuedOffset when upgrading from 0.x, where issuedOffset will be null in the state backend during the first read after the upgrade.

if (LOG.isDebugEnabled()) {
LOG.debug("{} retrieved an issued instant of time [{}, {}] for table {} with path {}.",
getClass().getSimpleName(), issuedInstant, issuedOffset, conf.get(FlinkOptions.TABLE_NAME), path);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.state.upgrade;

import org.apache.flink.api.common.state.ListState;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/**
 * Drives a one-shot migration of restored Flink {@link ListState} to a target
 * {@link StateVersion} using a pluggable {@link StateUpgrader}.
 *
 * @param <T> type of the individual state entries
 */
public class StateUpgradeHelper<T> {

  private final ListState<T> state;
  private final StateUpgrader<T> upgrader;
  private final StateVersion targetVersion;

  public StateUpgradeHelper(ListState<T> state, StateUpgrader<T> upgrader, StateVersion targetVersion) {
    this.state = state;
    this.upgrader = upgrader;
    this.targetVersion = targetVersion;
  }

  /**
   * Detects the version of the restored state and, when an upgrade path exists,
   * rewrites the state in place with the migrated entries.
   *
   * @throws Exception if reading or writing the Flink state fails
   */
  public void upgradeState() throws Exception {
    // ListState#get() is documented to return null when nothing was added/restored;
    // guard it before streaming, and treat empty state as "nothing to migrate".
    Iterable<T> restoredEntries = state.get();
    if (restoredEntries == null) {
      return;
    }

    List<T> currentState = StreamSupport
        .stream(restoredEntries.spliterator(), false)
        .collect(Collectors.toList());
    if (currentState.isEmpty()) {
      // Fresh start (no restored state): there is no version to detect or upgrade.
      return;
    }

    StateVersion detectedVersion = upgrader.detectVersion(currentState);
    if (upgrader.canUpgrade(detectedVersion, targetVersion)) {
      List<T> upgradedState = upgrader.upgrade(currentState, detectedVersion, targetVersion);
      // Replace the old-layout entries atomically from the operator's point of view.
      state.clear();
      state.addAll(upgradedState);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.state.upgrade;

import java.util.List;

/**
 * Strategy for migrating restored operator state entries between {@link StateVersion}s.
 *
 * @param <T> type of the individual state entries
 */
public interface StateUpgrader<T> {
  /**
   * Transforms {@code oldState} from the layout of {@code fromVersion} into the layout
   * expected at {@code toVersion} and returns the migrated entries.
   */
  List<T> upgrade(List<T> oldState, StateVersion fromVersion, StateVersion toVersion);

  /**
   * Returns true when a migration from {@code fromVersion} to {@code toVersion} is supported.
   */
  boolean canUpgrade(StateVersion fromVersion, StateVersion toVersion);

  /**
   * Inspects the restored entries and reports which {@link StateVersion} they belong to.
   */
  StateVersion detectVersion(List<T> oldState);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.state.upgrade;

/**
 * Version tag for operator state layouts, used to decide whether restored state
 * must be migrated to a newer format.
 */
public enum StateVersion {
  V0(0),
  V1(1),
  UNKNOWN(-1);

  // Numeric rank of the version; a higher value means a newer layout.
  // UNKNOWN deliberately sorts below every real version.
  private final int versionNumber;

  StateVersion(int versionNumber) {
    this.versionNumber = versionNumber;
  }

  /**
   * Returns the numeric value backing this version.
   */
  public int getValue() {
    return versionNumber;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.state.upgrade.source;

import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieUpgradeDowngradeException;
import org.apache.hudi.state.upgrade.StateUpgrader;
import org.apache.hudi.state.upgrade.StateVersion;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;

/**
 * Upgrades the restored state of the Flink {@code StreamReadMonitoringFunction} from the
 * legacy single-entry layout (V0: issuedInstant only) to the current two-entry layout
 * (V1: issuedInstant + issuedOffset/completion time).
 */
public class StreamReadMonitoringStateUpgrader implements StateUpgrader<String> {

  private static final Logger LOG = LoggerFactory.getLogger(StreamReadMonitoringStateUpgrader.class);

  private final HoodieTableMetaClient metaClient;
  // The issuedInstant already held by the operator before this restore. A non-null value
  // means the operator has restored new-format state previously, so legacy (V0) state
  // and this field are mutually exclusive.
  private final String issuedInstant;

  public StreamReadMonitoringStateUpgrader(HoodieTableMetaClient metaClient, String issuedInstant) {
    this.metaClient = metaClient;
    this.issuedInstant = issuedInstant;
  }

  /**
   * Migrates {@code oldState} from {@code fromVersion} to {@code toVersion}.
   *
   * @throws HoodieUpgradeDowngradeException if no upgrade path exists between the versions
   */
  @Override
  public List<String> upgrade(List<String> oldState, StateVersion fromVersion, StateVersion toVersion) {
    switch (fromVersion) {
      case V0:
        if (toVersion == StateVersion.V1) {
          return upgradeV0ToV1(oldState);
        }
        throw new HoodieUpgradeDowngradeException("Unsupported version upgrade path :" + fromVersion + " -> " + toVersion);
      case V1:
        // Already at the latest layout, nothing to do.
        return oldState;
      default:
        throw new HoodieUpgradeDowngradeException("Unsupported version upgrade path :" + fromVersion + " -> " + toVersion);
    }
  }

  @Override
  public boolean canUpgrade(StateVersion fromVersion, StateVersion toVersion) {
    // UNKNOWN (e.g. empty restored state) carries nothing to migrate; otherwise only
    // strictly forward upgrades are supported.
    return fromVersion != StateVersion.UNKNOWN
        && fromVersion.getValue() < toVersion.getValue();
  }

  @Override
  public StateVersion detectVersion(List<String> state) {
    switch (state.size()) {
      case 0:
        // Nothing restored yet (fresh start): no version to upgrade from.
        return StateVersion.UNKNOWN;
      case 1:
        // Legacy state only tracked the issued instant time.
        return StateVersion.V0;
      case 2:
        // New state tracks both the issued instant and its completion-time offset.
        return StateVersion.V1;
      default:
        throw new IllegalStateException("Unknown state size when detecting version, size: " + state.size());
    }
  }

  /**
   * V0 -> V1: derives the issuedOffset (completion time) for the restored instant by
   * looking it up on the active timeline, falling back to the archived timeline when
   * the instant has already been archived.
   */
  private List<String> upgradeV0ToV1(List<String> oldState) {
    ValidationUtils.checkState(oldState.size() == 1, "Retrieved state must have a size of 1");

    // Legacy (V0) state and a non-null issuedInstant are mutually exclusive for this
    // operator. checkState throws when the condition is FALSE, so the condition must be
    // "issuedInstant == null" (the previous "!= null" was inverted and rejected every
    // legitimate legacy upgrade while accepting the conflicting-state case).
    ValidationUtils.checkState(this.issuedInstant == null,
        "The " + getClass().getSimpleName() + " has already restored from a previous Flink version.");

    // Renamed from "issuedInstant" to avoid shadowing the field of the same name.
    String restoredInstant = oldState.get(0);
    HoodieTimeline filteredTimeline;
    boolean isReadArchivedTimeline = false;
    if (metaClient.getActiveTimeline().isBeforeTimelineStarts(restoredInstant)) {
      isReadArchivedTimeline = true;
      // if restoredInstant (requestedTime) is in archive timeline, scan archive timeline to find its completion time
      LOG.warn("The reader's restored instant is in the archive timeline, will scan the archive timeline for issuedOffset (completionTime) using requestTime: {}", restoredInstant);
      HoodieArchivedTimeline archivedTimeline = metaClient.getArchivedTimeline(StringUtils.EMPTY_STRING, false);
      filteredTimeline = archivedTimeline.findInstantsAfterOrEquals(restoredInstant, 3);
    } else {
      filteredTimeline = metaClient.getActiveTimeline().findInstantsAfterOrEquals(restoredInstant, 3);
    }

    if (filteredTimeline.firstInstant().isEmpty()) {
      LOG.error("Unable to find instant {} in [isArchived: {}] timeline: {}", restoredInstant, isReadArchivedTimeline, filteredTimeline.getInstants());
      throw new HoodieException("Unable to find completionTime in timeline for instant: " + restoredInstant);
    }
    String issuedOffset = filteredTimeline.firstInstant().get().getCompletionTime();

    return Arrays.asList(restoredInstant, issuedOffset);
  }
}
Loading
Loading