Skip to content

Commit

Permalink
feat(disk-transforms): handle cbt/ nbd in disk transform
Browse files Browse the repository at this point in the history
  • Loading branch information
fbeauchamp committed Feb 10, 2025
1 parent 3ca6799 commit 8bf7350
Show file tree
Hide file tree
Showing 11 changed files with 188 additions and 334 deletions.
14 changes: 10 additions & 4 deletions @xen-orchestra/backups/_incrementalVm.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { Task } from './Task.mjs'
import pick from 'lodash/pick.js'
import { BASE_DELTA_VDI, COPY_OF, VM_UUID } from './_otherConfig.mjs'

import { XapiVhdStreamSource } from '../../@xen-orchestra/disk-transform/dist/producer/XapiVhdStreamSource.mjs'
import { XapiDiskSource } from '../../@xen-orchestra/disk-transform/dist/producer/Xapi.mjs'

Check failure on line 13 in @xen-orchestra/backups/_incrementalVm.mjs

View workflow job for this annotation

GitHub Actions / CI

"../../@xen-orchestra/disk-transform/dist/producer/Xapi.mjs" is not published
import { VhdStream } from '../disk-transform/dist/consumer/VhdStream.mjs'

Check failure on line 14 in @xen-orchestra/backups/_incrementalVm.mjs

View workflow job for this annotation

GitHub Actions / CI

"../disk-transform/dist/consumer/VhdStream.mjs" is not published

const ensureArray = value => (value === undefined ? [] : Array.isArray(value) ? value : [value])
Expand Down Expand Up @@ -55,7 +55,10 @@ export async function exportIncrementalVm(
$snapshot_of$uuid: vdi.$snapshot_of?.uuid,
$SR$uuid: vdi.$SR.uuid,
}
disks[`${vdiRef}.vhd`] = new XapiVhdStreamSource({ vdiRef, xapi: vm.$xapi, baseRef: baseVdi?.$ref })
disks[`${vdiRef}.vhd`] = new XapiDiskSource({
vdiRef, xapi: vm.$xapi, baseRef: baseVdi?.$ref,
nbdConcurrency,
preferNbd, })
await disks[`${vdiRef}.vhd`].init()
})

Expand All @@ -66,8 +69,11 @@ export async function exportIncrementalVm(
...suspendVdi,
$SR$uuid: suspendVdi.$SR.uuid,
}
// @todo implement suspend disk handling
throw new Error('suspend disk is not implemented yet')
disks[`${vdiRef}.vhd`] = new XapiDiskSource({
vdiRef:suspendVdi.$ref, xapi: vm.$xapi,
nbdConcurrency,
preferNbd, })
await disks[`${vdiRef}.vhd`].init()

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ export const IncrementalXapi = class IncrementalXapiVmBackupRunner extends Abstr
})

const isVhdDifferencing = {}

Object.entries(deltaExport.disks).forEach(([key, disk])=>{
isVhdDifferencing[key] = disk.isDifferencing()
})
// @todo : reimplement fork, throttle, validation,isVhdDifferencingDisk , nbd use
const timestamp = Date.now()

Expand Down
1 change: 0 additions & 1 deletion @xen-orchestra/disk-transform/src/PortableDisk.mts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ export abstract class PortableDisk {
abstract getBlockIndexes(): Array<number>
abstract hasBlock(index: number): boolean
abstract buildDiskBlockGenerator(): Promise<AsyncGenerator<DiskBlock>> | AsyncGenerator<DiskBlock>

async *diskBlocks(): AsyncGenerator<DiskBlock> {
const generator = await this.buildDiskBlockGenerator()
try {
Expand Down
15 changes: 0 additions & 15 deletions @xen-orchestra/disk-transform/src/factory/XapiSourceFactory.mts

This file was deleted.

163 changes: 163 additions & 0 deletions @xen-orchestra/disk-transform/src/producer/Xapi.mts
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@

import { DiskBlock, PortableDisk } from "../PortableDisk.mjs";
import { XapiVhdCbtSource } from "./XapiVhdCbt.mjs";
import { XapiVhdStreamNbdSource } from "./XapiVhdStreamNbd.mjs";
import { XapiVhdStreamSource } from "./XapiVhdStreamSource.mjs";

// import { createLogger } from '@xen-orchestra/log'

// @todo : I can't find the right type for createLogger with it's dynamic properties
const warn = console.error

/**
* meta class that handle the fall back logic when trying to export a disk from xapi
* use nbd , change block tracking and stream export depending of capabilities
*/
export class XapiDiskSource extends PortableDisk{
#vdiRef:string
#baseRef?:string
#preferNbd:boolean
#nbdConcurrency:number
#xapi:any // @todo do a better type here

#source:PortableDisk
public get virtualSize(): number {
return this.#source.virtualSize
}
public set virtualSize(value: number) {
this.#source.virtualSize = value
}

public get blockSize(): number {
return this.#source.blockSize
}
public set blockSize(value: number) {
this.#source.blockSize = value
}
constructor({xapi, vdiRef, baseRef, preferNbd=true, nbdConcurrency=2}){
super()
this.#vdiRef = vdiRef
this.#baseRef = baseRef
this.#preferNbd = preferNbd
this.#nbdConcurrency = nbdConcurrency
this.#xapi = xapi

}

/**
* create a disk source using stream export + NBD
* on failure fall back to a full
*
* @returns {Promise<XapiVhdStreamSource>}
*/
async #openNbdStream():Promise<XapiVhdStreamNbdSource>{
const xapi = this.#xapi
const baseRef = this.#baseRef
const vdiRef = this.#vdiRef
let source = new XapiVhdStreamNbdSource({vdiRef, baseRef, xapi, nbdConcurrency:this.#nbdConcurrency})
try{
await source.init()
}catch(err){
await source.close()
if(err.code === 'VDI_CANT_DO_DELTA'){
warn(`can't compute delta of XapiVhdStreamNbdSource ${vdiRef} from ${baseRef}, fallBack to a full`)
source = new XapiVhdStreamNbdSource({vdiRef, baseRef, xapi})
await source.init()
} else {
throw err
}
}
return source
}

/**
* create a disk source using stream export
* on failure fall back to a full
*
* @returns {Promise<XapiVhdStreamSource>}
*/

async #openExportStream():Promise<XapiVhdStreamSource>{
const xapi = this.#xapi
const baseRef = this.#baseRef
const vdiRef = this.#vdiRef
let source = new XapiVhdStreamSource({vdiRef, baseRef, xapi})
try{
await source.init()
}catch(err){
await source.close()
if(err.code === 'VDI_CANT_DO_DELTA'){
warn(`can't compute delta of XapiVhdStreamSource ${vdiRef} from ${baseRef}, fallBack to a full`)
source = new XapiVhdStreamSource({vdiRef, baseRef, xapi})
await source.init()
} else {
throw err
}
}
return source

}

/**
* create a disk source using nbd and CBT
* on failure fall back to stream + nbd
*
* @returns {Promise<XapiVhdCbtSource|XapiVhdStreamNbdSource>}
*/

async #openNbdCbt():Promise<XapiVhdCbtSource|XapiVhdStreamNbdSource>{
const xapi = this.#xapi
const baseRef = this.#baseRef
const vdiRef = this.#vdiRef
let source = new XapiVhdCbtSource({vdiRef, baseRef, xapi, nbdConcurrency:this.#nbdConcurrency})
try{
await source.init()
return source
}catch(error){
await source.close()
// a lot of things can go wrong with cbt:
// no enabled on the basref
// not anebaled on the vdi
// disabled/enabled in between
// sr not supporting it
// Plus the standard failures

// try without CBT on failure
return this.#openNbdStream()
}
}


async init(): Promise<void> {
if(this.#preferNbd){
if(this.#baseRef !== undefined){
this.#source = await this.#openNbdCbt()
} else {
// pure CBT/nbd is not available for base :
// the base incremental needs the block list to work efficiently
this.#source = await this.#openNbdStream()
}
} else {
this.#source = await this.#openExportStream()
}
}
close(): Promise<void> {
return this.#source?.close()
}
isDifferencing(): boolean {
return this.#source?.isDifferencing()
}
openParent(): Promise<PortableDisk> {
return this.#source?.openParent()
}
getBlockIndexes(): Array<number> {
return this.#source?.getBlockIndexes()
}
hasBlock(index: number): boolean {
return this.#source?.hasBlock(index)
}
buildDiskBlockGenerator(): Promise<AsyncGenerator<DiskBlock>> | AsyncGenerator<DiskBlock> {
return this.#source?.buildDiskBlockGenerator()
}

}
2 changes: 1 addition & 1 deletion @xen-orchestra/disk-transform/src/producer/XapiVhdCbt.mts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export class XapiVhdCbtSource extends PortableDisk {
#parentId: string
#xapi: any
#cbt: Buffer
constructor({ vdiRef, baseRef, xapi, nbdConcurrency }) {
constructor({ vdiRef, baseRef, xapi, nbdConcurrency }:{vdiRef:string, baseRef:string, xapi:any, nbdConcurrency:number}) {
super()
this.#ref = vdiRef
this.#baseRef = baseRef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ export class XapiVhdStreamNbdSource extends XapiVhdStreamSource {

async close() {
await super.close()
await this.#nbdClient.disconnect()
await this.#nbdClient?.disconnect()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export class XapiVhdStreamSource extends PortableDisk {

#isDifferencing: boolean

constructor({ vdiRef, baseRef, xapi }) {
constructor({ vdiRef, baseRef, xapi }:{vdiRef: string, baseRef?:string, xapi:any}) {
super()
this.#ref = vdiRef
this.#baseRef = baseRef
Expand Down Expand Up @@ -96,11 +96,9 @@ export class XapiVhdStreamSource extends PortableDisk {
blocks.sort((b1, b2) => b1.offset - b2.offset)
this.#blocks = blocks
this.#initDone = true
console.log('init done')
}

async close() {
assert.strictEqual(this.#initDone, true, 'init must be done to call close')
this.#vhdStream?.destroy()
}

Expand Down
Loading

0 comments on commit 8bf7350

Please sign in to comment.