diff --git a/.gitignore b/.gitignore index f66f450d4b..239cf9e303 100644 --- a/.gitignore +++ b/.gitignore @@ -24,6 +24,7 @@ yarn-error.log* api-test/__pycache__/api_test.cpython-39-pytest-6.2.4.pyc # python +.venv/ **/*-venv **/venv **/venv* @@ -50,3 +51,7 @@ api-test/__pycache__/api_test.cpython-39-pytest-6.2.4.pyc docker/volume services/libs/*/dist + +**/.tinyb +**/.tinyenv +services/libs/tinybird/.diff_tmp \ No newline at end of file diff --git a/backend/src/database/migrations/U1741006323__AddSlugToCollectionsAndInsightsProjects.sql b/backend/src/database/migrations/U1741006323__AddSlugToCollectionsAndInsightsProjects.sql new file mode 100644 index 0000000000..e69de29bb2 diff --git a/backend/src/database/migrations/V1741006323__AddSlugToCollectionsAndInsightsProjects.sql b/backend/src/database/migrations/V1741006323__AddSlugToCollectionsAndInsightsProjects.sql new file mode 100644 index 0000000000..2984e52378 --- /dev/null +++ b/backend/src/database/migrations/V1741006323__AddSlugToCollectionsAndInsightsProjects.sql @@ -0,0 +1,37 @@ +ALTER TABLE collections ADD COLUMN slug TEXT; +ALTER TABLE "insightsProjects" ADD COLUMN slug TEXT; + +CREATE OR REPLACE FUNCTION generate_slug(table_name TEXT, input_text TEXT) RETURNS TEXT AS $$ +DECLARE + base_slug TEXT; + unique_slug TEXT; + counter INT := 1; + query TEXT; + slug_exists BOOLEAN; +BEGIN + base_slug := lower(regexp_replace(input_text, '[^a-zA-Z0-9]+', '-', 'g')); + base_slug := regexp_replace(base_slug, '-$', '', 'g'); + unique_slug := base_slug; + + -- Ensure uniqueness by appending a counter if needed + LOOP + query := format('SELECT EXISTS (SELECT 1 FROM %I WHERE slug = $1)', table_name); + EXECUTE query INTO slug_exists USING unique_slug; + + EXIT WHEN NOT slug_exists; + unique_slug := base_slug || '-' || counter; + counter := counter + 1; + END LOOP; + + RETURN unique_slug; +END; +$$ LANGUAGE plpgsql; + +UPDATE collections SET slug = generate_slug('collections', name) WHERE name IS NOT NULL; +UPDATE "insightsProjects" SET slug = generate_slug('insightsProjects', name) WHERE name IS NOT NULL; + +ALTER TABLE collections ALTER COLUMN slug SET NOT NULL; +ALTER TABLE collections ADD CONSTRAINT idx_collections_slug_unique UNIQUE (slug); + +ALTER TABLE "insightsProjects" ALTER COLUMN slug SET NOT NULL; +ALTER TABLE "insightsProjects" ADD CONSTRAINT "idx_insightsProjects_slug_unique" UNIQUE (slug); diff --git a/backend/src/services/collectionService.ts b/backend/src/services/collectionService.ts index 1154483246..5bc414e047 100644 --- a/backend/src/services/collectionService.ts +++ b/backend/src/services/collectionService.ts @@ -367,13 +367,18 @@ export class CollectionService extends LoggerBase { const qx = SequelizeRepository.getQueryExecutor(this.options) const integrations = await fetchIntegrationsForSegment(qx, segmentId) - const result = {} - const addToResult = (platform: PlatformType, url: string) => { - if (!result[url]) { - result[url] = [] - } - if (!result[url].includes(platform)) { - result[url].push(platform) + // Initialize result with platform arrays + const result: Record> = { + git: [], + github: [], + gitlab: [], + gerrit: [] + } + + const addToResult = (platform: PlatformType, fullUrl: string, label: string) => { + const platformKey = platform.toLowerCase() + if (!result[platformKey].some(item => item.url === fullUrl)) { + result[platformKey].push({ url: fullUrl, label }) } } @@ -381,34 +386,38 @@ export class CollectionService extends LoggerBase { if (i.platform === PlatformType.GITHUB) { for (const org of (i.settings as any).orgs) { for (const repo of org.repos) { - addToResult(i.platform, `${org.name}/${repo.name}`) + const label = `${org.name}/${repo.name}` + const fullUrl = `https://github.com/${label}` + addToResult(i.platform, fullUrl, label) } } } if (i.platform === PlatformType.GIT) { for (const r of (i.settings as any).remotes) { - if (r.includes('https://gitlab.com')) { - addToResult(i.platform, r.replace('https://gitlab.com/', '')) - } else if (r.includes('https://github.com')) { - addToResult(i.platform, r.replace('https://github.com/', '')) - } else { - addToResult(i.platform, r) + let label = r + if (r.includes('https://gitlab.com/')) { + label = r.replace('https://gitlab.com/', '') + } else if (r.includes('https://github.com/')) { + label = r.replace('https://github.com/', '') } + addToResult(i.platform, r, label) } } if (i.platform === PlatformType.GITLAB) { for (const group of Object.values((i.settings as any).groupProjects) as any[]) { for (const r of group) { - addToResult(i.platform, r.path_with_namespace) + const label = r.path_with_namespace + const fullUrl = `https://gitlab.com/${label}` + addToResult(i.platform, fullUrl, label) } } } if (i.platform === PlatformType.GERRIT) { for (const r of (i.settings as any).remote.repos) { - addToResult(i.platform, r) + addToResult(i.platform, r, r) } } } diff --git a/frontend/src/modules/admin/modules/collections/components/lf-collection-add.vue b/frontend/src/modules/admin/modules/collections/components/lf-collection-add.vue index aa3b30915a..a427113669 100644 --- a/frontend/src/modules/admin/modules/collections/components/lf-collection-add.vue +++ b/frontend/src/modules/admin/modules/collections/components/lf-collection-add.vue @@ -182,6 +182,7 @@ const onSubmit = () => { starred: project?.starred || false, })), isLF: true, + slug: form.name.toLowerCase().replace(/ /g, '-'), }; if (isEditForm.value) { handleCollectionUpdate(request); diff --git a/frontend/src/modules/admin/modules/collections/components/lf-insights-projects-list-dropdown.vue b/frontend/src/modules/admin/modules/collections/components/lf-insights-projects-list-dropdown.vue index 4bb778dcc2..4121a13805 100644 --- a/frontend/src/modules/admin/modules/collections/components/lf-insights-projects-list-dropdown.vue +++ b/frontend/src/modules/admin/modules/collections/components/lf-insights-projects-list-dropdown.vue @@ -39,7 +39,7 @@ @click="onOptionClick(project)" > { }; const handleCreate = () => { - const request = buildRequest(form); + const request = buildRequest({ + ...form, + slug: form.name.toLowerCase().replace(/ /g, '-'), + }); Message.info(null, { title: 'Insights project is being created', }); diff --git a/frontend/src/modules/admin/modules/insights-projects/insight-project-helper.ts b/frontend/src/modules/admin/modules/insights-projects/insight-project-helper.ts index deb399902d..edac295094 100644 --- a/frontend/src/modules/admin/modules/insights-projects/insight-project-helper.ts +++ b/frontend/src/modules/admin/modules/insights-projects/insight-project-helper.ts @@ -5,6 +5,7 @@ import { defaultWidgetsValues } from './widgets'; export const buildRequest = (form: InsightsProjectAddFormModel) => ({ segmentId: form.segmentId, name: form.name, + slug: form.slug, description: form.description, logoUrl: form.logoUrl, collections: form.collectionsIds, @@ -13,13 +14,7 @@ export const buildRequest = (form: InsightsProjectAddFormModel) => ({ github: form.github, twitter: form.twitter, linkedin: form.linkedin, - repositories: - form.repositories - ?.filter((repository: any) => repository.enabled) - ?.map((repository: any) => ({ - url: repository.url, - platforms: repository.platforms, - })) || [], + repositories: form.repositories?.filter((r) => r.enabled).map((r) => r.url), widgets: Object.keys(form.widgets).filter((key: string) => form.widgets[key]), }); @@ -46,14 +41,33 @@ export const buildForm = ( ), }); -export const buildRepositories = (res: any) => { - const repositories: any[] = []; - Object.keys(res).forEach((repoUrl: string) => { - repositories.push({ - url: repoUrl, - enabled: true, - platforms: res[repoUrl], +export const buildRepositories = (res: Record>) => { + const urlMap = new Map(); + + // Iterate through each platform (git, github, gitlab, gerrit) + Object.entries(res).forEach(([platform, repos]) => { + // Process each repository from the platform + repos.forEach((repo) => { + if (urlMap.has(repo.url)) { + // If URL exists, add the platform to its platforms array + const existing = urlMap.get(repo.url)!; + existing.platforms.push(platform); + } else { + // If URL is new, create a new entry + urlMap.set(repo.url, { + url: repo.url, + label: repo.label, + enabled: true, + platforms: [platform], + }); + } }); }); - return repositories; + + return Array.from(urlMap.values()); }; diff --git a/frontend/src/modules/admin/modules/insights-projects/models/insights-project-add-form.model.ts b/frontend/src/modules/admin/modules/insights-projects/models/insights-project-add-form.model.ts index c5b498feee..23d971bfc5 100644 --- a/frontend/src/modules/admin/modules/insights-projects/models/insights-project-add-form.model.ts +++ b/frontend/src/modules/admin/modules/insights-projects/models/insights-project-add-form.model.ts @@ -5,6 +5,7 @@ export interface InsightsProjectAddFormModel { logoUrl: string; collectionsIds: string[]; organizationId: string | undefined; + slug: string; organization: { id: string | undefined; displayName: string; @@ -16,6 +17,7 @@ export interface InsightsProjectAddFormModel { linkedin: string; repositories: { url: string; + label: string; enabled: boolean; platforms: string[]; }[]; diff --git a/frontend/src/modules/admin/modules/insights-projects/models/insights-project.model.ts b/frontend/src/modules/admin/modules/insights-projects/models/insights-project.model.ts index 8b96e2d6e2..eb16d10365 100644 --- a/frontend/src/modules/admin/modules/insights-projects/models/insights-project.model.ts +++ b/frontend/src/modules/admin/modules/insights-projects/models/insights-project.model.ts @@ -2,6 +2,7 @@ import { CollectionModel } from '../../collections/models/collection.model'; export interface InsightsProjectModel { id: string; + slug: string; segmentId: string; segment: { id: string; @@ -25,6 +26,7 @@ export interface InsightsProjectModel { enabled: boolean; repositories: { url: string; + label: string; enabled: boolean; platforms: string[]; }[]; diff --git a/scripts/.env.dist b/scripts/.env.dist new file mode 100644 index 0000000000..b67e5e6ad8 --- /dev/null +++ b/scripts/.env.dist @@ -0,0 +1,8 @@ +# Environment variables that override the defaults in the Docker Compose scaffold.yml file. +# In order to use, make a copy of this file named .env and change the values as needed. + +# Sequin +CROWD_SEQUIN_SERVER_HOST=192.168.122.31 +CROWD_SEQUIN_SERVER_PORT=7376 +CROWD_SEQUIN_SECRET_KEY_BASE=WyPLiGs0pvD6qJhKJICO4dauYPXfO/Yl782Zjtpew5qRBDp7CZvbWtQmY0eB13If +CROWD_SEQUIN_VAULT_KEY=2Sig69bIpuSm2kv0VQfDekET2qy8qUZGI8v3/h3ASiY= \ No newline at end of file diff --git a/scripts/.gitignore b/scripts/.gitignore index 94f12c2f1e..8a6309f4e1 100644 --- a/scripts/.gitignore +++ b/scripts/.gitignore @@ -1,2 +1,5 @@ db_dumps/* -!db_dumps/.gitkeep \ No newline at end of file +!db_dumps/.gitkeep +.env +kafka-connect-http.zip +scaffold/kafka-connect/kafka-connect-http/* \ No newline at end of file diff --git a/scripts/cli b/scripts/cli index 3396a6ce56..aa942587a7 100755 --- a/scripts/cli +++ b/scripts/cli @@ -536,7 +536,87 @@ function migrate_env() { -f /tmp/migrations/V1716382997__init-questdb.sql } -function migrate_local() { +function wait_for_tinybird() { + say "Waiting for Tinybird to get ready.." + until curl -s -f http://localhost:80/tokens >/dev/null 2>&1; do + yell "Tinybird is not ready yet. Retrying in 5 seconds.." + sleep 5 + done + say "Tinybird is ready!" +} + +function fetch_tinybird_tokens() { + wait_for_tinybird + read -r CROWD_TINYBIRD_WORKSPACE_ADMIN_TOKEN CROWD_TINYBIRD_USER_TOKEN < <(curl -s http://localhost:80/tokens | jq -r '[.workspace_admin_token, .user_token] | @tsv') + progress "Workspace Admin Token=$CROWD_TINYBIRD_WORKSPACE_ADMIN_TOKEN" + progress "User Token=$CROWD_TINYBIRD_USER_TOKEN" + export CROWD_TINYBIRD_WORKSPACE_ADMIN_TOKEN + export CROWD_TINYBIRD_USER_TOKEN +} + +function run_tinybird_cli_command() { + fetch_tinybird_tokens + yell "Running Tinybird CLI command: sh -c '$@'" + docker run --rm --network "${PROJECT_NAME}-bridge" \ + -e TB_TOKEN=$CROWD_TINYBIRD_WORKSPACE_ADMIN_TOKEN \ + -v "$CLI_HOME/../services/libs/tinybird:/mnt/data:rw" \ + -w /mnt/data \ + tinybirdco/tinybird-cli-docker:latest \ + sh -c "$@" +} + +function migrate_tinybird_local() { + set +e +o pipefail + + say "Applying Tinybird project!" + fetch_tinybird_tokens + + if [ -z "$CROWD_TINYBIRD_WORKSPACE_ADMIN_TOKEN" ] || + [ "$CROWD_TINYBIRD_WORKSPACE_ADMIN_TOKEN" = "null" ] || + [ -z "$CROWD_TINYBIRD_USER_TOKEN" ] || + [ "$CROWD_TINYBIRD_USER_TOKEN" = "null" ]; then + echo "Error: Could not fetch Tinybird tokens. Exiting." + return 1 + fi + + say "Pushing Tinybird project!" + + MAX_RETRIES=5 + RETRY_DELAY=5 + attempt=1 + + while [ $attempt -le $MAX_RETRIES ]; do + whisper "Attempt $attempt/$MAX_RETRIES to deploy Tinybird project.." + + OUTPUT=$(docker run --rm --network "${PROJECT_NAME}-bridge" \ + -e TB_TOKEN=$CROWD_TINYBIRD_WORKSPACE_ADMIN_TOKEN \ + -v $CLI_HOME/../services/libs/tinybird:/mnt/data:rw \ + -w /mnt/data \ + tinybirdco/tinybird-cli-docker \ + sh -c 'tb auth --host http://tinybird:80 && tb push') + + STATUS=$? + + whisper "$OUTPUT" + + if [[ "$OUTPUT" == *"502 error responses"* ]] || [[ "$OUTPUT" == *"Max retries exceeded"* ]] || [ $STATUS -ne 0 ]; then + yell "Tinybird push failed (Attempt: $attempt/$MAX_RETRIES). Retrying in $RETRY_DELAY seconds..." + sleep $RETRY_DELAY + else + say "Tinybird project deployed successfully!" + return 0 + fi + + attempt=$((attempt + 1)) + done + + set -eo pipefail + + error "Tinybird push failed after $MAX_RETRIES attempts. Exiting." + return 1 +} + +function migrate_postgres_local() { say "Building crowd flyway migration image..." docker build $DOCKER_PLATFORM_FLAGS -t crowd_flyway -f $CLI_HOME/../backend/src/database/Dockerfile.flyway $CLI_HOME/../backend/src/database --load say "Applying PostgreSQL migrations!" @@ -547,15 +627,19 @@ function migrate_local() { -e PGPASSWORD=example \ -e PGDATABASE=crowd-web \ crowd_flyway +} +function migrate_questdb_local() { say "Applying QuestDB migrations!" docker run --rm --network "${PROJECT_NAME}-bridge" \ -v $CLI_HOME/../services/migrations/questdb/:/tmp/migrations \ postgres psql postgresql://admin:quest@questdb:8812/qdb \ -f /tmp/migrations/V1716382997__init-questdb.sql +} +function migrate_productdb_local() { say "Building product flyway migration image..." - docker build $DOCKER_PLATFORM_FLAGS -t product_flyway -f $CLI_HOME/../backend/src/product/Dockerfile.flyway $CLI_HOME/../backend/src/product --load + docker build $DOCKER_PLATFORM_FLAGS -t product_flyway -f $CLI_HOME/../backend/src/product/Dockerfile.flyway $CLI_HOME/../backend/src/product say "Applying product database migrations!" docker run --rm --network "${PROJECT_NAME}-bridge" \ -e PGHOST=product \ @@ -566,6 +650,13 @@ function migrate_local() { product_flyway } +function migrate_local() { + migrate_postgres_local + migrate_questdb_local + migrate_productdb_local + migrate_tinybird_local +} + function up_test_scaffold() { scaffold_set_up_network "${PROJECT_NAME}-bridge-test" $DOCKET_TEST_NETWORK_SUBNET $DOCKER_TEST_NETWORK_GATEWAY $_DC -p "$PROJECT_NAME-test" -f $CLI_HOME/../backend/docker-compose.test.yaml down @@ -597,6 +688,14 @@ function install_libs() { (cd $CLI_HOME/.. && pnpm i --frozen-lockfile) } +function download_kafka_connect_http() { + say "Downloading Kafka Connect HTTP plugin..." + wget -nv https://github.com/lensesio/stream-reactor/releases/download/8.1.28/kafka-connect-http-8.1.28.zip -O $CLI_HOME/kafka-connect-http.zip + unzip -q $CLI_HOME/kafka-connect-http.zip -d $CLI_HOME/scaffold/kafka-connect/kafka-connect-http + rm $CLI_HOME/kafka-connect-http.zip + say "Kafka Connect HTTP plugin downloaded." +} + function up_scaffold() { scaffold_set_up_network "$PROJECT_NAME-bridge" $DOCKER_NETWORK_SUBNET $DOCKER_NETWORK_GATEWAY @@ -605,6 +704,13 @@ function up_scaffold() { profile='--profile nginx' fi + if [[ ! -d "$CLI_HOME/scaffold/kafka-connect/kafka-connect-http" || + -z "$(ls -A "$CLI_HOME/scaffold/kafka-connect/kafka-connect-http")" || + ! -d "$CLI_HOME/scaffold/kafka-connect/kafka-connect-http/kafka-connect-http-8.1.28" || + -z "$(ls -A "$CLI_HOME/scaffold/kafka-connect/kafka-connect-http/kafka-connect-http-8.1.28")" ]]; then + download_kafka_connect_http + fi + $_DC --compatibility -p $PROJECT_NAME -f $CLI_HOME/scaffold.yaml ${profile} up -d --build } @@ -897,6 +1003,10 @@ while test $# -gt 0; do (cd $CLI_HOME/../services/scripts && ./lint_apps.sh && ./lint_libs.sh) exit ;; + tb) + run_tinybird_cli_command "$*" + exit + ;; *) error "Invalid command '$1'" && say "$SCRIPT_USAGE" exit 1 diff --git a/scripts/scaffold.yaml b/scripts/scaffold.yaml index 70efd14a64..c4db50e866 100644 --- a/scripts/scaffold.yaml +++ b/scripts/scaffold.yaml @@ -1,10 +1,8 @@ -version: '3.1' - services: db: image: postgres:14-alpine restart: unless-stopped - command: -c 'max_connections=300' + command: -c 'max_connections=300' -c 'wal_level=logical' -c 'max_replication_slots=11' -c 'max_replication_slots=15' environment: - POSTGRES_PASSWORD=example - POSTGRES_DB=crowd-web @@ -12,6 +10,7 @@ services: - 5432:5432 volumes: - pgdata-dev:/var/lib/postgresql/data + - ./scaffold/sequin/postgres-docker-entrypoint-initdb.d/create-sequin-user-and-database.sql:/docker-entrypoint-initdb.d/create-sequin-user-and-database.sql shm_size: 1gb networks: - crowd-bridge @@ -154,6 +153,7 @@ services: - KAFKA_CFG_LOG_DIRS=/opt/bitnami/kafka/data - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=1 - KAFKA_KRAFT_CLUSTER_ID=OTMwNzFhYTY1ODNiNGE5OT + - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true ports: - '9092:9092' - '9093:9093' @@ -165,16 +165,15 @@ services: build: context: scaffold/kafka-connect restart: unless-stopped - entrypoint: - - connect-standalone - - /etc/kafka-connect/worker-local.properties - - /etc/kafka-connect/console-local-sink.properties - - /etc/kafka-connect/questdb-local-sink.properties + entrypoint: ["/bin/sh", "-c", "/wait-for-tinybird.sh"] volumes: - kafka-connect-dev:/storage + - ./scaffold/kafka-connect/wait-for-tinybird.sh:/wait-for-tinybird.sh - ./scaffold/kafka-connect/worker-local.properties:/etc/kafka-connect/worker-local.properties - ./scaffold/kafka-connect/console-local-sink.properties:/etc/kafka-connect/console-local-sink.properties - ./scaffold/kafka-connect/questdb-local-sink.properties:/etc/kafka-connect/questdb-local-sink.properties + - ./scaffold/kafka-connect/tinybird-local-sink.properties:/etc/kafka-connect/tinybird-local-sink.properties + networks: - crowd-bridge @@ -188,6 +187,44 @@ services: networks: - crowd-bridge + sequin: + image: sequin/sequin:latest + restart: unless-stopped + ports: + - "7376:7376" + environment: + - PG_HOSTNAME=db + - PG_DATABASE=sequin + - PG_PORT=5432 + - PG_USERNAME=postgres + - PG_PASSWORD=example + - PG_POOL_SIZE=10 + - SECRET_KEY_BASE=${CROWD_SEQUIN_SECRET_KEY_BASE:-WyPLiGs0pvD6qJhKJICO4dauYPXfO/Yl782Zjtpew5qRBDp7CZvbWtQmY0eB13If} + - VAULT_KEY=${CROWD_SEQUIN_VAULT_KEY:-2Sig69bIpuSm2kv0VQfDekET2qy8qUZGI8v3/h3ASiY=} + - REDIS_URL=redis://default:crowdtest@redis:6379 + - SERVER_HOST=${CROWD_SEQUIN_SERVER_HOST:-localhost} + - SERVER_PORT=${CROWD_SEQUIN_SERVER_PORT:-7376} + volumes: + - ./scaffold/sequin.yml:/config/sequin.yml + depends_on: + - db + - redis + - kafka + networks: + - crowd-bridge + + tinybird: + image: tinybirdco/tinybird-local:latest + ports: + - "80:80" + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:80/tokens"] + interval: 10s + retries: 5 + start_period: 10s + networks: + - crowd-bridge + networks: crowd-bridge: external: true @@ -200,3 +237,5 @@ volumes: s3-dev: redis-dev: kafka-connect-dev: + sequin-dev: + tinybird-data: diff --git a/scripts/scaffold/kafka-connect/Dockerfile b/scripts/scaffold/kafka-connect/Dockerfile index 8a9b6f1815..7dabc07505 100644 --- a/scripts/scaffold/kafka-connect/Dockerfile +++ b/scripts/scaffold/kafka-connect/Dockerfile @@ -6,8 +6,10 @@ RUN yum install -y jq findutils unzip RUN confluent-hub install snowflakeinc/snowflake-kafka-connector:2.5.0 --no-prompt RUN confluent-hub install questdb/kafka-questdb-connector:0.12 --no-prompt +COPY kafka-connect-http/ /usr/share/confluent-hub-components/kafka-connect-http/ + VOLUME /storage -USER appuser +USER root diff --git a/scripts/scaffold/kafka-connect/README.md b/scripts/scaffold/kafka-connect/README.md new file mode 100644 index 0000000000..482e3dddc9 --- /dev/null +++ b/scripts/scaffold/kafka-connect/README.md @@ -0,0 +1,5 @@ +The Kafka HTTP Connector we're using comes from here: +https://github.com/lensesio/stream-reactor/releases + +Its documentation is available here: +https://docs.lenses.io/latest/connectors/kafka-connectors/sinks/http \ No newline at end of file diff --git a/scripts/scaffold/kafka-connect/tinybird-local-sink.properties b/scripts/scaffold/kafka-connect/tinybird-local-sink.properties new file mode 100644 index 0000000000..38beb19462 --- /dev/null +++ b/scripts/scaffold/kafka-connect/tinybird-local-sink.properties @@ -0,0 +1,13 @@ +name=tinybird-sink +connector.class=io.lenses.streamreactor.connect.http.sink.HttpSinkConnector +tasks.max=1 +topics=activities,activityRelations,members,memberIdentities,organizations,collections,insightsProjects,collectionsInsightsProjects +connect.http.method=POST +connect.http.endpoint=http://tinybird:80/v0/events?name={{topic}} +connect.http.request.headers=Content-Type: application/json,Authorization: Bearer ${CROWD_TINYBIRD_WORKSPACE_ADMIN_TOKEN} +connect.http.batch.count=10 +connect.http.time.interval=5 +key.converter=org.apache.kafka.connect.storage.StringConverter +value.converter=org.apache.kafka.connect.storage.StringConverter +value.converter.schemas.enable=false +connect.http.request.content={{value}} \ No newline at end of file diff --git a/scripts/scaffold/kafka-connect/wait-for-tinybird.sh b/scripts/scaffold/kafka-connect/wait-for-tinybird.sh new file mode 100755 index 0000000000..a4a80b89a0 --- /dev/null +++ b/scripts/scaffold/kafka-connect/wait-for-tinybird.sh @@ -0,0 +1,37 @@ +#!/bin/sh +: ' + Waits the tinybird-local container to be ready and fetches the workspace admin token. + Then copies the tinybird sink files into tmp and + replaces the token placeholder in the file. + Finally starts kafka connect with updated sink files. +' + +set -e + +echo "Waiting for Tinybird to be ready..." +until curl -s -f http://tinybird:80/tokens; do + echo "Tinybird is not ready yet. Retrying in 5 seconds..." + sleep 5 +done + +echo "Tinybird is ready! Fetching workspace token..." +CROWD_TINYBIRD_WORKSPACE_ADMIN_TOKEN=$(curl -s http://tinybird:80/tokens | jq -r '.workspace_admin_token') + +if [ -z "$CROWD_TINYBIRD_WORKSPACE_ADMIN_TOKEN" ] || [ "$CROWD_TINYBIRD_WORKSPACE_ADMIN_TOKEN" = "null" ]; then + echo "Error: Could not fetch Tinybird token. Exiting." + exit 1 +fi +TINYBIRD_SINK_FILE="/etc/kafka-connect/tinybird-local-sink.properties" +TINYBIRD_SINK_TEMP_FILE="/tmp/tinybird-local-sink.properties" + +cp "$TINYBIRD_SINK_FILE" "$TINYBIRD_SINK_TEMP_FILE" + +sed -i "s|\${CROWD_TINYBIRD_WORKSPACE_ADMIN_TOKEN}|$CROWD_TINYBIRD_WORKSPACE_ADMIN_TOKEN|g" "$TINYBIRD_SINK_TEMP_FILE" + +echo "✅ Using token [$CROWD_TINYBIRD_WORKSPACE_ADMIN_TOKEN] in Tinybird http sink of Kafka Connect." + +exec connect-standalone \ + /etc/kafka-connect/worker-local.properties \ + /etc/kafka-connect/console-local-sink.properties \ + /etc/kafka-connect/questdb-local-sink.properties \ + "$TINYBIRD_SINK_TEMP_FILE" diff --git a/scripts/scaffold/sequin.yml b/scripts/scaffold/sequin.yml new file mode 100644 index 0000000000..6e89328846 --- /dev/null +++ b/scripts/scaffold/sequin.yml @@ -0,0 +1,7 @@ +account: + name: "LFX Insights" + +users: + - account: "LFX Insights" + email: "admin@sequinstream.com" + password: "sequinpassword!" diff --git a/scripts/scaffold/sequin/postgres-docker-entrypoint-initdb.d/create-sequin-user-and-database.sql b/scripts/scaffold/sequin/postgres-docker-entrypoint-initdb.d/create-sequin-user-and-database.sql new file mode 100644 index 0000000000..df6964d8ab --- /dev/null +++ b/scripts/scaffold/sequin/postgres-docker-entrypoint-initdb.d/create-sequin-user-and-database.sql @@ -0,0 +1,22 @@ +CREATE DATABASE sequin; + +CREATE USER sequin +WITH + PASSWORD 'supersecretpassword'; + +GRANT CONNECT ON DATABASE sequin to sequin; + +GRANT +SELECT + ON ALL TABLES IN SCHEMA public TO sequin; + +ALTER USER sequin +WITH + replication; + +create publication sequin_pub for table activities +with + (publish_via_partition_root = true); + +select + pg_create_logical_replication_slot ('sequin_slot', 'pgoutput'); diff --git a/services/apps/cron_service/src/jobs/nangoTrigger.job.ts b/services/apps/cron_service/src/jobs/nangoTrigger.job.ts index 2e91401e98..a5cde72307 100644 --- a/services/apps/cron_service/src/jobs/nangoTrigger.job.ts +++ b/services/apps/cron_service/src/jobs/nangoTrigger.job.ts @@ -18,7 +18,6 @@ const job: IJobDefinition = { name: 'nango-trigger', cronTime: IS_DEV_ENV ? CronTime.everyMinute() : CronTime.every(15).minutes(), timeout: 5 * 60, - enabled: async () => false, process: async (ctx) => { ctx.log.info('Triggering nango API check as if a webhook was received!') diff --git a/services/apps/data_sink_worker/src/service/member.service.ts b/services/apps/data_sink_worker/src/service/member.service.ts index 9d7af1bbdf..78d7528e81 100644 --- a/services/apps/data_sink_worker/src/service/member.service.ts +++ b/services/apps/data_sink_worker/src/service/member.service.ts @@ -304,15 +304,11 @@ export default class MemberService extends LoggerBase { this.log.trace({ memberId: id }, 'Updating member data in db!') - // TODO uros - temp hack to prevent updating just joinedAt - const keys = Object.keys(dataToUpdate) - if (!(keys.length === 1 && dataToUpdate.joinedAt)) { - await logExecutionTimeV2( - () => txRepo.update(id, dataToUpdate), - this.log, - 'memberService -> update -> update', - ) - } + await logExecutionTimeV2( + () => txRepo.update(id, dataToUpdate), + this.log, + 'memberService -> update -> update', + ) this.log.trace({ memberId: id }, 'Updating member segment association data in db!') await logExecutionTimeV2( diff --git a/services/apps/nango_webhook_api/src/main.ts b/services/apps/nango_webhook_api/src/main.ts index 668465a347..2705ebc7bc 100644 --- a/services/apps/nango_webhook_api/src/main.ts +++ b/services/apps/nango_webhook_api/src/main.ts @@ -12,11 +12,12 @@ import { HttpStatusError } from '@crowd/common' import { Logger, getChildLogger, getServiceLogger } from '@crowd/logging' import { ALL_NANGO_INTEGRATIONS, INangoWebhookPayload, NangoIntegration } from '@crowd/nango' import { telemetryExpressMiddleware } from '@crowd/telemetry' +import { TEMPORAL_CONFIG, WorkflowIdReusePolicy, getTemporalClient } from '@crowd/temporal' const log = getServiceLogger() setImmediate(async () => { - // const temporal = await getTemporalClient(TEMPORAL_CONFIG()) + const temporal = await getTemporalClient(TEMPORAL_CONFIG()) const app = express() @@ -49,15 +50,15 @@ setImmediate(async () => { 'Received nango webhook!', ) - // await temporal.workflow.start('processNangoWebhook', { - // taskQueue: 'nango', - // workflowId: `nango-webhook/${payload.providerConfigKey}/${payload.connectionId}/${payload.model}`, - // workflowIdReusePolicy: WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING, - // retry: { - // maximumAttempts: 10, - // }, - // args: [payload], - // }) + await temporal.workflow.start('processNangoWebhook', { + taskQueue: 'nango', + workflowId: `nango-webhook/${payload.providerConfigKey}/${payload.connectionId}/${payload.model}`, + workflowIdReusePolicy: WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING, + retry: { + maximumAttempts: 10, + }, + args: [payload], + }) res.sendStatus(204) }), diff --git a/services/apps/script_executor_worker/src/activities.ts b/services/apps/script_executor_worker/src/activities.ts index 890d89879d..534dbe5f32 100644 --- a/services/apps/script_executor_worker/src/activities.ts +++ b/services/apps/script_executor_worker/src/activities.ts @@ -5,6 +5,13 @@ import { unmergeMembersPreview, waitForTemporalWorkflowExecutionFinish, } from './activities/common' +import { + getActivitiesToCopyToTinybird, + getLatestSyncedActivityTimestampForSyncingActivitiesToTinybird, + markActivitiesAsIndexedForSyncingActivitiesToTinybird, + resetIndexedIdentitiesForSyncingActivitiesToTinybird, + sendActivitiesToTinybird, +} from './activities/copy-activities-from-questdb-to-tinybird' import { findMemberById, findMemberIdentitiesGroupedByPlatform, @@ -48,15 +55,20 @@ export { updateOrganizationIdentity, deleteOrganizationIdentity, isLfxMember, + resetIndexedIdentitiesForSyncingActivitiesToTinybird, + getActivitiesToCopy, + getLatestSyncedActivityTimestampForSyncingActivitiesToTinybird, + markActivitiesAsIndexed, + sendActivitiesToTinybird, createRelations, resetIndexedIdentities, - getActivitiesToCopy, getLatestSyncedActivityTimestamp, - markActivitiesAsIndexed, syncMembersBatch, getMembersForSync, getOrganizationsForSync, syncOrganizationsBatch, deleteIndexedEntities, markEntitiesIndexed, + getActivitiesToCopyToTinybird, + markActivitiesAsIndexedForSyncingActivitiesToTinybird, } diff --git a/services/apps/script_executor_worker/src/activities/copy-activities-from-questdb-to-tinybird/index.ts b/services/apps/script_executor_worker/src/activities/copy-activities-from-questdb-to-tinybird/index.ts new file mode 100644 index 0000000000..09e94123e5 --- /dev/null +++ b/services/apps/script_executor_worker/src/activities/copy-activities-from-questdb-to-tinybird/index.ts @@ -0,0 +1,100 @@ +import axios from 'axios' + +import { getActivitiesSortedByTimestamp } from '@crowd/data-access-layer' +import { RedisCache } from '@crowd/redis' + +import { svc } from '../../main' + +export async function resetIndexedIdentitiesForSyncingActivitiesToTinybird(): Promise { + const redisCache = new RedisCache(`sync-activities-to-tinybird`, svc.redis, svc.log) + await redisCache.delete('latest-synced-activity-timestamp') +} + +export async function getLatestSyncedActivityTimestampForSyncingActivitiesToTinybird(): Promise { + const redisCache = new RedisCache(`sync-activities-to-tinybird`, svc.redis, svc.log) + const result = await redisCache.get('latest-synced-activity-timestamp') + return result || null +} + +export async function markActivitiesAsIndexedForSyncingActivitiesToTinybird( + activitiesRedisKey: string, +): Promise { + const activities = await getActivitiyDataFromRedis(activitiesRedisKey) + const redisCache = new RedisCache(`sync-activities-to-tinybird`, svc.redis, svc.log) + const lastSyncedTimestamp = activities[activities.length - 1].timestamp + await redisCache.set('latest-synced-activity-timestamp', lastSyncedTimestamp) + return lastSyncedTimestamp +} + +export async function getActivitiesToCopyToTinybird( + latestSyncedActivityTimestamp: string, + limit: number, + segmentIds: string[], +) { + const activities = await getActivitiesSortedByTimestamp( + svc.questdbSQL, + latestSyncedActivityTimestamp, + segmentIds, + limit, + ) + + if (activities.length === 0) { + return null + } + + // generate a random key + const key = Math.random().toString(36).substring(7) + await saveActivityDataToRedis(key, activities) + + return { + activitiesRedisKey: key, + activitiesLength: activities.length, + lastTimestamp: activities[activities.length - 1].timestamp, + } +} + +export async function sendActivitiesToTinybird(activitiesRedisKey: string): Promise { + let response + const activities = await getActivitiyDataFromRedis(activitiesRedisKey) + + try { + const url = `https://api.us-west-2.aws.tinybird.co/v0/events?name=activities` + const config = { + method: 'post', + url, + data: activities.map((a) => JSON.stringify(a)).join('\n'), + headers: { + Authorization: `Bearer ${process.env['CROWD_TINYBIRD_ACCESS_TOKEN']}`, + }, + validateStatus: function (status) { + return (status >= 200 && status < 300) || status === 404 || status === 422 + }, + } + + response = (await axios(config)).data + console.log(`Data sent to tinybird -> ${JSON.stringify(response)}`) + } catch (err) { + if (axios.isAxiosError(err)) { + this.log.warn( + `Axios error occurred while sending activities to tinybird. ${err.response?.status} - ${err.response?.statusText}`, + ) + throw new Error(`Sending data to tinybird failed with status: ${err.response?.status}`) + } else { + this.log.error(`Unexpected error while sending data to tinybird: ${err}`) + throw new Error('An unexpected error occurred') + } + } + + return response +} + +export async function saveActivityDataToRedis(key: string, activities): Promise { + const redisCache = new RedisCache(`sync-activities-to-tinybird`, svc.redis, svc.log) + await redisCache.set(key, JSON.stringify(activities), 30) +} + +export async function getActivitiyDataFromRedis(key: string) { + const redisCache = new RedisCache(`sync-activities-to-tinybird`, svc.redis, svc.log) + const result = await redisCache.get(key) + return JSON.parse(result) +} diff --git a/services/apps/script_executor_worker/src/main.ts b/services/apps/script_executor_worker/src/main.ts index 82fbf21f0c..151be28ebb 100644 --- a/services/apps/script_executor_worker/src/main.ts +++ b/services/apps/script_executor_worker/src/main.ts @@ -1,10 +1,11 @@ import { Config } from '@crowd/archetype-standard' import { Options, ServiceWorker } from '@crowd/archetype-worker' +import { scheduleCopyActivitiesFromQuestdbToTinybird } from './schedules/scheduleCopyActivitiesFromQuestdbToTinybird' import { schedulePopulateActivityRelations } from './schedules/schedulePopulateActivityRelations' const config: Config = { - envvars: ['CROWD_API_SERVICE_URL', 'CROWD_API_SERVICE_USER_TOKEN'], + envvars: ['CROWD_TINYBIRD_ACCESS_TOKEN'], producer: { enabled: false, }, @@ -37,6 +38,7 @@ setImmediate(async () => { await svc.init() await schedulePopulateActivityRelations() + await scheduleCopyActivitiesFromQuestdbToTinybird() await svc.start() }) diff --git a/services/apps/script_executor_worker/src/schedules/scheduleCopyActivitiesFromQuestdbToTinybird.ts b/services/apps/script_executor_worker/src/schedules/scheduleCopyActivitiesFromQuestdbToTinybird.ts new file mode 100644 index 0000000000..852be8d1a5 --- /dev/null +++ b/services/apps/script_executor_worker/src/schedules/scheduleCopyActivitiesFromQuestdbToTinybird.ts @@ -0,0 +1,49 @@ +import { ScheduleAlreadyRunning, ScheduleOverlapPolicy } from '@temporalio/client' + +import { svc } from '../main' +import { copyActivitiesFromQuestdbToTinybird } from '../workflows' + +export const scheduleCopyActivitiesFromQuestdbToTinybird = async () => { + try { + await svc.temporal.schedule.create({ + scheduleId: 'copyActivitiesFromQuestdbToTinybird', + spec: { + cronExpressions: ['30 0 * * *'], + }, + policies: { + overlap: ScheduleOverlapPolicy.BUFFER_ONE, + catchupWindow: '1 minute', + }, + action: { + type: 'startWorkflow', + workflowType: copyActivitiesFromQuestdbToTinybird, + taskQueue: 'script-executor', + retry: { + initialInterval: '15 seconds', + backoffCoefficient: 2, + maximumAttempts: 3, + }, + args: [ + { + batchSizePerRun: 50000, + deleteIndexedEntities: false, + segmentIds: [ + '7b7bf9ce-6051-4c97-85eb-35c790203a0c', + 'f809d51c-68fa-401a-8aea-eb67989285c1', + 'e2fa0373-7379-416f-a651-70b1fc1b89b9', + 'e2c3321f-0d85-4a16-b603-66fd9f882a06', + '2cd61b6e-de33-4a51-bb17-fbe33cc0ac0c', + ], + }, + ], + }, + }) + } catch (err) { + if (err instanceof ScheduleAlreadyRunning) { + svc.log.info('Schedule already registered in Temporal.') + svc.log.info('Configuration may have changed since. Please make sure they are in sync.') + } else { + throw new Error(err) + } + } +} diff --git a/services/apps/script_executor_worker/src/types.ts b/services/apps/script_executor_worker/src/types.ts index e816064419..2e2cf78cbd 100644 --- a/services/apps/script_executor_worker/src/types.ts +++ b/services/apps/script_executor_worker/src/types.ts @@ -16,6 +16,13 @@ export interface IDissectMemberArgs { forceSplitAllIdentities?: boolean } +export interface ICopyActivitiesFromQuestDbToTinybirdArgs { + deleteIndexedEntities?: boolean + batchSizePerRun?: number + latestSyncedActivityTimestamp?: string + segmentIds?: string[] +} + export interface IFixOrgIdentitiesWithWrongUrlsArgs { testRun?: boolean } diff --git a/services/apps/script_executor_worker/src/workflows.ts b/services/apps/script_executor_worker/src/workflows.ts index e4eacec898..30da8d3955 100644 --- a/services/apps/script_executor_worker/src/workflows.ts +++ b/services/apps/script_executor_worker/src/workflows.ts @@ -1,3 +1,4 @@ +import { copyActivitiesFromQuestdbToTinybird } from './workflows/copyActivitiesFromQuestdbToTinybird' import { dissectMember } from './workflows/dissectMember' import { findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization } from './workflows/findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization' import { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms } from './workflows/findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms' @@ -11,6 +12,7 @@ export { findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization, dissectMember, fixOrgIdentitiesWithWrongUrls, + copyActivitiesFromQuestdbToTinybird, populateActivityRelations, syncMembers, syncOrganizations, diff --git a/services/apps/script_executor_worker/src/workflows/copyActivitiesFromQuestdbToTinybird.ts b/services/apps/script_executor_worker/src/workflows/copyActivitiesFromQuestdbToTinybird.ts new file mode 100644 index 0000000000..8bedf5023e --- /dev/null +++ b/services/apps/script_executor_worker/src/workflows/copyActivitiesFromQuestdbToTinybird.ts @@ -0,0 +1,52 @@ +import { continueAsNew, proxyActivities } from '@temporalio/workflow' + +import * as activities from '../activities/copy-activities-from-questdb-to-tinybird' +import { ICopyActivitiesFromQuestDbToTinybirdArgs } from '../types' + +const activity = proxyActivities({ + startToCloseTimeout: '3 minute', + retry: { maximumAttempts: 3 }, +}) + +export async function copyActivitiesFromQuestdbToTinybird( + args: ICopyActivitiesFromQuestDbToTinybirdArgs, +): Promise { + const BATCH_SIZE_PER_RUN = args.batchSizePerRun || 1000 + let latestSyncedActivityTimestamp + + if (args.deleteIndexedEntities) { + await activity.resetIndexedIdentitiesForSyncingActivitiesToTinybird() + } else { + latestSyncedActivityTimestamp = + args.latestSyncedActivityTimestamp || + (await activity.getLatestSyncedActivityTimestampForSyncingActivitiesToTinybird()) + } + + const { activitiesLength, activitiesRedisKey, lastTimestamp } = + await activity.getActivitiesToCopyToTinybird( + latestSyncedActivityTimestamp ?? undefined, + BATCH_SIZE_PER_RUN, + args.segmentIds ?? [], + ) + + if (activitiesLength === 0) { + return + } + + if (activitiesLength < BATCH_SIZE_PER_RUN) { + if (lastTimestamp === args.latestSyncedActivityTimestamp) { + return + } + } + + // 4- Send activities to tinybird + await activity.sendActivitiesToTinybird(activitiesRedisKey) + + // 5- Mark activities as indexed + await activity.markActivitiesAsIndexedForSyncingActivitiesToTinybird(activitiesRedisKey) + + await continueAsNew({ + batchSizePerRun: args.batchSizePerRun, + latestSyncedActivityTimestamp: lastTimestamp, + }) +} diff --git a/services/libs/data-access-layer/src/activities/sql.ts b/services/libs/data-access-layer/src/activities/sql.ts index 3654d39966..5a067b6153 100644 --- a/services/libs/data-access-layer/src/activities/sql.ts +++ b/services/libs/data-access-layer/src/activities/sql.ts @@ -1800,3 +1800,40 @@ export async function getActivityRelationsSortedByTimestamp( return rows } + +export async function getActivitiesSortedByTimestamp( + qdbConn: DbConnOrTx, + cursorActivityTimestamp?: string, + segmentIds?: string[], + limit = 100, +) { + let cursorQuery = '' + let segmentQuery = '' + + if (cursorActivityTimestamp) { + cursorQuery = `AND "timestamp" >= $(cursorActivityTimestamp)` + } + + if (segmentIds && segmentIds.length > 0) { + segmentQuery = `AND "segmentId" IN ($(segmentIds:csv))` + } + + const query = ` + SELECT + * + FROM activities + WHERE "deletedAt" IS NULL + ${cursorQuery} + ${segmentQuery} + ORDER BY "timestamp" asc + LIMIT ${limit} + ` + + const rows = await qdbConn.any(query, { + cursorActivityTimestamp, + limit, + segmentIds, + }) + + return rows +} diff --git a/services/libs/data-access-layer/src/collections/index.ts b/services/libs/data-access-layer/src/collections/index.ts index 30770790d4..7139f2655b 100644 --- a/services/libs/data-access-layer/src/collections/index.ts +++ b/services/libs/data-access-layer/src/collections/index.ts @@ -105,8 +105,8 @@ export async function createCollection( ): Promise { return qx.selectOne( ` - INSERT INTO collections (name, description, "isLF") - VALUES ($(name), $(description), $(isLF)) + INSERT INTO collections (name, description, "isLF", slug) + VALUES ($(name), $(description), $(isLF), $(slug)) RETURNING * `, collection, @@ -137,6 +137,7 @@ export enum InsightsProjectField { TWITTER = 'twitter', WIDGETS = 'widgets', REPOSITORIES = 'repositories', + SLUG = 'slug', } export async function queryInsightsProjects( diff --git a/services/libs/opensearch/src/repo/indexing.repo.ts b/services/libs/opensearch/src/repo/indexing.repo.ts index f57570165b..b89de477a8 100644 --- a/services/libs/opensearch/src/repo/indexing.repo.ts +++ b/services/libs/opensearch/src/repo/indexing.repo.ts @@ -31,4 +31,21 @@ export class IndexingRepository extends RepositoryBase { await this.db().none(query) } } + + public async getLatestIndexedEntityId(type: IndexedEntityType): Promise { + const result = await this.db().oneOrNone<{ entity_id: string }>( + ` + select entity_id + from indexed_entities + where type = $(type) + order by entity_id desc + limit 1 + `, + { + type, + }, + ) + + return result?.entity_id ?? null + } } diff --git a/services/libs/queue/src/queue.ts b/services/libs/queue/src/queue.ts index b6b8a2de6c..a2c188b184 100644 --- a/services/libs/queue/src/queue.ts +++ b/services/libs/queue/src/queue.ts @@ -17,7 +17,11 @@ export abstract class QueueBase extends LoggerBase { queueName: queueConf.name, }) - this.channelName = `${queueConf.name}${this.getQueueSuffix()}` + if (queueConf.useOnlyNameAsChannel) { + this.channelName = queueConf.name + } else { + this.channelName = `${queueConf.name}${this.getQueueSuffix()}` + } } public isInitialized(): boolean { diff --git a/services/libs/queue/src/types.ts b/services/libs/queue/src/types.ts index bae1ea7f34..97efaa7972 100644 --- a/services/libs/queue/src/types.ts +++ b/services/libs/queue/src/types.ts @@ -9,10 +9,12 @@ export type IQueueClient = Kafka // export type IQueueConfig = IKafkaConfig export interface IQueueConfig { name: string + useOnlyNameAsChannel?: boolean } export interface IQueueInitChannelConfig { name: string + useOnlyNameAsChannel?: boolean } export type IQueueSendResult = RecordMetadata[] diff --git a/services/libs/queue/src/vendors/kafka/config.ts b/services/libs/queue/src/vendors/kafka/config.ts index cfd964c994..4ae0e6f94f 100644 --- a/services/libs/queue/src/vendors/kafka/config.ts +++ b/services/libs/queue/src/vendors/kafka/config.ts @@ -47,6 +47,7 @@ export const INTEGRATION_STREAM_WORKER_QUEUE_SETTINGS: IKafkaChannelConfig = { export const ACTIVITIES_QUEUE_SETTINGS: IKafkaChannelConfig = { name: CrowdQueue.ACTIVITIES, + useOnlyNameAsChannel: true, replicationFactor: 1, partitions: { default: 1, diff --git a/services/libs/tinybird/datasources/activities.datasource b/services/libs/tinybird/datasources/activities.datasource new file mode 100644 index 0000000000..971898cea7 --- /dev/null +++ b/services/libs/tinybird/datasources/activities.datasource @@ -0,0 +1,34 @@ + +SCHEMA > + `id` String `json:$.id`, + `type` LowCardinality(String) `json:$.type`, + `timestamp` DateTime `json:$.timestamp`, + `platform` LowCardinality(String) `json:$.platform`, + `isContribution` UInt8 `json:$.isContribution`, + `score` Int8 `json:$.score` DEFAULT -1, + `sourceId` String `json:$.sourceId`, + `createdAt` DateTime64(3) `json:$.createdAt`, + `updatedAt` DateTime64(3) `json:$.updatedAt`, + `sourceParentId` String `json:$.sourceParentId` DEFAULT '', + `attributes` String `json:$.attributes`, + `title` String `json:$.title` DEFAULT '', + `body` String `json:$.body` DEFAULT '', + `channel` String `json:$.channel` DEFAULT '', + `url` String `json:$.url` DEFAULT '', + `sentimentLabel` String `json:$.sentimentLabel` DEFAULT '', + `sentimentScore` Int8 `json:$.sentimentScore` DEFAULT -1, + `sentimentScoreMixed` Int8 `json:$.sentimentScoreMixed` DEFAULT -1, + `sentimentScoreNeutral` Int8 `json:$.sentimentScoreNeutral` DEFAULT -1, + `sentimentScoreNegative` Int8 `json:$.sentimentScoreNegative` DEFAULT -1, + `sentimentScorePositive` Int8 `json:$.sentimentScorePositive` DEFAULT -1, + `gitIsMainBranch` UInt8 `json:$.gitIsMainBranch` DEFAULT 0, + `gitIsIndirectFork` UInt8 `json:$.gitIsIndirectFork` DEFAULT 0, + `gitLines` Int32 `json:$.gitLines` DEFAULT 0, + `gitInsertions` Int32 `json:$.gitInsertions` DEFAULT 0, + `gitDeletions` Int32 `json:$.gitDeletions` DEFAULT 0, + `gitIsMerge` UInt8 `json:$.gitIsMerge` DEFAULT 0 + +ENGINE "ReplacingMergeTree" +ENGINE_PARTITION_KEY "toYear(createdAt)" +ENGINE_SORTING_KEY "isContribution, platform, type, channel, sourceId, timestamp" +ENGINE_VER "updatedAt" diff --git a/services/libs/tinybird/datasources/activityRelations.datasource b/services/libs/tinybird/datasources/activityRelations.datasource new file mode 100644 index 0000000000..e6dd70d657 --- /dev/null +++ b/services/libs/tinybird/datasources/activityRelations.datasource @@ -0,0 +1,18 @@ + +SCHEMA > + `activityId` String, + `conversationId` String, + `createdAt` DateTime64(3), + `updatedAt` DateTime64(3), + `memberId` String, + `objectMemberId` String, + `objectMemberUsername` String, + `organizationId` String, + `parentId` String, + `platform` LowCardinality(String), + `segmentId` String, + `username` String + +ENGINE "ReplacingMergeTree" +ENGINE_PARTITION_KEY "toYear(createdAt)" +ENGINE_SORTING_KEY "segmentId, platform, activityId" diff --git a/services/libs/tinybird/datasources/activityRepositories.datasource b/services/libs/tinybird/datasources/activityRepositories.datasource new file mode 100644 index 0000000000..3eec14456e --- /dev/null +++ b/services/libs/tinybird/datasources/activityRepositories.datasource @@ -0,0 +1,10 @@ +# Data Source created from Pipe 'activity_repositories' + +SCHEMA > + `projectId` String, + `projectName` String, + `projectSlug` String, + `repo` String + +ENGINE "ReplacingMergeTree" +ENGINE_SORTING_KEY "projectName, projectSlug, repo" diff --git a/services/libs/tinybird/datasources/collections.datasource b/services/libs/tinybird/datasources/collections.datasource new file mode 100644 index 0000000000..a651bb119b --- /dev/null +++ b/services/libs/tinybird/datasources/collections.datasource @@ -0,0 +1,14 @@ + +SCHEMA > + `id` String `json:$.record.id`, + `name` String `json:$.record.name`, + `slug` String `json:$.record.slug`, + `description` String `json:$.record.description` DEFAULT '', + `isLF` UInt8 `json:$.record.isLF` DEFAULT 0, + `createdAt` DateTime64(3) `json:$.record.createdAt`, + `updatedAt` DateTime64(3) `json:$.record.updatedAt` + +ENGINE "ReplacingMergeTree" +ENGINE_PARTITION_KEY "toYear(createdAt)" +ENGINE_SORTING_KEY "id" +ENGINE_VER "updatedAt" diff --git a/services/libs/tinybird/datasources/collectionsInsightsProjects.datasource b/services/libs/tinybird/datasources/collectionsInsightsProjects.datasource new file mode 100644 index 0000000000..dbbae8cd64 --- /dev/null +++ b/services/libs/tinybird/datasources/collectionsInsightsProjects.datasource @@ -0,0 +1,13 @@ + +SCHEMA > + `id` String `json:$.record.id`, + `collectionId` String `json:$.record.collectionId`, + `insightsProjectId` String `json:$.record.insightsProjectId`, + `starred` UInt8 `json:$.record.starred` DEFAULT 0, + `createdAt` DateTime64(3) `json:$.record.createdAt`, + `updatedAt` DateTime64(3) `json:$.record.updatedAt` + +ENGINE "ReplacingMergeTree" +ENGINE_PARTITION_KEY "toYear(createdAt)" +ENGINE_SORTING_KEY "id" +ENGINE_VER "updatedAt" diff --git a/services/libs/tinybird/datasources/insightsProjects.datasource b/services/libs/tinybird/datasources/insightsProjects.datasource new file mode 100644 index 0000000000..a96c6b0aa4 --- /dev/null +++ b/services/libs/tinybird/datasources/insightsProjects.datasource @@ -0,0 +1,22 @@ + +SCHEMA > + `id` String `json:$.record.id`, + `name` String `json:$.record.name`, + `slug` String `json:$.record.slug`, + `description` String `json:$.record.description` DEFAULT '', + `segmentId` String `json:$.record.segmentId` DEFAULT '', + `createdAt` DateTime64(3) `json:$.record.createdAt`, + `updatedAt` DateTime64(3) `json:$.record.updatedAt`, + `logoUrl` String `json:$.record.logoUrl` DEFAULT '', + `organizationId` String `json:$.record.organizationId` DEFAULT '', + `website` String `json:$.record.website` DEFAULT '', + `github` String `json:$.record.github` DEFAULT '', + `linkedin` String `json:$.record.linkedin` DEFAULT '', + `twitter` String `json:$.record.twitter` DEFAULT '', + `widgets` Array(String) `json:$.record.widgets[:]` DEFAULT [], + `repositories` String `json:$.record.repositories` DEFAULT '' + +ENGINE "ReplacingMergeTree" +ENGINE_PARTITION_KEY "toYear(createdAt)" +ENGINE_SORTING_KEY "id" +ENGINE_VER "updatedAt" diff --git a/services/libs/tinybird/datasources/memberIdentities.datasource b/services/libs/tinybird/datasources/memberIdentities.datasource new file mode 100644 index 0000000000..5e8c90ec37 --- /dev/null +++ b/services/libs/tinybird/datasources/memberIdentities.datasource @@ -0,0 +1,17 @@ + +SCHEMA > + `id` String `json:$.id`, + `createdAt` DateTime64(3) `json:$.record.createdAt`, + `updatedAt` DateTime64(3) `json:$.record.updatedAt`, + `integrationId` String `json:$.record.integrationId` DEFAULT '', + `memberId` String `json:$.record.memberId`, + `platform` LowCardinality(String) `json:$.record.platform`, + `sourceId` String `json:$.record.sourceId` DEFAULT '', + `type` String `json:$.record.type`, + `value` String `json:$.record.value`, + `verified` UInt8 `json:$.record.verified` + +ENGINE "ReplacingMergeTree" +ENGINE_PARTITION_KEY "toYear(createdAt)" +ENGINE_SORTING_KEY "memberId, platform, type" +ENGINE_VER "updatedAt" diff --git a/services/libs/tinybird/datasources/members.datasource b/services/libs/tinybird/datasources/members.datasource new file mode 100644 index 0000000000..37a6fe562a --- /dev/null +++ b/services/libs/tinybird/datasources/members.datasource @@ -0,0 +1,20 @@ + +SCHEMA > + `id` String `json:$.record.id`, + `attributes` String `json:$.record.attributes` DEFAULT '', + `location` String `json:$.record.attributes.location.default` DEFAULT '', + `country` String `json:$.record.attributes.country.default` DEFAULT '', + `avatar` String `json:$.record.attributes.avatarUrl.default` DEFAULT '', + `isBot` UInt8 `json:$.record.attributes.isBot.default` DEFAULT 0, + `isTeamMember` UInt8 `json:$.record.attributes.isTeamMember.default` DEFAULT 0, + `contributions` String `json:$.record.contributions` DEFAULT '', + `joinedAt` DateTime64(3) `json:$.record.joinedAt`, + `createdAt` DateTime64(3) `json:$.record.createdAt`, + `updatedAt` DateTime64(3) `json:$.record.updatedAt`, + `displayName` String `json:$.record.displayName`, + `score` Int32 `json:$.record.score` DEFAULT -1 + +ENGINE "ReplacingMergeTree" +ENGINE_PARTITION_KEY "toYear(createdAt)" +ENGINE_SORTING_KEY "id" +ENGINE_VER "updatedAt" diff --git a/services/libs/tinybird/datasources/organizations.datasource b/services/libs/tinybird/datasources/organizations.datasource new file mode 100644 index 0000000000..5d376cbe54 --- /dev/null +++ b/services/libs/tinybird/datasources/organizations.datasource @@ -0,0 +1,21 @@ + +SCHEMA > + `id` String `json:$.record.id`, + `displayName` String `json:$.record.displayName`, + `location` String `json:$.record.location` DEFAULT '', + `logo` String `json:$.record.logo` DEFAULT '', + `tags` String `json:$.record.tags` DEFAULT '', + `employees` UInt8 `json:$.record.employees` DEFAULT 0, + `createdAt` DateTime64(3) `json:$.record.createdAt`, + `updatedAt` DateTime64(3) `json:$.record.updatedAt`, + `isTeamOrganization` UInt8 `json:$.record.isTeamOrganization` DEFAULT 0, + `type` String `json:$.record.type` DEFAULT '', + `size` String `json:$.record.size` DEFAULT '', + `headline` String `json:$.record.headline` DEFAULT '', + `industry` String `json:$.record.industry` DEFAULT '', + `founded` UInt8 `json:$.record.founded` DEFAULT 0 + +ENGINE "ReplacingMergeTree" +ENGINE_PARTITION_KEY "toYear(createdAt)" +ENGINE_SORTING_KEY "id" +ENGINE_VER "updatedAt" diff --git a/services/libs/tinybird/datasources/projectsAggregatedMV.datasource b/services/libs/tinybird/datasources/projectsAggregatedMV.datasource new file mode 100644 index 0000000000..3ce088ba4a --- /dev/null +++ b/services/libs/tinybird/datasources/projectsAggregatedMV.datasource @@ -0,0 +1,15 @@ +# Data Source created from Pipe 'projects_aggregation_mv' + +SCHEMA > + `id` String, + `name` String, + `slug` String, + `description` String, + `logo` String, + `collectionSlug` String, + `contributorCount` UInt64, + `organizationCount` UInt64, + `repositories` Array(Map(String, String)) + +ENGINE "ReplacingMergeTree" +ENGINE_SORTING_KEY "collectionSlug, slug" diff --git a/services/libs/tinybird/datasources/segments.datasource b/services/libs/tinybird/datasources/segments.datasource new file mode 100644 index 0000000000..a61b54996a --- /dev/null +++ b/services/libs/tinybird/datasources/segments.datasource @@ -0,0 +1,24 @@ + +SCHEMA > + `id` String `json:$.record.id`, + `parentId` String `json:$.record.parentId` DEFAULT '', + `grandparentId` String `json:$.record.grandparentId` DEFAULT '', + `type` String `json:$.record.type`, + `url` String `json:$.record.url` DEFAULT '', + `name` String `json:$.record.name` DEFAULT '', + `parentName` String `json:$.record.parentName` DEFAULT '', + `grandparentName` String `json:$.record.grandparentName` DEFAULT '', + `slug` String `json:$.record.slug`, + `parentSlug` String `json:$.record.parentSlug` DEFAULT '', + `grandparentSlug` String `json:$.record.grandparentSlug` DEFAULT '', + `status` LowCardinality(String) `json:$.record.status`, + `description` String `json:$.record.description` DEFAULT '', + `sourceId` String `json:$.record.sourceId` DEFAULT '', + `sourceParentId` String `json:$.record.sourceParentId` DEFAULT '', + `createdAt` DateTime64(3) `json:$.record.createdAt`, + `updatedAt` DateTime64(3) `json:$.record.updatedAt` + +ENGINE "ReplacingMergeTree" +ENGINE_PARTITION_KEY "toYear(createdAt)" +ENGINE_SORTING_KEY "slug" +ENGINE_VER "updatedAt" diff --git a/services/libs/tinybird/pipes/active_contributors.pipe b/services/libs/tinybird/pipes/active_contributors.pipe new file mode 100644 index 0000000000..a779c8630e --- /dev/null +++ b/services/libs/tinybird/pipes/active_contributors.pipe @@ -0,0 +1,49 @@ +TOKEN "active_contributors_endpoint_read_4407" READ + +TOKEN "raul_dev_access_token" READ + +NODE timeseries_generation_for_active_contributors +SQL > + + % + {% if defined(granularity) %} + SELECT + ds."startDate", + ds."endDate", + COALESCE(uniq( case when af.memberId != '' then af.memberId else null end), 0) AS "contributorCount" + FROM generate_timeseries ds + LEFT JOIN activities_filtered af ON + CASE + WHEN {{granularity}} = 'daily' + THEN toDate(af.timestamp) + WHEN {{granularity}} = 'weekly' + THEN toStartOfWeek(af.timestamp) + WHEN {{granularity}} = 'monthly' + THEN toStartOfMonth(af.timestamp) + WHEN {{granularity}} = 'quarterly' + THEN toStartOfQuarter(af.timestamp) + WHEN {{granularity}} = 'yearly' + THEN toStartOfYear(af.timestamp) + END = ds."startDate" + GROUP BY ds."startDate", "endDate" + ORDER BY ds."startDate" ASC + + {% else %} + select 1 + {% end %} + + + + +NODE active_contributors_merged +SQL > + + % + {% if not defined(granularity) %} + SELECT uniq( case when memberId != '' then memberId else null end) AS contributorCount FROM activities_filtered + {% else %} + select * from timeseries_generation_for_active_contributors + {% end %} + + + diff --git a/services/libs/tinybird/pipes/active_organizations.pipe b/services/libs/tinybird/pipes/active_organizations.pipe new file mode 100644 index 0000000000..4b92f2b07d --- /dev/null +++ b/services/libs/tinybird/pipes/active_organizations.pipe @@ -0,0 +1,49 @@ +TOKEN "active_organizations_endpoint_read_1975" READ + +TOKEN "raul_dev_access_token" READ + +NODE timeseries_generation_for_active_organizations +SQL > + + % + {% if defined(granularity) %} + SELECT + ds."startDate", + ds."endDate", + COALESCE(uniq( case when af.organizationId != '' then af.organizationId else null end), 0) AS "organizationCount" + FROM generate_timeseries ds + LEFT JOIN activities_filtered af ON + CASE + WHEN {{granularity}} = 'daily' + THEN toDate(af.timestamp) + WHEN {{granularity}} = 'weekly' + THEN toStartOfWeek(af.timestamp) + WHEN {{granularity}} = 'monthly' + THEN toStartOfMonth(af.timestamp) + WHEN {{granularity}} = 'quarterly' + THEN toStartOfQuarter(af.timestamp) + WHEN {{granularity}} = 'yearly' + THEN toStartOfYear(af.timestamp) + END = ds."startDate" + GROUP BY ds."startDate", "endDate" + ORDER BY ds."startDate" ASC + + {% else %} + select 1 + {% end %} + + + + +NODE active_organizations_merged +SQL > + + % + {% if not defined(granularity) %} + SELECT uniq( case when organizationId != '' then organizationId else null end) AS organizationCount FROM activities_filtered + {% else %} + select * from timeseries_generation_for_active_organizations + {% end %} + + + diff --git a/services/libs/tinybird/pipes/activities_count.pipe b/services/libs/tinybird/pipes/activities_count.pipe new file mode 100644 index 0000000000..1e4434ab15 --- /dev/null +++ b/services/libs/tinybird/pipes/activities_count.pipe @@ -0,0 +1,47 @@ +TOKEN "activities_count_endpoint_read_4271" READ + +NODE timeseries_generation_for_activity_count +SQL > + + % + {% set onlyContributions = False %} + {% if defined(granularity) %} + SELECT + ds."startDate", + ds."endDate", + count(case when af.id != '' then af.id else null end) AS "activityCount" + FROM generate_timeseries ds + LEFT JOIN + activities_filtered af + ON CASE + WHEN {{ granularity }} = 'daily' + THEN toDate(af.timestamp) + WHEN {{ granularity }} = 'weekly' + THEN toStartOfWeek(af.timestamp) + WHEN {{ granularity }} = 'monthly' + THEN toStartOfMonth(af.timestamp) + WHEN {{ granularity }} = 'quarterly' + THEN toStartOfQuarter(af.timestamp) + WHEN {{ granularity }} = 'yearly' + THEN toStartOfYear(af.timestamp) + END + = ds."startDate" + GROUP BY ds."startDate", ds."endDate" + order by ds."startDate" + {% else %} SELECT 1 + {% end %} + + + +NODE activity_count_merged +SQL > + + % + {% if not defined(granularity) %} + SELECT count( case when activities_filtered.id != '' then activities_filtered.id else null end) AS activityCount FROM activities_filtered + {% else %} + select * from timeseries_generation_for_activity_count + {% end %} + + + diff --git a/services/libs/tinybird/pipes/activities_cumulative_count.pipe b/services/libs/tinybird/pipes/activities_cumulative_count.pipe new file mode 100644 index 0000000000..2ab17a3671 --- /dev/null +++ b/services/libs/tinybird/pipes/activities_cumulative_count.pipe @@ -0,0 +1,65 @@ +TOKEN "activities_cumulative_count_endpoint_read_5317" READ + +NODE historical_activity_count +SQL > + + % + select count(*) from activities_filtered_historical_cutoff + + + +NODE cumulative_activity_count +SQL > + + % + {% if defined(granularity) %} + WITH base AS ( + SELECT + ds."startDate", + ds."endDate", + count(case when af.id != '' then af.id else null end) AS "activityCount" + FROM generate_timeseries ds + LEFT JOIN activities_filtered af ON + CASE + WHEN {{granularity}} = 'daily' + THEN toDate(af.timestamp) + WHEN {{granularity}} = 'weekly' + THEN toStartOfWeek(af.timestamp) + WHEN {{granularity}} = 'monthly' + THEN toStartOfMonth(af.timestamp) + WHEN {{granularity}} = 'quarterly' + THEN toStartOfQuarter(af.timestamp) + WHEN {{granularity}} = 'yearly' + THEN toStartOfYear(af.timestamp) + END = ds."startDate" + GROUP BY ds."startDate", ds."endDate" + ), + + rolling_cumulative AS ( + SELECT + "startDate", + "endDate", + "activityCount", + sumState("activityCount") AS "cumulativeState" + FROM base + GROUP BY "startDate", "endDate", "activityCount" + ) + + SELECT + "startDate", + "endDate", + COALESCE( + sumMerge(cumulativeState) OVER (ORDER BY "startDate" ASC), + 0 + ) + + (select count(*) from activities_filtered_historical_cutoff) AS "cumulativeActivityCount" + + FROM rolling_cumulative + ORDER BY "startDate" ASC + + {% else %} + SELECT 1 + {% end %} + + + diff --git a/services/libs/tinybird/pipes/activities_filtered.pipe b/services/libs/tinybird/pipes/activities_filtered.pipe new file mode 100644 index 0000000000..3d6c6f4384 --- /dev/null +++ b/services/libs/tinybird/pipes/activities_filtered.pipe @@ -0,0 +1,63 @@ +TOKEN "activities_filtered_endpoint_read_8708" READ + +TOKEN "raul_dev_access_token" READ + +NODE activities_filtered_by_timestamp_and_channel +SQL > + + % + SELECT id, type, timestamp, channel, platform FROM activities final + where 1=1 + {% if defined(startDate) %} + AND activities.timestamp > {{DateTime(startDate, description="Filter activity timestamp after", required=False)}} + {% end %} + {% if defined(endDate) %} + AND activities.timestamp < {{DateTime(endDate, description="Filter activity timestamp before", required=False)}} + {% end %} + {% if defined(repo) %} + AND activities.channel = {{String(repo, description="Filter activity repo", required=False)}} + {% end %} + {% if defined(platform) %} + AND activities.platform = {{String(platform, description="Filter activity platform", required=False)}} + {% end %} + {% if not defined (onlyContributions) or (defined(onlyContributions) and onlyContributions == 1) %} + AND activities.isContribution + {% end %} + {% if defined(activity_type) %} + AND activities.type = {{String(activity_type, description="Filter activity type", required=False)}} + {% end %} + + + + + +NODE segments_filtered +SQL > + + % + SELECT "segmentId" as id FROM insightsProjects FINAL + {% if defined(project) %} + where slug = {{String(project, description="Filter by project slug", required=True)}} + {% else %} + where false + {% end %} + + + + +NODE activity_relations_filtered +SQL > + + % + SELECT activities_filtered_by_timestamp_and_channel.id, activities_filtered_by_timestamp_and_channel.timestamp, activities_filtered_by_timestamp_and_channel.type, activities_filtered_by_timestamp_and_channel.platform, activityRelations.memberId, activityRelations.organizationId, activityRelations.segmentId FROM activities_filtered_by_timestamp_and_channel + join activityRelations final on activityRelations.activityId = activities_filtered_by_timestamp_and_channel.id + where 1=1 + and activityRelations.memberId in (select id from members where not members.isBot and not members.isTeamMember) + {% if defined(project) %} + and activityRelations.segmentId in (select id from segments_filtered) + {% end %} + + + + + diff --git a/services/libs/tinybird/pipes/activities_filtered_historical_cutoff.pipe b/services/libs/tinybird/pipes/activities_filtered_historical_cutoff.pipe new file mode 100644 index 0000000000..09c9433bca --- /dev/null +++ b/services/libs/tinybird/pipes/activities_filtered_historical_cutoff.pipe @@ -0,0 +1,61 @@ +TOKEN "activities_filtered_historical_cutoff_endpoint_read_5308" READ + +NODE activities_filtered_by_timestamp_and_channel +DESCRIPTION > + only difference between activities_filtered and this pipe is that, this'll count all activities until startDate (so the timerange filter is reverted) it's useful because it'll also support other filters and it'll be mainly used in activitird_cumulative_counts pipe to get the historical total + +SQL > + + % + SELECT id, type, timestamp, channel, platform FROM activities final + where 1=1 + {% if defined(startDate) %} + AND activities.timestamp <= {{DateTime(startDate, description="Filter activity timestamp after", required=False)}} + {% else %} + AND false + {% end %} + {% if defined(repo) %} + AND activities.channel = {{String(repo, description="Filter activity repo", required=False)}} + {% end %} + {% if defined(platform) %} + AND activities.platform = {{String(platform, description="Filter activity platform", required=False)}} + {% end %} + {% if not defined (onlyContributions) or (defined(onlyContributions) and onlyContributions == 1) %} + AND activities.isContribution + {% end %} + {% if defined(activity_type) %} + AND activities.type = {{String(activity_type, description="Filter activity type", required=False)}} + {% end %} + + + + + +NODE segments_filtered +SQL > + + % + SELECT "segmentId" as id FROM insightsProjects FINAL + {% if defined(project) %} + where slug = {{String(project, description="Filter by project slug", required=True)}} + {% end %} + + + + +NODE activity_relations_filtered +SQL > + + % + SELECT activities_filtered_by_timestamp_and_channel.id, activities_filtered_by_timestamp_and_channel.timestamp, activities_filtered_by_timestamp_and_channel.type, activities_filtered_by_timestamp_and_channel.platform, activityRelations.memberId, activityRelations.organizationId, activityRelations.segmentId FROM activities_filtered_by_timestamp_and_channel + join activityRelations final on activityRelations.activityId = activities_filtered_by_timestamp_and_channel.id + where 1=1 + and activityRelations.memberId in (select id from members where not members.isBot and not members.isTeamMember) + {% if defined(project) %} + and activityRelations.segmentId in (select id from segments_filtered) + {% end %} + order by timestamp desc + + + + diff --git a/services/libs/tinybird/pipes/activities_filtered_retention.pipe b/services/libs/tinybird/pipes/activities_filtered_retention.pipe new file mode 100644 index 0000000000..5f8cbdfd3b --- /dev/null +++ b/services/libs/tinybird/pipes/activities_filtered_retention.pipe @@ -0,0 +1,72 @@ +NODE activities_filtered_by_timestamp_and_channel +SQL > + + % + SELECT id, type, timestamp, channel, platform FROM activities final + where 1=1 + {% if defined(startDate) %} + AND activities.timestamp > + {% if defined(granularity) and granularity == "daily" %} + {{ DateTime(startDate, description="Filter activity timestamp after", required=False) }} - INTERVAL 1 DAY + {% elif defined(granularity) and granularity == "weekly" %} + {{ DateTime(startDate, description="Filter activity timestamp after", required=False) }} - INTERVAL 7 DAY + {% elif defined(granularity) and granularity == "monthly" %} + {{ DateTime(startDate, description="Filter activity timestamp after", required=False) }} - INTERVAL 1 MONTH + {% elif defined(granularity) and granularity == "quarterly" %} + {{ DateTime(startDate, description="Filter activity timestamp after", required=False) }} - INTERVAL 3 MONTH + {% elif defined(granularity) and granularity == "yearly" %} + {{ DateTime(startDate, description="Filter activity timestamp after", required=False) }} - INTERVAL 1 YEAR + {% else %} + {{ DateTime(startDate, description="Filter activity timestamp after", required=False) }} + {% end %} + {% end %} + {% if defined(endDate) %} + AND activities.timestamp < {{DateTime(endDate, description="Filter activity timestamp before", required=False)}} + {% end %} + {% if defined(repo) %} + AND activities.channel = {{String(repo, description="Filter activity repo", required=False)}} + {% end %} + {% if defined(platform) %} + AND activities.platform = {{String(platform, description="Filter activity platform", required=False)}} + {% end %} + {% if not defined (onlyContributions) or (defined(onlyContributions) and onlyContributions == 1) %} + AND activities.isContribution + {% end %} + {% if defined(activity_type) %} + AND activities.type = {{String(activity_type, description="Filter activity type", required=False)}} + {% end %} + + + + + +NODE segments_filtered +SQL > + + % + SELECT "segmentId" as id FROM insightsProjects FINAL + {% if defined(project) %} + where slug = {{String(project, description="Filter by project slug", required=True)}} + {% else %} + where false + {% end %} + + + + +NODE activity_relations_filtered +SQL > + + % + SELECT activities_filtered_by_timestamp_and_channel.id, activities_filtered_by_timestamp_and_channel.timestamp, activities_filtered_by_timestamp_and_channel.type, activities_filtered_by_timestamp_and_channel.platform, activityRelations.memberId, activityRelations.organizationId, activityRelations.segmentId FROM activities_filtered_by_timestamp_and_channel + join activityRelations final on activityRelations.activityId = activities_filtered_by_timestamp_and_channel.id + where 1=1 + and activityRelations.memberId in (select id from members where not members.isBot and not members.isTeamMember) + {% if defined(project) %} + and activityRelations.segmentId in (select id from segments_filtered) + {% end %} + + + + + diff --git a/services/libs/tinybird/pipes/activityRepositories_filtered.pipe b/services/libs/tinybird/pipes/activityRepositories_filtered.pipe new file mode 100644 index 0000000000..447894ec54 --- /dev/null +++ b/services/libs/tinybird/pipes/activityRepositories_filtered.pipe @@ -0,0 +1,21 @@ +TOKEN "activity_repositories_filtered_endpoint_read_9088" READ + +TOKEN "raul_dev_access_token" READ + +NODE activity_repositories_filtered_0 +SQL > + + % + SELECT * FROM activityRepositories final + where 1=1 + {% if defined(repo) %} + AND activityRepositories.repo = {{String(repo, description="Filter activity repo", required=False)}} + {% end %} + {% if defined(search) %} + AND activityRepositories.repo like 'https://%' || {{String(search, description="Search repos open ended wildcard ", required=False)}} || '%' + {% end %} + {% if defined(projectSlug) %} + AND activityRepositories.projectSlug = {{String(projectSlug, description="Search repos using projectSlug", required=False)}} + {% end %} + + diff --git a/services/libs/tinybird/pipes/activity_repositories.pipe b/services/libs/tinybird/pipes/activity_repositories.pipe new file mode 100644 index 0000000000..78fec57144 --- /dev/null +++ b/services/libs/tinybird/pipes/activity_repositories.pipe @@ -0,0 +1,26 @@ +TOKEN "raul_dev_access_token" READ + +NODE project_slug_segment_id_mapping +SQL > + + select distinct + activityRelations.activityId, + activityRelations.segmentId, + insightsProjects.name, + insightsProjects.slug, + insightsProjects.id as "projectId" + from activityRelations + join insightsProjects on insightsProjects.segmentId = activityRelations.segmentId + + + +NODE repo_project_slug_mapping +SQL > + + select distinct project_slug_segment_id_mapping."projectId" as "projectId", project_slug_segment_id_mapping."name" as "projectName", project_slug_segment_id_mapping.slug as "projectSlug", activities.channel as "repo" from project_slug_segment_id_mapping + join activities on activities.id = project_slug_segment_id_mapping.activityId + +TYPE materialized +DATASOURCE activityRepositories + + diff --git a/services/libs/tinybird/pipes/collections_filtered.pipe b/services/libs/tinybird/pipes/collections_filtered.pipe new file mode 100644 index 0000000000..ebaed73a3d --- /dev/null +++ b/services/libs/tinybird/pipes/collections_filtered.pipe @@ -0,0 +1,19 @@ +TOKEN "collections_filtered_endpoint_read_3036" READ + +TOKEN "raul_dev_access_token" READ + +NODE collections_filtered_0 +SQL > + + % + SELECT * FROM collections FINAL + where 1=1 + {% if defined(slug) %} + AND collections.slug = {{String(slug, description="Filter collection by slug", required=False)}} + {% end %} + {% if defined(search) %} + AND collections.slug like {{String(search, description="Search collection open ended wildcard using slug", required=False)}} || '%' + {% end %} + + + diff --git a/services/libs/tinybird/pipes/collections_list.pipe b/services/libs/tinybird/pipes/collections_list.pipe new file mode 100644 index 0000000000..7c4325ffae --- /dev/null +++ b/services/libs/tinybird/pipes/collections_list.pipe @@ -0,0 +1,122 @@ +TOKEN "collections_list_endpoint_read_0317" READ + +TOKEN "raul_dev_access_token" READ + +NODE collections_paginated +SQL > + + % + SELECT + collections_filtered.id, + collections_filtered.name, + collections_filtered.slug, + collections_filtered.description, + SUM( + CASE WHEN collectionsInsightsProjects.insightsProjectId != '' THEN 1 ELSE 0 END + ) as "projectCount" + FROM collections_filtered + left join + collectionsInsightsProjects + on collectionsInsightsProjects.collectionId = collections_filtered.id + group by + collections_filtered.id, + collections_filtered.name, + collections_filtered.slug, + collections_filtered.description + order by + {{ + column( + String(orderByField, "name", description="Order by collection field.", required=False) + ) + }} + {% if String( + orderByDirection, + 'asc', + description="Order by direction. ASC or DESC", + required=False, + ) == 'asc' or String( + orderByDirection, + 'asc', + description="Order by direction. ASC or DESC", + required=False, + ) == 'ASC' %} ASC + {% else %} DESC + {% end %} + LIMIT {{ Int32(pageSize, 10) }} + OFFSET {{ Int32(page, 0) * Int32(pageSize, 10) }} + + + +NODE collections_featured_projects +SQL > + + SELECT collectionsInsightsProjects.collectionId, insightsProjects_filtered.* + from insightsProjects_filtered + join + collectionsInsightsProjects + on collectionsInsightsProjects.insightsProjectId = insightsProjects_filtered.id + where + (collectionsInsightsProjects.collectionId in (select id from collections_paginated)) + and collectionsInsightsProjects.starred + + + +NODE merging_fields_together +SQL > + + % + {% if Boolean(count, false) %} SELECT count(collections_filtered.id) from collections_filtered + {% else %} + SELECT + collections_paginated.id as id, + collections_paginated.name as name, + collections_paginated.slug as slug, + collections_paginated.description as description, + collections_paginated.projectCount as "projectCount", + arrayFilter( + x -> x['name'] != '', -- ✅ Removes objects with empty 'name' + groupArray( + map( + 'name', + toString(collections_featured_projects.name), + 'slug', + toString(collections_featured_projects.slug), + 'logo', + toString(collections_featured_projects.logo) + ) + ) + ) AS featuredProjects + FROM collections_paginated + LEFT JOIN + collections_featured_projects + ON collections_featured_projects.collectionId = collections_paginated.id + GROUP BY + collections_paginated.id as id, + collections_paginated.name as name, + collections_paginated.slug as slug, + collections_paginated.description as description, + collections_paginated.projectCount as "projectCount" + order by + {{ + column( + String( + orderByField, "name", description="Order by collection field.", required=False + ) + ) + }} + {% if String( + orderByDirection, + 'asc', + description="Order by direction. ASC or DESC", + required=False, + ) == 'asc' or String( + orderByDirection, + 'asc', + description="Order by direction. ASC or DESC", + required=False, + ) == 'ASC' %} ASC + {% else %} DESC + {% end %} + {% end %} + + diff --git a/services/libs/tinybird/pipes/contributor_dependency.pipe b/services/libs/tinybird/pipes/contributor_dependency.pipe new file mode 100644 index 0000000000..d25b5205f7 --- /dev/null +++ b/services/libs/tinybird/pipes/contributor_dependency.pipe @@ -0,0 +1,19 @@ +TOKEN "raul_dev_access_token" READ + +TOKEN "contributor_dependency_endpoint_read_4095" READ + +NODE contributions_percentage_running_total +SQL > + + + SELECT t.*, active_contributors.contributorCount as "totalContributorCount" + FROM ( + SELECT id, displayName, contributionPercentage, + sum(contributionPercentage) OVER (ORDER BY contributionPercentage DESC, id) AS contributionPercentageRunningTotal + FROM contributors_leaderboard + ) t + left join active_contributors on 1=1 + WHERE contributionPercentageRunningTotal <= 51 OR (contributionPercentageRunningTotal - contributionPercentage < 51) + + + diff --git a/services/libs/tinybird/pipes/contributor_retention.pipe b/services/libs/tinybird/pipes/contributor_retention.pipe new file mode 100644 index 0000000000..0b2309e828 --- /dev/null +++ b/services/libs/tinybird/pipes/contributor_retention.pipe @@ -0,0 +1,55 @@ +TOKEN "raul_dev_access_token" READ + +TOKEN "member_retention_endpoint_read_0213" READ + +NODE member_retention_1 +SQL > + + % + with aggregated_members AS ( + SELECT + CASE + WHEN {{granularity}} = 'daily' THEN toDate(timestamp) + WHEN {{granularity}} = 'weekly' THEN toStartOfWeek(timestamp) + WHEN {{granularity}} = 'monthly' THEN toStartOfMonth(timestamp) + WHEN {{granularity}} = 'quarterly' THEN toStartOfQuarter(timestamp) + WHEN {{granularity}} = 'yearly' THEN toStartOfYear(timestamp) + END AS period, + groupUniqArray(memberId) AS mems + FROM activities_filtered_retention + GROUP BY period + ), + retention AS ( + SELECT + ts."startDate" AS "startDate", + ts."endDate" as "endDate", + -- coalesce(prev.mems, []) AS previous_member_ids, + -- coalesce(curr.mems, []) AS current_member_ids, + -- arrayIntersect(coalesce(curr.mems, []), coalesce(prev.mems, [])) AS retained_member_ids, + length(arrayIntersect(coalesce(curr.mems, []), coalesce(prev.mems, []))) AS retained_members, + length(coalesce(prev.mems, [])) AS previous_period_total, + if(previous_period_total > 0, + round(100 * retained_members / previous_period_total, 2), + 0) AS "retentionRate" + FROM generate_timeseries ts + LEFT JOIN aggregated_members AS curr + ON ts."startDate" = curr.period + LEFT JOIN aggregated_members AS prev + ON prev.period = + CASE + WHEN {{granularity}} = 'daily' THEN ts."startDate" - INTERVAL 1 DAY + WHEN {{granularity}} = 'weekly' THEN ts."startDate" - INTERVAL 1 WEEK + WHEN {{granularity}} = 'monthly' THEN ts."startDate" - INTERVAL 1 MONTH + WHEN {{granularity}} = 'quarterly' THEN ts."startDate" - INTERVAL 1 QUARTER + WHEN {{granularity}} = 'yearly' THEN ts."startDate" - INTERVAL 1 YEAR + END + ) + SELECT + "startDate", + "endDate", + "retentionRate" + FROM retention + ORDER BY "startDate" + + + diff --git a/services/libs/tinybird/pipes/contributors_geo_distribution.pipe b/services/libs/tinybird/pipes/contributors_geo_distribution.pipe new file mode 100644 index 0000000000..23371c4dd1 --- /dev/null +++ b/services/libs/tinybird/pipes/contributors_geo_distribution.pipe @@ -0,0 +1,36 @@ +TOKEN "raul_dev_access_token" READ + +TOKEN "contributors_geo_distribution_endpoint_read_8966" READ + +NODE contributors_geo_distribution_2 +SQL > + + WITH + country_mapping_array AS ( + SELECT groupArray((country, flag, country_code)) AS country_data FROM country_mapping + ), + + parsed_country AS ( + SELECT + m.id, + m.location, + arrayFilter( + x -> position(coalesce(nullIf(m.country, ''), m.location), upper(x.1)) > 0, + (SELECT country_data FROM country_mapping_array) + ) AS matched_countries, + arrayJoin(if(empty(matched_countries), [('Unknown', '❓', 'XX')], matched_countries)) AS country_data + FROM members AS m + WHERE m.id IN (SELECT "memberId" FROM activities_filtered) + ) + + SELECT + country_data.1 AS country, + country_data.2 AS flag, + country_data.3 AS country_code, + COUNT(id) AS contributorCount, + round((count(id)/ (select "contributorCount" from active_contributors))*100) as "contributorPercentage" + FROM parsed_country + GROUP BY country, flag, country_code + ORDER BY contributorCount DESC + + diff --git a/services/libs/tinybird/pipes/contributors_leaderboard.pipe b/services/libs/tinybird/pipes/contributors_leaderboard.pipe new file mode 100644 index 0000000000..4283f9967b --- /dev/null +++ b/services/libs/tinybird/pipes/contributors_leaderboard.pipe @@ -0,0 +1,30 @@ +TOKEN "contributors_leaderboard_endpoint_read_3484" READ + +TOKEN "raul_dev_access_token" READ + +NODE total_activity_count +SQL > + + % + SELECT count(activities_filtered.id) as "totalContributions" FROM activities_filtered + + + +NODE contributors_leaderboard_1 +SQL > + + % + SELECT + members.id as id, + members.avatar as avatar, + members.displayName as displayName, + count(activities_filtered.id) as "contributionCount", + round((count(activities_filtered.id)/ (select "totalContributions" from total_activity_count))*100) as "contributionPercentage" + FROM activities_filtered + join members final on members.id = activities_filtered.memberId + where not members.isTeamMember and not members.isBot + group by members.id, members.avatar, members.displayName + order by count(activities_filtered.id) as "activityCount" desc + limit {{Int32(limit, 10)}} + + diff --git a/services/libs/tinybird/pipes/country_mapping.pipe b/services/libs/tinybird/pipes/country_mapping.pipe new file mode 100644 index 0000000000..be173637e8 --- /dev/null +++ b/services/libs/tinybird/pipes/country_mapping.pipe @@ -0,0 +1,44 @@ +TOKEN "country_mapping_endpoint_read_0281" READ + +NODE map_country_name_flag_code +SQL > + + SELECT + country_map.1 AS country, + country_map.2 AS flag, + country_map.3 AS country_code + FROM ( + SELECT arrayJoin([ + ('Afghanistan', '🇦🇫', 'AF'), ('Albania', '🇦🇱', 'AL'), ('Algeria', '🇩🇿', 'DZ'), + ('Andorra', '🇦🇩', 'AD'), ('Angola', '🇦🇴', 'AO'), ('Argentina', '🇦🇷', 'AR'), + ('Armenia', '🇦🇲', 'AM'), ('Australia', '🇦🇺', 'AU'), ('Austria', '🇦🇹', 'AT'), + ('Azerbaijan', '🇦🇿', 'AZ'), ('Bahamas', '🇧🇸', 'BS'), ('Bahrain', '🇧🇭', 'BH'), + ('Bangladesh', '🇧🇩', 'BD'), ('Barbados', '🇧🇧', 'BB'), ('Belarus', '🇧🇾', 'BY'), + ('Belgium', '🇧🇪', 'BE'), ('Belize', '🇧🇿', 'BZ'), ('Benin', '🇧🇯', 'BJ'), + ('Bhutan', '🇧🇹', 'BT'), ('Bolivia', '🇧🇴', 'BO'), ('Bosnia and Herzegovina', '🇧🇦', 'BA'), + ('Botswana', '🇧🇼', 'BW'), ('Brazil', '🇧🇷', 'BR'), ('Bulgaria', '🇧🇬', 'BG'), + ('Canada', '🇨🇦', 'CA'), ('Chile', '🇨🇱', 'CL'), ('China', '🇨🇳', 'CN'), + ('Colombia', '🇨🇴', 'CO'), ('Costa Rica', '🇨🇷', 'CR'), ('Croatia', '🇭🇷', 'HR'), + ('Cuba', '🇨🇺', 'CU'), ('Cyprus', '🇨🇾', 'CY'), ('Czech Republic', '🇨🇿', 'CZ'), + ('Denmark', '🇩🇰', 'DK'), ('Dominican Republic', '🇩🇴', 'DO'), ('Ecuador', '🇪🇨', 'EC'), + ('Egypt', '🇪🇬', 'EG'), ('France', '🇫🇷', 'FR'), ('Germany', '🇩🇪', 'DE'), + ('India', '🇮🇳', 'IN'), ('Indonesia', '🇮🇩', 'ID'), ('Iran', '🇮🇷', 'IR'), + ('Iraq', '🇮🇶', 'IQ'), ('Ireland', '🇮🇪', 'IE'), ('Israel', '🇮🇱', 'IL'), + ('Italy', '🇮🇹', 'IT'), ('Jamaica', '🇯🇲', 'JM'), ('Japan', '🇯🇵', 'JP'), + ('Jordan', '🇯🇴', 'JO'), ('Kazakhstan', '🇰🇿', 'KZ'), ('Kenya', '🇰🇪', 'KE'), + ('Malaysia', '🇲🇾', 'MY'), ('Mexico', '🇲🇽', 'MX'), ('Morocco', '🇲🇦', 'MA'), + ('Nepal', '🇳🇵', 'NP'), ('Netherlands', '🇳🇱', 'NL'), ('New Zealand', '🇳🇿', 'NZ'), + ('Nigeria', '🇳🇬', 'NG'), ('Pakistan', '🇵🇰', 'PK'), ('Philippines', '🇵🇭', 'PH'), + ('Poland', '🇵🇱', 'PL'), ('Portugal', '🇵🇹', 'PT'), ('Romania', '🇷🇴', 'RO'), + ('Russia', '🇷🇺', 'RU'), ('Saudi Arabia', '🇸🇦', 'SA'), ('Singapore', '🇸🇬', 'SG'), + ('South Africa', '🇿🇦', 'ZA'), ('South Korea', '🇰🇷', 'KR'), ('Spain', '🇪🇸', 'ES'), + ('Sri Lanka', '🇱🇰', 'LK'), ('Sweden', '🇸🇪', 'SE'), ('Switzerland', '🇨🇭', 'CH'), + ('Thailand', '🇹🇭', 'TH'), ('Turkey', '🇹🇷', 'TR'), ('Ukraine', '🇺🇦', 'UA'), + ('United Arab Emirates', '🇦🇪', 'AE'), ('United Kingdom', '🇬🇧', 'GB'), + ('United States', '🇺🇸', 'US'), ('Venezuela', '🇻🇪', 'VE'), ('Vietnam', '🇻🇳', 'VN'), + ('Zambia', '🇿🇲', 'ZM'), ('Zimbabwe', '🇿🇼', 'ZW') + ]) AS country_map + ) + + + diff --git a/services/libs/tinybird/pipes/generate_timeseries.pipe b/services/libs/tinybird/pipes/generate_timeseries.pipe new file mode 100644 index 0000000000..0f8233370f --- /dev/null +++ b/services/libs/tinybird/pipes/generate_timeseries.pipe @@ -0,0 +1,47 @@ +TOKEN "generate_timeseries_endpoint_read_3418" READ + +NODE generate_timeseriez +SQL > + + % + SELECT + CASE + WHEN {{granularity}} = 'daily' + THEN toDate(addDays(generate_timeseries_bounds.actual_start_date, number)) + WHEN {{granularity}} = 'weekly' + THEN toStartOfWeek(addDays(generate_timeseries_bounds.actual_start_date, number * 7)) + WHEN {{granularity}} = 'monthly' + THEN toStartOfMonth(addMonths(generate_timeseries_bounds.actual_start_date, number)) + WHEN {{granularity}} = 'quarterly' + THEN toStartOfQuarter(addMonths(generate_timeseries_bounds.actual_start_date, number * 3)) + WHEN {{granularity}} = 'yearly' + THEN toStartOfYear(addYears(generate_timeseries_bounds.actual_start_date, number)) + END AS "startDate", + CASE + WHEN {{granularity}} = 'daily' + THEN toDate(addDays(actual_start_date, number)) + WHEN {{granularity}} = 'weekly' + THEN toDate(toStartOfWeek(addDays(actual_start_date, number * 7)) + INTERVAL 6 DAY) + WHEN {{granularity}} = 'monthly' + THEN toDate(toStartOfMonth(addMonths(actual_start_date, number)) + INTERVAL 1 MONTH - INTERVAL 1 DAY) + WHEN {{granularity}} = 'quarterly' + THEN toDate(toStartOfQuarter(addMonths(actual_start_date, number * 3)) + INTERVAL 3 MONTH - INTERVAL 1 DAY) + WHEN {{granularity}} = 'yearly' + THEN toDate(toStartOfYear(addYears(actual_start_date, number)) + INTERVAL 1 YEAR - INTERVAL 1 DAY) + END AS "endDate" + FROM numbers(1000) + CROSS JOIN ( + SELECT + CASE + WHEN {{granularity}} = 'weekly' THEN toStartOfWeek(actual_start_date) + WHEN {{granularity}} = 'monthly' THEN toStartOfMonth(actual_start_date) + WHEN {{granularity}} = 'quarterly' THEN toStartOfQuarter(actual_start_date) + WHEN {{granularity}} = 'yearly' THEN toStartOfYear(actual_start_date) + ELSE actual_start_date + END AS actual_start_date, + actual_end_date + FROM generate_timeseries_bounds + ) generate_timeseries_bounds + WHERE "startDate" >= actual_start_date AND "startDate" < actual_end_date + + diff --git a/services/libs/tinybird/pipes/generate_timeseries_bounds.pipe b/services/libs/tinybird/pipes/generate_timeseries_bounds.pipe new file mode 100644 index 0000000000..815f460082 --- /dev/null +++ b/services/libs/tinybird/pipes/generate_timeseries_bounds.pipe @@ -0,0 +1,30 @@ +TOKEN "generate_timeseries_bounds_endpoint_read_4511" READ + +NODE generate_timeseries_boundz_0 +SQL > + + % + {% if defined(startDate) and not defined(endDate) %} + select + toDate({{startDate}}) as actual_start_date, + toDate(now()) as actual_end_date + {% end %} + {% if not defined(startDate) and defined(endDate) %} + select + min(toDate(activities_filtered.timestamp)) as actual_start_date, + toDate({{endDate}}) as actual_end_date + from activities_filtered + {% end %} + {% if not defined(startDate) and not defined(endDate) %} + select + min(toDate(activities_filtered.timestamp)) as actual_start_date, + toDate(now()) as actual_end_date + from activities_filtered + {% end %} + {% if defined(startDate) and defined(endDate) %} + select + toDate({{startDate}}) as actual_start_date, + toDate({{endDate}}) as actual_end_date + {% end %} + + diff --git a/services/libs/tinybird/pipes/insightsProjects_filtered.pipe b/services/libs/tinybird/pipes/insightsProjects_filtered.pipe new file mode 100644 index 0000000000..f200da4f3a --- /dev/null +++ b/services/libs/tinybird/pipes/insightsProjects_filtered.pipe @@ -0,0 +1,22 @@ +TOKEN "insighsProjects_filtered_endpoint_read_2583" READ + +TOKEN "raul_dev_access_token" READ + +NODE insighsProjects_filtered_0 +SQL > + + % + SELECT projectsAggregatedMV.* FROM projectsAggregatedMV FINAL + where 1=1 + {% if defined(slug) %} + AND projectsAggregatedMV.slug = {{String(slug, description="Filter collection by slug", required=False)}} + {% end %} + {% if defined(search) %} + AND projectsAggregatedMV.slug like {{String(search, description="Search collection open ended wildcard using slug", required=False)}} || '%' + {% end %} + {% if defined(collectionSlug) %} + AND projectsAggregatedMV.collectionSlug = {{String(collectionSlug, description="Filter collection by slug", required=False)}} + {% end %} + + + diff --git a/services/libs/tinybird/pipes/organization_dependency.pipe b/services/libs/tinybird/pipes/organization_dependency.pipe new file mode 100644 index 0000000000..82fd7a203c --- /dev/null +++ b/services/libs/tinybird/pipes/organization_dependency.pipe @@ -0,0 +1,19 @@ +TOKEN "raul_dev_access_token" READ + +TOKEN "organization_dependency_endpoint_read_6465" READ + +NODE organization_dependency_0 +SQL > + + + SELECT t.*, active_organizations.organizationCount as "totalOrganizationCount" + FROM ( + SELECT id, displayName, contributionPercentage, + sum(contributionPercentage) OVER (ORDER BY contributionPercentage DESC, id) AS contributionPercentageRunningTotal + FROM organizations_leaderboard + ) t + left join active_organizations on 1=1 + WHERE contributionPercentageRunningTotal <= 51 OR (contributionPercentageRunningTotal - contributionPercentage < 51) + + + diff --git a/services/libs/tinybird/pipes/organization_retention.pipe b/services/libs/tinybird/pipes/organization_retention.pipe new file mode 100644 index 0000000000..ee62c75c68 --- /dev/null +++ b/services/libs/tinybird/pipes/organization_retention.pipe @@ -0,0 +1,55 @@ +TOKEN "raul_dev_access_token" READ + +TOKEN "organization_retention_endpoint_read_0731" READ + +NODE organization_retention_0 +SQL > + + % + with aggregated_organizations AS ( + SELECT + CASE + WHEN {{granularity}} = 'daily' THEN toDate(timestamp) + WHEN {{granularity}} = 'weekly' THEN toStartOfWeek(timestamp) + WHEN {{granularity}} = 'monthly' THEN toStartOfMonth(timestamp) + WHEN {{granularity}} = 'quarterly' THEN toStartOfQuarter(timestamp) + WHEN {{granularity}} = 'yearly' THEN toStartOfYear(timestamp) + END AS period, + groupUniqArray(organizationId) AS orgs + FROM activities_filtered_retention + GROUP BY period + ), + retention AS ( + SELECT + ts."startDate" AS "startDate", + ts."endDate" as "endDate", + -- coalesce(prev.orgs, []) AS previous_org_ids, + -- coalesce(curr.orgs, []) AS current_org_ids, + -- arrayIntersect(coalesce(curr.orgs, []), coalesce(prev.orgs, [])) AS retained_org_ids, + length(arrayIntersect(coalesce(curr.orgs, []), coalesce(prev.orgs, []))) AS retained_orgs, + length(coalesce(prev.orgs, [])) AS previous_period_total, + if(previous_period_total > 0, + round(100 * retained_orgs / previous_period_total, 2), + 0) AS "retentionRate" + FROM generate_timeseries ts + LEFT JOIN aggregated_organizations AS curr + ON ts."startDate" = curr.period + LEFT JOIN aggregated_organizations AS prev + ON prev.period = + CASE + WHEN {{granularity}} = 'daily' THEN ts."startDate" - INTERVAL 1 DAY + WHEN {{granularity}} = 'weekly' THEN ts."startDate" - INTERVAL 1 WEEK + WHEN {{granularity}} = 'monthly' THEN ts."startDate" - INTERVAL 1 MONTH + WHEN {{granularity}} = 'quarterly' THEN ts."startDate" - INTERVAL 1 QUARTER + WHEN {{granularity}} = 'yearly' THEN ts."startDate" - INTERVAL 1 YEAR + END + ) + SELECT + "startDate", + "endDate", + "retentionRate" + FROM retention + ORDER BY "startDate" + + + diff --git a/services/libs/tinybird/pipes/organizations_geo_distribution.pipe b/services/libs/tinybird/pipes/organizations_geo_distribution.pipe new file mode 100644 index 0000000000..9345f333fd --- /dev/null +++ b/services/libs/tinybird/pipes/organizations_geo_distribution.pipe @@ -0,0 +1,37 @@ +TOKEN "raul_dev_access_token" READ + +TOKEN "organizations_geo_distribution_endpoint_read_0957" READ + +NODE organizations_geo_distribution_0 +SQL > + + WITH + country_mapping_array AS ( + SELECT groupArray((country, flag, country_code)) AS country_data FROM country_mapping + ), + + parsed_country AS ( + SELECT + o.id, + o.location, + arrayFilter( + x -> position(upper(o.location), upper(x.1)) > 0, + (SELECT country_data FROM country_mapping_array) + ) AS matched_countries, + arrayJoin(if(empty(matched_countries), [('Unknown', '❓', 'XX')], matched_countries)) AS country_data + FROM organizations AS o + WHERE o.id IN (SELECT "organizationId" FROM activities_filtered) + ) + + SELECT + country_data.1 AS country, + country_data.2 AS flag, + country_data.3 AS country_code, + COUNT(id) AS organizationCount, + round((count(id)/ (select "organizationCount" from active_organizations))*100) as "organizationPercentage" + FROM parsed_country + GROUP BY country, flag, country_code + ORDER BY organizationCount DESC + + + diff --git a/services/libs/tinybird/pipes/organizations_leaderboard.pipe b/services/libs/tinybird/pipes/organizations_leaderboard.pipe new file mode 100644 index 0000000000..5260855bdd --- /dev/null +++ b/services/libs/tinybird/pipes/organizations_leaderboard.pipe @@ -0,0 +1,32 @@ +TOKEN "organizations_leaderboard_endpoint_read_1189" READ + +TOKEN "raul_dev_access_token" READ + +NODE total_activity_count +SQL > + + % + SELECT count(activities_filtered.id) as "totalContributions" FROM activities_filtered + + + +NODE organizations_leaderboard_1 +SQL > + + % + SELECT + organizations.id as id, + organizations.logo as logo, + organizations.displayName as displayName, + count(activities_filtered.id) as "contributionCount", + + round( count(activities_filtered.id) / (select "totalContributions" from total_activity_count) * 100 ) + + as "contributionPercentage" + FROM activities_filtered + join organizations final on organizations.id = activities_filtered.organizationId + group by organizations.id, organizations.logo, organizations.displayName + order by count(activities_filtered.id) as "activityCount" desc + limit {{ Int32(limit, 10) }} + + diff --git a/services/libs/tinybird/pipes/projects_aggregation_mv.pipe b/services/libs/tinybird/pipes/projects_aggregation_mv.pipe new file mode 100644 index 0000000000..7a64b75957 --- /dev/null +++ b/services/libs/tinybird/pipes/projects_aggregation_mv.pipe @@ -0,0 +1,76 @@ +NODE projects_collections +SQL > + + SELECT cip.insightsProjectId, c.slug as "collectionSlug" FROM collectionsInsightsProjects cip + join collections c on c.id = cip.collectionId + + + +NODE projects_repositories +SQL > + + + SELECT + activityRepositories_filtered.* + from activityRepositories_filtered + join collectionsInsightsProjects on collectionsInsightsProjects.insightsProjectId = activityRepositories_filtered.projectId + + + + +NODE segmentId_activity_aggregates +SQL > + + SELECT "segmentId", + count(distinct "memberId") as "contributorCount", + count(distinct "organizationId") as "organizationCount" + FROM activityRelations + group by "segmentId" + + + + +NODE merge_repos_and_activity_aggregates +SQL > + + SELECT + insightsProjects.id as id, + insightsProjects.name as name, + insightsProjects.slug as slug, + insightsProjects.description as description, + insightsProjects.logoUrl as logo, + projects_collections.collectionSlug as "collectionSlug", + segmentId_activity_aggregates ."contributorCount" as "contributorCount", + segmentId_activity_aggregates ."organizationCount" as "organizationCount", + arrayFilter( + x -> x['repo'] != '', + groupArray( + map( + 'projectId', toString(projects_repositories.projectId), + 'projectName', toString(projects_repositories.projectName), + 'projectSlug', toString(projects_repositories.projectSlug), + 'repo', toString(projects_repositories.repo) + ) + ) + ) AS repositories + FROM insightsProjects + LEFT JOIN segmentId_activity_aggregates + ON segmentId_activity_aggregates.segmentId = insightsProjects.segmentId + LEFT JOIN projects_repositories + ON projects_repositories.projectId = insightsProjects.id + LEFT JOIN projects_collections + on projects_collections.insightsProjectId = insightsProjects.id + GROUP BY + insightsProjects.id, + insightsProjects.name, + insightsProjects.slug, + insightsProjects.description, + insightsProjects.logoUrl, + projects_collections.collectionSlug, + segmentId_activity_aggregates ."contributorCount", + segmentId_activity_aggregates ."organizationCount" + +TYPE materialized +DATASOURCE projectsAggregatedMV + + diff --git a/services/libs/tinybird/pipes/projects_list.pipe b/services/libs/tinybird/pipes/projects_list.pipe new file mode 100644 index 0000000000..8391e3c8a3 --- /dev/null +++ b/services/libs/tinybird/pipes/projects_list.pipe @@ -0,0 +1,28 @@ +TOKEN "projects_list_endpoint_read_5981" READ + +TOKEN "raul_dev_access_token" READ + +NODE projects_paginated +SQL > + + % + {% if Boolean(count, false) %} + SELECT count(collections_filtered.id) from collections_filtered + {% else %} + SELECT * + FROM insightsProjects_filtered + order by + {{ column(String(orderByField, "name", description="Order by project field.", required=False)) }} + {% if String( + orderByDirection, 'asc', description="Order by direction. ASC or DESC", required=False + ) == 'asc' or String( + orderByDirection, 'asc', description="Order by direction. ASC or DESC", required=False + ) == 'ASC' %} ASC + {% else %} DESC + {% end %} + + LIMIT {{ Int32(pageSize, 10) }} + OFFSET {{ Int32(page, 0) * Int32(pageSize, 10) }} + {% end %} + + diff --git a/services/libs/tinybird/pipes/search_collections_projects_repos.pipe b/services/libs/tinybird/pipes/search_collections_projects_repos.pipe new file mode 100644 index 0000000000..fda3dad158 --- /dev/null +++ b/services/libs/tinybird/pipes/search_collections_projects_repos.pipe @@ -0,0 +1,36 @@ +TOKEN "search_collections_projects_repos_endpoint_read_0215" READ + +TOKEN "raul_dev_access_token" READ + +NODE merge_results_from_collections_projects_repos_filtered +SQL > + + % + select + 'collection' as type, + collections_filtered.slug, + null as logo, + null as projectSlug, + collections_filtered.name + from collections_filtered + limit {{Integer(limit, 10, description="Limit number of records for each type", required=False)}} + union all + select + 'project' as type, + insightsProjects_filtered.slug, + insightsProjects_filtered.logo, + insightsProjects_filtered.slug as "projectSlug", + insightsProjects_filtered.name + from insightsProjects_filtered + limit {{Integer(limit, 10, description="Limit number of records for each type", required=False)}} + union all + select + 'repository' as type, + activityRepositories_filtered.repo as slug, + null as logo, + activityRepositories_filtered.projectSlug as "projectSlug", + null as name + from activityRepositories_filtered + limit {{Integer(limit, 10, description="Limit number of records for each type", required=False)}} + + diff --git a/services/libs/tinybird/requirements.txt b/services/libs/tinybird/requirements.txt new file mode 100644 index 0000000000..03f6ab9088 --- /dev/null +++ b/services/libs/tinybird/requirements.txt @@ -0,0 +1 @@ +tinybird-cli>=5,<6 \ No newline at end of file diff --git a/services/libs/tinybird/scripts/append_fixtures.sh b/services/libs/tinybird/scripts/append_fixtures.sh new file mode 100755 index 0000000000..e8745565af --- /dev/null +++ b/services/libs/tinybird/scripts/append_fixtures.sh @@ -0,0 +1,21 @@ + +#!/usr/bin/env bash +set -euxo pipefail + +directory="datasources/fixtures" +extensions=("csv" "ndjson") + +absolute_directory=$(realpath "$directory") + +for extension in "${extensions[@]}"; do + file_list=$(find "$absolute_directory" -type f -name "*.$extension") + + for file_path in $file_list; do + file_name=$(basename "$file_path") + file_name_without_extension="${file_name%.*}" + + command="tb datasource append $file_name_without_extension datasources/fixtures/$file_name" + echo $command + $command + done +done diff --git a/services/libs/tinybird/scripts/exec_test.sh b/services/libs/tinybird/scripts/exec_test.sh new file mode 100755 index 0000000000..50571d952d --- /dev/null +++ b/services/libs/tinybird/scripts/exec_test.sh @@ -0,0 +1,58 @@ + +#!/usr/bin/env bash +set -euxo pipefail + +export TB_VERSION_WARNING=0 + +run_test() { + t=$1 + echo "** Running $t **" + echo "** $(cat $t)" + tmpfile=$(mktemp) + retries=0 + TOTAL_RETRIES=3 + + # When appending fixtures, we need to retry in case of the data is not replicated in time + while [ $retries -lt $TOTAL_RETRIES ]; do + # Run the test and store the output in a temporary file + bash $t $2 >$tmpfile + exit_code=$? + if [ "$exit_code" -eq 0 ]; then + # If the test passed, break the loop + if diff -B ${t}.result $tmpfile >/dev/null 2>&1; then + break + # If the test failed, increment the retries counter and try again + else + retries=$((retries+1)) + fi + # If the bash command failed, print an error message and break the loop + else + break + fi + done + + if diff -B ${t}.result $tmpfile >/dev/null 2>&1; then + echo "✅ Test $t passed" + rm $tmpfile + return 0 + elif [ $retries -eq $TOTAL_RETRIES ]; then + echo "🚨 ERROR: Test $t failed, diff:"; + diff -B ${t}.result $tmpfile + rm $tmpfile + return 1 + else + echo "🚨 ERROR: Test $t failed with bash command exit code $?" + cat $tmpfile + rm $tmpfile + return 1 + fi + echo "" +} +export -f run_test + +fail=0 +find ./tests -name "*.test" -print0 | xargs -0 -I {} -P 4 bash -c 'run_test "$@"' _ {} || fail=1 + +if [ $fail == 1 ]; then + exit -1; +fi diff --git a/services/libs/types/src/activities.ts b/services/libs/types/src/activities.ts index a67b57befc..869e6dc072 100644 --- a/services/libs/types/src/activities.ts +++ b/services/libs/types/src/activities.ts @@ -48,6 +48,11 @@ export interface IActivityCreateData { } export interface IActivityData { + /** + * Unique identifier of the activity. + */ + id?: string + /** * Type of activity. * For example: comment, like, post, etc.