Spark POC #24

Open
mattiasthalen opened this issue Jan 20, 2025 · 0 comments
Labels
enhancement New feature or request


Requirements:

pip install "pyspark[sql]>=3.4" pandas pyarrow
pip install "pyiceberg>=0.4.0"
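The Iceberg runtime jar used in the config in this issue (`iceberg-spark-runtime-3.4_2.12`) targets Spark 3.4, while `pyspark[sql]>=3.4` may resolve to a newer PySpark release. A minimal sketch of a compatibility check, assuming the Maven coordinate follows the usual `iceberg-spark-runtime-<spark>_<scala>:<version>` pattern; the helper names `runtime_spark_version` and `is_compatible` are hypothetical and not part of any library:

```python
import re

# Coordinate copied from the gateway config in this issue.
COORDINATE = "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2"


def runtime_spark_version(coordinate: str) -> str:
    """Extract the Spark major.minor version from an Iceberg runtime coordinate."""
    match = re.search(r"iceberg-spark-runtime-(\d+\.\d+)_\d+\.\d+:", coordinate)
    if match is None:
        raise ValueError(f"not an Iceberg Spark runtime coordinate: {coordinate!r}")
    return match.group(1)


def is_compatible(pyspark_version: str, coordinate: str) -> bool:
    """Return True when PySpark's major.minor matches the runtime jar's Spark version."""
    major_minor = ".".join(pyspark_version.split(".")[:2])
    return major_minor == runtime_spark_version(coordinate)


if __name__ == "__main__":
    # Example version strings; in practice pass pyspark.__version__ instead.
    for version in ("3.4.1", "3.5.0"):
        print(version, is_compatible(version, COORDINATE))
```

If the check fails, either constrain the PySpark version or switch to the runtime artifact built for the installed Spark version.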

SQLMesh config:

gateways:
  local:
    connection:
      type: spark
      config:
        spark.jars.packages: "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2"
        spark.sql.extensions: "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
        spark.sql.catalog.spark_catalog: "org.apache.iceberg.spark.SparkSessionCatalog"
        spark.sql.catalog.spark_catalog.type: "hive"
        spark.sql.catalog.local: "org.apache.iceberg.spark.SparkCatalog"
        spark.sql.catalog.local.type: "hadoop"
        spark.sql.catalog.local.warehouse: "./warehouse"

    state_connection:
      type: duckdb
      catalogs:
        persistent: './warehouse/state.duckdb'
        ephemeral: ':memory:'
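To sanity-check the Iceberg settings outside SQLMesh, the same key/value pairs can be applied to a plain PySpark session. This is a sketch under assumptions: `build_session` and the app name `spark-poc` are hypothetical, PySpark is imported lazily so the dictionary can be inspected without it, and creating the session downloads the Iceberg jar on first use.

```python
# Spark settings copied from the `config` block of the gateway above.
ICEBERG_SPARK_CONF = {
    "spark.jars.packages": "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2",
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkSessionCatalog",
    "spark.sql.catalog.spark_catalog.type": "hive",
    "spark.sql.catalog.local": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.local.type": "hadoop",
    "spark.sql.catalog.local.warehouse": "./warehouse",
}


def build_session(conf, app_name="spark-poc"):
    """Create (or reuse) a SparkSession with every key/value pair in `conf` applied."""
    # Imported here so this module loads even where PySpark is not installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

Calling `build_session(ICEBERG_SPARK_CONF)` and then running a statement such as `spark.sql("CREATE TABLE local.db.t (id INT) USING iceberg")` is one way to confirm that the `local` catalog writes under `./warehouse`; the table name here is only an illustration.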

Python model example:

import typing as t
from datetime import datetime

from pyspark.sql import DataFrame, functions

from sqlmesh import ExecutionContext, model

@model(
    "docs_example.pyspark",
    columns={
        "id": "int",
        "name": "text",
        "country": "text",
    },
)
def execute(
    context: ExecutionContext,
    start: datetime,
    end: datetime,
    execution_time: datetime,
    **kwargs: t.Any,
) -> DataFrame:
    # Resolve the upstream model's physical table name; this also registers it as a dependency
    table = context.resolve_table("upstream_model")

    # Use the Spark DataFrame API to add a constant country column
    df = context.spark.table(table).withColumn("country", functions.lit("USA"))

    # Return the PySpark DataFrame directly so that no data is collected locally
    return df