Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
fbeauchamp committed Mar 2, 2025
1 parent 85f6079 commit 1ad7d03
Show file tree
Hide file tree
Showing 25 changed files with 495 additions and 710 deletions.
File renamed without changes.
9 changes: 9 additions & 0 deletions @vates/generator-toolbox/src/synchronized.mts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ export class Synchronized<T, TReturn, TNext> {
}

fork(uid: string): AsyncGenerator<T, TReturn, TNext> {
console.log('tooboxk FORK ')
uid = ''+Math.random()
assert.strictEqual(this.#started, false, `can't create a fork after consuming the data`)
const fork = new Forked<T, TReturn, TNext>(this, uid)
this.#forks.set(uid, fork)
Expand All @@ -28,6 +30,7 @@ export class Synchronized<T, TReturn, TNext> {
if (!this.#nextValueForksReady) {
throw new Error('Can t wait forks if there are noone waiting')
}
console.log('wait for forks')
const { promise, forksWaitingResolve } = this.#nextValueForksReady
if (this.#waitingForks.size === this.#forks.size) {
// reset value
Expand All @@ -39,6 +42,7 @@ export class Synchronized<T, TReturn, TNext> {
}

async next(uid: string): Promise<IteratorResult<T>> {
console.log('next', uid)
if (this.#removedForks.has(uid)) {
return { done: true, value: undefined }
}
Expand Down Expand Up @@ -77,6 +81,7 @@ export class Synchronized<T, TReturn, TNext> {
}

async remove(uid: string, error?: Error): Promise<IteratorResult<T>> {
console.log('remove', uid)
const fork = this.#forks.get(uid)
if (fork === undefined) {
if (this.#removedForks.has(uid)) {
Expand Down Expand Up @@ -133,16 +138,20 @@ class Forked<T, TReturn, TNext> implements AsyncGenerator<T, TReturn, TNext> {
this.#uid = uid
}
next(): Promise<IteratorResult<T>> {
console.log('forked next')
return this.#parent.next(this.#uid)
}
async return(): Promise<IteratorResult<T>> {
console.log('forked return')
return this.#parent.remove(this.#uid)
}
async throw(e: Error): Promise<IteratorResult<T>> {
console.log('forked throw')
return this.#parent.remove(this.#uid, e)
}

[Symbol.asyncIterator](): AsyncGenerator<T> {
console.log('in Forked.symbol')
return this
}
}
3 changes: 2 additions & 1 deletion @xen-orchestra/backups/RemoteAdapter.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -689,14 +689,15 @@ export class RemoteAdapter {

if (this.useVhdDirectory()) {
await writeToVhdDirectory(
{ disk },
{ disk , target:
{
handler,
path,
concurrency: writeBlockConcurrency,
validator,
compression: 'brotli',
}
}
)
} else {
const stream = await toVhdStream({ disk })
Expand Down
13 changes: 8 additions & 5 deletions @xen-orchestra/backups/_runners/_vmRunners/IncrementalXapi.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import {
setVmDeltaChainLength,
markExportSuccessfull,
} from '../../_otherConfig.mjs'
import { forkDeltaExport } from './_forkDeltaExport.mjs'
import { forkDeltaExport } from './_forkDeltaExport.mjs'
import { SynchronizedDisk } from '@xen-orchestra/disk-transform'

const { debug } = createLogger('xo:backups:IncrementalXapiVmBackup')

Expand All @@ -40,10 +41,12 @@ export const IncrementalXapi = class IncrementalXapiVmBackupRunner extends Abstr
})

const isVhdDifferencing = {}
Object.entries(deltaExport.disks).forEach(([key, disk])=>{
for(const key in deltaExport.disks){
const disk = deltaExport.disks[key]
isVhdDifferencing[key] = disk.isDifferencing()
})

deltaExport.disks[key] = new SynchronizedDisk(disk)
await deltaExport.disks[key].init()
}
//deltaExport.disks = mapValues(deltaExport.disks, disk=>this._throttleGenerator.createThrottledGenerator(disk) )

// @todo : reimplement fork, throttle, validation,isVhdDifferencingDisk , nbd use
Expand All @@ -52,7 +55,7 @@ export const IncrementalXapi = class IncrementalXapiVmBackupRunner extends Abstr
await this._callWriters(
writer =>
writer.transfer({
deltaExport: forkDeltaExport(deltaExport),
deltaExport,
isVhdDifferencing,
sizeContainers: {},
timestamp,
Expand Down
41 changes: 23 additions & 18 deletions @xen-orchestra/disk-transform/src/Disk.mts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ export type BytesLength = number
export abstract class Disk {
generatedDiskBlocks = 0
yieldedDiskBlocks = 0
#synchronized:Synchronized<DiskBlock, any, any> | undefined
#parent?: Disk
get parent(): Disk | undefined {
return this.#parent
Expand All @@ -23,11 +22,17 @@ export abstract class Disk {
abstract close(): Promise<void>

abstract isDifferencing(): boolean
// optional method, must throw if disk is not differencing
// optional method
instantiateParent(): Promise<Disk> {
throw new Error('Method not implemented.')
}
async openParent(): Promise<Disk> {
if(this.#parent!==undefined){
return this.#parent
}
if(!this.isDifferencing()){
throw new Error("Can't open the parent of a non differencing disk")
}
this.#parent = await this.instantiateParent()
await this.#parent.init()
return this.#parent
Expand All @@ -37,29 +42,27 @@ export abstract class Disk {
* return the block without any order nor stability guarantee
*/
abstract getBlockIndexes(): Array<number>
// must return true if the block is present in this disk
// without looking at the parent
abstract hasBlock(index: number): boolean
abstract buildDiskBlockGenerator(): Promise<AsyncGenerator<DiskBlock>> | AsyncGenerator<DiskBlock>
async diskBlocks(uid?:string): Promise<AsyncGenerator<DiskBlock>>{
if(this.#synchronized === undefined){
try{
async * diskBlocks(uid?:string): AsyncGenerator<DiskBlock>{
try {
const blockGenerator = await this.buildDiskBlockGenerator()
console.log('got generator', blockGenerator)
for await (const block of blockGenerator){
this.generatedDiskBlocks ++
yield block
this.yieldedDiskBlocks ++

const blockGenerator = await this.buildDiskBlockGenerator()
const self = this
async function *trackedGenerator():AsyncGenerator<DiskBlock>{
for await (const block of blockGenerator){
self.generatedDiskBlocks ++
yield block
self.yieldedDiskBlocks ++

}
}
this.#synchronized = new Synchronized(trackedGenerator())
console.log('done')
}catch(err){
console.error('Disk.generator', err)
}finally{
console.log('finally')
await this.close()
}
}
return this.#synchronized.fork(uid ?? "unanamed generator fork")

}

check() {
Expand All @@ -82,6 +85,8 @@ export abstract class RandomAccessDisk extends Disk {
get parent(): RandomAccessDisk | undefined {
return this.#parent
}
// can read data parent if block size are not aligned
// but only if this disk has data on this block
abstract readBlock(index: number): Promise<DiskBlock>
async *buildDiskBlockGenerator() {
for (const index of this.getBlockIndexes()) {
Expand Down
18 changes: 18 additions & 0 deletions @xen-orchestra/disk-transform/src/DiskChain.mts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ export class DiskChain extends RandomAccessDisk {
getBlockSize(): number {
return this.#disks[0].getBlockSize()
}

/**
* the main difference with the base disk block method
* is that if any of th disk has this block it return true
*
*/
hasBlock(index: number): boolean {
for (let i = this.#disks.length - 1; i >= 0; i--) {
if (this.#disks[i].hasBlock(index)) {
Expand All @@ -21,6 +27,7 @@ export class DiskChain extends RandomAccessDisk {
}
return false
}

readBlock(index: number): Promise<DiskBlock> {
for (let i = this.#disks.length - 1; i >= 0; i--) {
if (this.#disks[i].hasBlock(index)) {
Expand All @@ -43,4 +50,15 @@ export class DiskChain extends RandomAccessDisk {
isDifferencing(): boolean {
return this.#disks[0].isDifferencing()
}
static async openFromChild(child:RandomAccessDisk):Promise<RandomAccessDisk>{
let disk = child
const disks=[disk]
while (disk.isDifferencing()) {
disk = await disk.openParent() as RandomAccessDisk
disks.unshift(disk)
// @todo handle until
}
return new DiskChain({ disks })

}
}
63 changes: 63 additions & 0 deletions @xen-orchestra/disk-transform/src/RawDisk.mts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { DiskBlock, RandomAccessDisk } from "./Disk.mjs";
import { FileAccessor } from "./FileAccessor.mjs";


export class RawDisk extends RandomAccessDisk{
#accessor
#path
#descriptor:number|undefined
#blockSize:number
#size: number|undefined


constructor(accessor: FileAccessor, path: string, blockSize:number){
super()
this.#accessor= accessor
this.#path = path
this.#blockSize = blockSize
}
async readBlock(index: number): Promise<DiskBlock> {
if(this.#descriptor === undefined){
throw new Error("Can't call readBlock before init");
}
const data = Buffer.alloc(this.getBlockSize(), 0)
await this.#accessor.read(this.#descriptor, data, index*this.getBlockSize())
return {
index,
data
}
}
getVirtualSize(): number {
if(this.#size === undefined){
throw new Error("Can't call getVirtualsize before init");
}
return this.#size
}
getBlockSize(): number {
return this.#blockSize
}
async init(): Promise<void> {
this.#descriptor = await this.#accessor.open(this.#path)
this.#size = await this.#accessor.getSize(this.#path)
}
async close(): Promise<void> {
this.#descriptor && await this.#accessor.close(this.#descriptor)
this.#descriptor = undefined
this.#size = undefined
}
isDifferencing(): boolean {
return false
}
getBlockIndexes(): Array<number> {
const nbBlocks = Math.ceil(this.getVirtualSize()/ this.getBlockSize())
const index =[]
for(let i=0; i < nbBlocks; i ++){
index.push(i)
}
return index
}
hasBlock(index: number): boolean {
return index * this.getBlockSize() < this.getVirtualSize()
}

}
29 changes: 29 additions & 0 deletions @xen-orchestra/disk-transform/src/SynchronizedDisk.mts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import {DiskPassthrough} from './DiskPassthrough.mjs'
import {Synchronized} from '@vates/generator-toolbox'
import {Disk, DiskBlock} from './Disk.mjs'

export class SynchronizedDisk extends DiskPassthrough{
#synchronized:Synchronized<DiskBlock, any,any> |undefined
#source:Disk
constructor(source:Disk){
super()
this.#source = source
}
async openSource():Promise<Disk>{
// await this.#source.init()
this.#synchronized = new Synchronized(await this.#source.buildDiskBlockGenerator())
return this.#source
}
async * diskBlocks(uid:string): AsyncGenerator<DiskBlock>{
console.log('will fork')
if(this.#synchronized === undefined){
throw new Error("Can't cann fork before init")
}
return this.#synchronized.fork(uid)
}

async close(){
console.log('SynchronizedDisk.close')
await this.source.close() // this will trigger cleanup in syncrhonized
}
}
2 changes: 2 additions & 0 deletions @xen-orchestra/disk-transform/src/index.mts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ export { DiskChain } from './DiskChain.mjs'
export { NegativeDisk } from './NegativeDisk.mjs'
export { FileAccessor } from './FileAccessor.mjs'
export { DiskPassthrough, RandomDiskPassthrough } from './DiskPassthrough.mjs'
export { RawDisk } from './RawDisk.mjs'
export { SynchronizedDisk } from './SynchronizedDisk.mjs'
Loading

0 comments on commit 1ad7d03

Please sign in to comment.