Skip to content

Commit

Permalink
feat(ingest/metabase): API key support in Metabase source (#12711)
Browse files Browse the repository at this point in the history
  • Loading branch information
rajatgl17 authored Feb 28, 2025
1 parent 3083dd0 commit c4e5a9e
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 32 deletions.
1 change: 1 addition & 0 deletions metadata-ingestion/docs/sources/metabase/metabase.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ source:
# Credentials
username: user
password: pass
api_key: key

# Options
default_schema: public
Expand Down
86 changes: 54 additions & 32 deletions metadata-ingestion/src/datahub/ingestion/source/metabase.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,19 @@ class MetabaseConfig(DatasetLineageProviderConfigBase, StatefulIngestionConfigBa
default=None,
description="optional URL to use in links (if `connect_uri` is only for ingestion)",
)
username: Optional[str] = Field(default=None, description="Metabase username.")
username: Optional[str] = Field(
default=None,
description="Metabase username, used when an API key is not provided.",
)
password: Optional[pydantic.SecretStr] = Field(
default=None, description="Metabase password."
default=None,
description="Metabase password, used when an API key is not provided.",
)

# https://www.metabase.com/learn/metabase-basics/administration/administration-and-operation/metabase-api#example-get-request
api_key: Optional[pydantic.SecretStr] = Field(
default=None,
description="Metabase API key. If provided, the username and password will be ignored. Recommended method.",
)
# TODO: Check and remove this if no longer needed.
# Config database_alias is removed from sql sources.
Expand Down Expand Up @@ -178,30 +188,40 @@ def __init__(self, ctx: PipelineContext, config: MetabaseConfig):
self.source_config: MetabaseConfig = config

def setup_session(self) -> None:
login_response = requests.post(
f"{self.config.connect_uri}/api/session",
None,
{
"username": self.config.username,
"password": (
self.config.password.get_secret_value()
if self.config.password
else None
),
},
)
self.session = requests.session()
if self.config.api_key:
self.session.headers.update(
{
"x-api-key": self.config.api_key.get_secret_value(),
"Content-Type": "application/json",
"Accept": "*/*",
}
)
else:
# If no API key is provided, generate a session token using username and password.
login_response = requests.post(
f"{self.config.connect_uri}/api/session",
None,
{
"username": self.config.username,
"password": (
self.config.password.get_secret_value()
if self.config.password
else None
),
},
)

login_response.raise_for_status()
self.access_token = login_response.json().get("id", "")
login_response.raise_for_status()
self.access_token = login_response.json().get("id", "")

self.session = requests.session()
self.session.headers.update(
{
"X-Metabase-Session": f"{self.access_token}",
"Content-Type": "application/json",
"Accept": "*/*",
}
)
self.session.headers.update(
{
"X-Metabase-Session": f"{self.access_token}",
"Content-Type": "application/json",
"Accept": "*/*",
}
)

# Test the connection
try:
Expand All @@ -217,15 +237,17 @@ def setup_session(self) -> None:
)

def close(self) -> None:
response = requests.delete(
f"{self.config.connect_uri}/api/session",
headers={"X-Metabase-Session": self.access_token},
)
if response.status_code not in (200, 204):
self.report.report_failure(
title="Unable to Log User Out",
message=f"Unable to logout for user {self.config.username}",
# API key authentication does not require session closure.
if not self.config.api_key:
response = requests.delete(
f"{self.config.connect_uri}/api/session",
headers={"X-Metabase-Session": self.access_token},
)
if response.status_code not in (200, 204):
self.report.report_failure(
title="Unable to Log User Out",
message=f"Unable to logout for user {self.config.username}",
)
super().close()

def emit_dashboard_mces(self) -> Iterable[MetadataWorkUnit]:
Expand Down
83 changes: 83 additions & 0 deletions metadata-ingestion/tests/unit/test_metabase_source.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
from unittest.mock import MagicMock, patch

import pydantic

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.metabase import (
MetabaseConfig,
Expand Down Expand Up @@ -52,3 +56,82 @@ def test_set_display_uri():

assert config.connect_uri == "localhost:3000"
assert config.display_uri == display_uri


@patch("requests.session")
def test_connection_uses_api_key_if_in_config(mock_session):
metabase_config = MetabaseConfig(
connect_uri="localhost:3000", api_key=pydantic.SecretStr("key")
)
ctx = PipelineContext(run_id="metabase-test-apikey")

mock_session_instance = MagicMock()
mock_session_instance.headers = {}
mock_session.return_value = mock_session_instance

mock_response = MagicMock()
mock_response.status_code = 200
mock_session_instance.get.return_value = mock_response

metabase_source = MetabaseSource(ctx, metabase_config)
metabase_source.close()

mock_session_instance.get.assert_called_once_with("localhost:3000/api/user/current")
request_headers = mock_session_instance.headers
assert request_headers["x-api-key"] == "key"


@patch("requests.delete")
@patch("requests.Session.get")
@patch("requests.post")
def test_create_session_from_config_username_password(mock_post, mock_get, mock_delete):
metabase_config = MetabaseConfig(
connect_uri="localhost:3000", username="un", password=pydantic.SecretStr("pwd")
)
ctx = PipelineContext(run_id="metabase-test")

mock_response = MagicMock()
mock_response.status_code = 200
mock_get.return_value = mock_response
mock_post.return_value = mock_response
mock_delete.return_value = mock_response

metabase_source = MetabaseSource(ctx, metabase_config)
metabase_source.close()

kwargs_post = mock_post.call_args
assert kwargs_post[0][0] == "localhost:3000/api/session"
assert kwargs_post[0][2]["password"] == "pwd"
assert kwargs_post[0][2]["username"] == "un"

kwargs_get = mock_get.call_args
assert kwargs_get[0][0] == "localhost:3000/api/user/current"

mock_delete.assert_called_once()


@patch("requests.delete")
@patch("requests.Session.get")
@patch("requests.post")
def test_fail_session_delete(mock_post, mock_get, mock_delete):
metabase_config = MetabaseConfig(
connect_uri="localhost:3000", username="un", password=pydantic.SecretStr("pwd")
)
ctx = PipelineContext(run_id="metabase-test")

mock_response = MagicMock()
mock_response.status_code = 200
mock_get.return_value = mock_response
mock_post.return_value = mock_response

mock_response_delete = MagicMock()
mock_response_delete.status_code = 400
mock_delete.return_value = mock_response_delete

mock_report = MagicMock()

metabase_source = MetabaseSource(ctx, metabase_config)
metabase_source.report = mock_report
metabase_source.close()

mock_report.report_failure.assert_called_once()

0 comments on commit c4e5a9e

Please sign in to comment.