Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
pankajastro committed Jan 24, 2025
1 parent 8987ef1 commit bed60ca
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 1 deletion.
2 changes: 1 addition & 1 deletion cosmos/operators/_asynchronous/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def drop_table_sql(self) -> None:
hook.insert_job(configuration=self.configuration, location=self.location, project_id=self.gcp_project)

def execute(self, context: Context) -> Any | None:
print("hello from bigquery")

if not self.full_refresh:
raise CosmosValueError("The async execution only supported for full_refresh")

Check warning on line 89 in cosmos/operators/_asynchronous/bigquery.py

View check run for this annotation

Codecov / codecov/patch

cosmos/operators/_asynchronous/bigquery.py#L89

Added line #L89 was not covered by tests
else:
Expand Down
3 changes: 3 additions & 0 deletions cosmos/operators/_asynchronous/databricks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# pragma: no cover
# TODO: Implement it

from typing import Any

Check warning on line 4 in cosmos/operators/_asynchronous/databricks.py

View check run for this annotation

Codecov / codecov/patch

cosmos/operators/_asynchronous/databricks.py#L4

Added line #L4 was not covered by tests

from airflow.models.baseoperator import BaseOperator
Expand Down
Empty file.
75 changes: 75 additions & 0 deletions tests/operators/_asynchronous/test_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import json
from unittest.mock import patch

import pytest
from airflow.models.connection import Connection

from cosmos import ProfileConfig
from cosmos.operators._asynchronous.base import DbtRunAirflowAsyncFactoryOperator, _create_async_operator_class
from cosmos.operators._asynchronous.bigquery import DbtRunAirflowAsyncBigqueryOperator
from cosmos.operators.local import DbtRunLocalOperator
from cosmos.profiles import get_automatic_profile_mapping


@pytest.fixture()
def mock_bigquery_conn(): # type: ignore
"""
Mocks and returns an Airflow BigQuery connection.
"""
extra = {
"project": "my_project",
"key_path": "my_key_path.json",
}
conn = Connection(
conn_id="my_bigquery_connection",
conn_type="google_cloud_platform",
extra=json.dumps(extra),
)

with patch("airflow.hooks.base.BaseHook.get_connection", return_value=conn):
yield conn


@pytest.mark.parametrize(
"profile_type, dbt_class, expected_operator_class",
[
("bigquery", "DbtRun", DbtRunAirflowAsyncBigqueryOperator),
("snowflake", "DbtRun", DbtRunLocalOperator),
("bigquery", "DbtTest", DbtRunLocalOperator),
],
)
def test_create_async_operator_class_success(profile_type, dbt_class, expected_operator_class):
"""Test the successful loading of the async operator class."""

operator_class = _create_async_operator_class(profile_type, dbt_class)

assert operator_class == expected_operator_class


@patch("cosmos.operators._asynchronous.bigquery.DbtRunAirflowAsyncBigqueryOperator.drop_table_sql")
@patch("cosmos.operators._asynchronous.bigquery.DbtRunAirflowAsyncBigqueryOperator.get_remote_sql")
@patch("cosmos.operators._asynchronous.bigquery.BigQueryInsertJobOperator.execute")
def test_factory_async_class(mock_execute, get_remote_sql, drop_table_sql, mock_bigquery_conn):
profile_mapping = get_automatic_profile_mapping(
mock_bigquery_conn.conn_id,
profile_args={
"dataset": "my_dataset",
},
)
bigquery_profile_config = ProfileConfig(
profile_name="my_profile", target_name="dev", profile_mapping=profile_mapping
)
factory_class = DbtRunAirflowAsyncFactoryOperator(
task_id="run",
project_dir="/tmp",
profile_config=bigquery_profile_config,
full_refresh=True,
extra_context={"dbt_node_config": {"resource_name": "customer"}},
)

async_operator = factory_class.create_async_operator()
assert async_operator == DbtRunAirflowAsyncBigqueryOperator

factory_class.execute(context={})

mock_execute.assert_called_once_with({})

0 comments on commit bed60ca

Please sign in to comment.