Skip to content

Commit b3640ee

Browse files
samwillistudor
and
tudor
authored
fix: update sync plugin to work with the latest Electric sync server (#531)
* update sync plugin * docs stylecheck * Fit types in tests --------- Co-authored-by: tudor <tudor@swisstch.com>
1 parent f94d591 commit b3640ee

File tree

6 files changed

+32
-37
lines changed

6 files changed

+32
-37
lines changed

.changeset/gentle-ways-knock.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@electric-sql/pglite-sync': patch
3+
---
4+
5+
Update the sync plugin to work with the latest Electric sync server

docs/docs/api.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -362,9 +362,10 @@ await pg.describeQuery('SELECT * FROM test WHERE name = $1', ['test'])
362362
```
363363

364364
### clone
365+
365366
`.clone(): Promise<PGlite>`
366367

367-
Clones the current instance. This is useful when a series of operations, like unit or integration test, need to be run on the same database without having to recreate the database each time, or for each test.
368+
Clones the current instance. This is useful when a series of operations, like unit or integration test, need to be run on the same database without having to recreate the database each time, or for each test.
368369

369370
## Properties
370371

packages/pglite-sync/package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
"dist"
4646
],
4747
"dependencies": {
48-
"@electric-sql/client": "~0.9.0"
48+
"@electric-sql/client": "1.0.0-beta.3"
4949
},
5050
"devDependencies": {
5151
"@electric-sql/pglite": "workspace:*",

packages/pglite-sync/src/index.ts

+18-5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { Offset, ShapeStreamOptions } from '@electric-sql/client'
1+
import type { Offset, Row, ShapeStreamOptions } from '@electric-sql/client'
22
import {
33
ChangeMessage,
44
ShapeStream,
@@ -12,6 +12,10 @@ import type {
1212
Transaction,
1313
} from '@electric-sql/pglite'
1414

15+
interface LegacyChangeMessage<T extends Row<unknown>> extends ChangeMessage<T> {
16+
offset?: Offset
17+
}
18+
1519
export type MapColumnsMap = Record<string, string>
1620
export type MapColumnsFn = (message: ChangeMessage<any>) => Record<string, any>
1721
export type MapColumns = MapColumnsMap | MapColumnsFn
@@ -154,7 +158,7 @@ async function createPlugin(
154158
// _very_ large shapes - either we should commit batches to
155159
// a temporary table and copy over the transactional result
156160
// or use a separate connection to hold a long transaction
157-
let messageAggregator: ChangeMessage<any>[] = []
161+
let messageAggregator: LegacyChangeMessage<any>[] = []
158162
let truncateNeeded = false
159163
// let lastLSN: string | null = null // Removed until Electric has stabilised on LSN metadata
160164
let lastCommitAt: number = 0
@@ -248,8 +252,9 @@ async function createPlugin(
248252
metadataSchema,
249253
shapeKey: options.shapeKey,
250254
shapeId: shapeHandle,
251-
lastOffset:
252-
messageAggregator[messageAggregator.length - 1].offset,
255+
lastOffset: getMessageOffset(
256+
messageAggregator[messageAggregator.length - 1],
257+
),
253258
})
254259
}
255260
})
@@ -359,7 +364,7 @@ async function createPlugin(
359364
return stream.isUpToDate
360365
},
361366
get shapeId() {
362-
return stream.shapeHandle
367+
return stream.shapeHandle!
363368
},
364369
stream,
365370
subscribe: (cb: () => void, error: (err: Error) => void) => {
@@ -667,3 +672,11 @@ function subscriptionMetadataTableName(metadatSchema: string) {
667672
}
668673

669674
const subscriptionTableName = `shape_subscriptions_metadata`
675+
676+
function getMessageOffset(message: LegacyChangeMessage<any>): Offset {
677+
if (message.offset) {
678+
return message.offset
679+
} else {
680+
return `${message.headers.lsn}_${message.headers.op_position}` as Offset
681+
}
682+
}

packages/pglite-sync/test/sync.test.ts

-25
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ describe('pglite-sync', () => {
6464
// insert
6565
await feedMessage({
6666
headers: { operation: 'insert' },
67-
offset: '-1',
6867
key: 'id1',
6968
value: {
7069
id: 1,
@@ -83,7 +82,6 @@ describe('pglite-sync', () => {
8382
// update
8483
await feedMessage({
8584
headers: { operation: 'update' },
86-
offset: '-1',
8785
key: 'id1',
8886
value: {
8987
id: 1,
@@ -102,7 +100,6 @@ describe('pglite-sync', () => {
102100
// delete
103101
await feedMessage({
104102
headers: { operation: 'delete' },
105-
offset: '-1',
106103
key: 'id1',
107104
value: {
108105
id: 1,
@@ -315,7 +312,6 @@ describe('pglite-sync', () => {
315312
await feedMessages([
316313
{
317314
headers: { operation: 'insert' },
318-
offset: `1_${numInserts}`,
319315
key: `id${numInserts}`,
320316
value: {
321317
id: numInserts,
@@ -326,7 +322,6 @@ describe('pglite-sync', () => {
326322
{ headers: { control: 'must-refetch' } },
327323
{
328324
headers: { operation: 'insert' },
329-
offset: `2_1`,
330325
key: `id21`,
331326
value: {
332327
id: 21,
@@ -466,7 +461,6 @@ describe('pglite-sync', () => {
466461
// insert
467462
await feedMessage({
468463
headers: { operation: 'insert' },
469-
offset: '-1',
470464
key: 'id1',
471465
value: {
472466
id: 1,
@@ -485,7 +479,6 @@ describe('pglite-sync', () => {
485479
// update with no columns to update
486480
await feedMessage({
487481
headers: { operation: 'update' },
488-
offset: '-1',
489482
key: 'id1',
490483
value: {
491484
id: 1,
@@ -555,7 +548,6 @@ describe('pglite-sync', () => {
555548

556549
await feedMessage({
557550
headers: { operation: 'insert' },
558-
offset: '-1',
559551
key: 'id1',
560552
value: {
561553
id: 'id1',
@@ -619,7 +611,6 @@ describe('pglite-sync', () => {
619611
),
620612
{
621613
headers: { operation: 'update' as const },
622-
offset: `1_${numInserts}`,
623614
key: `id0`,
624615
value: {
625616
id: 0,
@@ -683,7 +674,6 @@ describe('pglite-sync', () => {
683674
const specialCharMessages: Message[] = [
684675
{
685676
headers: { operation: 'insert' },
686-
offset: '1_0',
687677
key: 'id1',
688678
value: {
689679
id: 1,
@@ -693,7 +683,6 @@ describe('pglite-sync', () => {
693683
},
694684
{
695685
headers: { operation: 'insert' },
696-
offset: '2_0',
697686
key: 'id2',
698687
value: {
699688
id: 2,
@@ -703,7 +692,6 @@ describe('pglite-sync', () => {
703692
},
704693
{
705694
headers: { operation: 'insert' },
706-
offset: '3_0',
707695
key: 'id3',
708696
value: {
709697
id: 3,
@@ -784,7 +772,6 @@ describe('pglite-sync', () => {
784772
(_, idx) =>
785773
({
786774
headers: { operation: 'insert' },
787-
offset: `1_${idx}`,
788775
key: `id${idx}`,
789776
value: {
790777
id: idx,
@@ -872,7 +859,6 @@ describe('pglite-sync', () => {
872859
// await feedMessages([
873860
// {
874861
// headers: { operation: 'insert' },
875-
// offset: '1_1', // Transaction 1
876862
// key: 'id1',
877863
// value: {
878864
// id: 1,
@@ -882,7 +868,6 @@ describe('pglite-sync', () => {
882868
// },
883869
// {
884870
// headers: { operation: 'insert' },
885-
// offset: '1_2', // Same transaction
886871
// key: 'id2',
887872
// value: {
888873
// id: 2,
@@ -892,7 +877,6 @@ describe('pglite-sync', () => {
892877
// },
893878
// {
894879
// headers: { operation: 'insert' },
895-
// offset: '2_1', // New transaction
896880
// key: 'id3',
897881
// value: {
898882
// id: 3,
@@ -974,19 +958,16 @@ describe('pglite-sync', () => {
974958
await feedMessages([
975959
{
976960
headers: { operation: 'insert' },
977-
offset: '1_1',
978961
key: 'id1',
979962
value: { id: 1, task: 'task1', done: false },
980963
},
981964
{
982965
headers: { operation: 'insert' },
983-
offset: '2_1',
984966
key: 'id2',
985967
value: { id: 2, task: 'task2', done: false },
986968
},
987969
{
988970
headers: { operation: 'insert' },
989-
offset: '3_1',
990971
key: 'id3',
991972
value: { id: 3, task: 'task3', done: false },
992973
},
@@ -1054,19 +1035,16 @@ describe('pglite-sync', () => {
10541035
await feedMessages([
10551036
{
10561037
headers: { operation: 'insert' },
1057-
offset: '1_1',
10581038
key: 'id1',
10591039
value: { id: 1, task: 'task1', done: false },
10601040
},
10611041
{
10621042
headers: { operation: 'insert' },
1063-
offset: '1_2',
10641043
key: 'id2',
10651044
value: { id: 2, task: 'task2', done: false },
10661045
},
10671046
{
10681047
headers: { operation: 'insert' },
1069-
offset: '1_3',
10701048
key: 'id3',
10711049
value: { id: 3, task: 'task3', done: false },
10721050
},
@@ -1230,7 +1208,6 @@ describe('pglite-sync', () => {
12301208
await feedMessages([
12311209
{
12321210
headers: { operation: 'insert' },
1233-
offset: '1_1',
12341211
key: 'id1',
12351212
value: {
12361213
id: 1,
@@ -1240,7 +1217,6 @@ describe('pglite-sync', () => {
12401217
},
12411218
{
12421219
headers: { operation: 'insert' },
1243-
offset: '1_2',
12441220
key: 'id2',
12451221
value: {
12461222
id: 2,
@@ -1257,7 +1233,6 @@ describe('pglite-sync', () => {
12571233
await feedMessages([
12581234
{
12591235
headers: { operation: 'insert' },
1260-
offset: '1_3',
12611236
key: 'id3',
12621237
value: {
12631238
id: 3,

pnpm-lock.yaml

+6-5
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)