Beam - Postgres Connector provides an I/O connector for reading from and writing to PostgreSQL in Apache Beam pipelines.
```shell
pip install beam-postgres-connector
```
- Read From PostgreSQL
```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from beam_postgres import splitters
from beam_postgres.io import ReadFromPostgres

options = PipelineOptions()
with beam.Pipeline(options=options) as p:
    read_from_postgres = ReadFromPostgres(
        query="SELECT * FROM test_db.test.test;",
        host="localhost",
        database="test_db",
        user="test",
        password="test",
        port=5432,
        splitter=splitters.NoSplitter(),  # you can select how to split the query for performance
    )
    (
        p
        | "ReadFromPostgres" >> read_from_postgres
        | "WriteToStdout" >> beam.Map(print)
    )
```
- Write To PostgreSQL
```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from beam_postgres.io import WriteToPostgres

options = PipelineOptions()
with beam.Pipeline(options=options) as p:
    write_to_postgres = WriteToPostgres(
        host="localhost",
        database="test_db",
        table="test.test",
        user="test",
        password="test",
        port=5432,
        batch_size=1000,
    )
    (
        p
        | "ReadFromInMemory"
        >> beam.Create(
            [
                {
                    "name": "test data",
                }
            ]
        )
        | "WriteToPostgres" >> write_to_postgres
    )
```
See here for more examples.
- NoSplitter

  Do not split the query.
- QuerySplitter

  Split the query on a specified separator string and distribute the resulting sub-queries for parallel processing across multiple nodes. Give each sub-query a non-overlapping range in its WHERE clause; the processing results are combined with UNION ALL. This splitter is also useful when the data volume cannot fit in the RAM of a single node, which would otherwise cause an error on Dataflow.
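As a rough illustration of the QuerySplitter behavior described above, the sketch below shows a separator-delimited query whose pieces cover non-overlapping ranges. The column name `id`, the range boundaries, and the use of `";"` as the separator are illustrative assumptions, not confirmed API details.

```python
# Hypothetical separator-split query for QuerySplitter: each ";"-separated
# statement covers a non-overlapping id range, so combining their results
# with UNION ALL reproduces the full table scan.
query = (
    "SELECT * FROM test.test WHERE id < 1000;"
    "SELECT * FROM test.test WHERE id >= 1000 AND id < 2000;"
    "SELECT * FROM test.test WHERE id >= 2000"
)

# Conceptually, a separator-based split amounts to this: one sub-query
# per parallel read, each handed to a different worker.
sub_queries = [q.strip() for q in query.split(";") if q.strip()]
print(len(sub_queries))  # → 3
```

Because the ranges do not overlap, no row is read twice and no row is missed, regardless of how the sub-queries are scheduled across workers.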
MIT License.