Skip to content

Commit

Permalink
Merge pull request #2 from ethpandaops/feat/beacon-chain-stop
Browse files Browse the repository at this point in the history
feat(beacon-chain): add graceful stop
  • Loading branch information
Savid authored Mar 2, 2023
2 parents bd7a26f + a5cfaf5 commit e2e676d
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 15 deletions.
51 changes: 37 additions & 14 deletions beacon_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ type EthereumBeaconChain struct {

epochChangedCallbacks []func(current Epoch)
slotChangedCallbacks []func(current Slot)

slotCh chan struct{}
epochCh chan struct{}
}

func NewEthereumBeaconChain(genesis time.Time, durationPerSlot time.Duration, slotsPerEpoch uint64) *EthereumBeaconChain {
Expand All @@ -19,30 +22,43 @@ func NewEthereumBeaconChain(genesis time.Time, durationPerSlot time.Duration, sl

epochChangedCallbacks: []func(current Epoch){},
slotChangedCallbacks: []func(current Slot){},

slotCh: make(chan struct{}),
epochCh: make(chan struct{}),
}

go func() {
for {
slot := e.slots.Current()

time.Sleep(slot.TimeWindow().End().Sub(time.Now()))

slot = e.slots.Current()
for _, callback := range e.slotChangedCallbacks {
go callback(slot)
select {
case <-e.slotCh:
return
default:
slot := e.slots.Current()

time.Sleep(time.Until(slot.TimeWindow().End()))

slot = e.slots.Current()
for _, callback := range e.slotChangedCallbacks {
go callback(slot)
}
}
}
}()

go func() {
for {
epoch := e.epochs.Current()

time.Sleep(epoch.TimeWindow().End().Sub(time.Now()))

epoch = e.epochs.Current()
for _, callback := range e.epochChangedCallbacks {
go callback(epoch)
select {
case <-e.epochCh:
return
default:
epoch := e.epochs.Current()

time.Sleep(time.Until(epoch.TimeWindow().End()))

epoch = e.epochs.Current()
for _, callback := range e.epochChangedCallbacks {
go callback(epoch)
}
}
}
}()
Expand Down Expand Up @@ -79,3 +95,10 @@ func (e *EthereumBeaconChain) OnEpochChanged(callback func(current Epoch)) {
func (e *EthereumBeaconChain) OnSlotChanged(callback func(current Slot)) {
e.slotChangedCallbacks = append(e.slotChangedCallbacks, callback)
}

func (e *EthereumBeaconChain) Stop() {
e.slotCh <- struct{}{}
e.epochCh <- struct{}{}
close(e.slotCh)
close(e.epochCh)
}
4 changes: 3 additions & 1 deletion beacon_chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestBeaconChainEventCallbacks(t *testing.T) {
slotCallbacks++
})

time.Sleep(5 * time.Second)
time.Sleep(5100 * time.Millisecond)

if epochCallbacks != 2 {
t.Errorf("incorrect number of epoch callbacks: got %v, want %v", epochCallbacks, 2)
Expand All @@ -30,4 +30,6 @@ func TestBeaconChainEventCallbacks(t *testing.T) {
t.Errorf("incorrect number of slot callbacks: got %v, want %v", slotCallbacks, 5)
}
})

beacon.Stop()
}

0 comments on commit e2e676d

Please sign in to comment.