From 7cdff5a61c33575571235a6625e8e5efeab7502a Mon Sep 17 00:00:00 2001 From: Siddique Bagwan Date: Mon, 18 Nov 2024 15:11:05 +0530 Subject: [PATCH 01/11] configurable: convert upstream column to lowercase --- .../source/looker/lookml_concept_context.py | 3 +-- .../ingestion/source/looker/lookml_config.py | 6 +++++ .../ingestion/source/looker/view_upstream.py | 24 +++++++++++++++---- .../top_10_employee_income_source.view.lkml | 2 +- .../duplicate_field_ingestion_golden.json | 10 ++++---- .../tests/integration/lookml/test_lookml.py | 2 ++ 6 files changed, 35 insertions(+), 12 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py index ce4a242027e11..2451dfa7081a1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py @@ -88,8 +88,7 @@ def column_name_in_sql_attribute(self) -> List[str]: for upstream_field_match in re.finditer(r"\${TABLE}\.[\"]*([\.\w]+)", sql): matched_field = upstream_field_match.group(1) # Remove quotes from field names - matched_field = matched_field.replace('"', "").replace("`", "").lower() - column_names.append(matched_field) + column_names.append(matched_field.replace('"', "").replace("`", "")) return column_names diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py index da837da161386..4285cb1bd9b1c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py @@ -178,6 +178,12 @@ class LookMLSourceConfig( "All if comments are evaluated to true for configured looker_environment value", ) + convert_upstream_column_to_lowercase: bool = Field( + default=True, + description="Indicates whether to convert upstream column names to lowercase (default: True). " + "Enable this option if column-level lineage is not functioning correctly and retry the ingestion process.", + ) + @validator("connection_to_platform_map", pre=True) def convert_string_to_connection_def(cls, conn_map): # Previous version of config supported strings in connection map. 
This upconverts strings to ConnectionMap diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index 057dbca428184..5a9ca49d3fa21 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -237,6 +237,14 @@ def get_upstream_dataset_urn(self) -> List[Urn]: def create_fields(self) -> List[ViewField]: return [] # it is for the special case + def convert_upstream_column_to_lowercase(self, columns: List[str]) -> List[str]: + return [ + column.lower() + if self.config.convert_upstream_column_to_lowercase + else column + for column in columns + ] + class SqlBasedDerivedViewUpstream(AbstractViewUpstream, ABC): """ @@ -380,7 +388,9 @@ def get_upstream_column_ref( ], # 0th index has table of from clause column=column, ) - for column in field_context.column_name_in_sql_attribute() + for column in self.convert_upstream_column_to_lowercase( + field_context.column_name_in_sql_attribute() + ) ] # fix any derived view reference present in urn @@ -489,7 +499,9 @@ def get_upstream_column_ref( explore_urn: str = self._get_upstream_dataset_urn()[0] - for column in field_context.column_name_in_sql_attribute(): + for column in self.convert_upstream_column_to_lowercase( + field_context.column_name_in_sql_attribute() + ): if column in self._get_explore_column_mapping(): explore_column: Dict = self._get_explore_column_mapping()[column] upstream_column_refs.append( @@ -551,7 +563,9 @@ def get_upstream_column_ref( ) -> List[ColumnRef]: upstream_column_ref: List[ColumnRef] = [] - for column_name in field_context.column_name_in_sql_attribute(): + for column_name in self.convert_upstream_column_to_lowercase( + field_context.column_name_in_sql_attribute() + ): upstream_column_ref.append( ColumnRef(table=self._get_upstream_dataset_urn(), column=column_name) ) @@ -613,7 +627,9 @@ def get_upstream_column_ref( if not self._get_upstream_dataset_urn(): return upstream_column_ref - for column_name in field_context.column_name_in_sql_attribute(): + for column_name in self.convert_upstream_column_to_lowercase( + field_context.column_name_in_sql_attribute() + ): upstream_column_ref.append( ColumnRef(table=self._get_upstream_dataset_urn()[0], column=column_name) ) diff --git a/metadata-ingestion/tests/integration/lookml/drop_hive_dot/top_10_employee_income_source.view.lkml b/metadata-ingestion/tests/integration/lookml/drop_hive_dot/top_10_employee_income_source.view.lkml index 149ce9219b54b..742b6d99da110 100644 --- a/metadata-ingestion/tests/integration/lookml/drop_hive_dot/top_10_employee_income_source.view.lkml +++ b/metadata-ingestion/tests/integration/lookml/drop_hive_dot/top_10_employee_income_source.view.lkml @@ -11,7 +11,7 @@ view: top_10_employee_income_source { dimension: id { type: number - sql: ${TABLE}.id ;; + sql: ${TABLE}.id;; } dimension: name { diff --git a/metadata-ingestion/tests/integration/lookml/duplicate_field_ingestion_golden.json b/metadata-ingestion/tests/integration/lookml/duplicate_field_ingestion_golden.json index f88dfba42eb97..01cd7bd9561ea 100644 --- a/metadata-ingestion/tests/integration/lookml/duplicate_field_ingestion_golden.json +++ b/metadata-ingestion/tests/integration/lookml/duplicate_field_ingestion_golden.json @@ -176,7 +176,7 @@ { "upstreamType": "FIELD_SET", "upstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,.public.dataset_lineages,PROD),entity)" + 
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,.public.dataset_lineages,PROD),ENTITY)" ], "downstreamType": "FIELD", "downstreams": [ @@ -187,7 +187,7 @@ { "upstreamType": "FIELD_SET", "upstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,.public.dataset_lineages,PROD),metadata)" + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,.public.dataset_lineages,PROD),METADATA)" ], "downstreamType": "FIELD", "downstreams": [ @@ -198,7 +198,7 @@ { "upstreamType": "FIELD_SET", "upstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,.public.dataset_lineages,PROD),urn)" + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,.public.dataset_lineages,PROD),URN)" ], "downstreamType": "FIELD", "downstreams": [ @@ -209,7 +209,7 @@ { "upstreamType": "FIELD_SET", "upstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,.public.dataset_lineages,PROD),version)" + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,.public.dataset_lineages,PROD),VERSION)" ], "downstreamType": "FIELD", "downstreams": [ @@ -220,7 +220,7 @@ { "upstreamType": "FIELD_SET", "upstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,.public.dataset_lineages,PROD),createdon)" + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,.public.dataset_lineages,PROD),CREATEDON)" ], "downstreamType": "FIELD", "downstreams": [ diff --git a/metadata-ingestion/tests/integration/lookml/test_lookml.py b/metadata-ingestion/tests/integration/lookml/test_lookml.py index 94b3b103d0548..fc268703692a0 100644 --- a/metadata-ingestion/tests/integration/lookml/test_lookml.py +++ b/metadata-ingestion/tests/integration/lookml/test_lookml.py @@ -842,6 +842,8 @@ def test_duplicate_field_ingest(pytestconfig, tmp_path, mock_time): f"{test_resources_dir}/lkml_samples_duplicate_field", ) + new_recipe["source"]["config"]["convert_upstream_column_to_lowercase"] = False + pipeline = Pipeline.create(new_recipe) pipeline.run() pipeline.pretty_print_summary() From 3d39e4a4aed6e2c9e5a4f343473d3318b1294235 Mon Sep 17 00:00:00 2001 From: Siddique Bagwan Date: Sat, 23 Nov 2024 23:19:14 +0530 Subject: [PATCH 02/11] wip --- .../source/looker/lookml_concept_context.py | 2 +- .../ingestion/source/looker/view_upstream.py | 115 +++++++++++------- .../datahub/sql_parsing/schema_resolver.py | 21 +++- .../datahub/sql_parsing/sqlglot_lineage.py | 44 +++++-- 4 files changed, 122 insertions(+), 60 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py index 1968c05c8ee84..103f4175a9ccf 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py @@ -88,7 +88,7 @@ def column_name_in_sql_attribute(self) -> List[str]: for upstream_field_match in re.finditer(r"\${TABLE}\.[\"]*([\.\w]+)", sql): matched_field = upstream_field_match.group(1) # Remove quotes from field names - column_names.append(matched_field.replace('"', "").replace("`", "")) + column_names.append(matched_field.replace('"', "").replace("`", "").lower()) return column_names diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index efc241746ea4f..9f9a2d312d8cd 100644 --- 
a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -25,6 +25,7 @@ LookMLSourceReport, ) from datahub.ingestion.source.looker.urn_functions import get_qualified_table_name +from datahub.sql_parsing.schema_resolver import SchemaResolver from datahub.sql_parsing.sqlglot_lineage import ( ColumnLineageInfo, ColumnRef, @@ -236,13 +237,53 @@ def get_upstream_dataset_urn(self) -> List[Urn]: def create_fields(self) -> List[ViewField]: return [] # it is for the special case - def convert_upstream_column_to_lowercase(self, columns: List[str]) -> List[str]: - return [ - column.lower() - if self.config.convert_upstream_column_to_lowercase - else column - for column in columns - ] + def create_upstream_column_refs( + self, upstream_urn: str, expected_columns: List[str] + ) -> List[ColumnRef]: + """ + - **`upstream_urn`**: The URN of the upstream dataset. + + - **`expected_columns`**: These are the columns identified by the Looker connector as belonging to the `upstream_urn` dataset. However, there is potential for human error in specifying the columns of the upstream dataset. For example, a user might declare a column in lowercase, while on the actual platform, it may exist in uppercase, or vice versa. + + - This function ensures consistency in column-level lineage by consulting GMS before creating the final `ColumnRef` instance, avoiding discrepancies. + """ + + actual_columns: List[str] = [] + + with SchemaResolver( + platform=self.view_context.view_connection.platform, + platform_instance=self.view_context.view_connection.platform_instance, + env=self.view_context.view_connection.platform_env or self.config.env, + graph=self.ctx.graph, + ) as schema_resolver: + urn, schema_info = schema_resolver.resolve_urn(urn=upstream_urn) + + if schema_info: + column_from_gms: List[str] = list( + schema_info.keys() + ) # list() to silent lint + actual_columns = [ + column + for column in column_from_gms + if column.lower() in map(str.lower, expected_columns) + ] + else: + logger.info( + f"schema_info not found for dataset {urn} in GMS. 
Using expected_columns to form ColumnRef" + ) + actual_columns = expected_columns + + upstream_column_refs: List[ColumnRef] = [] + + for column in actual_columns: + upstream_column_refs.append( + ColumnRef( + column=column, + table=upstream_urn, + ) + ) + + return upstream_column_refs class SqlBasedDerivedViewUpstream(AbstractViewUpstream, ABC): @@ -380,17 +421,12 @@ def get_upstream_column_ref( # in-case of "select * from look_ml_view.SQL_TABLE_NAME" or extra field are defined in the looker view which is # referring to upstream table if self._get_upstream_dataset_urn() and not upstreams_column_refs: - upstreams_column_refs = [ - ColumnRef( - table=self._get_upstream_dataset_urn()[ - 0 - ], # 0th index has table of from clause - column=column, - ) - for column in self.convert_upstream_column_to_lowercase( - field_context.column_name_in_sql_attribute() - ) - ] + upstreams_column_refs = self.create_upstream_column_refs( + upstream_urn=self._get_upstream_dataset_urn()[ + 0 + ], # 0th index has table of from clause, + expected_columns=field_context.column_name_in_sql_attribute(), + ) # fix any derived view reference present in urn upstreams_column_refs = resolve_derived_view_urn_of_col_ref( @@ -497,20 +533,18 @@ def get_upstream_column_ref( return upstream_column_refs explore_urn: str = self._get_upstream_dataset_urn()[0] + expected_columns: List[str] = [] - for column in self.convert_upstream_column_to_lowercase( - field_context.column_name_in_sql_attribute() - ): + for column in field_context.column_name_in_sql_attribute(): if column in self._get_explore_column_mapping(): explore_column: Dict = self._get_explore_column_mapping()[column] - upstream_column_refs.append( - ColumnRef( - column=explore_column.get("field", explore_column[NAME]), - table=explore_urn, - ) + expected_columns.append( + explore_column.get("field", explore_column[NAME]) ) - return upstream_column_refs + return self.create_upstream_column_refs( + upstream_urn=explore_urn, expected_columns=expected_columns + ) def get_upstream_dataset_urn(self) -> List[Urn]: return self._get_upstream_dataset_urn() @@ -560,16 +594,10 @@ def __get_upstream_dataset_urn(self) -> Urn: def get_upstream_column_ref( self, field_context: LookerFieldContext ) -> List[ColumnRef]: - upstream_column_ref: List[ColumnRef] = [] - - for column_name in self.convert_upstream_column_to_lowercase( - field_context.column_name_in_sql_attribute() - ): - upstream_column_ref.append( - ColumnRef(table=self._get_upstream_dataset_urn(), column=column_name) - ) - - return upstream_column_ref + return self.create_upstream_column_refs( + upstream_urn=self._get_upstream_dataset_urn(), + expected_columns=field_context.column_name_in_sql_attribute(), + ) def get_upstream_dataset_urn(self) -> List[Urn]: return [self._get_upstream_dataset_urn()] @@ -623,17 +651,14 @@ def get_upstream_column_ref( self, field_context: LookerFieldContext ) -> List[ColumnRef]: upstream_column_ref: List[ColumnRef] = [] + if not self._get_upstream_dataset_urn(): return upstream_column_ref - for column_name in self.convert_upstream_column_to_lowercase( - field_context.column_name_in_sql_attribute() - ): - upstream_column_ref.append( - ColumnRef(table=self._get_upstream_dataset_urn()[0], column=column_name) - ) - - return upstream_column_ref + return self.create_upstream_column_refs( + upstream_urn=self._get_upstream_dataset_urn()[0], + expected_columns=field_context.column_name_in_sql_attribute(), + ) def get_upstream_dataset_urn(self) -> List[Urn]: return self._get_upstream_dataset_urn() diff --git 
a/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py b/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py index e3f2fbc786b43..886efa5e18201 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py +++ b/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py @@ -1,6 +1,7 @@ import contextlib import pathlib -from typing import Dict, List, Optional, Protocol, Set, Tuple +from types import TracebackType +from typing import Dict, List, Optional, Protocol, Set, Tuple, Type from typing_extensions import TypedDict @@ -73,6 +74,17 @@ def __init__( extra_columns={"is_missing": lambda v: v is None}, ) + def __enter__(self) -> "SchemaResolver": + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_value: Optional[BaseException], + traceback: Optional[TracebackType], + ) -> None: + self.close() + @property def platform(self) -> str: return self._platform @@ -123,6 +135,13 @@ def get_urn_for_table( ) return urn + def resolve_urn(self, urn: str) -> Tuple[str, Optional[SchemaInfo]]: + schema_info = self._resolve_schema_info(urn) + if schema_info: + return urn, schema_info + + return urn, None + def resolve_table(self, table: _TableName) -> Tuple[str, Optional[SchemaInfo]]: urn = self.get_urn_for_table(table) diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py index 9adb792a4be51..1ae176cf6adb9 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py @@ -1181,6 +1181,28 @@ def sqlglot_lineage( ) +def create_schema_resolver( + platform: str, + env: str, + graph: Optional[DataHubGraph] = None, + platform_instance: Optional[str] = None, + schema_aware: bool = True, +) -> SchemaResolver: + if graph and schema_aware: + return graph._make_schema_resolver( + platform=platform, + platform_instance=platform_instance, + env=env, + ) + + return SchemaResolver( + platform=platform, + platform_instance=platform_instance, + env=env, + graph=None, + ) + + def create_lineage_sql_parsed_result( query: str, default_db: Optional[str], @@ -1191,21 +1213,17 @@ def create_lineage_sql_parsed_result( graph: Optional[DataHubGraph] = None, schema_aware: bool = True, ) -> SqlParsingResult: + schema_resolver = create_schema_resolver( + platform=platform, + platform_instance=platform_instance, + env=env, + schema_aware=schema_aware, + graph=graph, + ) + + needs_close: bool = False if graph and schema_aware: - needs_close = False - schema_resolver = graph._make_schema_resolver( - platform=platform, - platform_instance=platform_instance, - env=env, - ) - else: needs_close = True - schema_resolver = SchemaResolver( - platform=platform, - platform_instance=platform_instance, - env=env, - graph=None, - ) try: return sqlglot_lineage( From b33a0a97d3e6cdfb4d439b9d182b337ae1303722 Mon Sep 17 00:00:00 2001 From: Siddique Bagwan Date: Sun, 24 Nov 2024 21:52:27 +0530 Subject: [PATCH 03/11] existing test-case are working --- .../datahub/ingestion/source/looker/lookml_config.py | 6 ------ .../datahub/ingestion/source/looker/view_upstream.py | 1 - .../top_10_employee_income_source.view.lkml | 2 +- .../lookml/duplicate_field_ingestion_golden.json | 10 +++++----- .../tests/integration/lookml/test_lookml.py | 4 +--- 5 files changed, 7 insertions(+), 16 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py 
b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py index aef5386034561..7ffb895349ed2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py @@ -172,12 +172,6 @@ class LookMLSourceConfig( "All if comments are evaluated to true for configured looker_environment value", ) - convert_upstream_column_to_lowercase: bool = Field( - default=True, - description="Indicates whether to convert upstream column names to lowercase (default: True). " - "Enable this option if column-level lineage is not functioning correctly and retry the ingestion process.", - ) - @validator("connection_to_platform_map", pre=True) def convert_string_to_connection_def(cls, conn_map): # Previous version of config supported strings in connection map. This upconverts strings to ConnectionMap diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index 9f9a2d312d8cd..566dbf9fadccc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -249,7 +249,6 @@ def create_upstream_column_refs( """ actual_columns: List[str] = [] - with SchemaResolver( platform=self.view_context.view_connection.platform, platform_instance=self.view_context.view_connection.platform_instance, diff --git a/metadata-ingestion/tests/integration/lookml/drop_hive_dot/top_10_employee_income_source.view.lkml b/metadata-ingestion/tests/integration/lookml/drop_hive_dot/top_10_employee_income_source.view.lkml index 742b6d99da110..149ce9219b54b 100644 --- a/metadata-ingestion/tests/integration/lookml/drop_hive_dot/top_10_employee_income_source.view.lkml +++ b/metadata-ingestion/tests/integration/lookml/drop_hive_dot/top_10_employee_income_source.view.lkml @@ -11,7 +11,7 @@ view: top_10_employee_income_source { dimension: id { type: number - sql: ${TABLE}.id;; + sql: ${TABLE}.id ;; } dimension: name { diff --git a/metadata-ingestion/tests/integration/lookml/duplicate_field_ingestion_golden.json b/metadata-ingestion/tests/integration/lookml/duplicate_field_ingestion_golden.json index 01cd7bd9561ea..f88dfba42eb97 100644 --- a/metadata-ingestion/tests/integration/lookml/duplicate_field_ingestion_golden.json +++ b/metadata-ingestion/tests/integration/lookml/duplicate_field_ingestion_golden.json @@ -176,7 +176,7 @@ { "upstreamType": "FIELD_SET", "upstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,.public.dataset_lineages,PROD),ENTITY)" + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,.public.dataset_lineages,PROD),entity)" ], "downstreamType": "FIELD", "downstreams": [ @@ -187,7 +187,7 @@ { "upstreamType": "FIELD_SET", "upstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,.public.dataset_lineages,PROD),METADATA)" + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,.public.dataset_lineages,PROD),metadata)" ], "downstreamType": "FIELD", "downstreams": [ @@ -198,7 +198,7 @@ { "upstreamType": "FIELD_SET", "upstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,.public.dataset_lineages,PROD),URN)" + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,.public.dataset_lineages,PROD),urn)" ], "downstreamType": "FIELD", "downstreams": [ @@ -209,7 +209,7 @@ { "upstreamType": "FIELD_SET", "upstreams": [ - 
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,.public.dataset_lineages,PROD),VERSION)" + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,.public.dataset_lineages,PROD),version)" ], "downstreamType": "FIELD", "downstreams": [ @@ -220,7 +220,7 @@ { "upstreamType": "FIELD_SET", "upstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,.public.dataset_lineages,PROD),CREATEDON)" + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,.public.dataset_lineages,PROD),createdon)" ], "downstreamType": "FIELD", "downstreams": [ diff --git a/metadata-ingestion/tests/integration/lookml/test_lookml.py b/metadata-ingestion/tests/integration/lookml/test_lookml.py index 2aef5a0ae8b58..ea1b903de0de5 100644 --- a/metadata-ingestion/tests/integration/lookml/test_lookml.py +++ b/metadata-ingestion/tests/integration/lookml/test_lookml.py @@ -62,7 +62,7 @@ def get_default_recipe(output_file_path, base_folder_path): @freeze_time(FROZEN_TIME) def test_lookml_ingest(pytestconfig, tmp_path, mock_time): - """Test backwards compatibility with previous form of config with new flags turned off""" + """Test backwards compatibility with a previous form of config with new flags turned off""" test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml" mce_out_file = "expected_output.json" @@ -839,8 +839,6 @@ def test_duplicate_field_ingest(pytestconfig, tmp_path, mock_time): f"{test_resources_dir}/lkml_samples_duplicate_field", ) - new_recipe["source"]["config"]["convert_upstream_column_to_lowercase"] = False - pipeline = Pipeline.create(new_recipe) pipeline.run() pipeline.pretty_print_summary() From ff3077e8a74a894ada0863855e189a3900ce2303 Mon Sep 17 00:00:00 2001 From: Siddique Bagwan Date: Sun, 24 Nov 2024 22:32:35 +0530 Subject: [PATCH 04/11] test case for column resolution from gms --- .../ingestion/source/looker/view_upstream.py | 19 +- .../gms_schema_resolution/data.model.lkml | 6 + .../top_10_employee_income_source.view.lkml | 18 + .../lookml/gms_schema_resolution_golden.json | 358 ++++++++++++++++++ .../tests/integration/lookml/test_lookml.py | 42 +- 5 files changed, 437 insertions(+), 6 deletions(-) create mode 100644 metadata-ingestion/tests/integration/lookml/gms_schema_resolution/data.model.lkml create mode 100644 metadata-ingestion/tests/integration/lookml/gms_schema_resolution/top_10_employee_income_source.view.lkml create mode 100644 metadata-ingestion/tests/integration/lookml/gms_schema_resolution_golden.json diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index 566dbf9fadccc..608568321858b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -2,7 +2,7 @@ import re from abc import ABC, abstractmethod from functools import lru_cache -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Tuple from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance from datahub.ingestion.api.common import PipelineContext @@ -25,7 +25,7 @@ LookMLSourceReport, ) from datahub.ingestion.source.looker.urn_functions import get_qualified_table_name -from datahub.sql_parsing.schema_resolver import SchemaResolver +from datahub.sql_parsing.schema_resolver import SchemaInfo, SchemaResolver from datahub.sql_parsing.sqlglot_lineage import ( ColumnLineageInfo, ColumnRef, @@ -198,10 
+198,20 @@ def _generate_fully_qualified_name( return sql_table_name.lower() +@lru_cache(maxsize=128) +def _get_schema_info( + schema_resolver: SchemaResolver, dataset_urn: str +) -> Tuple[str, Optional[SchemaInfo]]: + """ + For each field of lookml view, this function is getting called and hence added lru_cache + """ + return schema_resolver.resolve_urn(urn=dataset_urn) + + class AbstractViewUpstream(ABC): """ Implementation of this interface extracts the view upstream as per the way the view is bound to datasets. - For detail explanation please refer lookml_concept_context.LookerViewContext documentation. + For detail explanation, please refer lookml_concept_context.LookerViewContext documentation. """ view_context: LookerViewContext @@ -247,7 +257,6 @@ def create_upstream_column_refs( - This function ensures consistency in column-level lineage by consulting GMS before creating the final `ColumnRef` instance, avoiding discrepancies. """ - actual_columns: List[str] = [] with SchemaResolver( platform=self.view_context.view_connection.platform, @@ -255,7 +264,7 @@ def create_upstream_column_refs( env=self.view_context.view_connection.platform_env or self.config.env, graph=self.ctx.graph, ) as schema_resolver: - urn, schema_info = schema_resolver.resolve_urn(urn=upstream_urn) + urn, schema_info = _get_schema_info(schema_resolver, upstream_urn) if schema_info: column_from_gms: List[str] = list( diff --git a/metadata-ingestion/tests/integration/lookml/gms_schema_resolution/data.model.lkml b/metadata-ingestion/tests/integration/lookml/gms_schema_resolution/data.model.lkml new file mode 100644 index 0000000000000..95391f6a73e63 --- /dev/null +++ b/metadata-ingestion/tests/integration/lookml/gms_schema_resolution/data.model.lkml @@ -0,0 +1,6 @@ +connection: "my_connection" + +include: "top_10_employee_income_source.view.lkml" + +explore: top_10_employee_income_source { +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/lookml/gms_schema_resolution/top_10_employee_income_source.view.lkml b/metadata-ingestion/tests/integration/lookml/gms_schema_resolution/top_10_employee_income_source.view.lkml new file mode 100644 index 0000000000000..6037bab33c44f --- /dev/null +++ b/metadata-ingestion/tests/integration/lookml/gms_schema_resolution/top_10_employee_income_source.view.lkml @@ -0,0 +1,18 @@ +view: top_10_employee_income_source { + sql_table_name: "db.public.employee" + ;; + dimension: id { + type: number + sql: ${TABLE}.id ;; + } + + dimension: name { + type: string + sql: ${TABLE}.name ;; + } + + dimension: source { + type: string + sql: ${TABLE}.source ;; + } +} \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/lookml/gms_schema_resolution_golden.json b/metadata-ingestion/tests/integration/lookml/gms_schema_resolution_golden.json new file mode 100644 index 0000000000000..9b0dd78ca1e8e --- /dev/null +++ b/metadata-ingestion/tests/integration/lookml/gms_schema_resolution_golden.json @@ -0,0 +1,358 @@ +[ +{ + "entityType": "container", + "entityUrn": "urn:li:container:78f22c19304954b15e8adb1d9809975e", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "looker", + "env": "PROD", + "project_name": "lkml_samples" + }, + "name": "lkml_samples", + "env": "PROD" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": 
"urn:li:container:78f22c19304954b15e8adb1d9809975e", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:78f22c19304954b15e8adb1d9809975e", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:looker" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:78f22c19304954b15e8adb1d9809975e", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "LookML Project" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:78f22c19304954b15e8adb1d9809975e", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "Folders" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.top_10_employee_income_source,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "View" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.top_10_employee_income_source,PROD)", + "changeType": "UPSERT", + "aspectName": "viewProperties", + "aspect": { + "json": { + "materialized": false, + "viewLogic": "view: top_10_employee_income_source {\n sql_table_name: \"db.public.employee\"\n ;;\n dimension: id {\n type: number\n sql: ${TABLE}.id ;;\n }\n\n dimension: name {\n type: string\n sql: ${TABLE}.name ;;\n }\n\n dimension: source {\n type: string\n sql: ${TABLE}.source ;;\n }\n}", + "viewLanguage": "lookml" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.top_10_employee_income_source,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:78f22c19304954b15e8adb1d9809975e" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "proposedSnapshot": { + "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { + "urn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.top_10_employee_income_source,PROD)", + "aspects": [ + { + "com.linkedin.pegasus2avro.common.BrowsePaths": { + "paths": [ + "/Develop/lkml_samples/" + ] + } + }, + { + "com.linkedin.pegasus2avro.common.Status": { + "removed": false + } + }, + { + "com.linkedin.pegasus2avro.dataset.UpstreamLineage": { + "upstreams": [ + { + "auditStamp": { + "time": 1586847600000, + "actor": "urn:li:corpuser:datahub" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,public.employee,PROD)", + 
"type": "VIEW" + } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,public.employee,PROD),Id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.top_10_employee_income_source,PROD),id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,public.employee,PROD),Name)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.top_10_employee_income_source,PROD),name)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,public.employee,PROD),source)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.top_10_employee_income_source,PROD),source)" + ], + "confidenceScore": 1.0 + } + ] + } + }, + { + "com.linkedin.pegasus2avro.schema.SchemaMetadata": { + "schemaName": "top_10_employee_income_source", + "platform": "urn:li:dataPlatform:looker", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.pegasus2avro.schema.OtherSchema": { + "rawSchema": "" + } + }, + "fields": [ + { + "fieldPath": "id", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "number", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "name", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "source", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": false + } + ], + "primaryKeys": [] + } + }, + { + "com.linkedin.pegasus2avro.dataset.DatasetProperties": { + "customProperties": { + "looker.file.path": "top_10_employee_income_source.view.lkml", + "looker.model": "data" + }, + "name": "top_10_employee_income_source", + "tags": [] + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.top_10_employee_income_source,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "Develop" + }, + { + "id": "urn:li:container:78f22c19304954b15e8adb1d9809975e", + "urn": "urn:li:container:78f22c19304954b15e8adb1d9809975e" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + 
"entityUrn": "urn:li:tag:Dimension", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "Dimension" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/lookml/test_lookml.py b/metadata-ingestion/tests/integration/lookml/test_lookml.py index ea1b903de0de5..5637917c73104 100644 --- a/metadata-ingestion/tests/integration/lookml/test_lookml.py +++ b/metadata-ingestion/tests/integration/lookml/test_lookml.py @@ -2,7 +2,7 @@ import pathlib from typing import Any, List from unittest import mock -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pydantic import pytest @@ -1013,3 +1013,43 @@ def test_drop_hive(pytestconfig, tmp_path, mock_time): output_path=tmp_path / mce_out_file, golden_path=golden_path, ) + + +@freeze_time(FROZEN_TIME) +def test_gms_schema_resolution(pytestconfig, tmp_path, mock_time): + test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml" + mce_out_file = "drop_hive_dot.json" + + new_recipe = get_default_recipe( + f"{tmp_path}/{mce_out_file}", + f"{test_resources_dir}/gms_schema_resolution", + ) + + new_recipe["source"]["config"]["connection_to_platform_map"] = { + "my_connection": "hive" + } + + with patch( + "datahub.ingestion.source.looker.view_upstream.SchemaResolver" + ) as MockSchemaResolver: + mock_instance = MockSchemaResolver.return_value + mock_instance.__enter__.return_value = mock_instance + mock_instance.resolve_urn.return_value = ( + "urn:li:dataset:(urn:li:dataPlatform:dbt, db.public.employee, PROD)", + { + "Id": "String", + "Name": "String", + "source": "String", + }, + ) + pipeline = Pipeline.create(new_recipe) + pipeline.run() + pipeline.pretty_print_summary() + pipeline.raise_from_status(raise_warnings=True) + + golden_path = test_resources_dir / "gms_schema_resolution_golden.json" + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / mce_out_file, + golden_path=golden_path, + ) From e877cd0ed9b33398f4e158f9d8ef7cb6540a584a Mon Sep 17 00:00:00 2001 From: Siddique Bagwan Date: Sun, 1 Dec 2024 13:39:03 +0530 Subject: [PATCH 05/11] address review comments --- .../ingestion/source/looker/view_upstream.py | 33 ++++++++----------- .../datahub/sql_parsing/schema_resolver.py | 14 +------- .../datahub/sql_parsing/sqlglot_lineage.py | 16 +++++++++ .../tests/integration/lookml/test_lookml.py | 26 +++++++-------- 4 files changed, 43 insertions(+), 46 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index 608568321858b..53c3647dfd378 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -32,6 +32,8 @@ SqlParsingResult, Urn, create_lineage_sql_parsed_result, + create_schema_resolver, + infer_schema_columns, ) logger = logging.getLogger(__name__) @@ -257,29 +259,22 @@ def create_upstream_column_refs( - This function ensures consistency in column-level lineage by consulting GMS before creating the final `ColumnRef` instance, avoiding discrepancies. 
""" - actual_columns: List[str] = [] - with SchemaResolver( + schema_resolver = create_schema_resolver( platform=self.view_context.view_connection.platform, platform_instance=self.view_context.view_connection.platform_instance, env=self.view_context.view_connection.platform_env or self.config.env, graph=self.ctx.graph, - ) as schema_resolver: - urn, schema_info = _get_schema_info(schema_resolver, upstream_urn) - - if schema_info: - column_from_gms: List[str] = list( - schema_info.keys() - ) # list() to silent lint - actual_columns = [ - column - for column in column_from_gms - if column.lower() in map(str.lower, expected_columns) - ] - else: - logger.info( - f"schema_info not found for dataset {urn} in GMS. Using expected_columns to form ColumnRef" - ) - actual_columns = expected_columns + ) + + urn, schema_info = _get_schema_info(schema_resolver, upstream_urn) + + if schema_info: + actual_columns = infer_schema_columns(schema_info, expected_columns) + else: + logger.info( + f"schema_info not found for dataset {urn} in GMS. Using expected_columns to form ColumnRef" + ) + actual_columns = expected_columns upstream_column_refs: List[ColumnRef] = [] diff --git a/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py b/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py index 886efa5e18201..c1f70fc9e34a8 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py +++ b/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py @@ -1,7 +1,6 @@ import contextlib import pathlib -from types import TracebackType -from typing import Dict, List, Optional, Protocol, Set, Tuple, Type +from typing import Dict, List, Optional, Protocol, Set, Tuple from typing_extensions import TypedDict @@ -74,17 +73,6 @@ def __init__( extra_columns={"is_missing": lambda v: v is None}, ) - def __enter__(self) -> "SchemaResolver": - return self - - def __exit__( - self, - exc_type: Optional[Type[BaseException]], - exc_value: Optional[BaseException], - traceback: Optional[TracebackType], - ) -> None: - self.close() - @property def platform(self) -> str: return self._platform diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py index 1ae176cf6adb9..820c219bd63fb 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py @@ -1181,6 +1181,7 @@ def sqlglot_lineage( ) +@functools.lru_cache(maxsize=128) def create_schema_resolver( platform: str, env: str, @@ -1193,6 +1194,7 @@ def create_schema_resolver( platform=platform, platform_instance=platform_instance, env=env, + graph=graph, ) return SchemaResolver( @@ -1258,6 +1260,20 @@ def infer_output_schema(result: SqlParsingResult) -> Optional[List[SchemaFieldCl return output_schema +def infer_schema_columns( + schema_info: SchemaInfo, expected_columns: List[str] +) -> List[str]: + column_from_gms: List[str] = list(schema_info.keys()) # list() to silent lint + + actual_columns: List[str] = [ + column + for column in column_from_gms + if column.lower() in map(str.lower, expected_columns) + ] + + return actual_columns + + def view_definition_lineage_helper( result: SqlParsingResult, view_urn: str ) -> SqlParsingResult: diff --git a/metadata-ingestion/tests/integration/lookml/test_lookml.py b/metadata-ingestion/tests/integration/lookml/test_lookml.py index 5637917c73104..7e3ac22741303 100644 --- a/metadata-ingestion/tests/integration/lookml/test_lookml.py +++ 
b/metadata-ingestion/tests/integration/lookml/test_lookml.py @@ -1,6 +1,6 @@ import logging import pathlib -from typing import Any, List +from typing import Any, List, Optional, Tuple from unittest import mock from unittest.mock import MagicMock, patch @@ -25,6 +25,7 @@ MetadataChangeEventClass, UpstreamLineageClass, ) +from datahub.sql_parsing.schema_resolver import SchemaInfo, SchemaResolver from tests.test_helpers import mce_helpers from tests.test_helpers.state_helpers import get_current_checkpoint_from_pipeline @@ -1029,19 +1030,16 @@ def test_gms_schema_resolution(pytestconfig, tmp_path, mock_time): "my_connection": "hive" } - with patch( - "datahub.ingestion.source.looker.view_upstream.SchemaResolver" - ) as MockSchemaResolver: - mock_instance = MockSchemaResolver.return_value - mock_instance.__enter__.return_value = mock_instance - mock_instance.resolve_urn.return_value = ( - "urn:li:dataset:(urn:li:dataPlatform:dbt, db.public.employee, PROD)", - { - "Id": "String", - "Name": "String", - "source": "String", - }, - ) + return_value: Tuple[str, Optional[SchemaInfo]] = ( + "urn:li:dataset:(urn:li:dataPlatform:dbt, db.public.employee, PROD)", + { + "Id": "String", + "Name": "String", + "source": "String", + }, + ) + + with patch.object(SchemaResolver, "resolve_urn", return_value=return_value): pipeline = Pipeline.create(new_recipe) pipeline.run() pipeline.pretty_print_summary() From bfa92e0ded7380675a20f9368dff4bcd1fa1b611 Mon Sep 17 00:00:00 2001 From: Siddique Bagwan Date: Mon, 2 Dec 2024 13:49:08 +0530 Subject: [PATCH 06/11] remove graph=graph --- metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py | 1 - 1 file changed, 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py index 820c219bd63fb..8c61adce9e1ca 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py @@ -1194,7 +1194,6 @@ def create_schema_resolver( platform=platform, platform_instance=platform_instance, env=env, - graph=graph, ) return SchemaResolver( From 66d825d2fbaeabbdeac104803d09ef756833180e Mon Sep 17 00:00:00 2001 From: Siddique Bagwan Date: Mon, 2 Dec 2024 15:13:17 +0530 Subject: [PATCH 07/11] fix test-case --- .../ingestion/source/looker/view_upstream.py | 8 +++---- .../datahub/sql_parsing/sqlglot_lineage.py | 22 ++++++++++++++++--- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index 53c3647dfd378..e07a37161fd7b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -31,9 +31,9 @@ ColumnRef, SqlParsingResult, Urn, + create_and_cache_schema_resolver, create_lineage_sql_parsed_result, - create_schema_resolver, - infer_schema_columns, + infer_upstream_columns, ) logger = logging.getLogger(__name__) @@ -259,7 +259,7 @@ def create_upstream_column_refs( - This function ensures consistency in column-level lineage by consulting GMS before creating the final `ColumnRef` instance, avoiding discrepancies. 
""" - schema_resolver = create_schema_resolver( + schema_resolver = create_and_cache_schema_resolver( platform=self.view_context.view_connection.platform, platform_instance=self.view_context.view_connection.platform_instance, env=self.view_context.view_connection.platform_env or self.config.env, @@ -269,7 +269,7 @@ def create_upstream_column_refs( urn, schema_info = _get_schema_info(schema_resolver, upstream_urn) if schema_info: - actual_columns = infer_schema_columns(schema_info, expected_columns) + actual_columns = infer_upstream_columns(schema_info, expected_columns) else: logger.info( f"schema_info not found for dataset {urn} in GMS. Using expected_columns to form ColumnRef" diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py index 8c61adce9e1ca..2af1466818e9a 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py @@ -1182,6 +1182,22 @@ def sqlglot_lineage( @functools.lru_cache(maxsize=128) +def create_and_cache_schema_resolver( + platform: str, + env: str, + graph: Optional[DataHubGraph] = None, + platform_instance: Optional[str] = None, + schema_aware: bool = True, +) -> SchemaResolver: + return create_schema_resolver( + platform=platform, + env=env, + graph=graph, + platform_instance=platform_instance, + schema_aware=schema_aware, + ) + + def create_schema_resolver( platform: str, env: str, @@ -1222,9 +1238,9 @@ def create_lineage_sql_parsed_result( graph=graph, ) - needs_close: bool = False + needs_close: bool = True if graph and schema_aware: - needs_close = True + needs_close = False try: return sqlglot_lineage( @@ -1259,7 +1275,7 @@ def infer_output_schema(result: SqlParsingResult) -> Optional[List[SchemaFieldCl return output_schema -def infer_schema_columns( +def infer_upstream_columns( schema_info: SchemaInfo, expected_columns: List[str] ) -> List[str]: column_from_gms: List[str] = list(schema_info.keys()) # list() to silent lint From 8f43c7aad18dd21aa9621e4f4d1f670895fb391c Mon Sep 17 00:00:00 2001 From: Siddique Bagwan Date: Fri, 6 Dec 2024 17:59:39 +0530 Subject: [PATCH 08/11] address review comments --- .../ingestion/source/looker/view_upstream.py | 21 +++++-------------- .../datahub/sql_parsing/sqlglot_lineage.py | 10 ++++----- .../tests/integration/lookml/test_lookml.py | 2 +- 3 files changed, 11 insertions(+), 22 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index e07a37161fd7b..2dca3cc39cf74 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -2,7 +2,7 @@ import re from abc import ABC, abstractmethod from functools import lru_cache -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance from datahub.ingestion.api.common import PipelineContext @@ -25,7 +25,6 @@ LookMLSourceReport, ) from datahub.ingestion.source.looker.urn_functions import get_qualified_table_name -from datahub.sql_parsing.schema_resolver import SchemaInfo, SchemaResolver from datahub.sql_parsing.sqlglot_lineage import ( ColumnLineageInfo, ColumnRef, @@ -33,7 +32,7 @@ Urn, create_and_cache_schema_resolver, create_lineage_sql_parsed_result, - infer_upstream_columns, + 
match_columns_to_schema, ) logger = logging.getLogger(__name__) @@ -200,16 +199,6 @@ def _generate_fully_qualified_name( return sql_table_name.lower() -@lru_cache(maxsize=128) -def _get_schema_info( - schema_resolver: SchemaResolver, dataset_urn: str -) -> Tuple[str, Optional[SchemaInfo]]: - """ - For each field of lookml view, this function is getting called and hence added lru_cache - """ - return schema_resolver.resolve_urn(urn=dataset_urn) - - class AbstractViewUpstream(ABC): """ Implementation of this interface extracts the view upstream as per the way the view is bound to datasets. @@ -266,15 +255,15 @@ def create_upstream_column_refs( graph=self.ctx.graph, ) - urn, schema_info = _get_schema_info(schema_resolver, upstream_urn) + urn, schema_info = schema_resolver.resolve_urn(urn=upstream_urn) if schema_info: - actual_columns = infer_upstream_columns(schema_info, expected_columns) + actual_columns = match_columns_to_schema(schema_info, expected_columns) else: logger.info( f"schema_info not found for dataset {urn} in GMS. Using expected_columns to form ColumnRef" ) - actual_columns = expected_columns + actual_columns = [column.lower() for column in expected_columns] upstream_column_refs: List[ColumnRef] = [] diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py index 2af1466818e9a..8fbd1f1dd3c0d 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py @@ -1275,18 +1275,18 @@ def infer_output_schema(result: SqlParsingResult) -> Optional[List[SchemaFieldCl return output_schema -def infer_upstream_columns( - schema_info: SchemaInfo, expected_columns: List[str] +def match_columns_to_schema( + schema_info: SchemaInfo, input_columns: List[str] ) -> List[str]: column_from_gms: List[str] = list(schema_info.keys()) # list() to silent lint - actual_columns: List[str] = [ + output_columns: List[str] = [ column for column in column_from_gms - if column.lower() in map(str.lower, expected_columns) + if column.lower() in map(str.lower, input_columns) ] - return actual_columns + return output_columns def view_definition_lineage_helper( diff --git a/metadata-ingestion/tests/integration/lookml/test_lookml.py b/metadata-ingestion/tests/integration/lookml/test_lookml.py index 7e3ac22741303..1e3213e108336 100644 --- a/metadata-ingestion/tests/integration/lookml/test_lookml.py +++ b/metadata-ingestion/tests/integration/lookml/test_lookml.py @@ -1031,7 +1031,7 @@ def test_gms_schema_resolution(pytestconfig, tmp_path, mock_time): } return_value: Tuple[str, Optional[SchemaInfo]] = ( - "urn:li:dataset:(urn:li:dataPlatform:dbt, db.public.employee, PROD)", + "fake_dataset_urn", { "Id": "String", "Name": "String", From bdfbbe4c9afdbd2eb0c6c82ce88f92eae736c5e0 Mon Sep 17 00:00:00 2001 From: Siddique Bagwan Date: Mon, 9 Dec 2024 17:09:07 +0530 Subject: [PATCH 09/11] address review comments --- .../ingestion/source/looker/view_upstream.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index 2dca3cc39cf74..0bc9d94ceae2a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -239,7 +239,7 @@ def create_fields(self) -> List[ViewField]: return [] # it is for the 
special case def create_upstream_column_refs( - self, upstream_urn: str, expected_columns: List[str] + self, upstream_urn: str, downstream_looker_columns: List[str] ) -> List[ColumnRef]: """ - **`upstream_urn`**: The URN of the upstream dataset. @@ -258,12 +258,14 @@ def create_upstream_column_refs( urn, schema_info = schema_resolver.resolve_urn(urn=upstream_urn) if schema_info: - actual_columns = match_columns_to_schema(schema_info, expected_columns) + actual_columns = match_columns_to_schema( + schema_info, downstream_looker_columns + ) else: logger.info( f"schema_info not found for dataset {urn} in GMS. Using expected_columns to form ColumnRef" ) - actual_columns = [column.lower() for column in expected_columns] + actual_columns = [column.lower() for column in downstream_looker_columns] upstream_column_refs: List[ColumnRef] = [] @@ -417,7 +419,7 @@ def get_upstream_column_ref( upstream_urn=self._get_upstream_dataset_urn()[ 0 ], # 0th index has table of from clause, - expected_columns=field_context.column_name_in_sql_attribute(), + downstream_looker_columns=field_context.column_name_in_sql_attribute(), ) # fix any derived view reference present in urn @@ -535,7 +537,7 @@ def get_upstream_column_ref( ) return self.create_upstream_column_refs( - upstream_urn=explore_urn, expected_columns=expected_columns + upstream_urn=explore_urn, downstream_looker_columns=expected_columns ) def get_upstream_dataset_urn(self) -> List[Urn]: @@ -588,7 +590,7 @@ def get_upstream_column_ref( ) -> List[ColumnRef]: return self.create_upstream_column_refs( upstream_urn=self._get_upstream_dataset_urn(), - expected_columns=field_context.column_name_in_sql_attribute(), + downstream_looker_columns=field_context.column_name_in_sql_attribute(), ) def get_upstream_dataset_urn(self) -> List[Urn]: @@ -649,7 +651,7 @@ def get_upstream_column_ref( return self.create_upstream_column_refs( upstream_urn=self._get_upstream_dataset_urn()[0], - expected_columns=field_context.column_name_in_sql_attribute(), + downstream_looker_columns=field_context.column_name_in_sql_attribute(), ) def get_upstream_dataset_urn(self) -> List[Urn]: From bc0f11903321d07ff2403ddd7378f8f2ed3597a4 Mon Sep 17 00:00:00 2001 From: Siddique Bagwan Date: Tue, 10 Dec 2024 17:08:31 +0530 Subject: [PATCH 10/11] address review comments --- .../ingestion/source/looker/view_upstream.py | 2 +- .../datahub/sql_parsing/schema_resolver.py | 16 ++++ .../datahub/sql_parsing/sqlglot_lineage.py | 14 ---- .../unit/sql_parsing/test_schemaresolver.py | 76 +++++++++++++++++-- 4 files changed, 87 insertions(+), 21 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index 0bc9d94ceae2a..971181e4300d6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -25,6 +25,7 @@ LookMLSourceReport, ) from datahub.ingestion.source.looker.urn_functions import get_qualified_table_name +from datahub.sql_parsing.schema_resolver import match_columns_to_schema from datahub.sql_parsing.sqlglot_lineage import ( ColumnLineageInfo, ColumnRef, @@ -32,7 +33,6 @@ Urn, create_and_cache_schema_resolver, create_lineage_sql_parsed_result, - match_columns_to_schema, ) logger = logging.getLogger(__name__) diff --git a/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py b/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py index c1f70fc9e34a8..6aa10381a883e 
100644 --- a/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py +++ b/metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py @@ -300,3 +300,19 @@ def _convert_schema_field_list_to_info( def _convert_schema_aspect_to_info(schema_metadata: SchemaMetadataClass) -> SchemaInfo: return _convert_schema_field_list_to_info(schema_metadata.fields) + + +def match_columns_to_schema( + schema_info: SchemaInfo, input_columns: List[str] +) -> List[str]: + column_from_gms: List[str] = list(schema_info.keys()) # list() to silent lint + + gms_column_map: Dict[str, str] = { + column.lower(): column for column in column_from_gms + } + + output_columns: List[str] = [ + gms_column_map.get(column.lower(), column) for column in input_columns + ] + + return output_columns diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py index a109dddd2fab2..f387618bfaec1 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py @@ -1275,20 +1275,6 @@ def infer_output_schema(result: SqlParsingResult) -> Optional[List[SchemaFieldCl return output_schema -def match_columns_to_schema( - schema_info: SchemaInfo, input_columns: List[str] -) -> List[str]: - column_from_gms: List[str] = list(schema_info.keys()) # list() to silent lint - - output_columns: List[str] = [ - column - for column in column_from_gms - if column.lower() in map(str.lower, input_columns) - ] - - return output_columns - - def view_definition_lineage_helper( result: SqlParsingResult, view_urn: str ) -> SqlParsingResult: diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_schemaresolver.py b/metadata-ingestion/tests/unit/sql_parsing/test_schemaresolver.py index e5fa980bec452..dc09cf54aab92 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/test_schemaresolver.py +++ b/metadata-ingestion/tests/unit/sql_parsing/test_schemaresolver.py @@ -1,7 +1,12 @@ -from datahub.sql_parsing.schema_resolver import SchemaResolver, _TableName +from datahub.sql_parsing.schema_resolver import ( + SchemaInfo, + SchemaResolver, + _TableName, + match_columns_to_schema, +) -def test_basic_schema_resolver(): +def create_default_schema_resolver(urn: str) -> SchemaResolver: schema_resolver = SchemaResolver( platform="redshift", env="PROD", @@ -9,18 +14,51 @@ def test_basic_schema_resolver(): ) schema_resolver.add_raw_schema_info( - urn="urn:li:dataset:(urn:li:dataPlatform:redshift,my_db.public.test_table,PROD)", + urn=urn, schema_info={"name": "STRING"}, ) + return schema_resolver + + +def test_basic_schema_resolver(): + input_urn = ( + "urn:li:dataset:(urn:li:dataPlatform:redshift,my_db.public.test_table,PROD)" + ) + + schema_resolver = create_default_schema_resolver(urn=input_urn) + urn, schema = schema_resolver.resolve_table( _TableName(database="my_db", db_schema="public", table="test_table") ) - assert ( - urn - == "urn:li:dataset:(urn:li:dataPlatform:redshift,my_db.public.test_table,PROD)" + + assert urn == input_urn + + assert schema + + assert schema["name"] + + assert schema_resolver.schema_count() == 1 + + +def test_resolve_urn(): + input_urn: str = ( + "urn:li:dataset:(urn:li:dataPlatform:redshift,my_db.public.test_table,PROD)" + ) + + schema_resolver = create_default_schema_resolver(urn=input_urn) + + schema_resolver.add_raw_schema_info( + urn=input_urn, + schema_info={"name": "STRING"}, ) + + urn, schema = schema_resolver.resolve_urn(urn=input_urn) + + assert urn == input_urn + assert 
schema + assert schema["name"] assert schema_resolver.schema_count() == 1 @@ -62,3 +100,29 @@ def test_get_urn_for_table_not_lower_should_keep_capital_letters(): == "urn:li:dataset:(urn:li:dataPlatform:mssql,Uppercased-Instance.Database.DataSet.Table,PROD)" ) assert schema_resolver.schema_count() == 0 + + +def test_match_columns_to_schema(): + input_urn = ( + "urn:li:dataset:(urn:li:dataPlatform:redshift,my_db.public.test_table,PROD)" + ) + + schema_resolver = create_default_schema_resolver(urn=input_urn) + + schema_resolver.add_raw_schema_info( + urn=input_urn, + schema_info={"address": "STRING"}, + ) + + schema_resolver.add_raw_schema_info( + urn=input_urn, + schema_info={"Id": "STRING"}, + ) + + schema_info: SchemaInfo = {"id": "string", "Name": "string", "Address": "string"} + + output_columns = match_columns_to_schema( + schema_info, input_columns=["Id", "name", "address", "weight"] + ) + + assert output_columns == ["id", "Name", "Address", "weight"] From 72169ee8ac4bde107ea1a217001fa1446f684956 Mon Sep 17 00:00:00 2001 From: Siddique Bagwan Date: Wed, 11 Dec 2024 22:23:20 +0530 Subject: [PATCH 11/11] address review comments --- .../unit/sql_parsing/test_schemaresolver.py | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_schemaresolver.py b/metadata-ingestion/tests/unit/sql_parsing/test_schemaresolver.py index dc09cf54aab92..67222531d3bc1 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/test_schemaresolver.py +++ b/metadata-ingestion/tests/unit/sql_parsing/test_schemaresolver.py @@ -103,22 +103,6 @@ def test_get_urn_for_table_not_lower_should_keep_capital_letters(): def test_match_columns_to_schema(): - input_urn = ( - "urn:li:dataset:(urn:li:dataPlatform:redshift,my_db.public.test_table,PROD)" - ) - - schema_resolver = create_default_schema_resolver(urn=input_urn) - - schema_resolver.add_raw_schema_info( - urn=input_urn, - schema_info={"address": "STRING"}, - ) - - schema_resolver.add_raw_schema_info( - urn=input_urn, - schema_info={"Id": "STRING"}, - ) - schema_info: SchemaInfo = {"id": "string", "Name": "string", "Address": "string"} output_columns = match_columns_to_schema(
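
Taken together, commits 08-11 settle on a small, order-preserving matching helper: commit 10 reworks match_columns_to_schema to iterate the input columns (preserving their order and passing through columns GMS does not know) instead of filtering GMS's column list. The sketch below mirrors the helper that ends up in schema_resolver.py and reproduces its unit test; plain dicts stand in for SchemaInfo:

    from typing import Dict, List

    def match_columns_to_schema(
        schema_info: Dict[str, str], input_columns: List[str]
    ) -> List[str]:
        # Index the GMS schema by lowercased column name, remembering the original casing.
        gms_column_map: Dict[str, str] = {c.lower(): c for c in schema_info}
        # Prefer the GMS casing when a match exists; unknown columns pass through unchanged.
        return [gms_column_map.get(c.lower(), c) for c in input_columns]

    schema_info = {"id": "string", "Name": "string", "Address": "string"}
    assert match_columns_to_schema(schema_info, ["Id", "name", "address", "weight"]) == [
        "id", "Name", "Address", "weight"
    ]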
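
The resolver itself is built once and reused: commit 07 introduces create_and_cache_schema_resolver, a functools.lru_cache wrapper over create_schema_resolver, so the per-field calls in view_upstream.py share one resolver per connection. A minimal sketch of the pattern, with a hypothetical Resolver class standing in for SchemaResolver (the real factory also takes graph and schema_aware arguments):

    import functools
    from typing import Optional

    class Resolver:
        # Hypothetical stand-in; the real SchemaResolver also takes a graph client.
        def __init__(self, platform: str, env: str, platform_instance: Optional[str]):
            self.platform = platform
            self.env = env
            self.platform_instance = platform_instance

    @functools.lru_cache(maxsize=128)
    def create_and_cache_schema_resolver(
        platform: str, env: str, platform_instance: Optional[str] = None
    ) -> Resolver:
        # lru_cache keys on the (hashable) arguments, so identical connection
        # settings return the same Resolver instance across fields and views.
        return Resolver(platform, env, platform_instance)

    r1 = create_and_cache_schema_resolver("postgres", "PROD")
    r2 = create_and_cache_schema_resolver("postgres", "PROD")
    assert r1 is r2  # built once, reused for every LookML field

Commit 07 also flips needs_close in create_lineage_sql_parsed_result so that graph-backed resolvers, the ones worth caching, are no longer closed by the caller.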
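
End to end, the create_upstream_column_refs flow that the series converges on is: resolve the upstream URN against GMS, align the Looker-declared columns with the GMS schema when one exists, and fall back to lowercasing when GMS has no schema for the dataset. A condensed sketch under those assumptions, reusing match_columns_to_schema from the first sketch above and a hypothetical ColumnRef stand-in; the example reproduces the mixed-case upstreams (Id, Name, source) expected by gms_schema_resolution_golden.json:

    from dataclasses import dataclass
    from typing import Callable, Dict, List, Optional, Tuple

    @dataclass(frozen=True)
    class ColumnRef:
        # Stand-in for datahub's ColumnRef: an upstream table URN plus a column name.
        table: str
        column: str

    def create_upstream_column_refs(
        resolve_urn: Callable[[str], Tuple[str, Optional[Dict[str, str]]]],
        upstream_urn: str,
        downstream_looker_columns: List[str],
    ) -> List[ColumnRef]:
        urn, schema_info = resolve_urn(upstream_urn)
        if schema_info:
            actual_columns = match_columns_to_schema(schema_info, downstream_looker_columns)
        else:
            # No schema in GMS: casing cannot be recovered, so lowercase as a best effort.
            actual_columns = [c.lower() for c in downstream_looker_columns]
        return [ColumnRef(table=upstream_urn, column=c) for c in actual_columns]

    def fake_resolve(urn: str) -> Tuple[str, Optional[Dict[str, str]]]:
        # Pretend GMS knows this dataset with mixed-case columns, as in the test mock.
        return urn, {"Id": "string", "Name": "string", "source": "string"}

    refs = create_upstream_column_refs(fake_resolve, "urn:li:dataset:employee", ["id", "name", "source"])
    assert [r.column for r in refs] == ["Id", "Name", "source"]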
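
For reference, the context-manager support that commit 02 added to SchemaResolver, and commit 05 removed again in favor of the cached factory, is the standard enter/exit-close pattern; a hypothetical stand-in with the same two methods:

    from types import TracebackType
    from typing import Optional, Type

    class ClosableResolver:
        def close(self) -> None:
            pass  # release file-backed caches, connections, etc.

        def __enter__(self) -> "ClosableResolver":
            return self

        def __exit__(
            self,
            exc_type: Optional[Type[BaseException]],
            exc_value: Optional[BaseException],
            traceback: Optional[TracebackType],
        ) -> None:
            self.close()

    with ClosableResolver() as resolver:
        pass  # close() runs on exit, even if the body raises

Closing on __exit__ is right for a throwaway resolver, but it conflicts with caching: a resolver shared through lru_cache must outlive any single with block, which is presumably why the series dropped it.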