Skip to content

Commit

Permalink
feat(mssql): adds subtypes aspect for dataflow and datajobs
Browse files Browse the repository at this point in the history
  • Loading branch information
sgomezvillamor committed Mar 4, 2025
1 parent 3b4f383 commit db089a1
Show file tree
Hide file tree
Showing 6 changed files with 401 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
DataJobInfoClass,
DataJobInputOutputClass,
DataPlatformInstanceClass,
SubTypesClass,
)


Expand Down Expand Up @@ -211,6 +212,14 @@ def as_datajob_info_aspect(self) -> DataJobInfoClass:
status=self.status,
)

@property
def as_subtypes_aspect(self) -> SubTypesClass:
    """Build the ``subTypes`` aspect describing what kind of data job this is.

    Returns:
        A ``SubTypesClass`` with a single type name: ``"Job Step"`` when the
        wrapped entity is a ``JobStep``, ``"Stored Procedure"`` otherwise.

    Raises:
        AssertionError: If the wrapped entity is neither a ``JobStep`` nor a
            ``StoredProcedure`` (only while assertions are enabled).
    """
    assert isinstance(self.entity, (JobStep, StoredProcedure))
    # Use a descriptive local name instead of ``type`` so the builtin is not
    # shadowed inside this property.
    subtype_name = (
        "Job Step" if isinstance(self.entity, JobStep) else "Stored Procedure"
    )
    return SubTypesClass(typeNames=[subtype_name])

@property
def as_maybe_platform_instance_aspect(self) -> Optional[DataPlatformInstanceClass]:
if self.entity.flow.platform_instance:
Expand Down Expand Up @@ -276,6 +285,14 @@ def as_dataflow_info_aspect(self) -> DataFlowInfoClass:
externalUrl=self.external_url,
)

@property
def as_subtypes_aspect(self) -> SubTypesClass:
    """Build the ``subTypes`` aspect describing what kind of data flow this is.

    Returns:
        A ``SubTypesClass`` with a single type name: ``"Job"`` when the
        wrapped entity is an ``MSSQLJob``, ``"Procedures Container"``
        otherwise.

    Raises:
        AssertionError: If the wrapped entity is neither an ``MSSQLJob`` nor
            an ``MSSQLProceduresContainer`` (only while assertions are
            enabled).
    """
    assert isinstance(self.entity, (MSSQLJob, MSSQLProceduresContainer))
    # Use a descriptive local name instead of ``type`` so the builtin is not
    # shadowed inside this property.
    subtype_name = (
        "Job" if isinstance(self.entity, MSSQLJob) else "Procedures Container"
    )
    return SubTypesClass(typeNames=[subtype_name])

@property
def as_maybe_platform_instance_aspect(self) -> Optional[DataPlatformInstanceClass]:
if self.entity.platform_instance:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,11 @@ def construct_job_workunits(
aspect=data_job.as_datajob_info_aspect,
).as_workunit()

yield MetadataChangeProposalWrapper(
entityUrn=data_job.urn,
aspect=data_job.as_subtypes_aspect,
).as_workunit()

data_platform_instance_aspect = data_job.as_maybe_platform_instance_aspect
if data_platform_instance_aspect:
yield MetadataChangeProposalWrapper(
Expand Down Expand Up @@ -676,8 +681,6 @@ def construct_job_workunits(
),
).as_workunit()

# TODO: Add SubType when it appear

def construct_flow_workunits(
self,
data_flow: MSSQLDataFlow,
Expand All @@ -687,6 +690,11 @@ def construct_flow_workunits(
aspect=data_flow.as_dataflow_info_aspect,
).as_workunit()

yield MetadataChangeProposalWrapper(
entityUrn=data_flow.urn,
aspect=data_flow.as_subtypes_aspect,
).as_workunit()

data_platform_instance_aspect = data_flow.as_maybe_platform_instance_aspect
if data_platform_instance_aspect:
yield MetadataChangeProposalWrapper(
Expand All @@ -700,8 +708,6 @@ def construct_flow_workunits(
aspect=data_flow.as_container_aspect,
).as_workunit()

# TODO: Add SubType when it appear

def get_inspectors(self) -> Iterable[Inspector]:
# This method can be overridden in the case that you want to dynamically
# run on multiple databases.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,24 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(mssql,Weekly Demo Data Backup,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Job"
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,Weekly Demo Data Backup,PROD),Weekly Demo Data Backup)",
Expand All @@ -112,11 +130,11 @@
"aspect": {
"json": {
"customProperties": {
"job_id": "2fc72675-0c68-4260-ab00-c361b96c8c36",
"job_id": "ae341aad-8ab2-421e-b46b-147afd4b0705",
"job_name": "Weekly Demo Data Backup",
"description": "No description available.",
"date_created": "2025-01-31 08:02:41.167000",
"date_modified": "2025-01-31 08:02:41.360000",
"date_created": "2025-03-04 16:55:50.893000",
"date_modified": "2025-03-04 16:55:51.043000",
"step_id": "1",
"step_name": "Set database to read only",
"subsystem": "TSQL",
Expand All @@ -134,6 +152,24 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,Weekly Demo Data Backup,PROD),Weekly Demo Data Backup)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Job Step"
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,Weekly Demo Data Backup,PROD),Weekly Demo Data Backup)",
Expand Down Expand Up @@ -2266,6 +2302,24 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Procedures Container"
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),Proc.With.SpecialChar)",
Expand All @@ -2279,8 +2333,8 @@
"code": "CREATE PROCEDURE [Foo].[Proc.With.SpecialChar] @ID INT\nAS\n SELECT @ID AS ThatDB;\n",
"input parameters": "['@ID']",
"parameter @ID": "{'type': 'int'}",
"date_created": "2025-01-31 08:02:40.980000",
"date_modified": "2025-01-31 08:02:40.980000"
"date_created": "2025-03-04 16:55:50.720000",
"date_modified": "2025-03-04 16:55:50.720000"
},
"name": "DemoData.Foo.Proc.With.SpecialChar",
"type": {
Expand All @@ -2294,6 +2348,24 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),Proc.With.SpecialChar)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Stored Procedure"
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),Proc.With.SpecialChar)",
Expand Down Expand Up @@ -2329,8 +2401,8 @@
"depending_on_procedure": "{}",
"code": "CREATE PROCEDURE [Foo].[NewProc]\n AS\n BEGIN\n --insert into items table from salesreason table\n insert into Foo.Items (ID, ItemName)\n SELECT TempID, Name\n FROM Foo.SalesReason;\n\n\n IF OBJECT_ID('Foo.age_dist', 'U') IS NULL\n BEGIN\n -- Create and populate if table doesn't exist\n SELECT Age, COUNT(*) as Count\n INTO Foo.age_dist\n FROM Foo.Persons\n GROUP BY Age\n END\n ELSE\n BEGIN\n -- Update existing table\n TRUNCATE TABLE Foo.age_dist;\n\n INSERT INTO Foo.age_dist (Age, Count)\n SELECT Age, COUNT(*) as Count\n FROM Foo.Persons\n GROUP BY Age\n END\n\n SELECT ID, Age INTO #TEMPTABLE FROM NewData.FooNew.PersonsNew\n \n UPDATE DemoData.Foo.Persons\n SET Age = t.Age\n FROM DemoData.Foo.Persons p\n JOIN #TEMPTABLE t ON p.ID = t.ID\n\n END\n",
"input parameters": "[]",
"date_created": "2025-01-31 08:02:40.987000",
"date_modified": "2025-01-31 08:02:40.987000"
"date_created": "2025-03-04 16:55:50.727000",
"date_modified": "2025-03-04 16:55:50.727000"
},
"name": "DemoData.Foo.NewProc",
"type": {
Expand All @@ -2344,6 +2416,24 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),NewProc)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Stored Procedure"
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),NewProc)",
Expand Down Expand Up @@ -4969,7 +5059,7 @@
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 1738310563767,
"time": 1741107354163,
"actor": "urn:li:corpuser:_ingestion"
}
}
Expand Down Expand Up @@ -5092,7 +5182,7 @@
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 1738310563770,
"time": 1741107354165,
"actor": "urn:li:corpuser:_ingestion"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,24 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(mssql,Weekly Demo Data Backup,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Job"
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,Weekly Demo Data Backup,PROD),Weekly Demo Data Backup)",
Expand All @@ -112,11 +130,11 @@
"aspect": {
"json": {
"customProperties": {
"job_id": "2fc72675-0c68-4260-ab00-c361b96c8c36",
"job_id": "ae341aad-8ab2-421e-b46b-147afd4b0705",
"job_name": "Weekly Demo Data Backup",
"description": "No description available.",
"date_created": "2025-01-31 08:02:41.167000",
"date_modified": "2025-01-31 08:02:41.360000",
"date_created": "2025-03-04 16:55:50.893000",
"date_modified": "2025-03-04 16:55:51.043000",
"step_id": "1",
"step_name": "Set database to read only",
"subsystem": "TSQL",
Expand All @@ -134,6 +152,24 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,Weekly Demo Data Backup,PROD),Weekly Demo Data Backup)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Job Step"
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,Weekly Demo Data Backup,PROD),Weekly Demo Data Backup)",
Expand Down Expand Up @@ -2266,6 +2302,24 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataFlow",
"entityUrn": "urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Procedures Container"
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),Proc.With.SpecialChar)",
Expand All @@ -2279,8 +2333,8 @@
"code": "CREATE PROCEDURE [Foo].[Proc.With.SpecialChar] @ID INT\nAS\n SELECT @ID AS ThatDB;\n",
"input parameters": "['@ID']",
"parameter @ID": "{'type': 'int'}",
"date_created": "2025-01-31 08:02:40.980000",
"date_modified": "2025-01-31 08:02:40.980000"
"date_created": "2025-03-04 16:55:50.720000",
"date_modified": "2025-03-04 16:55:50.720000"
},
"name": "DemoData.Foo.Proc.With.SpecialChar",
"type": {
Expand All @@ -2294,6 +2348,24 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),Proc.With.SpecialChar)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"Stored Procedure"
]
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),Proc.With.SpecialChar)",
Expand Down Expand Up @@ -2694,7 +2766,7 @@
"actor": "urn:li:corpuser:_ingestion"
},
"lastModified": {
"time": 1738310565884,
"time": 1741107356597,
"actor": "urn:li:corpuser:_ingestion"
}
}
Expand Down
Loading

0 comments on commit db089a1

Please sign in to comment.