Skip to content

Commit

Permalink
Add live metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
nzws committed Jun 4, 2024
1 parent 75c0acc commit d403f0f
Show file tree
Hide file tree
Showing 23 changed files with 881 additions and 1,427 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export const apiInternalOnPublish: Middleware = async ctx => {
return;
}

await Action.startStream(liveId, watchToken, body.client_id);
await Action.startStream(liveId, watchToken, body.client_id, body.stream_id);

void client.v1.internals.push.action.$post({
body: {
Expand Down
33 changes: 31 additions & 2 deletions apps/push-serverless/src/services/action.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import { lives, rejectSession, sessions } from '../utils/sessions';
import { Encoder } from './encoder';
import { generateToken } from '../utils/token';
import { client, serverToken } from '../utils/api';
import { getStream } from './srs-api';

export class Action {
static async startStream(
liveId: number,
watchToken: string,
clientId: string
clientId: string,
streamId: string
) {
const currentSession = sessions.get(liveId);
if (currentSession) {
Expand All @@ -17,17 +20,43 @@ export class Action {
const internalToken = generateToken();
const encoder = new Encoder(liveId, watchToken, internalToken);

const sendHeartbeat = async () => {
const streamInfo = await getStream(streamId);
const stats = streamInfo?.stream;
if (!stats) {
console.warn('stream not found', streamId, liveId);
return;
}

void client.v1.internals.push.action.$post({
body: {
liveId,
action: 'stream:heartbeat',
serverToken,
stats: stats
}
});
};

const heartbeatInterval = setInterval(
() => void sendHeartbeat(),
1000 * 15
);

sessions.set(liveId, {
clientId,
encoder,
internalToken
internalToken,
heartbeatInterval
});

setTimeout(() => {
void encoder.encodeToHighQualityHls();
void encoder.encodeToLowQualityHls();
void encoder.encodeToAudio();

void sendHeartbeat();

// this.startRecording(liveId);
}, 500);
}
Expand Down
98 changes: 98 additions & 0 deletions apps/push-serverless/src/services/srs-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,101 @@ export const kickoffClient = async (id: string) => {
}
});
};

/** "streams": [
{
"id": "vid-ff500jr",
"name": "10_628f27feff6be7187b295317f8d5fe81e20e524f7f57e581c9430d3b3eea06ada579f23ba03c78ea43bf6c64520db4d6",
"vhost": "vid-8n5l812",
"app": "live",
"tcUrl": "rtmp://127.0.0.1/live",
"url": "/live/10_628f27feff6be7187b295317f8d5fe81e20e524f7f57e581c9430d3b3eea06ada579f23ba03c78ea43bf6c64520db4d6",
"live_ms": 1717517428337,
"clients": 6,
"frames": 7795,
"send_bytes": 186135731,
"recv_bytes": 44801527,
"kbps": {
"recv_30s": 2668,
"send_30s": 10686
},
"publish": {
"active": true,
"cid": "69kaucce"
},
"video": {
"codec": "H264",
"profile": "High",
"level": "Other",
"width": 1920,
"height": 1080
},
"audio": {
"codec": "AAC",
"sample_rate": 44100,
"channel": 2,
"profile": "LC"
}
}
]
*/
interface Stream {
id: string;
name: string;
vhost: string;
app: string;
tcUrl: string;
url: string;
live_ms: number;
clients: number;
frames: number;
send_bytes: number;
recv_bytes: number;
kbps: {
recv_30s: number;
send_30s: number;
};
publish: {
active: boolean;
cid: string;
};
video: {
codec: string;
profile: string;
level: string;
width: number;
height: number;
} | null;
audio: {
codec: string;
sample_rate: number;
channel: number;
profile: string;
} | null;
}

interface StreamsResponse {
code: number;
server: string;
service: string;
pid: string;
streams: Stream[];
}

interface StreamResponse {
code: number;
server: string;
service: string;
pid: string;
stream?: Stream;
}

export const getStreams = async () => {
const res = await fetch(`${API_ENDPOINT}/api/v1/streams`);
return res.json() as Promise<StreamsResponse>;
};

export const getStream = async (id: string) => {
const res = await fetch(`${API_ENDPOINT}/api/v1/streams/${id}`);
return res.json() as Promise<StreamResponse>;
};
1 change: 1 addition & 0 deletions apps/push-serverless/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export type SRSPublishCallback = {
stream: string;
tcUrl: string;
param: string;
stream_id: string;
};

export type SRSUnPublishCallback = {
Expand Down
5 changes: 5 additions & 0 deletions apps/push-serverless/src/utils/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export const sessions = new Map<
clientId: string;
encoder: Encoder;
internalToken: string;
heartbeatInterval?: NodeJS.Timeout;
}
>();

Expand Down Expand Up @@ -37,6 +38,10 @@ export const rejectSession = async (liveId: number) => {
console.warn('cleanup error', liveId, e);
}

if (session.heartbeatInterval) {
clearInterval(session.heartbeatInterval);
}

try {
await kickoffClient(session.clientId);
} catch (e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "Live" ADD COLUMN "stats" JSONB NOT NULL DEFAULT '{}';
1 change: 1 addition & 0 deletions apps/server/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ model Live {
watchToken String? @db.VarChar(100)
thumbnailId Int?
config Json @default("{}")
stats Json @default("{}")
isDeleted Boolean @default(false)
isRecording Boolean @default(false)
isPushing Boolean @default(false)
Expand Down
73 changes: 72 additions & 1 deletion apps/server/src/controllers/v1/internals/push/action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ const reqBodySchema: JSONSchemaType<Request> = {
'stream:stop',
'record:processing',
'record:done',
'record:failed'
'record:failed',
'stream:heartbeat'
]
},
serverToken: {
Expand All @@ -42,6 +43,66 @@ const reqBodySchema: JSONSchemaType<Request> = {
fileSize: {
type: 'string',
nullable: true
},
stats: {
type: 'object',
properties: {
kbps: {
type: 'object',
properties: {
recv_30s: {
type: 'number'
},
send_30s: {
type: 'number'
}
},
required: ['recv_30s', 'send_30s']
},
video: {
type: 'object',
properties: {
codec: {
type: 'string'
},
profile: {
type: 'string'
},
level: {
type: 'string'
},
width: {
type: 'number'
},
height: {
type: 'number'
}
},
required: ['codec', 'profile', 'level', 'width', 'height'],
nullable: true
},
audio: {
type: 'object',
properties: {
codec: {
type: 'string'
},
sample_rate: {
type: 'number'
},
channel: {
type: 'number'
},
profile: {
type: 'string'
}
},
required: ['codec', 'sample_rate', 'channel', 'profile'],
nullable: true
}
},
required: ['kbps'],
nullable: true
}
},
required: ['liveId', 'action', 'serverToken'],
Expand Down Expand Up @@ -127,6 +188,16 @@ export const postV1InternalsPushAction: APIRoute<
live.id,
LiveRecordingStatus.Failed
);
} else if (body.action === 'stream:heartbeat') {
if (!body.stats) {
ctx.status = 400;
ctx.body = {
errorCode: 'invalid_request'
};
return;
}

await lives.updateStats(live, body.stats);
}

if (newLive) {
Expand Down
5 changes: 3 additions & 2 deletions apps/server/src/controllers/v1/streams/get.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ export const getV1Streams: APIRoute<
never,
Response,
UserState & LiveState
> = ctx => {
> = async ctx => {
ctx.body = {
live: lives.getPrivate(ctx.state.live)
live: lives.getPrivate(ctx.state.live),
stats: await lives.getStats(ctx.state.live)
};
};
Loading

0 comments on commit d403f0f

Please sign in to comment.