forked from estuary/flow
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgrouped.flow.yaml
32 lines (30 loc) · 963 Bytes
/
grouped.flow.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
collections:
acmeBank/grouped-transfers:
schema:
# Enrich transfer with a window of *other* transfers.
$ref: transfers.schema.yaml
required: [window]
properties:
window: { type: array }
key: [/id]
derive:
using:
sqlite:
migrations:
- CREATE TABLE transfers (
id INTEGER PRIMARY KEY NOT NULL,
sender TEXT NOT NULL,
recipient TEXT NOT NULL,
amount REAL NOT NULL
);
CREATE INDEX idx_transfers_sender ON transfers (sender);
transforms:
- name: enrichAndAddToWindow
source: acmeBank/transfers
shuffle: { key: [/sender] }
lambda: enrichAndAddToWindow.sql
- name: removeFromWindow
source: acmeBank/transfers
shuffle: { key: [/sender] }
readDelay: 24h
lambda: DELETE FROM transfers WHERE id = $id;