Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding fix to refill non last block earlier staged with half data #1643

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 64 additions & 12 deletions component/block_cache/block_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1570,7 +1570,7 @@
}
}

blockIDList, err := bc.getBlockIDList(handle)
blockIDList, restageId, err := bc.getBlockIDList(handle)
if err != nil {
log.Err("BlockCache::commitBlocks : Failed to get block id list for %v [%v]", handle.Path, err.Error())
return err
Expand Down Expand Up @@ -1598,11 +1598,31 @@
listMap[k].committed = true
}

if restageId != "" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There can be multiple occurrences in a file where the block staged had size less than the configured block size

// We need to restage this block by merging it with the next block
for i := range blockIDList {
if blockIDList[i] == restageId {
// Read one block from offset of this block, which shall effectively read this block and the next block
// Then stage this block again with the correct length

Check failure on line 1606 in component/block_cache/block_cache.go

View workflow job for this annotation

GitHub Actions / Check for spelling errors

Thne ==> Then
// Remove the next block from blockIDList
// Commit the block list again
block, err := bc.getOrCreateBlock(handle, uint64(i)*bc.blockSize)
if err != nil {
log.Err("BlockCache::commitBlocks : Failed to get block for %v [%v]", handle.Path, err.Error())
return err
}

block.Dirty()
return bc.commitBlocks(handle)
}
}
}

handle.Flags.Clear(handlemap.HandleFlagDirty)
return nil
}

func (bc *BlockCache) getBlockIDList(handle *handlemap.Handle) ([]string, error) {
func (bc *BlockCache) getBlockIDList(handle *handlemap.Handle) ([]string, string, error) {
// generate the block id list order
list, _ := handle.GetValue("blockList")
listMap := list.(map[int64]*blockInfo)
Expand All @@ -1617,27 +1637,59 @@

zeroBlockStaged := false
zeroBlockID := ""
restageId := ""
index := int64(0)
i := 0

for i < len(offsets) {
if index == offsets[i] {
// TODO: when a staged block (not last block) has data less than block size
if i != len(offsets)-1 && listMap[offsets[i]].size != bc.blockSize {
log.Err("BlockCache::getBlockIDList : Staged block %v has less data %v for %v=>%s\n%v", offsets[i], listMap[offsets[i]].size, handle.ID, handle.Path, common.BlockCacheRWErrMsg)
return nil, fmt.Errorf("staged block %v has less data %v for %v=>%s\n%v", offsets[i], listMap[offsets[i]].size, handle.ID, handle.Path, common.BlockCacheRWErrMsg)
}
// A non last block was staged earlier and it is not of the same size as block size
// This happens when a block which is not full is staged and at that moment it was the last block
// Now we have written data beyond that point and it's no longer the last block
// In such case we need to fill the gap with zero blocks
// For simplicity we will fill the gap with a new block and later merge both these blocks in one block
id := base64.StdEncoding.EncodeToString(common.NewUUIDWithLength(16))
fillerSize := (bc.blockSize - listMap[offsets[i]].size)
fillerOffset := uint64(offsets[i]*int64(bc.blockSize)) + listMap[offsets[i]].size

log.Debug("BlockCache::getBlockIDList : Staging semi zero block for %v=>%v offset %v, size %v", handle.ID, handle.Path, fillerOffset, fillerSize)
err := bc.NextComponent().StageData(internal.StageDataOptions{
Name: handle.Path,
Data: bc.blockPool.zeroBlock.data[:fillerSize],
Id: id,
})

blockIDList = append(blockIDList, listMap[offsets[i]].id)
log.Debug("BlockCache::getBlockIDList : Preparing blocklist for %v=>%s (%v : %v, size %v)", handle.ID, handle.Path, offsets[i], listMap[offsets[i]].id, listMap[offsets[i]].size)
index++
i++
if err != nil {
log.Err("BlockCache::getBlockIDList : Failed to write semi zero block for %v=>%v [%s]", handle.ID, handle.Path, err.Error())
return nil, "", err
}

blockIDList = append(blockIDList, listMap[offsets[i]].id)
log.Debug("BlockCache::getBlockIDList : Preparing blocklist for %v=>%s (%v : %v, size %v)", handle.ID, handle.Path, offsets[i], listMap[offsets[i]].id, listMap[offsets[i]].size)

// After the flush call we need to merge this particular block with the next block (semi zero block)
restageId = listMap[offsets[i]].id

// Add the semi zero block to the list
blockIDList = append(blockIDList, id)
log.Debug("BlockCache::getBlockIDList : Preparing blocklist for %v=>%s (%v : %v, size %v)", handle.ID, handle.Path, fillerOffset, id, fillerSize)

index++
i++

} else {
blockIDList = append(blockIDList, listMap[offsets[i]].id)
log.Debug("BlockCache::getBlockIDList : Preparing blocklist for %v=>%s (%v : %v, size %v)", handle.ID, handle.Path, offsets[i], listMap[offsets[i]].id, listMap[offsets[i]].size)
index++
i++
}
} else {
for index < offsets[i] {
if !zeroBlockStaged {
id, err := bc.stageZeroBlock(handle, 1)
if err != nil {
return nil, err
return nil, "", err
}

zeroBlockStaged = true
Expand All @@ -1657,7 +1709,7 @@
}
}

return blockIDList, nil
return blockIDList, restageId, nil
}

func (bc *BlockCache) stageZeroBlock(handle *handlemap.Handle, tryCnt int) (string, error) {
Expand Down
51 changes: 51 additions & 0 deletions component/block_cache/block_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2830,6 +2830,57 @@
suite.assert.NotEqualValues(xattrMd5sum1, xattrMd5sum2)
}

func (suite *blockCacheTestSuite) TestReadCommittedLastBlockAfterAppends() {
prefetch := 12
cfg := fmt.Sprintf("block_cache:\n block-size-mb: 1\n mem-size-mb: 12\n prefetch: %v\n parallelism: 10", prefetch)
tobj, err := setupPipeline(cfg)
defer tobj.cleanupPipeline()

suite.assert.Nil(err)
suite.assert.NotNil(tobj.blockCache)

path := getTestFileName(suite.T().Name())
storagePath := filepath.Join(tobj.fake_storage_path, path)

// write using block cache
options := internal.CreateFileOptions{Name: path, Mode: 0777}
h, err := tobj.blockCache.CreateFile(options)
suite.assert.Nil(err)
suite.assert.NotNil(h)
suite.assert.Equal(h.Size, int64(0))
suite.assert.False(h.Dirty())

// Jump to 13thMB offset and write 500kb of data
n, err := tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: int64(13 * _1MB), Data: dataBuff[:(_1MB / 2)]})
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB/2))
suite.assert.True(h.Dirty())

// Write remaining data backwards so that the last block is staged first

Check failure on line 2859 in component/block_cache/block_cache_test.go

View workflow job for this annotation

GitHub Actions / Check for spelling errors

backwords ==> backwards
for i := 0; i < 12; i++ {

n, err := tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: int64(uint64(12-i) * _1MB), Data: dataBuff[:_1MB]})
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB))
suite.assert.True(h.Dirty())
}

// Now Jump to 20thMB offset and write 500kb of data
n, err = tobj.blockCache.WriteFile(internal.WriteFileOptions{Handle: h, Offset: int64(20 * _1MB), Data: dataBuff[:(_1MB / 2)]})
suite.assert.Nil(err)
suite.assert.Equal(n, int(_1MB/2))
suite.assert.True(h.Dirty())

tobj.blockCache.FlushFile(internal.FlushFileOptions{Handle: h})

err = tobj.blockCache.CloseFile(internal.CloseFileOptions{Handle: h})
suite.assert.Nil(err)

_, err = os.Stat(storagePath)
suite.assert.Nil(err)
suite.assert.Equal(h.Size, int64((20*_1MB)+(_1MB/2)))
}

// In order for 'go test' to run this suite, we need to create
// a normal test function and pass our suite to suite.Run
func TestBlockCacheTestSuite(t *testing.T) {
Expand Down
Loading