diff --git a/.github/workflows/deploy-agent-api.yaml b/.github/workflows/deploy-agent-api.yaml index 7f6a14f01e..336056c284 100644 --- a/.github/workflows/deploy-agent-api.yaml +++ b/.github/workflows/deploy-agent-api.yaml @@ -2,9 +2,6 @@ name: Deploy agent-api on: workflow_dispatch: {} - # TODO(johnny): Remove after merging. - push: - branches: [johnny/dpc-cd] env: CARGO_INCREMENTAL: 0 # Faster from-scratch builds. diff --git a/.github/workflows/deploy-data-plane-controller.yaml b/.github/workflows/deploy-data-plane-controller.yaml index 93a91cf4a7..3987f63b7e 100644 --- a/.github/workflows/deploy-data-plane-controller.yaml +++ b/.github/workflows/deploy-data-plane-controller.yaml @@ -2,9 +2,6 @@ name: Deploy data-plane-controller on: workflow_dispatch: {} - # TODO(johnny): Remove after merging. - push: - branches: [johnny/dpc-cd] env: CARGO_INCREMENTAL: 0 # Faster from-scratch builds. diff --git a/crates/connector-init/src/capture.rs b/crates/connector-init/src/capture.rs index 7462e22bea..3847f00f49 100644 --- a/crates/connector-init/src/capture.rs +++ b/crates/connector-init/src/capture.rs @@ -24,7 +24,7 @@ impl proto_grpc::capture::connector_server::Connector for Proxy { rpc::new_command(&self.entrypoint), self.codec, request.into_inner().map_ok(|mut request| { - request.internal.clear(); // TODO(johnny): Temporarily remove $internal. + request.internal.clear(); request }), ops::stderr_log_handler, diff --git a/crates/connector-init/src/derive.rs b/crates/connector-init/src/derive.rs index 0e194dcfee..7bacfe6ae8 100644 --- a/crates/connector-init/src/derive.rs +++ b/crates/connector-init/src/derive.rs @@ -24,7 +24,7 @@ impl proto_grpc::derive::connector_server::Connector for Proxy { rpc::new_command(&self.entrypoint), self.codec, request.into_inner().map_ok(|mut request| { - request.internal.clear(); // TODO(johnny): Temporarily remove $internal. + request.internal.clear(); request }), ops::stderr_log_handler, diff --git a/crates/connector-init/src/materialize.rs b/crates/connector-init/src/materialize.rs index 79a9a39c84..2b76a32142 100644 --- a/crates/connector-init/src/materialize.rs +++ b/crates/connector-init/src/materialize.rs @@ -24,7 +24,7 @@ impl proto_grpc::materialize::connector_server::Connector for Proxy { rpc::new_command(&self.entrypoint), self.codec, request.into_inner().map_ok(|mut request| { - request.internal.clear(); // TODO(johnny): Temporarily remove $internal. + request.internal.clear(); request }), ops::stderr_log_handler, diff --git a/crates/data-plane-controller/src/controller.rs b/crates/data-plane-controller/src/controller.rs index 615fc3a097..3d2bd5137e 100644 --- a/crates/data-plane-controller/src/controller.rs +++ b/crates/data-plane-controller/src/controller.rs @@ -20,7 +20,6 @@ pub enum Message { Preview, Refresh, Converge, - // TODO(johnny): `Destroy` variant for managed tear-down. } #[derive(Debug, serde::Deserialize, serde::Serialize)] diff --git a/go/network/auth.go b/go/network/auth.go index 8f7c581293..e8818f2fd1 100644 --- a/go/network/auth.go +++ b/go/network/auth.go @@ -6,13 +6,14 @@ import ( "net/http" "net/url" + pf "github.com/estuary/flow/go/protocols/flow" pb "go.gazette.dev/core/broker/protocol" "google.golang.org/grpc/metadata" ) // verifyAuthorization ensures the request has an authorization which // is valid for capability NETWORK_PROXY to `taskName`. -func verifyAuthorization(req *http.Request, verifier pb.Verifier, taskName string) error { +func verifyAuthorization(req *http.Request, verifier pb.Verifier, shardIDPrefix string) error { var bearer = req.Header.Get("authorization") if bearer != "" { // Pass. @@ -27,21 +28,29 @@ func verifyAuthorization(req *http.Request, verifier pb.Verifier, taskName strin req.Context(), metadata.Pairs("authorization", bearer), ), - 0, // TODO(johnny): Should be pf.Capability_NETWORK_PROXY. + pf.Capability_NETWORK_PROXY, ) if err != nil { return err } cancel() // We don't use the returned context. - /* TODO(johnny): Inspect claims once UI is updated to use /authorize/user/task API. + // When we resolved SNI, we stripped the shard ID prefix of its creation + // publication ID suffix (like `/0123457890abcdef/`) to ensure the SNI cache + // is invariant to a task being deleted and re-created. + // + // Account for that here by extending `shardIDPrefix` with the creation + // ID suffix indicated by `claims`. + var suffix string + if id := claims.Selector.Include.ValueOf("id"); len(id) > 17 { + suffix = id[len(id)-17:] // 16 hex bytes, plus trailing '/'. + } + if !claims.Selector.Matches(pb.MustLabelSet( - labels.TaskName, taskName, + "id", shardIDPrefix+suffix, )) { - return fmt.Errorf("invalid authorization for task %s (%s)", taskName, bearer) + return fmt.Errorf("invalid authorization for task prefix %s (%s)", shardIDPrefix, bearer) } - */ - _ = claims return nil } diff --git a/go/network/frontend.go b/go/network/frontend.go index 41dce40954..eb58ba8ea5 100644 --- a/go/network/frontend.go +++ b/go/network/frontend.go @@ -382,7 +382,7 @@ func (p *Frontend) serveConnHTTP(user *frontendConn) { } else if req.URL.Path == "/auth-redirect" { completeAuthRedirect(w, req) httpHandledCounter.WithLabelValues(task, port, "CompleteAuth").Inc() - } else if err := verifyAuthorization(req, p.verifier, user.resolved.taskName); err == nil { + } else if err := verifyAuthorization(req, p.verifier, user.resolved.shardIDPrefix); err == nil { reverse.ServeHTTP(w, req) } else if req.Method == "GET" && strings.Contains(req.Header.Get("accept"), "html") { // Presence of "html" in Accept means this is probably a browser.