diff --git a/datahub-web-react/src/app/ingest/source/builder/sources.json b/datahub-web-react/src/app/ingest/source/builder/sources.json
index 102cce0f491e3..c288221483026 100644
--- a/datahub-web-react/src/app/ingest/source/builder/sources.json
+++ b/datahub-web-react/src/app/ingest/source/builder/sources.json
@@ -333,5 +333,12 @@
     "description": "Import Nodes and Relationships from Neo4j.",
     "docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/neo4j/",
     "recipe": "source:\n type: 'neo4j'\n config:\n uri: 'neo4j+ssc://host:7687'\n username: 'neo4j'\n password: 'password'\n env: 'PROD'\n\nsink:\n type: \"datahub-rest\"\n config:\n server: 'http://localhost:8080'"
+  },
+  {
+    "urn": "urn:li:dataPlatform:vertexai",
+    "name": "vertexai",
+    "displayName": "VertexAI",
+    "docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/vertexai/",
+    "recipe": "source:\n type: vertexai\n config:\n project_id: # your GCP project ID\n region: # region where your GCP project resides\n # Credentials\n # Add GCP credentials"
+  }
 ]
diff --git a/datahub-web-react/src/images/vertexai.png b/datahub-web-react/src/images/vertexai.png
new file mode 100644
index 0000000000000..93b43b7d61200
Binary files /dev/null and b/datahub-web-react/src/images/vertexai.png differ
diff --git a/metadata-ingestion/docs/sources/vertexai/vertexai_pre.md b/metadata-ingestion/docs/sources/vertexai/vertexai_pre.md
new file mode 100644
index 0000000000000..c4a9c7924fb73
--- /dev/null
+++ b/metadata-ingestion/docs/sources/vertexai/vertexai_pre.md
@@ -0,0 +1,48 @@
+Ingesting metadata from Vertex AI requires using the **Vertex AI** module.
+
+#### Prerequisites
+Please refer to the [Vertex AI documentation](https://cloud.google.com/vertex-ai/docs) for basic information on Vertex AI.
+
+#### Credentials to access GCP
+Please read the [GCP docs](https://cloud.google.com/docs/authentication/provide-credentials-adc#how-to) to understand how to set up Application Default Credentials for GCP.
+
+#### Create a service account and assign roles
+
+1. Set up a service account as per the [GCP docs](https://cloud.google.com/iam/docs/creating-managing-service-accounts#iam-service-accounts-create-console) and assign the required role to this service account.
+2. Download a service account JSON keyfile.
+- Example credential file:
+
+```json
+{
+  "type": "service_account",
+  "project_id": "project-id-1234567",
+  "private_key_id": "d0121d0000882411234e11166c6aaa23ed5d74e0",
+  "private_key": "-----BEGIN PRIVATE KEY-----\nMIIyourkey\n-----END PRIVATE KEY-----",
+  "client_email": "test@suppproject-id-1234567.iam.gserviceaccount.com",
+  "client_id": "113545814931671546333",
+  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
+  "token_uri": "https://oauth2.googleapis.com/token",
+  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
+  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/test%suppproject-id-1234567.iam.gserviceaccount.com"
+}
+```
+
+3. To provide credentials to the source, you can either:
+
+- Set an environment variable:
+
+  ```sh
+  $ export GOOGLE_APPLICATION_CREDENTIALS="/path/to/keyfile.json"
+  ```
+
+  _or_
+
+- Set credential config in your source based on the credential json file.
For example:
+
+  ```yml
+  credential:
+    private_key_id: "d0121d0000882411234e11166c6aaa23ed5d74e0"
+    private_key: "-----BEGIN PRIVATE KEY-----\nMIIyourkey\n-----END PRIVATE KEY-----\n"
+    client_email: "test@suppproject-id-1234567.iam.gserviceaccount.com"
+    client_id: "123456678890"
+  ```
diff --git a/metadata-ingestion/docs/sources/vertexai/vertexai_recipe.yml b/metadata-ingestion/docs/sources/vertexai/vertexai_recipe.yml
new file mode 100644
index 0000000000000..78135700225dc
--- /dev/null
+++ b/metadata-ingestion/docs/sources/vertexai/vertexai_recipe.yml
@@ -0,0 +1,16 @@
+source:
+  type: vertexai
+  config:
+    project_id: "acryl-poc"
+    region: "us-west2"
+# Note that GOOGLE_APPLICATION_CREDENTIALS or the credential section below is required for authentication.
+# credential:
+#   private_key: '-----BEGIN PRIVATE KEY-----\\nprivate-key\\n-----END PRIVATE KEY-----\\n'
+#   private_key_id: "project_key_id"
+#   client_email: "client_email"
+#   client_id: "client_id"
+
+sink:
+  type: "datahub-rest"
+  config:
+    server: "http://localhost:8080"
diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index fbeba1e510b64..7e172ed9cc2bc 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -532,6 +532,7 @@
     "sigma": sqlglot_lib | {"requests"},
     "sac": sac,
     "neo4j": {"pandas", "neo4j"},
+    "vertexai": {"google-cloud-aiplatform>=1.80.0"},
 }

 # This is mainly used to exclude plugins from the Docker image.
@@ -677,6 +678,7 @@
         "sac",
         "cassandra",
         "neo4j",
+        "vertexai",
     ]
     if plugin
     for dependency in plugins[plugin]
@@ -710,6 +712,7 @@
         "mariadb",
         "redash",
         "vertica",
+        "vertexai"
     ]
     if plugin
     for dependency in plugins[plugin]
@@ -799,6 +802,7 @@
         "sac = datahub.ingestion.source.sac.sac:SACSource",
         "cassandra = datahub.ingestion.source.cassandra.cassandra:CassandraSource",
         "neo4j = datahub.ingestion.source.neo4j.neo4j_source:Neo4jSource",
+        "vertexai = datahub.ingestion.source.vertexai:VertexAISource",
     ],
     "datahub.ingestion.transformer.plugins": [
         "pattern_cleanup_ownership = datahub.ingestion.transformer.pattern_cleanup_ownership:PatternCleanUpOwnership",
diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
index 57bfa2e3090d3..1f777feeccf78 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py
@@ -1,8 +1,6 @@
-import json
 import logging
 import os
 import re
-import tempfile
 from datetime import timedelta
 from typing import Any, Dict, List, Optional, Union
@@ -17,10 +15,10 @@
     PlatformInstanceConfigMixin,
 )
 from datahub.configuration.validate_field_removal import pydantic_removed_field
-from datahub.configuration.validate_multiline_string import pydantic_multiline_string
 from datahub.ingestion.glossary.classification_mixin import (
     ClassificationSourceConfigMixin,
 )
+from datahub.ingestion.source.common.gcp_credentials_config import GCPCredential
 from datahub.ingestion.source.data_lake_common.path_spec import PathSpec
 from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, SQLFilterConfig
 from datahub.ingestion.source.state.stateful_ingestion_base import (
@@ -107,50 +105,8 @@ class BigQueryUsageConfig(BaseUsageConfig):
     )


-class BigQueryCredential(ConfigModel):
-    project_id: str = Field(description="Project id to set the credentials")
-    private_key_id: str = Field(description="Private key id")
-    private_key: str = Field(
description="Private key in a form of '-----BEGIN PRIVATE KEY-----\\nprivate-key\\n-----END PRIVATE KEY-----\\n'" - ) - client_email: str = Field(description="Client email") - client_id: str = Field(description="Client Id") - auth_uri: str = Field( - default="https://accounts.google.com/o/oauth2/auth", - description="Authentication uri", - ) - token_uri: str = Field( - default="https://oauth2.googleapis.com/token", description="Token uri" - ) - auth_provider_x509_cert_url: str = Field( - default="https://www.googleapis.com/oauth2/v1/certs", - description="Auth provider x509 certificate url", - ) - type: str = Field(default="service_account", description="Authentication type") - client_x509_cert_url: Optional[str] = Field( - default=None, - description="If not set it will be default to https://www.googleapis.com/robot/v1/metadata/x509/client_email", - ) - - _fix_private_key_newlines = pydantic_multiline_string("private_key") - - @root_validator(skip_on_failure=True) - def validate_config(cls, values: Dict[str, Any]) -> Dict[str, Any]: - if values.get("client_x509_cert_url") is None: - values["client_x509_cert_url"] = ( - f"https://www.googleapis.com/robot/v1/metadata/x509/{values['client_email']}" - ) - return values - - def create_credential_temp_file(self) -> str: - with tempfile.NamedTemporaryFile(delete=False) as fp: - cred_json = json.dumps(self.dict(), indent=4, separators=(",", ": ")) - fp.write(cred_json.encode()) - return fp.name - - class BigQueryConnectionConfig(ConfigModel): - credential: Optional[BigQueryCredential] = Field( + credential: Optional[GCPCredential] = Field( default=None, description="BigQuery credential informations" ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/common/gcp_credentials_config.py b/metadata-ingestion/src/datahub/ingestion/source/common/gcp_credentials_config.py new file mode 100644 index 0000000000000..a1c9cac9319c8 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/common/gcp_credentials_config.py @@ -0,0 +1,53 @@ +import json +import tempfile +from typing import Any, Dict, Optional + +from pydantic import Field, root_validator + +from datahub.configuration import ConfigModel +from datahub.configuration.validate_multiline_string import pydantic_multiline_string + + +class GCPCredential(ConfigModel): + project_id: Optional[str] = Field(description="Project id to set the credentials") + private_key_id: str = Field(description="Private key id") + private_key: str = Field( + description="Private key in a form of '-----BEGIN PRIVATE KEY-----\\nprivate-key\\n-----END PRIVATE KEY-----\\n'" + ) + client_email: str = Field(description="Client email") + client_id: str = Field(description="Client Id") + auth_uri: str = Field( + default="https://accounts.google.com/o/oauth2/auth", + description="Authentication uri", + ) + token_uri: str = Field( + default="https://oauth2.googleapis.com/token", description="Token uri" + ) + auth_provider_x509_cert_url: str = Field( + default="https://www.googleapis.com/oauth2/v1/certs", + description="Auth provider x509 certificate url", + ) + type: str = Field(default="service_account", description="Authentication type") + client_x509_cert_url: Optional[str] = Field( + default=None, + description="If not set it will be default to https://www.googleapis.com/robot/v1/metadata/x509/client_email", + ) + + _fix_private_key_newlines = pydantic_multiline_string("private_key") + + @root_validator(skip_on_failure=True) + def validate_config(cls, values: Dict[str, Any]) -> Dict[str, Any]: + if 
values.get("client_x509_cert_url") is None: + values["client_x509_cert_url"] = ( + f"https://www.googleapis.com/robot/v1/metadata/x509/{values['client_email']}" + ) + return values + + def create_credential_temp_file(self, project_id: Optional[str] = None) -> str: + configs = self.dict() + if project_id: + configs["project_id"] = project_id + with tempfile.NamedTemporaryFile(delete=False) as fp: + cred_json = json.dumps(configs, indent=4, separators=(",", ": ")) + fp.write(cred_json.encode()) + return fp.name diff --git a/metadata-ingestion/src/datahub/ingestion/source/vertexai.py b/metadata-ingestion/src/datahub/ingestion/source/vertexai.py new file mode 100644 index 0000000000000..d1d0b9043b18e --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/vertexai.py @@ -0,0 +1,715 @@ +import dataclasses +import logging +from collections import defaultdict +from datetime import datetime +from typing import Any, Iterable, List, Optional, TypeVar + +from google.api_core.exceptions import GoogleAPICallError +from google.cloud import aiplatform +from google.cloud.aiplatform import ( + AutoMLForecastingTrainingJob, + AutoMLImageTrainingJob, + AutoMLTabularTrainingJob, + AutoMLTextTrainingJob, + AutoMLVideoTrainingJob, + Endpoint, +) +from google.cloud.aiplatform.base import VertexAiResourceNoun +from google.cloud.aiplatform.models import Model, VersionInfo +from google.oauth2 import service_account +from pydantic import PrivateAttr +from pydantic.fields import Field + +import datahub.emitter.mce_builder as builder +from datahub._codegen.aspect import _Aspect +from datahub.configuration.source_common import EnvConfigMixin +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.emitter.mcp_builder import ProjectIdKey, gen_containers +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.decorators import ( + SupportStatus, + capability, + config_class, + platform_name, + support_status, +) +from datahub.ingestion.api.source import Source, SourceCapability, SourceReport +from datahub.ingestion.api.source_helpers import auto_workunit +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.common.gcp_credentials_config import GCPCredential +from datahub.metadata.com.linkedin.pegasus2avro.ml.metadata import ( + MLTrainingRunProperties, +) +from datahub.metadata.schema_classes import ( + AuditStampClass, + ContainerClass, + DataProcessInstanceInputClass, + DataProcessInstancePropertiesClass, + DatasetPropertiesClass, + MLModelDeploymentPropertiesClass, + MLModelGroupPropertiesClass, + MLModelPropertiesClass, + SubTypesClass, + TimeStampClass, + VersionTagClass, +) +from datahub.utilities.str_enum import StrEnum +from datahub.utilities.time import datetime_to_ts_millis + +T = TypeVar("T") + +logger = logging.getLogger(__name__) + + +class VertexAIConfig(EnvConfigMixin): + credential: Optional[GCPCredential] = Field( + default=None, description="GCP credential information" + ) + project_id: str = Field(description=("Project ID in Google Cloud Platform")) + region: str = Field( + description=("Region of your project in Google Cloud Platform"), + ) + bucket_uri: Optional[str] = Field( + default=None, + description=("Bucket URI used in your project"), + ) + vertexai_url: Optional[str] = Field( + default="https://console.cloud.google.com/vertex-ai", + description=("VertexUI URI"), + ) + _credentials_path: Optional[str] = PrivateAttr(None) + + def __init__(self, **data: Any): + super().__init__(**data) + + if 
self.credential: + self._credentials_path = self.credential.create_credential_temp_file( + self.project_id + ) + logger.debug( + f"Creating temporary credential file at {self._credentials_path}" + ) + + +class MLTypes(StrEnum): + TRAINING_JOB = "Training Job" + MODEL = "ML Model" + MODEL_GROUP = "ML Model Group" + ENDPOINT = "Endpoint" + DATASET = "Dataset" + PROJECT = "Project" + + +@dataclasses.dataclass +class TrainingJobMetadata: + job: VertexAiResourceNoun + input_dataset: Optional[VertexAiResourceNoun] = None + output_model: Optional[Model] = None + output_model_version: Optional[VersionInfo] = None + + +@dataclasses.dataclass +class ModelMetadata: + model: Model + model_version: VersionInfo + training_job_urn: Optional[str] = None + endpoints: Optional[List[Endpoint]] = None + + +@platform_name("Vertex AI", id="vertexai") +@config_class(VertexAIConfig) +@support_status(SupportStatus.TESTING) +@capability( + SourceCapability.DESCRIPTIONS, + "Extract descriptions for Vertex AI Registered Models and Model Versions", +) +@capability(SourceCapability.TAGS, "Extract tags for Vertex AI Registered Model Stages") +class VertexAISource(Source): + platform: str = "vertexai" + + def __init__(self, ctx: PipelineContext, config: VertexAIConfig): + super().__init__(ctx) + self.config = config + self.report = SourceReport() + + credentials = ( + service_account.Credentials.from_service_account_file( + self.config._credentials_path + ) + if self.config.credential + else None + ) + + aiplatform.init( + project=config.project_id, location=config.region, credentials=credentials + ) + self.client = aiplatform + self.endpoints: Optional[dict] = None + self.datasets: Optional[dict] = None + + def get_report(self) -> SourceReport: + return self.report + + def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: + """ + Main Function to fetch and yields mcps for various VertexAI resources. + - Models and Model Versions from the Model Registry + - Training Jobs + """ + + # Ingest Project + yield from self._gen_project_workunits() + # Fetch and Ingest Models, Model Versions a from Model Registry + yield from auto_workunit(self._get_ml_models_mcps()) + # Fetch and Ingest Training Jobs + yield from auto_workunit(self._get_training_jobs_mcps()) + # TODO Fetch Experiments and Experiment Runs + + def _gen_project_workunits(self) -> Iterable[MetadataWorkUnit]: + yield from gen_containers( + container_key=self._get_project_container(), + name=self.config.project_id, + sub_types=[MLTypes.PROJECT], + ) + + def _get_ml_models_mcps(self) -> Iterable[MetadataChangeProposalWrapper]: + """ + Fetch List of Models in Model Registry and generate a corresponding mcp. 
+ """ + registered_models = self.client.Model.list() + for model in registered_models: + # create mcp for Model Group (= Model in VertexAI) + yield from self._gen_ml_group_mcps(model) + model_versions = model.versioning_registry.list_versions() + for model_version in model_versions: + # create mcp for Model (= Model Version in VertexAI) + logger.info( + f"Ingesting a model (name: {model.display_name} id:{model.name})" + ) + yield from self._get_ml_model_mcps( + model=model, model_version=model_version + ) + + def _get_ml_model_mcps( + self, model: Model, model_version: VersionInfo + ) -> Iterable[MetadataChangeProposalWrapper]: + model_meta: ModelMetadata = self._get_ml_model_metadata(model, model_version) + # Create ML Model Entity + yield from self._gen_ml_model_mcps(model_meta) + # Create Endpoint Entity + yield from self._gen_endpoint_mcps(model_meta) + + def _get_ml_model_metadata( + self, model: Model, model_version: VersionInfo + ) -> ModelMetadata: + model_meta = ModelMetadata(model=model, model_version=model_version) + # Search for endpoints associated with the model + endpoints = self._search_endpoint(model) + model_meta.endpoints = endpoints + return model_meta + + def _get_training_jobs_mcps(self) -> Iterable[MetadataChangeProposalWrapper]: + """ + Fetches training jobs from Vertex AI and generates corresponding mcps. + This method retrieves various types of training jobs from Vertex AI, including + CustomJob, CustomTrainingJob, CustomContainerTrainingJob, CustomPythonPackageTrainingJob, + AutoMLTabularTrainingJob, AutoMLTextTrainingJob, AutoMLImageTrainingJob, AutoMLVideoTrainingJob, + and AutoMLForecastingTrainingJob. For each job, it generates mcps containing metadata + about the job, its inputs, and its outputs. + """ + class_names = [ + "CustomJob", + "CustomTrainingJob", + "CustomContainerTrainingJob", + "CustomPythonPackageTrainingJob", + "AutoMLTabularTrainingJob", + "AutoMLTextTrainingJob", + "AutoMLImageTrainingJob", + "AutoMLVideoTrainingJob", + "AutoMLForecastingTrainingJob", + ] + # Iterate over class names and call the list() function + for class_name in class_names: + logger.info(f"Fetching a list of {class_name}s from VertexAI server") + for job in getattr(self.client, class_name).list(): + yield from self._get_training_job_mcps(job) + + def _get_training_job_mcps( + self, job: VertexAiResourceNoun + ) -> Iterable[MetadataChangeProposalWrapper]: + job_meta: TrainingJobMetadata = self._get_training_job_metadata(job) + # Create DataProcessInstance for the training job + yield from self._gen_training_job_mcps(job_meta) + # Create Dataset entity for Input Dataset of Training job + yield from self._get_input_dataset_mcps(job_meta) + # Create ML Model entity for output ML model of this training job + yield from self._gen_output_model_mcps(job_meta) + + def _gen_output_model_mcps( + self, job_meta: TrainingJobMetadata + ) -> Iterable[MetadataChangeProposalWrapper]: + if job_meta.output_model and job_meta.output_model_version: + job = job_meta.job + job_urn = builder.make_data_process_instance_urn( + self._make_vertexai_job_name(entity_id=job.name) + ) + + yield from self._gen_ml_model_mcps( + ModelMetadata( + model=job_meta.output_model, + model_version=job_meta.output_model_version, + training_job_urn=job_urn, + ) + ) + + def _gen_training_job_mcps( + self, job_meta: TrainingJobMetadata + ) -> Iterable[MetadataChangeProposalWrapper]: + """ + Generate a mcp for VertexAI Training Job + """ + job = job_meta.job + job_id = self._make_vertexai_job_name(entity_id=job.name) 
+ job_urn = builder.make_data_process_instance_urn(job_id) + + created_time = ( + datetime_to_ts_millis(job.create_time) + if job.create_time + else datetime_to_ts_millis(datetime.now()) + ) + created_actor = f"urn:li:platformResource:{self.platform}" + + aspects: List[_Aspect] = list() + aspects.append( + DataProcessInstancePropertiesClass( + name=job_id, + created=AuditStampClass( + time=created_time, + actor=created_actor, + ), + externalUrl=self._make_job_external_url(job), + customProperties={ + "displayName": job.display_name, + "jobType": job.__class__.__name__, + }, + ) + ) + aspects.append( + MLTrainingRunProperties( + externalUrl=self._make_job_external_url(job), id=job.name + ) + ) + aspects.append(SubTypesClass(typeNames=[MLTypes.TRAINING_JOB])) + aspects.append(ContainerClass(container=self._get_project_container().as_urn())) + + # If Training job has Input Dataset + if job_meta.input_dataset: + dataset_urn = builder.make_dataset_urn( + platform=self.platform, + name=self._make_vertexai_dataset_name( + entity_id=job_meta.input_dataset.name + ), + env=self.config.env, + ) + aspects.append( + DataProcessInstanceInputClass(inputs=[dataset_urn]), + ) + + yield from MetadataChangeProposalWrapper.construct_many( + job_urn, aspects=aspects + ) + + def _gen_ml_group_mcps( + self, + model: Model, + ) -> Iterable[MetadataChangeProposalWrapper]: + """ + Generate an MLModelGroup mcp for a VertexAI Model. + """ + ml_model_group_urn = self._make_ml_model_group_urn(model) + + aspects: List[_Aspect] = list() + aspects.append( + MLModelGroupPropertiesClass( + name=self._make_vertexai_model_group_name(model.name), + description=model.description, + created=TimeStampClass(time=datetime_to_ts_millis(model.create_time)) + if model.create_time + else None, + lastModified=TimeStampClass( + time=datetime_to_ts_millis(model.update_time) + ) + if model.update_time + else None, + customProperties={"displayName": model.display_name}, + ) + ) + + # TODO add following when metadata model for mlgroup is updated (these aspects not supported currently) + # aspects.append(SubTypesClass(typeNames=[MLTypes.MODEL_GROUP])) + # aspects.append(ContainerClass(container=self._get_project_container().as_urn())) + + yield from MetadataChangeProposalWrapper.construct_many( + ml_model_group_urn, aspects=aspects + ) + + def _make_ml_model_group_urn(self, model: Model) -> str: + urn = builder.make_ml_model_group_urn( + platform=self.platform, + group_name=self._make_vertexai_model_group_name(model.name), + env=self.config.env, + ) + return urn + + def _get_project_container(self) -> ProjectIdKey: + return ProjectIdKey(project_id=self.config.project_id, platform=self.platform) + + def _is_automl_job(self, job: VertexAiResourceNoun) -> bool: + return ( + isinstance(job, AutoMLTabularTrainingJob) + or isinstance(job, AutoMLTextTrainingJob) + or isinstance(job, AutoMLImageTrainingJob) + or isinstance(job, AutoMLVideoTrainingJob) + or isinstance(job, AutoMLForecastingTrainingJob) + ) + + def _search_model_version( + self, model: Model, version_id: str + ) -> Optional[VersionInfo]: + for version in model.versioning_registry.list_versions(): + if version.version_id == version_id: + return version + return None + + def _search_dataset(self, dataset_id: str) -> Optional[VertexAiResourceNoun]: + """ + Search for a dataset by its ID in Vertex AI. + This method iterates through different types of datasets (Text, Tabular, Image, + TimeSeries, and Video) to find a dataset that matches the given dataset ID. 
+ """ + + dataset_types = [ + "TextDataset", + "TabularDataset", + "ImageDataset", + "TimeSeriesDataset", + "VideoDataset", + ] + + if self.datasets is None: + self.datasets = dict() + + for dtype in dataset_types: + dataset_class = getattr(self.client.datasets, dtype) + for ds in dataset_class.list(): + self.datasets[ds.name] = ds + + return self.datasets.get(dataset_id) if dataset_id in self.datasets else None + + def _get_input_dataset_mcps( + self, job_meta: TrainingJobMetadata + ) -> Iterable[MetadataChangeProposalWrapper]: + """ + Create a DatasetPropertiesClass aspect for a given Vertex AI dataset. + """ + ds = job_meta.input_dataset + + if ds: + # Create URN of Input Dataset for Training Job + dataset_name = self._make_vertexai_dataset_name(entity_id=ds.name) + dataset_urn = builder.make_dataset_urn( + platform=self.platform, + name=dataset_name, + env=self.config.env, + ) + + # Create aspects for the dataset + aspects: List[_Aspect] = list() + aspects.append( + DatasetPropertiesClass( + name=self._make_vertexai_dataset_name(ds.name), + created=TimeStampClass(time=datetime_to_ts_millis(ds.create_time)) + if ds.create_time + else None, + description=f"Dataset: {ds.display_name}", + customProperties={ + "displayName": ds.display_name, + "resourceName": ds.resource_name, + }, + qualifiedName=ds.resource_name, + ) + ) + + aspects.append(SubTypesClass(typeNames=[MLTypes.DATASET])) + # Create a container for Project as parent of the dataset + aspects.append( + ContainerClass(container=self._get_project_container().as_urn()) + ) + yield from MetadataChangeProposalWrapper.construct_many( + dataset_urn, aspects=aspects + ) + + def _get_training_job_metadata( + self, job: VertexAiResourceNoun + ) -> TrainingJobMetadata: + """ + Retrieve metadata for a given Vertex AI training job. + This method extracts metadata for a Vertex AI training job, including input datasets + and output models. It checks if the job is an AutoML job and retrieves the relevant + input dataset and output model information. 
+ """ + + job_meta = TrainingJobMetadata(job=job) + + # Check if the job is an AutoML job + if self._is_automl_job(job): + # Check if input dataset is present in the job configuration + if ( + hasattr(job, "_gca_resource") + and hasattr(job._gca_resource, "input_data_config") + and hasattr(job._gca_resource.input_data_config, "dataset_id") + ): + # Create URN of Input Dataset for Training Job + dataset_id = job._gca_resource.input_data_config.dataset_id + logger.info( + f"Found input dataset (id: {dataset_id}) for training job ({job.display_name})" + ) + + if dataset_id: + input_ds = self._search_dataset(dataset_id) + if input_ds: + logger.info( + f"Found the name of input dataset ({input_ds.display_name}) with dataset id ({dataset_id})" + ) + job_meta.input_dataset = input_ds + + # Check if output model is present in the job configuration + if hasattr(job, "_gca_resource") and hasattr( + job._gca_resource, "model_to_upload" + ): + model_version_str = job._gca_resource.model_to_upload.version_id + model_name = job._gca_resource.model_to_upload.name + try: + model = Model(model_name=model_name) + model_version = self._search_model_version(model, model_version_str) + if model and model_version: + logger.info( + f"Found output model (name:{model.display_name} id:{model_version_str}) " + f"for training job: {job.display_name}" + ) + job_meta.output_model = model + job_meta.output_model_version = model_version + except GoogleAPICallError: + logger.error( + f"Error while fetching model version {model_version_str}" + ) + return job_meta + + def _gen_endpoint_mcps( + self, model_meta: ModelMetadata + ) -> Iterable[MetadataChangeProposalWrapper]: + model: Model = model_meta.model + model_version: VersionInfo = model_meta.model_version + + if model_meta.endpoints: + for endpoint in model_meta.endpoints: + endpoint_urn = builder.make_ml_model_deployment_urn( + platform=self.platform, + deployment_name=self._make_vertexai_endpoint_name( + entity_id=endpoint.name + ), + env=self.config.env, + ) + + aspects: List[_Aspect] = list() + aspects.append( + MLModelDeploymentPropertiesClass( + description=model.description, + createdAt=datetime_to_ts_millis(endpoint.create_time), + version=VersionTagClass( + versionTag=str(model_version.version_id) + ), + customProperties={"displayName": endpoint.display_name}, + ) + ) + + # TODO add followings when metadata for MLModelDeployment is updated (these aspects not supported currently) + # aspects.append( + # ContainerClass(container=self._get_project_container().as_urn()) + # ) + # aspects.append(SubTypesClass(typeNames=[MLTypes.ENDPOINT])) + + yield from MetadataChangeProposalWrapper.construct_many( + endpoint_urn, aspects=aspects + ) + + def _gen_ml_model_mcps( + self, ModelMetadata: ModelMetadata + ) -> Iterable[MetadataChangeProposalWrapper]: + """ + Generate an MLModel and Endpoint mcp for an VertexAI Model Version. 
+ """ + + model: Model = ModelMetadata.model + model_version: VersionInfo = ModelMetadata.model_version + training_job_urn: Optional[str] = ModelMetadata.training_job_urn + endpoints: Optional[List[Endpoint]] = ModelMetadata.endpoints + endpoint_urns: List[str] = list() + + logging.info(f"generating model mcp for {model.name}") + + # Generate list of endpoint URL + if endpoints: + for endpoint in endpoints: + logger.info( + f"found endpoint ({endpoint.display_name}) for model ({model.resource_name})" + ) + endpoint_urns.append( + builder.make_ml_model_deployment_urn( + platform=self.platform, + deployment_name=self._make_vertexai_endpoint_name( + entity_id=endpoint.display_name + ), + env=self.config.env, + ) + ) + + # Create URN for Model and Model Version + model_group_urn = self._make_ml_model_group_urn(model) + model_name = self._make_vertexai_model_name(entity_id=model.name) + model_version_name = f"{model_name}_{model_version.version_id}" + model_urn = self._make_ml_model_urn(model_version, model_name=model_name) + + # Create aspects for ML Model + aspects: List[_Aspect] = list() + + aspects.append( + MLModelPropertiesClass( + name=model_version_name, + description=model_version.version_description, + customProperties={ + "displayName": f"{model_version.model_display_name}", + "versionId": f"{model_version.version_id}", + "resourceName": model.resource_name, + }, + created=TimeStampClass( + datetime_to_ts_millis(model_version.version_create_time) + ) + if model_version.version_create_time + else None, + lastModified=TimeStampClass( + datetime_to_ts_millis(model_version.version_update_time) + ) + if model_version.version_update_time + else None, + version=VersionTagClass(versionTag=str(model_version.version_id)), + groups=[model_group_urn], # link model version to model group + trainingJobs=[training_job_urn] + if training_job_urn + else None, # link to training job + deployments=endpoint_urns, + externalUrl=self._make_model_version_external_url(model), + type="ML Model", + ) + ) + + # TODO Add a container for Project as parent of the dataset + # aspects.append( + # ContainerClass( + # container=self._get_project_container().as_urn(), + # ) + # ) + + yield from MetadataChangeProposalWrapper.construct_many( + entityUrn=model_urn, aspects=aspects + ) + + def _search_endpoint(self, model: Model) -> List[Endpoint]: + """ + Search for an endpoint associated with the model. + """ + if self.endpoints is None: + endpoint_dict = defaultdict(list) + for endpoint in self.client.Endpoint.list(): + for resource in endpoint.list_models(): + endpoint_dict[resource.model].append(endpoint) + self.endpoints = endpoint_dict + + endpoints = self.endpoints[model.resource_name] + return endpoints + + def _make_ml_model_urn(self, model_version: VersionInfo, model_name: str) -> str: + urn = builder.make_ml_model_urn( + platform=self.platform, + model_name=f"{model_name}_{model_version.version_id}", + env=self.config.env, + ) + return urn + + def _make_job_urn(self, job: VertexAiResourceNoun) -> str: + job_id = self._make_vertexai_job_name(entity_id=job.name) + urn = builder.make_data_process_instance_urn(dataProcessInstanceId=job_id) + return urn + + def _make_vertexai_model_group_name( + self, + entity_id: str, + ) -> str: + separator: str = "." + return f"{self.config.project_id}{separator}model_group{separator}{entity_id}" + + def _make_vertexai_endpoint_name(self, entity_id: str) -> str: + separator: str = "." 
+ return f"{self.config.project_id}{separator}endpoint{separator}{entity_id}" + + def _make_vertexai_model_name(self, entity_id: str) -> str: + separator: str = "." + return f"{self.config.project_id}{separator}model{separator}{entity_id}" + + def _make_vertexai_dataset_name(self, entity_id: str) -> str: + separator: str = "." + return f"{self.config.project_id}{separator}dataset{separator}{entity_id}" + + def _make_vertexai_job_name( + self, + entity_id: Optional[str], + ) -> str: + separator: str = "." + return f"{self.config.project_id}{separator}job{separator}{entity_id}" + + def _make_job_external_url(self, job: VertexAiResourceNoun) -> str: + """ + Model external URL in Vertex AI + Sample URLs: + https://console.cloud.google.com/vertex-ai/training/training-pipelines?project=acryl-poc&trainingPipelineId=5401695018589093888 + """ + external_url: str = ( + f"{self.config.vertexai_url}/training/training-pipelines?trainingPipelineId={job.name}" + f"?project={self.config.project_id}" + ) + return external_url + + def _make_model_external_url(self, model: Model) -> str: + """ + Model external URL in Vertex AI + Sample URL: + https://console.cloud.google.com/vertex-ai/models/locations/us-west2/models/812468724182286336?project=acryl-poc + """ + external_url: str = ( + f"{self.config.vertexai_url}/models/locations/{self.config.region}/models/{model.name}" + f"?project={self.config.project_id}" + ) + return external_url + + def _make_model_version_external_url(self, model: Model) -> str: + """ + Model Version external URL in Vertex AI + Sample URL: + https://console.cloud.google.com/vertex-ai/models/locations/us-west2/models/812468724182286336/versions/1?project=acryl-poc + """ + external_url: str = ( + f"{self.config.vertexai_url}/models/locations/{self.config.region}/models/{model.name}" + f"/versions/{model.version_id}" + f"?project={self.config.project_id}" + ) + return external_url diff --git a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py index 9f53ae2382d40..1aab4e9419014 100644 --- a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py +++ b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py @@ -9,7 +9,7 @@ from datahub.configuration.common import ConfigurationWarning from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.run.pipeline import Pipeline -from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryCredential +from datahub.ingestion.source.common.gcp_credentials_config import GCPCredential from datahub.ingestion.source.fivetran.config import ( BigQueryDestinationConfig, FivetranSourceConfig, @@ -398,7 +398,7 @@ def test_fivetran_snowflake_destination_config(): @freeze_time(FROZEN_TIME) def test_fivetran_bigquery_destination_config(): bigquery_dest = BigQueryDestinationConfig( - credential=BigQueryCredential( + credential=GCPCredential( private_key_id="testprivatekey", project_id="test-project", client_email="fivetran-connector@test-project.iam.gserviceaccount.com", diff --git a/metadata-ingestion/tests/integration/vertexai/test_vertexai.py b/metadata-ingestion/tests/integration/vertexai/test_vertexai.py new file mode 100644 index 0000000000000..740f6d1dcbe37 --- /dev/null +++ b/metadata-ingestion/tests/integration/vertexai/test_vertexai.py @@ -0,0 +1,185 @@ +import contextlib +from pathlib import Path +from typing import Any, Dict, List, TypeVar +from unittest.mock import MagicMock, patch + +import pytest +from google.cloud.aiplatform 
import AutoMLTabularTrainingJob, CustomJob, Model +from google.cloud.aiplatform.base import VertexAiResourceNoun +from google.cloud.aiplatform.models import VersionInfo +from google.protobuf import timestamp_pb2 +from pytest import Config + +from datahub.ingestion.run.pipeline import Pipeline +from datahub.ingestion.source.vertexai import TrainingJobMetadata +from tests.test_helpers import mce_helpers + +T = TypeVar("T") + +PROJECT_ID = "test-project-id" +REGION = "us-west2" + + +@pytest.fixture +def sink_file_path(tmp_path: Path) -> str: + return str(tmp_path / "vertexai_source_mcps.json") + + +def get_pipeline_config(sink_file_path: str) -> Dict[str, Any]: + source_type = "vertexai" + return { + "run_id": "vertexai-source-test", + "source": { + "type": source_type, + "config": { + "project_id": PROJECT_ID, + "region": REGION, + }, + }, + "sink": { + "type": "file", + "config": { + "filename": sink_file_path, + }, + }, + } + + +def gen_mock_models() -> List[Model]: + mock_model_1 = MagicMock(spec=Model) + mock_model_1.name = "mock_prediction_model_1" + mock_model_1.create_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_model_1.update_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_model_1.version_id = "1" + mock_model_1.display_name = "mock_prediction_model_1_display_name" + mock_model_1.description = "mock_prediction_model_1_description" + mock_model_1.resource_name = "projects/123/locations/us-central1/models/456" + + mock_model_2 = MagicMock(spec=Model) + mock_model_2.name = "mock_prediction_model_2" + + mock_model_2.create_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_model_2.update_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_model_2.version_id = "1" + mock_model_2.display_name = "mock_prediction_model_2_display_name" + mock_model_2.description = "mock_prediction_model_1_description" + mock_model_2.resource_name = "projects/123/locations/us-central1/models/789" + + return [mock_model_1, mock_model_2] + + +def gen_mock_training_custom_job() -> CustomJob: + mock_training_job = MagicMock(spec=CustomJob) + mock_training_job.name = "mock_training_job" + mock_training_job.create_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_training_job.update_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_training_job.display_name = "mock_training_job_display_name" + mock_training_job.description = "mock_training_job_description" + + return mock_training_job + + +def gen_mock_training_automl_job() -> AutoMLTabularTrainingJob: + mock_automl_job = MagicMock(spec=AutoMLTabularTrainingJob) + mock_automl_job.name = "mock_auto_automl_tabular_job" + mock_automl_job.create_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_automl_job.update_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_automl_job.display_name = "mock_auto_automl_tabular_job_display_name" + mock_automl_job.description = "mock_auto_automl_tabular_job_display_name" + return mock_automl_job + + +def gen_mock_model_version(mock_model: Model) -> VersionInfo: + version = "1" + return VersionInfo( + version_id=version, + version_description="test", + version_create_time=timestamp_pb2.Timestamp().GetCurrentTime(), + version_update_time=timestamp_pb2.Timestamp().GetCurrentTime(), + model_display_name=mock_model.name, + model_resource_name=mock_model.resource_name, + ) + + +def gen_mock_dataset() -> VertexAiResourceNoun: + mock_dataset = MagicMock(spec=VertexAiResourceNoun) + mock_dataset.name = "mock_dataset" + mock_dataset.create_time = 
timestamp_pb2.Timestamp().GetCurrentTime() + mock_dataset.update_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_dataset.display_name = "mock_dataset_display_name" + mock_dataset.description = "mock_dataset_description" + mock_dataset.resource_name = "projects/123/locations/us-central1/datasets/456" + return mock_dataset + + +def test_vertexai_source_ingestion(pytestconfig: Config, sink_file_path: str) -> None: + mock_automl_job = gen_mock_training_automl_job() + mock_models = gen_mock_models() + mock_model_version = gen_mock_model_version(mock_models[0]) + mock_dataset = gen_mock_dataset() + + with contextlib.ExitStack() as exit_stack: + for func_to_mock in [ + "google.cloud.aiplatform.init", + "google.cloud.aiplatform.Model.list", + "google.cloud.aiplatform.datasets.TextDataset.list", + "google.cloud.aiplatform.datasets.TabularDataset.list", + "google.cloud.aiplatform.datasets.ImageDataset.list", + "google.cloud.aiplatform.datasets.TimeSeriesDataset.list", + "google.cloud.aiplatform.datasets.VideoDataset.list", + "google.cloud.aiplatform.CustomJob.list", + "google.cloud.aiplatform.CustomTrainingJob.list", + "google.cloud.aiplatform.CustomContainerTrainingJob.list", + "google.cloud.aiplatform.CustomPythonPackageTrainingJob.list", + "google.cloud.aiplatform.AutoMLTabularTrainingJob.list", + "google.cloud.aiplatform.AutoMLTextTrainingJob.list", + "google.cloud.aiplatform.AutoMLImageTrainingJob.list", + "google.cloud.aiplatform.AutoMLVideoTrainingJob.list", + "google.cloud.aiplatform.AutoMLForecastingTrainingJob.list", + "datahub.ingestion.source.vertexai.VertexAISource._get_training_job_metadata", + ]: + mock = exit_stack.enter_context(patch(func_to_mock)) + + if func_to_mock == "google.cloud.aiplatform.Model.list": + mock.return_value = gen_mock_models() + elif func_to_mock == "google.cloud.aiplatform.CustomJob.list": + mock.return_value = [ + gen_mock_training_custom_job(), + gen_mock_training_automl_job(), + ] + elif ( + func_to_mock == "google.cloud.aiplatform.AutoMLTabularTrainingJob.list" + ): + mock.return_value = [mock_automl_job] + elif ( + func_to_mock + == "datahub.ingestion.source.vertexai.VertexAISource._get_training_job_metadata" + ): + mock.return_value = TrainingJobMetadata( + job=mock_automl_job, + input_dataset=mock_dataset, + output_model=mock_models[0], + output_model_version=mock_model_version, + ) + + else: + mock.return_value = [] + + golden_file_path = ( + pytestconfig.rootpath + / "tests/integration/vertexai/vertexai_mcps_golden.json" + ) + + print(f"mcps file path: {str(sink_file_path)}") + print(f"golden file path: {str(golden_file_path)}") + + pipeline = Pipeline.create(get_pipeline_config(sink_file_path)) + pipeline.run() + pipeline.pretty_print_summary() + pipeline.raise_from_status() + + mce_helpers.check_golden_file( + pytestconfig=pytestconfig, + output_path=sink_file_path, + golden_path=golden_file_path, + ) diff --git a/metadata-ingestion/tests/integration/vertexai/vertexai_mcps_golden.json b/metadata-ingestion/tests/integration/vertexai/vertexai_mcps_golden.json new file mode 100644 index 0000000000000..1c6d57b423971 --- /dev/null +++ b/metadata-ingestion/tests/integration/vertexai/vertexai_mcps_golden.json @@ -0,0 +1,790 @@ +[ +{ + "entityType": "container", + "entityUrn": "urn:li:container:29746a9030349f4340ed74b46913dab6", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "vertexai", + "project_id": "test-project-id" + }, + "name": "test-project-id" + } + }, + 
"systemMetadata": { + "lastObserved": 1741205717242, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:29746a9030349f4340ed74b46913dab6", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1741205717242, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:29746a9030349f4340ed74b46913dab6", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:vertexai" + } + }, + "systemMetadata": { + "lastObserved": 1741205717243, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:29746a9030349f4340ed74b46913dab6", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Project" + ] + } + }, + "systemMetadata": { + "lastObserved": 1741205717243, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:29746a9030349f4340ed74b46913dab6", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1741205717243, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "mlModelGroup", + "entityUrn": "urn:li:mlModelGroup:(urn:li:dataPlatform:vertexai,test-project-id.model_group.mock_prediction_model_1,PROD)", + "changeType": "UPSERT", + "aspectName": "mlModelGroupProperties", + "aspect": { + "json": { + "customProperties": { + "displayName": "mock_prediction_model_1_display_name" + }, + "name": "test-project-id.model_group.mock_prediction_model_1", + "description": "mock_prediction_model_1_description" + } + }, + "systemMetadata": { + "lastObserved": 1741205717244, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "mlModelGroup", + "entityUrn": "urn:li:mlModelGroup:(urn:li:dataPlatform:vertexai,test-project-id.model_group.mock_prediction_model_2,PROD)", + "changeType": "UPSERT", + "aspectName": "mlModelGroupProperties", + "aspect": { + "json": { + "customProperties": { + "displayName": "mock_prediction_model_2_display_name" + }, + "name": "test-project-id.model_group.mock_prediction_model_2", + "description": "mock_prediction_model_1_description" + } + }, + "systemMetadata": { + "lastObserved": 1741205717245, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:test-project-id.job.mock_auto_automl_tabular_job", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": { + "displayName": "mock_auto_automl_tabular_job_display_name", + "jobType": "AutoMLTabularTrainingJob" + }, + "externalUrl": "https://console.cloud.google.com/vertex-ai/training/training-pipelines?trainingPipelineId=mock_auto_automl_tabular_job?project=test-project-id", + "name": "test-project-id.job.mock_auto_automl_tabular_job", + "created": { + "time": 1741205717245, + "actor": "urn:li:platformResource:vertexai" + } + } + }, + "systemMetadata": { + "lastObserved": 1741205717246, + "runId": "vertexai-source-test", + "lastRunId": 
"no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:test-project-id.job.mock_auto_automl_tabular_job", + "changeType": "UPSERT", + "aspectName": "mlTrainingRunProperties", + "aspect": { + "json": { + "customProperties": {}, + "externalUrl": "https://console.cloud.google.com/vertex-ai/training/training-pipelines?trainingPipelineId=mock_auto_automl_tabular_job?project=test-project-id", + "id": "mock_auto_automl_tabular_job" + } + }, + "systemMetadata": { + "lastObserved": 1741205717246, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:test-project-id.job.mock_auto_automl_tabular_job", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Training Job" + ] + } + }, + "systemMetadata": { + "lastObserved": 1741205717246, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:test-project-id.job.mock_auto_automl_tabular_job", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:29746a9030349f4340ed74b46913dab6" + } + }, + "systemMetadata": { + "lastObserved": 1741205717247, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:test-project-id.job.mock_auto_automl_tabular_job", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceInput", + "aspect": { + "json": { + "inputs": [ + "urn:li:dataset:(urn:li:dataPlatform:vertexai,test-project-id.dataset.mock_dataset,PROD)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1741205717247, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:vertexai,test-project-id.dataset.mock_dataset,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": { + "displayName": "mock_dataset_display_name", + "resourceName": "projects/123/locations/us-central1/datasets/456" + }, + "name": "test-project-id.dataset.mock_dataset", + "qualifiedName": "projects/123/locations/us-central1/datasets/456", + "description": "Dataset: mock_dataset_display_name", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1741205717247, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:vertexai,test-project-id.dataset.mock_dataset,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Dataset" + ] + } + }, + "systemMetadata": { + "lastObserved": 1741205717248, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:vertexai,test-project-id.dataset.mock_dataset,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:29746a9030349f4340ed74b46913dab6" + } + }, + "systemMetadata": { + "lastObserved": 1741205717248, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": 
"urn:li:dataset:(urn:li:dataPlatform:vertexai,test-project-id.dataset.mock_dataset,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:29746a9030349f4340ed74b46913dab6", + "urn": "urn:li:container:29746a9030349f4340ed74b46913dab6" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1741205717248, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "mlModel", + "entityUrn": "urn:li:mlModel:(urn:li:dataPlatform:vertexai,test-project-id.model.mock_prediction_model_1_1,PROD)", + "changeType": "UPSERT", + "aspectName": "mlModelProperties", + "aspect": { + "json": { + "customProperties": { + "displayName": "mock_prediction_model_1", + "versionId": "1", + "resourceName": "projects/123/locations/us-central1/models/456" + }, + "externalUrl": "https://console.cloud.google.com/vertex-ai/models/locations/us-west2/models/mock_prediction_model_1/versions/1?project=test-project-id", + "trainingJobs": [ + "urn:li:dataProcessInstance:test-project-id.job.mock_auto_automl_tabular_job" + ], + "name": "test-project-id.model.mock_prediction_model_1_1", + "description": "test", + "version": { + "versionTag": "1" + }, + "type": "ML Model", + "tags": [], + "deployments": [], + "groups": [ + "urn:li:mlModelGroup:(urn:li:dataPlatform:vertexai,test-project-id.model_group.mock_prediction_model_1,PROD)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1741205717249, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:test-project-id.job.mock_auto_automl_tabular_job", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": { + "displayName": "mock_auto_automl_tabular_job_display_name", + "jobType": "AutoMLTabularTrainingJob" + }, + "externalUrl": "https://console.cloud.google.com/vertex-ai/training/training-pipelines?trainingPipelineId=mock_auto_automl_tabular_job?project=test-project-id", + "name": "test-project-id.job.mock_auto_automl_tabular_job", + "created": { + "time": 1741205717249, + "actor": "urn:li:platformResource:vertexai" + } + } + }, + "systemMetadata": { + "lastObserved": 1741205717249, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:test-project-id.job.mock_auto_automl_tabular_job", + "changeType": "UPSERT", + "aspectName": "mlTrainingRunProperties", + "aspect": { + "json": { + "customProperties": {}, + "externalUrl": "https://console.cloud.google.com/vertex-ai/training/training-pipelines?trainingPipelineId=mock_auto_automl_tabular_job?project=test-project-id", + "id": "mock_auto_automl_tabular_job" + } + }, + "systemMetadata": { + "lastObserved": 1741205717250, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:test-project-id.job.mock_auto_automl_tabular_job", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Training Job" + ] + } + }, + "systemMetadata": { + "lastObserved": 1741205717250, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:test-project-id.job.mock_auto_automl_tabular_job", + "changeType": 
"UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:29746a9030349f4340ed74b46913dab6" + } + }, + "systemMetadata": { + "lastObserved": 1741205717250, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:test-project-id.job.mock_auto_automl_tabular_job", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceInput", + "aspect": { + "json": { + "inputs": [ + "urn:li:dataset:(urn:li:dataPlatform:vertexai,test-project-id.dataset.mock_dataset,PROD)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1741205717251, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:vertexai,test-project-id.dataset.mock_dataset,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": { + "displayName": "mock_dataset_display_name", + "resourceName": "projects/123/locations/us-central1/datasets/456" + }, + "name": "test-project-id.dataset.mock_dataset", + "qualifiedName": "projects/123/locations/us-central1/datasets/456", + "description": "Dataset: mock_dataset_display_name", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1741205717251, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:vertexai,test-project-id.dataset.mock_dataset,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Dataset" + ] + } + }, + "systemMetadata": { + "lastObserved": 1741205717251, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:vertexai,test-project-id.dataset.mock_dataset,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:29746a9030349f4340ed74b46913dab6" + } + }, + "systemMetadata": { + "lastObserved": 1741205717252, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "mlModel", + "entityUrn": "urn:li:mlModel:(urn:li:dataPlatform:vertexai,test-project-id.model.mock_prediction_model_1_1,PROD)", + "changeType": "UPSERT", + "aspectName": "mlModelProperties", + "aspect": { + "json": { + "customProperties": { + "displayName": "mock_prediction_model_1", + "versionId": "1", + "resourceName": "projects/123/locations/us-central1/models/456" + }, + "externalUrl": "https://console.cloud.google.com/vertex-ai/models/locations/us-west2/models/mock_prediction_model_1/versions/1?project=test-project-id", + "trainingJobs": [ + "urn:li:dataProcessInstance:test-project-id.job.mock_auto_automl_tabular_job" + ], + "name": "test-project-id.model.mock_prediction_model_1_1", + "description": "test", + "version": { + "versionTag": "1" + }, + "type": "ML Model", + "tags": [], + "deployments": [], + "groups": [ + "urn:li:mlModelGroup:(urn:li:dataPlatform:vertexai,test-project-id.model_group.mock_prediction_model_1,PROD)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1741205717252, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:test-project-id.job.mock_auto_automl_tabular_job", + "changeType": "UPSERT", + 
"aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": { + "displayName": "mock_auto_automl_tabular_job_display_name", + "jobType": "AutoMLTabularTrainingJob" + }, + "externalUrl": "https://console.cloud.google.com/vertex-ai/training/training-pipelines?trainingPipelineId=mock_auto_automl_tabular_job?project=test-project-id", + "name": "test-project-id.job.mock_auto_automl_tabular_job", + "created": { + "time": 1741205717252, + "actor": "urn:li:platformResource:vertexai" + } + } + }, + "systemMetadata": { + "lastObserved": 1741205717253, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:test-project-id.job.mock_auto_automl_tabular_job", + "changeType": "UPSERT", + "aspectName": "mlTrainingRunProperties", + "aspect": { + "json": { + "customProperties": {}, + "externalUrl": "https://console.cloud.google.com/vertex-ai/training/training-pipelines?trainingPipelineId=mock_auto_automl_tabular_job?project=test-project-id", + "id": "mock_auto_automl_tabular_job" + } + }, + "systemMetadata": { + "lastObserved": 1741205717253, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:test-project-id.job.mock_auto_automl_tabular_job", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Training Job" + ] + } + }, + "systemMetadata": { + "lastObserved": 1741205717253, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:test-project-id.job.mock_auto_automl_tabular_job", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:29746a9030349f4340ed74b46913dab6" + } + }, + "systemMetadata": { + "lastObserved": 1741205717254, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:test-project-id.job.mock_auto_automl_tabular_job", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceInput", + "aspect": { + "json": { + "inputs": [ + "urn:li:dataset:(urn:li:dataPlatform:vertexai,test-project-id.dataset.mock_dataset,PROD)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1741205717254, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:vertexai,test-project-id.dataset.mock_dataset,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": { + "displayName": "mock_dataset_display_name", + "resourceName": "projects/123/locations/us-central1/datasets/456" + }, + "name": "test-project-id.dataset.mock_dataset", + "qualifiedName": "projects/123/locations/us-central1/datasets/456", + "description": "Dataset: mock_dataset_display_name", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1741205717254, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:vertexai,test-project-id.dataset.mock_dataset,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Dataset" + ] + } + }, + "systemMetadata": { + 
"lastObserved": 1741205717255, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:vertexai,test-project-id.dataset.mock_dataset,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:29746a9030349f4340ed74b46913dab6" + } + }, + "systemMetadata": { + "lastObserved": 1741205717255, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "mlModel", + "entityUrn": "urn:li:mlModel:(urn:li:dataPlatform:vertexai,test-project-id.model.mock_prediction_model_1_1,PROD)", + "changeType": "UPSERT", + "aspectName": "mlModelProperties", + "aspect": { + "json": { + "customProperties": { + "displayName": "mock_prediction_model_1", + "versionId": "1", + "resourceName": "projects/123/locations/us-central1/models/456" + }, + "externalUrl": "https://console.cloud.google.com/vertex-ai/models/locations/us-west2/models/mock_prediction_model_1/versions/1?project=test-project-id", + "trainingJobs": [ + "urn:li:dataProcessInstance:test-project-id.job.mock_auto_automl_tabular_job" + ], + "name": "test-project-id.model.mock_prediction_model_1_1", + "description": "test", + "version": { + "versionTag": "1" + }, + "type": "ML Model", + "tags": [], + "deployments": [], + "groups": [ + "urn:li:mlModelGroup:(urn:li:dataPlatform:vertexai,test-project-id.model_group.mock_prediction_model_1,PROD)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1741205717255, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:test-project-id.job.mock_auto_automl_tabular_job", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1741205717256, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:vertexai,test-project-id.dataset.mock_dataset,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1741205717256, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "mlModel", + "entityUrn": "urn:li:mlModel:(urn:li:dataPlatform:vertexai,test-project-id.model.mock_prediction_model_1_1,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1741205717256, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "mlModelGroup", + "entityUrn": "urn:li:mlModelGroup:(urn:li:dataPlatform:vertexai,test-project-id.model_group.mock_prediction_model_1,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1741205717257, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "mlModelGroup", + "entityUrn": "urn:li:mlModelGroup:(urn:li:dataPlatform:vertexai,test-project-id.model_group.mock_prediction_model_2,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1741205717257, + "runId": "vertexai-source-test", + 
"lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/test_vertexai_source.py b/metadata-ingestion/tests/unit/test_vertexai_source.py new file mode 100644 index 0000000000000..c3dd3bd696480 --- /dev/null +++ b/metadata-ingestion/tests/unit/test_vertexai_source.py @@ -0,0 +1,446 @@ +import contextlib +import json +from datetime import datetime +from typing import List +from unittest.mock import MagicMock, patch + +import pytest +from google.cloud.aiplatform import AutoMLTabularTrainingJob +from google.cloud.aiplatform.base import VertexAiResourceNoun +from google.cloud.aiplatform.models import Endpoint, Model, VersionInfo +from google.protobuf import timestamp_pb2 + +import datahub.emitter.mce_builder as builder +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.source.vertexai import ( + MLTypes, + ModelMetadata, + TrainingJobMetadata, + VertexAIConfig, + VertexAISource, +) +from datahub.metadata.com.linkedin.pegasus2avro.ml.metadata import ( + MLModelGroupProperties, + MLModelProperties, +) +from datahub.metadata.schema_classes import ( + ContainerClass, + DataProcessInstanceInputClass, + DataProcessInstancePropertiesClass, + MLModelDeploymentPropertiesClass, + MLTrainingRunPropertiesClass, + SubTypesClass, +) + +PROJECT_ID = "acryl-poc" +REGION = "us-west2" + + +def gen_mock_model() -> Model: + mock_model_1 = MagicMock(spec=Model) + mock_model_1.name = "mock_prediction_model_1" + mock_model_1.create_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_model_1.update_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_model_1.version_id = "1" + mock_model_1.display_name = "mock_prediction_model_1_display_name" + mock_model_1.resource_name = ( + "projects/872197881936/locations/us-west2/models/3583871344875405312" + ) + return mock_model_1 + + +def gen_mock_models() -> List[Model]: + mock_model_1 = MagicMock(spec=Model) + mock_model_1.name = "mock_prediction_model_1" + mock_model_1.create_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_model_1.update_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_model_1.version_id = "1" + mock_model_1.display_name = "mock_prediction_model_1_display_name" + mock_model_1.description = "mock_prediction_model_1_description" + + mock_model_2 = MagicMock(spec=Model) + mock_model_2.name = "mock_prediction_model_2" + + mock_model_2.create_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_model_2.update_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_model_2.version_id = "1" + mock_model_2.display_name = "mock_prediction_model_2_display_name" + mock_model_2.description = "mock_prediction_model_1_description" + + return [mock_model_1, mock_model_2] + + +def gen_mock_training_job() -> VertexAiResourceNoun: + mock_training_job = MagicMock(spec=VertexAiResourceNoun) + mock_training_job.name = "mock_training_job" + mock_training_job.create_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_training_job.update_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_training_job.display_name = "mock_training_job_display_name" + mock_training_job.description = "mock_training_job_description" + return mock_training_job + + +def gen_mock_dataset() -> VertexAiResourceNoun: + mock_dataset = MagicMock(spec=VertexAiResourceNoun) + mock_dataset.name = "mock_dataset" + mock_dataset.create_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_dataset.update_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_dataset.display_name = 
"mock_dataset_display_name" + mock_dataset.description = "mock_dataset_description" + return mock_dataset + + +def gen_mock_training_automl_job() -> AutoMLTabularTrainingJob: + mock_automl_job = MagicMock(spec=AutoMLTabularTrainingJob) + mock_automl_job.name = "mock_auto_automl_tabular_job" + mock_automl_job.create_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_automl_job.update_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_automl_job.display_name = "mock_auto_automl_tabular_job_display_name" + mock_automl_job.description = "mock_auto_automl_tabular_job_display_name" + return mock_automl_job + + +def gen_mock_endpoint() -> Endpoint: + mock_endpoint = MagicMock(spec=Endpoint) + mock_endpoint.description = "test endpoint" + mock_endpoint.create_time = datetime.now() + mock_endpoint.display_name = "test endpoint display name" + return mock_endpoint + + +def gen_mock_model_version(mock_model: Model) -> VersionInfo: + version = "1" + return VersionInfo( + version_id=version, + version_description="test", + version_create_time=timestamp_pb2.Timestamp().GetCurrentTime(), + version_update_time=timestamp_pb2.Timestamp().GetCurrentTime(), + model_display_name=mock_model.name, + model_resource_name=mock_model.resource_name, + ) + + +@pytest.fixture +def source() -> VertexAISource: + return VertexAISource( + ctx=PipelineContext(run_id="vertexai-source-test"), + config=VertexAIConfig(project_id=PROJECT_ID, region=REGION), + ) + + +@patch("google.cloud.aiplatform.Model.list") +def test_get_ml_model_mcps(mock_list: List[Model], source: VertexAISource) -> None: + mock_models = gen_mock_models() + assert hasattr(mock_list, "return_value") # this check needed to go ground lint + mock_list.return_value = mock_models + + # Running _get_ml_models_mcps + actual_mcps = [mcp for mcp in source._get_ml_models_mcps()] + + actual_urns = [mcp.entityUrn for mcp in actual_mcps] + expected_urns = [] + for mock_model in mock_models: + expected_urns.append( + builder.make_ml_model_group_urn( + platform=source.platform, + group_name=source._make_vertexai_model_group_name(mock_model.name), + env=source.config.env, + ) + ) + + # expect 2 model groups + assert actual_urns == expected_urns + + for mcp in actual_mcps: + assert hasattr(mcp, "aspect") + aspect = mcp.aspect + if isinstance(aspect, MLModelGroupProperties): + assert ( + aspect.name + == f"{source._make_vertexai_model_group_name(mock_models[0].name)}" + or aspect.name + == f"{source._make_vertexai_model_group_name(mock_models[1].name)}" + ) + assert ( + aspect.description == mock_models[0].description + or aspect.description == mock_models[1].description + ) + + +def test_get_ml_model_properties_mcps( + source: VertexAISource, +) -> None: + mock_model = gen_mock_model() + model_version = gen_mock_model_version(mock_model) + model_meta = ModelMetadata(mock_model, model_version) + + # Run _gen_ml_model_mcps + mcp = [mcp for mcp in source._gen_ml_model_mcps(model_meta)] + assert len(mcp) == 1 + assert hasattr(mcp[0], "aspect") + aspect = mcp[0].aspect + assert isinstance(aspect, MLModelProperties) + assert ( + aspect.name + == f"{source._make_vertexai_model_name(mock_model.name)}_{mock_model.version_id}" + ) + assert aspect.description == model_version.version_description + assert aspect.date == model_version.version_create_time + assert aspect.hyperParams is None + + +def test_get_endpoint_mcps( + source: VertexAISource, +) -> None: + mock_model = gen_mock_model() + model_version = gen_mock_model_version(mock_model) + mock_endpoint = 
gen_mock_endpoint() + model_meta = ModelMetadata( + model=mock_model, model_version=model_version, endpoints=[mock_endpoint] + ) + + # Run _gen_endpoint_mcps + actual_mcps = [mcp for mcp in source._gen_endpoint_mcps(model_meta)] + actual_urns = [mcp.entityUrn for mcp in actual_mcps] + endpoint_urn = builder.make_ml_model_deployment_urn( + platform=source.platform, + deployment_name=source._make_vertexai_endpoint_name( + entity_id=mock_endpoint.name + ), + env=source.config.env, + ) + + expected_urns = [endpoint_urn] * 1 + # expect 1 endpoint urn + assert actual_urns == expected_urns + + for mcp in source._gen_endpoint_mcps(model_meta): + assert hasattr(mcp, "aspect") + aspect = mcp.aspect + if isinstance(aspect, MLModelDeploymentPropertiesClass): + assert aspect.description == mock_model.description + assert aspect.customProperties == { + "displayName": mock_endpoint.display_name + } + assert aspect.createdAt == int(mock_endpoint.create_time.timestamp() * 1000) + # TODO: Add following when container/subtype supported + # elif isinstance(aspect, ContainerClass): + # assert aspect.container == source._get_project_container().as_urn() + # elif isinstance(aspect, SubTypesClass): + # assert aspect.typeNames == ["Endpoint"] + + +def test_get_training_jobs_mcps( + source: VertexAISource, +) -> None: + mock_training_job = gen_mock_training_job() + mock_training_automl_job = gen_mock_training_automl_job() + with contextlib.ExitStack() as exit_stack: + for func_to_mock in [ + "google.cloud.aiplatform.init", + "google.cloud.aiplatform.CustomJob.list", + "google.cloud.aiplatform.CustomTrainingJob.list", + "google.cloud.aiplatform.CustomContainerTrainingJob.list", + "google.cloud.aiplatform.CustomPythonPackageTrainingJob.list", + "google.cloud.aiplatform.AutoMLTabularTrainingJob.list", + "google.cloud.aiplatform.AutoMLImageTrainingJob.list", + "google.cloud.aiplatform.AutoMLTextTrainingJob.list", + "google.cloud.aiplatform.AutoMLVideoTrainingJob.list", + "google.cloud.aiplatform.AutoMLForecastingTrainingJob.list", + ]: + mock = exit_stack.enter_context(patch(func_to_mock)) + if func_to_mock == "google.cloud.aiplatform.CustomJob.list": + mock.return_value = [mock_training_job] + else: + mock.return_value = [] + + """ + Test the retrieval of training jobs work units from Vertex AI. 
+        This function mocks CustomJob and AutoMLTabularTrainingJob,
+        and verifies the properties of the work units.
+        """
+
+        # Run _get_training_jobs_mcps
+        actual_mcps = [mcp for mcp in source._get_training_jobs_mcps()]
+        actual_urns = [mcp.entityUrn for mcp in actual_mcps]
+        expected_urns = [
+            builder.make_data_process_instance_urn(
+                source._make_vertexai_job_name(mock_training_job.name)
+            )
+        ] * 4  # expect 4 aspects
+
+        assert actual_urns == expected_urns
+
+        for mcp in actual_mcps:
+            assert hasattr(mcp, "aspect")
+            aspect = mcp.aspect
+            if isinstance(aspect, DataProcessInstancePropertiesClass):
+                # Name and display name must come from one of the mocked jobs.
+                assert aspect.name in (
+                    f"{source.config.project_id}.job.{mock_training_job.name}",
+                    f"{source.config.project_id}.job.{mock_training_automl_job.name}",
+                )
+                assert aspect.customProperties["displayName"] in (
+                    mock_training_job.display_name,
+                    mock_training_automl_job.display_name,
+                )
+            if isinstance(aspect, MLTrainingRunPropertiesClass):
+                assert aspect.id == mock_training_job.name
+                assert aspect.externalUrl == source._make_job_external_url(
+                    mock_training_job
+                )
+            if isinstance(aspect, SubTypesClass):
+                assert aspect.typeNames == [MLTypes.TRAINING_JOB]
+
+            if isinstance(aspect, ContainerClass):
+                assert aspect.container == source._get_project_container().as_urn()
+
+
+def test_gen_training_job_mcps(source: VertexAISource) -> None:
+    mock_training_job = gen_mock_training_job()
+    mock_dataset = gen_mock_dataset()
+    job_meta = TrainingJobMetadata(mock_training_job, input_dataset=mock_dataset)
+
+    actual_mcps = [mcp for mcp in source._gen_training_job_mcps(job_meta)]
+    actual_urns = [mcp.entityUrn for mcp in actual_mcps]
+    expected_urns = [
+        builder.make_data_process_instance_urn(
+            source._make_vertexai_job_name(mock_training_job.name)
+        )
+    ] * 5  # expect 5 aspects under the same urn for the job
+
+    assert actual_urns == expected_urns
+
+    dataset_name = source._make_vertexai_dataset_name(entity_id=mock_dataset.name)
+    dataset_urn = builder.make_dataset_urn(
+        platform=source.platform,
+        name=dataset_name,
+        env=source.config.env,
+    )
+
+    for mcp in actual_mcps:
+        assert hasattr(mcp, "aspect")
+        aspect = mcp.aspect
+        if isinstance(aspect, DataProcessInstancePropertiesClass):
+            assert (
+                aspect.name
+                == f"{source.config.project_id}.job.{mock_training_job.name}"
+            )
+            assert (
+                aspect.customProperties["displayName"] == mock_training_job.display_name
+            )
+        if isinstance(aspect, MLTrainingRunPropertiesClass):
+            assert aspect.id == mock_training_job.name
+            assert aspect.externalUrl == source._make_job_external_url(
+                mock_training_job
+            )
+
+        if isinstance(aspect, SubTypesClass):
+            assert aspect.typeNames == [MLTypes.TRAINING_JOB]
+
+        if isinstance(aspect, ContainerClass):
+            assert aspect.container == source._get_project_container().as_urn()
+
+        if isinstance(aspect, DataProcessInstanceInputClass):
+            assert aspect.inputs == [dataset_urn]
+
+
+def test_vertexai_config_init() -> None:
+    config_data = {
+        "project_id": "test-project",
+        "region": "us-central1",
+        "bucket_uri": "gs://test-bucket",
+        "vertexai_url": "https://console.cloud.google.com/vertex-ai",
+        "credential": {
+            "private_key_id": "test-key-id",
+            "private_key": "-----BEGIN PRIVATE KEY-----\ntest-private-key\n-----END PRIVATE KEY-----\n",
+            "client_email": "test-email@test-project.iam.gserviceaccount.com",
+            "client_id": "test-client-id",
+            "auth_uri": "https://accounts.google.com/o/oauth2/auth",
+            "token_uri": "https://oauth2.googleapis.com/token",
+            "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"https://www.googleapis.com/oauth2/v1/certs", + "type": "service_account", + }, + } + + config = VertexAIConfig(**config_data) + + assert config.project_id == "test-project" + assert config.region == "us-central1" + assert config.bucket_uri == "gs://test-bucket" + assert config.vertexai_url == "https://console.cloud.google.com/vertex-ai" + assert config.credential is not None + assert config.credential.private_key_id == "test-key-id" + assert ( + config.credential.private_key + == "-----BEGIN PRIVATE KEY-----\ntest-private-key\n-----END PRIVATE KEY-----\n" + ) + assert ( + config.credential.client_email + == "test-email@test-project.iam.gserviceaccount.com" + ) + assert config.credential.client_id == "test-client-id" + assert config.credential.auth_uri == "https://accounts.google.com/o/oauth2/auth" + assert config.credential.token_uri == "https://oauth2.googleapis.com/token" + assert ( + config.credential.auth_provider_x509_cert_url + == "https://www.googleapis.com/oauth2/v1/certs" + ) + + assert config._credentials_path is not None + with open(config._credentials_path, "r") as file: + content = json.loads(file.read()) + assert content["project_id"] == "test-project" + assert content["private_key_id"] == "test-key-id" + assert content["private_key_id"] == "test-key-id" + assert ( + content["private_key"] + == "-----BEGIN PRIVATE KEY-----\ntest-private-key\n-----END PRIVATE KEY-----\n" + ) + assert ( + content["client_email"] == "test-email@test-project.iam.gserviceaccount.com" + ) + assert content["client_id"] == "test-client-id" + assert content["auth_uri"] == "https://accounts.google.com/o/oauth2/auth" + assert content["token_uri"] == "https://oauth2.googleapis.com/token" + assert ( + content["auth_provider_x509_cert_url"] + == "https://www.googleapis.com/oauth2/v1/certs" + ) + + +def test_get_input_dataset_mcps(source: VertexAISource) -> None: + mock_dataset = gen_mock_dataset() + mock_job = gen_mock_training_job() + job_meta = TrainingJobMetadata(mock_job, input_dataset=mock_dataset) + + # Run _get_input_dataset_mcps + for mcp in source._get_input_dataset_mcps(job_meta): + assert hasattr(mcp, "aspect") + aspect = mcp.aspect + if isinstance(aspect, DataProcessInstancePropertiesClass): + assert aspect.name == f"{source._make_vertexai_job_name(mock_dataset.name)}" + assert aspect.customProperties["displayName"] == mock_dataset.display_name + elif isinstance(aspect, ContainerClass): + assert aspect.container == source._get_project_container().as_urn() + elif isinstance(aspect, SubTypesClass): + assert aspect.typeNames == ["Dataset"] + + +def test_make_model_external_url(source: VertexAISource) -> None: + mock_model = gen_mock_model() + assert ( + source._make_model_external_url(mock_model) + == f"{source.config.vertexai_url}/models/locations/{source.config.region}/models/{mock_model.name}" + f"?project={source.config.project_id}" + ) + + +def test_make_job_urn(source: VertexAISource) -> None: + mock_training_job = gen_mock_training_job() + assert ( + source._make_job_urn(mock_training_job) + == f"{builder.make_data_process_instance_urn(source._make_vertexai_job_name(mock_training_job.name))}" + ) diff --git a/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml b/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml index 4c4a7c2183073..a4a4e426258d4 100644 --- a/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml +++ 
@@ -747,4 +747,14 @@
   displayName: Neo4j
   type: OTHERS
   logoUrl: "/assets/platforms/neo4j.png"
+- entityUrn: urn:li:dataPlatform:vertexai
+  entityType: dataPlatform
+  aspectName: dataPlatformInfo
+  changeType: UPSERT
+  aspect:
+    datasetNameDelimiter: "."
+    name: vertexai
+    displayName: VertexAI
+    type: OTHERS
+    logoUrl: "/assets/platforms/vertexai.png"
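
For reviewers who want to exercise the new unit tests locally, a minimal sketch follows. It assumes a working `metadata-ingestion` development environment; the `vertexai` extra matches the one declared in `setup.py` in this change, but the exact install steps for your local setup (virtualenv, additional dev extras) may differ.

```sh
# From the repository root: install the ingestion package with the
# new vertexai extra, then run only the new Vertex AI unit tests.
cd metadata-ingestion
pip install -e '.[vertexai]'
pytest tests/unit/test_vertexai_source.py -v
```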