Skip to content

Commit

Permalink
Changed atomic lock to use channel, adjusted logging
Browse files Browse the repository at this point in the history
  • Loading branch information
alarso16 committed Feb 28, 2025
1 parent ed8bef6 commit 0dbbd8f
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 8 deletions.
2 changes: 1 addition & 1 deletion plugin/evm/statesync/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (d *Downloader) SnapSync() error {
case newPivot := <-d.newPivot:
// If a new pivot block is found, cancel the current state sync and
// start a new one.
log.Debug("Pivot block updated to", "hash", d.pivotBlock.Root(), "height", d.pivotBlock.NumberU64())
log.Debug("Pivot block updated to", "hash", d.Pivot().Root(), "height", d.Pivot().NumberU64())
sync.Cancel()
sync = d.syncState(newPivot.Root())
}
Expand Down
21 changes: 14 additions & 7 deletions plugin/evm/syncervm_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,14 @@ type stateSyncerClient struct {
// Dynamic sync
syncing utils.Atomic[bool]
dl *ethstatesync.Downloader
downloaderLock sync.Mutex // to prevent writing during atomic sync
downloaderLock sync.Mutex
atomicDone chan struct{} // to prevent writing during atomic sync
}

func NewStateSyncClient(config *stateSyncClientConfig) StateSyncClient {
return &stateSyncerClient{
stateSyncClientConfig: config,
atomicDone: make(chan struct{}),
}
}

Expand Down Expand Up @@ -120,16 +122,24 @@ type Syncer interface {
// AsyncReceive returns true if the client is ready to receive a message from the engine
// Should return true if syncing and useUpstream is true, i.e. currently dynamicaling syncing
func (client *stateSyncerClient) AsyncReceive() bool {
// Block until atomic sync is completed for bootstrapping
// Block until atomic sync is completed for bootstrapping and after sync completes until blockchain updates
client.downloaderLock.Lock()
client.downloaderLock.Unlock()
return client.useUpstream && client.syncing.Get() && client.dl != nil
}

func (client *stateSyncerClient) QueueBlockOrPivot(b *Block, req ethstatesync.SyncBlockRequest) error {
return client.dl.QueueBlockOrPivot(b.ethBlock, req, getSyncBlockHandler(b, req))
// Wait for atomic sync to be done prior to queueing
<-client.atomicDone
err := client.dl.QueueBlockOrPivot(b.ethBlock, req, getSyncBlockHandler(b, req))
if err != nil {
log.Error("Queue failed", "error", err)
}
return err
}

// Depending on the request type, returns the hook to properly handle the block
// If final, will run normal operation. Otherwise, will only perform atomic ops
func getSyncBlockHandler(b *Block, req ethstatesync.SyncBlockRequest) func(bool) error {
switch req {
case ethstatesync.AcceptSyncBlockRequest:
Expand Down Expand Up @@ -203,7 +213,6 @@ func (client *stateSyncerClient) stateSync(ctx context.Context) error {
if err := client.syncAtomicTrie(ctx); err != nil {
return err
}
client.downloaderLock.Unlock()

return client.syncStateTrie(ctx)
}
Expand Down Expand Up @@ -251,9 +260,6 @@ func (client *stateSyncerClient) acceptSyncSummary(proposedSummary message.SyncS

log.Info("Starting state sync", "summary", proposedSummary)

// Lock the atomic trie to prevent pivots during the atomic sync from dynamic syncing
client.downloaderLock.Lock()

// create a cancellable ctx for the state sync goroutine
ctx, cancel := context.WithCancel(context.Background())
client.cancel = cancel
Expand Down Expand Up @@ -373,6 +379,7 @@ func (client *stateSyncerClient) syncAtomicTrie(ctx context.Context) error {
return err
}
err = <-atomicSyncer.Done()
close(client.atomicDone)
log.Info("atomic tx: sync finished", "root", client.syncSummary.AtomicRoot, "err", err)
return err
}
Expand Down

0 comments on commit 0dbbd8f

Please sign in to comment.