Skip to content

Commit

Permalink
add metadata to handling paramteres
Browse files Browse the repository at this point in the history
  • Loading branch information
kubitre committed Jan 26, 2025
1 parent fe3e55c commit 7603a7f
Showing 1 changed file with 3 additions and 3 deletions.
6 changes: 3 additions & 3 deletions pgqueue/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func NewSubscriber[T any](db *sql.DB) *Subscriber[T] {
// Subscribe subscribes to a queue and processes messages using the provided handler.
// It accepts a context (context.Context), a queue name (queueName), and a message handler (handler).
// The handler function processes messages of type T and returns an error if processing fails.
func (s *Subscriber[T]) Subscribe(ctx context.Context, queueName string, handler func(payload T) error) error {
func (s *Subscriber[T]) Subscribe(ctx context.Context, queueName string, handler func(payload T, meta pgq.Metadata) error) error {
if err := InitQueue(s.db, queueName); err != nil {
return fmt.Errorf("failed to initialize queue: %w", err)
}
Expand All @@ -62,14 +62,14 @@ func (s *Subscriber[T]) Subscribe(ctx context.Context, queueName string, handler
return consumer.Run(ctx)
}

func (s *Subscriber[T]) baseHandler(handler func(payload T) error) pgq.MessageHandler {
func (s *Subscriber[T]) baseHandler(handler func(payload T, metadata pgq.Metadata) error) pgq.MessageHandler {
return pgq.MessageHandlerFunc(func(ctx context.Context, mi *pgq.MessageIncoming) (bool, error) {
var payload T
if err := json.Unmarshal(mi.Payload, &payload); err != nil {
return false, fmt.Errorf("failed to unmarshal payload: %w", err)
}

if err := handler(payload); err != nil {
if err := handler(payload, mi.Metadata); err != nil {
return false, fmt.Errorf("handler failed: %w", err)
}

Expand Down

0 comments on commit 7603a7f

Please sign in to comment.