Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initialize Tinybird for Insights #2803

Open
wants to merge 80 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
80 commits
Select commit Hold shift + click to select a range
075f58b
migration for creating activityRelations and getting historical data in
epipav Jan 24, 2025
5705be3
creating activityRelation rows on integrations
epipav Jan 24, 2025
f24b762
add missing fields to activityRelations
epipav Jan 29, 2025
5160443
move activity relations on entity merge/unmerges
epipav Jan 29, 2025
bf10454
Merge branch 'main' into feature/managing-activity-relations-in-a-sep…
epipav Jan 29, 2025
1a62f1b
fix activity overrides when no rows are returned from queries
epipav Jan 29, 2025
7b3b2df
Merge branch 'bugfix/data-sink-activity-overrides-fix' into feature/m…
epipav Jan 29, 2025
3074852
fix fk
epipav Jan 29, 2025
a1925da
add segmentId to activityRelations
epipav Jan 30, 2025
25e7b47
managing activity relations in the profiles worker
epipav Jan 30, 2025
8c9298b
Merge branch 'main' into feature/managing-activity-relations-in-a-sep…
epipav Jan 30, 2025
4241313
fix missing segmentId in sql
epipav Jan 30, 2025
62bc9d2
Merge branch 'main' into feature/managing-activity-relations-in-a-sep…
epipav Jan 31, 2025
39946ae
affiliation overrides improvements, some fixing with activityRelation…
epipav Jan 31, 2025
601f6c8
getting activity data to tinybird with kafka-connect http sink
epipav Feb 4, 2025
6d2a7e6
Merge branch 'main' into feature/activity-event-data-to-tinybird
epipav Feb 5, 2025
449a5d1
copying activities from questdb to tinybird via script executor
epipav Feb 5, 2025
e9f6ec5
enable going through all activities
epipav Feb 5, 2025
2057104
remove console logs
epipav Feb 5, 2025
d585202
Merge branch 'main' into feature/managing-activity-relations-in-a-sep…
epipav Feb 5, 2025
357ad55
cleaning
epipav Feb 5, 2025
07e974d
add tinybird activity_events.datasource to lib/tinybird for reference
epipav Feb 5, 2025
4c1df33
increase batch size to 1000 for activity ingestion to tinybird
epipav Feb 5, 2025
1ee34ab
parametrized batch size
epipav Feb 5, 2025
8d21776
cli functions for tinybird migrations and tb cli usage, scaffold for …
epipav Feb 10, 2025
d7c76b6
bit cli formatting, improving activities datasource
epipav Feb 10, 2025
cd4868c
improve tinybird retries
epipav Feb 10, 2025
f63826b
Merge branch 'feature/activity-event-data-to-tinybird' into feature/i…
epipav Feb 10, 2025
1654c75
Merge branch 'feature/managing-activity-relations-in-a-separate-table…
epipav Feb 10, 2025
dfed39b
cleanup tinybird library, add tmp files to gitignore
epipav Feb 10, 2025
1363ae9
add missing segmentId to activityRelations migration
epipav Feb 10, 2025
433cd49
fix relation migration when sorting by uuid
epipav Feb 10, 2025
afdf99e
Add PostgreSQL preparation for logical replication
borfast Feb 10, 2025
39b33a8
Add Sequin docker service
borfast Feb 10, 2025
7738111
Add SASL Kafka authentication to the frontend code
borfast Feb 10, 2025
e9201a7
Remove deprecated docker compose version declaration
borfast Feb 10, 2025
191b553
Fix file formatting
borfast Feb 10, 2025
2aff508
Fix file formatting
borfast Feb 10, 2025
717eefc
sync project with tb remote using git integration
epipav Feb 11, 2025
bf5a34a
improve activities schema
epipav Feb 11, 2025
4de8066
Merge branch 'feature/initialize-tinybird-for-insights' of github.com…
epipav Feb 11, 2025
409b31c
improve activity datasource, add smt for removing unwanted fields fro…
epipav Feb 12, 2025
79f0d48
update tinybird sink for batch processing
epipav Feb 12, 2025
3575e24
activityRelations with kafka-connect
epipav Feb 13, 2025
6307f64
generic sequin http sink works, tested with members and activityRelat…
epipav Feb 13, 2025
1fc4035
Merge branch 'main' into feature/managing-activity-relations-in-a-sep…
epipav Feb 17, 2025
7fcb62d
managing activityRelations updates within updateActivities & streamAc…
epipav Feb 17, 2025
72c4485
bit cleaning & types
epipav Feb 17, 2025
1027282
revert cli --load changes
epipav Feb 17, 2025
7d2c9b4
add record prefixes to memberIdentities datasource
epipav Feb 17, 2025
e8bddb1
Programmatically download Kafka Connect HTTP (#2846)
borfast Feb 18, 2025
f5a96a3
Merge branch 'main' into feature/initialize-tinybird-for-insights
epipav Feb 21, 2025
205e034
improve tinybird datasources from feedback
epipav Feb 24, 2025
3ad6561
updated sorting keys
epipav Feb 24, 2025
b55c0a8
add organizations datasource
epipav Feb 24, 2025
3293bde
default value for activityRelations conversationId
epipav Feb 24, 2025
b0eb2fd
improve sync script performance
epipav Feb 24, 2025
761375e
get full activity by createdat on migration to tinybird
epipav Feb 24, 2025
b2d36e7
update tinybird url
epipav Feb 24, 2025
5ca9c77
fix activities datasource score default value
epipav Feb 24, 2025
240f0b3
new segments datasource, updated pipes with modular activities_filter…
epipav Feb 25, 2025
b1d6cc3
Merge branch 'main' into feature/initialize-tinybird-for-insights
epipav Feb 26, 2025
4433079
copy activities workflow now uses timestamp as cursor for performance…
epipav Feb 26, 2025
80f604f
add new topics to sequin sink
epipav Feb 27, 2025
905ca45
collections datasources for tinybird, removing redundant pipes
epipav Feb 27, 2025
b965596
configurable queue suffixes
epipav Mar 2, 2025
3e971a2
Merge branch 'main' into feature/initialize-tinybird-for-insights
epipav Mar 3, 2025
ec63ba5
Merge branch 'main' into feature/initialize-tinybird-for-insights
epipav Mar 3, 2025
b8ce814
check kafka-connect plugins contents as well before deciding to download
epipav Mar 3, 2025
8f26d87
add unique slug fields to collections and insightsProjects
epipav Mar 3, 2025
c0e5f6c
add slug to tinybird datasources for collections and insightsProjects
epipav Mar 3, 2025
60e4a23
new datasource for repositories - new pipes for search, project list/…
epipav Mar 4, 2025
a58c704
add avatar as a denormalized field to members datasource
epipav Mar 4, 2025
0233631
improve members and activities datasources
epipav Mar 5, 2025
942ea80
new pipes or widgets, added denormalized country to members datasource
epipav Mar 10, 2025
85a7b88
support copyActivitiesFromQuestdbToTinybird workflow run against few …
epipav Mar 11, 2025
583b495
sending segmentIds from schedule to also test the retries
epipav Mar 11, 2025
8748936
add schedule function to main in script executor
epipav Mar 11, 2025
0a9b150
existing pipe updates, retention pipes introduced
epipav Mar 11, 2025
ca57e93
Merge branch 'main' into feature/initialize-tinybird-for-insights
epipav Mar 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ yarn-error.log*
api-test/__pycache__/api_test.cpython-39-pytest-6.2.4.pyc

# python
.venv/
**/*-venv
**/venv
**/venv*
Expand All @@ -50,3 +51,7 @@ api-test/__pycache__/api_test.cpython-39-pytest-6.2.4.pyc
docker/volume

services/libs/*/dist

**/.tinyb
**/.tinyenv
services/libs/tinybird/.diff_tmp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
ALTER TABLE collections ADD COLUMN slug TEXT;
ALTER TABLE "insightsProjects" ADD COLUMN slug TEXT;

CREATE OR REPLACE FUNCTION generate_slug(table_name TEXT, input_text TEXT) RETURNS TEXT AS $$
DECLARE
base_slug TEXT;
unique_slug TEXT;
counter INT := 1;
query TEXT;
slug_exists BOOLEAN;
BEGIN
base_slug := lower(regexp_replace(input_text, '[^a-zA-Z0-9]+', '-', 'g'));
base_slug := regexp_replace(base_slug, '-$', '', 'g');
unique_slug := base_slug;

-- Ensure uniqueness by appending a counter if needed
LOOP
query := format('SELECT EXISTS (SELECT 1 FROM %I WHERE slug = $1)', table_name);
EXECUTE query INTO slug_exists USING unique_slug;

EXIT WHEN NOT slug_exists;
unique_slug := base_slug || '-' || counter;
counter := counter + 1;
END LOOP;

RETURN unique_slug;
END;
$$ LANGUAGE plpgsql;

UPDATE collections SET slug = generate_slug('collections', name) WHERE name IS NOT NULL;
UPDATE "insightsProjects" SET slug = generate_slug('insightsProjects', name) WHERE name IS NOT NULL;

ALTER TABLE collections ALTER COLUMN slug SET NOT NULL;
ALTER TABLE collections ADD CONSTRAINT idx_collections_slug_unique UNIQUE (slug);

ALTER TABLE "insightsProjects" ALTER COLUMN slug SET NOT NULL;
ALTER TABLE "insightsProjects" ADD CONSTRAINT "idx_insightsProjects_slug_unique" UNIQUE (slug);
8 changes: 8 additions & 0 deletions scripts/.env.dist
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Environment variables that override the defaults in the Docker Compose scaffold.yml file.
# In order to use, make a copy of this file named .env and change the values as needed.

# Sequin
CROWD_SEQUIN_SERVER_HOST=192.168.122.31
CROWD_SEQUIN_SERVER_PORT=7376
CROWD_SEQUIN_SECRET_KEY_BASE=WyPLiGs0pvD6qJhKJICO4dauYPXfO/Yl782Zjtpew5qRBDp7CZvbWtQmY0eB13If
CROWD_SEQUIN_VAULT_KEY=2Sig69bIpuSm2kv0VQfDekET2qy8qUZGI8v3/h3ASiY=
5 changes: 4 additions & 1 deletion scripts/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
db_dumps/*
!db_dumps/.gitkeep
!db_dumps/.gitkeep
.env
kafka-connect-http.zip
scaffold/kafka-connect/kafka-connect-http/*
114 changes: 112 additions & 2 deletions scripts/cli
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,87 @@ function migrate_env() {
-f /tmp/migrations/V1716382997__init-questdb.sql
}

function migrate_local() {
function wait_for_tinybird() {
say "Waiting for Tinybird to get ready.."
until curl -s -f http://localhost:80/tokens >/dev/null 2>&1; do
yell "Tinybird is not ready yet. Retrying in 5 seconds.."
sleep 5
done
say "Tinybird is ready!"
}

function fetch_tinybird_tokens() {
wait_for_tinybird
read -r CROWD_TINYBIRD_WORKSPACE_ADMIN_TOKEN CROWD_TINYBIRD_USER_TOKEN < <(curl -s http://localhost:80/tokens | jq -r '[.workspace_admin_token, .user_token] | @tsv')
progress "Workspace Admin Token=$CROWD_TINYBIRD_WORKSPACE_ADMIN_TOKEN"
progress "User Token=$CROWD_TINYBIRD_USER_TOKEN"
export CROWD_TINYBIRD_WORKSPACE_ADMIN_TOKEN
export CROWD_TINYBIRD_USER_TOKEN
}

function run_tinybird_cli_command() {
fetch_tinybird_tokens
yell "Running Tinybird CLI command: sh -c '$@'"
docker run --rm --network "${PROJECT_NAME}-bridge" \
-e TB_TOKEN=$CROWD_TINYBIRD_WORKSPACE_ADMIN_TOKEN \
-v "$CLI_HOME/../services/libs/tinybird:/mnt/data:rw" \
-w /mnt/data \
tinybirdco/tinybird-cli-docker:latest \
sh -c "$@"
}

function migrate_tinybird_local() {
set +e +o pipefail

say "Applying Tinybird project!"
fetch_tinybird_tokens

if [ -z "$CROWD_TINYBIRD_WORKSPACE_ADMIN_TOKEN" ] ||
[ "$CROWD_TINYBIRD_WORKSPACE_ADMIN_TOKEN" = "null" ] ||
[ -z "$CROWD_TINYBIRD_USER_TOKEN" ] ||
[ "$CROWD_TINYBIRD_USER_TOKEN" = "null" ]; then
echo "Error: Could not fetch Tinybird tokens. Exiting."
return 1
fi

say "Pushing Tinybird project!"

MAX_RETRIES=5
RETRY_DELAY=5
attempt=1

while [ $attempt -le $MAX_RETRIES ]; do
whisper "Attempt $attempt/$MAX_RETRIES to deploy Tinybird project.."

OUTPUT=$(docker run --rm --network "${PROJECT_NAME}-bridge" \
-e TB_TOKEN=$CROWD_TINYBIRD_WORKSPACE_ADMIN_TOKEN \
-v $CLI_HOME/../services/libs/tinybird:/mnt/data:rw \
-w /mnt/data \
tinybirdco/tinybird-cli-docker \
sh -c 'tb auth --host http://tinybird:80 && tb push')

STATUS=$?

whisper "$OUTPUT"

if [[ "$OUTPUT" == *"502 error responses"* ]] || [[ "$OUTPUT" == *"Max retries exceeded"* ]] || [ $STATUS -ne 0 ]; then
yell "Tinybird push failed (Attempt: $attempt/$MAX_RETRIES). Retrying in $RETRY_DELAY seconds..."
sleep $RETRY_DELAY
else
say "Tinybird project deployed successfully!"
return 0
fi

attempt=$((attempt + 1))
done

set -eo pipefail

error "Tinybird push failed after $MAX_RETRIES attempts. Exiting."
return 1
}

function migrate_postgres_local() {
say "Building crowd flyway migration image..."
docker build $DOCKER_PLATFORM_FLAGS -t crowd_flyway -f $CLI_HOME/../backend/src/database/Dockerfile.flyway $CLI_HOME/../backend/src/database --load
say "Applying PostgreSQL migrations!"
Expand All @@ -547,15 +627,19 @@ function migrate_local() {
-e PGPASSWORD=example \
-e PGDATABASE=crowd-web \
crowd_flyway
}

function migrate_questdb_local() {
say "Applying QuestDB migrations!"
docker run --rm --network "${PROJECT_NAME}-bridge" \
-v $CLI_HOME/../services/migrations/questdb/:/tmp/migrations \
postgres psql postgresql://admin:quest@questdb:8812/qdb \
-f /tmp/migrations/V1716382997__init-questdb.sql
}

function migrate_productdb_local() {
say "Building product flyway migration image..."
docker build $DOCKER_PLATFORM_FLAGS -t product_flyway -f $CLI_HOME/../backend/src/product/Dockerfile.flyway $CLI_HOME/../backend/src/product --load
docker build $DOCKER_PLATFORM_FLAGS -t product_flyway -f $CLI_HOME/../backend/src/product/Dockerfile.flyway $CLI_HOME/../backend/src/product
say "Applying product database migrations!"
docker run --rm --network "${PROJECT_NAME}-bridge" \
-e PGHOST=product \
Expand All @@ -566,6 +650,13 @@ function migrate_local() {
product_flyway
}

function migrate_local() {
migrate_postgres_local
migrate_questdb_local
migrate_productdb_local
migrate_tinybird_local
}

function up_test_scaffold() {
scaffold_set_up_network "${PROJECT_NAME}-bridge-test" $DOCKET_TEST_NETWORK_SUBNET $DOCKER_TEST_NETWORK_GATEWAY
$_DC -p "$PROJECT_NAME-test" -f $CLI_HOME/../backend/docker-compose.test.yaml down
Expand Down Expand Up @@ -597,6 +688,14 @@ function install_libs() {
(cd $CLI_HOME/.. && pnpm i --frozen-lockfile)
}

function download_kafka_connect_http() {
say "Downloading Kafka Connect HTTP plugin..."
wget -nv https://github.com/lensesio/stream-reactor/releases/download/8.1.28/kafka-connect-http-8.1.28.zip -O $CLI_HOME/kafka-connect-http.zip
unzip -q $CLI_HOME/kafka-connect-http.zip -d $CLI_HOME/scaffold/kafka-connect/kafka-connect-http
rm $CLI_HOME/kafka-connect-http.zip
say "Kafka Connect HTTP plugin downloaded."
}

function up_scaffold() {
scaffold_set_up_network "$PROJECT_NAME-bridge" $DOCKER_NETWORK_SUBNET $DOCKER_NETWORK_GATEWAY

Expand All @@ -605,6 +704,13 @@ function up_scaffold() {
profile='--profile nginx'
fi

if [[ ! -d "$CLI_HOME/scaffold/kafka-connect/kafka-connect-http" ||
-z "$(ls -A "$CLI_HOME/scaffold/kafka-connect/kafka-connect-http")" ||
! -d "$CLI_HOME/scaffold/kafka-connect/kafka-connect-http/kafka-connect-http-8.1.28" ||
-z "$(ls -A "$CLI_HOME/scaffold/kafka-connect/kafka-connect-http/kafka-connect-http-8.1.28")" ]]; then
download_kafka_connect_http
fi

$_DC --compatibility -p $PROJECT_NAME -f $CLI_HOME/scaffold.yaml ${profile} up -d --build
}

Expand Down Expand Up @@ -897,6 +1003,10 @@ while test $# -gt 0; do
(cd $CLI_HOME/../services/scripts && ./lint_apps.sh && ./lint_libs.sh)
exit
;;
tb)
run_tinybird_cli_command "$*"
exit
;;
*)
error "Invalid command '$1'" && say "$SCRIPT_USAGE"
exit 1
Expand Down
55 changes: 47 additions & 8 deletions scripts/scaffold.yaml
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
version: '3.1'

services:
db:
image: postgres:14-alpine
restart: unless-stopped
command: -c 'max_connections=300'
command: -c 'max_connections=300' -c 'wal_level=logical' -c 'max_replication_slots=11' -c 'max_replication_slots=15'
environment:
- POSTGRES_PASSWORD=example
- POSTGRES_DB=crowd-web
ports:
- 5432:5432
volumes:
- pgdata-dev:/var/lib/postgresql/data
- ./scaffold/sequin/postgres-docker-entrypoint-initdb.d/create-sequin-user-and-database.sql:/docker-entrypoint-initdb.d/create-sequin-user-and-database.sql
shm_size: 1gb
networks:
- crowd-bridge
Expand Down Expand Up @@ -154,6 +153,7 @@ services:
- KAFKA_CFG_LOG_DIRS=/opt/bitnami/kafka/data
- KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- KAFKA_KRAFT_CLUSTER_ID=OTMwNzFhYTY1ODNiNGE5OT
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
ports:
- '9092:9092'
- '9093:9093'
Expand All @@ -165,16 +165,15 @@ services:
build:
context: scaffold/kafka-connect
restart: unless-stopped
entrypoint:
- connect-standalone
- /etc/kafka-connect/worker-local.properties
- /etc/kafka-connect/console-local-sink.properties
- /etc/kafka-connect/questdb-local-sink.properties
entrypoint: ["/bin/sh", "-c", "/wait-for-tinybird.sh"]
volumes:
- kafka-connect-dev:/storage
- ./scaffold/kafka-connect/wait-for-tinybird.sh:/wait-for-tinybird.sh
- ./scaffold/kafka-connect/worker-local.properties:/etc/kafka-connect/worker-local.properties
- ./scaffold/kafka-connect/console-local-sink.properties:/etc/kafka-connect/console-local-sink.properties
- ./scaffold/kafka-connect/questdb-local-sink.properties:/etc/kafka-connect/questdb-local-sink.properties
- ./scaffold/kafka-connect/tinybird-local-sink.properties:/etc/kafka-connect/tinybird-local-sink.properties

networks:
- crowd-bridge

Expand All @@ -188,6 +187,44 @@ services:
networks:
- crowd-bridge

sequin:
image: sequin/sequin:latest
restart: unless-stopped
ports:
- "7376:7376"
environment:
- PG_HOSTNAME=db
- PG_DATABASE=sequin
- PG_PORT=5432
- PG_USERNAME=postgres
- PG_PASSWORD=example
- PG_POOL_SIZE=10
- SECRET_KEY_BASE=${CROWD_SEQUIN_SECRET_KEY_BASE:-WyPLiGs0pvD6qJhKJICO4dauYPXfO/Yl782Zjtpew5qRBDp7CZvbWtQmY0eB13If}
- VAULT_KEY=${CROWD_SEQUIN_VAULT_KEY:-2Sig69bIpuSm2kv0VQfDekET2qy8qUZGI8v3/h3ASiY=}
- REDIS_URL=redis://default:crowdtest@redis:6379
- SERVER_HOST=${CROWD_SEQUIN_SERVER_HOST:-localhost}
- SERVER_PORT=${CROWD_SEQUIN_SERVER_PORT:-7376}
volumes:
- ./scaffold/sequin.yml:/config/sequin.yml
depends_on:
- db
- redis
- kafka
networks:
- crowd-bridge

tinybird:
image: tinybirdco/tinybird-local:latest
ports:
- "80:80"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:80/tokens"]
interval: 10s
retries: 5
start_period: 10s
networks:
- crowd-bridge

networks:
crowd-bridge:
external: true
Expand All @@ -200,3 +237,5 @@ volumes:
s3-dev:
redis-dev:
kafka-connect-dev:
sequin-dev:
tinybird-data:
4 changes: 3 additions & 1 deletion scripts/scaffold/kafka-connect/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ RUN yum install -y jq findutils unzip

RUN confluent-hub install snowflakeinc/snowflake-kafka-connector:2.5.0 --no-prompt
RUN confluent-hub install questdb/kafka-questdb-connector:0.12 --no-prompt
COPY kafka-connect-http/ /usr/share/confluent-hub-components/kafka-connect-http/


VOLUME /storage

USER appuser
USER root

5 changes: 5 additions & 0 deletions scripts/scaffold/kafka-connect/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
The Kafka HTTP Connector we're using comes from here:
https://github.com/lensesio/stream-reactor/releases

Its documentation is available here:
https://docs.lenses.io/latest/connectors/kafka-connectors/sinks/http
13 changes: 13 additions & 0 deletions scripts/scaffold/kafka-connect/tinybird-local-sink.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
name=tinybird-sink
connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector
tasks.max=1
topics=activities,activityRelations,members,memberIdentities,organizations,collections,insightsProjects,collectionsInsightsProjects
connect.http.method=POST
connect.http.endpoint=http://tinybird:80/v0/events?name={{topic}}
connect.http.request.headers=Content-Type: application/json,Authorization: Bearer ${CROWD_TINYBIRD_WORKSPACE_ADMIN_TOKEN}
connect.http.batch.count=10
connect.http.time.interval=5
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
connect.http.request.content={{value}}
Loading
Loading