Skip to content

Commit

Permalink
Update proto, add cache, fix some issues with blob storage
Browse files Browse the repository at this point in the history
  • Loading branch information
riccardobl committed Apr 22, 2024
1 parent d945830 commit b66cfd1
Show file tree
Hide file tree
Showing 9 changed files with 327 additions and 44 deletions.
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
"install": "^0.13.0",
"nostr-tools": "^2.3.2",
"npm": "^10.5.0",
"openagents-grpc-proto": "https://github.com/riccardobl/openagents-grpc-proto/releases/download/v0.6/openagents_grpc_proto-JAVASCRIPT.tgz",
"openagents-grpc-proto": "https://github.com/riccardobl/openagents-grpc-proto/releases/download/v0.7.2/openagents_grpc_proto-JAVASCRIPT.tgz",
"ts-node": "^10.9.2",
"ts-protoc-gen": "^0.15.0",
"uuidv4": "^6.2.13",
Expand Down
2 changes: 1 addition & 1 deletion src/Auth.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import * as GRPC from "@grpc/grpc-js";
import Utils from "./Utils";
import { generateSecretKey, getPublicKey } from "nostr-tools";

export default class Auth {


Expand All @@ -22,6 +21,7 @@ export default class Auth {
const token: string = metadata["authorization"] || Utils.uuidFrom(call.getPeer());
const id = Utils.uuidFrom(token);
call.metadata.set("nodeid", id);
call.metadata.set("cacheid", id);
if (this.isAuthorized(methodName, id)) {
methodImplementation(call, callback);
} else {
Expand Down
125 changes: 125 additions & 0 deletions src/Cache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
import HyperdrivePool, { DriverOut, ReadableGreedy } from "./HyperdrivePool";
import { SharedDrive } from "./HyperdrivePool";
import Path from "path";
import Fs from "fs";
import Utils from "./Utils";
export class CacheDisk {
drive: SharedDrive;
version: number;
constructor(drive: SharedDrive, cacheVersion: number) {
this.drive = drive;
this.version = cacheVersion ;

}

async getAsStream(path: string, requestedEntryVersion?:number): Promise<ReadableGreedy> {
if (!(await this.drive.exists(path))) {
console.log("Cache miss", path);
return undefined;
}

const meta = JSON.parse((await this.drive.get(path + ".meta.json")).toString());


if (meta.cacheVersion != this.version) {
console.log(
"Cache version mismatch",
path,
"current:",
this.version,
"stored:",
meta.cacheVersion
);
return undefined;
}

if (requestedEntryVersion && requestedEntryVersion != meta.entryVersion) {
console.log(
"Entry version mismatch",
path,
"requested:",
requestedEntryVersion,
"stored:",
meta.entryVersion
);
return undefined;
}

if (meta.expiration && meta.expiration < Date.now()) {
console.log("Expired cache", path, "expiration:", meta.expiration, "now:", Date.now());
return undefined;
}

return await this.drive.inputStream(path);
}

async setAsStream(path: string, version?: number, expiration?: number): Promise<DriverOut> {
const meta = {
cacheVersion: this.version || 0,
expiration: expiration || 0,
entryVersion: version
};
await this.drive.put(path + ".meta.json", JSON.stringify(meta));

return await this.drive.outputStream(path);
}

async commit(){
await this.drive.commit();
}
}

export default class Cache {
CACHES: { [key: string]: CacheDisk } = {};
cacheUrl: string;
drives: HyperdrivePool;
poolId: string;
cacheVersion: number = 0;
constructor(cacheUrl: string, drives: HyperdrivePool, poolId: string, version?: number) {
this.cacheUrl = cacheUrl;
this.drives = drives;
this.poolId = poolId;
if(version){
this.cacheVersion = version;
}else{

this.cacheVersion = Math.floor(Utils.satoshiTimestamp()/ 1000);
}
this._commit();
}

async _commit(){
for (const cache of Object.values(this.CACHES)){
await cache.commit();
}
setTimeout(()=>this._commit(),10000);
}

async get(name: string): Promise<CacheDisk> {
if (this.CACHES[name]) {
console.log("Cache instance found");
return this.CACHES[name];
}

const bundleUrlPath = Path.join(this.cacheUrl, name + ".bundledcache");
let bundleUrl = "";
if (Fs.existsSync(bundleUrlPath)) {
console.log("Freezed cache instance found. Warming up...");
bundleUrl = await Fs.promises.readFile(bundleUrlPath, { encoding: "utf-8" });
} else {
console.log("Create new cache instance.");
bundleUrl = await this.drives.create(this.poolId);
await Fs.promises.writeFile(bundleUrlPath, bundleUrl, { encoding: "utf-8" });
}

let version;
[bundleUrl, version] = await this.drives.open(this.poolId, bundleUrl);
const drive = await this.drives.get(this.poolId, bundleUrl);
drive.on("close", () => {
delete this.CACHES[name];
});
const cacheDrive = new CacheDisk(drive, this.cacheVersion);
this.CACHES[name] = cacheDrive;
return cacheDrive;
}
}
54 changes: 37 additions & 17 deletions src/HyperdrivePool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,22 @@ import Utils from "./Utils";
import { generateSecretKey } from "nostr-tools";
import Fs from "fs";
import Path from "path";
import { EventEmitter } from "events";

import { hexToBytes, bytesToHex } from "@noble/hashes/utils";
export type DriverOut = {
flushAndWait:()=>Promise<void>
write:(data:Buffer|string|Uint8Array)=>void
};
export type ReadableGreedy = Readable & {readAll:()=>Promise<Buffer>};
export class SharedDrive {
export class SharedDrive extends EventEmitter {
drives: Hyperdrive[] = [];
bundleUrl: string;
lastAccess: number;
isClosed: boolean = false;
conn: NostrConnector;
constructor(bundleUrl: string, conn: NostrConnector) {
super();
this.bundleUrl = bundleUrl;
this.lastAccess = Date.now();
this.conn = conn;
Expand All @@ -41,7 +44,6 @@ export class SharedDrive {
return this.drives.some((drive) => drive.key.toString("hex") === key);
}


getVersion(): number {
let version = 0;
for (const drive of this.drives) {
Expand All @@ -52,36 +54,47 @@ export class SharedDrive {
return version;
}

async put(path: string, data: string|Uint8Array) {
async put(path: string, data: string | Uint8Array) {
if (!path.startsWith("/")) {
path = "/" + path;
}
this.lastAccess = Date.now();
let atLeastOneDrive = false;
for (const drive of this.drives) {
if (drive.writable) {
await drive.put(path, data);
atLeastOneDrive = true;
}
}

if (!atLeastOneDrive) throw "No writable drives";
}

async get(path: string) : Promise<Buffer> {
async get(path: string): Promise<Buffer> {
if (!path.startsWith("/")) path = "/" + path;
// TODO: Pick newer
this.lastAccess = Date.now();
let data = undefined;
for (const drive of this.drives) {
if (await drive.exists(path)) {
return drive.get(path);
console.log("Found file ",path," in drive", drive.key.toString("hex"));
data = drive.get(path);
}
}
throw "File not found";
if (data) return data;

throw "File not found";
}


async outputStream(path: string): Promise<DriverOut> {
const streams = [];
let atLeastOneDrive = false;
for (const drive of this.drives) {
if (drive.writable) {
streams.push(drive.createWriteStream(path));
atLeastOneDrive = true;
}
}
if (!atLeastOneDrive) throw "No writable drives";
this.lastAccess = Date.now();
// const pt = new PassThrough() as any;
// pt.on("error", (e) => console.log(`Error in outputStream: ${e}`));
Expand Down Expand Up @@ -112,6 +125,7 @@ export class SharedDrive {
const pt: DriverOut = {
flushAndWait: async () => {
let endedStreams = 0;

for (const stream of streams) {
stream.end();
stream.on("close", () => {
Expand All @@ -128,22 +142,24 @@ export class SharedDrive {
}
},
};


return pt;
}

async exists(path: string) {
if (!path.startsWith("/")) path = "/" + path;
this.lastAccess = Date.now();
// let f=await this.list(path);
// return f.length>0;
for (const drive of this.drives) {
if (await drive.exists(path)) {
return true;
}
if (await drive.exists(path)) return true;
}
console.log("File not found", path);
return false;
}

async list(path: string) {
async list(path: string = "/") {
if (!path.startsWith("/")) path = "/" + path;
this.lastAccess = Date.now();
const files = [];
for (const drive of this.drives) {
Expand Down Expand Up @@ -190,8 +206,13 @@ export class SharedDrive {
this.lastAccess = Date.now();
}

async close(){
await this.conn.unannounceHyperdrive( this.bundleUrl);
async close() {
this.emit("close");
try {
await this.conn.unannounceHyperdrive(this.bundleUrl);
} catch (e) {
console.log("Error unannouncing", e);
}
for (const drive of this.drives) {
await drive.close();
}
Expand All @@ -200,7 +221,7 @@ export class SharedDrive {

async commit() {
this.lastAccess = Date.now();
for(const drive of this.drives) {
for (const drive of this.drives) {
if (!drive._instanceLocal) {
console.log("Skip commit of remote drive", drive.key.toString("hex"));
continue;
Expand Down Expand Up @@ -228,7 +249,6 @@ export default class HyperdrivePool {
nPeers: number = 0;
driverTimeout: number = 1000 * 60 * 60 * 1; // 1 hour
isClosed: boolean = false;
createdNow: string[] = []
constructor(storagePath: string, conn: NostrConnector, topic: string = "OpenAgentsBlobStore") {
this.store = new Corestore(storagePath, {
secretKey: conn.sk,
Expand Down
Loading

0 comments on commit b66cfd1

Please sign in to comment.