Skip to content

Commit

Permalink
Merge pull request #459 from atlanhq/ft-864
Browse files Browse the repository at this point in the history
FT-864 Add logic for open telementry logging.
  • Loading branch information
ErnestoLoma authored Dec 31, 2024
2 parents be896e9 + 47aff52 commit 730f227
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pyatlan/client/atlan.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ def _call_api_internal(
response_ = response.text
else:
response_ = events if events else response.json()
LOGGER.debug(response_)
LOGGER.debug("response: %s", response_)
return response_
except (
requests.exceptions.JSONDecodeError,
Expand Down
122 changes: 121 additions & 1 deletion pyatlan/pkg/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import json
import logging
import os
from typing import Dict, List
from typing import Any, Dict, List, Mapping, Optional, Sequence, Union

from pydantic.v1 import parse_obj_as, parse_raw_as

Expand All @@ -12,6 +12,56 @@

LOGGER = logging.getLogger(__name__)

# Try to import OpenTelemetry libraries
try:
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import ( # type:ignore
OTLPLogExporter,
)
from opentelemetry.sdk._logs import ( # type:ignore
LogData,
LoggerProvider,
LoggingHandler,
)
from opentelemetry.sdk._logs._internal.export import ( # type:ignore
BatchLogRecordProcessor,
)
from opentelemetry.sdk.resources import Resource # type:ignore

class CustomBatchLogRecordProcessor(BatchLogRecordProcessor):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def emit(self, log_data: LogData) -> None:
if not self._is_valid_type(log_data.log_record.body):
log_data.log_record.body = str(log_data.log_record.body)
super().emit(log_data)

def _is_valid_type(self, value: Any) -> bool:
# see https://github.com/open-telemetry/opentelemetry-python/blob/c883f6cc1243ab7e0e5bc177169f25cdf0aac29f/
# exporter/opentelemetry-exporter-otlp-proto-common/src/opentelemetry/exporter/otlp/proto/common/_internal
# /__init__.py#L69
# for valid encode types
if isinstance(value, bool):
return True
if isinstance(value, str):
return True
if isinstance(value, int):
return True
if isinstance(value, float):
return True
if isinstance(value, Sequence):
return all(self._is_valid_type(v) for v in value)
elif isinstance(value, Mapping):
return all(
self._is_valid_type(k) & self._is_valid_type(v)
for k, v in value.items()
)
return False

OTEL_IMPORTS_AVAILABLE = True
except ImportError:
OTEL_IMPORTS_AVAILABLE = False


def get_client(impersonate_user_id: str) -> AtlanClient:
"""
Expand Down Expand Up @@ -117,3 +167,73 @@ def validate_connector_and_connection(v):
from pyatlan.pkg.models import ConnectorAndConnection

return parse_raw_as(ConnectorAndConnection, v)


def has_handler(logger: logging.Logger, handler_class) -> bool:
"""
Checks if a logger or its ancestor has a handler of a specific class. The function
iterates through the logger's handlers and optionally ascends the logger hierarchy,
checking each logger's handlers for an instance of the specified handler class.
Args:
logger (logging.Logger): The logger to inspect for the handler.
handler_class: The class of the handler to look for.
Returns:
bool: True if the handler of the specified class is found, False otherwise.
"""
c: Optional[logging.Logger] = logger
while c:
for hdlr in c.handlers:
if isinstance(hdlr, handler_class):
return True
c = c.parent if c.propagate else None
return False


def add_otel_handler(
logger: logging.Logger, level: Union[int, str], resource: dict
) -> Optional[logging.Handler]:
"""
Adds an OpenTelemetry logging handler to the provided logger if the necessary
OpenTelemetry imports are available and the handler is not already present.
This function uses the provided logging level and resource configuration for
setting up the OpenTelemetry handler. The handler is set up with a custom
formatter for log messages. This function also makes use of workflow-specific
environment variables to enrich the resource data with workflow node
information if available.
Parameters:
logger (logging.Logger): The logger instance to which the OpenTelemetry
handler will be added.
level (Union[int, str]): The logging level to be set for the OpenTelemetry
handler, such as logging.INFO or logging.DEBUG.
resource (dict): A dictionary representing the OpenTelemetry resource
configuration. Additional resource attributes may be dynamically added
inside the function.
Returns:
Optional[logging.Logger]: The created OpenTelemetry handler if successfully
added; otherwise, None.
"""
if OTEL_IMPORTS_AVAILABLE and not has_handler(logger, LoggingHandler):
if workflow_node_name := os.getenv("OTEL_WF_NODE_NAME", ""):
resource["k8s.workflow.node.name"] = workflow_node_name
logger_provider = LoggerProvider(Resource.create(resource))
otel_log_exporter = OTLPLogExporter(
endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT"), insecure=True
)
logger_provider.add_log_record_processor(
CustomBatchLogRecordProcessor(otel_log_exporter)
)

otel_handler = LoggingHandler(level=level, logger_provider=logger_provider)
otel_handler.setLevel(level)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
otel_handler.setFormatter(formatter)
logger.addHandler(otel_handler)
logger.info("OpenTelemetry handler with formatter added to the logger.")
return otel_handler
return None
5 changes: 4 additions & 1 deletion pyatlan/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,10 @@ class AuthorizationFilter(logging.Filter):
"""

def filter(self, record: logging.LogRecord) -> bool:
if record.args and hasattr(record.args, "__iter__"):
if isinstance(record.args, dict) and "access_token" in record.args:
record.args = record.args.copy()
record.args["access_token"] = "***REDACTED***" # noqa: S105
elif record.args and hasattr(record.args, "__iter__"):
for arg in record.args:
if (
isinstance(arg, dict)
Expand Down
29 changes: 21 additions & 8 deletions tests/integration/test_index_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@
NOW_AS_YYYY_MM_DD = datetime.today().strftime("%Y-%m-%d")
EXISTING_TAG = "Issue"
EXISTING_SOURCE_SYNCED_TAG = "Confidential"
COLUMN_WITH_CUSTOM_ATTRIBUTES = (
"default/snowflake/1733440936/ANALYTICS"
"/WIDE_WORLD_IMPORTERS/STG_STATE_PROVINCES/LATEST_RECORDED_POPULATION"
)
DB_NAME = "ANALYTICS"
TABLE_NAME = "STG_STATE_PROVINCES"
COLUMN_NAME = "LATEST_RECORDED_POPULATION"
SCHEMA_NAME = "WIDE_WORLD_IMPORTERS"

VALUES_FOR_TERM_QUERIES = {
"with_categories": "VBsYc9dUoEcAtDxZmjby6@mweSfpXBwfYWedQTvA3Gi",
Expand Down Expand Up @@ -94,6 +94,18 @@
}


@pytest.fixture(scope="module")
def snowflake_conn(client: AtlanClient):
return client.asset.find_connections_by_name(
"development", AtlanConnectorType.SNOWFLAKE
)[0]


@pytest.fixture(scope="module")
def snowflake_column_qn(snowflake_conn):
return f"{snowflake_conn.qualified_name}/{DB_NAME}/{SCHEMA_NAME}/{TABLE_NAME}/{COLUMN_NAME}"


@dataclass()
class AssetTracker:
missing_types: Set[str] = field(default_factory=set)
Expand Down Expand Up @@ -247,12 +259,13 @@ def test_source_tag_assign_with_value(client: AtlanClient, table: Table):
_assert_source_tag(tables, EXISTING_SOURCE_SYNCED_TAG, "Not Restricted")


def test_search_source_specific_custom_attributes(client: AtlanClient):
def test_search_source_specific_custom_attributes(
client: AtlanClient, snowflake_column_qn: str
):
# Test with get_by_qualified_name()
column_qn = COLUMN_WITH_CUSTOM_ATTRIBUTES
asset = client.asset.get_by_qualified_name(
asset_type=Column,
qualified_name=column_qn,
qualified_name=snowflake_column_qn,
min_ext_info=True,
ignore_relationships=True,
)
Expand All @@ -262,7 +275,7 @@ def test_search_source_specific_custom_attributes(client: AtlanClient):
results = (
FluentSearch()
.where(CompoundQuery.active_assets())
.where(Column.QUALIFIED_NAME.eq(column_qn))
.where(Column.QUALIFIED_NAME.eq(snowflake_column_qn))
.include_on_results(Column.CUSTOM_ATTRIBUTES)
.execute(client=client)
)
Expand Down

0 comments on commit 730f227

Please sign in to comment.