diff --git a/package-lock.json b/package-lock.json
index 474a46e..eec330a 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -25,7 +25,7 @@
         "install": "^0.13.0",
         "nostr-tools": "^2.3.2",
         "npm": "^10.5.0",
-        "openagents-grpc-proto": "https://github.com/riccardobl/openagents-grpc-proto/releases/download/v0.6/openagents_grpc_proto-JAVASCRIPT.tgz",
+        "openagents-grpc-proto": "https://github.com/riccardobl/openagents-grpc-proto/releases/download/v0.7.2/openagents_grpc_proto-JAVASCRIPT.tgz",
         "ts-node": "^10.9.2",
         "ts-protoc-gen": "^0.15.0",
         "uuidv4": "^6.2.13",
@@ -4268,9 +4268,9 @@
       }
     },
     "node_modules/openagents-grpc-proto": {
-      "version": "0.6",
-      "resolved": "https://github.com/riccardobl/openagents-grpc-proto/releases/download/v0.6/openagents_grpc_proto-JAVASCRIPT.tgz",
-      "integrity": "sha512-gn3nEpZueOnK0rsuVv+zQoXwA7RfrWc4/3FPswOgrfRtfwRWrm402nOkZGMXeEAjuwrfXbqJ2998/q2TXtgbxA==",
+      "version": "0.7.2",
+      "resolved": "https://github.com/riccardobl/openagents-grpc-proto/releases/download/v0.7.2/openagents_grpc_proto-JAVASCRIPT.tgz",
+      "integrity": "sha512-Z3eGXIKMnLB8GklHeWoASOD8X0h34cjUkBLsyGCj5wmfwE+CNdkO2bhB3WakTFhx2JTgX38ihLrKoBPBXs3h8Q==",
       "dependencies": {
         "@grpc/grpc-js": "^1.10.6",
         "@protobuf-ts/plugin": "^2.9.4"
diff --git a/package.json b/package.json
index d5a6d5d..58d339a 100644
--- a/package.json
+++ b/package.json
@@ -34,7 +34,7 @@
     "install": "^0.13.0",
     "nostr-tools": "^2.3.2",
     "npm": "^10.5.0",
-    "openagents-grpc-proto": "https://github.com/riccardobl/openagents-grpc-proto/releases/download/v0.6/openagents_grpc_proto-JAVASCRIPT.tgz",
+    "openagents-grpc-proto": "https://github.com/riccardobl/openagents-grpc-proto/releases/download/v0.7.2/openagents_grpc_proto-JAVASCRIPT.tgz",
     "ts-node": "^10.9.2",
     "ts-protoc-gen": "^0.15.0",
     "uuidv4": "^6.2.13",
diff --git a/src/Auth.ts b/src/Auth.ts
index 6cc9d61..f0a7609 100644
--- a/src/Auth.ts
+++ b/src/Auth.ts
@@ -1,7 +1,6 @@
 import * as GRPC from "@grpc/grpc-js";
 import Utils from "./Utils";
 import { generateSecretKey, getPublicKey } from "nostr-tools";
-
 
 export default class Auth {
 
@@ -22,6 +21,7 @@ export default class Auth {
         const token: string = metadata["authorization"] || Utils.uuidFrom(call.getPeer());
         const id = Utils.uuidFrom(token);
         call.metadata.set("nodeid", id);
+        call.metadata.set("cacheid", id);
         if (this.isAuthorized(methodName, id)) {
             methodImplementation(call, callback);
         } else {
diff --git a/src/Cache.ts b/src/Cache.ts
new file mode 100644
index 0000000..74b4aa9
--- /dev/null
+++ b/src/Cache.ts
@@ -0,0 +1,125 @@
+import HyperdrivePool, { DriverOut, ReadableGreedy } from "./HyperdrivePool";
+import { SharedDrive } from "./HyperdrivePool";
+import Path from "path";
+import Fs from "fs";
+import Utils from "./Utils";
+export class CacheDisk {
+    drive: SharedDrive;
+    version: number;
+    constructor(drive: SharedDrive, cacheVersion: number) {
+        this.drive = drive;
+        this.version = cacheVersion ;
+
+    }
+
+    async getAsStream(path: string, requestedEntryVersion?:number): Promise {
+        if (!(await this.drive.exists(path))) {
+            console.log("Cache miss", path);
+            return undefined;
+        }
+
+        const meta = JSON.parse((await this.drive.get(path + ".meta.json")).toString());
+
+
+        if (meta.cacheVersion != this.version) {
+            console.log(
+                "Cache version mismatch",
+                path,
+                "current:",
+                this.version,
+                "stored:",
+                meta.cacheVersion
+            );
+            return undefined;
+        }
+
+        if (requestedEntryVersion && requestedEntryVersion != meta.entryVersion) {
+            console.log(
+                "Entry version mismatch",
+                path,
+                "requested:",
+                requestedEntryVersion,
+                "stored:",
+                meta.entryVersion
+            );
+            return undefined;
+        }
+
+        if (meta.expiration && meta.expiration < Date.now()) {
+            console.log("Expired cache", path, "expiration:", meta.expiration, "now:", Date.now());
+            return undefined;
+        }
+
+        return await this.drive.inputStream(path);
+    }
+
+    async setAsStream(path: string, version?: number, expiration?: number): Promise {
+        const meta = {
+            cacheVersion: this.version || 0,
+            expiration: expiration || 0,
+            entryVersion: version
+        };
+        await this.drive.put(path + ".meta.json", JSON.stringify(meta));
+
+        return await this.drive.outputStream(path);
+    }
+
+    async commit(){
+        await this.drive.commit();
+    }
+}
+
+export default class Cache {
+    CACHES: { [key: string]: CacheDisk } = {};
+    cacheUrl: string;
+    drives: HyperdrivePool;
+    poolId: string;
+    cacheVersion: number = 0;
+    constructor(cacheUrl: string, drives: HyperdrivePool, poolId: string, version?: number) {
+        this.cacheUrl = cacheUrl;
+        this.drives = drives;
+        this.poolId = poolId;
+        if(version){
+            this.cacheVersion = version;
+        }else{
+
+            this.cacheVersion = Math.floor(Utils.satoshiTimestamp()/ 1000);
+        }
+        this._commit();
+    }
+
+    async _commit(){
+        for (const cache of Object.values(this.CACHES)){
+            await cache.commit();
+        }
+        setTimeout(()=>this._commit(),10000);
+    }
+
+    async get(name: string): Promise {
+        if (this.CACHES[name]) {
+            console.log("Cache instance found");
+            return this.CACHES[name];
+        }
+
+        const bundleUrlPath = Path.join(this.cacheUrl, name + ".bundledcache");
+        let bundleUrl = "";
+        if (Fs.existsSync(bundleUrlPath)) {
+            console.log("Freezed cache instance found. Warming up...");
+            bundleUrl = await Fs.promises.readFile(bundleUrlPath, { encoding: "utf-8" });
+        } else {
+            console.log("Create new cache instance.");
+            bundleUrl = await this.drives.create(this.poolId);
+            await Fs.promises.writeFile(bundleUrlPath, bundleUrl, { encoding: "utf-8" });
+        }
+
+        let version;
+        [bundleUrl, version] = await this.drives.open(this.poolId, bundleUrl);
+        const drive = await this.drives.get(this.poolId, bundleUrl);
+        drive.on("close", () => {
+            delete this.CACHES[name];
+        });
+        const cacheDrive = new CacheDisk(drive, this.cacheVersion);
+        this.CACHES[name] = cacheDrive;
+        return cacheDrive;
+    }
+}
\ No newline at end of file
diff --git a/src/HyperdrivePool.ts b/src/HyperdrivePool.ts
index 7560120..b624cbe 100644
--- a/src/HyperdrivePool.ts
+++ b/src/HyperdrivePool.ts
@@ -12,19 +12,22 @@
 import Utils from "./Utils";
 import { generateSecretKey } from "nostr-tools";
 import Fs from "fs";
 import Path from "path";
+import { EventEmitter } from "events";
+
 import { hexToBytes, bytesToHex } from "@noble/hashes/utils";
 
 export type DriverOut = { flushAndWait:()=>Promise write:(data:Buffer|string|Uint8Array)=>void };
 export type ReadableGreedy = Readable & {readAll:()=>Promise};
 
-export class SharedDrive {
+export class SharedDrive extends EventEmitter {
     drives: Hyperdrive[] = [];
     bundleUrl: string;
     lastAccess: number;
     isClosed: boolean = false;
     conn: NostrConnector;
     constructor(bundleUrl: string, conn: NostrConnector) {
+        super();
         this.bundleUrl = bundleUrl;
         this.lastAccess = Date.now();
         this.conn = conn;
@@ -41,7 +44,6 @@ export class SharedDrive {
         return this.drives.some((drive) => drive.key.toString("hex") === key);
     }
 
-
     getVersion(): number {
         let version = 0;
         for (const drive of this.drives) {
@@ -52,36 +54,47 @@
         return version;
     }
 
-    async put(path: string, data: string|Uint8Array) {
+    async put(path: string, data: string | Uint8Array) {
+        if (!path.startsWith("/")) {
+            path = "/" + path;
+        }
         this.lastAccess = Date.now();
+        let atLeastOneDrive = false;
         for (const drive of this.drives) {
             if (drive.writable) {
                 await drive.put(path, data);
+                atLeastOneDrive = true;
             }
         }
-
+        if (!atLeastOneDrive) throw "No writable drives";
     }
 
-    async get(path: string) : Promise {
+    async get(path: string): Promise {
+        if (!path.startsWith("/")) path = "/" + path;
         // TODO: Pick newer
         this.lastAccess = Date.now();
+        let data = undefined;
         for (const drive of this.drives) {
             if (await drive.exists(path)) {
-                return drive.get(path);
+                console.log("Found file ",path," in drive", drive.key.toString("hex"));
+                data = drive.get(path);
             }
         }
-        throw "File not found";
+        if (data) return data;
+        throw "File not found";
    }
 
-
     async outputStream(path: string): Promise {
         const streams = [];
+        let atLeastOneDrive = false;
         for (const drive of this.drives) {
             if (drive.writable) {
                 streams.push(drive.createWriteStream(path));
+                atLeastOneDrive = true;
             }
         }
+        if (!atLeastOneDrive) throw "No writable drives";
         this.lastAccess = Date.now();
         // const pt = new PassThrough() as any;
         // pt.on("error", (e) => console.log(`Error in outputStream: ${e}`));
@@ -112,6 +125,7 @@
         const pt: DriverOut = {
             flushAndWait: async () => {
                 let endedStreams = 0;
+
                 for (const stream of streams) {
                     stream.end();
                     stream.on("close", () => {
@@ -128,22 +142,24 @@
                 }
             },
         };
-
         return pt;
     }
 
     async exists(path: string) {
+        if (!path.startsWith("/")) path = "/" + path;
         this.lastAccess = Date.now();
+        // let f=await this.list(path);
+        // return f.length>0;
         for (const drive of this.drives) {
-            if (await drive.exists(path)) {
-                return true;
-            }
+            if (await drive.exists(path)) return true;
         }
+        console.log("File not found", path);
         return false;
     }
 
-    async list(path: string) {
+    async list(path: string = "/") {
+        if (!path.startsWith("/")) path = "/" + path;
         this.lastAccess = Date.now();
         const files = [];
         for (const drive of this.drives) {
@@ -190,8 +206,13 @@
         this.lastAccess = Date.now();
     }
 
-    async close(){
-        await this.conn.unannounceHyperdrive( this.bundleUrl);
+    async close() {
+        this.emit("close");
+        try {
+            await this.conn.unannounceHyperdrive(this.bundleUrl);
+        } catch (e) {
+            console.log("Error unannouncing", e);
+        }
         for (const drive of this.drives) {
             await drive.close();
         }
@@ -200,7 +221,7 @@
 
     async commit() {
         this.lastAccess = Date.now();
-        for(const drive of this.drives) {
+        for (const drive of this.drives) {
             if (!drive._instanceLocal) {
                 console.log("Skip commit of remote drive", drive.key.toString("hex"));
                 continue;
@@ -228,7 +249,6 @@
     nPeers: number = 0;
     driverTimeout: number = 1000 * 60 * 60 * 1; // 1 hour
     isClosed: boolean = false;
-    createdNow: string[] = []
     constructor(storagePath: string, conn: NostrConnector, topic: string = "OpenAgentsBlobStore") {
         this.store = new Corestore(storagePath, {
             secretKey: conn.sk,
diff --git a/src/NostrConnector.ts b/src/NostrConnector.ts
index 0a6bb4c..159e139 100644
--- a/src/NostrConnector.ts
+++ b/src/NostrConnector.ts
@@ -500,6 +500,29 @@ export default class NostrConnector {
         }));
     }
 
+    async deleteEvents(ids){
+        const maxIdsPerEvent = 21;
+        const waitQueue = [];
+        for(let i = 0; i < ids.length;i+=maxIdsPerEvent){
+            const idsToDelete = [];
+            for(let j = 0; j < maxIdsPerEvent && i+j < ids.length;j++){
+                idsToDelete.push(ids[i]);
+            }
+            waitQueue.push(this.sendEvent(
+                {
+                    kind: 5,
+                    created_at: Math.floor(Date.now() / 1000),
+                    tags: [...idsToDelete.map((id) => ["e", id])],
+                    content: "Driver closed",
+                },
+                true
+            ));
+        }
+
+        return Promise.all(waitQueue);
+
+    }
+
@@ -508,19 +531,44 @@
     async unannounceHyperdrive( bundleUrl: string): Promise {
         const discoveryHash = await Utils.getHyperdriveDiscoveryKey(bundleUrl);
         const filter: Filter = {
             kinds: [1063],
             "#x": [discoveryHash],
             "#m": ["application/hyperdrive+bundle"]
         };
         const events = await this.query(filter);
-        await this.sendEvent({
-            kind: 5,
-            created_at: Math.floor(Date.now() / 1000),
-            tags: [
-                ...events.map((event) => ["e", event.id]),
-            ],
-            content: "Driver closed"
-        }, true);
+        // await this.sendEvent({
+        //     kind: 5,
+        //     created_at: Math.floor(Date.now() / 1000),
+        //     tags: [
+        //         ...events.map((event) => ["e", event.id]),
+        //     ],
+        //     content: "Driver closed"
+        // }, true);
+        await this.deleteEvents(events.map((event) => event.id));
     }
 
     async announceHyperdrive(nodeId: string, bundleUrl: string, driverUrl: string, version: string|number): Promise {
         const encryptedData = await Utils.encryptHyperdrive(driverUrl, bundleUrl);
+        const filter: Filter = {
+            kinds: [1063],
+            "#x": [encryptedData.discoveryHash],
+            "#m": ["application/hyperdrive+bundle"]
+        };
+        const oldAnnouncements = await this.query(filter);
+        // for(const event of oldAnnouncements){
+        //     const oldUrl = Utils.getTagVars(event, ["url"])[0][0];
+        //     if(oldUrl == encryptedData.encryptedUrl){
+        //         return event.id;
+        //     }
+        // }
+        let deleteOlds;
+        try{
+            deleteOlds = this.deleteEvents(oldAnnouncements.filter((event)=>{
+                const oldUrl = Utils.getTagVars(event, ["url"])[0][0];
+                const nodeId = Utils.getTagVars(event, ["d"])[0][0];
+                return oldUrl == encryptedData.encryptedUrl && nodeId == nodeId;
+            }).map((event) => event.id));
+        }catch(e){
+            console.error("Error deleting old announcements", e);
+            deleteOlds = Promise.resolve();
+        }
+
         const event: EventTemplate = {
             kind: 1063,
             created_at: Math.floor(Date.now() / 1000),
@@ -533,7 +581,9 @@
             ],
             content: "",
         };
-        const submittedEvent = await this.sendEvent(event, true);
-        return submittedEvent.id;
+
+        const submittedEvent = this.sendEvent(event, true);
+        await deleteOlds;
+        return (await submittedEvent).id;
     }
 }
\ No newline at end of file
diff --git a/src/RPCServer.ts b/src/RPCServer.ts
index cd5a3d2..c637f50 100644
--- a/src/RPCServer.ts
+++ b/src/RPCServer.ts
@@ -53,20 +53,28 @@ import {
     RpcDiskWriteFileRequest,
     RpcDiskWriteFileResponse,
     RpcOpenDiskRequest,
-    RpcOpenDiskResponse
+    RpcOpenDiskResponse,
+    RpcCacheGetRequest,
+    RpcCacheGetResponse,
+    RpcCacheSetRequest,
+    RpcCacheSetResponse
 } from "openagents-grpc-proto";
 import Job from "./Job";
 import NostrConnector from "./NostrConnector";
 import Fs from 'fs';
-import HyperdrivePool from "./HyperdrivePool";
+import HyperdrivePool, { SharedDrive } from "./HyperdrivePool";
 import {DriverOut} from "./HyperdrivePool";
+import Cache, { CacheDisk } from "./Cache";
+
 
 class RpcConnector implements IPoolConnector {
     conn: NostrConnector;
     hyp: HyperdrivePool;
-    constructor(conn, hyp) {
+    cache: Cache;
+    constructor(conn, hyp, cache) {
         this.conn = conn;
         this.hyp = hyp;
+        this.cache = cache;
     }
 
     async createDisk(
@@ -91,10 +99,11 @@
     async openDisk(request: RpcOpenDiskRequest, context: ServerCallContext): Promise {
         try {
             const nodeId = this.getNodeId(context);
-            const [disk,version] = await this.hyp.open(nodeId, request.url);
+            const [disk, version] = await this.hyp.open(nodeId, request.url);
             return {
                 success: true,
                 diskId: disk,
+                version: version,
             };
         } catch (e) {
             console.log(e);
@@ -114,6 +123,7 @@
             throw e;
         }
     }
+
     async diskDeleteFile(
         request: RpcDiskDeleteFileRequest,
         context: ServerCallContext
@@ -158,7 +168,7 @@
             const disk = await this.hyp.get(nodeId, request.diskId);
             const readStream = await disk.inputStream(request.path);
             for await (const chunk of readStream) {
-                responses.send({ data: chunk });
+                responses.send({ data: chunk, exists: true });
             }
             await responses.complete();
         } catch (e) {
@@ -201,6 +211,63 @@
         }
     }
 
+    async cacheSet(
+        requests: RpcOutputStream,
+        context: ServerCallContext
+    ): Promise {
+        try {
+            const nodeId = this.getNodeId(context);
+            const cache = await this.getCache(context);
+            let outputStream: DriverOut | undefined;
+            for await (const request of requests) {
+                try {
+                    if (!outputStream) {
+                        outputStream = await cache.setAsStream(
+                            request.key,
+                            request.version,
+                            request.expireAt || 0
+                        );
+                    }
+                    await outputStream.write(request.data);
+                } catch (e) {
+                    console.log(e);
+                    throw e;
+                }
+            }
+            await outputStream.flushAndWait();
+            return {
+                success: true,
+            };
+        } catch (e) {
+            console.log(e);
+            throw e;
+        }
+    }
+
+    async cacheGet(
+        request: RpcCacheGetRequest,
+        responses: RpcInputStream,
+        context: ServerCallContext
+    ): Promise {
+        try {
+            const nodeId = this.getNodeId(context);
+            const cache = await this.getCache(context);
+            const readStream = await cache.getAsStream(request.key, request.lastVersion);
+            if (!readStream) {
+                await responses.send({ data: Buffer.from([]), exists: false });
+                await responses.complete();
+            } else {
+                for await (const chunk of readStream) {
+                    await responses.send({ data: chunk, exists: true });
+                }
+                await responses.complete();
+            }
+        } catch (e) {
+            console.log(e);
+            throw e;
+        }
+    }
+
     async diskWriteSmallFile(
         request: RpcDiskWriteFileRequest,
         context: ServerCallContext
@@ -228,6 +295,7 @@
             const data = await disk.get(request.path);
             return {
                 data,
+                exists: true,
             };
         } catch (e) {
             console.log(e);
@@ -239,6 +307,11 @@
         return context.headers["nodeid"] as string;
     }
 
+    async getCache(context: ServerCallContext): Promise {
+        const cacheId = context.headers["cacheid"] as string;
+        return this.cache.get(cacheId);
+    }
+
     async getJob(request: RpcGetJob, context: ServerCallContext): Promise {
         try {
             const nodeId = this.getNodeId(context);
@@ -401,7 +474,7 @@
     async getEvents(request: RpcGetEventsRequest, context: ServerCallContext): Promise {
-        try{
+        try {
             const nodeId = this.getNodeId(context);
             const events: string[] = await this.conn.getAndConsumeCustomEvents(
                 request.groupId,
@@ -489,6 +562,7 @@ export default class RPCServer {
     serverKey: Buffer | undefined;
     hyperdrivePool: HyperdrivePool;
     poolSecretKey: string;
+    cache: Cache;
     constructor(
         poolSecretKey: string,
         addr: string,
@@ -496,6 +570,7 @@
         descriptorPath: string,
         nostrConnector: NostrConnector,
         hyperdrivePool: HyperdrivePool,
+        cache: Cache,
         caCrt?: Buffer,
         serverCrt?: Buffer,
         serverKey?: Buffer
@@ -508,6 +583,8 @@
         this.serverCrt = serverCrt;
         this.serverKey = serverKey;
         this.hyperdrivePool = hyperdrivePool;
+        this.cache = cache;
+        this.poolSecretKey = poolSecretKey;
     }
 
     async start() {
@@ -521,7 +598,7 @@
                 this.poolSecretKey,
                 GPRCBackend.adaptService(
                     PoolConnector,
-                    new RpcConnector(this.nostrConnector, this.hyperdrivePool)
+                    new RpcConnector(this.nostrConnector, this.hyperdrivePool, this.cache)
                 )
             )
         );
diff --git a/src/Utils.ts b/src/Utils.ts
index 9abcbf4..3ac17a3 100644
--- a/src/Utils.ts
+++ b/src/Utils.ts
@@ -8,6 +8,13 @@ export default class Utils {
         return uuidv4();
     }
 
+    static satoshiTimestamp(){
+        // time in milliseconds since 3 january 2009
+        const jan32009=new Date("2009-01-03").getTime();
+        const now=new Date().getTime();
+        return now-jan32009;
+    }
+
     static async encryptNostr(
         text: string,
         ourPrivateKey: string | Uint8Array,
diff --git a/src/main.ts b/src/main.ts
index 7675bbd..d62f435 100644
--- a/src/main.ts
+++ b/src/main.ts
@@ -1,10 +1,12 @@
 import RPCServer from "./RPCServer";
 import Fs from "fs";
 import NostrConnector from "./NostrConnector";
-import { generateSecretKey } from "nostr-tools";
-import { bytesToHex } from '@noble/hashes/utils'
+import { generateSecretKey, getPublicKey } from "nostr-tools";
+import { bytesToHex, hexToBytes } from '@noble/hashes/utils'
 import WebHooks from "./WebHooks";
 import HyperdrivePool from "./HyperdrivePool";
+import Cache from "./Cache";
+import Path from "path";
 async function main(){
     process.on("uncaughtException", (err) => {
         console.error("There was an uncaught error", err);
@@ -18,6 +20,7 @@
     const SECRET_KEY = process.env.NOSTR_SECRET_KEY || bytesToHex(generateSecretKey());
     const RELAYS = (process.env.NOSTR_RELAYS || "wss://nostr.rblb.it:7777").split(",");
     const WEBHOOKS = (process.env.WEBHOOKS || "").split(",");
+    const PUBLIC_KEY = getPublicKey(hexToBytes(SECRET_KEY));
 
     const CA_CRT_PATH: string = process.env.GRPC_CA_CRT || "";
     const SERVER_CRT_PATH: string = process.env.GRPC_SERVER_CRT || "";
@@ -30,13 +33,13 @@
     const SERVER_KEY: Buffer | undefined =
         SERVER_KEY_PATH && Fs.existsSync(SERVER_KEY_PATH) ? Fs.readFileSync(SERVER_KEY_PATH) : undefined;
 
-
-    const BLOB_STORAGE_PATH = process.env.BLOB_STORAGE_PATH || "./data/hyperpool";
+    const BLOB_STORAGE_PATH = Path.join((process.env.BLOB_STORAGE_PATH || "./data/hyperpool"),PUBLIC_KEY);
 
     const webhooks = new WebHooks(WEBHOOKS);
     const nostr = new NostrConnector(SECRET_KEY, RELAYS, undefined);
     nostr.setWebHooks(webhooks);
     const hyp = new HyperdrivePool(BLOB_STORAGE_PATH, nostr);
+    const cache = new Cache(BLOB_STORAGE_PATH, hyp, PUBLIC_KEY);
     const server = new RPCServer(
         SECRET_KEY,
         IP,
@@ -44,6 +47,7 @@
         DESCRIPTOR_PATH,
         nostr,
         hyp,
+        cache,
         CA_CRT,
         SERVER_CRT,
         SERVER_KEY