From 383a3a21380e91c95162b49b5a8fccfa5bf3ce72 Mon Sep 17 00:00:00 2001 From: Ernest Hill Date: Mon, 30 Dec 2024 09:48:49 +0300 Subject: [PATCH 01/11] Add logic for open telementry logging. --- pyatlan/pkg/utils.py | 91 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 90 insertions(+), 1 deletion(-) diff --git a/pyatlan/pkg/utils.py b/pyatlan/pkg/utils.py index 4f8b16a2d..b8f34e417 100644 --- a/pyatlan/pkg/utils.py +++ b/pyatlan/pkg/utils.py @@ -3,7 +3,7 @@ import json import logging import os -from typing import Dict, List +from typing import Any, Dict, List, Mapping, Optional, Sequence, Union from pydantic.v1 import parse_obj_as, parse_raw_as @@ -12,6 +12,56 @@ LOGGER = logging.getLogger(__name__) +# Try to import OpenTelemetry libraries +try: + from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( + OTLPLogExporter, # type:ignore + ) + from opentelemetry.sdk._logs import ( # type:ignore + LogData, + LoggerProvider, + LoggingHandler, + ) + from opentelemetry.sdk._logs._internal.export import ( + BatchLogRecordProcessor, # type:ignore + ) + from opentelemetry.sdk.resources import Resource # type:ignore + + class CustomBatchLogRecordProcessor(BatchLogRecordProcessor): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def emit(self, log_data: LogData) -> None: + if not self._is_valid_type(log_data.log_record.body): + log_data.log_record.body = str(log_data.log_record.body) + super().emit(log_data) + + def _is_valid_type(self, value: Any) -> bool: + # see https://github.com/open-telemetry/opentelemetry-python/blob/c883f6cc1243ab7e0e5bc177169f25cdf0aac29f/ + # exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal + # /__init__.py#L69 + # for valid encode types + if isinstance(value, bool): + return True + if isinstance(value, str): + return True + if isinstance(value, int): + return True + if isinstance(value, float): + return True + if isinstance(value, Sequence): + return all(self._is_valid_type(v) for v in value) + elif isinstance(value, Mapping): + return all( + self._is_valid_type(k) & self._is_valid_type(v) + for k, v in value.items() + ) + return False + + OTEL_IMPORTS_AVAILABLE = True +except ImportError: + OTEL_IMPORTS_AVAILABLE = False + def get_client(impersonate_user_id: str) -> AtlanClient: """ @@ -117,3 +167,42 @@ def validate_connector_and_connection(v): from pyatlan.pkg.models import ConnectorAndConnection return parse_raw_as(ConnectorAndConnection, v) + + +def has_handler(logger: logging.Logger, handler_class) -> bool: + c: Optional[logging.Logger] = logger + while c: + for hdlr in c.handlers: + if isinstance(hdlr, handler_class): + return True + c = c.parent if c.propagate else None + return False + + +def add_otel_handler( + logger: logging.Logger, level: Union[int, str], resource: dict +) -> None: + """ + Adds an OpenTelemetry handler to the logger if not already present. + + Args: + logger (logging.Logger): The logger to which the handler will be added. + level (int | str): The logging level. + resource (dict): A dictionary of resource attributes to be associated with the logger. + """ + if OTEL_IMPORTS_AVAILABLE and not has_handler(logger, LoggingHandler): + workflow_node_name = os.getenv("OTEL_WF_NODE_NAME", "") + if workflow_node_name: + resource["k8s.workflow.node.name"] = workflow_node_name + logger_provider = LoggerProvider(Resource.create(resource)) + otel_log_exporter = OTLPLogExporter( + endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT"), insecure=True + ) + logger_provider.add_log_record_processor( + CustomBatchLogRecordProcessor(otel_log_exporter) + ) + + otel_handler = LoggingHandler(level=level, logger_provider=logger_provider) + otel_handler.setLevel(level) + logger.addHandler(otel_handler) + logger.info("OpenTelemetry handler added to the logger.") From a8cd15da5ddeb31745c7c7dbad54509769d63170 Mon Sep 17 00:00:00 2001 From: Ernest Hill Date: Mon, 30 Dec 2024 12:18:28 +0300 Subject: [PATCH 02/11] Fix broken test --- pyatlan/pkg/utils.py | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/pyatlan/pkg/utils.py b/pyatlan/pkg/utils.py index b8f34e417..318605756 100644 --- a/pyatlan/pkg/utils.py +++ b/pyatlan/pkg/utils.py @@ -14,16 +14,16 @@ # Try to import OpenTelemetry libraries try: - from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( - OTLPLogExporter, # type:ignore + from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( # type:ignore + OTLPLogExporter, ) from opentelemetry.sdk._logs import ( # type:ignore LogData, LoggerProvider, LoggingHandler, ) - from opentelemetry.sdk._logs._internal.export import ( - BatchLogRecordProcessor, # type:ignore + from opentelemetry.sdk._logs._internal.export import ( # type:ignore + BatchLogRecordProcessor, ) from opentelemetry.sdk.resources import Resource # type:ignore @@ -170,6 +170,18 @@ def validate_connector_and_connection(v): def has_handler(logger: logging.Logger, handler_class) -> bool: + """ + Checks if a logger or its ancestor has a handler of a specific class. The function + iterates through the logger's handlers and optionally ascends the logger hierarchy, + checking each logger's handlers for an instance of the specified handler class. + + Args: + logger (logging.Logger): The logger to inspect for the handler. + handler_class: The class of the handler to look for. + + Returns: + bool: True if the handler of the specified class is found, False otherwise. + """ c: Optional[logging.Logger] = logger while c: for hdlr in c.handlers: From 483566b7dbc9ed6d3693f18a781d008da81473f2 Mon Sep 17 00:00:00 2001 From: Ernest Hill Date: Tue, 31 Dec 2024 09:01:14 +0300 Subject: [PATCH 03/11] Improve code quality --- pyatlan/pkg/utils.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pyatlan/pkg/utils.py b/pyatlan/pkg/utils.py index 318605756..7cbe662a3 100644 --- a/pyatlan/pkg/utils.py +++ b/pyatlan/pkg/utils.py @@ -203,8 +203,7 @@ def add_otel_handler( resource (dict): A dictionary of resource attributes to be associated with the logger. """ if OTEL_IMPORTS_AVAILABLE and not has_handler(logger, LoggingHandler): - workflow_node_name = os.getenv("OTEL_WF_NODE_NAME", "") - if workflow_node_name: + if workflow_node_name := os.getenv("OTEL_WF_NODE_NAME", ""): resource["k8s.workflow.node.name"] = workflow_node_name logger_provider = LoggerProvider(Resource.create(resource)) otel_log_exporter = OTLPLogExporter( From 82655307a19ab3c473d494eac67b79b64ca121b2 Mon Sep 17 00:00:00 2001 From: Ernest Hill Date: Tue, 31 Dec 2024 12:47:06 +0300 Subject: [PATCH 04/11] Add formatter to otel handler. --- pyatlan/pkg/utils.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pyatlan/pkg/utils.py b/pyatlan/pkg/utils.py index 7cbe662a3..96c8d057e 100644 --- a/pyatlan/pkg/utils.py +++ b/pyatlan/pkg/utils.py @@ -215,5 +215,9 @@ def add_otel_handler( otel_handler = LoggingHandler(level=level, logger_provider=logger_provider) otel_handler.setLevel(level) + formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) + otel_handler.setFormatter(formatter) logger.addHandler(otel_handler) logger.info("OpenTelemetry handler added to the logger.") From 36818018f9be4808c3c2b973da0a205c61efc6ee Mon Sep 17 00:00:00 2001 From: Ernest Hill Date: Tue, 31 Dec 2024 12:54:04 +0300 Subject: [PATCH 05/11] Add formatter to otel handler. --- pyatlan/pkg/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyatlan/pkg/utils.py b/pyatlan/pkg/utils.py index 96c8d057e..c9e7484a3 100644 --- a/pyatlan/pkg/utils.py +++ b/pyatlan/pkg/utils.py @@ -220,4 +220,4 @@ def add_otel_handler( ) otel_handler.setFormatter(formatter) logger.addHandler(otel_handler) - logger.info("OpenTelemetry handler added to the logger.") + logger.info("OpenTelemetry handler with formatter added to the logger.") From e0d4e869d8c94ef070449bd77b11851e0077abca Mon Sep 17 00:00:00 2001 From: Ernest Hill Date: Tue, 31 Dec 2024 15:11:51 +0300 Subject: [PATCH 06/11] Change add_otel_handler to return Optional[logging.Handler]. --- pyatlan/pkg/utils.py | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/pyatlan/pkg/utils.py b/pyatlan/pkg/utils.py index c9e7484a3..027be6e2c 100644 --- a/pyatlan/pkg/utils.py +++ b/pyatlan/pkg/utils.py @@ -193,14 +193,28 @@ def has_handler(logger: logging.Logger, handler_class) -> bool: def add_otel_handler( logger: logging.Logger, level: Union[int, str], resource: dict -) -> None: - """ - Adds an OpenTelemetry handler to the logger if not already present. +) -> Optional[logging.Handler]: + """ + Adds an OpenTelemetry logging handler to the provided logger if the necessary + OpenTelemetry imports are available and the handler is not already present. + This function uses the provided logging level and resource configuration for + setting up the OpenTelemetry handler. The handler is set up with a custom + formatter for log messages. This function also makes use of workflow-specific + environment variables to enrich the resource data with workflow node + information if available. + + Parameters: + logger (logging.Logger): The logger instance to which the OpenTelemetry + handler will be added. + level (Union[int, str]): The logging level to be set for the OpenTelemetry + handler, such as logging.INFO or logging.DEBUG. + resource (dict): A dictionary representing the OpenTelemetry resource + configuration. Additional resource attributes may be dynamically added + inside the function. - Args: - logger (logging.Logger): The logger to which the handler will be added. - level (int | str): The logging level. - resource (dict): A dictionary of resource attributes to be associated with the logger. + Returns: + Optional[logging.Logger]: The created OpenTelemetry handler if successfully + added; otherwise, None. """ if OTEL_IMPORTS_AVAILABLE and not has_handler(logger, LoggingHandler): if workflow_node_name := os.getenv("OTEL_WF_NODE_NAME", ""): @@ -221,3 +235,5 @@ def add_otel_handler( otel_handler.setFormatter(formatter) logger.addHandler(otel_handler) logger.info("OpenTelemetry handler with formatter added to the logger.") + return otel_handler + return None From 995f171c4c96934ce297242049934af4e22ef376 Mon Sep 17 00:00:00 2001 From: Ernest Hill Date: Tue, 31 Dec 2024 15:40:41 +0300 Subject: [PATCH 07/11] Add additional redaction of logs --- pyatlan/utils.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/pyatlan/utils.py b/pyatlan/utils.py index 3411b6881..8ab206bb4 100644 --- a/pyatlan/utils.py +++ b/pyatlan/utils.py @@ -337,12 +337,11 @@ class AuthorizationFilter(logging.Filter): def filter(self, record: logging.LogRecord) -> bool: if record.args and hasattr(record.args, "__iter__"): for arg in record.args: - if ( - isinstance(arg, dict) - and "headers" in arg - and "authorization" in arg["headers"] - ): - arg["headers"]["authorization"] = "***REDACTED***" + if isinstance(arg, dict): + if "headers" in arg and "authorization" in arg["headers"]: + arg["headers"]["authorization"] = "***REDACTED***" + elif "access_token" in arg: + arg["access_token"] = "***REDACTED***" # noqa: S105 return True From 2c4811d7a609e68c6ca0958d65a6a514985caa4e Mon Sep 17 00:00:00 2001 From: Ernest Hill Date: Tue, 31 Dec 2024 16:04:40 +0300 Subject: [PATCH 08/11] Modified logging --- pyatlan/client/atlan.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyatlan/client/atlan.py b/pyatlan/client/atlan.py index 53ba49649..99a83a31e 100644 --- a/pyatlan/client/atlan.py +++ b/pyatlan/client/atlan.py @@ -403,7 +403,7 @@ def _call_api_internal( response_ = response.text else: response_ = events if events else response.json() - LOGGER.debug(response_) + LOGGER.debug("response: %s", response_) return response_ except ( requests.exceptions.JSONDecodeError, From 63639e9384b3188a776b48e8f604405ff66d1aee Mon Sep 17 00:00:00 2001 From: Ernest Hill Date: Tue, 31 Dec 2024 17:05:34 +0300 Subject: [PATCH 09/11] Change log redaction logic. --- pyatlan/utils.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/pyatlan/utils.py b/pyatlan/utils.py index 8ab206bb4..0081fba14 100644 --- a/pyatlan/utils.py +++ b/pyatlan/utils.py @@ -335,13 +335,16 @@ class AuthorizationFilter(logging.Filter): """ def filter(self, record: logging.LogRecord) -> bool: - if record.args and hasattr(record.args, "__iter__"): + if isinstance(record.args, dict) and "access_token" in record.args: + record.args["access_token"] = "***REDACTED***" # noqa: S105 + elif record.args and hasattr(record.args, "__iter__"): for arg in record.args: - if isinstance(arg, dict): - if "headers" in arg and "authorization" in arg["headers"]: - arg["headers"]["authorization"] = "***REDACTED***" - elif "access_token" in arg: - arg["access_token"] = "***REDACTED***" # noqa: S105 + if ( + isinstance(arg, dict) + and "headers" in arg + and "authorization" in arg["headers"] + ): + arg["headers"]["authorization"] = "***REDACTED***" return True From a099053b414fd8bc49eeeed41a59610f135ad586 Mon Sep 17 00:00:00 2001 From: Aryamanz29 Date: Tue, 31 Dec 2024 19:37:03 +0530 Subject: [PATCH 10/11] [test/fix] Removed hardcoded column qualifiedName in `test_search_source_specific_custom_attributes` --- tests/integration/test_index_search.py | 29 +++++++++++++++++++------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/tests/integration/test_index_search.py b/tests/integration/test_index_search.py index 2aa9d7b05..6cad614e1 100644 --- a/tests/integration/test_index_search.py +++ b/tests/integration/test_index_search.py @@ -37,10 +37,10 @@ NOW_AS_YYYY_MM_DD = datetime.today().strftime("%Y-%m-%d") EXISTING_TAG = "Issue" EXISTING_SOURCE_SYNCED_TAG = "Confidential" -COLUMN_WITH_CUSTOM_ATTRIBUTES = ( - "default/snowflake/1733440936/ANALYTICS" - "/WIDE_WORLD_IMPORTERS/STG_STATE_PROVINCES/LATEST_RECORDED_POPULATION" -) +DB_NAME = "ANALYTICS" +TABLE_NAME = "STG_STATE_PROVINCES" +COLUMN_NAME = "LATEST_RECORDED_POPULATION" +SCHEMA_NAME = "WIDE_WORLD_IMPORTERS" VALUES_FOR_TERM_QUERIES = { "with_categories": "VBsYc9dUoEcAtDxZmjby6@mweSfpXBwfYWedQTvA3Gi", @@ -94,6 +94,18 @@ } +@pytest.fixture(scope="module") +def snowflake_conn(client: AtlanClient): + return client.asset.find_connections_by_name( + "development", AtlanConnectorType.SNOWFLAKE + )[0] + + +@pytest.fixture(scope="module") +def snowflake_column_qn(snowflake_conn): + return f"{snowflake_conn.qualified_name}/{DB_NAME}/{SCHEMA_NAME}/{TABLE_NAME}/{COLUMN_NAME}" + + @dataclass() class AssetTracker: missing_types: Set[str] = field(default_factory=set) @@ -247,12 +259,13 @@ def test_source_tag_assign_with_value(client: AtlanClient, table: Table): _assert_source_tag(tables, EXISTING_SOURCE_SYNCED_TAG, "Not Restricted") -def test_search_source_specific_custom_attributes(client: AtlanClient): +def test_search_source_specific_custom_attributes( + client: AtlanClient, snowflake_column_qn: str +): # Test with get_by_qualified_name() - column_qn = COLUMN_WITH_CUSTOM_ATTRIBUTES asset = client.asset.get_by_qualified_name( asset_type=Column, - qualified_name=column_qn, + qualified_name=snowflake_column_qn, min_ext_info=True, ignore_relationships=True, ) @@ -262,7 +275,7 @@ def test_search_source_specific_custom_attributes(client: AtlanClient): results = ( FluentSearch() .where(CompoundQuery.active_assets()) - .where(Column.QUALIFIED_NAME.eq(column_qn)) + .where(Column.QUALIFIED_NAME.eq(snowflake_column_qn)) .include_on_results(Column.CUSTOM_ATTRIBUTES) .execute(client=client) ) From 3b26184c2d6895822cc18b2bff08cfb0b5deeb77 Mon Sep 17 00:00:00 2001 From: Ernest Hill Date: Tue, 31 Dec 2024 17:19:47 +0300 Subject: [PATCH 11/11] Change log redaction logic. --- pyatlan/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/pyatlan/utils.py b/pyatlan/utils.py index 0081fba14..a2217bb1f 100644 --- a/pyatlan/utils.py +++ b/pyatlan/utils.py @@ -336,6 +336,7 @@ class AuthorizationFilter(logging.Filter): def filter(self, record: logging.LogRecord) -> bool: if isinstance(record.args, dict) and "access_token" in record.args: + record.args = record.args.copy() record.args["access_token"] = "***REDACTED***" # noqa: S105 elif record.args and hasattr(record.args, "__iter__"): for arg in record.args: