-
Notifications
You must be signed in to change notification settings - Fork 3.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix(smoke-test) add smoke test for restli ingestProposalBatch (#12404)
- Loading branch information
1 parent
eca9684
commit 8195f80
Showing
1 changed file
with
135 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
import time | ||
from typing import List | ||
|
||
import pytest | ||
|
||
from datahub.emitter.mce_builder import make_dashboard_urn | ||
from datahub.emitter.mcp import MetadataChangeProposalWrapper | ||
from datahub.emitter.serialization_helper import pre_json_transform | ||
from datahub.metadata._schema_classes import MetadataChangeProposalClass | ||
from datahub.metadata.schema_classes import ( | ||
AuditStampClass, | ||
ChangeAuditStampsClass, | ||
DashboardInfoClass, | ||
) | ||
from tests.consistency_utils import wait_for_writes_to_sync | ||
from tests.restli.restli_test import MetadataChangeProposalInvalidWrapper | ||
from tests.utils import delete_urns | ||
|
||
generated_urns: List[str] = [] | ||
|
||
|
||
@pytest.fixture(scope="module") | ||
def ingest_cleanup_data(auth_session, graph_client, request): | ||
yield | ||
delete_urns(graph_client, generated_urns) | ||
|
||
|
||
def _create_valid_dashboard_mcps() -> List[MetadataChangeProposalClass]: | ||
mcps = [] | ||
num_valid_mcp = 5 | ||
|
||
audit_stamp = pre_json_transform( | ||
ChangeAuditStampsClass( | ||
created=AuditStampClass( | ||
time=int(time.time() * 1000), | ||
actor="urn:li:corpuser:datahub", | ||
) | ||
).to_obj() | ||
) | ||
|
||
valid_dashboard_info = DashboardInfoClass( | ||
title="Dummy Title For Testing", | ||
description="Dummy Description For Testing", | ||
lastModified=audit_stamp, | ||
) | ||
|
||
for i in range(num_valid_mcp): | ||
mcp_valid = MetadataChangeProposalWrapper( | ||
entityUrn=make_dashboard_urn( | ||
platform="looker", name=f"dummy-test-invalid-{i}" | ||
), | ||
aspectName="dashboardInfo", | ||
aspect=valid_dashboard_info, | ||
) | ||
mcps.append(mcp_valid.make_mcp()) | ||
generated_urns.extend([mcp.entityUrn for mcp in mcps if mcp.entityUrn]) | ||
|
||
return mcps | ||
|
||
|
||
def _create_invalid_dashboard_mcp() -> MetadataChangeProposalClass: | ||
audit_stamp = pre_json_transform( | ||
ChangeAuditStampsClass( | ||
created=AuditStampClass( | ||
time=int(time.time() * 1000), | ||
actor="urn:li:corpuser:datahub", | ||
) | ||
).to_obj() | ||
) | ||
|
||
invalid_dashboard_info = { | ||
"title": "Dummy Title For Testing", | ||
"description": "Dummy Description For Testing", | ||
"lastModified": audit_stamp, | ||
"notValidField": "invalid field value", | ||
} | ||
|
||
mcp_invalid = MetadataChangeProposalInvalidWrapper( | ||
entityUrn=make_dashboard_urn(platform="looker", name="dummy-test-valid"), | ||
aspectName="dashboardInfo", | ||
aspect=invalid_dashboard_info, | ||
) | ||
generated_urns.append(mcp_invalid.entityUrn) if mcp_invalid.entityUrn else None | ||
return mcp_invalid.make_mcp() | ||
|
||
|
||
def test_restli_batch_ingestion_sync(graph_client): | ||
# Positive Test (all valid MetadataChangeProposal) | ||
mcps = _create_valid_dashboard_mcps() | ||
ret = graph_client.emit_mcps(mcps, async_flag=False) | ||
assert ret >= 0 | ||
|
||
# Negative Test (contains invalid MetadataChangeProposal) | ||
invalid_mcp = _create_invalid_dashboard_mcp() | ||
mcps.append(invalid_mcp) | ||
ret = graph_client.emit_mcps(mcps, async_flag=False) | ||
assert ret >= 0 | ||
|
||
# Expected that invalid field of MetadataChangeProposal is ignored, | ||
# Rest Fields are persistd into DB | ||
aspect = graph_client.get_aspect( | ||
entity_urn=invalid_mcp.entityUrn, aspect_type=DashboardInfoClass | ||
) | ||
|
||
assert aspect is not None | ||
assert isinstance(aspect, DashboardInfoClass) | ||
assert aspect.title == "Dummy Title For Testing" | ||
assert aspect.description == "Dummy Description For Testing" | ||
assert aspect.lastModified is not None | ||
|
||
|
||
def test_restli_batch_ingestion_async(graph_client): | ||
# Positive Test (all valid MetadataChangeProposal) | ||
mcps = _create_valid_dashboard_mcps() | ||
ret = graph_client.emit_mcps(mcps, async_flag=True) | ||
assert ret >= 0 | ||
|
||
# Negative Test (contains invalid MetadataChangeProposal) | ||
invalid_mcp = _create_invalid_dashboard_mcp() | ||
mcps.append(invalid_mcp) | ||
ret = graph_client.emit_mcps(mcps, async_flag=True) | ||
assert ret >= 0 | ||
|
||
# Expected that invalid field of MetadataChangeProposal is ignored, | ||
# Rest Fields are persistd into DB | ||
wait_for_writes_to_sync() | ||
aspect = graph_client.get_aspect( | ||
entity_urn=invalid_mcp.entityUrn, aspect_type=DashboardInfoClass | ||
) | ||
|
||
assert aspect is not None | ||
assert isinstance(aspect, DashboardInfoClass) | ||
assert aspect.title == "Dummy Title For Testing" | ||
assert aspect.description == "Dummy Description For Testing" | ||
assert aspect.lastModified is not None |