From 8f7f2c17242fb084f8784f015f76808bfda9b593 Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Fri, 18 Oct 2024 14:29:03 -0700
Subject: [PATCH] feat(ingest/fivetran): protect against high sync volume
 (#11589)

---
 .../ingestion/source/fivetran/fivetran.py    | 21 ++++---
 .../source/fivetran/fivetran_log_api.py      | 63 ++++++++++---------
 .../source/fivetran/fivetran_query.py        | 34 +++++++---
 .../integration/fivetran/test_fivetran.py    | 58 ++++-------------
 4 files changed, 87 insertions(+), 89 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py
index 704a6f20a5c19..334bb58ea84f8 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py
@@ -27,7 +27,10 @@
     PlatformDetail,
 )
 from datahub.ingestion.source.fivetran.data_classes import Connector, Job
-from datahub.ingestion.source.fivetran.fivetran_log_api import FivetranLogAPI
+from datahub.ingestion.source.fivetran.fivetran_log_api import (
+    MAX_JOBS_PER_CONNECTOR,
+    FivetranLogAPI,
+)
 from datahub.ingestion.source.state.stale_entity_removal_handler import (
     StaleEntityRemovalHandler,
 )
@@ -72,11 +75,6 @@ def __init__(self, config: FivetranSourceConfig, ctx: PipelineContext):
 
         self.audit_log = FivetranLogAPI(self.config.fivetran_log_config)
 
-        # Create and register the stateful ingestion use-case handler.
-        self.stale_entity_removal_handler = StaleEntityRemovalHandler.create(
-            self, self.config, self.ctx
-        )
-
     def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None:
         input_dataset_urn_list: List[DatasetUrn] = []
         output_dataset_urn_list: List[DatasetUrn] = []
@@ -267,6 +265,13 @@ def _get_connector_workunits(
             ).as_workunit(is_primary_source=False)
 
         # Map Fivetran's job/sync history entity with Datahub's data process entity
+        if len(connector.jobs) >= MAX_JOBS_PER_CONNECTOR:
+            self.report.warning(
+                title="Not all sync history was captured",
+                message=f"The connector had more than {MAX_JOBS_PER_CONNECTOR} sync runs in the past {self.config.history_sync_lookback_period} days. "
+                f"Only the most recent {MAX_JOBS_PER_CONNECTOR} syncs were ingested.",
+                context=f"{connector.connector_name} (connector_id: {connector.connector_id})",
+            )
         for job in connector.jobs:
            dpi = self._generate_dpi_from_job(job, datajob)
            yield from self._get_dpi_workunits(job, dpi)
@@ -279,7 +284,9 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
     def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
         return [
             *super().get_workunit_processors(),
-            self.stale_entity_removal_handler.workunit_processor,
+            StaleEntityRemovalHandler.create(
+                self, self.config, self.ctx
+            ).workunit_processor,
         ]
 
     def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
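The guard added above treats a full page of results as evidence of truncation: the reworked SQL (further down in this patch) caps each connector at MAX_JOBS_PER_CONNECTOR rows, so getting back exactly that many strongly suggests older syncs were dropped. A minimal standalone sketch of the same pattern, with a hypothetical Report class standing in for DataHub's structured ingestion report:

from dataclasses import dataclass, field
from typing import List

MAX_JOBS_PER_CONNECTOR = 1000  # mirrors the constant introduced in fivetran_log_api.py


@dataclass
class Report:
    # Hypothetical stand-in for the source's ingestion report object.
    warnings: List[str] = field(default_factory=list)

    def warning(self, title: str, message: str, context: str) -> None:
        self.warnings.append(f"{title}: {message} [{context}]")


def emit_sync_history(jobs: List[dict], report: Report, lookback_days: int) -> None:
    # A full page means the per-connector cap in the query was hit,
    # so older syncs were silently dropped; surface that to the user.
    if len(jobs) >= MAX_JOBS_PER_CONNECTOR:
        report.warning(
            title="Not all sync history was captured",
            message=f"More than {MAX_JOBS_PER_CONNECTOR} sync runs in the past "
            f"{lookback_days} days; only the most recent were ingested.",
            context="example_connector",
        )
    # ... emit one DataProcessInstance per job here ...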
diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py
index 31c16139066e4..5908efe39e2b4 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_log_api.py
@@ -22,6 +22,10 @@
 
 logger: logging.Logger = logging.getLogger(__name__)
 
+# We don't want to generate a massive number of dataProcesses for a single connector.
+# This is primarily used as a safeguard to prevent performance issues.
+MAX_JOBS_PER_CONNECTOR = 1000
+
 
 class FivetranLogAPI:
     def __init__(self, fivetran_log_config: FivetranLogConfig) -> None:
@@ -158,34 +162,32 @@ def _get_table_lineage(
 
         return table_lineage_list
 
-    def _get_all_connector_sync_logs(self, syncs_interval: int) -> Dict[str, Dict]:
-        sync_logs = {}
-        for row in self._query(
-            self.fivetran_log_query.get_sync_logs_query().format(
-                db_clause=self.fivetran_log_query.db_clause,
-                syncs_interval=syncs_interval,
-            )
-        ):
-            if row[Constant.CONNECTOR_ID] not in sync_logs:
-                sync_logs[row[Constant.CONNECTOR_ID]] = {
-                    row[Constant.SYNC_ID]: {
-                        row["message_event"]: (
-                            row[Constant.TIME_STAMP].timestamp(),
-                            row[Constant.MESSAGE_DATA],
-                        )
-                    }
-                }
-            elif row[Constant.SYNC_ID] not in sync_logs[row[Constant.CONNECTOR_ID]]:
-                sync_logs[row[Constant.CONNECTOR_ID]][row[Constant.SYNC_ID]] = {
-                    row["message_event"]: (
-                        row[Constant.TIME_STAMP].timestamp(),
-                        row[Constant.MESSAGE_DATA],
-                    )
-                }
-            else:
-                sync_logs[row[Constant.CONNECTOR_ID]][row[Constant.SYNC_ID]][
-                    row["message_event"]
-                ] = (row[Constant.TIME_STAMP].timestamp(), row[Constant.MESSAGE_DATA])
+    def _get_all_connector_sync_logs(
+        self, syncs_interval: int, connector_ids: List[str]
+    ) -> Dict[str, Dict[str, Dict[str, Tuple[float, Optional[str]]]]]:
+        sync_logs: Dict[str, Dict[str, Dict[str, Tuple[float, Optional[str]]]]] = {}
+
+        # Format connector_ids as a comma-separated string of quoted IDs
+        formatted_connector_ids = ", ".join(f"'{id}'" for id in connector_ids)
+
+        query = self.fivetran_log_query.get_sync_logs_query().format(
+            db_clause=self.fivetran_log_query.db_clause,
+            syncs_interval=syncs_interval,
+            max_jobs_per_connector=MAX_JOBS_PER_CONNECTOR,
+            connector_ids=formatted_connector_ids,
+        )
+
+        for row in self._query(query):
+            connector_id = row[Constant.CONNECTOR_ID]
+            sync_id = row[Constant.SYNC_ID]
+
+            if connector_id not in sync_logs:
+                sync_logs[connector_id] = {}
+
+            sync_logs[connector_id][sync_id] = {
+                "sync_start": (row["start_time"].timestamp(), None),
+                "sync_end": (row["end_time"].timestamp(), row["end_message_data"]),
+            }
 
         return sync_logs
 
@@ -244,7 +246,10 @@ def _fill_connectors_table_lineage(self, connectors: List[Connector]) -> None:
     def _fill_connectors_jobs(
         self, connectors: List[Connector], syncs_interval: int
     ) -> None:
-        sync_logs = self._get_all_connector_sync_logs(syncs_interval)
+        connector_ids = [connector.connector_id for connector in connectors]
+        sync_logs = self._get_all_connector_sync_logs(
+            syncs_interval, connector_ids=connector_ids
+        )
         for connector in connectors:
             connector.jobs = self._get_jobs_list(sync_logs.get(connector.connector_id))
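Because the reworked query returns one row per sync with both endpoints already aggregated, the old three-branch merge keyed on message_event collapses into a single assignment. A small sketch of the structure the new loop builds, using a hypothetical row shaped like the test fixtures at the end of this patch:

from datetime import datetime
from typing import Dict, Optional, Tuple

# One hypothetical row, shaped like the fixtures in test_fivetran.py below.
row = {
    "connector_id": "calendar_elected",
    "sync_id": "4c9a03d6-eded-4422-a46a-163266e58243",
    "start_time": datetime(2023, 9, 20, 6, 37, 32),
    "end_time": datetime(2023, 9, 20, 6, 38, 5),
    "end_message_data": '"{\\"status\\":\\"SUCCESSFUL\\"}"',
}

SyncLogs = Dict[str, Dict[str, Dict[str, Tuple[float, Optional[str]]]]]
sync_logs: SyncLogs = {}

# setdefault covers the "first sync seen for this connector" case that the
# old code handled with separate if/elif/else branches.
sync_logs.setdefault(row["connector_id"], {})[row["sync_id"]] = {
    "sync_start": (row["start_time"].timestamp(), None),
    "sync_end": (row["end_time"].timestamp(), row["end_message_data"]),
}

assert "4c9a03d6-eded-4422-a46a-163266e58243" in sync_logs["calendar_elected"]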
diff --git a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py
index d965f53ff554b..c4680b4b1037a 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_query.py
@@ -37,14 +37,32 @@ def get_users_query(self) -> str:
 
     def get_sync_logs_query(self) -> str:
         return """
-        SELECT connector_id,
-        sync_id,
-        message_event,
-        message_data,
-        time_stamp
-        FROM {db_clause}log
-        WHERE message_event in ('sync_start', 'sync_end')
-        and time_stamp > CURRENT_TIMESTAMP - INTERVAL '{syncs_interval} days'"""
+        WITH ranked_syncs AS (
+            SELECT
+                connector_id,
+                sync_id,
+                MAX(CASE WHEN message_event = 'sync_start' THEN time_stamp END) as start_time,
+                MAX(CASE WHEN message_event = 'sync_end' THEN time_stamp END) as end_time,
+                MAX(CASE WHEN message_event = 'sync_end' THEN message_data END) as end_message_data,
+                ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY MAX(time_stamp) DESC) as rn
+            FROM {db_clause}log
+            WHERE message_event in ('sync_start', 'sync_end')
+                AND time_stamp > CURRENT_TIMESTAMP - INTERVAL '{syncs_interval} days'
+                AND connector_id IN ({connector_ids})
+            GROUP BY connector_id, sync_id
+        )
+        SELECT
+            connector_id,
+            sync_id,
+            start_time,
+            end_time,
+            end_message_data
+        FROM ranked_syncs
+        WHERE rn <= {max_jobs_per_connector}
+            AND start_time IS NOT NULL
+            AND end_time IS NOT NULL
+        ORDER BY connector_id, end_time DESC
+        """
 
     def get_table_lineage_query(self) -> str:
         return f"""
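The ranked_syncs CTE above is the heart of the safeguard: it folds each sync's start/end events into one row per (connector_id, sync_id), ranks each connector's syncs newest-first with ROW_NUMBER(), and lets the outer query keep at most max_jobs_per_connector of them. A self-contained illustration of that windowing idea; SQLite (>= 3.25, for window-function support) is used here purely for demonstration, while the real query runs against the Fivetran log tables:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE log (connector_id TEXT, sync_id TEXT, end_time TEXT);
    INSERT INTO log VALUES
        ('c1', 's1', '2023-10-01'), ('c1', 's2', '2023-10-02'),
        ('c1', 's3', '2023-10-03'), ('c2', 's4', '2023-10-01');
    """
)

MAX_JOBS = 2  # stand-in for MAX_JOBS_PER_CONNECTOR
rows = conn.execute(
    """
    WITH ranked AS (
        SELECT connector_id, sync_id,
               ROW_NUMBER() OVER (
                   PARTITION BY connector_id ORDER BY end_time DESC
               ) AS rn
        FROM log
    )
    SELECT connector_id, sync_id FROM ranked WHERE rn <= ? ORDER BY connector_id
    """,
    (MAX_JOBS,),
).fetchall()

# c1 keeps only its two most recent syncs (s3 and s2); c2 keeps s4.
print(rows)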
\\\\\\"hxwraqld\\\\\\"\\",\\"taskType\\":\\"reconnect\\",\\"status\\":\\"FAILURE_WITH_TASK\\"}"', - "time_stamp": datetime.datetime(2023, 10, 3, 14, 36, 29, 678000), - }, - { - "connector_id": "calendar_elected", - "sync_id": "e773e1e9-c791-46f4-894f-8ch9b3dfc832", - "message_event": "sync_end", - "message_data": None, - "time_stamp": datetime.datetime(2023, 10, 3, 14, 37, 35, 478000), + "start_time": datetime.datetime(2023, 10, 3, 14, 35, 55, 401000), + "end_time": datetime.datetime(2023, 10, 3, 14, 36, 29, 678000), + "end_message_data": '"{\\"reason\\":\\"java.lang.RuntimeException: FATAL: too many connections for role \\\\\\"hxwraqld\\\\\\"\\",\\"taskType\\":\\"reconnect\\",\\"status\\":\\"FAILURE_WITH_TASK\\"}"', }, ] # Unreachable code