From 12424ca8b693d361da12425e580c3954743ba9fb Mon Sep 17 00:00:00 2001 From: Emanuel Pargov Date: Wed, 19 Feb 2025 19:47:41 +0200 Subject: [PATCH] Add block request (#268) * Add block request --- internal/chain/service.go | 238 ++++++++++++++++++++++++++ internal/chain/service_test.go | 77 +++++++++ pkg/network/handlers/block_request.go | 153 +++++++++++------ pkg/network/log.go | 60 +++++++ pkg/network/peer/node.go | 26 ++- 5 files changed, 495 insertions(+), 59 deletions(-) create mode 100644 internal/chain/service.go create mode 100644 internal/chain/service_test.go create mode 100644 pkg/network/log.go diff --git a/internal/chain/service.go b/internal/chain/service.go new file mode 100644 index 0000000..5611ec3 --- /dev/null +++ b/internal/chain/service.go @@ -0,0 +1,238 @@ +package chain + +import ( + "errors" + "fmt" + "sync" + "time" + + "github.com/eigerco/strawberry/internal/block" + "github.com/eigerco/strawberry/internal/crypto" + "github.com/eigerco/strawberry/internal/jamtime" + "github.com/eigerco/strawberry/internal/store" + "github.com/eigerco/strawberry/pkg/db/pebble" + "github.com/eigerco/strawberry/pkg/network" +) + +// BlockService manages the node's view of the blockchain state, including: +// - Known leaf blocks (blocks with no known children) +// - Latest finalized block +// - Block storage and retrieval +// +// It handles block announcements according to UP 0 protocol specification, +// maintaining the set of leaf blocks and tracking finalization status. +type BlockService struct { + Mu sync.RWMutex + KnownLeaves map[crypto.Hash]jamtime.Timeslot // Maps leaf block hashes to their timeslots + LatestFinalized LatestFinalized // Tracks the most recently finalized block + Store *store.Chain // Persistent block storage +} + +// LatestFinalized represents the latest finalized block in the chain. +// A block is considered finalized when it has a chain of 5 descendant blocks +// built on top of it according to the finalization rules. 
+type LatestFinalized struct { + Hash crypto.Hash // Hash of the finalized block + TimeSlotIndex jamtime.Timeslot // Timeslot of the finalized block +} + +// Leaf represents a block with no known children (a tip of the chain). +// The BlockService tracks all known leaves to implement the UP 0 protocol's +// requirement of announcing all leaves in handshake messages. +type Leaf struct { + Hash crypto.Hash // Hash of the leaf block + TimeSlotIndex jamtime.Timeslot // Timeslot of the leaf block +} + +// NewBlockService initializes a new BlockService with: +// - Empty leaf block set +// - Persistent block storage using PebbleDB +// - Genesis block as the latest finalized block +func NewBlockService() (*BlockService, error) { + kvStore, err := pebble.NewKVStore() + if err != nil { + return nil, err + } + chain := store.NewChain(kvStore) + bs := &BlockService{ + Store: chain, + KnownLeaves: make(map[crypto.Hash]jamtime.Timeslot), + } + // Initialize by finding leaves and finalized block + if err := bs.initializeState(); err != nil { + // Log error but continue - we can recover state as we process blocks + fmt.Printf("Failed to initialize block manager state: %v\n", err) + } + return bs, nil +} + +// initializeState sets up the initial blockchain state: +// 1. Creates and stores the genesis block +// 2. Sets genesis as the latest finalized block +// +// TODO: This is still a `mock` implementation. 
+func (bs *BlockService) initializeState() error { + // For now use genesis block + genesisHeader := block.Header{ + ParentHash: crypto.Hash{1}, + TimeSlotIndex: jamtime.Timeslot(1), + BlockAuthorIndex: 0, + } + hash, err := genesisHeader.Hash() + fmt.Printf("hash: %v\n", hash) + if err != nil { + return fmt.Errorf("failed to hash genesis block: %w", err) + } + if err := bs.Store.PutHeader(genesisHeader); err != nil { + return fmt.Errorf("failed to store genesis block: %w", err) + } + b := block.Block{ + Header: genesisHeader, + } + if err := bs.Store.PutBlock(b); err != nil { + return fmt.Errorf("failed to store genesis block: %w", err) + } + bs.Mu.Lock() + defer bs.Mu.Unlock() + bs.LatestFinalized = LatestFinalized{ + Hash: hash, + TimeSlotIndex: genesisHeader.TimeSlotIndex, + } + return nil +} + +// checkFinalization determines if a block can be finalized by: +// 1. Walking back 5 generations from the given block hash +// 2. If a complete chain of 5 blocks exists, finalizing the oldest block +// 3. Updating the latest finalized pointer +// 4. Removing the finalized block from the leaf set if present +// +// Returns nil if finalization check succeeds, error if any operations fail. +// Note: May return nil even if finalization isn't possible (e.g., missing ancestors). +// This is due to genesis block handling and is not considered an error. 
+func (bs *BlockService) checkFinalization(hash crypto.Hash) error {
+	// Start from current header and walk back 6 generations
+	currentHash := hash
+	var ancestorChain []block.Header
+
+	// Walk back 6 generations
+	for i := 0; i < 6; i++ {
+		header, err := bs.Store.GetHeader(currentHash)
+		if err != nil {
+			if errors.Is(err, store.ErrHeaderNotFound) {
+				// If we can't find a parent, we can't finalize
+				return nil
+			}
+			return fmt.Errorf("failed to get header in chain: %w", err)
+		}
+
+		ancestorChain = append(ancestorChain, header)
+		currentHash = header.ParentHash
+	}
+
+	// Get the oldest header (the one we'll finalize)
+	finalizeHeader := ancestorChain[len(ancestorChain)-1]
+	finalizeHash, err := finalizeHeader.Hash()
+	if err != nil {
+		return fmt.Errorf("failed to hash header during finalization: %w", err)
+	}
+
+	bs.RemoveLeaf(finalizeHash)
+	bs.UpdateLatestFinalized(finalizeHash, finalizeHeader.TimeSlotIndex)
+
+	return nil
+}
+
+// HandleNewHeader processes a new block header announcement by:
+// 1. Storing the header in persistent storage
+// 2. Updating the leaf block set (removing parent, adding new block)
+// 3. Checking if the parent block can now be finalized
+//
+// This implements the core block processing logic required by UP 0 protocol,
+// maintaining the node's view of chain tips and finalization status. 
+func (bs *BlockService) HandleNewHeader(header *block.Header) error { + // Get the header hash + hash, err := header.Hash() + if err != nil { + return fmt.Errorf("failed to hash header: %w", err) + } + // Need to verify this block is a descendant of latest finalized block + // before considering it as a potential leaf + isDescendant, err := bs.isDescendantOfFinalized(header) + if err != nil { + return fmt.Errorf("failed to check if block is descendant of finalized: %w", err) + } + if !isDescendant { + return fmt.Errorf("block %s is not a descendant of latest finalized block", hash) + } + + // First store the header + if err := bs.Store.PutHeader(*header); err != nil { + return fmt.Errorf("failed to store header: %w", err) + } + + // Only update leaves if this is a descendant of finalized block + bs.RemoveLeaf(header.ParentHash) + bs.AddLeaf(hash, header.TimeSlotIndex) + + // Check if this creates a finalization condition starting from parent + if err := bs.checkFinalization(header.ParentHash); err != nil { + // Log but don't fail on finalization check errors + fmt.Printf("Failed to check finalization: %v\n", err) + } + + return nil +} + +// UpdateLatestFinalized updates the latest finalized block pointer. +func (bs *BlockService) UpdateLatestFinalized(hash crypto.Hash, slot jamtime.Timeslot) { + bs.Mu.Lock() + defer bs.Mu.Unlock() + bs.LatestFinalized = LatestFinalized{Hash: hash, TimeSlotIndex: slot} + network.LogBlockEvent(time.Now(), "finalizing", hash, slot.ToEpoch(), slot) +} + +// AddLeaf adds a block to the set of known leaves. +func (bs *BlockService) AddLeaf(hash crypto.Hash, slot jamtime.Timeslot) { + bs.Mu.Lock() + defer bs.Mu.Unlock() + bs.KnownLeaves[hash] = slot +} + +// RemoveLeaf removes a block from the set of known leaves. 
+func (bs *BlockService) RemoveLeaf(hash crypto.Hash) { + bs.Mu.Lock() + defer bs.Mu.Unlock() + delete(bs.KnownLeaves, hash) +} + +// isDescendantOfFinalized checks if a block is a descendant of the latest finalized block +// by walking back through its ancestors until we either: +// - Find the latest finalized block (true) +// - Find a different block at the same height as latest finalized (false) +// - Can't find a parent (error) +func (bs *BlockService) isDescendantOfFinalized(header *block.Header) (bool, error) { + bs.Mu.RLock() + finalizedSlot := bs.LatestFinalized.TimeSlotIndex + finalizedHash := bs.LatestFinalized.Hash + bs.Mu.RUnlock() + + current := header + for current.TimeSlotIndex > finalizedSlot { + parent, err := bs.Store.GetHeader(current.ParentHash) + if err != nil { + return false, fmt.Errorf("failed to get parent block: %w", err) + } + current = &parent + } + + // If we found the finalized block, this is a descendant + if current.TimeSlotIndex == finalizedSlot { + currentHash, err := current.Hash() + if err != nil { + return false, err + } + return currentHash == finalizedHash, nil + } + return false, nil +} diff --git a/internal/chain/service_test.go b/internal/chain/service_test.go new file mode 100644 index 0000000..8ed88eb --- /dev/null +++ b/internal/chain/service_test.go @@ -0,0 +1,77 @@ +package chain + +import ( + "testing" + + "github.com/eigerco/strawberry/internal/block" + "github.com/eigerco/strawberry/internal/crypto" + "github.com/eigerco/strawberry/internal/jamtime" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCheckFinalization(t *testing.T) { + // Create a new BlockService + bs, err := NewBlockService() + require.NoError(t, err) + + // Create a chain of 7 headers (0->1->2->3->4->5->6) + // Block 1 will have 5 descendants (2,3,4,5,6) and should be finalized + var headers []block.Header + var hashes []crypto.Hash + parentHash := crypto.Hash{} // Zero hash for first block + + // Create 
and store headers + for i := uint32(0); i < 7; i++ { + header := block.Header{ + TimeSlotIndex: jamtime.Timeslot(i), + ParentHash: parentHash, + } + + // Store the header + err := bs.Store.PutHeader(header) + require.NoError(t, err) + + // Get and store the hash for next iteration + hash, err := header.Hash() + require.NoError(t, err) + parentHash = hash + + headers = append(headers, header) + hashes = append(hashes, hash) + } + + // Add some headers as leaves + bs.AddLeaf(hashes[6], 6) // Last one is a leaf + + // Try to finalize using the last header + err = bs.checkFinalization(hashes[6]) + require.NoError(t, err) + + // Verify: + // 1. Header[1] should be finalized (having 5 descendants: 2,3,4,5,6) + // 2. Leaves should be updated + // 3. LatestFinalized should be set to header[1] + + // Check LatestFinalized + assert.Equal(t, hashes[1], bs.LatestFinalized.Hash) + assert.Equal(t, headers[1].TimeSlotIndex, bs.LatestFinalized.TimeSlotIndex) + + // Check leaves - only block 6 should remain as leaf + _, exists5 := bs.KnownLeaves[hashes[5]] + _, exists6 := bs.KnownLeaves[hashes[6]] + assert.False(t, exists5, "hash[5] should not be a leaf") + assert.True(t, exists6, "hash[6] should still be a leaf") + assert.Equal(t, 1, len(bs.KnownLeaves), "should only have one leaf") + + // Try finalizing again with same hash - should not change anything + prevFinalized := bs.LatestFinalized + err = bs.checkFinalization(hashes[6]) + require.NoError(t, err) + assert.Equal(t, prevFinalized, bs.LatestFinalized, "finalization should not change on second attempt") + + // Try finalizing with non-existent hash + err = bs.checkFinalization(crypto.Hash{1, 2, 3}) + require.NoError(t, err) // Should return nil error as per our implementation + assert.Equal(t, prevFinalized, bs.LatestFinalized, "finalization should not change with invalid hash") +} diff --git a/pkg/network/handlers/block_request.go b/pkg/network/handlers/block_request.go index 7b66338..b85984b 100644 --- 
a/pkg/network/handlers/block_request.go +++ b/pkg/network/handlers/block_request.go @@ -2,81 +2,127 @@ package handlers import ( "context" - "encoding/binary" "fmt" - "io" + "github.com/eigerco/strawberry/internal/block" + "github.com/eigerco/strawberry/internal/chain" "github.com/eigerco/strawberry/internal/crypto" + "github.com/eigerco/strawberry/pkg/serialization/codec/jam" "github.com/quic-go/quic-go" ) -// BlockRequestHandler processes incoming block requests from other peers. -// It implements the StreamHandler interface. -type BlockRequestHandler struct{} +// BlockRequestHandler processes CE 128 block request streams from peers. +// It implements protocol specification section "CE 128: Block request". +// Block requests allow peers to request sequences of blocks either: +// - Ascending from a given block (exclusive of the block itself) +// - Descending from a given block (inclusive of the block itself) +type BlockRequestHandler struct { + blockService *chain.BlockService +} // NewBlockRequestHandler creates a new handler for processing block requests. -func NewBlockRequestHandler() *BlockRequestHandler { - return &BlockRequestHandler{} +// It requires a BlockService to fetch requested blocks from storage. +func NewBlockRequestHandler(blockService *chain.BlockService) *BlockRequestHandler { + return &BlockRequestHandler{ + blockService: blockService, + } +} + +// blockRequestMessage represents the wire format for block requests. +// As per protocol spec: +// - Header Hash: Starting point for block sequence +// - Direction: 0 for ascending (exclusive), 1 for descending (inclusive) +// - MaxBlocks: Maximum number of blocks to return +type blockRequestMessage struct { + Hash crypto.Hash + Direction byte // 0 for ascending, 1 for descending + MaxBlocks uint32 } -// HandleStream processes an incoming block request stream. 
-// The expected message format is: -// - 32 bytes: Header hash -// - 1 byte: Direction (0 for ascending, 1 for descending) -// - 4 bytes: Maximum number of blocks (little-endian uint32) +// HandleStream processes an incoming block request stream according to CE 128 protocol. +// Message format: +// - Header hash (32 bytes): Starting block hash +// - Direction (1 byte): 0 for ascending exclusive, 1 for descending inclusive +// - Maximum blocks (4 bytes): Little-endian uint32 maximum blocks to return // -// Returns an error if: -// - Reading the request fails -// - The message format is invalid -// - Writing the response fails +// Response format: +// - Length-prefixed sequence of encoded blocks +// - Stream is closed with FIN bit set after response +// +// The response sequence starts from the given block hash and follows the chain +// either forward (for ascending) or backward (for descending), limited by MaxBlocks. +// For ascending requests, the sequence starts with a child of the given block. +// For descending requests, the sequence starts with the given block itself. 
func (h *BlockRequestHandler) HandleStream(ctx context.Context, stream quic.Stream) error { - fmt.Println("Received block request, reading request message...") - // Read the request message msg, err := ReadMessageWithContext(ctx, stream) if err != nil { - return fmt.Errorf("failed to read request message: %w", err) + return fmt.Errorf("read request message: %w", err) } - // Parse the message content into BlockRequestMessage + // Validate minimum message length: + // 32 bytes (hash) + 1 byte (direction) + 4 bytes (maxBlocks) = 37 bytes if len(msg.Content) < 37 { // 32 (hash) + 1 (direction) + 4 (maxBlocks) return fmt.Errorf("message too short") } + var maxBlocks uint32 + if err = jam.Unmarshal(msg.Content[33:37], &maxBlocks); err != nil { + return fmt.Errorf("unmarshal maxBlocks: %w", err) + } - request := BlockRequestMessage{ + // Parse fixed-size wire format + request := blockRequestMessage{ Direction: msg.Content[32], // After hash - MaxBlocks: binary.LittleEndian.Uint32(msg.Content[33:37]), + MaxBlocks: maxBlocks, } copy(request.Hash[:], msg.Content[:32]) - fmt.Printf("Got request for blocks: hash=%x, direction=%d, maxBlocks=%d\n", - request.Hash, request.Direction, request.MaxBlocks) + // Fetch block sequence based on direction + ascending := request.Direction == 0 + blocks, err := h.blockService.Store.GetBlockSequence(request.Hash, ascending, request.MaxBlocks) + if err != nil { + return fmt.Errorf("get blocks: %w", err) + } - // Test data: Pretend to send some blocks - response := []byte("test block response") + // Marshal all blocks into a single response + response, err := jam.Marshal(blocks) + if err != nil { + return fmt.Errorf("marshal blocks: %w", err) + } if err := WriteMessageWithContext(ctx, stream, response); err != nil { - return fmt.Errorf("failed to write response message: %w", err) + return fmt.Errorf("write response message: %w", err) + } + // Close the stream to signal we're done writing (this sets the FIN bit) + if err := stream.Close(); err 
!= nil { + return fmt.Errorf("close stream: %w", err) } - - fmt.Println("Sent block response") return nil } -// BlockRequester handles outgoing block requests to peers. +// BlockRequester handles outgoing CE 128 block requests to peers. +// It implements the client side of the block request protocol. type BlockRequester struct{} -// TODO: Implement the RequestBlocks function. This is not a complete implementation. -// RequestBlocks sends a request for blocks to a peer. +// RequestBlocks sends a block request to a peer and receives the response. // Parameters: // - ctx: Context for cancellation -// - stream: The stream to write requests and read responses -// - headerHash: Hash of the header to start from -// - ascending: If true, gets blocks after header, if false, gets blocks before +// - stream: QUIC stream for the request +// - headerHash: Hash of the starting block +// - ascending: If true, gets blocks after header (exclusive) +// If false, gets blocks before and including header +// - maxBlocks: Maximum number of blocks to request +// +// The request follows CE 128 protocol format: +// +// --> Header Hash (32 bytes) ++ Direction (1 byte) ++ Maximum Blocks (4 bytes LE) +// --> FIN +// <-- [Block] +// <-- FIN // // Returns: -// - Block data if successful -// - Error if request fails or response cannot be read -func (r *BlockRequester) RequestBlocks(ctx context.Context, stream io.ReadWriter, headerHash [32]byte, ascending bool) ([]byte, error) { +// - Sequence of blocks if successful +// - Error if request fails, response invalid, or context cancelled +func (r *BlockRequester) RequestBlocks(ctx context.Context, stream quic.Stream, headerHash [32]byte, ascending bool, maxBlocks uint32) ([]block.Block, error) { direction := byte(0) if !ascending { direction = 1 @@ -84,27 +130,32 @@ func (r *BlockRequester) RequestBlocks(ctx context.Context, stream io.ReadWriter content := make([]byte, 37) copy(content[:32], headerHash[:]) content[32] = direction - 
binary.LittleEndian.PutUint32(content[33:], 10) + maxBlocksBytes, err := jam.Marshal(maxBlocks) + if err != nil { + return nil, fmt.Errorf("marshal max blocks: %w", err) + } + copy(content[33:], maxBlocksBytes) // Write with context if err := WriteMessageWithContext(ctx, stream, content); err != nil { - return nil, fmt.Errorf("failed to write request: %w", err) + return nil, fmt.Errorf("write request: %w", err) + } + // Closes only the write direction (sets FIN on our side) + if err := stream.Close(); err != nil { + return nil, fmt.Errorf("close write: %w", err) } - // Read with context response, err := ReadMessageWithContext(ctx, stream) if err != nil { - return nil, fmt.Errorf("failed to read response: %w", err) + return nil, fmt.Errorf("read response: %w", err) } - fmt.Printf("Received response: %s\n", response.Content) - return response.Content, nil -} + // Unmarshal block sequence + var blocks []block.Block + err = jam.Unmarshal(response.Content, &blocks) + if err != nil { + return nil, fmt.Errorf("unmarshal response: %w", err) + } -// BlockRequestMessage represents a block request message. -// The message format matches the wire protocol specification. 
-type BlockRequestMessage struct { - Hash crypto.Hash - Direction byte // 0 for ascending, 1 for descending - MaxBlocks uint32 + return blocks, nil } diff --git a/pkg/network/log.go b/pkg/network/log.go new file mode 100644 index 0000000..d2c094f --- /dev/null +++ b/pkg/network/log.go @@ -0,0 +1,60 @@ +// TODO: Temp file for Demo purposes +package network + +import ( + "fmt" + "time" + + "github.com/eigerco/strawberry/internal/crypto" + "github.com/eigerco/strawberry/internal/jamtime" +) + +const ( + // ANSI color codes + Reset = "\033[0m" + Green = "\033[32m" + Blue = "\033[34m" + Yellow = "\033[33m" + Cyan = "\033[36m" + + // Unicode symbols + CheckMark = "✓" + Arrow = "→" + Plus = "+" + Download = "⇩" +) + +func LogBlockEvent(timestamp time.Time, eventType string, hash crypto.Hash, epoch jamtime.Epoch, slot jamtime.Timeslot) { + timeStr := timestamp.Format("15:04:05") + hashStr := fmt.Sprintf("%x", hash[:5]) + + var color, symbol string + + switch eventType { + case "finalizing": + color = Green + symbol = CheckMark + case "producing", "announcing": // Handle both producing and announcing + color = Blue + symbol = Plus + case "requesting": + color = Yellow + symbol = Arrow + case "imported": + color = Cyan + symbol = Download + } + + // Pad the event type to 10 characters to align output + paddedEventType := fmt.Sprintf("%-10s", eventType) + + fmt.Printf("[%s] %s%s %s: %s | Epoch: %d | Slot: %d%s\n", + timeStr, + color, + symbol, + paddedEventType, + hashStr, + epoch, + slot, + Reset) +} diff --git a/pkg/network/peer/node.go b/pkg/network/peer/node.go index 254a590..f67d5b0 100644 --- a/pkg/network/peer/node.go +++ b/pkg/network/peer/node.go @@ -10,6 +10,8 @@ import ( "sync" "time" + "github.com/eigerco/strawberry/internal/block" + "github.com/eigerco/strawberry/internal/chain" "github.com/eigerco/strawberry/internal/crypto" "github.com/eigerco/strawberry/pkg/network/cert" "github.com/eigerco/strawberry/pkg/network/handlers" @@ -22,6 +24,7 @@ import ( type Node 
struct { Context context.Context Cancel context.CancelFunc + blockService *chain.BlockService transport *transport.Transport protocolManager *protocol.Manager peersLock sync.RWMutex @@ -127,13 +130,21 @@ func NewNode(nodeCtx context.Context, listenAddr *net.UDPAddr, keys ValidatorKey IsBuilder: true, MaxBuilderSlots: 20, } + + // Create block service + bs, err := chain.NewBlockService() + if err != nil { + return nil, fmt.Errorf("failed to create block service: %w", err) + } + node.blockService = bs protoManager, err := protocol.NewManager(protoConfig) if err != nil { return nil, fmt.Errorf("failed to create protocol manager: %w", err) } // Register what type of streams the Node will support. - protoManager.Registry.RegisterHandler(protocol.StreamKindBlockRequest, handlers.NewBlockRequestHandler()) + protoManager.Registry.RegisterHandler(protocol.StreamKindBlockRequest, handlers.NewBlockRequestHandler(bs)) + node.blockRequester = &handlers.BlockRequester{} // Create transport transportConfig := transport.Config{ @@ -214,24 +225,23 @@ func (n *Node) ConnectToPeer(addr *net.UDPAddr) error { return nil } -// TODO somehwat of Mock atm. Will add full implementaion in the coming PR's. 
-func (n *Node) RequestBlock(ctx context.Context, hash crypto.Hash, ascending bool, peerKey ed25519.PublicKey) ([]byte, error) {
+// RequestBlocks requests a sequence of blocks starting from the given hash from a peer
+func (n *Node) RequestBlocks(ctx context.Context, hash crypto.Hash, ascending bool, maxBlocks uint32, peerKey ed25519.PublicKey) ([]block.Block, error) {
 	n.peersLock.RLock()
-	existingPeer := n.peersSet.GetByEd25519Key(peerKey)
-	n.peersLock.RUnlock()
+	defer n.peersLock.RUnlock()
 
-	if existingPeer != nil {
+	if existingPeer := n.peersSet.GetByEd25519Key(peerKey); existingPeer != nil {
 		stream, err := existingPeer.ProtoConn.OpenStream(ctx, protocol.StreamKindBlockRequest)
 		if err != nil {
 			return nil, fmt.Errorf("failed to open stream: %w", err)
 		}
 		defer stream.Close()
 
-		blockData, err := n.blockRequester.RequestBlocks(ctx, stream, hash, ascending)
+		blocks, err := n.blockRequester.RequestBlocks(ctx, stream, hash, ascending, maxBlocks)
 		if err != nil {
 			return nil, fmt.Errorf("failed to request block from peer: %w", err)
 		}
-		return blockData, nil
+		return blocks, nil
 	}
 	return nil, fmt.Errorf("no peers available to request block from")
 }