Skip to content

Commit

Permalink
Add context propagation support
Browse files Browse the repository at this point in the history
In the current implementation, weaver doesn't allow the user to
propagate context information. We recommend users to define a struct
that encapsulates the metadata information and add it as an argument to
the method. However, more and more users are asking for an option to
propagate metadata information using the context. This request comes
especially from users that are using gRPC to communicate between their
services, and gRPC provides a way to propagate metadata information
using the context.

This PR enables the users to propagate metadata information as a
map[string]string.

```main.go
// To attach metadata with key "foo" and value "bar" to the context, you can do:
ctx := context.Background()
ctx = weavermetadata.NewContext(ctx, map[string]string{"foo": "bar"})

// To read the metadata value associated with a key "foo" in the context, you can do:
meta, found := weavermetadata.FromContext(ctx)
if found {
  value := meta["foo"]
}
```

[1] https://pkg.go.dev/google.golang.org/grpc/metadata
  • Loading branch information
rgrandl committed Jul 17, 2024
1 parent a89085a commit a91af38
Show file tree
Hide file tree
Showing 9 changed files with 563 additions and 33 deletions.
6 changes: 6 additions & 0 deletions godeps.txt
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,13 @@ github.com/ServiceWeaver/weaver/examples/collatz
flag
fmt
github.com/ServiceWeaver/weaver
github.com/ServiceWeaver/weaver/metadata
github.com/ServiceWeaver/weaver/runtime/codegen
go.opentelemetry.io/otel/codes
go.opentelemetry.io/otel/trace
log
net/http
os
reflect
strconv
strings
Expand Down Expand Up @@ -332,6 +334,7 @@ github.com/ServiceWeaver/weaver/internal/net/call
errors
fmt
github.com/ServiceWeaver/weaver/internal/traceio
github.com/ServiceWeaver/weaver/metadata
github.com/ServiceWeaver/weaver/runtime/codegen
github.com/ServiceWeaver/weaver/runtime/logging
github.com/ServiceWeaver/weaver/runtime/retry
Expand Down Expand Up @@ -695,6 +698,9 @@ github.com/ServiceWeaver/weaver/internal/weaver
sync/atomic
syscall
time
github.com/ServiceWeaver/weaver/metadata
context
strings
github.com/ServiceWeaver/weaver/metrics
github.com/ServiceWeaver/weaver/runtime/metrics
github.com/ServiceWeaver/weaver/runtime/protos
Expand Down
30 changes: 24 additions & 6 deletions internal/net/call/call.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,13 @@ import (

"github.com/ServiceWeaver/weaver/runtime/logging"
"github.com/ServiceWeaver/weaver/runtime/retry"
"github.com/ServiceWeaver/weaver/weavermetadata"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

const (
// Size of the header included in each message.
msgHeaderSize = 16 + 8 + traceHeaderLen // handler_key + deadline + trace_context
)
// Size of the header included in each message.
const msgHeaderSize = 16 + 8 + traceHeaderLen + metadataHeaderLen

// Connection allows a client to send RPCs.
type Connection interface {
Expand Down Expand Up @@ -405,6 +404,18 @@ func (rc *reconnectingConnection) callOnce(ctx context.Context, h MethodKey, arg
// Send trace information in the header.
writeTraceContext(ctx, hdr[24:])

// Send context metadata in the header.
var meta []byte
m, found := weavermetadata.FromContext(ctx)
if found {
meta = writeContextMetadata(m)
}
binary.LittleEndian.PutUint64(hdr[49:], uint64(len(meta)))
hdrSlice := hdr[:]
if len(meta) > 0 {
hdrSlice = append(hdrSlice, meta...)
}

rpc := &call{}
rpc.doneSignal = make(chan struct{})

Expand All @@ -413,7 +424,7 @@ func (rc *reconnectingConnection) callOnce(ctx context.Context, h MethodKey, arg
if err != nil {
return nil, err
}
if err := writeMessage(nc, &conn.wlock, requestMessage, rpc.id, hdr[:], arg, rc.opts.WriteFlattenLimit); err != nil {
if err := writeMessage(nc, &conn.wlock, requestMessage, rpc.id, hdrSlice, arg, rc.opts.WriteFlattenLimit); err != nil {
conn.shutdown("client send request", err)
conn.endCall(rpc)
return nil, fmt.Errorf("%w: %s", CommunicationError, err)
Expand Down Expand Up @@ -984,8 +995,15 @@ func (c *serverConnection) runHandler(hmap *HandlerMap, id uint64, msg []byte) {
}
}()

// Extract metadata context information if any.
metaLen := binary.LittleEndian.Uint64(msg[49:])
if metaLen > 0 {
meta := readContextMetadata(msg[msgHeaderSize : msgHeaderSize+metaLen])
ctx = weavermetadata.NewContext(ctx, meta)
}

// Call the handler passing it the payload.
payload := msg[msgHeaderSize:]
payload := msg[msgHeaderSize+metaLen:]
var err error
var result []byte
fn, ok := hmap.handlers[hkey]
Expand Down
48 changes: 48 additions & 0 deletions internal/net/call/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package call

import (
"strings"

"github.com/ServiceWeaver/weaver/runtime/codegen"
)

const metadataHeaderLen = 8 // metadata length

// writeContextMetadata serializes the context metadata (if any).
func writeContextMetadata(meta map[string]string) []byte {
enc := codegen.NewEncoder()
enc.Len(len(meta))
for k, v := range meta {
enc.String(strings.ToLower(k))
enc.String(v)
}
return enc.Data()
}

// readContextMetadata returns the context metadata (if any).
func readContextMetadata(meta []byte) map[string]string {
dec := codegen.NewDecoder(meta)
n := dec.Len()
res := make(map[string]string, n)
var k, v string
for i := 0; i < n; i++ {
k = strings.ToLower(dec.String())
v = dec.String()
res[k] = v
}
return res
}
2 changes: 1 addition & 1 deletion internal/net/call/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"go.opentelemetry.io/otel/trace"
)

const traceHeaderLen = 25
const traceHeaderLen = 25 // handler_key + deadline + trace_context

// writeTraceContext serializes the trace context (if any) contained in ctx
// into b.
Expand Down
67 changes: 67 additions & 0 deletions weavermetadata/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package weavermetadata define the structure of the metadata supported by weaver.
//
// Note that we allow metadata information to propagate as a map, where the keys
// and the values are strings. It the user wants to propagate values that are
// more complicated types, it's their responsibility to encode/decode these types
// to/from string values.
//
// Note that all keys are automatically lowercase. This ensures the user avoids
// mistakes when creating and retrieving values for the same key. E.g., the user
// can create a map with the key "Foo" but try to retrieve the value for the
// key "foo".
//
// How to:
//
// To attach metadata with key "foo" and value "bar" to the context, you can do:
//
// ctx := context.Background()
// ctx = weavermetadata.NewContext(ctx, map[string]string{"foo": "bar"})
//
// To read the metadata value associated with a key "foo" in the context, you can do:
//
// meta, found := weavermetadata.FromContext(ctx)
// if found {
// value := meta["foo"]
// }
package weavermetadata

import (
"context"
"strings"
)

// metaKey is an unexported type for the key that stores the metadata.
type metaKey struct{}

// NewContext returns a new context that carries metadata meta.
func NewContext(ctx context.Context, meta map[string]string) context.Context {
return context.WithValue(ctx, metaKey{}, meta)
}

// FromContext returns the metadata value stored in ctx, if any.
func FromContext(ctx context.Context) (map[string]string, bool) {
meta, ok := ctx.Value(metaKey{}).(map[string]string)
if !ok {
return nil, false
}
out := make(map[string]string, len(meta))
for k, v := range meta {
key := strings.ToLower(k)
out[key] = v
}
return out, true
}
94 changes: 94 additions & 0 deletions weavermetadata/metadata_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package weavermetadata

import (
"context"
"reflect"
"testing"
)

func TestContextMetadata(t *testing.T) {
type testCase struct {
name string
meta map[string]string
isMetaExpected bool
want map[string]string
}
for _, test := range []testCase{
{
name: "no metadata",
},
{
name: "with empty metadata",
meta: map[string]string{},
isMetaExpected: false,
},
{
name: "with valid metadata",
meta: map[string]string{
"foo": "bar",
"baz": "waldo",
},
isMetaExpected: true,
want: map[string]string{
"foo": "bar",
"baz": "waldo",
},
},
{
name: "with valid metadata and uppercase keys",
meta: map[string]string{
"Foo": "bar",
"Baz": "waldo",
},
isMetaExpected: true,
want: map[string]string{
"foo": "bar",
"baz": "waldo",
},
},
{
name: "with valid metadata and uppercase values",
meta: map[string]string{
"Foo": "Bar",
"Baz": "Waldo",
},
isMetaExpected: true,
want: map[string]string{
"foo": "Bar",
"baz": "Waldo",
},
},
} {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
if len(test.meta) > 0 {
ctx = NewContext(ctx, test.meta)
}

got, found := FromContext(ctx)
if !reflect.DeepEqual(found, test.isMetaExpected) {
t.Errorf("ExtractMetadata: expecting %v, got %v", test.isMetaExpected, found)
}
if !found {
return
}
if !reflect.DeepEqual(test.want, got) {
t.Errorf("ExtractMetadata: expecting %v, got %v", test.want, got)
}
})
}
}
25 changes: 23 additions & 2 deletions weavertest/internal/simple/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"sync"

"github.com/ServiceWeaver/weaver"
"github.com/ServiceWeaver/weaver/weavermetadata"
"golang.org/x/exp/maps"
)

//go:generate ../../../cmd/weaver/weaver generate
Expand All @@ -50,6 +52,8 @@ type Destination interface {
Record(_ context.Context, file, msg string) error
GetAll(_ context.Context, file string) ([]string, error)
RoutedRecord(_ context.Context, file, msg string) error
UpdateMetadata(_ context.Context) error
GetMetadata(_ context.Context) (map[string]string, error)
}

var (
Expand All @@ -67,7 +71,8 @@ func (r destRouter) RoutedRecord(_ context.Context, file, msg string) string {
type destination struct {
weaver.Implements[Destination]
weaver.WithRouter[destRouter]
mu sync.Mutex
mu sync.Mutex
metadata map[string]string
}

var pid = os.Getpid()
Expand All @@ -77,7 +82,7 @@ func (d *destination) Init(ctx context.Context) error {
return nil
}

func (d *destination) Getpid(_ context.Context) (int, error) {
func (d *destination) Getpid(context.Context) (int, error) {
return pid, nil
}

Expand Down Expand Up @@ -113,6 +118,22 @@ func (d *destination) GetAll(_ context.Context, file string) ([]string, error) {
return strings.Split(str, "\n"), nil
}

func (d *destination) UpdateMetadata(ctx context.Context) error {
d.mu.Lock()
defer d.mu.Unlock()
meta, found := weavermetadata.FromContext(ctx)
if found {
d.metadata = maps.Clone(meta)
}
return nil
}

func (d *destination) GetMetadata(_ context.Context) (map[string]string, error) {
d.mu.Lock()
defer d.mu.Unlock()
return d.metadata, nil
}

// Server is a component used to test Service Weaver listener handling.
// An HTTP server is started when this component is initialized.
// simple_test.go checks the functionality of the HTTP server by fetching
Expand Down
Loading

0 comments on commit a91af38

Please sign in to comment.