Skip to content

Commit

Permalink
feat: support manual sync/backfill (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
kevcodez authored May 2, 2024
1 parent e3249cc commit 710f639
Show file tree
Hide file tree
Showing 19 changed files with 531 additions and 114 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
echo ORB_WEBHOOK_SECRET=whsec_ >> .env
echo SCHEMA=orb >> .env
echo PORT=8080 >> .env
echo API_KEY=api_key_test >> .env
echo API_KEY_SYNC=api_key_test >> .env
- name: Install dependencies
run: |
Expand Down
14 changes: 8 additions & 6 deletions apps/node-fastify/.env.sample
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
# Postgres database URL including auth and search path
DATABASE_URL=postgres://postgres:postgres@host:5432/postgres?sslmode=disable&search_path=orb

# optional
# Optional
SCHEMA=orb

# Secret to validate signatures of Orb webhooks
ORB_WEBHOOK_SECRET=whsec_

# optional, only needed when you want to actively sync data and call the Orb API, not needed for webhook processing
# Access the Orb API
ORB_API_KEY=test_

# optional
PORT=8080
# Optional, API key to authorize requests against the sync endpoints. When calling a sync endpoint this value has to be provided via HTTP header "Authorization".
#API_KEY_SYNC=

# API key to access the endpoints of the Node.js app
API_KEY=api_key_test
# Optional
PORT=8080

3 changes: 2 additions & 1 deletion apps/node-fastify/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
"license": "MIT",
"dependencies": {
"@fastify/autoload": "^5.8.0",
"@fastify/type-provider-typebox": "^4.0.0",
"dotenv": "^16.4.5",
"fastify": "^4.26.2",
"pino": "^8.20.0",
"pino": "^9.0.0",
"orb-sync-lib": "*"
},
"devDependencies": {
Expand Down
15 changes: 6 additions & 9 deletions apps/node-fastify/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import fastify, { FastifyInstance, FastifyServerOptions } from 'fastify';
import autoload from '@fastify/autoload';
import path from 'node:path';
import { OrbSync } from 'orb-sync-lib';
import assert from 'node:assert';
import { getConfig } from './utils/config';

export async function createApp(opts: FastifyServerOptions = {}): Promise<FastifyInstance> {
const app = fastify(opts);
Expand Down Expand Up @@ -35,16 +35,13 @@ export async function createApp(opts: FastifyServerOptions = {}): Promise<Fastif
dir: path.join(__dirname, 'routes'),
});

const { DATABASE_URL, ORB_WEBHOOK_SECRET, DATABASE_SCHEMA, ORB_API_KEY } = process.env;

assert(DATABASE_URL, 'DATABASE_URL is required');
assert(ORB_WEBHOOK_SECRET, 'ORB_WEBHOOK_SECRET is required');
const config = getConfig();

const orbSync = new OrbSync({
databaseUrl: DATABASE_URL,
orbWebhookSecret: ORB_WEBHOOK_SECRET,
databaseSchema: DATABASE_SCHEMA || 'orb',
orbApiKey: ORB_API_KEY,
databaseUrl: config.DATABASE_URL,
orbWebhookSecret: config.ORB_WEBHOOK_SECRET,
databaseSchema: config.DATABASE_SCHEMA,
orbApiKey: config.ORB_API_KEY,
});

app.decorate('orbSync', orbSync);
Expand Down
163 changes: 163 additions & 0 deletions apps/node-fastify/src/routes/sync.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
import { Static, Type } from '@fastify/type-provider-typebox';
import type { FastifyInstance } from 'fastify';
import { verifyApiKey } from '../utils/verifyApiKey';

const SchemaRequestParamsSyncCreditNotes = Type.Object({
limit: Type.Optional(Type.Number({ minimum: 1, maximum: 100 })),
});

const SchemaRequestParamsSyncCustomers = Type.Object({
limit: Type.Optional(Type.Number({ minimum: 1, maximum: 100 })),
createdAtGt: Type.Optional(
Type.String({
format: 'date-time',
})
),
createdAtGte: Type.Optional(
Type.String({
format: 'date-time',
})
),
createdAtLt: Type.Optional(
Type.String({
format: 'date-time',
})
),
createdAtLte: Type.Optional(
Type.String({
format: 'date-time',
})
),
});

const SchemaRequestParamsSyncSubscriptions = Type.Object({
limit: Type.Optional(Type.Number({ minimum: 1, maximum: 100 })),
createdAtGt: Type.Optional(
Type.String({
format: 'date-time',
})
),
createdAtGte: Type.Optional(
Type.String({
format: 'date-time',
})
),
createdAtLt: Type.Optional(
Type.String({
format: 'date-time',
})
),
createdAtLte: Type.Optional(
Type.String({
format: 'date-time',
})
),
});

const SchemaRequestParamsSyncInvoices = Type.Object({
limit: Type.Optional(Type.Number({ minimum: 1, maximum: 100 })),
createdAtGt: Type.Optional(
Type.String({
format: 'date-time',
})
),
createdAtGte: Type.Optional(
Type.String({
format: 'date-time',
})
),
createdAtLt: Type.Optional(
Type.String({
format: 'date-time',
})
),
createdAtLte: Type.Optional(
Type.String({
format: 'date-time',
})
),
});

export default async function routes(fastify: FastifyInstance) {
fastify.post<{
Querystring: Static<typeof SchemaRequestParamsSyncCreditNotes>;
}>('/sync/credit_notes', {
preHandler: [verifyApiKey],
schema: {
querystring: SchemaRequestParamsSyncCreditNotes,
},
handler: async (request, reply) => {
const query = request.query;

const count = await fastify.orbSync.sync('credit_notes', { limit: query.limit });

return reply.send({ count });
},
});

fastify.post<{
Querystring: Static<typeof SchemaRequestParamsSyncCustomers>;
}>('/sync/customers', {
preHandler: [verifyApiKey],
schema: {
querystring: SchemaRequestParamsSyncCustomers,
},
handler: async (request, reply) => {
const query = request.query;

const count = await fastify.orbSync.sync('customers', {
limit: query.limit,
createdAtGt: query.createdAtGt,
createdAtGte: query.createdAtGte,
createdAtLt: query.createdAtLt,
createdAtLte: query.createdAtLte,
});

return reply.send({ count });
},
});

fastify.post<{
Querystring: Static<typeof SchemaRequestParamsSyncSubscriptions>;
}>('/sync/subscriptions', {
preHandler: [verifyApiKey],
schema: {
querystring: SchemaRequestParamsSyncSubscriptions,
},
handler: async (request, reply) => {
const query = request.query;

const count = await fastify.orbSync.sync('subscriptions', {
limit: query.limit,
createdAtGt: query.createdAtGt,
createdAtGte: query.createdAtGte,
createdAtLt: query.createdAtLt,
createdAtLte: query.createdAtLte,
});

return reply.send({ count });
},
});

fastify.post<{
Querystring: Static<typeof SchemaRequestParamsSyncInvoices>;
}>('/sync/invoices', {
preHandler: [verifyApiKey],
schema: {
querystring: SchemaRequestParamsSyncInvoices,
},
handler: async (request, reply) => {
const query = request.query;

const count = await fastify.orbSync.sync('invoices', {
limit: query.limit,
createdAtGt: query.createdAtGt,
createdAtGte: query.createdAtGte,
createdAtLt: query.createdAtLt,
createdAtLte: query.createdAtLte,
});

return reply.send({ count });
},
});
}
2 changes: 1 addition & 1 deletion apps/node-fastify/src/routes/webhooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export default async function routes(fastify: FastifyInstance) {
const headers = request.headers;
const body: { raw: Buffer } = request.body as { raw: Buffer };

await fastify.orbSync.sync(body.raw.toString(), headers);
await fastify.orbSync.processWebhook(body.raw.toString(), headers);

return reply.send({ received: true });
},
Expand Down
9 changes: 6 additions & 3 deletions apps/node-fastify/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import type { FastifyInstance } from 'fastify';
import type { Server, IncomingMessage, ServerResponse } from 'node:http';
import { createApp } from './app';
import pino from 'pino';
import { TypeBoxTypeProvider } from '@fastify/type-provider-typebox';
import { getConfig } from './utils/config';

const logger = pino({
formatters: {
Expand All @@ -20,11 +22,12 @@ const main = async () => {
requestIdHeader: 'Request-Id',
});

// Init config
const port = process.env.PORT ? Number(process.env.PORT) : 8080;
app.withTypeProvider<TypeBoxTypeProvider>();

const config = getConfig();

// Start the server
app.listen({ port: Number(port), host: '0.0.0.0' }, (err, address) => {
app.listen({ port: config.PORT, host: '0.0.0.0' }, (err, address) => {
if (err) {
console.error(err);
process.exit(1);
Expand Down
50 changes: 50 additions & 0 deletions apps/node-fastify/src/utils/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import dotenv from 'dotenv';
import assert from 'assert';

type configType = {
/** Optional, API key to authorize requests against the sync endpoints */
API_KEY_SYNC?: string;

/** Port number the API is running on, defaults to 8080 */
PORT: number;

/** Postgres database URL including auth and search path */
DATABASE_URL: string

/** Secret to validate signatures of Orb webhooks */
ORB_WEBHOOK_SECRET: string

/** Defaults to Orb */
DATABASE_SCHEMA: string

/** Access the Orb API */
ORB_API_KEY?: string
};

function getConfigFromEnv(key: string, defaultValue?: string): string {
const value = process.env[key];
return value || defaultValue!;
}

let config: configType;

export function getConfig(): configType {
if (config) return config;

dotenv.config();

config = {
API_KEY_SYNC: getConfigFromEnv('API_KEY_SYNC'),
ORB_API_KEY: getConfigFromEnv('ORB_API_KEY'),
DATABASE_SCHEMA: getConfigFromEnv('DATABASE_SCHEMA', 'orb'),
DATABASE_URL: getConfigFromEnv('DATABASE_URL'),
ORB_WEBHOOK_SECRET: getConfigFromEnv('ORB_WEBHOOK_SECRET'),
PORT: Number(getConfigFromEnv('PORT', '8080')),
};

assert(!Number.isNaN(config.PORT), 'PORT must be a number');
assert(config.DATABASE_URL, 'DATABASE_URL is required');
assert(config.ORB_WEBHOOK_SECRET, 'ORB_WEBHOOK_SECRET is required');

return config;
}
15 changes: 15 additions & 0 deletions apps/node-fastify/src/utils/verifyApiKey.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { FastifyReply, FastifyRequest, HookHandlerDoneFunction } from 'fastify';
import { getConfig } from './config';

export const verifyApiKey = (request: FastifyRequest, reply: FastifyReply, done: HookHandlerDoneFunction): unknown => {
const config = getConfig();

if (!request.headers || !request.headers.authorization) {
return reply.code(401).send('Unauthorized');
}
const { authorization } = request.headers;
if (authorization !== config.API_KEY_SYNC) {
return reply.code(401).send('Unauthorized');
}
done();
};
Loading

0 comments on commit 710f639

Please sign in to comment.