Add structure to support multiple db for async operator execution #1483
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #1483 +/- ##
==========================================
+ Coverage 96.62% 97.03% +0.41%
==========================================
Files 74 77 +3
Lines 4413 4456 +43
==========================================
+ Hits 4264 4324 +60
+ Misses 149 132 -17
Hi @pankajastro, really exciting to see progress on this.
I understand the PR is still in draft; I'm just giving some early feedback, as you requested.
Excellent work, @pankajastro. Great idea and implementation on dealing with the inheritance issues we've faced before.
Please, if possible, add a screenshot of the graph view in Airflow.
…LOW_ASYNC` (#1474)

# Overview

This PR introduces a reliable way to extract SQL statements run by `dbt-core` so Airflow asynchronous operators can use them. It fixes the experimental BQ implementation of `ExecutionMode.AIRFLOW_ASYNC` introduced in Cosmos 1.7 (#1230).

Previously, in #1230, we attempted to understand the implementation of how `dbt-core` runs `--full-refresh` for BQ, and we hard-coded the SQL header in Cosmos as an experimental feature. Since then, we realised that this approach was prone to errors (e.g. #1260) and that it is unrealistic for Cosmos to try to recreate the logic of how `dbt-core` and its adaptors generate all the SQL statements for different operations, data warehouses, and types of materialisation.

With this PR, we use `dbt-core` to create the complete SQL statements without `dbt-core` running those transformations. This enables better compatibility with various `dbt-core` features while ensuring correctness in running models.

The drawback of the current approach is that it relies on monkey patching, a technique used to dynamically update the behaviour of a piece of code at run-time. Cosmos monkey patches the `dbt-core` adaptor methods at the point where they would normally execute SQL statements: Cosmos modifies this behaviour so that the SQL statements are written to disk without performing any operations on the actual data warehouse.

The main risk of this strategy is that dbt may change its interface. For this reason, we logged the follow-up ticket #1489 to make sure we test the latest version of dbt and its adapters and confirm the monkey patching works as expected regardless of the version being used. That said, since the method being monkey patched is part of the `dbt-core` interface with its adaptors, we believe the risk of breaking changes is low.

The other challenge with the current approach is that every Cosmos task relies on the following:
1. `dbt-core` being installed alongside the Airflow installation
2. the execution of a significant part of the `dbtRunner` logic

We have logged a follow-up ticket to evaluate the possibility of overcoming these challenges: #1477

## Key Changes

1. Mocked BigQuery Adapter Execution:
   - Introduced `_mock_bigquery_adapter()` to override `BigQueryConnectionManager.execute`, ensuring SQL is only written to the `target` directory and skipping execution in the warehouse.
   - The generated SQL is then submitted using Airflow's `BigQueryInsertJobOperator` in deferrable mode.
4. Refactoring `AbstractDbtBaseOperator`:
   - Previously, `AbstractDbtBaseOperator` inherited from `BaseOperator`, causing conflicts when used with `BigQueryInsertJobOperator`, our `ExecutionMode.AIRFLOW_ASYNC` classes, and the interface built in #1483.
   - Refactored to `AbstractDbtBase` (no longer inheriting from `BaseOperator`), requiring explicit `BaseOperator` initialization in all derived operators.
   - Updated the following existing operators so that the derived classes initialise `BaseOperator` themselves:
     - `DbtAzureContainerInstanceBaseOperator`
     - `DbtDockerBaseOperator`
     - `DbtGcpCloudRunJobBaseOperator`
     - `DbtKubernetesBaseOperator`
5. Changes to dbt Compilation Workflow
   - Removed `_add_dbt_compile_task`, which previously pre-generated SQL and uploaded it to remote storage so that subsequent tasks could download the compiled SQL for their execution.
   - Instead, `dbt run` is now invoked directly in each task, using the mocked adapter to generate the full SQL.
   - A future [issue](#1477) will assess whether we should reintroduce a compile task using the mocked adapter for SQL generation and upload, reducing redundant dbt calls in each task.

## Issue updates

The PR fixes the following issues:
1. closes: #1260
   - Previously, we only supported `--full-refresh` dbt run with static SQL headers (e.g., CREATE/DROP TABLE).
   - Now, we support dynamic SQL headers based on materializations, including CREATE OR REPLACE TABLE, CREATE OR REPLACE VIEW, etc.
2. closes: #1271
   - dbt macros are evaluated at runtime during the `dbt run` invocation using the mocked adapter, and this PR lays the groundwork for supporting them in async execution mode.
3. closes: #1265
   - Now, large datasets can avoid full drops and recreations, enabling incremental model updates.
6. closes: #1261
   - Previously, only tables (`--full-refresh`) were supported; this PR implements logic for handling the different materializations that dbt supports, such as table, view, incremental, ephemeral, and materialized views.
7. closes: #1266
   - Instead of relying on `dbt compile` (which only outputs SELECT statements), we now let dbt generate complete SQL queries, including SQL headers/DDL statements for the queries corresponding to the resource nodes and the state of tables/views in the backend warehouse.
8. closes: #1264
   - With this PR, we also support emitting datasets for `ExecutionMode.AIRFLOW_ASYNC`.

## Example DAG showing `ExecutionMode.AIRFLOW_ASYNC` deferring tasks and the dynamic query submitted in the logs

<img width="1532" alt="Screenshot 2025-02-04 at 1 02 42 PM" src="https://github.com/user-attachments/assets/baf15864-9bf8-4f35-95b7-954a1f547bfe" />

## Next Steps & Considerations:

- We acknowledge that monkey patching may have downsides; however, it currently seems the best approach to achieve our goals, and we understand and accept the risks associated with this method. To mitigate them, we are expanding our test coverage to include all currently supported dbt adapter versions in our test matrix in #1489. This will ensure compatibility across different dbt versions and help us catch potential issues early.
- Further validate different dbt macros and materializations with `ExecutionMode.AIRFLOW_ASYNC` by seeking feedback from users testing the alpha https://github.com/astronomer/astronomer-cosmos/releases/tag/astronomer-cosmos-v1.9.0a5, created with the changes from this PR.
- #1477: compare the efficiency of generating SQL dynamically vs. pre-compiling and uploading SQL via a separate task.
- Add compatibility across all major cloud data warehouse backends (dbt adapters).

---------

Co-authored-by: Tatiana Al-Chueyr <tatiana.alchueyr@gmail.com>
Co-authored-by: Pankaj Singh <98807258+pankajastro@users.noreply.github.com>
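
For readers unfamiliar with the monkey-patching technique the commit message above describes, here is a minimal, self-contained sketch. It is not the Cosmos implementation: `FakeConnectionManager`, `make_sql_writer`, `capture_sql`, and the `run.sql` file name are hypothetical stand-ins, used only to show how an `execute` method can be swapped at run-time so that SQL is written to disk instead of being sent to a warehouse.

```python
import tempfile
from pathlib import Path
from unittest.mock import patch


class FakeConnectionManager:
    """Hypothetical stand-in for a dbt adapter connection manager."""

    def execute(self, sql: str) -> str:
        # In a real adapter this would send the statement to the warehouse.
        raise RuntimeError("would hit the warehouse")


def make_sql_writer(target_dir: Path):
    """Build a replacement for `execute` that only writes the SQL to disk."""

    def write_instead_of_execute(self, sql: str) -> str:
        # "run.sql" is an assumed file name chosen for this sketch.
        (target_dir / "run.sql").write_text(sql)
        return "skipped"

    return write_instead_of_execute


def capture_sql(sql: str, target_dir: Path) -> str:
    """Call `execute` while it is patched and return the SQL found on disk."""
    with patch.object(FakeConnectionManager, "execute", make_sql_writer(target_dir)):
        FakeConnectionManager().execute(sql)
    # Outside the `with` block the original method is restored.
    return (target_dir / "run.sql").read_text()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        print(capture_sql("CREATE OR REPLACE TABLE t AS SELECT 1", Path(tmp)))
```

Scoping the patch with a context manager keeps the replacement local to one call, which is one way to limit the blast radius if the patched interface changes between versions.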
Pluggable Async Operator Interface
This PR enhances the initial async operator support in Cosmos, introduced in PR #1230. The changes decouple `DbtRunAirflowAsyncOperator` from `BigQueryInsertJobOperator`, making it more flexible and allowing support for async operators with other data sources in the future.
- Introduces the `DbtRunAirflowAsyncFactoryOperator` class, which dynamically selects the parent class containing the async operator implementation based on the dbt profile.
- Adds a template for implementing the Databricks async operator.
- Moves the async operator-related code to `cosmos/operators/_asynchronous/` (it was initially placed at `cosmos/operators/_async/` and renamed after discussing this with the team).
Design principle
Introduced the `DbtRunAirflowAsyncFactoryOperator` class, which uses a Factory Method design pattern combined with dynamic inheritance at runtime.
- The `create_async_operator()` method generates a specific async operator class based on the `profile_config` provided. This allows the operator to adapt to different types of async operator at runtime.
- The operator calls `super().execute()` to trigger the execution logic, but it dynamically uses the appropriate operator class for async behavior.
Class hierarchy
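
The Factory Method plus dynamic-inheritance design described under "Design principle" can be sketched roughly as follows. This is a simplified illustration without Airflow, not the code in this PR: `BigqueryAsyncOperator`, `DatabricksAsyncOperator`, `ASYNC_OPERATORS`, and `AsyncFactoryOperator` are hypothetical names, and the registry lookup is an assumption standing in for however the PR resolves the class from `profile_config`.

```python
class BigqueryAsyncOperator:
    """Hypothetical warehouse-specific async operator."""

    def __init__(self, sql: str) -> None:
        self.sql = sql

    def execute(self, context: dict) -> str:
        return f"bigquery ran: {self.sql}"


class DatabricksAsyncOperator:
    """Hypothetical second backend, to show the factory switching parents."""

    def __init__(self, sql: str) -> None:
        self.sql = sql

    def execute(self, context: dict) -> str:
        return f"databricks ran: {self.sql}"


# Assumed registry for this sketch only.
ASYNC_OPERATORS = {
    "bigquery": BigqueryAsyncOperator,
    "databricks": DatabricksAsyncOperator,
}


class AsyncFactoryOperator:
    """Illustrative factory that chooses its parent class at runtime."""

    def __init__(self, profile_type: str, sql: str) -> None:
        async_cls = self.create_async_operator(profile_type)
        # Dynamic inheritance: rebuild this instance's class so that the
        # selected async operator becomes one of its base classes.
        self.__class__ = type(
            self.__class__.__name__, (self.__class__, async_cls), {}
        )
        async_cls.__init__(self, sql=sql)

    @staticmethod
    def create_async_operator(profile_type: str) -> type:
        try:
            return ASYNC_OPERATORS[profile_type]
        except KeyError:
            raise ValueError(f"No async operator for profile type {profile_type!r}")

    def execute(self, context: dict) -> str:
        # Resolves to the dynamically attached parent's execute().
        return super().execute(context)


if __name__ == "__main__":
    op = AsyncFactoryOperator("bigquery", "select 1")
    print([cls.__name__ for cls in type(op).__mro__])
    print(op.execute({}))
```

Printing the MRO shows the resulting hierarchy: the dynamically created class, then the factory class, then the selected backend operator.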
How to add a new async db operator
- Implement the operator under `cosmos/operators/_asynchronous/`.
- The operator module and class name must follow the format `cosmos.operators._asynchronous.{profile_type}.{dbt_class}{_snake_case_to_camelcase(execution_mode)}{profile_type.capitalize()}Operator`.
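
To make the naming pattern above concrete, the sketch below builds the dotted path for a given profile type. The body of `snake_case_to_camelcase` is an assumption about what the `_snake_case_to_camelcase` helper does, `build_async_operator_path` is a hypothetical function, and the input values `"DbtRun"` and `"airflow_async"` are illustrative guesses rather than values confirmed by this PR.

```python
def snake_case_to_camelcase(value: str) -> str:
    """Assumed behaviour: 'airflow_async' becomes 'AirflowAsync'."""
    return "".join(part.capitalize() for part in value.split("_"))


def build_async_operator_path(
    profile_type: str, dbt_class: str, execution_mode: str
) -> str:
    """Assemble the dotted class path following the pattern in the PR text."""
    class_name = (
        f"{dbt_class}"
        f"{snake_case_to_camelcase(execution_mode)}"
        f"{profile_type.capitalize()}Operator"
    )
    return f"cosmos.operators._asynchronous.{profile_type}.{class_name}"


if __name__ == "__main__":
    path = build_async_operator_path("databricks", "DbtRun", "airflow_async")
    # Split into the module to import and the class to look up in it.
    module_path, class_name = path.rsplit(".", 1)
    print(module_path)
    print(class_name)
```

Under these assumptions, a new backend only needs a module named after its profile type that exposes a class with the derived name; the factory can then locate it without a hard-coded mapping.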
Databricks
Example DAG
Graph View
![Screenshot 2025-01-27 at 3 41 14 PM](https://private-user-images.githubusercontent.com/98807258/406877047-d74a1851-39d9-4575-b9cb-11c286094643.png?jwt=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJnaXRodWIuY29tIiwiYXVkIjoicmF3LmdpdGh1YnVzZXJjb250ZW50LmNvbSIsImtleSI6ImtleTUiLCJleHAiOjE3MzkxMTI5MzAsIm5iZiI6MTczOTExMjYzMCwicGF0aCI6Ii85ODgwNzI1OC80MDY4NzcwNDctZDc0YTE4NTEtMzlkOS00NTc1LWI5Y2ItMTFjMjg2MDk0NjQzLnBuZz9YLUFtei1BbGdvcml0aG09QVdTNC1ITUFDLVNIQTI1NiZYLUFtei1DcmVkZW50aWFsPUFLSUFWQ09EWUxTQTUzUFFLNFpBJTJGMjAyNTAyMDklMkZ1cy1lYXN0LTElMkZzMyUyRmF3czRfcmVxdWVzdCZYLUFtei1EYXRlPTIwMjUwMjA5VDE0NTAzMFomWC1BbXotRXhwaXJlcz0zMDAmWC1BbXotU2lnbmF0dXJlPWNiYWQzMWQ4MDM1MTg3YjJmNzhjZGU5YTBlYTI2MmEyMzY2MDYzNzVmOWNhNDljMmMxYTRmNTBlNzNlYTg0OTQmWC1BbXotU2lnbmVkSGVhZGVycz1ob3N0In0.PRSBBdx1tnMhMAHDzWH43T7GFzr_ACIY3U26vUivS7k)
closes: #1238
closes: #1239