
Add structure to support multiple db for async operator execution #1483

Merged
merged 18 commits into from
Jan 27, 2025

Conversation

pankajastro
Contributor

@pankajastro pankajastro commented Jan 23, 2025

Pluggable Async Operator Interface

This PR enhances the initial async operator support in Cosmos,
introduced in PR #1230. The changes decouple DbtRunAirflowAsyncOperator
from BigQueryInsertJobOperator, making it more flexible and allowing
support for async operators with other data sources in the future.

It introduces the DbtRunAirflowAsyncFactoryOperator class,
which dynamically selects the parent class containing
the async operator implementation based on the dbt profile.

I’ve added a template for implementing the Databricks async operator.

After discussing this with the team, I have moved the async operator-related code to the path cosmos/operators/_asynchronous/

Design principle

Introduced the DbtRunAirflowAsyncFactoryOperator class that uses a Factory Method design pattern combined with dynamic inheritance at runtime.

  • Factory Method: The create_async_operator() method generates a specific async operator class based on the profile_config provided. This allows the operator to adapt to different types of async operators at runtime.
  • Dynamic Inheritance: The class dynamically changes its base class (bases) to use the async operator class created in the factory method. This ensures the correct async class is used during execution.
  • Execution: The execute() method calls super().execute() to trigger the execution logic, but it dynamically uses the appropriate operator class for async behavior.

Class hierarchy

                            BigQueryInsertJobOperator
                                       |
DbtRunLocalOperator   DbtRunAirflowAsyncBigqueryOperator   DbtRunAirflowAsyncDatabricksOperator
           \                       /          (one of these parent classes is injected at runtime)
            \                     /
        DbtRunAirflowAsyncFactoryOperator
                       |
           DbtRunAirflowAsyncOperator
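The factory-method-plus-dynamic-inheritance pattern above can be sketched in plain Python. This is a minimal illustration only; the class names and the registry below are hypothetical stand-ins, not Cosmos's actual code:

```python
# Minimal sketch of a Factory Method combined with dynamic inheritance.
# The async "base" classes stand in for operators such as
# DbtRunAirflowAsyncBigqueryOperator; all names here are illustrative.

class BigQueryAsyncBase:
    def execute(self):
        return "deferred BigQuery insert job"

class DatabricksAsyncBase:
    def execute(self):
        raise NotImplementedError("Databricks async support is a template only")

_ASYNC_BASES = {"bigquery": BigQueryAsyncBase, "databricks": DatabricksAsyncBase}

class AsyncFactoryOperator:
    def __init__(self, profile_type):
        # Factory Method: pick the async parent class from the profile type.
        async_base = self.create_async_operator(profile_type)
        # Dynamic inheritance: rebuild this instance's class so the chosen
        # async class sits in the MRO behind AsyncFactoryOperator.
        self.__class__ = type(
            "AsyncFactoryOperator", (AsyncFactoryOperator, async_base), {}
        )

    @staticmethod
    def create_async_operator(profile_type):
        return _ASYNC_BASES[profile_type]

    def execute(self):
        # super() now resolves to the dynamically injected async base class.
        return super().execute()

op = AsyncFactoryOperator("bigquery")
print(op.execute())  # the BigQuery base's execute() runs
```

The key point is that execute() stays defined once on the factory class, while the behaviour behind super() is decided at instantiation time.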

How to add a new async db operator

  • Implement the operator under the path cosmos/operators/_asynchronous/
  • The operator should live in the module cosmos.operators._asynchronous.{profile_type}, and its class should be named {dbt_class}{_snake_case_to_camelcase(execution_mode)}{profile_type.capitalize()}Operator
  • For a detailed example, see the dummy implementation added for Databricks

Example DAG

import os
from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ExecutionConfig, ExecutionMode, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.profiles import GoogleCloudServiceAccountDictProfileMapping

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=GoogleCloudServiceAccountDictProfileMapping(
        conn_id="gcp_gs_conn", profile_args={"dataset": "release_17", "project": "astronomer-dag-authoring"}
    ),
)


# [START airflow_async_execution_mode_example]
simple_dag_async = DbtDag(
    # dbt/cosmos-specific parameters
    project_config=ProjectConfig(
        DBT_ROOT_PATH / "original_jaffle_shop",
    ),
    profile_config=profile_config,
    execution_config=ExecutionConfig(
        execution_mode=ExecutionMode.AIRFLOW_ASYNC,
    ),
    render_config=RenderConfig(
        select=["path:models"],
        # test_behavior=TestBehavior.NONE
    ),
    # normal dag parameters
    schedule_interval=None,
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="simple_dag_async",
    tags=["simple"],
    operator_args={"full_refresh": True, "location": "northamerica-northeast1"},
)
# [END airflow_async_execution_mode_example]
[Screenshot: example async DAG run]

Graph View
[Screenshot: Airflow graph view of the DAG]

closes: #1238
closes: #1239


netlify bot commented Jan 23, 2025

Deploy Preview for sunny-pastelito-5ecb04 canceled.

Name Link
🔨 Latest commit fdd1f8c
🔍 Latest deploy log https://app.netlify.com/sites/sunny-pastelito-5ecb04/deploys/679764f9a6a7d70008924597


cloudflare-workers-and-pages bot commented Jan 23, 2025

Deploying astronomer-cosmos with Cloudflare Pages

Latest commit: fdd1f8c
Status: ✅  Deploy successful!
Preview URL: https://97584574.astronomer-cosmos.pages.dev
Branch Preview URL: https://async-execution-2.astronomer-cosmos.pages.dev

View logs


codecov bot commented Jan 23, 2025

Codecov Report

Attention: Patch coverage is 95.00000% with 5 lines in your changes missing coverage. Please review.

Project coverage is 97.03%. Comparing base (2b99777) to head (fdd1f8c).
Report is 1 commit behind head on main.

Files with missing lines Patch % Lines
cosmos/operators/_asynchronous/bigquery.py 90.56% 5 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1483      +/-   ##
==========================================
+ Coverage   96.62%   97.03%   +0.41%     
==========================================
  Files          74       77       +3     
  Lines        4413     4456      +43     
==========================================
+ Hits         4264     4324      +60     
+ Misses        149      132      -17     


Collaborator

@tatiana tatiana left a comment


Hi @pankajastro , really exciting to see progress on this.

I understand the PR is still under draft, just giving some early feedback, as you requested it.

Review comments were left on cosmos/operators/_async/__init__.py and cosmos/operators/_async/base.py (outdated, resolved).
@pankajastro pankajastro marked this pull request as ready for review January 24, 2025 15:21
@dosubot dosubot bot added size:L This PR changes 100-499 lines, ignoring generated files. area:execution Related to the execution environment/mode, like Docker, Kubernetes, Local, VirtualEnv, etc area:profile Related to ProfileConfig, like Athena, BigQuery, Clickhouse, Spark, Trino, etc profile:databricks Related to Databricks ProfileConfig labels Jan 24, 2025
@tatiana tatiana added this to the Cosmos 1.9.0 milestone Jan 24, 2025
Collaborator

@tatiana tatiana left a comment


Excellent work, @pankajastro. Great idea and implementation on dealing with the inheritance issues we've faced before.

Please, if possible, add a screenshot of the graph view in Airflow.

@dosubot dosubot bot added the lgtm This PR has been approved by a maintainer label Jan 27, 2025
@dosubot dosubot bot added size:XL This PR changes 500-999 lines, ignoring generated files. and removed size:L This PR changes 100-499 lines, ignoring generated files. labels Jan 27, 2025
@tatiana tatiana added the priority:high High priority issues are blocking or critical issues without a workaround and large impact label Jan 27, 2025
@pankajastro pankajastro merged commit bdc8746 into main Jan 27, 2025
68 of 69 checks passed
@pankajastro pankajastro deleted the async_execution_2 branch January 27, 2025 11:29
pankajkoti added a commit that referenced this pull request Feb 5, 2025
…LOW_ASYNC` (#1474)

# Overview

This PR introduces a reliable way to extract SQL statements run by
`dbt-core` so Airflow asynchronous operators can use them. It fixes the
experimental BQ implementation of `ExecutionMode.AIRFLOW_ASYNC`
introduced in Cosmos 1.7 (#1230).

Previously, in #1230, we attempted to understand the implementation of
how `dbt-core` runs `--full-refresh` for BQ, and we hard-coded the SQL
header in Cosmos as an experimental feature. Since then, we realised
that this approach was prone to errors (e.g. #1260) and that it is
unrealistic for Cosmos to try to recreate the logic of how `dbt-core`
and its adaptors generate all the SQL statements for different
operations, data warehouses, and types of materialisation.

With this PR, we use `dbt-core` to create the complete SQL statements
without `dbt-core` running those transformations. This enables better
compatibility with various `dbt-core` features while ensuring
correctness in running models.

The drawback of the current approach is that it relies on monkey
patching, a technique used to dynamically update the behaviour of a
piece of code at run-time. Cosmos monkey patches `dbt-core` adapter
methods at the moment they would normally execute SQL statements,
modifying this behaviour so that the SQL statements are written to
disk without performing any operations against the actual data warehouse.

The main risk of this strategy is that dbt may change its interface.
For this reason, we logged the follow-up ticket
#1489 to make sure
we test the latest version of dbt and its adapters and confirm the
monkey patching works as expected regardless of the version being used.
That said, since the method being monkey patched is part of the
`dbt-core` interface with its adapters, we believe the risk of breaking
changes is low.

The other challenge with the current approach is that every Cosmos task
relies on the following:
1. `dbt-core` being installed alongside the Airflow installation
2. the execution of a significant part of the `dbtRunner` logic

We have logged a follow-up ticket to evaluate the possibility of
overcoming these challenges: #1477

## Key Changes

1. Mocked BigQuery Adapter Execution:
- Introduced `_mock_bigquery_adapter()` to override
`BigQueryConnectionManager.execute`, ensuring SQL is only written to the
`target` directory and execution in the warehouse is skipped.
- The generated SQL is then submitted using Airflow's
`BigQueryInsertJobOperator` in deferrable mode.
2. Refactoring `AbstractDbtBaseOperator`:
- Previously, `AbstractDbtBaseOperator` inherited from `BaseOperator`,
causing conflicts when used with `BigQueryInsertJobOperator` in
our `ExecutionMode.AIRFLOW_ASYNC` classes and the interface built in
#1483.
- Refactored to `AbstractDbtBase` (no longer inheriting from `BaseOperator`),
requiring explicit `BaseOperator` initialization in all derived
operators.
- Updated the existing operators below to account for this refactoring,
with derived classes now initialising `BaseOperator`:
        - `DbtAzureContainerInstanceBaseOperator`
        - `DbtDockerBaseOperator`
        - `DbtGcpCloudRunJobBaseOperator`
        - `DbtKubernetesBaseOperator`
3. Changes to the dbt Compilation Workflow:
- Removed `_add_dbt_compile_task`, which previously pre-generated SQL
and uploaded it to remote storage, with subsequent tasks downloading this
compiled SQL for their execution.
- Instead, `dbt run` is now invoked directly in each task using the
mocked adapter to generate the full SQL.
- A future
[issue](#1477)
will assess whether we should reintroduce a compile task using the
mocked adapter for SQL generation and upload, reducing redundant dbt
calls in each task.
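The mocked-adapter idea in point 1 can be sketched as follows. This is a minimal illustration with stand-in classes; `FakeConnectionManager` and the captured list are hypothetical, not the actual Cosmos or dbt code:

```python
# Sketch: monkey patch an adapter's `execute` so SQL is captured instead of run.

class FakeConnectionManager:
    """Stand-in for dbt's BigQueryConnectionManager in this sketch."""

    def execute(self, sql, auto_begin=False, fetch=False):
        raise RuntimeError("would execute SQL against the warehouse")

captured_sql = []

def _mock_execute(self, sql, auto_begin=False, fetch=False):
    # Instead of running the statement, record it (Cosmos writes it to the
    # `target` directory; here we append to a list for illustration).
    captured_sql.append(sql)
    return ("mock adapter response", None)

# The monkey patch: swap the method on the class at run-time.
FakeConnectionManager.execute = _mock_execute

manager = FakeConnectionManager()
manager.execute("CREATE OR REPLACE TABLE demo AS SELECT 1")
print(captured_sql)  # the statement was captured, not executed
```

The fully rendered SQL collected this way is what a deferrable operator such as `BigQueryInsertJobOperator` can then submit to the warehouse.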

## Issue updates
The PR fixes the following issues:
1. closes: #1260
- Previously, we only supported --full-refresh dbt run with static SQL
headers (e.g., CREATE/DROP TABLE).
- Now, we support dynamic SQL headers based on materializations,
including CREATE OR REPLACE TABLE, CREATE OR REPLACE VIEW, etc.
2. closes: #1271
- dbt macros are evaluated at runtime during the dbt run invocation using
the mocked adapter, and this PR lays the groundwork for supporting them in
async execution mode.
3. closes: #1265
- Now, large datasets can avoid full drops and recreations, enabling
incremental model updates.
4. closes: #1261
- Previously, only tables (--full-refresh) were supported; this PR
implements logic for handling the different materializations that dbt
supports, like table, view, incremental, ephemeral, and materialized
views.
5. closes: #1266
- Instead of relying on dbt compile (which only outputs SELECT
statements), we now let dbt generate complete SQL queries, including SQL
headers/DDL statements, for the queries corresponding to the resource
nodes and the state of tables/views in the backend warehouse.
6. closes: #1264
- With this PR, we also support emitting datasets for
`ExecutionMode.AIRFLOW_ASYNC`.

## Example DAG showing `ExecutionMode.AIRFLOW_ASYNC` deferring tasks and
the dynamic query submitted in the logs

[Screenshot: DAG run with deferred tasks and the submitted query in the logs]


## Next Steps & Considerations:
- It is acknowledged that monkey patching has downsides; however, it
currently seems the best approach to achieve our goals, and we understand
and accept the associated risks. To mitigate them, we are expanding our
test coverage to include all currently supported dbt adapter versions in
our test matrix in #1489. This will ensure compatibility across different
dbt versions and help us catch potential issues early.
- Further validate different dbt macros and materializations with
`ExecutionMode.AIRFLOW_ASYNC` by seeking feedback from users testing the
alpha release
https://github.com/astronomer/astronomer-cosmos/releases/tag/astronomer-cosmos-v1.9.0a5
created with changes from this PR.
- #1477: compare the efficiency of generating SQL dynamically vs.
pre-compiling and uploading SQL via a separate task.
- Add compatibility across all major cloud data warehouse backends (dbt
adapters).

---------

Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com>
Co-authored-by: Pankaj Singh <98807258+pankajastro@users.noreply.github.com>