-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[HUDI-8882] Support Compaction/Rollback/Clean Timeline Instant Metrics in HoodieMetrics #12681
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,9 +19,11 @@ | |
package org.apache.hudi.metrics; | ||
|
||
import org.apache.hudi.common.model.HoodieCommitMetadata; | ||
import org.apache.hudi.common.model.HoodieTableType; | ||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; | ||
import org.apache.hudi.common.table.timeline.HoodieInstant; | ||
import org.apache.hudi.common.table.timeline.HoodieTimeline; | ||
import org.apache.hudi.common.util.CollectionUtils; | ||
import org.apache.hudi.common.util.Option; | ||
import org.apache.hudi.common.util.StringUtils; | ||
import org.apache.hudi.common.util.VisibleForTesting; | ||
|
@@ -34,6 +36,8 @@ | |
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.Set; | ||
|
||
/** | ||
* Wrapper for metrics-related operations. | ||
*/ | ||
|
@@ -65,9 +69,18 @@ public class HoodieMetrics { | |
public static final String COMMIT_LATENCY_IN_MS_STR = "commitLatencyInMs"; | ||
public static final String COMMIT_FRESHNESS_IN_MS_STR = "commitFreshnessInMs"; | ||
public static final String COMMIT_TIME_STR = "commitTime"; | ||
public static final String EARLIEST_INFLIGHT_CLUSTERING_INSTANT_STR = "earliestInflightClusteringInstant"; | ||
public static final String EARLIEST_PENDING_CLUSTERING_INSTANT_STR = "earliestInflightClusteringInstant"; | ||
public static final String EARLIEST_PENDING_COMPACTION_INSTANT_STR = "earliestInflightCompactionInstant"; | ||
public static final String EARLIEST_PENDING_CLEAN_INSTANT_STR = "earliestInflightCleanInstant"; | ||
public static final String EARLIEST_PENDING_ROLLBACK_INSTANT_STR = "earliestInflightRollbackInstant"; | ||
public static final String LATEST_COMPLETED_CLUSTERING_INSTANT_STR = "latestCompletedClusteringInstant"; | ||
public static final String LATEST_COMPLETED_COMPACTION_INSTANT_STR = "latestCompletedCompactionInstant"; | ||
public static final String LATEST_COMPLETED_CLEAN_INSTANT_STR = "latestCompletedCleanInstant"; | ||
public static final String LATEST_COMPLETED_ROLLBACK_INSTANT_STR = "latestCompletedRollbackInstant"; | ||
public static final String PENDING_CLUSTERING_INSTANT_COUNT_STR = "pendingClusteringInstantCount"; | ||
public static final String PENDING_COMPACTION_INSTANT_COUNT_STR = "pendingCompactionInstantCount"; | ||
public static final String PENDING_CLEAN_INSTANT_COUNT_STR = "pendingCleanInstantCount"; | ||
public static final String PENDING_ROLLBACK_INSTANT_COUNT_STR = "pendingRollbackInstantCount"; | ||
public static final String SUCCESS_EXTENSION = ".success"; | ||
public static final String FAILURE_EXTENSION = ".failure"; | ||
|
||
|
@@ -383,27 +396,67 @@ public void updateClusteringFileCreationMetrics(long durationInMs) { | |
reportMetrics(HoodieTimeline.CLUSTERING_ACTION, "fileCreationTime", durationInMs); | ||
} | ||
|
||
public void updateClusteringTimeLineInstantMetrics(final HoodieActiveTimeline activeTimeline) { | ||
if (config.isMetricsOn()) { | ||
// Compute Metrics | ||
long pendingClusteringInstantCount = activeTimeline.filterPendingClusteringTimeline().getInstants().size(); | ||
long earliestInflightClusteringInstantLong = 0L; | ||
Option<HoodieInstant> earliestInflightClusteringInstant = activeTimeline.filterPendingClusteringTimeline().getFirstPendingClusterInstant(); | ||
if (earliestInflightClusteringInstant.isPresent()) { | ||
earliestInflightClusteringInstantLong = Long.valueOf(earliestInflightClusteringInstant.get().requestedTime()); | ||
} | ||
long latestCompletedClusteringInstantLong = 0L; | ||
Option<HoodieInstant> latestCompletedClusteringInstant = activeTimeline.filterCompletedInstants().getLastClusteringInstant(); | ||
if (latestCompletedClusteringInstant.isPresent()) { | ||
latestCompletedClusteringInstantLong = Long.valueOf(latestCompletedClusteringInstant.get().requestedTime()); | ||
} | ||
public void updateTimelineInstantMetrics(final HoodieActiveTimeline activeTimeline) { | ||
updateEarliestPendingInstant(activeTimeline, EARLIEST_PENDING_CLUSTERING_INSTANT_STR, HoodieTimeline.CLUSTERING_ACTION); | ||
updateEarliestPendingInstant(activeTimeline, EARLIEST_PENDING_COMPACTION_INSTANT_STR, HoodieTimeline.COMPACTION_ACTION); | ||
updateEarliestPendingInstant(activeTimeline, EARLIEST_PENDING_CLEAN_INSTANT_STR, HoodieTimeline.CLEAN_ACTION); | ||
updateEarliestPendingInstant(activeTimeline, EARLIEST_PENDING_ROLLBACK_INSTANT_STR, HoodieTimeline.ROLLBACK_ACTION); | ||
|
||
updateLatestCompletedInstant(activeTimeline, LATEST_COMPLETED_CLUSTERING_INSTANT_STR, HoodieTimeline.REPLACE_COMMIT_ACTION); | ||
updateLatestCompletedInstant(activeTimeline, LATEST_COMPLETED_COMPACTION_INSTANT_STR, HoodieTimeline.COMMIT_ACTION); | ||
updateLatestCompletedInstant(activeTimeline, LATEST_COMPLETED_CLEAN_INSTANT_STR, HoodieTimeline.CLEAN_ACTION); | ||
updateLatestCompletedInstant(activeTimeline, LATEST_COMPLETED_ROLLBACK_INSTANT_STR, HoodieTimeline.ROLLBACK_ACTION); | ||
|
||
updatePendingInstantCount(activeTimeline, PENDING_CLUSTERING_INSTANT_COUNT_STR, HoodieTimeline.CLUSTERING_ACTION); | ||
updatePendingInstantCount(activeTimeline, PENDING_COMPACTION_INSTANT_COUNT_STR, HoodieTimeline.COMPACTION_ACTION); | ||
updatePendingInstantCount(activeTimeline, PENDING_CLEAN_INSTANT_COUNT_STR, HoodieTimeline.CLEAN_ACTION); | ||
updatePendingInstantCount(activeTimeline, PENDING_ROLLBACK_INSTANT_COUNT_STR, HoodieTimeline.ROLLBACK_ACTION); | ||
} | ||
|
||
private void updateEarliestPendingInstant(final HoodieActiveTimeline activeTimeline, | ||
final String metricName, | ||
final String action) { | ||
Set<String> validActions = CollectionUtils.createSet(action); | ||
HoodieTimeline filteredInstants = activeTimeline.filterInflightsAndRequested().filter(instant -> validActions.contains(instant.getAction())); | ||
Option<HoodieInstant> hoodieInstantOption = filteredInstants.firstInstant(); | ||
if (hoodieInstantOption.isPresent()) { | ||
updateMetric(action, metricName, Long.parseLong(hoodieInstantOption.get().requestedTime())); | ||
} | ||
} | ||
|
||
private void updateLatestCompletedInstant(final HoodieActiveTimeline activeTimeline, | ||
final String metricName, | ||
String action) { | ||
String tableType = config.getTableType().name(); | ||
if (HoodieTableType.MERGE_ON_READ.name().equalsIgnoreCase(tableType) | ||
&& LATEST_COMPLETED_COMPACTION_INSTANT_STR.equalsIgnoreCase(metricName)) { | ||
action = HoodieActiveTimeline.COMMIT_ACTION; | ||
} | ||
if (HoodieTableType.COPY_ON_WRITE.name().equalsIgnoreCase(tableType) | ||
&& LATEST_COMPLETED_CLUSTERING_INSTANT_STR.equalsIgnoreCase(metricName)) { | ||
action = HoodieActiveTimeline.REPLACE_COMMIT_ACTION; | ||
} | ||
Comment on lines
+431
to
+438
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we use switch style? |
||
Set<String> validActions = CollectionUtils.createSet(action); | ||
HoodieTimeline filteredInstants = activeTimeline.filterCompletedInstants().filter(instant -> validActions.contains(instant.getAction())); | ||
Option<HoodieInstant> hoodieInstantOption = filteredInstants.lastInstant(); | ||
if (hoodieInstantOption.isPresent()) { | ||
updateMetric(action, metricName, Long.parseLong(hoodieInstantOption.get().requestedTime())); | ||
} | ||
} | ||
|
||
private void updatePendingInstantCount(final HoodieActiveTimeline activeTimeline, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could we add some java docs? what does |
||
final String metricName, | ||
final String action) { | ||
Set<String> validActions = CollectionUtils.createSet(action); | ||
HoodieTimeline filteredInstants = activeTimeline.filterInflightsAndRequested().filter(instant -> validActions.contains(instant.getAction())); | ||
updateMetric(action, metricName, filteredInstants.countInstants()); | ||
} | ||
|
||
private void updateMetric(final String action, final String metricName, final long metricValue) { | ||
if (config.isMetricsOn()) { | ||
LOG.info( | ||
String.format("Sending timeline clustering instant metrics (%s=%d, %s=%d, %s=%d)", EARLIEST_INFLIGHT_CLUSTERING_INSTANT_STR, earliestInflightClusteringInstantLong, | ||
LATEST_COMPLETED_CLUSTERING_INSTANT_STR, latestCompletedClusteringInstantLong, PENDING_CLUSTERING_INSTANT_COUNT_STR, pendingClusteringInstantCount)); | ||
metrics.registerGauge(getMetricsName(HoodieTimeline.CLUSTERING_ACTION, EARLIEST_INFLIGHT_CLUSTERING_INSTANT_STR), earliestInflightClusteringInstantLong); | ||
metrics.registerGauge(getMetricsName(HoodieTimeline.CLUSTERING_ACTION, LATEST_COMPLETED_CLUSTERING_INSTANT_STR), latestCompletedClusteringInstantLong); | ||
metrics.registerGauge(getMetricsName(HoodieTimeline.CLUSTERING_ACTION, PENDING_CLUSTERING_INSTANT_COUNT_STR), pendingClusteringInstantCount); | ||
String.format("Updating timeline instant related metrics (%s=%d)", metricName, metricValue)); | ||
metrics.registerGauge(getMetricsName(action, metricName), metricValue); | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could we rename this function to
updateTableServiceInstantMetrics