diff --git a/@xen-orchestra/vmdk/src/producer/VmdkSeSparse.mts b/@vates/generator-toolbox/src/bench.mts similarity index 100% rename from @xen-orchestra/vmdk/src/producer/VmdkSeSparse.mts rename to @vates/generator-toolbox/src/bench.mts diff --git a/@vates/generator-toolbox/src/synchronized.mts b/@vates/generator-toolbox/src/synchronized.mts index 6193239ae77..5e8c9c64d9f 100644 --- a/@vates/generator-toolbox/src/synchronized.mts +++ b/@vates/generator-toolbox/src/synchronized.mts @@ -18,6 +18,8 @@ export class Synchronized { } fork(uid: string): AsyncGenerator { + console.log('tooboxk FORK ') + uid = ''+Math.random() assert.strictEqual(this.#started, false, `can't create a fork after consuming the data`) const fork = new Forked(this, uid) this.#forks.set(uid, fork) @@ -28,6 +30,7 @@ export class Synchronized { if (!this.#nextValueForksReady) { throw new Error('Can t wait forks if there are noone waiting') } + console.log('wait for forks') const { promise, forksWaitingResolve } = this.#nextValueForksReady if (this.#waitingForks.size === this.#forks.size) { // reset value @@ -39,6 +42,7 @@ export class Synchronized { } async next(uid: string): Promise> { + console.log('next', uid) if (this.#removedForks.has(uid)) { return { done: true, value: undefined } } @@ -77,6 +81,7 @@ export class Synchronized { } async remove(uid: string, error?: Error): Promise> { + console.log('remove', uid) const fork = this.#forks.get(uid) if (fork === undefined) { if (this.#removedForks.has(uid)) { @@ -133,16 +138,20 @@ class Forked implements AsyncGenerator { this.#uid = uid } next(): Promise> { + console.log('forked next') return this.#parent.next(this.#uid) } async return(): Promise> { + console.log('forked return') return this.#parent.remove(this.#uid) } async throw(e: Error): Promise> { + console.log('forked throw') return this.#parent.remove(this.#uid, e) } [Symbol.asyncIterator](): AsyncGenerator { + console.log('in Forked.symbol') return this } } diff --git a/@xen-orchestra/backups/RemoteAdapter.mjs b/@xen-orchestra/backups/RemoteAdapter.mjs index 76cf1ba1a04..de8072a734e 100644 --- a/@xen-orchestra/backups/RemoteAdapter.mjs +++ b/@xen-orchestra/backups/RemoteAdapter.mjs @@ -689,7 +689,7 @@ export class RemoteAdapter { if (this.useVhdDirectory()) { await writeToVhdDirectory( - { disk }, + { disk , target: { handler, path, @@ -697,6 +697,7 @@ export class RemoteAdapter { validator, compression: 'brotli', } + } ) } else { const stream = await toVhdStream({ disk }) diff --git a/@xen-orchestra/backups/_runners/_vmRunners/IncrementalXapi.mjs b/@xen-orchestra/backups/_runners/_vmRunners/IncrementalXapi.mjs index c0e2cc3716f..84861460d4f 100644 --- a/@xen-orchestra/backups/_runners/_vmRunners/IncrementalXapi.mjs +++ b/@xen-orchestra/backups/_runners/_vmRunners/IncrementalXapi.mjs @@ -13,7 +13,8 @@ import { setVmDeltaChainLength, markExportSuccessfull, } from '../../_otherConfig.mjs' -import { forkDeltaExport } from './_forkDeltaExport.mjs' +import { forkDeltaExport } from './_forkDeltaExport.mjs' +import { SynchronizedDisk } from '@xen-orchestra/disk-transform' const { debug } = createLogger('xo:backups:IncrementalXapiVmBackup') @@ -40,10 +41,12 @@ export const IncrementalXapi = class IncrementalXapiVmBackupRunner extends Abstr }) const isVhdDifferencing = {} - Object.entries(deltaExport.disks).forEach(([key, disk])=>{ + for(const key in deltaExport.disks){ + const disk = deltaExport.disks[key] isVhdDifferencing[key] = disk.isDifferencing() - }) - + deltaExport.disks[key] = new SynchronizedDisk(disk) + await deltaExport.disks[key].init() + } //deltaExport.disks = mapValues(deltaExport.disks, disk=>this._throttleGenerator.createThrottledGenerator(disk) ) // @todo : reimplement fork, throttle, validation,isVhdDifferencingDisk , nbd use @@ -52,7 +55,7 @@ export const IncrementalXapi = class IncrementalXapiVmBackupRunner extends Abstr await this._callWriters( writer => writer.transfer({ - deltaExport: forkDeltaExport(deltaExport), + deltaExport, isVhdDifferencing, sizeContainers: {}, timestamp, diff --git a/@xen-orchestra/disk-transform/src/Disk.mts b/@xen-orchestra/disk-transform/src/Disk.mts index 7080224d92c..926c972edea 100644 --- a/@xen-orchestra/disk-transform/src/Disk.mts +++ b/@xen-orchestra/disk-transform/src/Disk.mts @@ -11,7 +11,6 @@ export type BytesLength = number export abstract class Disk { generatedDiskBlocks = 0 yieldedDiskBlocks = 0 - #synchronized:Synchronized | undefined #parent?: Disk get parent(): Disk | undefined { return this.#parent @@ -23,11 +22,17 @@ export abstract class Disk { abstract close(): Promise abstract isDifferencing(): boolean - // optional method, must throw if disk is not differencing + // optional method instantiateParent(): Promise { throw new Error('Method not implemented.') } async openParent(): Promise { + if(this.#parent!==undefined){ + return this.#parent + } + if(!this.isDifferencing()){ + throw new Error("Can't open the parent of a non differencing disk") + } this.#parent = await this.instantiateParent() await this.#parent.init() return this.#parent @@ -37,29 +42,27 @@ export abstract class Disk { * return the block without any order nor stability guarantee */ abstract getBlockIndexes(): Array + // must return true if the block is present in this disk + // without looking at the parent abstract hasBlock(index: number): boolean abstract buildDiskBlockGenerator(): Promise> | AsyncGenerator - async diskBlocks(uid?:string): Promise>{ - if(this.#synchronized === undefined){ - try{ + async * diskBlocks(uid?:string): AsyncGenerator{ + try { + const blockGenerator = await this.buildDiskBlockGenerator() + console.log('got generator', blockGenerator) + for await (const block of blockGenerator){ + this.generatedDiskBlocks ++ + yield block + this.yieldedDiskBlocks ++ - const blockGenerator = await this.buildDiskBlockGenerator() - const self = this - async function *trackedGenerator():AsyncGenerator{ - for await (const block of blockGenerator){ - self.generatedDiskBlocks ++ - yield block - self.yieldedDiskBlocks ++ - - } } - this.#synchronized = new Synchronized(trackedGenerator()) + console.log('done') + }catch(err){ + console.error('Disk.generator', err) }finally{ + console.log('finally') await this.close() } - } - return this.#synchronized.fork(uid ?? "unanamed generator fork") - } check() { @@ -82,6 +85,8 @@ export abstract class RandomAccessDisk extends Disk { get parent(): RandomAccessDisk | undefined { return this.#parent } + // can read data parent if block size are not aligned + // but only if this disk has data on this block abstract readBlock(index: number): Promise async *buildDiskBlockGenerator() { for (const index of this.getBlockIndexes()) { diff --git a/@xen-orchestra/disk-transform/src/DiskChain.mts b/@xen-orchestra/disk-transform/src/DiskChain.mts index af438cada34..084c40a9094 100644 --- a/@xen-orchestra/disk-transform/src/DiskChain.mts +++ b/@xen-orchestra/disk-transform/src/DiskChain.mts @@ -13,6 +13,12 @@ export class DiskChain extends RandomAccessDisk { getBlockSize(): number { return this.#disks[0].getBlockSize() } + + /** + * the main difference with the base disk block method + * is that if any of th disk has this block it return true + * + */ hasBlock(index: number): boolean { for (let i = this.#disks.length - 1; i >= 0; i--) { if (this.#disks[i].hasBlock(index)) { @@ -21,6 +27,7 @@ export class DiskChain extends RandomAccessDisk { } return false } + readBlock(index: number): Promise { for (let i = this.#disks.length - 1; i >= 0; i--) { if (this.#disks[i].hasBlock(index)) { @@ -43,4 +50,15 @@ export class DiskChain extends RandomAccessDisk { isDifferencing(): boolean { return this.#disks[0].isDifferencing() } + static async openFromChild(child:RandomAccessDisk):Promise{ + let disk = child + const disks=[disk] + while (disk.isDifferencing()) { + disk = await disk.openParent() as RandomAccessDisk + disks.unshift(disk) + // @todo handle until + } + return new DiskChain({ disks }) + + } } diff --git a/@xen-orchestra/disk-transform/src/RawDisk.mts b/@xen-orchestra/disk-transform/src/RawDisk.mts new file mode 100644 index 00000000000..c7a5d8a819b --- /dev/null +++ b/@xen-orchestra/disk-transform/src/RawDisk.mts @@ -0,0 +1,63 @@ +import { DiskBlock, RandomAccessDisk } from "./Disk.mjs"; +import { FileAccessor } from "./FileAccessor.mjs"; + + +export class RawDisk extends RandomAccessDisk{ + #accessor + #path + #descriptor:number|undefined + #blockSize:number + #size: number|undefined + + + constructor(accessor: FileAccessor, path: string, blockSize:number){ + super() + this.#accessor= accessor + this.#path = path + this.#blockSize = blockSize + } + async readBlock(index: number): Promise { + if(this.#descriptor === undefined){ + throw new Error("Can't call readBlock before init"); + } + const data = Buffer.alloc(this.getBlockSize(), 0) + await this.#accessor.read(this.#descriptor, data, index*this.getBlockSize()) + return { + index, + data + } + } + getVirtualSize(): number { + if(this.#size === undefined){ + throw new Error("Can't call getVirtualsize before init"); + } + return this.#size + } + getBlockSize(): number { + return this.#blockSize + } + async init(): Promise { + this.#descriptor = await this.#accessor.open(this.#path) + this.#size = await this.#accessor.getSize(this.#path) + } + async close(): Promise { + this.#descriptor && await this.#accessor.close(this.#descriptor) + this.#descriptor = undefined + this.#size = undefined + } + isDifferencing(): boolean { + return false + } + getBlockIndexes(): Array { + const nbBlocks = Math.ceil(this.getVirtualSize()/ this.getBlockSize()) + const index =[] + for(let i=0; i < nbBlocks; i ++){ + index.push(i) + } + return index + } + hasBlock(index: number): boolean { + return index * this.getBlockSize() < this.getVirtualSize() + } + +} \ No newline at end of file diff --git a/@xen-orchestra/disk-transform/src/SynchronizedDisk.mts b/@xen-orchestra/disk-transform/src/SynchronizedDisk.mts new file mode 100644 index 00000000000..7fe7b539387 --- /dev/null +++ b/@xen-orchestra/disk-transform/src/SynchronizedDisk.mts @@ -0,0 +1,29 @@ +import {DiskPassthrough} from './DiskPassthrough.mjs' +import {Synchronized} from '@vates/generator-toolbox' +import {Disk, DiskBlock} from './Disk.mjs' + +export class SynchronizedDisk extends DiskPassthrough{ + #synchronized:Synchronized |undefined + #source:Disk + constructor(source:Disk){ + super() + this.#source = source + } + async openSource():Promise{ + // await this.#source.init() + this.#synchronized = new Synchronized(await this.#source.buildDiskBlockGenerator()) + return this.#source + } + async * diskBlocks(uid:string): AsyncGenerator{ + console.log('will fork') + if(this.#synchronized === undefined){ + throw new Error("Can't cann fork before init") + } + return this.#synchronized.fork(uid) + } + + async close(){ + console.log('SynchronizedDisk.close') + await this.source.close() // this will trigger cleanup in syncrhonized + } +} \ No newline at end of file diff --git a/@xen-orchestra/disk-transform/src/index.mts b/@xen-orchestra/disk-transform/src/index.mts index 30499c3af55..8ae0d48976b 100644 --- a/@xen-orchestra/disk-transform/src/index.mts +++ b/@xen-orchestra/disk-transform/src/index.mts @@ -3,3 +3,5 @@ export { DiskChain } from './DiskChain.mjs' export { NegativeDisk } from './NegativeDisk.mjs' export { FileAccessor } from './FileAccessor.mjs' export { DiskPassthrough, RandomDiskPassthrough } from './DiskPassthrough.mjs' +export { RawDisk } from './RawDisk.mjs' +export { SynchronizedDisk } from './SynchronizedDisk.mjs' diff --git a/@xen-orchestra/vmdk/src/utils/streamOptimize.mts b/@xen-orchestra/vmdk/src/consumer/streamOptimize.mts similarity index 100% rename from @xen-orchestra/vmdk/src/utils/streamOptimize.mts rename to @xen-orchestra/vmdk/src/consumer/streamOptimize.mts diff --git a/@xen-orchestra/vmdk/src/disks/Vmdk.mts b/@xen-orchestra/vmdk/src/disks/Vmdk.mts new file mode 100644 index 00000000000..99a2c68c6df --- /dev/null +++ b/@xen-orchestra/vmdk/src/disks/Vmdk.mts @@ -0,0 +1,79 @@ +import { RandomDiskPassthrough, FileAccessor, RawDisk, RandomAccessDisk} from '@xen-orchestra/disk-transform' +import { VmdkCowd } from './VmdkCowd.mjs' + +/** + * read the descriptor and instantiate the right type of vmdk + */ + + + +export class VmdkDisk extends RandomDiskPassthrough{ + #accessor:FileAccessor + #path:string + constructor(accessor:FileAccessor, path:string){ + super() + this.#accessor = accessor + this.#path = path + } + + async openSource():Promise{ + // read the start of the vmdk file + // extract the descriptor + // depending on the descriptor type + + const descriptor = await this.#accessor.open(this.#path) + let buffer = Buffer.alloc(1024,0) + await this.#accessor.read(descriptor,buffer,0) + const metadata = buffer.toString() + await this.#accessor.close(descriptor) + + // Parse the descriptor to find the type + const createTypeMatch = metadata.match(/createType="([^"]+)"/); + const createType = createTypeMatch ? createTypeMatch[1] : null; + + const parentFileNameHintMatch = metadata.match(/createType="([^"]+)"/); + const parentFileNameHint = parentFileNameHintMatch ? parentFileNameHintMatch[1][0] : null; + + + if (!createType) { + throw new Error('Unable to determine VMDK type: createType not found in descriptor.'); + } + + // check if the vmdk is monolithic or if it reference another + // by looking for a line RW 4192256 SPARSE "disk-flat.vmdk" + const extentMatches = [...metadata.matchAll(/([A-Z]+) ([0-9]+) ([A-Z]+) "(.*)" ?(.*)$/g)] + if(extentMatches.length !==0){ + throw new Error(`No extent found in descriptor ${metadata}`); + } + if(extentMatches.length > 1){ + throw new Error(`${extentMatches.length} found in descriptor ${metadata}`); + } + const [, access, sizeSectors, type, name, offset] = extentMatches[0] + if(offset){ + throw new Error('reading from monolithic file is not implemnted yet') + } + + let vmdk:RandomAccessDisk + switch(type){ + case 'VMFS': //raw + vmdk = new RawDisk(this.#accessor, this.#path, 2*1024*1024) + break + case 'VMFSSPARSE': //cowd + if(parentFileNameHint === null){ + throw new Error(`no parentFileNameHint found in ${metadata}`) + } + vmdk = new VmdkCowd(this.#accessor, this.#path, parentFileNameHint) + break + case 'SESPARSE': + throw new Error('SE SPARSE not supported') + case 'SPARSE': // stream optimized , also check createType + throw new Error('streamoptimized not supported') + default: + throw new Error(`type${type} not supported`) + } + + return vmdk + + + } +} \ No newline at end of file diff --git a/@xen-orchestra/vmdk/src/disks/VmdkCowd.mts b/@xen-orchestra/vmdk/src/disks/VmdkCowd.mts new file mode 100644 index 00000000000..c5e29c796c7 --- /dev/null +++ b/@xen-orchestra/vmdk/src/disks/VmdkCowd.mts @@ -0,0 +1,131 @@ +import { DiskBlock,FileAccessor, RandomAccessDisk } from "@xen-orchestra/disk-transform"; + + +import { strictEqual } from 'node:assert' +import { VmdkDisk } from "./Vmdk.mjs"; +import { dirname, join } from "node:path"; + +export class VmdkCowd extends RandomAccessDisk{ + #accessor:FileAccessor + #parentPath:string + #path:string + #descriptor:number|undefined + #grainDirectory: Buffer|undefined + #size:number|undefined + + constructor(accessor:FileAccessor, path:string, parentPath:string){ + super() + this.#accessor = accessor + this.#parentPath = parentPath + this.#path = path + + } + + async instantiateParent():Promise{ + return new VmdkDisk(this.#accessor, join(dirname(this.#path), this.#parentPath)) + } + + async readBlock(index: number): Promise { + const parent = await this.openParent() as RandomAccessDisk + if(this.#grainDirectory === undefined){ + throw new Error("Can't read block before calling init") + } + if(this.#descriptor === undefined){ + throw new Error("Can't read block before calling init") + } + const sectorOffset = this.#grainDirectory.readUInt32LE(index * 4) + if(sectorOffset === 0){ + return parent.readBlock(index) + } else { + + let parentBlock:DiskBlock|undefined + const graintable = Buffer.alloc( 4096 * 4 /* grain table length */) + const data = Buffer.alloc(this.getBlockSize(), 0) + const sector = Buffer.alloc(512,0) + await this.#accessor.read(this.#descriptor, graintable,sectorOffset* 512 ) + + for (let i = 0; i < graintable.length / 4; i++) { + const grainOffset = graintable.readUInt32LE(i * 4) + if (grainOffset === 0) { + // look into parent + if(parentBlock === undefined){ + parentBlock = await parent.readBlock(index) + } + parentBlock.data.copy(data, i* 512,i*512, (i+1)*512) + } else if (grainOffset === 1) { + // this is a emptied grain, no data, don't look into parent + // buffer is already zeroed + } else if (grainOffset > 1) { + // non empty grain, read from file + await this.#accessor.read(this.#descriptor,sector,grainOffset*512 ) + } + } + return {index, data} + } + + } + getVirtualSize(): number { + if(this.#size === undefined){ + throw new Error("Can't getVirtualSize before calling init") + } + return this.#size + } + getBlockSize(): number { + return 2 *1024*1024 // one grain directory entry + } + + async init(): Promise { + this.#descriptor = await this.#accessor.open(this.#path) + const buffer = Buffer.alloc(2048) + await this.#accessor.read(this.#descriptor, buffer, 0) + + strictEqual(buffer.slice(0, 4).toString('ascii'), 'COWD') + strictEqual(buffer.readUInt32LE(4), 1) // version + strictEqual(buffer.readUInt32LE(8), 3) // flags + const numSectors = buffer.readUInt32LE(12) + const grainSize = buffer.readUInt32LE(16) + strictEqual(grainSize, 1) // 1 grain should be 1 sector long + strictEqual(buffer.readUInt32LE(20), 4) // grain directory position in sectors + + const nbGrainDirectoryEntries = buffer.readUInt32LE(24) + strictEqual(nbGrainDirectoryEntries, Math.ceil(numSectors / 4096)) + this.#size = numSectors * 512 + + // a grain directory entry contains the address of a grain table + // a grain table can adresses at most 4096 grain of 512 Bytes of data + this.#grainDirectory = Buffer.alloc(nbGrainDirectoryEntries*4) + // load the grain directory + await this.#accessor.read(this.#descriptor, this.#grainDirectory, 2048) + + } + async close(): Promise { + this.#descriptor && this.#accessor.close(this.#descriptor) + } + isDifferencing(): boolean { + return true + } + getBlockIndexes(): Array { + if(this.#grainDirectory === undefined){ + throw new Error("Can't getBlockIndexes before calling init") + } + const indexes = [] + for(let i=0;i < this.#grainDirectory.length /4; i ++){ + if(this.hasBlock(i)){ + indexes.push(i) + } + } + return indexes + } + hasBlock(index: number): boolean { + if(this.#grainDirectory === undefined){ + throw new Error("Can't hasBlock before calling init") + } + // only check if a grain table exist for on of the sector of the block + // the great news is that a grain size has 4096 entries of 512B = 2M + // and a vhd block is also 2M + // so we only need to check if a grain table exists (it's not created without data) + + return this.#grainDirectory.readUInt32LE(index * 4) !== 0 + } + +} \ No newline at end of file diff --git a/@xen-orchestra/vmdk/src/producer/VmdkStreamOptimized.mts b/@xen-orchestra/vmdk/src/disks/VmdkSeSparse.mts similarity index 100% rename from @xen-orchestra/vmdk/src/producer/VmdkStreamOptimized.mts rename to @xen-orchestra/vmdk/src/disks/VmdkSeSparse.mts diff --git a/@xen-orchestra/vmdk/src/utils/descriptorParser.mjt b/@xen-orchestra/vmdk/src/disks/VmdkStreamOptimized.mts similarity index 100% rename from @xen-orchestra/vmdk/src/utils/descriptorParser.mjt rename to @xen-orchestra/vmdk/src/disks/VmdkStreamOptimized.mts diff --git a/@xen-orchestra/vmdk/src/producer/Vmdk.mts b/@xen-orchestra/vmdk/src/producer/Vmdk.mts deleted file mode 100644 index a1db740eecf..00000000000 --- a/@xen-orchestra/vmdk/src/producer/Vmdk.mts +++ /dev/null @@ -1,3 +0,0 @@ -/** - * read the descriptor and instantiate the right type of vmdk - */ \ No newline at end of file diff --git a/@xen-orchestra/vmdk/src/producer/VmdkCowd.mts b/@xen-orchestra/vmdk/src/producer/VmdkCowd.mts deleted file mode 100644 index bc911743ff0..00000000000 --- a/@xen-orchestra/vmdk/src/producer/VmdkCowd.mts +++ /dev/null @@ -1,35 +0,0 @@ -import { Disk, DiskBlock, RandomAccessDisk } from "@xen-orchestra/disk-transform"; - - - -export class VmdkCowd extends RandomAccessDisk{ - - readBlock(index: number): Promise { - throw new Error("Method not implemented."); - } - getVirtualSize(): number { - throw new Error("Method not implemented."); - } - getBlockSize(): number { - throw new Error("Method not implemented."); - } - init(): Promise { - throw new Error("Method not implemented."); - } - close(): Promise { - throw new Error("Method not implemented."); - } - isDifferencing(): boolean { - throw new Error("Method not implemented."); - } - openParent(): Promise { - throw new Error("Method not implemented."); - } - getBlockIndexes(): Array { - throw new Error("Method not implemented."); - } - hasBlock(index: number): boolean { - throw new Error("Method not implemented."); - } - -} \ No newline at end of file diff --git a/@xen-orchestra/vmdk/src/producer/VmdkFlat.mts b/@xen-orchestra/vmdk/src/producer/VmdkFlat.mts deleted file mode 100644 index d2bab6114a9..00000000000 --- a/@xen-orchestra/vmdk/src/producer/VmdkFlat.mts +++ /dev/null @@ -1,62 +0,0 @@ -import { Disk, type DiskBlock, RandomAccessDisk,type FileAccessor} from '@xen-orchestra/disk-transform' - - -export class VmdkFlat extends RandomAccessDisk{ - #datastore: FileAccessor - #path:string - #descriptor?:number - #blockSize:number - #size?:number - - constructor(datastore:FileAccessor, path:string, blockSize:number){ - super() - this.#datastore = datastore - this.#path= path - this.#blockSize = blockSize - - } - async readBlock(index: number): Promise { - if(this.#descriptor === undefined){ - throw new Error(`can't call readBlock of VmdkFlat before init`); - } - const data = Buffer.alloc(this.getBlockSize()) - await this.#datastore.read(this.#descriptor, data, index*this.getBlockSize()) - return { - index, - data - } - - } - getVirtualSize(): number { - if(this.#size === undefined){ - throw new Error(`can't call getVirtualSize of VmdkFlat before init`); - } - return this.#size - } - getBlockSize(): number { - return this.#blockSize - } - async init(): Promise { - this.#descriptor = await this.#datastore.open(this.#path) - this.#size = await this.#datastore.getSize(this.#path) - } - async close(): Promise { - if(this.#descriptor !== undefined){ - await this.#datastore.close(this.#descriptor) - } - } - isDifferencing(): boolean { - return false - } - openParent(): Promise { - throw new Error('Method not implemented.'); - } - getBlockIndexes(): Array { - throw new Error('Method not implemented.'); - } - hasBlock(index: number): boolean { - // flat disk are full - return true - } - -} \ No newline at end of file diff --git a/@xen-orchestra/vmware-explorer/DatastoreSoapEsxi.mjs b/@xen-orchestra/vmware-explorer/DatastoreSoapEsxi.mjs deleted file mode 100644 index 5dc07ff771e..00000000000 --- a/@xen-orchestra/vmware-explorer/DatastoreSoapEsxi.mjs +++ /dev/null @@ -1,22 +0,0 @@ -import { Datastore } from './_Datastore.mjs' - -export class DatastoreSoapEsxi extends Datastore { - #esxi - constructor(datastoreName, { esxi, ...otherOptions } = {}) { - super(datastoreName, otherOptions) - this.#esxi = esxi - } - async getStream(path, start, end) { - const res = await this.#esxi.download(this.datastoreName, path, start || end ? `${start}-${end}` : undefined) - return res.body - } - async getBuffer(path, start, end) { - return ( - await this.#esxi.download(this.datastoreName, path, start || end ? `${start}-${end - 1}` : undefined) - ).buffer() - } - async getSize(path) { - const res = await this.#esxi.download(this.datastoreName, path) - return Number(res.headers.get('content-length')) - } -} diff --git a/@xen-orchestra/vmware-explorer/DatastoreXoRemote.mjs b/@xen-orchestra/vmware-explorer/DatastoreXoRemote.mjs deleted file mode 100644 index 481cce81057..00000000000 --- a/@xen-orchestra/vmware-explorer/DatastoreXoRemote.mjs +++ /dev/null @@ -1,22 +0,0 @@ -import assert from 'node:assert' -import { Datastore } from './_Datastore.mjs' - -export const DatastoreXoRemote = class DatastoreXoRemote extends Datastore { - #handler - constructor(datastoreName, { handler, ...otherOptions }) { - super(datastoreName, otherOptions) - this.#handler = handler - } - async getStream(path, start, end) { - return this.#handler.createReadStream(path, { start, end }) - } - async getBuffer(path, start, end) { - const buffer = Buffer.alloc(end - start) - const { bytesRead } = await this.#handler.read(path, buffer, start) - assert.strictEqual(bytesRead, end - start) - return buffer - } - async getSize(path) { - return this.#handler.getSize(path) - } -} diff --git a/@xen-orchestra/vmware-explorer/VhdEsxiCowd.mjs b/@xen-orchestra/vmware-explorer/VhdEsxiCowd.mjs deleted file mode 100644 index 704be6f21b6..00000000000 --- a/@xen-orchestra/vmware-explorer/VhdEsxiCowd.mjs +++ /dev/null @@ -1,160 +0,0 @@ -import _computeGeometryForSize from 'vhd-lib/_computeGeometryForSize.js' -import { createFooter, createHeader } from 'vhd-lib/_createFooterHeader.js' -import { FOOTER_SIZE } from 'vhd-lib/_constants.js' -import { notEqual, strictEqual } from 'node:assert' -import { unpackFooter, unpackHeader } from 'vhd-lib/Vhd/_utils.js' -import { VhdAbstract } from 'vhd-lib' -import { openDatastore } from './_openDatastore.mjs' - -export default class VhdEsxiCowd extends VhdAbstract { - #datastore - #parentVhd - #path - #lookMissingBlockInParent - - #header - #footer - - #grainDirectory - - static async open(datastoreName, path, parentVhd, opts) { - const datastore = openDatastore(datastoreName, opts) - const vhd = new VhdEsxiCowd(datastore, path, parentVhd, opts) - await vhd.readHeaderAndFooter() - return { value: vhd, dispose: () => {} } - } - constructor(datastore, path, parentVhd, { lookMissingBlockInParent = true } = {}) { - super() - this.#path = path - this.#datastore = datastore - this.#parentVhd = parentVhd - this.#lookMissingBlockInParent = lookMissingBlockInParent - } - - get header() { - return this.#header - } - - get footer() { - return this.#footer - } - - containsBlock(blockId) { - notEqual(this.#grainDirectory, undefined, "bat must be loaded to use contain blocks'") - // only check if a grain table exist for on of the sector of the block - // the great news is that a grain size has 4096 entries of 512B = 2M - // and a vhd block is also 2M - // so we only need to check if a grain table exists (it's not created without data) - - // depending on the paramters we also look into the parent data - return ( - this.#grainDirectory.readUInt32LE(blockId * 4) !== 0 || - (this.#lookMissingBlockInParent && this.#parentVhd.containsBlock(blockId)) - ) - } - - async #read(start, length) { - return this.#datastore.getBuffer(this.#path, start, start + length) - } - - async readHeaderAndFooter() { - const buffer = await this.#read(0, 2048) - - strictEqual(buffer.slice(0, 4).toString('ascii'), 'COWD') - strictEqual(buffer.readUInt32LE(4), 1) // version - strictEqual(buffer.readUInt32LE(8), 3) // flags - const numSectors = buffer.readUInt32LE(12) - const grainSize = buffer.readUInt32LE(16) - strictEqual(grainSize, 1) // 1 grain should be 1 sector long - strictEqual(buffer.readUInt32LE(20), 4) // grain directory position in sectors - - const nbGrainDirectoryEntries = buffer.readUInt32LE(24) - strictEqual(nbGrainDirectoryEntries, Math.ceil(numSectors / 4096)) - const size = numSectors * 512 - // a grain directory entry contains the address of a grain table - // a grain table can adresses at most 4096 grain of 512 Bytes of data - this.#header = unpackHeader(createHeader(Math.ceil(size / (4096 * 512)))) - const geometry = _computeGeometryForSize(size) - this.#footer = unpackFooter( - createFooter(size, Math.floor(Date.now() / 1000), geometry, FOOTER_SIZE, this.#parentVhd.footer.diskType) - ) - } - - async readBlockAllocationTable() { - const nbBlocks = this.header.maxTableEntries - this.#grainDirectory = await this.#read(2048 /* header length */, nbBlocks * 4) - } - - // we're lucky : a grain address can address exacty a full block - async readBlock(blockId) { - notEqual(this.#grainDirectory, undefined, 'grainDirectory is not loaded') - const sectorOffset = this.#grainDirectory.readUInt32LE(blockId * 4) - - const buffer = (await this.#parentVhd.readBlock(blockId)).buffer - - if (sectorOffset === 0) { - strictEqual(this.#lookMissingBlockInParent, true, "shouldn't have empty block in a delta alone") - return { - id: blockId, - bitmap: buffer.slice(0, 512), - data: buffer.slice(512), - buffer, - } - } - const offset = sectorOffset * 512 - - const graintable = await this.#read(offset, 4096 * 4 /* grain table length */) - - strictEqual(graintable.length, 4096 * 4) - // we have no guaranty that data are ordered or contiguous - // let's construct ranges to limit the number of queries - let rangeStart, offsetStart, offsetEnd - - const changeRange = async (index, offset) => { - if (offsetStart !== undefined) { - // if there was already a branch - if (offset === offsetEnd) { - offsetEnd++ - return - } - const grains = await this.#read(offsetStart * 512, (offsetEnd - offsetStart) * 512) - grains.copy(buffer, (rangeStart + 1) /* block bitmap */ * 512) - } - // start a new range - if (offset) { - // we're at the beginning of a range present in the file - rangeStart = index - offsetStart = offset - offsetEnd = offset + 1 - } else { - // we're at the beginning of a range from the parent or empty - rangeStart = undefined - offsetStart = undefined - offsetEnd = undefined - } - } - - for (let i = 0; i < graintable.length / 4; i++) { - const grainOffset = graintable.readUInt32LE(i * 4) - if (grainOffset === 0) { - // the content from parent : it is already in buffer - await changeRange() - } else if (grainOffset === 1) { - await changeRange() - // this is a emptied grain, no data, don't look into parent - buffer.fill(0, (i + 1) /* block bitmap */ * 512) - } else if (grainOffset > 1) { - // non empty grain, read from file - await changeRange(i, grainOffset) - } - } - // close last range - await changeRange() - return { - id: blockId, - bitmap: buffer.slice(0, 512), - data: buffer.slice(512), - buffer, - } - } -} diff --git a/@xen-orchestra/vmware-explorer/VhdEsxiRaw.mjs b/@xen-orchestra/vmware-explorer/VhdEsxiRaw.mjs deleted file mode 100644 index d359387b62d..00000000000 --- a/@xen-orchestra/vmware-explorer/VhdEsxiRaw.mjs +++ /dev/null @@ -1,215 +0,0 @@ -import _computeGeometryForSize from 'vhd-lib/_computeGeometryForSize.js' -import { createFooter, createHeader } from 'vhd-lib/_createFooterHeader.js' -import { DISK_TYPES, FOOTER_SIZE } from 'vhd-lib/_constants.js' -import { readChunkStrict, skipStrict } from '@vates/read-chunk' -import { Task } from '@vates/task' -import { unpackFooter, unpackHeader } from 'vhd-lib/Vhd/_utils.js' -import { VhdAbstract } from 'vhd-lib' -import assert from 'node:assert' -import { openDatastore } from './_openDatastore.mjs' - -/* eslint-disable no-console */ - -// create a thin VHD from a raw disk -const VHD_BLOCK_LENGTH = 2 * 1024 * 1024 -export default class VhdEsxiRaw extends VhdAbstract { - #datastore - #path - #thin - - #bat - #header - #footer - - #streamOffset = 0 - #stream - #reading = false - - static async open(datastoreName, path, opts) { - const datastore = openDatastore(datastoreName, opts) - const vhd = new VhdEsxiRaw(datastore, path, opts) - await vhd.readHeaderAndFooter() - return { - value: vhd, - dispose: () => vhd.dispose(), - } - } - - get header() { - return this.#header - } - - get footer() { - return this.#footer - } - - constructor(datastore, path, { thin = true } = {}) { - super() - this.#path = path - this.#datastore = datastore - this.#thin = thin - } - - async readHeaderAndFooter() { - const length = await this.#datastore.getSize(this.#path) - - this.#header = unpackHeader(createHeader(length / VHD_BLOCK_LENGTH)) - const geometry = _computeGeometryForSize(length) - - this.#footer = unpackFooter( - // length can be smaller than disk capacity due to alignment to head/cylinder/sector - createFooter(length, Math.floor(Date.now() / 1000), geometry, FOOTER_SIZE, DISK_TYPES.DYNAMIC) - ) - } - - containsBlock(blockId) { - if (!this.#thin) { - return true - } - assert.notEqual(this.#bat, undefined, 'bat is not loaded') - return this.#bat.has(blockId) - } - - async #readChunk(start, length) { - if (this.#reading) { - throw new Error('reading must be done sequentially') - } - try { - this.#reading = true - if (this.#stream !== undefined) { - // stream is too far ahead or to far behind - if (this.#streamOffset > start || this.#streamOffset + VHD_BLOCK_LENGTH < start) { - this.#stream.destroy() - this.#stream = undefined - this.#streamOffset = 0 - } - } - // no stream - if (this.#stream === undefined) { - const end = this.footer.currentSize - 1 - this.#stream = await this.#datastore.getStream(this.#path, start, end) - this.#streamOffset = start - } - - // stream a little behind - if (this.#streamOffset < start) { - await skipStrict(this.#stream, start - this.#streamOffset) - this.#streamOffset = start - } - - // really read data - this.#streamOffset += length - const data = await readChunkStrict(this.#stream, length) - return data - } catch (error) { - error.start = start - error.length = length - error.streamLength = this.footer.currentSize - this.#stream?.destroy() - this.#stream = undefined - this.#streamOffset = 0 - throw error - } finally { - this.#reading = false - } - } - - async #readBlock(blockId) { - const start = blockId * VHD_BLOCK_LENGTH - let length = VHD_BLOCK_LENGTH - let partial = false - if (start + length > this.footer.currentSize) { - length = this.footer.currentSize - start - partial = true - } - - let data = await this.#readChunk(start, length) - - if (partial) { - data = Buffer.concat([data, Buffer.alloc(VHD_BLOCK_LENGTH - data.length)]) - } - const bitmap = Buffer.alloc(512, 255) - return { - id: blockId, - bitmap, - data, - buffer: Buffer.concat([bitmap, data]), - } - } - - async readBlock(blockId) { - let tries = 5 - let lastError - while (tries > 0) { - try { - const res = await this.#readBlock(blockId) - return res - } catch (error) { - lastError = error - lastError.blockId = blockId - console.warn('got error , will retry in 2seconds', lastError) - } - await new Promise(resolve => setTimeout(() => resolve(), 2000)) - tries-- - } - - throw lastError - } - - // this will read all the disk once to check which block contains data, it can take a long time to execute depending on the network speed - async readBlockAllocationTable() { - if (!this.#thin) { - // fast path : is we do not use thin mode, the BAT is full - return - } - const empty = Buffer.alloc(VHD_BLOCK_LENGTH, 0) - let pos = 0 - this.#bat = new Set() - let nextChunkLength = Math.min(VHD_BLOCK_LENGTH, this.footer.currentSize) - Task.set('total', this.footer.currentSize / VHD_BLOCK_LENGTH) - const progress = setInterval(() => { - Task.set('progress', Math.round((pos * 100) / this.footer.currentSize)) - console.log('reading blocks', pos / VHD_BLOCK_LENGTH, '/', this.footer.currentSize / VHD_BLOCK_LENGTH) - }, 30 * 1000) - - while (nextChunkLength > 0) { - try { - const chunk = await this.#readChunk(pos, nextChunkLength) - let isEmpty - if (nextChunkLength === VHD_BLOCK_LENGTH) { - isEmpty = empty.equals(chunk) - } else { - // last block can be smaller - isEmpty = Buffer.alloc(nextChunkLength, 0).equals(chunk) - } - if (!isEmpty) { - this.#bat.add(pos / VHD_BLOCK_LENGTH) - } - pos += VHD_BLOCK_LENGTH - nextChunkLength = Math.min(VHD_BLOCK_LENGTH, this.footer.currentSize - pos) - } catch (error) { - clearInterval(progress) - throw error - } - } - console.log( - 'BAT reading done, remaining ', - this.#bat.size, - '/', - Math.ceil(this.footer.currentSize / VHD_BLOCK_LENGTH) - ) - clearInterval(progress) - } - - rawContent() { - return this.#datastore.getStream(this.#path).then(stream => { - stream.length = this.footer.currentSize - return stream - }) - } - async dispose() { - await this.#stream?.destroy() - } -} - -/* eslint-enable no-console */ diff --git a/@xen-orchestra/vmware-explorer/VmfsFileAccessor.mjs b/@xen-orchestra/vmware-explorer/VmfsFileAccessor.mjs index ace24170739..7ab9e5b17de 100644 --- a/@xen-orchestra/vmware-explorer/VmfsFileAccessor.mjs +++ b/@xen-orchestra/vmware-explorer/VmfsFileAccessor.mjs @@ -1,123 +1,140 @@ -// @ts-nocheck - -import { readChunkStrict, skipStrict } from '@vates/read-chunk' -import assert from 'node:assert' - -class VmfsFileDescriptor { - #datastore - #path - #streamOffset = 0 - #stream - #reading = false - - #esxi - #datastoreName - - #size - constructor(path, esxi, datastoreName) { - this.#path = path - this.#datastoreName = datastoreName - this.#esxi = esxi - } - - async open() { - const res = await this.#esxi.download(this.datastoreName, this.#path) - // @todo close the connection - this.#size = Number(res.headers.get('content-length')) - } - - async #readChunk(start, length) { - if (this.#reading) { - throw new Error('reading must be done sequentially') +import { readChunkStrict, skipStrict } from '@vates/read-chunk'; + +/** + * Implementation of FileAccessor for interacting with an ESXi datastore. + * @implements {FileAccessor} + */ +export class EsxiAccessor { + /** + * @param {Esxi} esxi - An instance of the Esxi class. + */ + constructor(esxi) { + this.#esxi = esxi; + this.#descriptors = new Map(); // Map to associate descriptors with paths/stream/size + this.#nextDescriptor = 0; // Counter to generate unique descriptors + this.#activeReads = new Map(); // Map to manage ongoing reads } - try { - this.#reading = true - if (this.#stream !== undefined) { - // stream is too far ahead or to far behind - if (this.#streamOffset > start || this.#streamOffset + 1024 * 1024 < start) { - this.#stream.destroy() - this.#stream = undefined - this.#streamOffset = 0 - } - } - // no stream - if (this.#stream === undefined) { - const end = this.#size - 1 - this.#stream = await this.#datastore.getStream(this.#path, start, end) - this.#streamOffset = start - } - - // stream a little behind - if (this.#streamOffset < start) { - await skipStrict(this.#stream, start - this.#streamOffset) - this.#streamOffset = start - } - // really read data - this.#streamOffset += length - const data = await readChunkStrict(this.#stream, length) - return data - } catch (error) { - error.start = start - error.length = length - error.streamLength = this.#size - this.#stream?.destroy() - this.#stream = undefined - this.#streamOffset = 0 - throw error - } finally { - this.#reading = false + // Private members + #esxi; + #descriptors; + #nextDescriptor; + #activeReads; + + /** + * Opens a stream for a given path and returns a descriptor. + * @param {string} path - The file path. + * @param {number?} from - Starting position (optional). + * @param {number?} to - Ending position (optional). + * @param {number?} useDescriptor - reuse a descriptor number + * @returns {Promise} - A file descriptor. + */ + async open(path, from, to, useDescriptor) { + const descriptor = useDescriptor ? useDescriptor : this.#nextDescriptor++; + const res = await this.#esxi.download(path, from || to ? `${from}-${to}`: undefined); + const stream = res.body + const size = Number(res.headers.get('content-length')) + this.#descriptors.set(descriptor, { stream, size,path, from, to, currentPosition:from ??0}); + return descriptor; } - } - - async read(buffer, position) { - const chunk = await this.#readChunk(position, buffer.length) - chunk.copy(buffer) - return { buffer, bytesRead: buffer.length } - } - - close() { - return this.#stream.destroy() - } -} -export class VmfsFileAccessor /* implements file accessor */ { - #descriptors = new Map() - #esxi - #datastoreName - - #nextDescriptorIndex = 0 + /** + * Reads data from a previously opened stream. + * @param {number} descriptor - The file descriptor. + * @param {Buffer} buffer - The pre allocated buffer that will contains the data + * @param {number?} from - Starting position. + * @returns {Promise<{buffer:Buffer, bytes_read:number}>} - The read data. + */ + async read(descriptor, buffer, from) { + if (!this.#descriptors.has(descriptor)) { + throw new Error('Descriptor not found'); + } + const to = from + buffer.length + const descriptor = this.#descriptors.get(descriptor); + const {path, stream , size, to:opennedTo,currentPosition} = descriptor + if(to > size || (opennedTo && opennedTo > to)){ + throw new Error('Try to read after the end of the file'); + } - get datastoreName() { - return this.#datastoreName - } + // Check if a read is already in progress for this descriptor + if (this.#activeReads.has(descriptor)) { + await this.#activeReads.get(descriptor); + } - constructor({ datastoreName, esxi, ...otherOptions } = {}) { - this.#datastoreName = datastoreName - this.#esxi = esxi - } - async open(path) { - const descriptor = new VmfsFileDescriptor(this.#esxi, this.#datastoreName, path) - this.#descriptors.set(this.#nextDescriptorIndex, descriptor) - this.#nextDescriptorIndex++ - } + // Handle parallel reads + const readPromise = (async () => { + const maxSkipSize = 2 * 1024 * 1024; // 2MB + + // If the distance to skip is less than 2MB, skip + if (currentPosition < from && from - currentPosition <= maxSkipSize) { + await skipStrict(stream, from - currentPosition); + } else { + // Otherwise, close and reopen the stream + this.close(descriptor); + await this.open(path, from, to, descriptor); + } + + // Read the requested data + const sizeToRead = to - from; + const buffer = await readChunkStrict(stream, sizeToRead); + descriptor.currentPosition = to + return buffer; + })(); + + // Store the read promise to avoid parallel reads + this.#activeReads.set(descriptor, readPromise); + const result = await readPromise; + this.#activeReads.delete(descriptor); + result.copy(buffer) + return {buffer, bytes_read: buffer.length}; + } - async close(filedecriptor) { - assert.strictEqual(typeof file, 'Number') - const descriptor = this.#descriptors.get(filedecriptor) - return descriptor.close() - } + /** + * Closes a previously opened stream. + * @param {number} descriptor - The file descriptor. + */ + close(descriptor) { + if (this.#descriptors.has(descriptor)) { + const {stream} = this.#descriptors.get(descriptor); + stream.destroy(); // Close the stream + this.#descriptors.delete(descriptor); + } + } - async read(file, buffer, position) { - assert.strictEqual(typeof file, 'Number') - const descriptor = this.#descriptors.get(file) - return descriptor.read(buffer, position) - } - async getSize(path) { - const res = await this.#esxi.download(this.datastoreName, path) - // @todo close the connection - return Number(res.headers.get('content-length')) - } + /** + * Reads the entire content of a file. + * @param {string} path - The file path. + * @returns {Promise} - The file content. + */ + async readFile(path) { + const descriptor = await this.open(path); + const {stream } = this.#descriptors.get(descriptor); + const chunks = []; + + for await (const chunk of stream) { + chunks.push(chunk); + } + this.close(descriptor); + return Buffer.concat(chunks); + } - // write function are not implemented on this backend -} + async getSize(path){ + const descriptor = await this.open(path) + const {size} = this.#descriptors.get(descriptor) + this.close(descriptor) + return size + } + mktree(path){ + throw new Error('not implemented') + } + rmtree(path){ + throw new Error('not implemented') + } + unlink(path){ + throw new Error('not implemented') + } + outputStream(stream){ + throw new Error('not implemented') + } + +} \ No newline at end of file diff --git a/@xen-orchestra/vmware-explorer/_Datastore.mjs b/@xen-orchestra/vmware-explorer/_Datastore.mjs deleted file mode 100644 index 998f2ea3b63..00000000000 --- a/@xen-orchestra/vmware-explorer/_Datastore.mjs +++ /dev/null @@ -1,17 +0,0 @@ -export class Datastore { - #datastoreName - - get datastoreName() { - return this.#datastoreName - } - constructor(dataStoreName) { - this.#datastoreName = dataStoreName - } - async getStream(path, start, end) { - throw new Error('Not implemented') - } - async getBuffer(path, start, end) { - throw new Error('Not implemented') - } - async getSize(path) {} -} diff --git a/@xen-orchestra/vmware-explorer/_openDatastore.mjs b/@xen-orchestra/vmware-explorer/_openDatastore.mjs deleted file mode 100644 index d8c98f080c0..00000000000 --- a/@xen-orchestra/vmware-explorer/_openDatastore.mjs +++ /dev/null @@ -1,21 +0,0 @@ -import { DatastoreSoapEsxi } from './DatastoreSoapEsxi.mjs' -import { DatastoreXoRemote } from './DatastoreXoRemote.mjs' -import { createLogger } from '@xen-orchestra/log' - -const { info } = createLogger('xo:vmware-explorer:openDatastore') - -export function openDatastore(dataStoreName, { esxi, dataStoreToHandlers = {}, ...otherOptions }) { - const handler = dataStoreToHandlers[dataStoreName] - if (handler === undefined) { - info(`use SOAP API to read datastore ${dataStoreName}`, { - dataStoreName, - dataStoreToHandlers: Object.keys(dataStoreToHandlers), - }) - return new DatastoreSoapEsxi(dataStoreName, { esxi, ...otherOptions }) - } - info(`use XO remote to read datastore ${dataStoreName}`, { - dataStoreName, - dataStoreToHandlers: Object.keys(dataStoreToHandlers), - }) - return new DatastoreXoRemote(dataStoreName, { handler, ...otherOptions }) -} diff --git a/@xen-orchestra/vmware-explorer/openDeltaVmdkAsVhd.mjs b/@xen-orchestra/vmware-explorer/openDeltaVmdkAsVhd.mjs deleted file mode 100644 index e6b584157c9..00000000000 --- a/@xen-orchestra/vmware-explorer/openDeltaVmdkAsVhd.mjs +++ /dev/null @@ -1,15 +0,0 @@ -import VHDEsxiSeSparse from './VhdEsxiSeSparse.mjs' -import VhdEsxiCowd from './VhdEsxiCowd.mjs' - -export default async function openDeltaVmdkasVhd(datastoreName, path, parentVhd, opts) { - let disposableVhd - if (path.endsWith('-sesparse.vmdk')) { - disposableVhd = await VHDEsxiSeSparse.open(datastoreName, path, parentVhd, opts) - } else if (path.endsWith('-delta.vmdk')) { - disposableVhd = await VhdEsxiCowd.open(datastoreName, path, parentVhd, opts) - } else { - throw new Error(`Vmdk ${path} does not seems to be a delta vmdk.`) - } - await disposableVhd.value.readBlockAllocationTable() - return disposableVhd -}