-
Notifications
You must be signed in to change notification settings - Fork 247
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add context propagation support #783
Conversation
@ghemawat do you mind taking a look? Tests are missing, but I would like to get your opinion on the APIs before I go ahead and polish the PR. |
a91af38
to
8225fbf
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Sanjay. Updated the PR to incorporate your feedback.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
API looks good.
Can you update the commit message.
// component. Because the UpdateMetadata and GetMetadata methods are not | ||
// routed, we can end up updating the metadata at replica 0, and reading | ||
// the metadata from replica 1. To avoid this scenario we call the | ||
// UpdateMetadata method twice (one for each replica). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How are we sure that the two calls won't both go to the same replica?
Perhaps we should make it a bit better by calling it N times, where N is 10 for multi and 1 for others
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Done
if err := dst.UpdateMetadata(ctx); err != nil { | ||
t.Fatal(err) | ||
} | ||
if runner.Name == weavertest.Multi.Name { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The multi-replica comment belongs here, not above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
if err := dst.UpdateMetadata(ctx); err != nil { | ||
t.Fatal(err) | ||
} | ||
if runner.Name == weavertest.Multi.Name { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Repeated code. Maybe add a helper: setAndGetMetadata(map[string]string) map[string]string
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed the code a little bit. Does it look better?
godeps.txt
Outdated
@@ -983,6 +984,9 @@ github.com/ServiceWeaver/weaver/sim/internal/bank | |||
go.opentelemetry.io/otel/codes | |||
go.opentelemetry.io/otel/trace | |||
reflect | |||
github.com/ServiceWeaver/weaver/weavermetadata |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggested rename of package "callmetadata".
Caller will look something like:
ctx = callmetadata.NewContext(ctx, map[string]string{...})
Or perhaps just "metadata", which is what gRPC does.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Called it "metadata".
internal/net/call/call.go
Outdated
msgHeaderSize = 16 + 8 + traceHeaderLen // handler_key + deadline + trace_context | ||
) | ||
// Size of the header included in each message. | ||
const msgHeaderSize = 16 + 8 + traceHeaderLen + metadataHeaderLen |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Documentation of format should be updated above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't use a fixed header anymore, so all these constants are gone.
internal/net/call/metadata.go
Outdated
|
||
// readContextMetadata returns the context metadata (if any). | ||
func readContextMetadata(meta []byte) map[string]string { | ||
dec := codegen.NewDecoder(meta) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we should be using codegen encoder/decoder to format the header completely. I.e., writeHeader that does something like:
put request id
put payload length
put trace boolean if present and then encode trace
put #metadata entries
put metadata entries
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
internal/net/call/call.go
Outdated
// Size of the header included in each message. | ||
msgHeaderSize = 16 + 8 + traceHeaderLen // handler_key + deadline + trace_context | ||
) | ||
// Size of the header included in each message. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is a format change, are there any versions do we need to update? Do we use "call.go" for any of our internal communication where the two sides may be at different versions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, I think everything should work as expected
internal/net/call/trace.go
Outdated
@@ -20,7 +20,7 @@ import ( | |||
"go.opentelemetry.io/otel/trace" | |||
) | |||
|
|||
const traceHeaderLen = 25 | |||
const traceHeaderLen = 25 // handler_key + deadline + trace_context |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice if all of the header constants that depend on each other were described next to each other in the same constant block
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
weavermetadata/metadata.go
Outdated
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
// Package weavermetadata define the structure of the metadata supported by weaver. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reword:
Package weavermetadata provides support for the propagation of metadata information from a component method caller to the callee. The metadata is propagated to the callee even if the caller and callee are not colocated in the same process.
The metadata is a map from string to string stored in context.Context. The map can be added to a context by calling NewContext.
... include example here ...
The metadata map can be extracted from a context by calling FromContext:
... include example here ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
weavermetadata/metadata.go
Outdated
// more complicated types, it's their responsibility to encode/decode these types | ||
// to/from string values. | ||
// | ||
// Note that all keys are automatically lowercase. This ensures the user avoids |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should drop this lower-casing I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
a6d09c3
to
b715910
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, Sanjay. Very good suggestions.
internal/net/call/call.go
Outdated
var hdr [msgHeaderSize]byte | ||
copy(hdr[0:], h[:]) | ||
enc := codegen.NewEncoder() | ||
enc.Bytes(h[:]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bytes() seems suitable for variable-length data - it encodes the length. How about using Encoder.Grow and Decoder.Read to write exactly the correct number of bytes.
It also means that we won't have to worry about the wrong number of bytes showing up at the reader.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Changed.
internal/net/call/call.go
Outdated
// Extract trace context and create a new child span to trace the method | ||
// call on the server. | ||
span := trace.SpanFromContext(ctx) // noop span | ||
if sc := readTraceContext(dec); sc != nil && sc.IsValid() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if sc is invalid? Should we not fail the RPC with an error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. We are not failing the RPC right now if sc is invalid, but I think we should do that. Changed.
internal/net/call/call.go
Outdated
// Encode the length of the header. Note that the header can have variable length, | ||
// so we need to know at which offset in the encoded message starts the payload | ||
// when we decode the message at the receiver. | ||
enc.Int(len(enc.Data())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
enc.Uint? to avoid having to deal with negative number errors?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need to encode the length of the header based on your suggestions.
internal/net/call/call.go
Outdated
ctx = readContextMetadata(ctx, dec) | ||
|
||
// Compute the start offset of the payload in msg. | ||
payloadOffset := dec.Int() + 8 /*int64 that stores the length of the header*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to check that the decoded number is correct.
The more I look at it, the more I think that we should not try to store the encoded header length inside the encoded header. Instead we can do:
header_length : uint32
header : byte[header_length]
... rest is payload ...
So we read the first 4 bytes to get the header length, use a Decoder to parse just the header, and leave the rest as the payload.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed the encoding based on your suggestions.
"github.com/ServiceWeaver/weaver/runtime/logging" | ||
"github.com/ServiceWeaver/weaver/runtime/retry" | ||
"go.opentelemetry.io/otel/codes" | ||
"go.opentelemetry.io/otel/trace" | ||
) | ||
|
||
const ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a long comment in msg.go that explains the message format. That should be updated, but I don't see it in this commit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Updated.
metadata/metadata.go
Outdated
if !ok { | ||
return nil, false | ||
} | ||
out := make(map[string]string, len(meta)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use maps.Clone instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Done
weavertest/internal/simple/simple.go
Outdated
@@ -28,6 +28,8 @@ import ( | |||
"sync" | |||
|
|||
"github.com/ServiceWeaver/weaver" | |||
"github.com/ServiceWeaver/weaver/metadata" | |||
"golang.org/x/exp/maps" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we still need x/exp/maps, or can we use the maps package in the standard library now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, replaced with "maps". Thank you
b715910
to
d478770
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, Sanjay
internal/net/call/call.go
Outdated
var hdr [msgHeaderSize]byte | ||
copy(hdr[0:], h[:]) | ||
enc := codegen.NewEncoder() | ||
enc.Bytes(h[:]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Changed.
internal/net/call/call.go
Outdated
// Encode the length of the header. Note that the header can have variable length, | ||
// so we need to know at which offset in the encoded message starts the payload | ||
// when we decode the message at the receiver. | ||
enc.Int(len(enc.Data())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need to encode the length of the header based on your suggestions.
internal/net/call/call.go
Outdated
// Extract trace context and create a new child span to trace the method | ||
// call on the server. | ||
span := trace.SpanFromContext(ctx) // noop span | ||
if sc := readTraceContext(dec); sc != nil && sc.IsValid() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. We are not failing the RPC right now if sc is invalid, but I think we should do that. Changed.
internal/net/call/call.go
Outdated
ctx = readContextMetadata(ctx, dec) | ||
|
||
// Compute the start offset of the payload in msg. | ||
payloadOffset := dec.Int() + 8 /*int64 that stores the length of the header*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed the encoding based on your suggestions.
metadata/metadata.go
Outdated
if !ok { | ||
return nil, false | ||
} | ||
out := make(map[string]string, len(meta)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Done
weavertest/internal/simple/simple.go
Outdated
@@ -28,6 +28,8 @@ import ( | |||
"sync" | |||
|
|||
"github.com/ServiceWeaver/weaver" | |||
"github.com/ServiceWeaver/weaver/metadata" | |||
"golang.org/x/exp/maps" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, replaced with "maps". Thank you
"github.com/ServiceWeaver/weaver/runtime/logging" | ||
"github.com/ServiceWeaver/weaver/runtime/retry" | ||
"go.opentelemetry.io/otel/codes" | ||
"go.opentelemetry.io/otel/trace" | ||
) | ||
|
||
const ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Updated.
Sanjay, don't look at the PR yet. Some test is failing. |
In the current implementation, weaver doesn't allow the user to propagate context information. We recommend users to define a struct that encapsulates the metadata information and add it as an argument to the method. However, more and more users are asking for an option to propagate metadata information using the context. This request comes especially from users that are using gRPC to communicate between their services, and gRPC provides a way to propagate metadata information using the context. This PR enables the users to propagate metadata information as a map[string]string. ```main.go // To attach metadata with key "foo" and value "bar" to the context, you can do: ctx := context.Background() ctx = metadata.NewContext(ctx, map[string]string{"foo": "bar"}) // To read the metadata value associated with a key "foo" in the context, you can do: meta, found := metadata.FromContext(ctx) if found { value := meta["foo"] } ``` [1] https://pkg.go.dev/google.golang.org/grpc/metadata
d478770
to
492c104
Compare
Never mind. Everything works as expected. We have a flaky test but that's not something introduced by this change. |
internal/net/call/call.go
Outdated
|
||
// Note that we send the header and the payload as follows: | ||
// [header_length][encoded_header][payload] | ||
hdrSlice := make([]byte, 4) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unnecessary allocation. You can do:
var hdr [4]byte
binary...PutUint32(hdr[:], ...)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
internal/net/call/call.go
Outdated
// Note that we send the header and the payload as follows: | ||
// [header_length][encoded_header][payload] | ||
hdrSlice := make([]byte, 4) | ||
binary.BigEndian.PutUint32(hdrSlice, uint32(len(enc.Data()))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why BigEndian? I think we are using LittleEndian everywhere else.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point.
internal/net/call/call.go
Outdated
// Extract request header from front of payload. | ||
if len(msg) < msgHeaderSize { | ||
msgLen := uint32(len(msg)) | ||
hdrLenEndOffset := uint32(4) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be a "const hdrLenEndOffset".
Also, maybe hdrLenLen would be a better name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
internal/net/call/call.go
Outdated
// call on the server. | ||
span := trace.SpanFromContext(ctx) // noop span | ||
if sc := readTraceContext(dec); sc != nil { | ||
if sc.IsValid() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about checking for error and returning early on error instead of placing the error-handling in the else branch?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
internal/net/call/msg.go
Outdated
// traceContext [25]byte -- zero, or trace context | ||
// remainder -- call argument serialization | ||
// headerLen [4]byte -- length of the encoded header | ||
// header [length]byte -- encoded header information |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reword the type of header of [headerLen]byte
to match the name used above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
internal/net/call/msg.go
Outdated
// remainder -- call argument serialization | ||
// headerLen [4]byte -- length of the encoded header | ||
// header [length]byte -- encoded header information | ||
// headerKey [16]byte -- fingerprint of method name |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The following part looks wrong? I think you need follow header by remainder and then extend the comment after that to say something like:
The header is encoded using Service Weaver's encoding format for a type that looks like:
struct {
...fill this in...
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea. Done
internal/net/call/msg.go
Outdated
// deadline [8]byte -- zero, or deadline in microseconds | ||
// traceContext [25]byte -- zero, or trace context | ||
// metadataContext [length]byte -- encoded map[string]string | ||
// remainder -- call argument serialization |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
payload would be a better name than remainder
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
weavertest/internal/simple/simple.go
Outdated
d.mu.Lock() | ||
defer d.mu.Unlock() | ||
meta, found := metadata.FromContext(ctx) | ||
if found { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can do:
if meta, found := ...; found {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
internal/net/call/call.go
Outdated
@@ -385,25 +381,36 @@ func (rc *reconnectingConnection) Call(ctx context.Context, h MethodKey, arg []b | |||
} | |||
|
|||
func (rc *reconnectingConnection) callOnce(ctx context.Context, h MethodKey, arg []byte, opts CallOptions) ([]byte, error) { | |||
var hdr [msgHeaderSize]byte | |||
copy(hdr[0:], h[:]) | |||
enc := codegen.NewEncoder() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about moving the header encoding/decoding code into routines of their own. The code here can then be:
hdr := encodeHeader(ctx, h)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Thanks Sanjay |
internal/net/call/call.go
Outdated
var hdrLen [hdrLenLen]byte | ||
binary.LittleEndian.PutUint32(hdrLen[:], uint32(len(hdr))) | ||
hdrSlice := hdrLen[:] | ||
hdrSlice = append(hdrSlice, hdr...) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe just combine prev 2 lines?
hdrSlice := append(hdrLen[:], hdr...)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
// The header is encoded using Service Weaver's encoding format for a type that | ||
// looks like: | ||
// | ||
// struct header { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line up the types just like "go fmt" would.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks Sanjay. Looks great after your suggestions
internal/net/call/call.go
Outdated
var hdrLen [hdrLenLen]byte | ||
binary.LittleEndian.PutUint32(hdrLen[:], uint32(len(hdr))) | ||
hdrSlice := hdrLen[:] | ||
hdrSlice = append(hdrSlice, hdr...) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
// The header is encoded using Service Weaver's encoding format for a type that | ||
// looks like: | ||
// | ||
// struct header { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
In the current implementation, weaver doesn't allow the user to propagate context information. We recommend users to define a struct that encapsulates the metadata information and add it as an argument to the method. However, more and more users are asking for an option to propagate metadata information using the context. This request comes especially from users that are using gRPC to communicate between their services, and gRPC provides a way to propagate metadata information using the context.
This PR implements a similar approach to gRPC [1] to propagate metadata information. We allow users to create a context where they can pass metadata as a
map[string][]string
.E.g.,
The user APIs are as follows:
Note that similar to the gRPC metadata, we make sure the metadata keys are lowercased, because the user can create metadata w/o using our New() metadata constructor, hence they can set the keys however they want.
[1] https://pkg.go.dev/google.golang.org/grpc/metadata