From 0e1ef30fc406a6112f4f6d5b08ea3500bc5cefbe Mon Sep 17 00:00:00 2001 From: fhan Date: Tue, 21 Jan 2025 14:45:37 +0800 Subject: [PATCH] [HUDI-8882] timeline instant metrics for clean&compaction&clustering&rollback --- .../hudi/client/BaseHoodieWriteClient.java | 2 +- .../apache/hudi/metrics/HoodieMetrics.java | 93 +++++++++++++++---- .../hudi/metrics/TestHoodieMetrics.java | 79 +++++++++++++--- 3 files changed, 140 insertions(+), 34 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 82d34c5515d2..fec1679c5faf 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -565,7 +565,7 @@ protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, Stri // Delete the marker directory for the instant. WriteMarkersFactory.get(config.getMarkersType(), table, instantTime) .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism()); - metrics.updateClusteringTimeLineInstantMetrics(table.getActiveTimeline()); + metrics.updateTimelineInstantMetrics(table.getActiveTimeline()); } finally { this.heartbeatClient.stop(instantTime); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java index 104cbfed9c5b..ed1c1eae1184 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java @@ -19,9 +19,11 @@ package org.apache.hudi.metrics; import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.VisibleForTesting; @@ -34,6 +36,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Set; + /** * Wrapper for metrics-related operations. */ @@ -65,9 +69,18 @@ public class HoodieMetrics { public static final String COMMIT_LATENCY_IN_MS_STR = "commitLatencyInMs"; public static final String COMMIT_FRESHNESS_IN_MS_STR = "commitFreshnessInMs"; public static final String COMMIT_TIME_STR = "commitTime"; - public static final String EARLIEST_INFLIGHT_CLUSTERING_INSTANT_STR = "earliestInflightClusteringInstant"; + public static final String EARLIEST_PENDING_CLUSTERING_INSTANT_STR = "earliestInflightClusteringInstant"; + public static final String EARLIEST_PENDING_COMPACTION_INSTANT_STR = "earliestInflightCompactionInstant"; + public static final String EARLIEST_PENDING_CLEAN_INSTANT_STR = "earliestInflightCleanInstant"; + public static final String EARLIEST_PENDING_ROLLBACK_INSTANT_STR = "earliestInflightRollbackInstant"; public static final String LATEST_COMPLETED_CLUSTERING_INSTANT_STR = "latestCompletedClusteringInstant"; + public static final String LATEST_COMPLETED_COMPACTION_INSTANT_STR = "latestCompletedCompactionInstant"; + public static final String LATEST_COMPLETED_CLEAN_INSTANT_STR = "latestCompletedCleanInstant"; + public static final String LATEST_COMPLETED_ROLLBACK_INSTANT_STR = "latestCompletedRollbackInstant"; public static final String PENDING_CLUSTERING_INSTANT_COUNT_STR = "pendingClusteringInstantCount"; + public static final String PENDING_COMPACTION_INSTANT_COUNT_STR = "pendingCompactionInstantCount"; + public static final String PENDING_CLEAN_INSTANT_COUNT_STR = "pendingCleanInstantCount"; + public static final String PENDING_ROLLBACK_INSTANT_COUNT_STR = "pendingRollbackInstantCount"; public static final String SUCCESS_EXTENSION = ".success"; public static final String FAILURE_EXTENSION = ".failure"; @@ -383,27 +396,67 @@ public void updateClusteringFileCreationMetrics(long durationInMs) { reportMetrics(HoodieTimeline.CLUSTERING_ACTION, "fileCreationTime", durationInMs); } - public void updateClusteringTimeLineInstantMetrics(final HoodieActiveTimeline activeTimeline) { - if (config.isMetricsOn()) { - // Compute Metrics - long pendingClusteringInstantCount = activeTimeline.filterPendingClusteringTimeline().getInstants().size(); - long earliestInflightClusteringInstantLong = 0L; - Option earliestInflightClusteringInstant = activeTimeline.filterPendingClusteringTimeline().getFirstPendingClusterInstant(); - if (earliestInflightClusteringInstant.isPresent()) { - earliestInflightClusteringInstantLong = Long.valueOf(earliestInflightClusteringInstant.get().requestedTime()); - } - long latestCompletedClusteringInstantLong = 0L; - Option latestCompletedClusteringInstant = activeTimeline.filterCompletedInstants().getLastClusteringInstant(); - if (latestCompletedClusteringInstant.isPresent()) { - latestCompletedClusteringInstantLong = Long.valueOf(latestCompletedClusteringInstant.get().requestedTime()); - } + public void updateTimelineInstantMetrics(final HoodieActiveTimeline activeTimeline) { + updateEarliestPendingInstant(activeTimeline, EARLIEST_PENDING_CLUSTERING_INSTANT_STR, HoodieTimeline.CLUSTERING_ACTION); + updateEarliestPendingInstant(activeTimeline, EARLIEST_PENDING_COMPACTION_INSTANT_STR, HoodieTimeline.COMPACTION_ACTION); + updateEarliestPendingInstant(activeTimeline, EARLIEST_PENDING_CLEAN_INSTANT_STR, HoodieTimeline.CLEAN_ACTION); + updateEarliestPendingInstant(activeTimeline, EARLIEST_PENDING_ROLLBACK_INSTANT_STR, HoodieTimeline.ROLLBACK_ACTION); + + updateLatestCompletedInstant(activeTimeline, LATEST_COMPLETED_CLUSTERING_INSTANT_STR, HoodieTimeline.REPLACE_COMMIT_ACTION); + updateLatestCompletedInstant(activeTimeline, LATEST_COMPLETED_COMPACTION_INSTANT_STR, HoodieTimeline.COMMIT_ACTION); + updateLatestCompletedInstant(activeTimeline, LATEST_COMPLETED_CLEAN_INSTANT_STR, HoodieTimeline.CLEAN_ACTION); + updateLatestCompletedInstant(activeTimeline, LATEST_COMPLETED_ROLLBACK_INSTANT_STR, HoodieTimeline.ROLLBACK_ACTION); + + updatePendingInstantCount(activeTimeline, PENDING_CLUSTERING_INSTANT_COUNT_STR, HoodieTimeline.CLUSTERING_ACTION); + updatePendingInstantCount(activeTimeline, PENDING_COMPACTION_INSTANT_COUNT_STR, HoodieTimeline.COMPACTION_ACTION); + updatePendingInstantCount(activeTimeline, PENDING_CLEAN_INSTANT_COUNT_STR, HoodieTimeline.CLEAN_ACTION); + updatePendingInstantCount(activeTimeline, PENDING_ROLLBACK_INSTANT_COUNT_STR, HoodieTimeline.ROLLBACK_ACTION); + } + + private void updateEarliestPendingInstant(final HoodieActiveTimeline activeTimeline, + final String metricName, + final String action) { + Set validActions = CollectionUtils.createSet(action); + HoodieTimeline filteredInstants = activeTimeline.filterInflightsAndRequested().filter(instant -> validActions.contains(instant.getAction())); + Option hoodieInstantOption = filteredInstants.firstInstant(); + if (hoodieInstantOption.isPresent()) { + updateMetric(action, metricName, Long.parseLong(hoodieInstantOption.get().requestedTime())); + } + } + private void updateLatestCompletedInstant(final HoodieActiveTimeline activeTimeline, + final String metricName, + String action) { + String tableType = config.getTableType().name(); + if (HoodieTableType.MERGE_ON_READ.name().equalsIgnoreCase(tableType) + && LATEST_COMPLETED_COMPACTION_INSTANT_STR.equalsIgnoreCase(metricName)) { + action = HoodieActiveTimeline.COMMIT_ACTION; + } + if (HoodieTableType.COPY_ON_WRITE.name().equalsIgnoreCase(tableType) + && LATEST_COMPLETED_CLUSTERING_INSTANT_STR.equalsIgnoreCase(metricName)) { + action = HoodieActiveTimeline.REPLACE_COMMIT_ACTION; + } + Set validActions = CollectionUtils.createSet(action); + HoodieTimeline filteredInstants = activeTimeline.filterCompletedInstants().filter(instant -> validActions.contains(instant.getAction())); + Option hoodieInstantOption = filteredInstants.lastInstant(); + if (hoodieInstantOption.isPresent()) { + updateMetric(action, metricName, Long.parseLong(hoodieInstantOption.get().requestedTime())); + } + } + + private void updatePendingInstantCount(final HoodieActiveTimeline activeTimeline, + final String metricName, + final String action) { + Set validActions = CollectionUtils.createSet(action); + HoodieTimeline filteredInstants = activeTimeline.filterInflightsAndRequested().filter(instant -> validActions.contains(instant.getAction())); + updateMetric(action, metricName, filteredInstants.countInstants()); + } + + private void updateMetric(final String action, final String metricName, final long metricValue) { + if (config.isMetricsOn()) { LOG.info( - String.format("Sending timeline clustering instant metrics (%s=%d, %s=%d, %s=%d)", EARLIEST_INFLIGHT_CLUSTERING_INSTANT_STR, earliestInflightClusteringInstantLong, - LATEST_COMPLETED_CLUSTERING_INSTANT_STR, latestCompletedClusteringInstantLong, PENDING_CLUSTERING_INSTANT_COUNT_STR, pendingClusteringInstantCount)); - metrics.registerGauge(getMetricsName(HoodieTimeline.CLUSTERING_ACTION, EARLIEST_INFLIGHT_CLUSTERING_INSTANT_STR), earliestInflightClusteringInstantLong); - metrics.registerGauge(getMetricsName(HoodieTimeline.CLUSTERING_ACTION, LATEST_COMPLETED_CLUSTERING_INSTANT_STR), latestCompletedClusteringInstantLong); - metrics.registerGauge(getMetricsName(HoodieTimeline.CLUSTERING_ACTION, PENDING_CLUSTERING_INSTANT_COUNT_STR), pendingClusteringInstantCount); + String.format("Updating timeline instant related metrics (%s=%d)", metricName, metricValue)); + metrics.registerGauge(getMetricsName(action, metricName), metricValue); } } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java index ee5dba93b001..5985868a824d 100755 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java @@ -19,6 +19,7 @@ package org.apache.hudi.metrics; import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -233,23 +234,75 @@ public void testTimerCtxandGauges() throws InterruptedException { assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalRollbackLogBlocks()); }); - // Clustering Timeline Instant Metrics + // MOCK Timeline Instant Metrics for Clean & Rollback + when(writeConfig.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE); + + HoodieInstant instant004 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, "1004"); + HoodieInstant instant007 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.CLEAN_ACTION, "1007"); + HoodieInstant instant009 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION, "1009"); + HoodieInstant instant0010 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, "10010"); + HoodieInstant instant0013 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.ROLLBACK_ACTION, "10013"); + HoodieInstant instant0015 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, "10015"); + HoodieInstant instant0016 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.ROLLBACK_ACTION, "10016"); + HoodieInstant instant0017 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.ROLLBACK_ACTION, "10017"); + + HoodieActiveTimeline activeTimeline1 = new MockHoodieActiveTimeline(instant004, instant007, instant009, instant0010, instant0013, instant0015, instant0016, instant0017); + hoodieMetrics.updateTimelineInstantMetrics(activeTimeline1); + + metricName = hoodieMetrics.getMetricsName(HoodieTimeline.CLEAN_ACTION, HoodieMetrics.EARLIEST_PENDING_CLEAN_INSTANT_STR); + assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), Long.valueOf("1004")); + metricName = hoodieMetrics.getMetricsName(HoodieTimeline.ROLLBACK_ACTION, HoodieMetrics.EARLIEST_PENDING_ROLLBACK_INSTANT_STR); + assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), Long.valueOf("1009")); + + metricName = hoodieMetrics.getMetricsName(HoodieTimeline.CLEAN_ACTION, HoodieMetrics.LATEST_COMPLETED_CLEAN_INSTANT_STR); + assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), Long.valueOf("1007")); + metricName = hoodieMetrics.getMetricsName(HoodieTimeline.ROLLBACK_ACTION, HoodieMetrics.LATEST_COMPLETED_ROLLBACK_INSTANT_STR); + assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), Long.valueOf("10017")); + + metricName = hoodieMetrics.getMetricsName(HoodieTimeline.CLEAN_ACTION, HoodieMetrics.PENDING_CLEAN_INSTANT_COUNT_STR); + assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), 3L); + metricName = hoodieMetrics.getMetricsName(HoodieTimeline.ROLLBACK_ACTION, HoodieMetrics.PENDING_ROLLBACK_INSTANT_COUNT_STR); + assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), 2L); + + // MOCK Timeline Instant Metrics for Clustering HoodieInstant instant001 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.CLUSTERING_ACTION, "1001"); - HoodieInstant instant0011 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "11001"); - HoodieInstant instant002 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.CLUSTERING_ACTION, "1002"); - HoodieInstant instant003 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLUSTERING_ACTION, "1003"); - HoodieInstant instant004 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLUSTERING_ACTION, "1004"); HoodieInstant instant005 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.CLUSTERING_ACTION, "1005"); - HoodieInstant instant006 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLUSTERING_ACTION, "1006"); - HoodieActiveTimeline activeTimeline = new MockHoodieActiveTimeline(instant001, instant0011, instant002, instant003, instant004, instant005, instant006); - hoodieMetrics.updateClusteringTimeLineInstantMetrics(activeTimeline); + HoodieInstant instant0011 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.REPLACE_COMMIT_ACTION, "10011"); + HoodieInstant instant0018 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLUSTERING_ACTION, "10018"); + + HoodieActiveTimeline activeTimeline2 = new MockHoodieActiveTimeline(instant001, instant005, instant0011, instant0018); + hoodieMetrics.updateTimelineInstantMetrics(activeTimeline2); + + metricName = hoodieMetrics.getMetricsName(HoodieTimeline.CLUSTERING_ACTION, HoodieMetrics.EARLIEST_PENDING_CLUSTERING_INSTANT_STR); + assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), Long.valueOf("10018")); + + metricName = hoodieMetrics.getMetricsName(HoodieTimeline.REPLACE_COMMIT_ACTION, HoodieMetrics.LATEST_COMPLETED_CLUSTERING_INSTANT_STR); + assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), Long.valueOf("10011")); - metricName = hoodieMetrics.getMetricsName(HoodieTimeline.CLUSTERING_ACTION, HoodieMetrics.EARLIEST_INFLIGHT_CLUSTERING_INSTANT_STR); - assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), Long.valueOf("1002")); - metricName = hoodieMetrics.getMetricsName(HoodieTimeline.CLUSTERING_ACTION, HoodieMetrics.LATEST_COMPLETED_CLUSTERING_INSTANT_STR); - assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), Long.valueOf("1005")); metricName = hoodieMetrics.getMetricsName(HoodieTimeline.CLUSTERING_ACTION, HoodieMetrics.PENDING_CLUSTERING_INSTANT_COUNT_STR); - assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), 4L); + assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), 1L); + + // MOCK Timeline Instant Metrics for Compaction + when(writeConfig.getTableType()).thenReturn(HoodieTableType.MERGE_ON_READ); + + HoodieInstant instant002 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "1002"); + HoodieInstant instant003 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "1003"); + HoodieInstant instant006 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "1006"); + HoodieInstant instant008 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "1008"); + HoodieInstant instant0012 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, "10012"); + HoodieInstant instant0014 = INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, "10014"); + + HoodieActiveTimeline activeTimeline3 = new MockHoodieActiveTimeline(instant002, instant003, instant006, instant008, instant0012, instant0014); + hoodieMetrics.updateTimelineInstantMetrics(activeTimeline3); + + metricName = hoodieMetrics.getMetricsName(HoodieTimeline.COMPACTION_ACTION, HoodieMetrics.EARLIEST_PENDING_COMPACTION_INSTANT_STR); + assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), Long.valueOf("1002")); + + metricName = hoodieMetrics.getMetricsName(HoodieTimeline.COMMIT_ACTION, HoodieMetrics.LATEST_COMPLETED_COMPACTION_INSTANT_STR); + assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), Long.valueOf("1006")); + + metricName = hoodieMetrics.getMetricsName(HoodieTimeline.COMPACTION_ACTION, HoodieMetrics.PENDING_COMPACTION_INSTANT_COUNT_STR); + assertEquals((long)metrics.getRegistry().getGauges().get(metricName).getValue(), 5L); } private static class MockHoodieActiveTimeline extends ActiveTimelineV2 {