Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-8882] Support Compaction/Rollback/Clean Timeline Instant Metrics in HoodieMetrics #12681

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, Stri
// Delete the marker directory for the instant.
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
.quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
metrics.updateClusteringTimeLineInstantMetrics(table.getActiveTimeline());
metrics.updateTimelineInstantMetrics(table.getActiveTimeline());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we rename this function to updateTableServiceInstantMetrics

} finally {
this.heartbeatClient.stop(instantTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
package org.apache.hudi.metrics;

import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.VisibleForTesting;
Expand All @@ -34,6 +36,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Set;

/**
* Wrapper for metrics-related operations.
*/
Expand Down Expand Up @@ -65,9 +69,18 @@ public class HoodieMetrics {
public static final String COMMIT_LATENCY_IN_MS_STR = "commitLatencyInMs";
public static final String COMMIT_FRESHNESS_IN_MS_STR = "commitFreshnessInMs";
public static final String COMMIT_TIME_STR = "commitTime";
public static final String EARLIEST_INFLIGHT_CLUSTERING_INSTANT_STR = "earliestInflightClusteringInstant";
public static final String EARLIEST_PENDING_CLUSTERING_INSTANT_STR = "earliestInflightClusteringInstant";
public static final String EARLIEST_PENDING_COMPACTION_INSTANT_STR = "earliestInflightCompactionInstant";
public static final String EARLIEST_PENDING_CLEAN_INSTANT_STR = "earliestInflightCleanInstant";
public static final String EARLIEST_PENDING_ROLLBACK_INSTANT_STR = "earliestInflightRollbackInstant";
public static final String LATEST_COMPLETED_CLUSTERING_INSTANT_STR = "latestCompletedClusteringInstant";
public static final String LATEST_COMPLETED_COMPACTION_INSTANT_STR = "latestCompletedCompactionInstant";
public static final String LATEST_COMPLETED_CLEAN_INSTANT_STR = "latestCompletedCleanInstant";
public static final String LATEST_COMPLETED_ROLLBACK_INSTANT_STR = "latestCompletedRollbackInstant";
public static final String PENDING_CLUSTERING_INSTANT_COUNT_STR = "pendingClusteringInstantCount";
public static final String PENDING_COMPACTION_INSTANT_COUNT_STR = "pendingCompactionInstantCount";
public static final String PENDING_CLEAN_INSTANT_COUNT_STR = "pendingCleanInstantCount";
public static final String PENDING_ROLLBACK_INSTANT_COUNT_STR = "pendingRollbackInstantCount";
public static final String SUCCESS_EXTENSION = ".success";
public static final String FAILURE_EXTENSION = ".failure";

Expand Down Expand Up @@ -383,27 +396,67 @@ public void updateClusteringFileCreationMetrics(long durationInMs) {
reportMetrics(HoodieTimeline.CLUSTERING_ACTION, "fileCreationTime", durationInMs);
}

public void updateClusteringTimeLineInstantMetrics(final HoodieActiveTimeline activeTimeline) {
if (config.isMetricsOn()) {
// Compute Metrics
long pendingClusteringInstantCount = activeTimeline.filterPendingClusteringTimeline().getInstants().size();
long earliestInflightClusteringInstantLong = 0L;
Option<HoodieInstant> earliestInflightClusteringInstant = activeTimeline.filterPendingClusteringTimeline().getFirstPendingClusterInstant();
if (earliestInflightClusteringInstant.isPresent()) {
earliestInflightClusteringInstantLong = Long.valueOf(earliestInflightClusteringInstant.get().requestedTime());
}
long latestCompletedClusteringInstantLong = 0L;
Option<HoodieInstant> latestCompletedClusteringInstant = activeTimeline.filterCompletedInstants().getLastClusteringInstant();
if (latestCompletedClusteringInstant.isPresent()) {
latestCompletedClusteringInstantLong = Long.valueOf(latestCompletedClusteringInstant.get().requestedTime());
}
public void updateTimelineInstantMetrics(final HoodieActiveTimeline activeTimeline) {
updateEarliestPendingInstant(activeTimeline, EARLIEST_PENDING_CLUSTERING_INSTANT_STR, HoodieTimeline.CLUSTERING_ACTION);
updateEarliestPendingInstant(activeTimeline, EARLIEST_PENDING_COMPACTION_INSTANT_STR, HoodieTimeline.COMPACTION_ACTION);
updateEarliestPendingInstant(activeTimeline, EARLIEST_PENDING_CLEAN_INSTANT_STR, HoodieTimeline.CLEAN_ACTION);
updateEarliestPendingInstant(activeTimeline, EARLIEST_PENDING_ROLLBACK_INSTANT_STR, HoodieTimeline.ROLLBACK_ACTION);

updateLatestCompletedInstant(activeTimeline, LATEST_COMPLETED_CLUSTERING_INSTANT_STR, HoodieTimeline.REPLACE_COMMIT_ACTION);
updateLatestCompletedInstant(activeTimeline, LATEST_COMPLETED_COMPACTION_INSTANT_STR, HoodieTimeline.COMMIT_ACTION);
updateLatestCompletedInstant(activeTimeline, LATEST_COMPLETED_CLEAN_INSTANT_STR, HoodieTimeline.CLEAN_ACTION);
updateLatestCompletedInstant(activeTimeline, LATEST_COMPLETED_ROLLBACK_INSTANT_STR, HoodieTimeline.ROLLBACK_ACTION);

updatePendingInstantCount(activeTimeline, PENDING_CLUSTERING_INSTANT_COUNT_STR, HoodieTimeline.CLUSTERING_ACTION);
updatePendingInstantCount(activeTimeline, PENDING_COMPACTION_INSTANT_COUNT_STR, HoodieTimeline.COMPACTION_ACTION);
updatePendingInstantCount(activeTimeline, PENDING_CLEAN_INSTANT_COUNT_STR, HoodieTimeline.CLEAN_ACTION);
updatePendingInstantCount(activeTimeline, PENDING_ROLLBACK_INSTANT_COUNT_STR, HoodieTimeline.ROLLBACK_ACTION);
}

private void updateEarliestPendingInstant(final HoodieActiveTimeline activeTimeline,
final String metricName,
final String action) {
Set<String> validActions = CollectionUtils.createSet(action);
HoodieTimeline filteredInstants = activeTimeline.filterInflightsAndRequested().filter(instant -> validActions.contains(instant.getAction()));
Option<HoodieInstant> hoodieInstantOption = filteredInstants.firstInstant();
if (hoodieInstantOption.isPresent()) {
updateMetric(action, metricName, Long.parseLong(hoodieInstantOption.get().requestedTime()));
}
}

private void updateLatestCompletedInstant(final HoodieActiveTimeline activeTimeline,
final String metricName,
String action) {
String tableType = config.getTableType().name();
if (HoodieTableType.MERGE_ON_READ.name().equalsIgnoreCase(tableType)
&& LATEST_COMPLETED_COMPACTION_INSTANT_STR.equalsIgnoreCase(metricName)) {
action = HoodieActiveTimeline.COMMIT_ACTION;
}
if (HoodieTableType.COPY_ON_WRITE.name().equalsIgnoreCase(tableType)
&& LATEST_COMPLETED_CLUSTERING_INSTANT_STR.equalsIgnoreCase(metricName)) {
action = HoodieActiveTimeline.REPLACE_COMMIT_ACTION;
}
Comment on lines +431 to +438
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use switch style?
Also no need to take care of tableType, bcz MOR table also can do clustering.

Set<String> validActions = CollectionUtils.createSet(action);
HoodieTimeline filteredInstants = activeTimeline.filterCompletedInstants().filter(instant -> validActions.contains(instant.getAction()));
Option<HoodieInstant> hoodieInstantOption = filteredInstants.lastInstant();
if (hoodieInstantOption.isPresent()) {
updateMetric(action, metricName, Long.parseLong(hoodieInstantOption.get().requestedTime()));
}
}

private void updatePendingInstantCount(final HoodieActiveTimeline activeTimeline,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we add some java docs? what does PendingInstantCount mean? and what is this metric used for?

final String metricName,
final String action) {
Set<String> validActions = CollectionUtils.createSet(action);
HoodieTimeline filteredInstants = activeTimeline.filterInflightsAndRequested().filter(instant -> validActions.contains(instant.getAction()));
updateMetric(action, metricName, filteredInstants.countInstants());
}

private void updateMetric(final String action, final String metricName, final long metricValue) {
if (config.isMetricsOn()) {
LOG.info(
String.format("Sending timeline clustering instant metrics (%s=%d, %s=%d, %s=%d)", EARLIEST_INFLIGHT_CLUSTERING_INSTANT_STR, earliestInflightClusteringInstantLong,
LATEST_COMPLETED_CLUSTERING_INSTANT_STR, latestCompletedClusteringInstantLong, PENDING_CLUSTERING_INSTANT_COUNT_STR, pendingClusteringInstantCount));
metrics.registerGauge(getMetricsName(HoodieTimeline.CLUSTERING_ACTION, EARLIEST_INFLIGHT_CLUSTERING_INSTANT_STR), earliestInflightClusteringInstantLong);
metrics.registerGauge(getMetricsName(HoodieTimeline.CLUSTERING_ACTION, LATEST_COMPLETED_CLUSTERING_INSTANT_STR), latestCompletedClusteringInstantLong);
metrics.registerGauge(getMetricsName(HoodieTimeline.CLUSTERING_ACTION, PENDING_CLUSTERING_INSTANT_COUNT_STR), pendingClusteringInstantCount);
String.format("Updating timeline instant related metrics (%s=%d)", metricName, metricValue));
metrics.registerGauge(getMetricsName(action, metricName), metricValue);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hudi.metrics;

import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
Expand Down Expand Up @@ -233,23 +234,75 @@ public void testTimerCtxandGauges() throws InterruptedException {
assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalRollbackLogBlocks());
});

// Clustering Timeline Instant Metrics
// MOCK Timeline Instant Metrics for Clean & Rollback
when(writeConfig.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE);

HoodieInstant instant004 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, "1004");
HoodieInstant instant007 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.CLEAN_ACTION, "1007");
HoodieInstant instant009 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION, "1009");
HoodieInstant instant0010 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, "10010");
HoodieInstant instant0013 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.ROLLBACK_ACTION, "10013");
HoodieInstant instant0015 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, "10015");
HoodieInstant instant0016 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION, "10016");
HoodieInstant instant0017 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.ROLLBACK_ACTION, "10017");

HoodieActiveTimeline activeTimeline1 = new MockHoodieActiveTimeline(instant004, instant007, instant009, instant0010, instant0013, instant0015, instant0016, instant0017);
hoodieMetrics.updateTimelineInstantMetrics(activeTimeline1);

metricName = hoodieMetrics.getMetricsName(HoodieTimeline.CLEAN_ACTION, HoodieMetrics.EARLIEST_PENDING_CLEAN_INSTANT_STR);
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), Long.valueOf("1004"));
metricName = hoodieMetrics.getMetricsName(HoodieTimeline.ROLLBACK_ACTION, HoodieMetrics.EARLIEST_PENDING_ROLLBACK_INSTANT_STR);
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), Long.valueOf("1009"));

metricName = hoodieMetrics.getMetricsName(HoodieTimeline.CLEAN_ACTION, HoodieMetrics.LATEST_COMPLETED_CLEAN_INSTANT_STR);
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), Long.valueOf("1007"));
metricName = hoodieMetrics.getMetricsName(HoodieTimeline.ROLLBACK_ACTION, HoodieMetrics.LATEST_COMPLETED_ROLLBACK_INSTANT_STR);
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), Long.valueOf("10017"));

metricName = hoodieMetrics.getMetricsName(HoodieTimeline.CLEAN_ACTION, HoodieMetrics.PENDING_CLEAN_INSTANT_COUNT_STR);
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), 3L);
metricName = hoodieMetrics.getMetricsName(HoodieTimeline.ROLLBACK_ACTION, HoodieMetrics.PENDING_ROLLBACK_INSTANT_COUNT_STR);
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), 2L);

// MOCK Timeline Instant Metrics for Clustering
HoodieInstant instant001 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.CLUSTERING_ACTION, "1001");
HoodieInstant instant0011 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "11001");
HoodieInstant instant002 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.CLUSTERING_ACTION, "1002");
HoodieInstant instant003 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLUSTERING_ACTION, "1003");
HoodieInstant instant004 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLUSTERING_ACTION, "1004");
HoodieInstant instant005 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.CLUSTERING_ACTION, "1005");
HoodieInstant instant006 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLUSTERING_ACTION, "1006");
HoodieActiveTimeline activeTimeline = new MockHoodieActiveTimeline(instant001, instant0011, instant002, instant003, instant004, instant005, instant006);
hoodieMetrics.updateClusteringTimeLineInstantMetrics(activeTimeline);
HoodieInstant instant0011 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.REPLACE_COMMIT_ACTION, "10011");
HoodieInstant instant0018 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLUSTERING_ACTION, "10018");

HoodieActiveTimeline activeTimeline2 = new MockHoodieActiveTimeline(instant001, instant005, instant0011, instant0018);
hoodieMetrics.updateTimelineInstantMetrics(activeTimeline2);

metricName = hoodieMetrics.getMetricsName(HoodieTimeline.CLUSTERING_ACTION, HoodieMetrics.EARLIEST_PENDING_CLUSTERING_INSTANT_STR);
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), Long.valueOf("10018"));

metricName = hoodieMetrics.getMetricsName(HoodieTimeline.REPLACE_COMMIT_ACTION, HoodieMetrics.LATEST_COMPLETED_CLUSTERING_INSTANT_STR);
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), Long.valueOf("10011"));

metricName = hoodieMetrics.getMetricsName(HoodieTimeline.CLUSTERING_ACTION, HoodieMetrics.EARLIEST_INFLIGHT_CLUSTERING_INSTANT_STR);
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), Long.valueOf("1002"));
metricName = hoodieMetrics.getMetricsName(HoodieTimeline.CLUSTERING_ACTION, HoodieMetrics.LATEST_COMPLETED_CLUSTERING_INSTANT_STR);
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), Long.valueOf("1005"));
metricName = hoodieMetrics.getMetricsName(HoodieTimeline.CLUSTERING_ACTION, HoodieMetrics.PENDING_CLUSTERING_INSTANT_COUNT_STR);
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), 4L);
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), 1L);

// MOCK Timeline Instant Metrics for Compaction
when(writeConfig.getTableType()).thenReturn(HoodieTableType.MERGE_ON_READ);

HoodieInstant instant002 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "1002");
HoodieInstant instant003 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "1003");
HoodieInstant instant006 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "1006");
HoodieInstant instant008 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "1008");
HoodieInstant instant0012 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "10012");
HoodieInstant instant0014 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "10014");

HoodieActiveTimeline activeTimeline3 = new MockHoodieActiveTimeline(instant002, instant003, instant006, instant008, instant0012, instant0014);
hoodieMetrics.updateTimelineInstantMetrics(activeTimeline3);

metricName = hoodieMetrics.getMetricsName(HoodieTimeline.COMPACTION_ACTION, HoodieMetrics.EARLIEST_PENDING_COMPACTION_INSTANT_STR);
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), Long.valueOf("1002"));

metricName = hoodieMetrics.getMetricsName(HoodieTimeline.COMMIT_ACTION, HoodieMetrics.LATEST_COMPLETED_COMPACTION_INSTANT_STR);
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), Long.valueOf("1006"));

metricName = hoodieMetrics.getMetricsName(HoodieTimeline.COMPACTION_ACTION, HoodieMetrics.PENDING_COMPACTION_INSTANT_COUNT_STR);
assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), 5L);
}

private static class MockHoodieActiveTimeline extends ActiveTimelineV2 {
Expand Down
Loading