Skip to content

Commit

Permalink
minor fixes and cleanup, support optional long polling
Browse files Browse the repository at this point in the history
  • Loading branch information
riccardobl committed Apr 23, 2024
1 parent 593a8e2 commit 9095b33
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 34 deletions.
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"install": "^0.13.0",
"nostr-tools": "^2.3.2",
"npm": "^10.5.0",
"openagents-grpc-proto": "https://github.com/riccardobl/openagents-grpc-proto/releases/download/v0.7.2/openagents_grpc_proto-JAVASCRIPT.tgz",
"openagents-grpc-proto": "https://github.com/riccardobl/openagents-grpc-proto/releases/download/v0.7.3/openagents_grpc_proto-JAVASCRIPT.tgz",
"ts-node": "^10.9.2",
"ts-protoc-gen": "^0.15.0",
"uuidv4": "^6.2.13",
Expand Down
15 changes: 10 additions & 5 deletions src/Job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,14 @@ export default class Job implements _Job {
nodeId?: string
) {
this.timestamp = Date.now();
this.maxEventDuration = Math.min(maxEventDuration,1000*60*2);
this.maxEventDuration = maxEventDuration;
this.expiration = this.timestamp + maxEventDuration;
this.maxExecutionTime = Math.min(maxExecutionTime,1000*60*2);
this.maxExecutionTime = maxExecutionTime;
this.nodeId = nodeId || "";
if(this.maxExecutionTime<=5000) throw new Error("Invalid max execution time");
if(this.maxEventDuration<=5000) throw new Error("Invalid max event duration");



if (outputFormat) {
this.outputFormat = outputFormat;
Expand Down Expand Up @@ -105,11 +109,11 @@ export default class Job implements _Job {
const runOn: string = Utils.getTagVars(event, ["param", "run-on"])[0][0] || "generic";
const customerPublicKey: string = event.pubkey;
const timestamp: number = Number(event.created_at) * 1000;
const expiration: number = Math.min(
const expiration: number = Math.max(Math.min(
Number(Utils.getTagVars(event, ["expiration"])[0][0] || "0") * 1000 ||
timestamp + this.maxEventDuration,
timestamp + this.maxEventDuration
);
),timestamp+60000);
const nodeId = Utils.getTagVars(event, ["d"])[0][0] || "";

const relays: Array<string> = Utils.getTagVars(event, ["relays"])[0] || defaultRelays;
Expand Down Expand Up @@ -251,7 +255,8 @@ export default class Job implements _Job {

} else {
// result
if(!this.result.timestamp){
// if (content=="") return;
if (!this.result.timestamp) {
const result = this.result;

if (result.timestamp < timestamp) {
Expand Down
6 changes: 4 additions & 2 deletions src/NostrConnector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ export default class NostrConnector {

announcementTimeout: number;
maxEventDuration: number;
minEventDuration: number = 1000 * 60 * 2;
maxJobExecutionTime: number;
minJobExecutionTime: number = 1000 * 60 * 1;
since: number;
filterProvider: ((provider: string) => boolean) | undefined;
webhooks: WebHooks | undefined;
Expand Down Expand Up @@ -444,7 +446,7 @@ export default class NostrConnector {
if (kind && !((kind >= 5000 && kind <= 5999) || (kind >= 6000 && kind <= 6999)))
throw new Error("Invalid kind " + kind);
const job = new Job(
Math.min(expireAfter,this.maxEventDuration),
Utils.clamp(expireAfter, this.minEventDuration, this.maxEventDuration),
runOn,
description,
input,
Expand All @@ -456,7 +458,7 @@ export default class NostrConnector {
nodeId
);
const events: Array<VerifiedEvent> = await job.toRequest(sk);
for (const event of events) this.sendEvent(event);
await Promise.all(events.map((event)=>this.sendEvent(event)));
return job;
}

Expand Down
86 changes: 64 additions & 22 deletions src/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import * as GPRCBackend from "@protobuf-ts/grpc-backend";
import { ReflectionService } from "@grpc/reflection";
import { loadFileDescriptorSetFromBuffer } from "@grpc/proto-loader";
import Auth from "./Auth";

import Utils from "./Utils";
import {
Writable,
Readable
Expand Down Expand Up @@ -316,7 +316,23 @@ class RpcConnector implements IPoolConnector {
try {
const nodeId = this.getNodeId(context);
const id = request.jobId;
const job = await this.conn.getJob(nodeId, id);
let job = undefined;
if (request.wait) {
job = await Utils.busyWaitForSomething(
async () => {
try {
job = await this.getJob(request, context);
const isDone = job && job.state.status == JobStatus.SUCCESS && job.result.timestamp;
if (isDone) return job;
} catch (e) {}
return undefined;
}, () => {
return job; // return last fetched job
},
request.wait
);
}
if(!job) job = await this.conn.getJob(nodeId, id);
return job;
} catch (e) {
console.log(e);
Expand All @@ -332,15 +348,31 @@ class RpcConnector implements IPoolConnector {
const runOnFilter: RegExp = new RegExp(request.filterByRunOn || ".*");
const descriptionFilter: RegExp = new RegExp(request.filterByDescription || ".*");
const kindFilter: RegExp = new RegExp(request.filterByKind || ".*");
const jobs = await this.conn.findJobs(
nodeId,
jobIdFilter,
runOnFilter,
descriptionFilter,
customerFilter,
kindFilter,
true
);

const findJobs=async ()=>{
return await this.conn.findJobs(
nodeId,
jobIdFilter,
runOnFilter,
descriptionFilter,
customerFilter,
kindFilter,
true
);
};

let jobs = [];
if(request.wait){
jobs=await Utils.busyWaitForSomething(async ()=>{
const j = await findJobs();
if(j.length>0)return j;
},()=>{
return [];
},request.wait);
}else[
jobs=await findJobs()
]

const pendingJobs: PendingJobs = {
jobs,
};
Expand All @@ -354,17 +386,27 @@ class RpcConnector implements IPoolConnector {
async isJobDone(request: RpcGetJob, context: ServerCallContext): Promise<RpcIsJobDone> {
try {
const nodeId = this.getNodeId(context);
const job = await this.getJob(request, context);

if (job && job.state.status == JobStatus.SUCCESS) {
return {
isDone: true,
};
} else {
return {
isDone: false,
};
}
let isDone=false
if(request.wait){
isDone=await Utils.busyWaitForSomething(async ()=>{
try{
const job = await this.getJob(request, context);
const isDone=job && job.state.status == JobStatus.SUCCESS &&job.result.timestamp;
if(isDone)return true;
}catch(e){
}
return undefined;
},()=>{
return false;
},request.wait);
}else{
const job = await this.getJob(request, context);
isDone=job && job.state.status == JobStatus.SUCCESS && job.result.timestamp>0;
}
return {
isDone
};

} catch (e) {
console.log(e);
throw e;
Expand Down
26 changes: 26 additions & 0 deletions src/Utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,29 @@ export default class Utils {
return uuidv4();
}

static async busyWaitForSomething(cb: ()=>any,onTimeoutResultGenerator: ()=>any,maxMs: number=10000, sleep: number=21): Promise<any> {
const start = Date.now();
return new Promise((resolve, reject) => {
const loop =async () => {
try {
const v=await cb();
if (v!==undefined) {
resolve(v);
} else {
const now = Date.now();
if (now - start > maxMs) {
resolve(await onTimeoutResultGenerator());
} else {
setTimeout(loop, sleep);
}
}
} catch (e) {
reject(e);
}
};
loop();
});
}
static satoshiTimestamp(){
// time in milliseconds since 3 january 2009
const jan32009=new Date("2009-01-03").getTime();
Expand Down Expand Up @@ -36,6 +59,9 @@ export default class Utils {
}
return getPublicKey(secret);
}
static clamp(value, min, max) {
return Math.min(Math.max(value, min), max);
}

static async encryptHyperdrive(url: string, secretKey: string | Uint8Array):Promise<{
bundleUrl: string,
Expand Down

0 comments on commit 9095b33

Please sign in to comment.