diff --git a/pgqueue/subscriber.go b/pgqueue/subscriber.go index 61cf7a1..9c1bd2b 100644 --- a/pgqueue/subscriber.go +++ b/pgqueue/subscriber.go @@ -47,7 +47,7 @@ func NewSubscriber[T any](db *sql.DB) *Subscriber[T] { // Subscribe subscribes to a queue and processes messages using the provided handler. // It accepts a context (context.Context), a queue name (queueName), and a message handler (handler). // The handler function processes messages of type T and returns an error if processing fails. -func (s *Subscriber[T]) Subscribe(ctx context.Context, queueName string, handler func(payload T) error) error { +func (s *Subscriber[T]) Subscribe(ctx context.Context, queueName string, handler func(payload T, meta pgq.Metadata) error) error { if err := InitQueue(s.db, queueName); err != nil { return fmt.Errorf("failed to initialize queue: %w", err) } @@ -62,14 +62,14 @@ func (s *Subscriber[T]) Subscribe(ctx context.Context, queueName string, handler return consumer.Run(ctx) } -func (s *Subscriber[T]) baseHandler(handler func(payload T) error) pgq.MessageHandler { +func (s *Subscriber[T]) baseHandler(handler func(payload T, metadata pgq.Metadata) error) pgq.MessageHandler { return pgq.MessageHandlerFunc(func(ctx context.Context, mi *pgq.MessageIncoming) (bool, error) { var payload T if err := json.Unmarshal(mi.Payload, &payload); err != nil { return false, fmt.Errorf("failed to unmarshal payload: %w", err) } - if err := handler(payload); err != nil { + if err := handler(payload, mi.Metadata); err != nil { return false, fmt.Errorf("handler failed: %w", err) }