From dc391b7e9b60ee1048fc38218d6115beeaeef589 Mon Sep 17 00:00:00 2001 From: Florent BEAUCHAMP Date: Wed, 12 Feb 2025 16:32:10 +0000 Subject: [PATCH] more refacto, mirror backup works, implement differential restore --- @xen-orchestra/backups/ImportVmBackup.mjs | 63 ++++++++-------- @xen-orchestra/backups/RemoteAdapter.mjs | 10 ++- @xen-orchestra/backups/_incrementalVm.mjs | 4 +- .../_runners/_vmRunners/IncrementalRemote.mjs | 14 ++-- .../_writers/IncrementalRemoteWriter.mjs | 35 ++++----- @xen-orchestra/disk-transform/package.json | 19 ++++- .../disk-transform/src/BlockSizeChanger.mts | 16 +++-- .../disk-transform/src/DiskChain.mts | 13 ++-- .../disk-transform/src/NegativeDisk.mts | 61 ++++++++++++++++ .../disk-transform/src/PortableDisk.mts | 30 +++----- .../disk-transform/src/consumer/BaseVhd.mts | 19 ++--- .../src/consumer/VhdDirectory.mts | 6 ++ .../disk-transform/src/consumer/VhdStream.mts | 2 +- .../disk-transform/src/producer/Raw.mts | 71 +++++++++++++++++++ .../src/producer/RemoteChain.mts | 63 ++++++++++++++++ .../disk-transform/src/producer/RemoteVhd.mts | 16 ++++- .../disk-transform/src/producer/Xapi.mts | 31 ++++---- .../src/producer/XapiVhdCbt.mts | 38 +++++----- .../src/producer/XapiVhdStreamNbd.mts | 2 +- .../src/producer/XapiVhdStreamSource.mts | 13 +++- @xen-orchestra/disk-transform/tsconfig.json | 2 +- @xen-orchestra/log/index.js | 6 +- packages/vhd-lib/Vhd/VhdAbstract.js | 8 --- packages/vhd-lib/index.js | 1 - packages/vhd-lib/isVhdDifferencingDisk.js | 11 --- 25 files changed, 391 insertions(+), 163 deletions(-) create mode 100644 @xen-orchestra/disk-transform/src/NegativeDisk.mts create mode 100644 @xen-orchestra/disk-transform/src/producer/Raw.mts create mode 100644 @xen-orchestra/disk-transform/src/producer/RemoteChain.mts delete mode 100644 packages/vhd-lib/isVhdDifferencingDisk.js diff --git a/@xen-orchestra/backups/ImportVmBackup.mjs b/@xen-orchestra/backups/ImportVmBackup.mjs index d79d3bf7cdd..d5e009df3c4 100644 --- a/@xen-orchestra/backups/ImportVmBackup.mjs +++ b/@xen-orchestra/backups/ImportVmBackup.mjs @@ -4,12 +4,13 @@ import { formatFilenameDate } from './_filenameDate.mjs' import { importIncrementalVm } from './_incrementalVm.mjs' import { Task } from './Task.mjs' import { watchStreamSize } from './_watchStreamSize.mjs' -import { VhdNegative, VhdSynthetic } from 'vhd-lib' import { decorateClass } from '@vates/decorate-with' import { createLogger } from '@xen-orchestra/log' import { dirname, join } from 'node:path' import pickBy from 'lodash/pickBy.js' -import { defer } from 'golike-defer' +import { defer } from 'golike-defer' +import { RemoteChain} from '../disk-transform/dist/producer/RemoteChain.mjs' +import { NegativeDisk} from '../disk-transform/dist/NegativeDisk.mjs' const { debug, info, warn } = createLogger('xo:backups:importVmBackup') async function resolveUuid(xapi, cache, uuid, type) { @@ -59,7 +60,7 @@ export class ImportVmBackup { const metadata = this._metadata const { mapVdisSrs } = this._importIncrementalVmSettings const { vbds, vhds, vifs, vm, vmSnapshot, vtpms } = metadata - const streams = {} + const disks = {} const metdataDir = dirname(metadata._filename) const vdis = ignoredVdis === undefined ? metadata.vdis : pickBy(metadata.vdis, vdi => !ignoredVdis.has(vdi.uuid)) @@ -110,21 +111,24 @@ export class ImportVmBackup { } } - let stream + let disk const backupWithSnapshotPath = join(metdataDir, backupCandidate ?? '') if (vhdPath === backupWithSnapshotPath) { // all the data are already on the host debug('direct reuse of a snapshot') - stream = null + disk = null vdis[vdiRef].baseVdi = snapshotCandidate // go next disk , we won't use this stream continue } - let disposableDescendants - - const disposableSynthetic = await VhdSynthetic.fromVhdChain(this._adapter._handler, vhdPath) + const parent = new RemoteChain({ + handler: this._adapter._handler, + path: vhdPath + }) + await parent.init() + // this will also clean if another disk of this VM backup fails // if user really only need to restore non failing disks he can retry with ignoredVdis let disposed = false @@ -132,8 +136,7 @@ export class ImportVmBackup { if (!disposed) { disposed = true try { - await disposableDescendants?.dispose() - await disposableSynthetic?.dispose() + await parent?.close() } catch (error) { warn('openVhd: failed to dispose VHDs', { error }) } @@ -141,11 +144,11 @@ export class ImportVmBackup { } $defer.onFailure(() => disposeOnce()) - const parentVhd = disposableSynthetic.value - await parentVhd.readBlockAllocationTable() - debug('got vhd synthetic of parents', parentVhd.length) + + debug('got vhd synthetic of parents', parent) if (snapshotCandidate !== undefined) { + let descendant,negativeDisk try { debug('will try to use differential restore', { backupWithSnapshotPath, @@ -153,39 +156,39 @@ export class ImportVmBackup { vdiRef, }) - disposableDescendants = await VhdSynthetic.fromVhdChain(this._adapter._handler, backupWithSnapshotPath, { + descendant = new RemoteChain({ + handler: this._adapter._handler, + path: backupWithSnapshotPath, until: vhdPath, }) - const descendantsVhd = disposableDescendants.value - await descendantsVhd.readBlockAllocationTable() + await descendant.init() + debug('got vhd synthetic of descendants') - const negativeVhd = new VhdNegative(parentVhd, descendantsVhd) + negativeDisk = new NegativeDisk(parent, descendant) debug('got vhd negative') // update the stream with the negative vhd stream - stream = await negativeVhd.stream() + disk = negativeDisk vdis[vdiRef].baseVdi = snapshotCandidate + } catch (error) { // can be a broken VHD chain, a vhd chain with a key backup, .... // not an irrecuperable error, don't dispose parentVhd, and fallback to full restore warn(`can't use differential restore`, { error }) - disposableDescendants?.dispose() + descendant?.close() + negativeDisk?.close() } } // didn't make a negative stream : fallback to classic stream - if (stream === undefined) { + if (disk === undefined) { debug('use legacy restore') - stream = await parentVhd.stream() + disk = parent } - - stream.on('end', disposeOnce) - stream.on('close', disposeOnce) - stream.on('error', disposeOnce) - info('everything is ready, will transfer', stream.length) - streams[`${vdiRef}.vhd`] = stream + info('everything is ready, will transfer', disk) + disks[`${vdiRef}.vhd`] = disk } return { - streams, + disks, vbds, vdis, version: '1.0.0', @@ -205,8 +208,7 @@ export class ImportVmBackup { ) let backup if (useDifferentialRestore) { - throw new Error('differentiela resotre is not supported') - // backup = await this._reuseNearestSnapshot(ignoredVdis) + backup = await this._reuseNearestSnapshot(ignoredVdis) } else { backup = await this._adapter.readIncrementalVmBackup(this._metadata, ignoredVdis) } @@ -242,7 +244,6 @@ export class ImportVmBackup { assert.strictEqual(metadata.mode, 'delta') backup = await this.#decorateIncrementalVmMetadata() - // @todo put back the stream size } return Task.run( diff --git a/@xen-orchestra/backups/RemoteAdapter.mjs b/@xen-orchestra/backups/RemoteAdapter.mjs index b7682e1a188..143ac55d6a6 100644 --- a/@xen-orchestra/backups/RemoteAdapter.mjs +++ b/@xen-orchestra/backups/RemoteAdapter.mjs @@ -33,7 +33,8 @@ import { watchStreamSize } from './_watchStreamSize.mjs' import { VhdDirectoryRemote } from '../../@xen-orchestra/disk-transform/dist/consumer/VhdDirectory.mjs' import { VhdStream } from '../../@xen-orchestra/disk-transform/dist/consumer/VhdStream.mjs' -import { openRemoteDisk,openRemoteChain } from '../../@xen-orchestra/disk-transform/dist/factory/Remote.mjs' +import {RemoteVhd} from '../../@xen-orchestra/disk-transform/dist/producer/RemoteVhd.mjs' +import {RemoteChain} from '../../@xen-orchestra/disk-transform/dist/producer/RemoteChain.mjs' export const DIR_XO_CONFIG_BACKUPS = 'xo-config-backups' @@ -684,7 +685,7 @@ export class RemoteAdapter { return path } - async writeVhd(path, disk, { checksum = true, validator = noop, writeBlockConcurrency } = {}) { + async writeVhd(path, disk, { validator = noop, writeBlockConcurrency } = {}) { const handler = this._handler if (this.useVhdDirectory()) { @@ -725,7 +726,10 @@ export class RemoteAdapter { // open the hierarchy of ancestors until we find a full one async _createVhdDisk(handler, path, { useChain }) { - const disk = useChain ? await openRemoteDisk(handler, path) : await openRemoteChain(handler, path) + + + const disk = useChain ? new RemoteChain({handler, path}) : new RemoteVhd({handler, path}) + await disk.init() return disk } diff --git a/@xen-orchestra/backups/_incrementalVm.mjs b/@xen-orchestra/backups/_incrementalVm.mjs index ba0cda89340..7e44c287510 100644 --- a/@xen-orchestra/backups/_incrementalVm.mjs +++ b/@xen-orchestra/backups/_incrementalVm.mjs @@ -241,8 +241,8 @@ export const importIncrementalVm = defer(async function importIncrementalVm( await xapi.setField('VDI', vdi.$ref, 'name_label', `[Importing] ${vdiRecords[id].name_label}`) const streamBuilder = new VhdStream(disk) - - await vdi.$importContent(streamBuilder.toStream(), { cancelToken, format: 'vhd' }) + const stream = streamBuilder.toStream() + await vdi.$importContent(stream, { cancelToken, format: 'vhd' }) await xapi.setField('VDI', vdi.$ref, 'name_label', vdiRecords[id].name_label) } }), diff --git a/@xen-orchestra/backups/_runners/_vmRunners/IncrementalRemote.mjs b/@xen-orchestra/backups/_runners/_vmRunners/IncrementalRemote.mjs index 5214dd32c30..00a94b96736 100644 --- a/@xen-orchestra/backups/_runners/_vmRunners/IncrementalRemote.mjs +++ b/@xen-orchestra/backups/_runners/_vmRunners/IncrementalRemote.mjs @@ -3,11 +3,8 @@ import { createLogger } from '@xen-orchestra/log' import { asyncEach } from '@vates/async-each' import assert from 'node:assert' import * as UUID from 'uuid' -import isVhdDifferencingDisk from 'vhd-lib/isVhdDifferencingDisk.js' -import mapValues from 'lodash/mapValues.js' import { AbstractRemote } from './_AbstractRemote.mjs' -import { forkDeltaExport } from './_forkDeltaExport.mjs' import { IncrementalRemoteWriter } from '../_writers/IncrementalRemoteWriter.mjs' import { Disposable } from 'promise-toolbox' import { openVhd } from 'vhd-lib' @@ -71,9 +68,9 @@ class IncrementalRemoteVmBackupRunner extends AbstractRemote { // recompute if disks are differencing or not const isVhdDifferencing = {} - await asyncEach(Object.entries(incrementalExport.streams), async ([key, stream]) => { - isVhdDifferencing[key] = await isVhdDifferencingDisk(stream) - }) + Object.entries(incrementalExport.disks).forEach (([key, disk]) => { + isVhdDifferencing[key] = disk.isDifferencing() + }) const hasDifferencingDisk = Object.values(isVhdDifferencing).includes(true) if (metadata.isBase === hasDifferencingDisk) { warn(`Metadata isBase and real disk value are different`, { @@ -86,12 +83,11 @@ class IncrementalRemoteVmBackupRunner extends AbstractRemote { metadata.isVhdDifferencing = isVhdDifferencing await this._selectBaseVm(metadata) await this._callWriters(writer => writer.prepare({ isBase: metadata.isBase }), 'writer.prepare()') - - incrementalExport.streams = mapValues(incrementalExport.streams, this._throttleStream) + await this._callWriters( writer => writer.transfer({ - deltaExport: forkDeltaExport(incrementalExport), + deltaExport: incrementalExport, isVhdDifferencing, timestamp: metadata.timestamp, vm: metadata.vm, diff --git a/@xen-orchestra/backups/_runners/_writers/IncrementalRemoteWriter.mjs b/@xen-orchestra/backups/_runners/_writers/IncrementalRemoteWriter.mjs index a39a7fb5f0f..6897b163116 100644 --- a/@xen-orchestra/backups/_runners/_writers/IncrementalRemoteWriter.mjs +++ b/@xen-orchestra/backups/_runners/_writers/IncrementalRemoteWriter.mjs @@ -224,23 +224,24 @@ export class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrement } let size = 0 await Task.run({ name: 'transfer' }, async () => { - await asyncEach( - Object.keys(deltaExport.vdis), - async id => { - const path = `${this._vmBackupDir}/${vhds[id]}` - await adapter.writeVhd(path, deltaExport.disks[`${id}.vhd`], { - // no checksum for VHDs, because they will be invalidated by - // merges and chainings - checksum: false, - validator: tmpPath => checkVhd(handler, tmpPath), - writeBlockConcurrency: this._config.writeBlockConcurrency, - }) - size = size + deltaExport.disks[`${id}.vhd`].generatedDiskBlocks * 2 * 1024 * 1024 - }, - { - concurrency: settings.diskPerVmConcurrency, - } - ) + await asyncEach( + Object.keys(deltaExport.vdis), + async id => { + const path = `${this._vmBackupDir}/${vhds[id]}` + await adapter.writeVhd(path, deltaExport.disks[`${id}.vhd`], { + // no checksum for VHDs, because they will be invalidated by + // merges and chainings + checksum: false, + validator: tmpPath => checkVhd(handler, tmpPath), + writeBlockConcurrency: this._config.writeBlockConcurrency, + }) + size = size + deltaExport.disks[`${id}.vhd`].generatedDiskBlocks * 2 * 1024 * 1024 + }, + { + concurrency: settings.diskPerVmConcurrency, + } + ) + return { size } }) diff --git a/@xen-orchestra/disk-transform/package.json b/@xen-orchestra/disk-transform/package.json index 7827bf7c742..f8a33540ae8 100644 --- a/@xen-orchestra/disk-transform/package.json +++ b/@xen-orchestra/disk-transform/package.json @@ -14,6 +14,23 @@ "scripts": { "build": "tsc", "dev": "tsc --watch", - "start": "node ." + "test": "node --test" + }, + "homepage": "https://github.com/vatesfr/xen-orchestra/tree/master/@xen-orchestra/disk-transform", + "bugs": "https://github.com/vatesfr/xen-orchestra/issues", + "repository": { + "directory": "@xen-orchestra/disk-transform", + "type": "git", + "url": "https://github.com/vatesfr/xen-orchestra.git" + }, + "author": { + "name": "Vates SAS", + "url": "https://vates.fr" + }, + "engines": { + "node": ">=20.17" + }, + "dependencies": { + "vhd-lib": "^4.11.2" } } diff --git a/@xen-orchestra/disk-transform/src/BlockSizeChanger.mts b/@xen-orchestra/disk-transform/src/BlockSizeChanger.mts index d9d4c97a07e..d2cfcd5e943 100644 --- a/@xen-orchestra/disk-transform/src/BlockSizeChanger.mts +++ b/@xen-orchestra/disk-transform/src/BlockSizeChanger.mts @@ -8,11 +8,20 @@ import { RandomAccessDisk, type DiskBlock, type PortableDisk } from './PortableD * qcow2 use 64KB clusters, and can use subclusters */ export class BlockSizeChanger extends RandomAccessDisk { + #source: RandomAccessDisk + #blockSize constructor(source: RandomAccessDisk, blockSize: number) { super() this.#source = source - this.blockSize = blockSize + this.#blockSize = this.#blockSize = blockSize + } + + getVirtualSize(): number { + return this.#source.getVirtualSize() + } + getBlockSize(): number { + return this.#blockSize } readBlock(index: number): Promise { throw new Error('Method not implemented.') @@ -20,8 +29,7 @@ export class BlockSizeChanger extends RandomAccessDisk { buildDiskBlockGenerator(): AsyncGenerator { throw new Error('Method not implemented.') } - generatedDiskBlocks: number - yieldedDiskBlocks: number + init(): Promise { return this.#source.init() } @@ -30,7 +38,7 @@ export class BlockSizeChanger extends RandomAccessDisk { } async openParent(): Promise { const parent = (await this.#source.openParent()) as RandomAccessDisk - return new BlockSizeChanger(parent, this.blockSize) + return new BlockSizeChanger(parent, this.#blockSize) } isDifferencing(): boolean { return this.#source.isDifferencing() diff --git a/@xen-orchestra/disk-transform/src/DiskChain.mts b/@xen-orchestra/disk-transform/src/DiskChain.mts index e90d7a05947..f6378a26ef7 100644 --- a/@xen-orchestra/disk-transform/src/DiskChain.mts +++ b/@xen-orchestra/disk-transform/src/DiskChain.mts @@ -1,18 +1,19 @@ import { PortableDisk, RandomAccessDisk, type DiskBlock } from './PortableDisk.mjs' export class DiskChain extends RandomAccessDisk { + #disks: Array = [] - public get virtualSize(): number { - return this.#disks[this.#disks.length - 1].virtualSize - } - public get blockSize(): number { - return this.#disks[0].blockSize - } constructor({disks}:{disks:Array}) { super() this.#disks = disks } + getVirtualSize(): number { + return this.#disks[this.#disks.length - 1].getVirtualSize() + } + getBlockSize(): number { + return this.#disks[0].getBlockSize() + } hasBlock(index: number): boolean { for (let i = this.#disks.length - 1; i >= 0; i--) { if (this.#disks[i].hasBlock(index)) { diff --git a/@xen-orchestra/disk-transform/src/NegativeDisk.mts b/@xen-orchestra/disk-transform/src/NegativeDisk.mts new file mode 100644 index 00000000000..54b968b840e --- /dev/null +++ b/@xen-orchestra/disk-transform/src/NegativeDisk.mts @@ -0,0 +1,61 @@ +import { DiskBlock, PortableDisk, RandomAccessDisk } from "./PortableDisk.mjs"; +import assert from "node:assert" + +export class NegativeDisk extends RandomAccessDisk{ + + #parent:RandomAccessDisk + #child:RandomAccessDisk + + + constructor(parent:RandomAccessDisk, child:RandomAccessDisk){ + super() + this.#parent = parent + this.#child = child + // @todo : should also check chain + assert.strictEqual(parent.getBlockSize(), child.getBlockSize(), 'GEOMETRY_CHANGED') + assert.strictEqual(parent.getVirtualSize(), child.getVirtualSize(), 'GEOMETRY_CHANGED') + } + getVirtualSize(): number { + return this.#child.getVirtualSize() + } + getBlockSize(): number { + return this.#child.getBlockSize() + } + readBlock(index: number): Promise { + if (this.#parent.hasBlock(index)) { + return this.#parent.readBlock(index) + } + // anew block => return an empty one + return Promise.resolve({ + index, + data: Buffer.alloc(this.getBlockSize(), 0) + }) + + } + async init(): Promise { + await Promise.all([ + this.#parent.init(), + this.#child.init() + ]) + + } + async close(): Promise { + await Promise.all([ + this.#parent.close(), + this.#child.close() + ]) + } + isDifferencing(): boolean { + return true + } + openParent(): Promise { + throw new Error("Method not implemented."); + } + getBlockIndexes(): Array { + return this.#child.getBlockIndexes() + } + hasBlock(index: number): boolean { + return this.#child.hasBlock(index) + } + +} \ No newline at end of file diff --git a/@xen-orchestra/disk-transform/src/PortableDisk.mts b/@xen-orchestra/disk-transform/src/PortableDisk.mts index f779bbd5629..0c780b68f5e 100644 --- a/@xen-orchestra/disk-transform/src/PortableDisk.mts +++ b/@xen-orchestra/disk-transform/src/PortableDisk.mts @@ -7,27 +7,11 @@ export type DiskBlock = { export type BytesLength = number export abstract class PortableDisk { - // use accessor to allow overriding - #virtualSize: number - public get virtualSize(): number { - return this.#virtualSize - } - public set virtualSize(value: number) { - this.#virtualSize = value - } - - // use accessor to allow overriding - #blockSize: number - public get blockSize(): number { - return this.#blockSize - } - public set blockSize(value: number) { - this.#blockSize = value - } - generatedDiskBlocks = 0 yieldedDiskBlocks = 0 + abstract getVirtualSize():number + abstract getBlockSize():number abstract init(): Promise abstract close(): Promise @@ -37,13 +21,14 @@ export abstract class PortableDisk { /** * return the block without any order nor stability guarantee - */ + */ abstract getBlockIndexes(): Array abstract hasBlock(index: number): boolean abstract buildDiskBlockGenerator(): Promise> | AsyncGenerator async *diskBlocks(): AsyncGenerator { - const generator = await this.buildDiskBlockGenerator() + let generator:AsyncGenerator try { + generator = await this.buildDiskBlockGenerator() for await (const block of generator) { this.generatedDiskBlocks++ yield block @@ -53,8 +38,11 @@ export abstract class PortableDisk { console.error({ error, generatedDIskBlocks: this.generatedDiskBlocks, yieldedDiskBlocks: this.yieldedDiskBlocks }) throw error } finally { - this.close() + console.log('finally', this) + await this.close() + console.log('closed', this) } + console.log('DONE') } check() { diff --git a/@xen-orchestra/disk-transform/src/consumer/BaseVhd.mts b/@xen-orchestra/disk-transform/src/consumer/BaseVhd.mts index 6f6aeb46fb4..54400924032 100644 --- a/@xen-orchestra/disk-transform/src/consumer/BaseVhd.mts +++ b/@xen-orchestra/disk-transform/src/consumer/BaseVhd.mts @@ -1,5 +1,5 @@ import computeGeometryForSize from 'vhd-lib/_computeGeometryForSize.js' -import type { PortableDisk } from '../PortableDisk.mts' +import { PortableDisk } from "../PortableDisk.mjs" import { DEFAULT_BLOCK_SIZE, DISK_TYPES, FOOTER_SIZE, HEADER_SIZE, SECTOR_SIZE } from 'vhd-lib/_constants.js' import { createFooter, createHeader } from 'vhd-lib/_createFooterHeader.js' @@ -10,20 +10,21 @@ export abstract class BaseVhd { get source() { return this.#source } - constructor(source: PortableDisk) { + constructor(source: PortableDisk) { this.#source = source } - computeVhdHeader() { - const source = this.#source - const size = source.virtualSize + + computeVhdHeader():Buffer{ + const source = this.source + const size = source.getVirtualSize() const nbTotalBlocks = Math.ceil(size / DEFAULT_BLOCK_SIZE) const header = createHeader(nbTotalBlocks) return header } computeVhdFooter(): Buffer { - const source = this.#source - const size = source.virtualSize + const source = this.source + const size = source.getVirtualSize() const geometry = computeGeometryForSize(size) const diskType = source.isDifferencing() ? DISK_TYPES.DIFFERENCING : DISK_TYPES.DYNAMIC const footer = createFooter(size, Math.floor(Date.now() / 1000), geometry, FOOTER_SIZE, diskType) @@ -31,8 +32,8 @@ export abstract class BaseVhd { } computeVhdBatAndFileSize(): { fileSize: number; bat: Buffer } { - const source = this.#source - const size = source.virtualSize + const source = this.source + const size = source.getVirtualSize() const nbTotalBlocks = Math.ceil(size / DEFAULT_BLOCK_SIZE) // align bat size to a sector const batSize = Math.ceil((nbTotalBlocks * 4) / SECTOR_SIZE) * SECTOR_SIZE diff --git a/@xen-orchestra/disk-transform/src/consumer/VhdDirectory.mts b/@xen-orchestra/disk-transform/src/consumer/VhdDirectory.mts index b3a4d3e689c..9a79b601447 100644 --- a/@xen-orchestra/disk-transform/src/consumer/VhdDirectory.mts +++ b/@xen-orchestra/disk-transform/src/consumer/VhdDirectory.mts @@ -22,6 +22,7 @@ export class VhdDirectoryRemote extends BaseVhd { this.#target = target } async write() { + console.log('WILL WRITE VHDD ') const { handler, path, compression, flags, validator, concurrency } = this.#target const dataPath = `${dirname(path)}/data/${uuidv4()}.vhd` try { @@ -36,10 +37,15 @@ export class VhdDirectoryRemote extends BaseVhd { }, { concurrency } ) + console.log('BLOCK WRITTEN ') await Promise.all([vhd.writeFooter(), vhd.writeHeader(), vhd.writeBlockAllocationTable()]) + console.log('HEADER WRITTEN ') await validator(dataPath) + console.log('VALIDATOR DONE ') await VhdAbstract.createAlias(handler, path, dataPath) + console.log('ALIAS CREATED') } catch (err) { + console.log('error in write', err) await handler.unlink(dataPath).catch(() => {}) throw err } diff --git a/@xen-orchestra/disk-transform/src/consumer/VhdStream.mts b/@xen-orchestra/disk-transform/src/consumer/VhdStream.mts index 50b1a39f29c..93f72649635 100644 --- a/@xen-orchestra/disk-transform/src/consumer/VhdStream.mts +++ b/@xen-orchestra/disk-transform/src/consumer/VhdStream.mts @@ -19,7 +19,7 @@ export class VhdStream extends BaseVhd { yield footer yield header yield bat - for await (const { data } of blocks) { + for await (const { data, index } of blocks) { yield Buffer.concat([FULL_BLOCK_BITMAP, data]) } yield footer diff --git a/@xen-orchestra/disk-transform/src/producer/Raw.mts b/@xen-orchestra/disk-transform/src/producer/Raw.mts new file mode 100644 index 00000000000..0b7a03e5905 --- /dev/null +++ b/@xen-orchestra/disk-transform/src/producer/Raw.mts @@ -0,0 +1,71 @@ +import { FileAccessor } from "../FileAccessor.mjs"; +import { DiskBlock, PortableDisk, RandomAccessDisk } from "../PortableDisk.mjs"; + +/** + * create a portable disk from a raw disk + */ +export class RawDisk extends RandomAccessDisk{ + + #handler: FileAccessor + #path: string + + #fileHandle:number | Promise + #blockSize:number + #virtualSize:number + + constructor(handler: FileAccessor, path:string, blockSize:number){ + super() + this.#handler = handler + this.#path = path + this.#blockSize = blockSize + } + getVirtualSize(): number { + return this.#virtualSize + } + getBlockSize(): number { + return this.#blockSize + } + async readBlock(index: number): Promise { + if(this.#fileHandle === undefined){ + this.#fileHandle = this.#handler.open(this.#path, {offset: index*this.getBlockSize()}) + } + + if(typeof this.#fileHandle !== 'number'){ + this.#fileHandle = await this.#fileHandle + } + const data = Buffer.alloc(this.getBlockSize(), 0) + await this.#handler.read(this.#path, data, index*this.getBlockSize() ) + return {index, data} + } + async init(): Promise { + const size = await this.#handler.getSize(this.#path) + this.#virtualSize = size + } + async close(): Promise { + if(this.#fileHandle === undefined){ + return + } + if(typeof this.#fileHandle !== 'number'){ + this.#fileHandle = await this.#fileHandle + } + await this.#handler.close(this.#fileHandle) + } + isDifferencing(): boolean { + return false + } + openParent(): Promise { + throw new Error("Method not implemented."); + } + getBlockIndexes(): Array { + const nbBlocks = Math.ceil(this.getVirtualSize()/this.getBlockSize()) + const indexes = [] + for(let i=0; i { + return this.#chain.readBlock(index) + } + async init(): Promise { + let disk:RemoteVhd + disk = new RemoteVhd({handler: this.#handler, path: this.#path}) + await disk.init() + const disks = [disk] + + while(disk.isDifferencing()){ + disk = await disk.openParent() as RemoteVhd + disks.unshift(disk) + } + // the root disk + this.#chain = new DiskChain({disks}) + + // already initialized by individual open + // await this.#chain.init() + + } + close(): Promise { + return this.#chain.close() + } + isDifferencing(): boolean { + return this.#chain.isDifferencing() + } + openParent(): Promise { + throw new Error("Method not implemented."); + } + getBlockIndexes(): Array { + return this.#chain.getBlockIndexes() + } + hasBlock(index: number): boolean { + + return this.#chain.hasBlock(index) + } + +} \ No newline at end of file diff --git a/@xen-orchestra/disk-transform/src/producer/RemoteVhd.mts b/@xen-orchestra/disk-transform/src/producer/RemoteVhd.mts index 2df8fbe649d..973e94f9a75 100644 --- a/@xen-orchestra/disk-transform/src/producer/RemoteVhd.mts +++ b/@xen-orchestra/disk-transform/src/producer/RemoteVhd.mts @@ -5,6 +5,7 @@ import { type DiskBlock, } from '../PortableDisk.mjs' import { openVhd } from 'vhd-lib' +import { DISK_TYPES } from 'vhd-lib/_constants.js' import type { VhdDirectory } from 'vhd-lib/Vhd/VhdDirectory.js' import type { VhdFile } from 'vhd-lib/Vhd/VhdFile.js' @@ -12,6 +13,7 @@ export class RemoteVhd extends RandomAccessDisk { #path:string #handler: FileAccessor #vhd: VhdFile | VhdDirectory + #isDifferencing:boolean #dispose: () => any constructor({ handler, path }: { handler: FileAccessor; path: string }) { @@ -20,14 +22,21 @@ export class RemoteVhd extends RandomAccessDisk { this.#path =path this.#handler = handler } + getVirtualSize(): number { + return this.#vhd.footer.currentSize + } + getBlockSize(): number { + return 2*1024*1024 + } async init(): Promise { + console.log('INIT') const { value, dispose } = await openVhd(this.#handler, this.#path) this.#vhd = value - this.virtualSize = this.#vhd.footer.currentSize - this.blockSize = 2*1024*1024 this.#dispose = dispose await this.#vhd.readBlockAllocationTable() + this.#isDifferencing = value.footer.diskType === DISK_TYPES.DIFFERENCING + console.log('vhd',this.#vhd) } async close(): Promise { await this.#dispose() @@ -57,6 +66,7 @@ export class RemoteVhd extends RandomAccessDisk { async openParent():Promise{ const parentPath = this.#vhd.header.parentUnicodeName + console.log({parentPath}) if(!parentPath){ throw new Error(`Disk ${this.#path} doesn't have parents`) } @@ -65,6 +75,6 @@ export class RemoteVhd extends RandomAccessDisk { return parent } isDifferencing():boolean{ - return !!this.#vhd.header.parentUnicodeName + return this.#isDifferencing } } diff --git a/@xen-orchestra/disk-transform/src/producer/Xapi.mts b/@xen-orchestra/disk-transform/src/producer/Xapi.mts index 467baaacb57..94b5e39789f 100644 --- a/@xen-orchestra/disk-transform/src/producer/Xapi.mts +++ b/@xen-orchestra/disk-transform/src/producer/Xapi.mts @@ -14,6 +14,7 @@ const warn = console.error * use nbd , change block tracking and stream export depending of capabilities */ export class XapiDiskSource extends PortableDisk{ + #vdiRef:string #baseRef?:string #preferNbd:boolean @@ -21,19 +22,7 @@ export class XapiDiskSource extends PortableDisk{ #xapi:any // @todo do a better type here #source:PortableDisk - public get virtualSize(): number { - return this.#source.virtualSize - } - public set virtualSize(value: number) { - this.#source.virtualSize = value - } - - public get blockSize(): number { - return this.#source.blockSize - } - public set blockSize(value: number) { - this.#source.blockSize = value - } + constructor({xapi, vdiRef, baseRef, preferNbd=true, nbdConcurrency=2}){ super() this.#vdiRef = vdiRef @@ -43,7 +32,12 @@ export class XapiDiskSource extends PortableDisk{ this.#xapi = xapi } - + getVirtualSize(): number { + return this.#source.getVirtualSize() + } + getBlockSize(): number { + return this.#source.getBlockSize() + } /** * create a disk source using stream export + NBD * on failure fall back to a full @@ -51,6 +45,7 @@ export class XapiDiskSource extends PortableDisk{ * @returns {Promise} */ async #openNbdStream():Promise{ + console.log('open nbd stream') const xapi = this.#xapi const baseRef = this.#baseRef const vdiRef = this.#vdiRef @@ -78,6 +73,7 @@ export class XapiDiskSource extends PortableDisk{ */ async #openExportStream():Promise{ + console.log('open export stream') const xapi = this.#xapi const baseRef = this.#baseRef const vdiRef = this.#vdiRef @@ -106,6 +102,7 @@ export class XapiDiskSource extends PortableDisk{ */ async #openNbdCbt():Promise{ + console.log('open nbdcbt') const xapi = this.#xapi const baseRef = this.#baseRef const vdiRef = this.#vdiRef @@ -114,6 +111,7 @@ export class XapiDiskSource extends PortableDisk{ await source.init() return source }catch(error){ + warn('opennbdCBT',error) await source.close() // a lot of things can go wrong with cbt: // no enabled on the basref @@ -129,6 +127,11 @@ export class XapiDiskSource extends PortableDisk{ async init(): Promise { + console.log('INIT XAPI export', { + preferNbd:this.#preferNbd, + baseRef:this.#baseRef, + vdiRef:this.#vdiRef, + }) if(this.#preferNbd){ if(this.#baseRef !== undefined){ this.#source = await this.#openNbdCbt() diff --git a/@xen-orchestra/disk-transform/src/producer/XapiVhdCbt.mts b/@xen-orchestra/disk-transform/src/producer/XapiVhdCbt.mts index 07ca8926e0b..76882e66c61 100644 --- a/@xen-orchestra/disk-transform/src/producer/XapiVhdCbt.mts +++ b/@xen-orchestra/disk-transform/src/producer/XapiVhdCbt.mts @@ -1,11 +1,12 @@ import assert from 'node:assert' -import { PortableDisk, type DiskBlock } from '../PortableDisk.mjs' +import { PortableDisk, RandomAccessDisk, type DiskBlock } from '../PortableDisk.mjs' import { connectNbdClientIfPossible } from './nbdutils.mjs' type ErrorWithCode = Error & { code: string } -export class XapiVhdCbtSource extends PortableDisk { +export class XapiVhdCbtSource extends RandomAccessDisk { + #nbdClient: any #nbdConcurrency: number #ref: string @@ -13,6 +14,8 @@ export class XapiVhdCbtSource extends PortableDisk { #parentId: string #xapi: any #cbt: Buffer + #virtualSize + constructor({ vdiRef, baseRef, xapi, nbdConcurrency }:{vdiRef:string, baseRef:string, xapi:any, nbdConcurrency:number}) { super() this.#ref = vdiRef @@ -28,13 +31,11 @@ export class XapiVhdCbtSource extends PortableDisk { const baseRef = this.#baseRef const nbdConcurrency = this.#nbdConcurrency - const [cbt_enabled, size, uuid, vdiName] = await Promise.all([ + const [cbt_enabled, size] = await Promise.all([ xapi.getField('VDI', ref, 'cbt_enabled').catch(() => { /* on XS < 7.3 cbt is not supported */ }), - xapi.getField('VDI', ref, 'virtual_size'), - xapi.getField('VDI', ref, 'uuid'), - xapi.getField('VDI', ref, 'name_label'), + xapi.getField('VDI', ref, 'virtual_size') ]) if (cbt_enabled === false) { const error = new Error(`CBT is disabled`) as ErrorWithCode @@ -46,15 +47,20 @@ export class XapiVhdCbtSource extends PortableDisk { .getField('VDI', baseRef, 'sm_config') .then((sm_config: { 'vhd-parent': string }) => sm_config['vhd-parent']) //const baseParentType = await xapi.getField('VDI', baseRef, 'type') // cbt_metadata - this.virtualSize = size + this.#virtualSize = size + this.#cbt = (await this.#xapi.VDI_listChangedBlock(this.#ref, this.#baseRef)) as Buffer this.#nbdClient = await connectNbdClientIfPossible(xapi, ref, nbdConcurrency) - - this.#cbt = (await this.#xapi.VDI_listChangedBlock(this.#baseRef, this.#ref)) as Buffer } + getVirtualSize(): number { + return this.#virtualSize + } + getBlockSize(): number { + return 2 * 1024 * 1024 + } getBlockIndexes(): Array { - assert.strictEqual(this.blockSize, 2 * 1024 * 1024) + assert.strictEqual(this.getBlockSize(), 2 * 1024 * 1024) // block are aligned, we could probably compare the bytes to 255 // each CBT block is 64KB @@ -73,15 +79,15 @@ export class XapiVhdCbtSource extends PortableDisk { } return blocks } - async *buildDiskBlockGenerator(): AsyncGenerator { - for (const index of this.getBlockIndexes()) { - const data = await this.#nbdClient.readBlock(index, this.blockSize) - yield { index, data } - } + + async readBlock(index: number): Promise { + const data = await this.#nbdClient.readBlock(index, this.getBlockSize()) + return { index, data } } + async close() { - await this.#nbdClient.disconnect() + await this.#nbdClient?.disconnect() } isDifferencing(): boolean { return true diff --git a/@xen-orchestra/disk-transform/src/producer/XapiVhdStreamNbd.mts b/@xen-orchestra/disk-transform/src/producer/XapiVhdStreamNbd.mts index e8b55c0caae..9614b084b1f 100644 --- a/@xen-orchestra/disk-transform/src/producer/XapiVhdStreamNbd.mts +++ b/@xen-orchestra/disk-transform/src/producer/XapiVhdStreamNbd.mts @@ -18,7 +18,7 @@ export class XapiVhdStreamNbdSource extends XapiVhdStreamSource { await super.close() } async readBlock(index: number): Promise { - const data = await this.#nbdClient.readBlock(index, this.blockSize) + const data = await this.#nbdClient.readBlock(index, this.getBlockSize()) return { index, data } } async *buildDiskBlockGenerator(): AsyncGenerator { diff --git a/@xen-orchestra/disk-transform/src/producer/XapiVhdStreamSource.mts b/@xen-orchestra/disk-transform/src/producer/XapiVhdStreamSource.mts index 696f019001e..c06633af4e1 100644 --- a/@xen-orchestra/disk-transform/src/producer/XapiVhdStreamSource.mts +++ b/@xen-orchestra/disk-transform/src/producer/XapiVhdStreamSource.mts @@ -7,9 +7,11 @@ import { Readable } from 'node:stream' import assert from 'node:assert' export class XapiVhdStreamSource extends PortableDisk { + #vhdStream: Readable #busy = false #streamOffset = 0 + #virtualSize:number #blocks: Array<{ index: number; offset: number }> = [] #blockSet = new Set() @@ -19,7 +21,7 @@ export class XapiVhdStreamSource extends PortableDisk { #initDone = false - #xapi + #xapi:any get xapi() { return this.#xapi } @@ -41,6 +43,12 @@ export class XapiVhdStreamSource extends PortableDisk { this.#baseRef = baseRef this.#xapi = xapi } + getVirtualSize(): number { + return this.#virtualSize + } + getBlockSize(): number { + return 2*1024*1024 + } async #read(length: number): Promise { if (this.#busy) { throw new Error("Can't read/skip multiple block in parallel") @@ -76,9 +84,8 @@ export class XapiVhdStreamSource extends PortableDisk { this.#vhdStream = await this.#getExportStream() const footer = unpackFooter(await this.#read(FOOTER_SIZE)) this.#isDifferencing = footer.diskType === DISK_TYPES.DIFFERENCING - this.virtualSize = footer.currentSize + this.#virtualSize = footer.currentSize const header = unpackHeader(await this.#read(HEADER_SIZE)) - this.blockSize = header.blockSize const batSize = Math.ceil((header.maxTableEntries * 4) / SECTOR_SIZE) * SECTOR_SIZE // skip space between header and beginning of the table console.log({ header, already: FOOTER_SIZE + HEADER_SIZE }) diff --git a/@xen-orchestra/disk-transform/tsconfig.json b/@xen-orchestra/disk-transform/tsconfig.json index 5f3c75540a1..2c6491da2b6 100644 --- a/@xen-orchestra/disk-transform/tsconfig.json +++ b/@xen-orchestra/disk-transform/tsconfig.json @@ -5,7 +5,7 @@ "esModuleInterop": true, // Facilite les imports de modules CommonJS/ESM // "resolveJsonModule": true, // Permet d'importer des fichiers JSON "skipLibCheck": true, // Ignore les erreurs de type dans les dépendances externes - "target": "es2017", + "target": "ES2024", "module": "NodeNext", "moduleResolution": "nodenext", "declaration": true, // Generates `.d.ts` files for type safety diff --git a/@xen-orchestra/log/index.js b/@xen-orchestra/log/index.js index 3d864b09abf..fbb7279a982 100644 --- a/@xen-orchestra/log/index.js +++ b/@xen-orchestra/log/index.js @@ -74,7 +74,11 @@ prototype.wrap = function (message, fn) { } } } - +/** + * + * @param {string} namespace + * @returns {Logger} + */ const createLogger = namespace => new Logger(namespace) module.exports = exports = createLogger diff --git a/packages/vhd-lib/Vhd/VhdAbstract.js b/packages/vhd-lib/Vhd/VhdAbstract.js index e0044cde315..45140515712 100644 --- a/packages/vhd-lib/Vhd/VhdAbstract.js +++ b/packages/vhd-lib/Vhd/VhdAbstract.js @@ -39,14 +39,6 @@ exports.VhdAbstract = class VhdAbstract { return computeSectorsPerBlock(this.header.blockSize) } - get header() { - throw new Error('get header is not implemented') - } - - get footer() { - throw new Error('get footer not implemented') - } - /** * instantiate a Vhd * diff --git a/packages/vhd-lib/index.js b/packages/vhd-lib/index.js index 7f82d247a24..d0055436251 100644 --- a/packages/vhd-lib/index.js +++ b/packages/vhd-lib/index.js @@ -5,7 +5,6 @@ exports.checkFooter = require('./checkFooter') exports.checkVhdChain = require('./checkChain') exports.createReadableSparseStream = require('./createReadableSparseStream') exports.createVhdStreamWithLength = require('./createVhdStreamWithLength') -exports.isVhdDifferencingDisk = require('./isVhdDifferencingDisk') exports.peekFooterFromVhdStream = require('./peekFooterFromVhdStream') exports.openVhd = require('./openVhd').openVhd exports.VhdAbstract = require('./Vhd/VhdAbstract').VhdAbstract diff --git a/packages/vhd-lib/isVhdDifferencingDisk.js b/packages/vhd-lib/isVhdDifferencingDisk.js deleted file mode 100644 index 4d8b4c0ec5f..00000000000 --- a/packages/vhd-lib/isVhdDifferencingDisk.js +++ /dev/null @@ -1,11 +0,0 @@ -'use strict' -const { readChunkStrict } = require('@vates/read-chunk') -const { FOOTER_SIZE, DISK_TYPES } = require('./_constants') -const { unpackFooter } = require('./Vhd/_utils') - -module.exports = async function isVhdDifferencingDisk(stream) { - const bufFooter = await readChunkStrict(stream, FOOTER_SIZE) - stream.unshift(bufFooter) - const footer = unpackFooter(bufFooter) - return footer.diskType === DISK_TYPES.DIFFERENCING -}