Skip to content

Commit

Permalink
more refactoring, mirror backup works, implement differential restore
Browse files Browse the repository at this point in the history
  • Loading branch information
fbeauchamp committed Feb 12, 2025
1 parent e682221 commit dc391b7
Show file tree
Hide file tree
Showing 25 changed files with 391 additions and 163 deletions.
63 changes: 32 additions & 31 deletions @xen-orchestra/backups/ImportVmBackup.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import { formatFilenameDate } from './_filenameDate.mjs'
import { importIncrementalVm } from './_incrementalVm.mjs'
import { Task } from './Task.mjs'
import { watchStreamSize } from './_watchStreamSize.mjs'
import { VhdNegative, VhdSynthetic } from 'vhd-lib'
import { decorateClass } from '@vates/decorate-with'
import { createLogger } from '@xen-orchestra/log'
import { dirname, join } from 'node:path'
import pickBy from 'lodash/pickBy.js'
import { defer } from 'golike-defer'
import { defer } from 'golike-defer'
import { RemoteChain} from '../disk-transform/dist/producer/RemoteChain.mjs'

Check failure on line 12 in @xen-orchestra/backups/ImportVmBackup.mjs

View workflow job for this annotation

GitHub Actions / CI

"../disk-transform/dist/producer/RemoteChain.mjs" is not published
import { NegativeDisk} from '../disk-transform/dist/NegativeDisk.mjs'

Check failure on line 13 in @xen-orchestra/backups/ImportVmBackup.mjs

View workflow job for this annotation

GitHub Actions / CI

"../disk-transform/dist/NegativeDisk.mjs" is not published

const { debug, info, warn } = createLogger('xo:backups:importVmBackup')
async function resolveUuid(xapi, cache, uuid, type) {
Expand Down Expand Up @@ -59,7 +60,7 @@ export class ImportVmBackup {
const metadata = this._metadata
const { mapVdisSrs } = this._importIncrementalVmSettings
const { vbds, vhds, vifs, vm, vmSnapshot, vtpms } = metadata
const streams = {}
const disks = {}
const metdataDir = dirname(metadata._filename)
const vdis = ignoredVdis === undefined ? metadata.vdis : pickBy(metadata.vdis, vdi => !ignoredVdis.has(vdi.uuid))

Expand Down Expand Up @@ -110,82 +111,84 @@ export class ImportVmBackup {
}
}

let stream
let disk
const backupWithSnapshotPath = join(metdataDir, backupCandidate ?? '')
if (vhdPath === backupWithSnapshotPath) {
// all the data are already on the host
debug('direct reuse of a snapshot')
stream = null
disk = null
vdis[vdiRef].baseVdi = snapshotCandidate
// go to the next disk, we won't use this disk
continue
}

let disposableDescendants

const disposableSynthetic = await VhdSynthetic.fromVhdChain(this._adapter._handler, vhdPath)

const parent = new RemoteChain({
handler: this._adapter._handler,
path: vhdPath
})
await parent.init()

// this will also clean up if another disk of this VM backup fails
// if the user really only needs to restore the non-failing disks, they can retry with ignoredVdis
let disposed = false
const disposeOnce = async () => {
if (!disposed) {
disposed = true
try {
await disposableDescendants?.dispose()
await disposableSynthetic?.dispose()
await parent?.close()
} catch (error) {
warn('openVhd: failed to dispose VHDs', { error })
}
}
}
$defer.onFailure(() => disposeOnce())

const parentVhd = disposableSynthetic.value
await parentVhd.readBlockAllocationTable()
debug('got vhd synthetic of parents', parentVhd.length)

debug('got vhd synthetic of parents', parent)

if (snapshotCandidate !== undefined) {
let descendant,negativeDisk
try {
debug('will try to use differential restore', {
backupWithSnapshotPath,
vhdPath,
vdiRef,
})

disposableDescendants = await VhdSynthetic.fromVhdChain(this._adapter._handler, backupWithSnapshotPath, {
descendant = new RemoteChain({
handler: this._adapter._handler,
path: backupWithSnapshotPath,
until: vhdPath,
})
const descendantsVhd = disposableDescendants.value
await descendantsVhd.readBlockAllocationTable()
await descendant.init()

debug('got vhd synthetic of descendants')
const negativeVhd = new VhdNegative(parentVhd, descendantsVhd)
negativeDisk = new NegativeDisk(parent, descendant)
debug('got vhd negative')

// use the negative VHD disk instead of the full one
stream = await negativeVhd.stream()
disk = negativeDisk
vdis[vdiRef].baseVdi = snapshotCandidate

} catch (error) {
// can be a broken VHD chain, a VHD chain with a key backup, ....
// not an unrecoverable error, don't dispose of the parent, and fall back to a full restore
warn(`can't use differential restore`, { error })
disposableDescendants?.dispose()
descendant?.close()
negativeDisk?.close()
}
}
// didn't make a negative disk: fall back to the classic full restore
if (stream === undefined) {
if (disk === undefined) {
debug('use legacy restore')
stream = await parentVhd.stream()
disk = parent
}

stream.on('end', disposeOnce)
stream.on('close', disposeOnce)
stream.on('error', disposeOnce)
info('everything is ready, will transfer', stream.length)
streams[`${vdiRef}.vhd`] = stream
info('everything is ready, will transfer', disk)
disks[`${vdiRef}.vhd`] = disk
}
return {
streams,
disks,
vbds,
vdis,
version: '1.0.0',
Expand All @@ -205,8 +208,7 @@ export class ImportVmBackup {
)
let backup
if (useDifferentialRestore) {
throw new Error('differentiela resotre is not supported')
// backup = await this._reuseNearestSnapshot(ignoredVdis)
backup = await this._reuseNearestSnapshot(ignoredVdis)
} else {
backup = await this._adapter.readIncrementalVmBackup(this._metadata, ignoredVdis)
}
Expand Down Expand Up @@ -242,7 +244,6 @@ export class ImportVmBackup {
assert.strictEqual(metadata.mode, 'delta')

backup = await this.#decorateIncrementalVmMetadata()
// @todo put back the stream size
}

return Task.run(
Expand Down
10 changes: 7 additions & 3 deletions @xen-orchestra/backups/RemoteAdapter.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ import { watchStreamSize } from './_watchStreamSize.mjs'

import { VhdDirectoryRemote } from '../../@xen-orchestra/disk-transform/dist/consumer/VhdDirectory.mjs'

Check failure on line 34 in @xen-orchestra/backups/RemoteAdapter.mjs

View workflow job for this annotation

GitHub Actions / CI

"../../@xen-orchestra/disk-transform/dist/consumer/VhdDirectory.mjs" is not published
import { VhdStream } from '../../@xen-orchestra/disk-transform/dist/consumer/VhdStream.mjs'

Check failure on line 35 in @xen-orchestra/backups/RemoteAdapter.mjs

View workflow job for this annotation

GitHub Actions / CI

"../../@xen-orchestra/disk-transform/dist/consumer/VhdStream.mjs" is not published
import { openRemoteDisk,openRemoteChain } from '../../@xen-orchestra/disk-transform/dist/factory/Remote.mjs'
import {RemoteVhd} from '../../@xen-orchestra/disk-transform/dist/producer/RemoteVhd.mjs'

Check failure on line 36 in @xen-orchestra/backups/RemoteAdapter.mjs

View workflow job for this annotation

GitHub Actions / CI

"../../@xen-orchestra/disk-transform/dist/producer/RemoteVhd.mjs" is not published
import {RemoteChain} from '../../@xen-orchestra/disk-transform/dist/producer/RemoteChain.mjs'

Check failure on line 37 in @xen-orchestra/backups/RemoteAdapter.mjs

View workflow job for this annotation

GitHub Actions / CI

"../../@xen-orchestra/disk-transform/dist/producer/RemoteChain.mjs" is not published

export const DIR_XO_CONFIG_BACKUPS = 'xo-config-backups'

Expand Down Expand Up @@ -684,7 +685,7 @@ export class RemoteAdapter {
return path
}

async writeVhd(path, disk, { checksum = true, validator = noop, writeBlockConcurrency } = {}) {
async writeVhd(path, disk, { validator = noop, writeBlockConcurrency } = {}) {
const handler = this._handler

if (this.useVhdDirectory()) {
Expand Down Expand Up @@ -725,7 +726,10 @@ export class RemoteAdapter {

// open the hierarchy of ancestors until we find a full one
async _createVhdDisk(handler, path, { useChain }) {
const disk = useChain ? await openRemoteDisk(handler, path) : await openRemoteChain(handler, path)


const disk = useChain ? new RemoteChain({handler, path}) : new RemoteVhd({handler, path})
await disk.init()
return disk
}

Expand Down
4 changes: 2 additions & 2 deletions @xen-orchestra/backups/_incrementalVm.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,8 @@ export const importIncrementalVm = defer(async function importIncrementalVm(

await xapi.setField('VDI', vdi.$ref, 'name_label', `[Importing] ${vdiRecords[id].name_label}`)
const streamBuilder = new VhdStream(disk)

await vdi.$importContent(streamBuilder.toStream(), { cancelToken, format: 'vhd' })
const stream = streamBuilder.toStream()
await vdi.$importContent(stream, { cancelToken, format: 'vhd' })
await xapi.setField('VDI', vdi.$ref, 'name_label', vdiRecords[id].name_label)
}
}),
Expand Down
14 changes: 5 additions & 9 deletions @xen-orchestra/backups/_runners/_vmRunners/IncrementalRemote.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@ import { createLogger } from '@xen-orchestra/log'
import { asyncEach } from '@vates/async-each'
import assert from 'node:assert'
import * as UUID from 'uuid'
import isVhdDifferencingDisk from 'vhd-lib/isVhdDifferencingDisk.js'
import mapValues from 'lodash/mapValues.js'

import { AbstractRemote } from './_AbstractRemote.mjs'
import { forkDeltaExport } from './_forkDeltaExport.mjs'
import { IncrementalRemoteWriter } from '../_writers/IncrementalRemoteWriter.mjs'
import { Disposable } from 'promise-toolbox'
import { openVhd } from 'vhd-lib'
Expand Down Expand Up @@ -71,9 +68,9 @@ class IncrementalRemoteVmBackupRunner extends AbstractRemote {
// recompute if disks are differencing or not
const isVhdDifferencing = {}

await asyncEach(Object.entries(incrementalExport.streams), async ([key, stream]) => {
isVhdDifferencing[key] = await isVhdDifferencingDisk(stream)
})
Object.entries(incrementalExport.disks).forEach (([key, disk]) => {
isVhdDifferencing[key] = disk.isDifferencing()
})
const hasDifferencingDisk = Object.values(isVhdDifferencing).includes(true)
if (metadata.isBase === hasDifferencingDisk) {
warn(`Metadata isBase and real disk value are different`, {
Expand All @@ -86,12 +83,11 @@ class IncrementalRemoteVmBackupRunner extends AbstractRemote {
metadata.isVhdDifferencing = isVhdDifferencing
await this._selectBaseVm(metadata)
await this._callWriters(writer => writer.prepare({ isBase: metadata.isBase }), 'writer.prepare()')

incrementalExport.streams = mapValues(incrementalExport.streams, this._throttleStream)

await this._callWriters(
writer =>
writer.transfer({
deltaExport: forkDeltaExport(incrementalExport),
deltaExport: incrementalExport,
isVhdDifferencing,
timestamp: metadata.timestamp,
vm: metadata.vm,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,23 +224,24 @@ export class IncrementalRemoteWriter extends MixinRemoteWriter(AbstractIncrement
}
let size = 0
await Task.run({ name: 'transfer' }, async () => {
await asyncEach(
Object.keys(deltaExport.vdis),
async id => {
const path = `${this._vmBackupDir}/${vhds[id]}`
await adapter.writeVhd(path, deltaExport.disks[`${id}.vhd`], {
// no checksum for VHDs, because they will be invalidated by
// merges and chainings
checksum: false,
validator: tmpPath => checkVhd(handler, tmpPath),
writeBlockConcurrency: this._config.writeBlockConcurrency,
})
size = size + deltaExport.disks[`${id}.vhd`].generatedDiskBlocks * 2 * 1024 * 1024
},
{
concurrency: settings.diskPerVmConcurrency,
}
)
await asyncEach(
Object.keys(deltaExport.vdis),
async id => {
const path = `${this._vmBackupDir}/${vhds[id]}`
await adapter.writeVhd(path, deltaExport.disks[`${id}.vhd`], {
// no checksum for VHDs, because they will be invalidated by
// merges and chainings
checksum: false,
validator: tmpPath => checkVhd(handler, tmpPath),
writeBlockConcurrency: this._config.writeBlockConcurrency,
})
size = size + deltaExport.disks[`${id}.vhd`].generatedDiskBlocks * 2 * 1024 * 1024
},
{
concurrency: settings.diskPerVmConcurrency,
}
)


return { size }
})
Expand Down
19 changes: 18 additions & 1 deletion @xen-orchestra/disk-transform/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,23 @@
"scripts": {
"build": "tsc",
"dev": "tsc --watch",
"start": "node ."
"test": "node --test"
},
"homepage": "https://github.com/vatesfr/xen-orchestra/tree/master/@xen-orchestra/disk-transform",
"bugs": "https://github.com/vatesfr/xen-orchestra/issues",
"repository": {
"directory": "@xen-orchestra/disk-transform",
"type": "git",
"url": "https://github.com/vatesfr/xen-orchestra.git"
},
"author": {
"name": "Vates SAS",
"url": "https://vates.fr"
},
"engines": {
"node": ">=20.17"
},
"dependencies": {
"vhd-lib": "^4.11.2"
}
}
16 changes: 12 additions & 4 deletions @xen-orchestra/disk-transform/src/BlockSizeChanger.mts
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,28 @@ import { RandomAccessDisk, type DiskBlock, type PortableDisk } from './PortableD
* qcow2 uses 64KB clusters, and can use subclusters
*/
export class BlockSizeChanger extends RandomAccessDisk {

#source: RandomAccessDisk
#blockSize
constructor(source: RandomAccessDisk, blockSize: number) {
super()
this.#source = source
this.blockSize = blockSize
this.#blockSize = this.#blockSize = blockSize
}

getVirtualSize(): number {
return this.#source.getVirtualSize()
}
getBlockSize(): number {
return this.#blockSize
}
readBlock(index: number): Promise<DiskBlock> {
throw new Error('Method not implemented.')
}
buildDiskBlockGenerator(): AsyncGenerator<DiskBlock, void, unknown> {
throw new Error('Method not implemented.')
}
generatedDiskBlocks: number
yieldedDiskBlocks: number

init(): Promise<void> {
return this.#source.init()
}
Expand All @@ -30,7 +38,7 @@ export class BlockSizeChanger extends RandomAccessDisk {
}
async openParent(): Promise<PortableDisk> {
const parent = (await this.#source.openParent()) as RandomAccessDisk
return new BlockSizeChanger(parent, this.blockSize)
return new BlockSizeChanger(parent, this.#blockSize)
}
isDifferencing(): boolean {
return this.#source.isDifferencing()
Expand Down
13 changes: 7 additions & 6 deletions @xen-orchestra/disk-transform/src/DiskChain.mts
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
import { PortableDisk, RandomAccessDisk, type DiskBlock } from './PortableDisk.mjs'

export class DiskChain extends RandomAccessDisk {

#disks: Array<RandomAccessDisk> = []
public get virtualSize(): number {
return this.#disks[this.#disks.length - 1].virtualSize
}
public get blockSize(): number {
return this.#disks[0].blockSize
}

constructor({disks}:{disks:Array<RandomAccessDisk>}) {
super()
this.#disks = disks
}
getVirtualSize(): number {
return this.#disks[this.#disks.length - 1].getVirtualSize()
}
getBlockSize(): number {
return this.#disks[0].getBlockSize()
}
hasBlock(index: number): boolean {
for (let i = this.#disks.length - 1; i >= 0; i--) {
if (this.#disks[i].hasBlock(index)) {
Expand Down
Loading

0 comments on commit dc391b7

Please sign in to comment.