
satokiyo/beam-postgres-connector


Beam - Postgres connector


Beam - Postgres Connector provides an I/O connector for reading from and writing to PostgreSQL in Apache Beam pipelines.

Installation

pip install beam-postgres-connector

Usage

  • Read from PostgreSQL
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_postgres import splitters
from beam_postgres.io import ReadFromPostgres

# Parsed from command-line flags (e.g. --runner); defaults to the DirectRunner.
options = PipelineOptions()

with beam.Pipeline(options=options) as p:
    read_from_postgres = ReadFromPostgres(
        query="SELECT * FROM test_db.test.test;",
        host="localhost",
        database="test_db",
        user="test",
        password="test",
        port=5432,
        # Choose how the query is split for parallel reads (see "splitters" below).
        splitter=splitters.NoSplitter(),
    )

    (
        p
        | "ReadFromPostgres" >> read_from_postgres
        | "WriteToStdout" >> beam.Map(print)
    )
  • Write to PostgreSQL
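import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_postgres.io import WriteToPostgres

# Parsed from command-line flags (e.g. --runner); defaults to the DirectRunner.
options = PipelineOptions()

with beam.Pipeline(options=options) as p:
    write_to_postgres = WriteToPostgres(
        host="localhost",
        database="test_db",
        table="test.test",  # target table as schema.table
        user="test",
        password="test",
        port=5432,
        batch_size=1000,  # number of rows written per batch
    )

    (
        p
        | "ReadFromInMemory" >> beam.Create([{"name": "test data"}])  # one dict per row
        | "WriteToPostgres" >> write_to_postgres
    )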
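In the write example above, each element is a dict and `batch_size` sets how many rows go out per batch. The connector's internals are not described here, so the following is only a minimal pure-Python sketch of that idea, assuming that dict keys map to column names and that each batch is written as one parameterized multi-row INSERT. `batched` and `build_insert` are hypothetical helpers, not part of beam_postgres.

```python
from typing import Any, Dict, Iterable, Iterator, List, Tuple


def batched(rows: Iterable[Dict[str, Any]], batch_size: int) -> Iterator[List[Dict[str, Any]]]:
    """Group rows into lists of at most batch_size elements (hypothetical helper)."""
    batch: List[Dict[str, Any]] = []
    for row in rows:
        batch.append(row)
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:
        # Flush the final, possibly partial batch.
        yield batch


def build_insert(table: str, batch: List[Dict[str, Any]]) -> Tuple[str, List[Any]]:
    """Build one multi-row INSERT with psycopg-style %s placeholders.

    Assumes every row in the batch has the same keys, used as column names.
    `table` and the column names are interpolated as-is, so they must be trusted.
    """
    columns = list(batch[0].keys())
    row_placeholder = "(" + ", ".join(["%s"] * len(columns)) + ")"
    sql = (
        f"INSERT INTO {table} ({', '.join(columns)}) VALUES "
        + ", ".join([row_placeholder] * len(batch))
    )
    # Flatten values row by row, in column order, to match the placeholders.
    params = [row[col] for row in batch for col in columns]
    return sql, params


if __name__ == "__main__":
    rows = [{"name": f"test data {i}"} for i in range(5)]
    for batch in batched(rows, batch_size=2):
        sql, params = build_insert("test.test", batch)
        print(sql, params)
```

Sending one statement per batch rather than one per row cuts database round trips, which is the usual reason to expose a batch size on a sink.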

See here for more examples.

splitters

  • NoSplitter

    Does not split the query; the whole query is read as a single unit.

  • QuerySplitter

    Splits the query on a specified separator string and distributes the resulting sub-queries across multiple nodes for parallel processing. Give each sub-query a non-overlapping range in its WHERE clause; the results are combined using UNION ALL. This is also useful when the data volume does not fit in the RAM of a single node, which would otherwise cause an error on Dataflow.
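Per the description above, the query used with a QuerySplitter is a set of sub-queries joined by a separator, each restricted to a disjoint range. The sketch below builds such a string from a list of range boundaries and splits it back apart. It does not call beam_postgres: the separator value `<SPLIT>`, the `id` column, and the helpers `range_queries`, `join_queries`, and `split_queries` are hypothetical, so check QuerySplitter itself for how the separator is actually specified.

```python
from typing import List, Sequence

# Hypothetical separator; use whichever separator QuerySplitter is configured with.
SEP = "<SPLIT>"


def range_queries(table: str, column: str, bounds: Sequence[int]) -> List[str]:
    """Build one SELECT per half-open range [bounds[i], bounds[i + 1]).

    Half-open ranges never overlap, so each row is matched by at most one
    sub-query and concatenating the results behaves like UNION ALL.
    `table` and `column` are interpolated as-is, so they must be trusted.
    """
    if len(bounds) < 2 or any(lo >= hi for lo, hi in zip(bounds, bounds[1:])):
        raise ValueError("bounds must be strictly increasing with at least two values")
    return [
        f"SELECT * FROM {table} WHERE {column} >= {lo} AND {column} < {hi}"
        for lo, hi in zip(bounds, bounds[1:])
    ]


def join_queries(queries: List[str], sep: str = SEP) -> str:
    """Join the sub-queries into the single string passed as `query`."""
    return f" {sep} ".join(queries)


def split_queries(query: str, sep: str = SEP) -> List[str]:
    """Split the string back into sub-queries, one unit of parallel work each."""
    return [part.strip() for part in query.split(sep) if part.strip()]


if __name__ == "__main__":
    query = join_queries(range_queries("test.test", "id", [0, 1000, 2000, 3000]))
    print(query)
    for sub_query in split_queries(query):
        print(sub_query)
```

Half-open ranges (`>=` on the lower bound, `<` on the upper) are the simplest way to keep the ranges non-overlapping. Rows whose split column is NULL, or that fall outside the first and last boundary, match no sub-query, so choose boundaries that cover the column's full range.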

License

MIT License.