Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add SSE example #287

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .scripts/list-of-samples.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"saga",
"schedules",
"search-attributes",
"server-sent-events",
"signals-queries",
"sinks",
"snippets",
Expand Down
3 changes: 3 additions & 0 deletions server-sent-events/.eslintignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
node_modules
lib
.eslintrc.js
48 changes: 48 additions & 0 deletions server-sent-events/.eslintrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
const { builtinModules } = require('module');

const ALLOWED_NODE_BUILTINS = new Set(['assert']);

module.exports = {
root: true,
parser: '@typescript-eslint/parser',
parserOptions: {
project: './tsconfig.json',
tsconfigRootDir: __dirname,
},
plugins: ['@typescript-eslint', 'deprecation'],
extends: [
'eslint:recommended',
'plugin:@typescript-eslint/eslint-recommended',
'plugin:@typescript-eslint/recommended',
'prettier',
],
rules: {
// recommended for safety
'@typescript-eslint/no-floating-promises': 'error', // forgetting to await Activities and Workflow APIs is bad
'deprecation/deprecation': 'warn',

// code style preference
'object-shorthand': ['error', 'always'],

// relaxed rules, for convenience
'@typescript-eslint/no-unused-vars': [
'warn',
{
argsIgnorePattern: '^_',
varsIgnorePattern: '^_',
},
],
'@typescript-eslint/no-explicit-any': 'off',
},
overrides: [
{
files: ['src/workflows.ts', 'src/workflows-*.ts', 'src/workflows/*.ts'],
rules: {
'no-restricted-imports': [
'error',
...builtinModules.filter((m) => !ALLOWED_NODE_BUILTINS.has(m)).flatMap((m) => [m, `node:${m}`]),
],
},
},
],
};
2 changes: 2 additions & 0 deletions server-sent-events/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
lib
node_modules
1 change: 1 addition & 0 deletions server-sent-events/.npmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package-lock=false
1 change: 1 addition & 0 deletions server-sent-events/.nvmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
16
18 changes: 18 additions & 0 deletions server-sent-events/.post-create
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
To begin development, install the Temporal CLI:

Mac: {cyan brew install temporal}
Other: Download and extract the latest release from https://github.com/temporalio/cli/releases/latest

Start Temporal Server:

{cyan temporal server start-dev}

Use Node version 16+:

Mac: {cyan brew install node@16}
Other: https://nodejs.org/en/download/

Then, in the project directory, using two other shells, run these commands:

{cyan npm run start.watch}
{cyan npm run workflow}
1 change: 1 addition & 0 deletions server-sent-events/.prettierignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
lib
2 changes: 2 additions & 0 deletions server-sent-events/.prettierrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
printWidth: 120
singleQuote: true
15 changes: 15 additions & 0 deletions server-sent-events/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Server sent events

This example shows how to integrate an SSE server and Temporal in a simple workflow.

### Running this sample

1. `temporal server start-dev` to start [Temporal Server](https://github.com/temporalio/cli/#installation).
1. `npm install` to install dependencies.
1. `npm run server` to start the server.
1. `PORT=3001 npm run server` to start yet another server in another port.
1. In another shell, connect to the first server with `curl localhost:3000/events?room_id=A`.
1. In yet another shell, connect to the second server (note port 3001) with `curl localhost:3001/events?room_id=A`.
1. In yet _another_ shell, connect to the first server with `curl localhost:3000/events?room_id=B`.
1. In yet another shell, send a message to room A: `curl -XPOST "localhost:3000/events?room_id=A&message=Hi%20room%20A"`. This message will be broadcasted to your first and second shells, even though they are connected through different servers!
1. Now, send a message to room B: `curl -XPOST "localhost:3000/events?room_id=B&message=Hi%20room%20B"`. You should now only see a message pop up in the last shell!
40 changes: 40 additions & 0 deletions server-sent-events/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
{
"name": "temporal-server-sent-events",
"version": "0.1.0",
"private": true,
"scripts": {
"build": "tsc --build",
"build.watch": "tsc --build --watch",
"lint": "eslint .",
"server": "ts-node src/server.ts"
},
"nodemonConfig": {
"execMap": {
"ts": "ts-node"
},
"ext": "ts",
"watch": [
"src"
]
},
"dependencies": {
"@temporalio/activity": "1.7.0",
"@temporalio/client": "1.7.0",
"@temporalio/worker": "1.7.0",
"@temporalio/workflow": "1.7.0",
"nanoid": "3.x"
},
"devDependencies": {
"@tsconfig/node16": "^1.0.0",
"@types/node": "^16.11.43",
"@typescript-eslint/eslint-plugin": "^5.0.0",
"@typescript-eslint/parser": "^5.0.0",
"eslint": "^7.32.0",
"eslint-config-prettier": "^8.3.0",
"eslint-plugin-deprecation": "^1.2.1",
"nodemon": "^2.0.12",
"prettier": "^2.3.2",
"ts-node": "^10.8.1",
"typescript": "^4.4.2"
}
}
10 changes: 10 additions & 0 deletions server-sent-events/src/activities.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Hub } from './hub';

export type Activities = ReturnType<typeof createActivities>;
export function createActivities(hubInstance: Hub) {
return {
async localBroadcast(args: { roomId: string; event: unknown }) {
hubInstance.broadcast(args.roomId, args.event);
},
};
}
49 changes: 49 additions & 0 deletions server-sent-events/src/hub.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import http from 'node:http';

type Client = {
id: string;
roomId: string;
res: http.ServerResponse;
};

// class Hub maintains the client connections in memory
// In practice, you could use something like Redis PubSub to work out the pub/sub part
export class Hub {
constructor(private readonly clients: Map<string, Client> = new Map()) {}

addClient(client: Client) {
console.log('adding client', client.id);
this.clients.set(client.id, client);
}

removeClient(id: string) {
console.log('removing client', id);
this.clients.delete(id);
}

broadcast(roomId: string, data: unknown) {
console.log(`broadcasting to ${this.clients.size}...`);
for (const client of this.clients.values()) {
if (client.roomId === roomId) {
this.writeToClient(client.id, data);
}
}
}

send(id: string, data: unknown) {
console.log(`sending to a single client: ${id}`);
const successful = this.writeToClient(id, data);
if (!successful) {
console.warn(`no client with id ${id} found`);
}
}

private writeToClient(id: string, data: unknown) {
if (!this.clients.has(id)) {
return false;
}
this.clients.get(id)?.res.write(`data: ${JSON.stringify(data)}\n\n`);
}
}

export const hubInstance = new Hub(new Map());
160 changes: 160 additions & 0 deletions server-sent-events/src/server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
import { Client } from '@temporalio/client';
import { Worker } from '@temporalio/worker';
import { randomUUID } from 'crypto';
import http from 'http';
import { nanoid } from 'nanoid';
import { createActivities } from './activities';
import { Hub } from './hub';
import { chatRoomWorkflow, Event, newEventSignal } from './workflows';

const temporalClient = new Client();
const serverTaskQueue = randomUUID();
const hub = new Hub();

// handleEvents adds the incoming conection as a client in the Hub
function handleEvents(req: http.IncomingMessage, res: http.ServerResponse) {
const headers = {
'Content-Type': 'text/event-stream',
Connection: 'keep-alive',
'Cache-Control': 'no-cache',
};

res.writeHead(200, headers);

const qs = new URL(req.url || '', `http://${req.headers.host}`);
const clientId = qs.searchParams.get('client_id') || nanoid();
const roomId = qs.searchParams.get('room_id') || 'default';

hub.addClient({
id: clientId,
roomId,
res,
});

req.on('close', () => {
hub.removeClient(clientId);
});

temporalClient.workflow
.signalWithStart<typeof chatRoomWorkflow, [Event]>(chatRoomWorkflow, {
args: [
{
roomId,
},
],
signal: newEventSignal,
signalArgs: [
{
type: 'join',
data: {
clientId,
},
},
],
workflowId: `room:${roomId}`,
taskQueue: serverTaskQueue,
})
.catch((err) => {
console.error(err);
res.end('{"ok": false}');
});
}

function handlePushEvents(req: http.IncomingMessage, res: http.ServerResponse) {
const headers = {
'Content-Type': 'text/event-stream',
Connection: 'keep-alive',
'Cache-Control': 'no-cache',
};

const qs = new URL(req.url || '', `http://${req.headers.host}`);
const clientId = qs.searchParams.get('client_id') || nanoid();
const roomId = qs.searchParams.get('room_id') || 'default';
const message = qs.searchParams.get('message') || 'hey wtf';

temporalClient.workflow
.signalWithStart<typeof chatRoomWorkflow, [Event]>(chatRoomWorkflow, {
args: [
{
roomId,
},
],
signal: newEventSignal,
signalArgs: [
{
type: 'message',
data: {
message,
clientId,
},
},
],
workflowId: `room:${roomId}`,
taskQueue: serverTaskQueue,
})
.then(() => {
res.writeHead(200, headers);
res.end('{"ok": true}');
})
.catch(() => {
res.writeHead(500, headers);
res.end('{"ok": false}');
});
}

// handleHealth works as a simple health check
function handleHealth(_req: http.IncomingMessage, res: http.ServerResponse) {
const headers = {
'Content-Type': 'application/json',
Connection: 'keep-alive',
'Cache-Control': 'no-cache',
};

res.writeHead(200, headers);

res.end('{"ok": true}');
}

async function main() {
const server = http.createServer((req, res) => {
if (req.method === 'GET' && req.url?.includes('/events')) {
handleEvents(req, res);
return;
}

if (req.method === 'POST' && req.url?.includes('/events')) {
handlePushEvents(req, res);
return;
}

handleHealth(req, res);
});

const activities = createActivities(hub);

// every server will have two components:
// - an http listener
// - a temporal worker that is able to broadcast messages to it's own connection list through SSE
const worker = await Worker.create({
activities,
workflowsPath: require.resolve('./workflows'),
taskQueue: serverTaskQueue,
});

const serverP = new Promise((resolve, reject) => {
const port = process.env['PORT'] || 3000;
server.listen(port, () => {
console.log(`🚀 :: server is listening on port ${port}`);
});

server.on('error', reject);
server.on('close', resolve);
});

await Promise.all([worker.run(), serverP]);
}

main().catch((err) => {
console.error(err);
process.exit(1);
});
Loading