Skip to content

Commit

Permalink
Change BigQueryCredential to common class: GCPCredential
Browse files Browse the repository at this point in the history
  • Loading branch information
ryota-cloud committed Mar 3, 2025
1 parent 0eeeb72 commit 6c43ecc
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 89 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import json
import logging
import os
import re
import tempfile
from datetime import timedelta
from typing import Any, Dict, List, Optional, Union

Expand All @@ -17,10 +15,10 @@
PlatformInstanceConfigMixin,
)
from datahub.configuration.validate_field_removal import pydantic_removed_field
from datahub.configuration.validate_multiline_string import pydantic_multiline_string
from datahub.ingestion.glossary.classification_mixin import (
ClassificationSourceConfigMixin,
)
from datahub.ingestion.source.common.credentials import GCPCredential
from datahub.ingestion.source.data_lake_common.path_spec import PathSpec
from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, SQLFilterConfig
from datahub.ingestion.source.state.stateful_ingestion_base import (
Expand Down Expand Up @@ -107,50 +105,8 @@ class BigQueryUsageConfig(BaseUsageConfig):
)


# Service-account credential settings for BigQuery. The fields mirror the keys
# of a Google service-account JSON key file; `create_credential_temp_file`
# dumps them to disk in that JSON form.
class BigQueryCredential(ConfigModel):
    project_id: str = Field(description="Project id to set the credentials")
    private_key_id: str = Field(description="Private key id")
    private_key: str = Field(
        description="Private key in a form of '-----BEGIN PRIVATE KEY-----\\nprivate-key\\n-----END PRIVATE KEY-----\\n'"
    )
    client_email: str = Field(description="Client email")
    client_id: str = Field(description="Client Id")
    auth_uri: str = Field(
        "https://accounts.google.com/o/oauth2/auth",
        description="Authentication uri",
    )
    token_uri: str = Field(
        "https://oauth2.googleapis.com/token",
        description="Token uri",
    )
    auth_provider_x509_cert_url: str = Field(
        "https://www.googleapis.com/oauth2/v1/certs",
        description="Auth provider x509 certificate url",
    )
    type: str = Field("service_account", description="Authentication type")
    client_x509_cert_url: Optional[str] = Field(
        None,
        description="If not set it will be default to https://www.googleapis.com/robot/v1/metadata/x509/client_email",
    )

    # Validator attached to the `private_key` field.
    _fix_private_key_newlines = pydantic_multiline_string("private_key")

    @root_validator(skip_on_failure=True)
    def validate_config(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Fill in `client_x509_cert_url` from `client_email` when it is unset."""
        if values.get("client_x509_cert_url") is not None:
            return values
        values["client_x509_cert_url"] = (
            f"https://www.googleapis.com/robot/v1/metadata/x509/{values['client_email']}"
        )
        return values

    def create_credential_temp_file(self) -> str:
        """Write the credential as JSON to a new temp file and return its path.

        The file is created with ``delete=False``, so it stays on disk after
        this method returns.
        """
        with tempfile.NamedTemporaryFile(delete=False) as handle:
            handle.write(
                json.dumps(self.dict(), indent=4, separators=(",", ": ")).encode()
            )
            path = handle.name
        return path


class BigQueryConnectionConfig(ConfigModel):
credential: Optional[BigQueryCredential] = Field(
credential: Optional[GCPCredential] = Field(
default=None, description="BigQuery credential informations"
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import json
import tempfile
from typing import Any, Dict, Optional

from pydantic import Field, root_validator

from datahub.configuration import ConfigModel
from datahub.configuration.validate_multiline_string import pydantic_multiline_string


# Shared Google Cloud service-account credential settings. The fields mirror
# the keys of a service-account JSON key file; `create_credential_temp_file`
# dumps them to disk in that JSON form.
class GCPCredential(ConfigModel):
    # Explicit `default=None` so the field is optional regardless of how the
    # installed pydantic version treats an Optional annotation without a default.
    project_id: Optional[str] = Field(
        default=None, description="Project id to set the credentials"
    )
    private_key_id: str = Field(description="Private key id")
    private_key: str = Field(
        description="Private key in a form of '-----BEGIN PRIVATE KEY-----\\nprivate-key\\n-----END PRIVATE KEY-----\\n'"
    )
    client_email: str = Field(description="Client email")
    client_id: str = Field(description="Client Id")
    auth_uri: str = Field(
        default="https://accounts.google.com/o/oauth2/auth",
        description="Authentication uri",
    )
    token_uri: str = Field(
        default="https://oauth2.googleapis.com/token", description="Token uri"
    )
    auth_provider_x509_cert_url: str = Field(
        default="https://www.googleapis.com/oauth2/v1/certs",
        description="Auth provider x509 certificate url",
    )
    type: str = Field(default="service_account", description="Authentication type")
    client_x509_cert_url: Optional[str] = Field(
        default=None,
        description="If not set it will be default to https://www.googleapis.com/robot/v1/metadata/x509/client_email",
    )

    # Validator attached to the `private_key` field.
    _fix_private_key_newlines = pydantic_multiline_string("private_key")

    @root_validator(skip_on_failure=True)
    def validate_config(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Fill in `client_x509_cert_url` from `client_email` when it is unset."""
        if values.get("client_x509_cert_url") is None:
            values["client_x509_cert_url"] = (
                f"https://www.googleapis.com/robot/v1/metadata/x509/{values['client_email']}"
            )
        return values

    def create_credential_temp_file(self, project_id: Optional[str] = None) -> str:
        """Write the credential as JSON to a new temp file and return its path.

        Args:
            project_id: When given (and non-empty), replaces the model's own
                ``project_id`` in the written JSON.

        Returns:
            Path of the created file. It is opened with ``delete=False``, so
            it stays on disk after this method returns.
        """
        configs = self.dict()
        if project_id:
            configs["project_id"] = project_id
        with tempfile.NamedTemporaryFile(delete=False) as fp:
            # Serialize `configs`, not `self.dict()`, so that the project_id
            # override applied above actually reaches the file.
            cred_json = json.dumps(configs, indent=4, separators=(",", ": "))
            fp.write(cred_json.encode())
            return fp.name
42 changes: 1 addition & 41 deletions metadata-ingestion/src/datahub/ingestion/source/vertexai.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import json
import logging
import tempfile
import time
from typing import Any, Iterable, List, Optional, TypeVar

Expand All @@ -21,9 +19,7 @@

import datahub.emitter.mce_builder as builder
from datahub._codegen.aspect import _Aspect
from datahub.configuration import ConfigModel
from datahub.configuration.source_common import EnvConfigMixin
from datahub.configuration.validate_multiline_string import pydantic_multiline_string
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import ProjectIdKey, gen_containers
from datahub.ingestion.api.common import PipelineContext
Expand All @@ -37,6 +33,7 @@
from datahub.ingestion.api.source import Source, SourceCapability, SourceReport
from datahub.ingestion.api.source_helpers import auto_workunit
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.credentials import GCPCredential
from datahub.metadata.com.linkedin.pegasus2avro.ml.metadata import (
MLTrainingRunProperties,
)
Expand All @@ -59,43 +56,6 @@
logger = logging.getLogger(__name__)


# Google Cloud service-account credential settings used by the Vertex AI
# source. The fields mirror the keys of a service-account JSON key file.
class GCPCredential(ConfigModel):
    private_key_id: str = Field(description="Private key id")
    private_key: str = Field(
        description="Private key in a form of '-----BEGIN PRIVATE KEY-----\\nprivate-key\\n-----END PRIVATE KEY-----\\n'"
    )
    client_email: str = Field(description="Client email")
    client_id: str = Field(description="Client Id")
    auth_uri: str = Field(
        "https://accounts.google.com/o/oauth2/auth",
        description="Authentication uri",
    )
    token_uri: str = Field(
        "https://oauth2.googleapis.com/token",
        description="Token uri",
    )
    auth_provider_x509_cert_url: str = Field(
        "https://www.googleapis.com/oauth2/v1/certs",
        description="Auth provider x509 certificate url",
    )
    type: str = Field("service_account", description="Authentication type")
    client_x509_cert_url: Optional[str] = Field(
        None,
        description="If not set it will be default to https://www.googleapis.com/robot/v1/metadata/x509/client_email",
    )

    # Validator attached to the `private_key` field.
    _fix_private_key_newlines = pydantic_multiline_string("private_key")

    def create_credential_temp_file(self, project_id: Optional[str] = None) -> str:
        """Write the credential as JSON to a new temp file and return its path.

        A non-empty ``project_id`` is added to the written JSON under the
        ``project_id`` key. The file is created with ``delete=False``, so it
        stays on disk after this method returns.
        """
        payload = self.dict()
        if project_id:
            # The project id comes from the caller rather than from this model.
            payload["project_id"] = project_id
        with tempfile.NamedTemporaryFile(delete=False) as handle:
            handle.write(
                json.dumps(payload, indent=4, separators=(",", ": ")).encode()
            )
            path = handle.name
        return path


class VertexAIConfig(EnvConfigMixin):
credential: Optional[GCPCredential] = Field(
default=None, description="GCP credential information"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from datahub.configuration.common import ConfigurationWarning
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryCredential
from datahub.ingestion.source.common.credentials import GCPCredential
from datahub.ingestion.source.fivetran.config import (
BigQueryDestinationConfig,
FivetranSourceConfig,
Expand Down Expand Up @@ -398,7 +398,7 @@ def test_fivetran_snowflake_destination_config():
@freeze_time(FROZEN_TIME)
def test_fivetran_bigquery_destination_config():
bigquery_dest = BigQueryDestinationConfig(
credential=BigQueryCredential(
credential=GCPCredential(
private_key_id="testprivatekey",
project_id="test-project",
client_email="fivetran-connector@test-project.iam.gserviceaccount.com",
Expand Down

0 comments on commit 6c43ecc

Please sign in to comment.