Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch certificate RPCs from pipe to component calls #708

Merged
merged 2 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions deployerControl.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,18 @@ func (*localDeployerControl) GetListenerAddress(context.Context, *protos.GetList
func (*localDeployerControl) ExportListener(context.Context, *protos.ExportListenerRequest) (*protos.ExportListenerReply, error) {
return nil, fmt.Errorf("localDeployerControl.ExportListener not implemented")
}

// GetSelfCertificate implements the control.DeployerControl interface.
func (*localDeployerControl) GetSelfCertificate(context.Context, *protos.GetSelfCertificateRequest) (*protos.GetSelfCertificateReply, error) {
return nil, fmt.Errorf("localDeployerControl.GetSelfCertificate not implemented")
}

// VerifyClientCertificate implements the control.DeployerControl interface.
func (*localDeployerControl) VerifyClientCertificate(context.Context, *protos.VerifyClientCertificateRequest) (*protos.VerifyClientCertificateReply, error) {
return nil, fmt.Errorf("localDeployerControl.VerifyClientCertificate not implemented")
}

// VerifyServerCertificate implements the control.DeployerControl interface.
func (*localDeployerControl) VerifyServerCertificate(context.Context, *protos.VerifyServerCertificateRequest) (*protos.VerifyServerCertificateReply, error) {
return nil, fmt.Errorf("localDeployerControl.VerifyServerCertificate not implemented")
}
22 changes: 12 additions & 10 deletions examples/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestExamples(t *testing.T) {
t.Skip("no test case defined")
}

ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
t.Cleanup(cancel)

// Build the example binary.
Expand Down Expand Up @@ -113,22 +113,24 @@ func TestExamples(t *testing.T) {
})

// "weaver multi deploy" the application with MTLS.
t.Run("weaver-multi-mtls", func(t *testing.T) {
if os.Getenv("GITHUB_RUN_ID") != "" {
t.Skip("test takes too long (over 30s) on github")
}
cmd := startCmd(ctx, t, nil, "../../cmd/weaver/weaver", "multi", "deploy", "weaver_mtls.toml")
t.Cleanup(terminateCmdAndWait(t, cmd))
run(t, test)
})
if name == "collatz" {
mwhittaker marked this conversation as resolved.
Show resolved Hide resolved
t.Run("weaver-multi-mtls", func(t *testing.T) {
if os.Getenv("GITHUB_RUN_ID") != "" {
t.Skip("test takes too long (over 30s) on github")
}
cmd := startCmd(ctx, t, nil, "../../cmd/weaver/weaver", "multi", "deploy", "weaver_mtls.toml")
t.Cleanup(terminateCmdAndWait(t, cmd))
run(t, test)
})
}

// TODO: other deployers?
})
}
}

func run(t *testing.T, test test) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
t.Cleanup(cancel)

// Send a GET request to the endpoint, retrying on error.
Expand Down
27 changes: 27 additions & 0 deletions internal/control/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,31 @@ type DeployerControl interface {
// typically, but not always, involves running a proxy that forwards
// traffic to the provided address.
ExportListener(context.Context, *protos.ExportListenerRequest) (*protos.ExportListenerReply, error)

// GetSelfCertificate returns the certificate and the private key the
// weavelet should use for network connection establishment. The weavelet
// will issue this request each time it establishes a connection with
// another weavelet.
// NOTE: This method is only called if mTLS was enabled for the weavelet,
// by passing it an EnvelopeInfo with mtls=true.
GetSelfCertificate(context.Context, *protos.GetSelfCertificateRequest) (*protos.GetSelfCertificateReply, error)

// VerifyClientCertificate verifies the certificate chain presented by
// a network client attempting to connect to the weavelet. It returns an
// error if the network connection should not be established with the
// client. Otherwise, it returns the list of weavelet components that the
// client is authorized to invoke methods on.
//
// NOTE: This method is only called if mTLS was enabled for the weavelet,
// by passing it an EnvelopeInfo with mtls=true.
VerifyClientCertificate(context.Context, *protos.VerifyClientCertificateRequest) (*protos.VerifyClientCertificateReply, error)

// VerifyServerCertificate verifies the certificate chain presented by
// the server the weavelet is attempting to connect to. It returns an
// error iff the server identity doesn't match the identity of the specified
// component.
//
// NOTE: This method is only called if mTLS was enabled for the weavelet,
// by passing it an EnvelopeInfo with mtls=true.
VerifyServerCertificate(context.Context, *protos.VerifyServerCertificateRequest) (*protos.VerifyServerCertificateReply, error)
}
48 changes: 0 additions & 48 deletions internal/envelope/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package conn

import (
"fmt"
"io"
sync "sync"

Expand All @@ -34,7 +33,6 @@ type conn struct {

mu sync.Mutex
writer io.WriteCloser
lastId int64 // Id used for last request/response pair
waiters map[int64]chan response // Response waiters
failure error // Non-nil when error has been encountered
}
Expand All @@ -55,15 +53,6 @@ func getId(msg proto.Message) int64 {
}
}

func setId(msg proto.Message, id int64) {
switch x := msg.(type) {
case *protos.WeaveletMsg:
x.Id = id
case *protos.EnvelopeMsg:
x.Id = id
}
}

// recv reads the next request from the pipe and writes it to msg. Note that
// recv does NOT return RPC replies. These replies are returned directly the
// invoker of the RPC.
Expand Down Expand Up @@ -128,40 +117,3 @@ func (c *conn) send(msg proto.Message) error {
}
return err
}

// doBlockingRPC performs an RPC request, and blocks until a response is received by
// handleResponse, or cleanup is called with an error.
func (c *conn) doBlockingRPC(request proto.Message) (proto.Message, error) {
ch := c.startRPC(request)
r, ok := <-ch
if !ok {
return nil, fmt.Errorf("%s: connection to peer broken", c.name)
}
return r.result, r.err
}

func (c *conn) startRPC(request proto.Message) chan response {
ch := make(chan response, 1)

// Assign request ID and register in set of waiters.
c.mu.Lock()
defer c.mu.Unlock()
if c.failure != nil {
ch <- response{nil, c.failure}
return ch
}
c.lastId++
id := c.lastId
if c.waiters == nil {
c.waiters = map[int64]chan response{}
}
c.waiters[id] = ch

setId(request, id)
if err := protomsg.Write(c.writer, request); err != nil {
delete(c.waiters, id)
c.cleanupLocked(err)
ch <- response{nil, err}
}
return ch
}
28 changes: 0 additions & 28 deletions internal/envelope/conn/envelope_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,35 +189,7 @@ func (e *EnvelopeConn) WeaveletInfo() *protos.WeaveletInfo {
// handleMessage handles all messages initiated by the weavelet. Note that this
// method doesn't handle RPC replies from weavelet.
func (e *EnvelopeConn) handleMessage(msg *protos.WeaveletMsg, h EnvelopeHandler) error {
errstring := func(err error) string {
if err == nil {
return ""
}
return err.Error()
}

switch {
case msg.GetSelfCertificateRequest != nil:
reply, err := h.GetSelfCertificate(e.ctx, msg.GetSelfCertificateRequest)
return e.conn.send(&protos.EnvelopeMsg{
Id: -msg.Id,
Error: errstring(err),
GetSelfCertificateReply: reply,
})
case msg.VerifyClientCertificateRequest != nil:
reply, err := h.VerifyClientCertificate(e.ctx, msg.VerifyClientCertificateRequest)
return e.conn.send(&protos.EnvelopeMsg{
Id: -msg.Id,
Error: errstring(err),
VerifyClientCertificateReply: reply,
})
case msg.VerifyServerCertificateRequest != nil:
reply, err := h.VerifyServerCertificate(e.ctx, msg.VerifyServerCertificateRequest)
return e.conn.send(&protos.EnvelopeMsg{
Id: -msg.Id,
Error: errstring(err),
VerifyServerCertificateReply: reply,
})
case msg.LogEntry != nil:
return h.HandleLogEntry(e.ctx, msg.LogEntry)
case msg.TraceSpans != nil:
Expand Down
56 changes: 0 additions & 56 deletions internal/envelope/conn/weavelet_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,62 +119,6 @@ func (w *WeaveletConn) Listener() net.Listener {
return w.lis
}

// GetSelfCertificateRPC returns the certificate and the private key the
// weavelet should use for network connection establishment.
func (w *WeaveletConn) GetSelfCertificateRPC(req *protos.GetSelfCertificateRequest) (*protos.GetSelfCertificateReply, error) {
reply, err := w.rpc(&protos.WeaveletMsg{GetSelfCertificateRequest: req})
if err != nil {
return nil, err
}
if reply.GetSelfCertificateReply == nil {
return nil, fmt.Errorf("nil GetSelfCertificateReply received from envelope")
}
return reply.GetSelfCertificateReply, nil
}

// VerifyClientCertificateRPC verifies the identity of a client that is
// attempting to connect to the weavelet.
func (w *WeaveletConn) VerifyClientCertificateRPC(req *protos.VerifyClientCertificateRequest) (*protos.VerifyClientCertificateReply, error) {
reply, err := w.rpc(&protos.WeaveletMsg{VerifyClientCertificateRequest: req})
if err != nil {
return nil, err
}
if reply.VerifyClientCertificateReply == nil {
return nil, fmt.Errorf("nil VerifyClientCertificateReply received from envelope")
}
return reply.VerifyClientCertificateReply, nil
}

// VerifyServerCertificateRPC verifies the identity of the server the weavelet
// is attempting to connect to.
func (w *WeaveletConn) VerifyServerCertificateRPC(req *protos.VerifyServerCertificateRequest) error {
reply, err := w.rpc(&protos.WeaveletMsg{VerifyServerCertificateRequest: req})
if err != nil {
return err
}
if reply.VerifyServerCertificateReply == nil {
return fmt.Errorf("nil VerifyServerCertificateReply received from envelope")
}
return nil
}

func (w *WeaveletConn) rpc(request *protos.WeaveletMsg) (*protos.EnvelopeMsg, error) {
response, err := w.conn.doBlockingRPC(request)
if err != nil {
err := fmt.Errorf("connection to envelope broken: %w", err)
w.conn.cleanup(err)
return nil, err
}
msg, ok := response.(*protos.EnvelopeMsg)
if !ok {
return nil, fmt.Errorf("envelope response has wrong type %T", response)
}
if msg.Error != "" {
return nil, fmt.Errorf(msg.Error)
}
return msg, nil
}

// SendLogEntry sends a log entry to the envelope, without waiting for a reply.
func (w *WeaveletConn) SendLogEntry(entry *protos.LogEntry) error {
return w.conn.send(&protos.WeaveletMsg{LogEntry: entry})
Expand Down
6 changes: 3 additions & 3 deletions internal/tool/multi/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,15 +403,15 @@ func (h *handler) subscribeTo(req *protos.ActivateComponentRequest) error {
return h.envelope.UpdateRoutingInfo(target.routing(req.Component))
}

// GetSelfCertificate implements the envelope.EnvelopeHandler interface.
// GetSelfCertificate implements the control.DeployerControl interface.
func (h *handler) GetSelfCertificate(context.Context, *protos.GetSelfCertificateRequest) (*protos.GetSelfCertificateReply, error) {
return &protos.GetSelfCertificateReply{
Cert: h.g.certPEM,
Key: h.g.keyPEM,
}, nil
}

// VerifyClientCertificate implements the envelope.EnvelopeHandler interface.
// VerifyClientCertificate implements the control.DeployerControl interface.
func (h *handler) VerifyClientCertificate(_ context.Context, req *protos.VerifyClientCertificateRequest) (*protos.VerifyClientCertificateReply, error) {
groupName, err := h.verifyCertificate(req.CertChain)
if err != nil {
Expand All @@ -426,7 +426,7 @@ func (h *handler) VerifyClientCertificate(_ context.Context, req *protos.VerifyC
return &protos.VerifyClientCertificateReply{Components: g.callable}, nil
}

// VerifyServerCertificate implements the envelope.EnvelopeHandler interface.
// VerifyServerCertificate implements the control.DeployerControl interface.
func (h *handler) VerifyServerCertificate(_ context.Context, req *protos.VerifyServerCertificateRequest) (*protos.VerifyServerCertificateReply, error) {
actual, err := h.verifyCertificate(req.CertChain)
if err != nil {
Expand Down
7 changes: 4 additions & 3 deletions internal/weaver/remoteweavelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ func (w *RemoteWeavelet) getListenerAddress(ctx context.Context, name string) (s

func (w *RemoteWeavelet) getSelfCertificate() (*tls.Certificate, error) {
request := &protos.GetSelfCertificateRequest{}
reply, err := w.conn.GetSelfCertificateRPC(request)
reply, err := w.deployer.GetSelfCertificate(context.TODO(), request)
if err != nil {
return nil, err
}
Expand All @@ -775,7 +775,7 @@ func (w *RemoteWeavelet) getSelfCertificate() (*tls.Certificate, error) {

func (w *RemoteWeavelet) verifyClientCertificate(certChain [][]byte) ([]string, error) {
request := &protos.VerifyClientCertificateRequest{CertChain: certChain}
reply, err := w.conn.VerifyClientCertificateRPC(request)
reply, err := w.deployer.VerifyClientCertificate(context.TODO(), request)
if err != nil {
return nil, err
}
Expand All @@ -787,7 +787,8 @@ func (w *RemoteWeavelet) verifyServerCertificate(certChain [][]byte, targetCompo
CertChain: certChain,
TargetComponent: targetComponent,
}
return w.conn.VerifyServerCertificateRPC(request)
_, err := w.deployer.VerifyServerCertificate(context.TODO(), request)
return err
}

// server serves RPC traffic from other RemoteWeavelets.
Expand Down
Loading