WIP feat(pglite-sync): multi-shape sync and large refactor #566

Draft — wants to merge 10 commits into base: main
docs/docs/sync.md — 93 changes: 74 additions & 19 deletions

You can sync data from Electric using either the single-table or multi-table API.

### Single Table Sync

Use the `syncShapeToTable` method to sync a single table from Electric:

```ts
const shape = await pg.electric.syncShapeToTable({
  shape: {
    url: 'http://localhost:3000/v1/shape',
    params: {
      table: 'todo',
    },
  },
  table: 'todo',
  primaryKey: ['id'],
  shapeKey: 'todo', // or null if the shape state does not need to be persisted
})

// Stop syncing when done
shape.unsubscribe()
```

### Multi-Table Sync

The multi-table API preserves transactional consistency across tables: updates that happened in a single transaction in Postgres are applied within a single transaction in PGlite.

Use the `syncShapesToTables` method to sync multiple tables simultaneously:

```ts
const sync = await pg.electric.syncShapesToTables({
  shapes: {
    todos: {
      shape: {
        url: 'http://localhost:3000/v1/shape',
        params: { table: 'todo' },
      },
      table: 'todo',
      primaryKey: ['id'],
    },
    users: {
      shape: {
        url: 'http://localhost:3000/v1/shape',
        params: { table: 'users' },
      },
      table: 'users',
      primaryKey: ['id'],
    },
  },
  key: 'my-sync', // or null if the sync state does not need to be persisted
  onInitialSync: () => {
    console.log('Initial sync complete')
  },
})

// Stop syncing when done
sync.unsubscribe()
```

There is a full example you can run locally in the [GitHub repository](https://github.com/electric-sql/pglite/tree/main/packages/pglite-sync/example).
The `syncShapeToTable` call takes the following options as an object:
- `useCopy: boolean`<br>
Whether to use the `COPY FROM` command to insert the initial data, defaults to `false`. This can be faster than inserting rows one by one, as the inserts are combined into a single CSV that is passed to Postgres.


- `onInitialSync: () => void`<br>
A callback that is called when the initial sync is complete.

The returned `shape` object from the `syncShapeToTable` call has the following methods:
- `stream: ShapeStream`<br>
The underlying `ShapeStream` instance, see the [ShapeStream API](https://electric-sql.com/docs/api/clients/typescript#shapestream) for more details.

## syncShapesToTables API

The `syncShapesToTables` API allows syncing multiple shapes into multiple tables simultaneously while maintaining transactional consistency. It takes the following options:

- `shapes: Record<string, ShapeOptions>`<br>
An object mapping shape names to their configuration options. Each shape configuration includes:

- `shape: ShapeStreamOptions` - The shape stream specification
- `table: string` - The target table name
- `schema?: string` - Optional schema name (defaults to "public")
- `mapColumns?: MapColumns` - Optional column mapping
- `primaryKey: string[]` - Array of primary key columns

- `key: string | null`<br>
Identifier for the multi-shape subscription. If provided, sync state will be persisted to allow resuming between sessions.

- `useCopy?: boolean`<br>
Whether to use `COPY FROM` for faster initial data loading, defaults to `false`.

- `onInitialSync?: () => void`<br>
Optional callback that fires when initial sync is complete for all shapes.
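The record form of `mapColumns` maps target column names to source column names, while the function form receives the whole change message. A minimal standalone sketch of this mapping logic, mirroring the internal `doMapColumns` helper in `apply.ts` (the `applyMapColumns` name and the simplified message type are ours):

```typescript
// Sketch of how a `mapColumns` option is applied to an incoming change
// message (mirrors the internal doMapColumns helper in apply.ts).
type ChangeMessageLike = { value: Record<string, any> }
type MapColumns =
  | Record<string, string>
  | ((message: ChangeMessageLike) => Record<string, any>)

function applyMapColumns(
  mapColumns: MapColumns,
  message: ChangeMessageLike,
): Record<string, any> {
  if (typeof mapColumns === 'function') {
    // The function form receives the whole message and returns the mapped row
    return mapColumns(message)
  }
  // The record form maps local (target) column names to source column names
  const mapped: Record<string, any> = {}
  for (const [localColumn, sourceColumn] of Object.entries(mapColumns)) {
    mapped[localColumn] = message.value[sourceColumn]
  }
  return mapped
}
```

For example, `applyMapColumns({ task: 'title' }, message)` would populate a local `task` column from the source `title` column.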

The returned sync object provides:

- `isUpToDate: boolean`<br>
Whether all shapes have caught up with the upstream Postgres.

- `streams: Record<string, ShapeStream>`<br>
Access to individual shape streams by their names.

- `unsubscribe()`<br>
Stop syncing all shapes.
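For illustration, the returned handle can be modeled with a small mock (the `MultiShapeSync` and `ShapeStreamLike` names are ours, not part of the package), which shows how individual streams are looked up by the names used in the `shapes` option:

```typescript
// Illustrative model of the object returned by syncShapesToTables.
// ShapeStreamLike stands in for the real ShapeStream type.
interface ShapeStreamLike {
  shapeName: string
}

interface MultiShapeSync {
  isUpToDate: boolean
  streams: Record<string, ShapeStreamLike>
  unsubscribe(): void
}

// Mock handle as it might look after both shapes have caught up
const sync: MultiShapeSync = {
  isUpToDate: true,
  streams: {
    todos: { shapeName: 'todos' },
    users: { shapeName: 'users' },
  },
  unsubscribe() {
    // the real implementation stops all underlying shape streams
  },
}

// Streams are keyed by the names given in the `shapes` option
const todoStream = sync.streams['todos']
```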

## Limitations

- It is currently not possible to sync multiple shapes into the same table, as a shape subscription must be able to drop all data and start over. We are working on a fix for this case, but the current version throws if a shape is synced into the same table more than once.

- To maintain transactional consistency, data is aggregated in memory until its consistency can be guaranteed, which can result in high memory usage for very large shapes. We are working on resolving this issue; it only affects the initial sync.

## Sync using legacy Electric

Prior to the development of the new sync engine, the previous version of PGlite and Electric also had a sync capability. You can [read more about it on our blog](https://electric-sql.com/blog/2024/05/14/electricsql-postgres-client-support).
packages/pglite-sync/README.md — 37 changes: 36 additions & 1 deletion

You can sync data from Electric using either the single-table or multi-table API:

### Single Table Sync

Use `syncShapeToTable` to sync a single table:

```ts
const shape = await pg.electric.syncShapeToTable({
  shape: { url: 'http://localhost:3000/v1/shape', table: 'todo' },
  shapeKey: 'todo', // or null if the shape state does not need to be persisted
  table: 'todo',
  primaryKey: ['id'],
})
```

### Multi-Table Sync

The multi-table API is useful when you need to sync related tables together: it preserves consistency across tables by applying updates that happened in a single transaction in Postgres within a single transaction in PGlite.

Use `syncShapesToTables` to sync multiple tables simultaneously:

```ts
const sync = await pg.electric.syncShapesToTables({
  shapes: {
    todos: {
      shape: { url: 'http://localhost:3000/v1/shape', table: 'todo' },
      table: 'todo',
      primaryKey: ['id'],
    },
    users: {
      shape: { url: 'http://localhost:3000/v1/shape', table: 'users' },
      table: 'users',
      primaryKey: ['id'],
    },
  },
  key: 'my-sync', // or null if the sync state does not need to be persisted
  onInitialSync: () => {
    console.log('Initial sync complete')
  },
})

// Unsubscribe when done
sync.unsubscribe()
```
packages/pglite-sync/package.json — 10 changes: 9 additions & 1 deletion
"scripts": {
"build": "tsup",
"test": "vitest",
"test:e2e:up": "docker compose -f test-e2e/docker_compose.yaml up -d",
"test:e2e:down": "docker compose -f test-e2e/docker_compose.yaml down --volumes",
"test:e2e:reset": "pnpm test:e2e:down && pnpm test:e2e:up",
"test:e2e:run": "pnpm vitest --config vitest-e2e.config.ts",
"test:e2e": "pnpm test:e2e:reset && pnpm test:e2e:run && pnpm test:e2e:down",
"lint": "eslint ./src ./test --report-unused-disable-directives --max-warnings 0",
"format": "prettier --write ./src ./test",
"typecheck": "tsc",
"dist"
],
"dependencies": {
"@electric-sql/client": "1.0.0-beta.3"
"@electric-sql/client": "1.0.0-beta.5",
"@electric-sql/experimental": "0.1.2-beta.4"
},
"devDependencies": {
"@electric-sql/pglite": "workspace:*",
"@eslint-react/eslint-plugin": "^1.14.3",
"@types/node": "^20.16.11",
"@vitejs/plugin-react": "^4.3.2",
"globals": "^15.11.0",
"pg": "^8.14.0",
"vitest": "^2.1.2"
},
"peerDependencies": {
packages/pglite-sync/src/apply.ts — 160 changes: 160 additions & 0 deletions (new file)
```ts
import { ChangeMessage } from '@electric-sql/client'
import type { PGliteInterface, Transaction } from '@electric-sql/pglite'
import type { MapColumns, InsertChangeMessage } from './types'

export interface ApplyMessageToTableOptions {
  pg: PGliteInterface | Transaction
  table: string
  schema?: string
  message: ChangeMessage<any>
  mapColumns?: MapColumns
  primaryKey: string[]
  debug: boolean
}

export async function applyMessageToTable({
  pg,
  table,
  schema = 'public',
  message,
  mapColumns,
  primaryKey,
  debug,
}: ApplyMessageToTableOptions) {
  const data = mapColumns ? doMapColumns(mapColumns, message) : message.value

  switch (message.headers.operation) {
    case 'insert': {
      if (debug) console.log('inserting', data)
      const columns = Object.keys(data)
      return await pg.query(
        `
        INSERT INTO "${schema}"."${table}"
        (${columns.map((s) => '"' + s + '"').join(', ')})
        VALUES
        (${columns.map((_v, i) => '$' + (i + 1)).join(', ')})
        `,
        columns.map((column) => data[column]),
      )
    }

    case 'update': {
      if (debug) console.log('updating', data)
      const columns = Object.keys(data).filter(
        // we don't update the primary key, they are used to identify the row
        (column) => !primaryKey.includes(column),
      )
      if (columns.length === 0) return // nothing to update
      return await pg.query(
        `
        UPDATE "${schema}"."${table}"
        SET ${columns
          .map((column, i) => '"' + column + '" = $' + (i + 1))
          .join(', ')}
        WHERE ${primaryKey
          .map(
            (column, i) =>
              '"' + column + '" = $' + (columns.length + i + 1),
          )
          .join(' AND ')}
        `,
        [
          ...columns.map((column) => data[column]),
          ...primaryKey.map((column) => data[column]),
        ],
      )
    }

    case 'delete': {
      if (debug) console.log('deleting', data)
      return await pg.query(
        `
        DELETE FROM "${schema}"."${table}"
        WHERE ${primaryKey
          .map((column, i) => '"' + column + '" = $' + (i + 1))
          .join(' AND ')}
        `,
        [...primaryKey.map((column) => data[column])],
      )
    }
  }
}

export interface ApplyMessagesToTableWithCopyOptions {
  pg: PGliteInterface | Transaction
  table: string
  schema?: string
  messages: InsertChangeMessage[]
  mapColumns?: MapColumns
  primaryKey: string[]
  debug: boolean
}

export async function applyMessagesToTableWithCopy({
  pg,
  table,
  schema = 'public',
  messages,
  mapColumns,
  debug,
}: ApplyMessagesToTableWithCopyOptions) {
  if (debug) console.log('applying messages with COPY')

  // Map the messages to the data to be inserted
  const data: Record<string, any>[] = messages.map((message) =>
    mapColumns ? doMapColumns(mapColumns, message) : message.value,
  )

  // Get column names from the first message
  const columns = Object.keys(data[0])

  // Create CSV data
  const csvData = data
    .map((message) => {
      return columns
        .map((column) => {
          const value = message[column]
          // Escape double quotes and wrap in quotes if necessary
          if (
            typeof value === 'string' &&
            (value.includes(',') || value.includes('"') || value.includes('\n'))
          ) {
            return `"${value.replace(/"/g, '""')}"`
          }
          return value === null ? '\\N' : value
        })
        .join(',')
    })
    .join('\n')
  const csvBlob = new Blob([csvData], { type: 'text/csv' })

  // Perform COPY FROM
  await pg.query(
    `
    COPY "${schema}"."${table}" (${columns.map((c) => `"${c}"`).join(', ')})
    FROM '/dev/blob'
    WITH (FORMAT csv, NULL '\\N')
    `,
    [],
    {
      blob: csvBlob,
    },
  )

  if (debug) console.log(`Inserted ${messages.length} rows using COPY`)
}

function doMapColumns(
  mapColumns: MapColumns,
  message: ChangeMessage<any>,
): Record<string, any> {
  if (typeof mapColumns === 'function') {
    return mapColumns(message)
  } else {
    const mappedColumns: Record<string, any> = {}
    for (const [key, value] of Object.entries(mapColumns)) {
      mappedColumns[key] = message.value[value]
    }
    return mappedColumns
  }
}
```
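The CSV field escaping used by the `COPY` path above can be exercised in isolation. A standalone sketch of the same rules (the `toCsvField` and `toCsvLine` helper names are ours, not part of the file): fields containing commas, double quotes, or newlines are wrapped in quotes with embedded quotes doubled, and SQL `NULL` becomes the `\N` marker declared in the `COPY ... WITH (NULL '\N')` clause.

```typescript
// Standalone version of the CSV escaping in applyMessagesToTableWithCopy:
// quote fields containing commas, quotes, or newlines; double embedded
// quotes; emit \N for SQL NULL.
function toCsvField(value: unknown): string {
  if (value === null) return '\\N'
  if (
    typeof value === 'string' &&
    (value.includes(',') || value.includes('"') || value.includes('\n'))
  ) {
    return `"${value.replace(/"/g, '""')}"`
  }
  return String(value)
}

// Join one row's values, in column order, into a CSV line
function toCsvLine(columns: string[], row: Record<string, unknown>): string {
  return columns.map((c) => toCsvField(row[c])).join(',')
}
```

For example, a row `{ id: 1, title: 'a,b' }` over columns `['id', 'title']` produces the line `1,"a,b"`.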