diff --git a/metadata-ingestion/examples/recipes/mlflow_to_datahub.dhub.yaml b/metadata-ingestion/examples/recipes/mlflow_to_datahub.dhub.yaml new file mode 100644 index 00000000000000..07e9ed5d786cd9 --- /dev/null +++ b/metadata-ingestion/examples/recipes/mlflow_to_datahub.dhub.yaml @@ -0,0 +1,9 @@ +source: + type: mlflow + config: + tracking_uri: "http://127.0.0.1:5000" + +sink: + type: datahub-rest + config: + server: "http://localhost:8080" \ No newline at end of file diff --git a/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py b/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py index 2d9bcfca91f2c2..bb6674fdc8e6d1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py +++ b/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py @@ -92,3 +92,8 @@ class BIAssetSubTypes(StrEnum): # SAP Analytics Cloud SAC_STORY = "Story" SAC_APPLICATION = "Application" + + +class MLAssetSubTypes(StrEnum): + MLFLOW_TRAINING_RUN = "ML Training Run" + MLFLOW_EXPERIMENT = "ML Experiment" diff --git a/metadata-ingestion/src/datahub/ingestion/source/mlflow.py b/metadata-ingestion/src/datahub/ingestion/source/mlflow.py index c614e6fe4c0933..c9e1e4b9796515 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mlflow.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mlflow.py @@ -1,17 +1,20 @@ +import time from dataclasses import dataclass from typing import Any, Callable, Iterable, List, Optional, TypeVar, Union from mlflow import MlflowClient -from mlflow.entities import Run +from mlflow.entities import Experiment, Run from mlflow.entities.model_registry import ModelVersion, RegisteredModel from mlflow.store.entities import PagedList from pydantic.fields import Field import datahub.emitter.mce_builder as builder -from datahub.configuration.source_common import ( - EnvConfigMixin, +from datahub.api.entities.dataprocess.dataprocess_instance import ( + DataProcessInstance, ) +from 
datahub.configuration.source_common import EnvConfigMixin from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.emitter.mcp_builder import ContainerKey from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.decorators import ( SupportStatus, @@ -26,6 +29,7 @@ SourceReport, ) from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.common.subtypes import MLAssetSubTypes from datahub.ingestion.source.state.stale_entity_removal_handler import ( StaleEntityRemovalHandler, StaleEntityRemovalSourceReport, @@ -35,20 +39,44 @@ StatefulIngestionSourceBase, ) from datahub.metadata.schema_classes import ( + AuditStampClass, + ContainerClass, + DataPlatformInstanceClass, + DataProcessInstanceOutputClass, + DataProcessInstancePropertiesClass, + DataProcessInstanceRunEventClass, + DataProcessInstanceRunResultClass, + DataProcessRunStatusClass, GlobalTagsClass, + MetadataAttributionClass, MLHyperParamClass, MLMetricClass, MLModelGroupPropertiesClass, MLModelPropertiesClass, + MLTrainingRunPropertiesClass, + PlatformResourceInfoClass, + SubTypesClass, TagAssociationClass, TagPropertiesClass, + TimeStampClass, + VersionPropertiesClass, VersionTagClass, _Aspect, ) +from datahub.metadata.urns import ( + DataPlatformUrn, + MlModelUrn, + VersionSetUrn, +) +from datahub.sdk.container import Container T = TypeVar("T") +class ContainerKeyWithId(ContainerKey): + id: str + + class MLflowConfig(StatefulIngestionConfigBase, EnvConfigMixin): tracking_uri: Optional[str] = Field( default=None, @@ -141,6 +169,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: yield from self._get_tags_workunits() + yield from self._get_experiment_workunits() yield from self._get_ml_model_workunits() def _get_tags_workunits(self) -> Iterable[MetadataWorkUnit]: @@ -174,22 +203,157 @@ def _create_workunit(self, urn: str, aspect: 
_Aspect) -> MetadataWorkUnit: aspect=aspect, ).as_workunit() - def _get_ml_model_workunits(self) -> Iterable[MetadataWorkUnit]: - """ - Traverse each Registered Model in Model Registry and generate a corresponding workunit. - """ - registered_models = self._get_mlflow_registered_models() - for registered_model in registered_models: - yield self._get_ml_group_workunit(registered_model) - model_versions = self._get_mlflow_model_versions(registered_model) - for model_version in model_versions: - run = self._get_mlflow_run(model_version) - yield self._get_ml_model_properties_workunit( - registered_model=registered_model, - model_version=model_version, - run=run, - ) - yield self._get_global_tags_workunit(model_version=model_version) + def _get_experiment_workunits(self) -> Iterable[MetadataWorkUnit]: + experiments = self._get_mlflow_experiments() + for experiment in experiments: + yield from self._get_experiment_container_workunit(experiment) + + runs = self._get_mlflow_runs_from_experiment(experiment) + if runs: + for run in runs: + yield from self._get_run_workunits(experiment, run) + + def _get_experiment_custom_properties(self, experiment): + experiment_custom_props = getattr(experiment, "tags", {}) or {} + experiment_custom_props.pop("mlflow.note.content", None) + experiment_custom_props["artifacts_location"] = experiment.artifact_location + return experiment_custom_props + + def _get_experiment_container_workunit( + self, experiment: Experiment + ) -> Iterable[MetadataWorkUnit]: + experiment_container = Container( + container_key=ContainerKeyWithId( + platform=str(DataPlatformUrn(platform_name=self.platform)), + id=experiment.name, + ), + subtype=MLAssetSubTypes.MLFLOW_EXPERIMENT, + display_name=experiment.name, + description=experiment.tags.get("mlflow.note.content"), + extra_properties=self._get_experiment_custom_properties(experiment), + ) + + yield from experiment_container.as_workunits() + + def _get_run_metrics(self, run: Run) -> List[MLMetricClass]: + 
return [ + MLMetricClass(name=k, value=str(v)) for k, v in run.data.metrics.items() + ] + + def _get_run_params(self, run: Run) -> List[MLHyperParamClass]: + return [ + MLHyperParamClass(name=k, value=str(v)) for k, v in run.data.params.items() + ] + + def _convert_run_result_type( + self, status: str + ) -> DataProcessInstanceRunResultClass: + if status == "FINISHED": + return DataProcessInstanceRunResultClass( + type="SUCCESS", nativeResultType=self.platform + ) + elif status == "FAILED": + return DataProcessInstanceRunResultClass( + type="FAILURE", nativeResultType=self.platform + ) + else: + return DataProcessInstanceRunResultClass( + type="SKIPPED", nativeResultType=self.platform + ) + + def _get_run_workunits( + self, experiment: Experiment, run: Run + ) -> Iterable[MetadataWorkUnit]: + experiment_key = ContainerKeyWithId( + platform=str(DataPlatformUrn(self.platform)), id=experiment.name + ) + + data_process_instance = DataProcessInstance( + id=run.info.run_id, + orchestrator=self.platform, + template_urn=None, + ) + + created_time = run.info.start_time or int(time.time() * 1000) + user_id = run.info.user_id if run.info.user_id else "mlflow" + guid_dict_user = {"platform": self.platform, "user": user_id} + platform_user_urn = ( + f"urn:li:platformResource:{builder.datahub_guid(guid_dict_user)}" + ) + + yield MetadataChangeProposalWrapper( + entityUrn=platform_user_urn, + aspect=PlatformResourceInfoClass( + resourceType="user", + primaryKey=user_id, + ), + ).as_workunit() + + yield MetadataChangeProposalWrapper( + entityUrn=str(data_process_instance.urn), + aspect=DataProcessInstancePropertiesClass( + name=run.info.run_name or run.info.run_id, + created=AuditStampClass( + time=created_time, + actor=platform_user_urn, + ), + externalUrl=self._make_external_url_from_run(experiment, run), + customProperties=getattr(run, "tags", {}) or {}, + ), + ).as_workunit() + + yield MetadataChangeProposalWrapper( + entityUrn=str(data_process_instance.urn), + 
aspect=ContainerClass(container=experiment_key.as_urn()), + ).as_workunit() + + model_versions = self.get_mlflow_model_versions_from_run(run.info.run_id) + if model_versions: + model_version_urn = self._make_ml_model_urn(model_versions[0]) + yield MetadataChangeProposalWrapper( + entityUrn=str(data_process_instance.urn), + aspect=DataProcessInstanceOutputClass(outputs=[model_version_urn]), + ).as_workunit() + + metrics = self._get_run_metrics(run) + hyperparams = self._get_run_params(run) + yield MetadataChangeProposalWrapper( + entityUrn=str(data_process_instance.urn), + aspect=MLTrainingRunPropertiesClass( + hyperParams=hyperparams, + trainingMetrics=metrics, + outputUrls=[run.info.artifact_uri], + id=run.info.run_id, + ), + ).as_workunit() + + if run.info.end_time: + duration_millis = run.info.end_time - run.info.start_time + + yield MetadataChangeProposalWrapper( + entityUrn=str(data_process_instance.urn), + aspect=DataProcessInstanceRunEventClass( + status=DataProcessRunStatusClass.COMPLETE, + timestampMillis=run.info.end_time, + result=DataProcessInstanceRunResultClass( + type=self._convert_run_result_type(run.info.status).type, + nativeResultType=self.platform, + ), + durationMillis=duration_millis, + ), + ).as_workunit() + + yield MetadataChangeProposalWrapper( + entityUrn=str(data_process_instance.urn), + aspect=DataPlatformInstanceClass( + platform=str(DataPlatformUrn(self.platform)) + ), + ).as_workunit() + + yield MetadataChangeProposalWrapper( + entityUrn=str(data_process_instance.urn), + aspect=SubTypesClass(typeNames=[MLAssetSubTypes.MLFLOW_TRAINING_RUN]), + ).as_workunit() def _get_mlflow_registered_models(self) -> Iterable[RegisteredModel]: """ @@ -202,6 +366,19 @@ def _get_mlflow_registered_models(self) -> Iterable[RegisteredModel]: ) return registered_models + def _get_mlflow_experiments(self) -> Iterable[Experiment]: + experiments: Iterable[Experiment] = self._traverse_mlflow_search_func( + search_func=self.client.search_experiments, + ) + 
return experiments + + def _get_mlflow_runs_from_experiment(self, experiment: Experiment) -> Iterable[Run]: + runs: Iterable[Run] = self._traverse_mlflow_search_func( + search_func=self.client.search_runs, + experiment_ids=[experiment.experiment_id], + ) + return runs + @staticmethod def _traverse_mlflow_search_func( search_func: Callable[..., PagedList[T]], @@ -218,6 +395,13 @@ def _traverse_mlflow_search_func( if not next_page_token: return + def _get_latest_version(self, registered_model: RegisteredModel) -> Optional[str]: + return ( + str(registered_model.latest_versions[0].version) + if registered_model.latest_versions + else None + ) + def _get_ml_group_workunit( self, registered_model: RegisteredModel, @@ -229,7 +413,20 @@ def _get_ml_group_workunit( ml_model_group_properties = MLModelGroupPropertiesClass( customProperties=registered_model.tags, description=registered_model.description, - createdAt=registered_model.creation_timestamp, + created=TimeStampClass( + time=registered_model.creation_timestamp, actor=None + ), + lastModified=TimeStampClass( + time=registered_model.last_updated_timestamp, + actor=None, + ), + version=VersionTagClass( + versionTag=self._get_latest_version(registered_model), + metadataAttribution=MetadataAttributionClass( + time=registered_model.last_updated_timestamp, + actor="urn:li:corpuser:datahub", + ), + ), ) wu = self._create_workunit( urn=ml_model_group_urn, @@ -259,6 +456,16 @@ def _get_mlflow_model_versions( ) return model_versions + def get_mlflow_model_versions_from_run(self, run_id): + filter_string = f"run_id = '{run_id}'" + + model_versions: Iterable[ModelVersion] = self._traverse_mlflow_search_func( + search_func=self.client.search_model_versions, + filter_string=filter_string, + ) + + return list(model_versions) + def _get_mlflow_run(self, model_version: ModelVersion) -> Union[None, Run]: """ Get a Run associated with a Model Version. Some MVs may exist without Run. 
@@ -269,6 +476,67 @@ def _get_mlflow_run(self, model_version: ModelVersion) -> Union[None, Run]: else: return None + def _get_ml_model_workunits(self) -> Iterable[MetadataWorkUnit]: + """ + Traverse each Registered Model in Model Registry and generate a corresponding workunit. + """ + registered_models = self._get_mlflow_registered_models() + for registered_model in registered_models: + version_set_urn = self._get_version_set_urn(registered_model) + yield self._get_ml_group_workunit(registered_model) + model_versions = self._get_mlflow_model_versions(registered_model) + for model_version in model_versions: + run = self._get_mlflow_run(model_version) + yield self._get_ml_model_properties_workunit( + registered_model=registered_model, + model_version=model_version, + run=run, + ) + yield self._get_ml_model_version_properties_workunit( + model_version=model_version, + version_set_urn=version_set_urn, + ) + yield self._get_global_tags_workunit(model_version=model_version) + + def _get_version_set_urn(self, registered_model: RegisteredModel) -> VersionSetUrn: + guid_dict = {"platform": self.platform, "name": registered_model.name} + version_set_urn = VersionSetUrn( + id=builder.datahub_guid(guid_dict), + entity_type=MlModelUrn.ENTITY_TYPE, + ) + + return version_set_urn + + def _get_ml_model_version_properties_workunit( + self, + model_version: ModelVersion, + version_set_urn: VersionSetUrn, + ) -> MetadataWorkUnit: + ml_model_urn = self._make_ml_model_urn(model_version) + + # get mlmodel name from ml model urn + ml_model_version_properties = VersionPropertiesClass( + version=VersionTagClass( + versionTag=str(model_version.version), + metadataAttribution=MetadataAttributionClass( + time=model_version.creation_timestamp, + actor="urn:li:corpuser:datahub", + ), + ), + versionSet=str(version_set_urn), + sortId=str(model_version.version).zfill(10), + aliases=[ + VersionTagClass(versionTag=alias) for alias in model_version.aliases + ], + ) + + wu = 
MetadataChangeProposalWrapper( + entityUrn=str(ml_model_urn), + aspect=ml_model_version_properties, + ).as_workunit() + + return wu + def _get_ml_model_properties_workunit( self, registered_model: RegisteredModel, @@ -282,28 +550,47 @@ def _get_ml_model_properties_workunit( """ ml_model_group_urn = self._make_ml_model_group_urn(registered_model) ml_model_urn = self._make_ml_model_urn(model_version) + if run: - hyperparams = [ - MLHyperParamClass(name=k, value=str(v)) - for k, v in run.data.params.items() - ] - training_metrics = [ - MLMetricClass(name=k, value=str(v)) for k, v in run.data.metrics.items() - ] + # Use the same metrics and hyperparams from the run + hyperparams = self._get_run_params(run) + training_metrics = self._get_run_metrics(run) + run_urn = DataProcessInstance( + id=run.info.run_id, + orchestrator=self.platform, + ).urn + + training_jobs = [str(run_urn)] if run_urn else [] else: hyperparams = None training_metrics = None + training_jobs = [] + + created_time = model_version.creation_timestamp + created_actor = ( + f"urn:li:platformResource:{model_version.user_id}" + if model_version.user_id + else None + ) + model_version_tags = [f"{k}:{v}" for k, v in model_version.tags.items()] + ml_model_properties = MLModelPropertiesClass( customProperties=model_version.tags, externalUrl=self._make_external_url(model_version), + lastModified=TimeStampClass( + time=model_version.last_updated_timestamp, + actor=None, + ), description=model_version.description, - date=model_version.creation_timestamp, - version=VersionTagClass(versionTag=str(model_version.version)), + created=TimeStampClass( + time=created_time, + actor=created_actor, + ), hyperParams=hyperparams, trainingMetrics=training_metrics, - # mlflow tags are dicts, but datahub tags are lists. 
currently use only keys from mlflow tags - tags=list(model_version.tags.keys()), + tags=model_version_tags, groups=[ml_model_group_urn], + trainingJobs=training_jobs, ) wu = self._create_workunit(urn=ml_model_urn, aspect=ml_model_properties) return wu @@ -337,6 +624,15 @@ def _make_external_url(self, model_version: ModelVersion) -> Optional[str]: else: return None + def _make_external_url_from_run( + self, experiment: Experiment, run: Run + ) -> Union[None, str]: + base_uri = self.client.tracking_uri + if base_uri.startswith("http"): + return f"{base_uri.rstrip('/')}/#/experiments/{experiment.experiment_id}/runs/{run.info.run_id}" + else: + return None + def _get_global_tags_workunit( self, model_version: ModelVersion, @@ -356,3 +652,8 @@ def _get_global_tags_workunit( aspect=global_tags, ) return wu + + @classmethod + def create(cls, config_dict: dict, ctx: PipelineContext) -> "MLflowSource": + config = MLflowConfig.parse_obj(config_dict) + return cls(ctx, config) diff --git a/metadata-ingestion/tests/integration/mlflow/mlflow_mcps_golden.json b/metadata-ingestion/tests/integration/mlflow/mlflow_mcps_golden.json index c70625c74d9983..12daaf68628e23 100644 --- a/metadata-ingestion/tests/integration/mlflow/mlflow_mcps_golden.json +++ b/metadata-ingestion/tests/integration/mlflow/mlflow_mcps_golden.json @@ -13,7 +13,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "mlflow-source-test" + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" } }, { @@ -30,7 +31,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "mlflow-source-test" + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" } }, { @@ -47,7 +49,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "mlflow-source-test" + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" } }, { @@ -64,7 +67,188 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "mlflow-source-test" + "runId": 
"mlflow-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:03d28ec52349332a202a252cb2388d83", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "ML Experiment" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:03d28ec52349332a202a252cb2388d83", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "urn:li:dataPlatform:mlflow", + "id": "Default", + "artifacts_location": "/private/var/folders/8_/y6mv42x92bl57_f1n4sp37s40000gn/T/pytest-of-yoonhyejin/pytest-48/test_ingestion0/mlruns/0" + }, + "name": "Default" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:03d28ec52349332a202a252cb2388d83", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:mlflow" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:03d28ec52349332a202a252cb2388d83", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:e37dbaee9481246e0997d7d8fefd8815", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "ML Experiment" + ] + } + }, + "systemMetadata": { + "lastObserved": 
1615443388097, + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:e37dbaee9481246e0997d7d8fefd8815", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "urn:li:dataPlatform:mlflow", + "id": "test-experiment", + "artifacts_location": "/private/var/folders/8_/y6mv42x92bl57_f1n4sp37s40000gn/T/pytest-of-yoonhyejin/pytest-48/test_ingestion0/mlruns/766847104871454225" + }, + "name": "test-experiment" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:e37dbaee9481246e0997d7d8fefd8815", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:mlflow" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:e37dbaee9481246e0997d7d8fefd8815", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "platformResource", + "entityUrn": "urn:li:platformResource:1bbe5a4af9f91bbd2b4dd90c552008e1", + "changeType": "UPSERT", + "aspectName": "platformResourceInfo", + "aspect": { + "json": { + "resourceType": "user", + "primaryKey": "unknown" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:2666299269d6ebea994d5ec0c29e3aca", + "changeType": "UPSERT", + "aspectName": 
"dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": {}, + "name": "test-run", + "created": { + "time": 1615443388097, + "actor": "urn:li:platformResource:unknown" + } + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" } }, { @@ -79,12 +263,92 @@ "model_id": "1" }, "description": "This a test registered model", - "createdAt": 1615443388097 + "created": { + "time": 1615443388097 + }, + "lastModified": { + "time": 1615443388097 + }, + "version": { + "versionTag": "1", + "metadataAttribution": { + "time": 1615443388097, + "actor": "urn:li:corpuser:datahub", + "sourceDetail": {} + } + } } }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "mlflow-source-test" + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:2666299269d6ebea994d5ec0c29e3aca", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:e37dbaee9481246e0997d7d8fefd8815" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:2666299269d6ebea994d5ec0c29e3aca", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceOutput", + "aspect": { + "json": { + "outputs": [ + "urn:li:mlModel:(urn:li:dataPlatform:mlflow,test-model_1,PROD)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:2666299269d6ebea994d5ec0c29e3aca", + "changeType": "UPSERT", + "aspectName": "mlTrainingRunProperties", + "aspect": { + "json": { + "customProperties": {}, + "id": 
"02660a3bee9941ed983667f678ce5611", + "outputUrls": [ + "/private/var/folders/8_/y6mv42x92bl57_f1n4sp37s40000gn/T/pytest-of-yoonhyejin/pytest-35/test_ingestion0/mlruns/766847104871454225/02660a3bee9941ed983667f678ce5611/artifacts" + ], + "hyperParams": [ + { + "name": "p", + "value": "1" + } + ], + "trainingMetrics": [ + { + "name": "m", + "value": "0.85" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" } }, { @@ -97,9 +361,14 @@ "customProperties": { "model_version_id": "1" }, - "date": 1615443388097, - "version": { - "versionTag": "1" + "trainingJobs": [ + "urn:li:dataProcessInstance:2666299269d6ebea994d5ec0c29e3aca" + ], + "created": { + "time": 1615443388097 + }, + "lastModified": { + "time": 1615443388097 }, "hyperParams": [ { @@ -114,7 +383,7 @@ } ], "tags": [ - "model_version_id" + "model_version_id:1" ], "groups": [ "urn:li:mlModelGroup:(urn:li:dataPlatform:mlflow,test-model,PROD)" @@ -123,7 +392,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "mlflow-source-test" + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" } }, { @@ -142,7 +412,51 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "mlflow-source-test" + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:2666299269d6ebea994d5ec0c29e3aca", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:mlflow" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "mlModel", + "entityUrn": "urn:li:mlModel:(urn:li:dataPlatform:mlflow,test-model_1,PROD)", + "changeType": "UPSERT", + "aspectName": "versionProperties", + "aspect": { + "json": { + "versionSet": 
"urn:li:versionSet:(658730e7177b283e4f94410fe4b41519,mlModel)", + "version": { + "versionTag": "1", + "metadataAttribution": { + "time": 1615443388097, + "actor": "urn:li:corpuser:datahub", + "sourceDetail": {} + } + }, + "aliases": [], + "sortId": "0000000001", + "versioningScheme": "LEXICOGRAPHIC_STRING" + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" } }, { @@ -157,7 +471,26 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "mlflow-source-test" + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:2666299269d6ebea994d5ec0c29e3aca", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "ML Training Run" + ] + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" } }, { @@ -172,7 +505,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "mlflow-source-test" + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" } }, { @@ -187,7 +521,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "mlflow-source-test" + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" } }, { @@ -202,7 +537,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "mlflow-source-test" + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" } }, { @@ -217,7 +553,8 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "mlflow-source-test" + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" } }, { @@ -232,7 +569,72 @@ }, "systemMetadata": { "lastObserved": 1615443388097, - "runId": "mlflow-source-test" + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "platformResource", + "entityUrn": 
"urn:li:platformResource:1bbe5a4af9f91bbd2b4dd90c552008e1", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:e37dbaee9481246e0997d7d8fefd8815", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:2666299269d6ebea994d5ec0c29e3aca", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:03d28ec52349332a202a252cb2388d83", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1615443388097, + "runId": "mlflow-source-test", + "lastRunId": "no-run-id-provided" } } ] \ No newline at end of file diff --git a/metadata-ingestion/tests/integration/mlflow/test_mlflow_source.py b/metadata-ingestion/tests/integration/mlflow/test_mlflow_source.py index 155199d5a04e97..48ffcc5a4fe1ca 100644 --- a/metadata-ingestion/tests/integration/mlflow/test_mlflow_source.py +++ b/metadata-ingestion/tests/integration/mlflow/test_mlflow_source.py @@ -1,3 +1,4 @@ +import uuid from pathlib import Path from typing import Any, Dict, TypeVar @@ -41,11 +42,15 @@ def pipeline_config(tracking_uri: str, sink_file_path: str) -> Dict[str, Any]: @pytest.fixture -def generate_mlflow_data(tracking_uri: str) -> None: +def generate_mlflow_data(tracking_uri: str, 
monkeypatch: pytest.MonkeyPatch) -> None: + test_uuid = "02660a3bee9941ed983667f678ce5611" + monkeypatch.setattr(uuid, "uuid4", lambda: uuid.UUID(test_uuid)) + client = MlflowClient(tracking_uri=tracking_uri) experiment_name = "test-experiment" run_name = "test-run" model_name = "test-model" + test_experiment_id = client.create_experiment(experiment_name) test_run = client.create_run( experiment_id=test_experiment_id, @@ -93,7 +98,10 @@ def test_ingestion( golden_file_path = ( pytestconfig.rootpath / "tests/integration/mlflow/mlflow_mcps_golden.json" ) - + ignore_paths = [ + r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['artifacts_location'\]", + r"root\[\d+\]\['aspect'\]\['json'\]\['outputUrls'\]", + ] pipeline = Pipeline.create(pipeline_config) pipeline.run() pipeline.pretty_print_summary() @@ -103,4 +111,5 @@ def test_ingestion( pytestconfig=pytestconfig, output_path=sink_file_path, golden_path=golden_file_path, + ignore_paths=ignore_paths, ) diff --git a/smoke-test/tests/cypress/cypress/e2e/ml/experiment.js b/smoke-test/tests/cypress/cypress/e2e/ml/experiment.js new file mode 100644 index 00000000000000..8deceb888aaf75 --- /dev/null +++ b/smoke-test/tests/cypress/cypress/e2e/ml/experiment.js @@ -0,0 +1,46 @@ +describe("experiment", () => { + beforeEach(() => { + cy.visit("/"); + cy.login(); + }); + + it("can visit experiment end run", () => { + // Then visit the specific page + cy.visit( + "/container/urn:li:container:airline_forecast_experiment/Summary?is_lineage_mode=false", + ); + + cy.contains("Airline Forecast Experiment"); + cy.contains("Experiment to forecast airline passenger numbers"); + + // the model has a training run + cy.contains("Simple Training Run").click(); + cy.contains("Airline Forecast Experiment"); + }); + + it("can visit container and run", () => { + cy.visit("/"); + cy.login(); + cy.visit( + "/dataProcessInstance/urn:li:dataProcessInstance:simple_training_run", + ); + + // the run has subtype, na + cy.contains("Simple 
Training Run"); + + // the run has its details + cy.contains("Failure"); + cy.contains("1 sec"); + cy.contains("simple_training_run"); + cy.contains("urn:li:corpuser:datahub"); + cy.contains("s3://my-bucket/output"); + + // the run has its metrics and parameters + cy.contains("accuracy"); + cy.contains("learning_rate"); + + // the run has a container and can visit it + cy.contains("Airline Forecast Experiment").click(); + cy.contains("Simple Training Run"); + }); +}); diff --git a/smoke-test/tests/cypress/cypress/e2e/ml/model_mlflow.js b/smoke-test/tests/cypress/cypress/e2e/ml/model_mlflow.js new file mode 100644 index 00000000000000..1cc85de0a4ac76 --- /dev/null +++ b/smoke-test/tests/cypress/cypress/e2e/ml/model_mlflow.js @@ -0,0 +1,85 @@ +describe("models", () => { + // Add global error handling + beforeEach(() => { + // This prevents test failures due to unhandled exceptions in the application + Cypress.on("uncaught:exception", (err, runnable) => { + console.error("Uncaught exception:", err); + return false; // Prevents Cypress from failing the test + }); + }); + + it("can visit mlflow model groups", () => { + // Monitor GraphQL requests to debug API issues + cy.intercept("POST", "/api/v2/graphql*").as("graphqlRequest"); + + // Visit with improved waiting for page load + cy.visitWithLogin( + "/mlModelGroup/urn:li:mlModelGroup:(urn:li:dataPlatform:mlflow,sample_ml_model_group,PROD)", + ); + + // Wait for initial GraphQL request to complete + cy.wait("@graphqlRequest"); + + // Ensure page has loaded by checking for specific content + cy.contains("2025-03-03").should("be.visible"); + cy.contains("2025-03-04").should("be.visible"); + cy.contains("urn:li:corpuser:datahub").should("be.visible"); + + // Navigate to Properties tab with verification + cy.url().should("include", "mlModelGroup"); + cy.get('[data-node-key="Properties"]').should("be.visible").first().click(); + + // Wait for content to load after tab change + 
cy.contains("data_science").should("be.visible"); + + // Navigate to Models tab with verification + cy.get('[data-node-key="Models"]').should("be.visible").click(); + + // Wait for models to load + cy.contains("SAMPLE ML MODEL").should("be.visible"); + cy.contains("A sample ML model").should("be.visible"); + + // Click model with verification + cy.contains("SAMPLE ML MODEL").click(); + + // Verify model details page loaded + cy.contains("A sample ML model").should("be.visible"); + }); + + it("can visit mlflow model", () => { + // Monitor GraphQL requests + cy.intercept("POST", "/api/v2/graphql*").as("graphqlRequest"); + + cy.visitWithLogin( + "/mlModels/urn:li:mlModel:(urn:li:dataPlatform:mlflow,sample_ml_model,PROD)", + ); + + // Wait for initial data load + cy.wait("@graphqlRequest"); + + // Verify model metadata + cy.contains("Simple Training Run").should("be.visible"); + cy.contains("A sample ML model").should("be.visible"); + cy.contains("val_loss").should("be.visible"); + cy.contains("max_depth").should("be.visible"); + + // Navigate to Properties tab with verification + cy.contains("Properties").should("be.visible").click(); + + // Wait for properties to load + cy.contains("data_science").should("be.visible"); + + // Navigate to Group tab with verification + cy.contains("Group").should("be.visible").click(); + + // Wait for group data to load + cy.contains("SAMPLE ML MODEL GROUP").should("be.visible"); + cy.contains("A sample ML model group").should("be.visible"); + + // Click model group with verification + cy.contains("SAMPLE ML MODEL GROUP").click(); + + // Verify group details page loaded + cy.contains("A sample ML model group").should("be.visible"); + }); +}); diff --git a/smoke-test/tests/cypress/cypress/e2e/ml/model.js b/smoke-test/tests/cypress/cypress/e2e/ml/model_sagemaker.js similarity index 89% rename from smoke-test/tests/cypress/cypress/e2e/ml/model.js rename to smoke-test/tests/cypress/cypress/e2e/ml/model_sagemaker.js index 
9a351100c000f0..d6d3fca0ee6c4a 100644 --- a/smoke-test/tests/cypress/cypress/e2e/ml/model.js +++ b/smoke-test/tests/cypress/cypress/e2e/ml/model_sagemaker.js @@ -1,5 +1,5 @@ describe("models", () => { - it("can visit models and groups", () => { + it("can visit sagemaker models and groups", () => { cy.visitWithLogin( "/mlModels/urn:li:mlModel:(urn:li:dataPlatform:sagemaker,cypress-model,PROD)/Summary?is_lineage_mode=false", ); @@ -21,7 +21,7 @@ describe("models", () => { cy.contains("cypress-model-package-group"); }); - it("can visit models and groups", () => { + it("can visit sagemaker models and groups", () => { cy.visitWithLogin( "/mlModelGroup/urn:li:mlModelGroup:(urn:li:dataPlatform:sagemaker,cypress-model-package-group,PROD)", ); diff --git a/smoke-test/tests/cypress/data.json b/smoke-test/tests/cypress/data.json index 8d4ab371d30bf9..b5cc7caa7cc49d 100644 --- a/smoke-test/tests/cypress/data.json +++ b/smoke-test/tests/cypress/data.json @@ -2581,5 +2581,168 @@ "contentType": "application/json" }, "systemMetadata": null + }, + { + "entityType": "container", + "entityUrn": "urn:li:container:airline_forecast_experiment", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "team": "forecasting" + }, + "name": "Airline Forecast Experiment", + "description": "Experiment to forecast airline passenger numbers", + "created": { + "time": 1628580000000, + "actor": "urn:li:corpuser:datahub" + }, + "lastModified": { + "time": 1628580000000, + "actor": "urn:li:corpuser:datahub" + } + } + } + }, + { + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:simple_training_run", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": { + "team": "forecasting" + }, + "name": "Simple Training Run", + "created": { + "time": 1628580000000, + "actor": "urn:li:corpuser:datahub" + } + } + } + }, + { + "entityType": 
"dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:simple_training_run", + "changeType": "UPSERT", + "aspectName": "mlTrainingRunProperties", + "aspect": { + "json": { + "customProperties": {}, + "externalUrl": "https://localhost:5000", + "id": "simple_training_run", + "outputUrls": ["s3://my-bucket/output"], + "hyperParams": [ + { + "name": "learning_rate", + "value": "0.01" + } + ], + "trainingMetrics": [ + { + "name": "accuracy", + "value": "0.9" + } + ] + } + } + }, + { + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:simple_training_run", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1628580001000, + "partitionSpec": { + "partition": "FULL_TABLE_SNAPSHOT", + "type": "FULL_TABLE" + }, + "status": "COMPLETE", + "result": { + "type": "FAILURE", + "nativeResultType": "mlflow" + }, + "durationMillis": 1000 + } + } + }, + { + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:simple_training_run", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:airline_forecast_experiment" + } + } + }, + { + "entityType": "mlModelGroup", + "entityUrn": "urn:li:mlModelGroup:(urn:li:dataPlatform:mlflow,sample_ml_model_group,PROD)", + "changeType": "UPSERT", + "aspectName": "mlModelGroupProperties", + "aspect": { + "json": { + "customProperties": { + "team": "data_science" + }, + "trainingJobs": ["urn:li:dataProcessInstance:simple_training_run"], + "name": "SAMPLE ML MODEL GROUP", + "description": "A sample ML model group", + "created": { + "time": 1741096069000, + "actor": "urn:li:corpuser:datahub" + }, + "lastModified": { + "time": 1741000000000, + "actor": "urn:li:corpuser:datahub" + } + } + } + }, + { + "entityType": "mlModel", + "entityUrn": "urn:li:mlModel:(urn:li:dataPlatform:mlflow,sample_ml_model,PROD)", + "changeType": "UPSERT", + "aspectName": "mlModelProperties",
+ "aspect": { + "json": { + "customProperties": { + "team": "data_science" + }, + "trainingJobs": ["urn:li:dataProcessInstance:simple_training_run"], + "name": "SAMPLE ML MODEL", + "description": "A sample ML model", + "created": { + "time": 1741096069000, + "actor": "urn:li:corpuser:datahub" + }, + "lastModified": { + "time": 1741000000000, + "actor": "urn:li:corpuser:datahub" + }, + "hyperParams": [ + { + "name": "max_depth", + "value": "3" + } + ], + "trainingMetrics": [ + { + "name": "val_loss", + "value": "0.1" + } + ], + "tags": [], + "groups": [ + "urn:li:mlModelGroup:(urn:li:dataPlatform:mlflow,sample_ml_model_group,PROD)" + ] + } + } } ]