diff --git a/syncer/syncer.go b/syncer/syncer.go index 333641f..4fdece1 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -37,8 +37,9 @@ const ( BundleStatusCreatedOnChain = 2 BundleStatusSealedOnChain = 3 - LoopSleepTime = 10 * time.Millisecond - BSCPauseTime = 3 * time.Second + LoopSleepTime = 10 * time.Millisecond + LoopErrorPauseTime = 2 * time.Second + BSCPauseTime = 3 * time.Second ETHPauseTime = 90 * time.Second RPCTimeout = 20 * time.Second @@ -111,6 +112,7 @@ func (s *BlobSyncer) StartLoop() { for range syncTicker.C { if err = s.sync(); err != nil { logging.Logger.Errorf("failed to sync, err=%s", err.Error()) + time.Sleep(LoopErrorPauseTime) } } }() @@ -119,6 +121,7 @@ func (s *BlobSyncer) StartLoop() { for range verifyTicket.C { if err := s.verify(); err != nil { logging.Logger.Errorf("failed to verify, err=%s", err.Error()) + time.Sleep(LoopErrorPauseTime) } } }() @@ -197,27 +200,30 @@ func (s *BlobSyncer) sync() error { if err != nil { return err } - var dbErr error + if isForkedBlock { - dbErr = s.blobDao.SaveBlockAndBlob(&db.Block{ + err := s.blobDao.SaveBlockAndBlob(&db.Block{ Slot: blockID, BundleName: bundleName, }, nil) + if err != nil { + logging.Logger.Errorf("failed to save block(h=%d) to DB, err=%s", blockID, err.Error()) + return err + } } else { blockToSave, blobToSave, err := s.toBlockAndBlobs(block, sideCars, blockID, bundleName) - if err == nil { - err = s.blobDao.SaveBlockAndBlob(blockToSave, blobToSave) - if err == nil { - metrics.SyncedBlockIDGauge.Set(float64(blockID)) - logging.Logger.Infof("saved block(block_id=%d) and blobs(num=%d) to DB \n", blockID, len(blobToSave)) - } + if err != nil { + logging.Logger.Errorf("failed to convert to block and blobs, err=%s", err.Error()) + return err } - dbErr = err - } - if dbErr != nil { - logging.Logger.Errorf("failed to save block(h=%d) to DB, err=%s", blockID, dbErr.Error()) - return dbErr + if err = s.blobDao.SaveBlockAndBlob(blockToSave, blobToSave); err != nil { + logging.Logger.Errorf("failed to save block(h=%d) to DB, err=%s", blockID, err.Error()) + return err + } + logging.Logger.Infof("saved block(block_id=%d) and blobs(num=%d) to DB \n", blockID, len(blobToSave)) } + metrics.SyncedBlockIDGauge.Set(float64(blockID)) + // update the block status to processed if blockID == s.bundleDetail.finalizeBlockID { // init next bundle startBlockID := blockID + 1 @@ -240,12 +246,18 @@ func (s *BlobSyncer) process(bundleName string, blockID uint64, sidecars []*type return err } } - if err = s.writeBlobToFile(blockID, bundleName, sidecars); err != nil { - return err + // for idempotent + _, err = os.Stat(s.getBundleDir(bundleName)) + if !os.IsNotExist(err) { + if err = s.writeBlobToFile(blockID, bundleName, sidecars); err != nil { + return err + } } if blockID == s.bundleDetail.finalizeBlockID { + // this is idempotent err = s.finalizeCurBundle(bundleName) if err != nil { + logging.Logger.Errorf("failed to finalize bundle, bundle=%s, err=%s", bundleName, err.Error()) return err } logging.Logger.Infof("finalized bundle, bundle_name=%s, bucket_name=%s\n", bundleName, s.getBucketName()) @@ -298,14 +310,8 @@ func (s *BlobSyncer) finalizeBundle(bundleName, bundleDir, bundleFilePath string return err } } - err = os.RemoveAll(bundleDir) - if err != nil { - return err - } - err = os.Remove(bundleFilePath) - if err != nil && !os.IsNotExist(err) { - return err - } + os.RemoveAll(bundleDir) + os.Remove(bundleFilePath) return s.blobDao.UpdateBundleStatus(bundleName, db.Finalized) } @@ -395,7 +401,9 @@ func (s *BlobSyncer) toBlockAndBlobs(blockResp *structs.GetBlockV2Response, side blobsReturn := make([]*db.Blob, 0) populateBlobTxDetails := func(blockNum uint64) error { - elBlock, err := s.client.BlockByNumber(context.Background(), big.NewInt(int64(blockNum))) + ctx, cancel := context.WithTimeout(context.Background(), RPCTimeout) + defer cancel() + elBlock, err := s.client.BlockByNumber(ctx, big.NewInt(int64(blockNum))) if err != nil { return fmt.Errorf("failed to get block at height %d, err=%s", blockNum, err.Error()) } @@ -415,7 +423,9 @@ func (s *BlobSyncer) toBlockAndBlobs(blockResp *structs.GetBlockV2Response, side switch { case s.BSCChain(): - header, err := s.client.GetBlockHeader(context.Background(), blockNumOrSlot) + ctx, cancel := context.WithTimeout(context.Background(), RPCTimeout) + defer cancel() + header, err := s.client.GetBlockHeader(ctx, blockNumOrSlot) if err != nil { return nil, nil, err } @@ -538,7 +548,9 @@ func (s *BlobSyncer) ETHChain() bool { func (s *BlobSyncer) GetParams() (*cmn.VersionedParams, error) { if s.params == nil { - params, err := s.chainClient.GetParams(context.Background()) + ctx, cancel := context.WithTimeout(context.Background(), RPCTimeout) + defer cancel() + params, err := s.chainClient.GetParams(ctx) if err != nil { logging.Logger.Errorf("failed to get params, err=%s", err.Error()) return nil, err