Skip to content

Commit

Permalink
Adds ability to change logging verbosity and adds more debug telemetry (
Browse files Browse the repository at this point in the history
#594)

* Adds ability to change log verbosity through cli / env var and also adds debug logging statements for upstream requests

Signed-off-by: Alex Creasy <alex@creasy.dev>

* Adds logging for k8s client

Signed-off-by: Alex Creasy <alex@creasy.dev>

---------

Signed-off-by: Alex Creasy <alex@creasy.dev>
  • Loading branch information
alexcreasy authored Jan 17, 2025
1 parent f9f78c3 commit 6374fb7
Show file tree
Hide file tree
Showing 26 changed files with 275 additions and 98 deletions.
3 changes: 2 additions & 1 deletion clients/ui/bff/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ STANDALONE_MODE ?= true
STATIC_ASSETS_DIR ?= ./static
# ENVTEST_K8S_VERSION refers to the version of kubebuilder assets to be downloaded by envtest binary.
ENVTEST_K8S_VERSION = 1.29.0
LOG_LEVEL ?= info

.PHONY: all
all: build
Expand Down Expand Up @@ -48,7 +49,7 @@ build: fmt vet test ## Builds the project to produce a binary executable.
.PHONY: run
run: fmt vet envtest ## Runs the project.
ENVTEST_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" \
go run ./cmd/main.go --port=$(PORT) --static-assets-dir=$(STATIC_ASSETS_DIR) --mock-k8s-client=$(MOCK_K8S_CLIENT) --mock-mr-client=$(MOCK_MR_CLIENT) --dev-mode=$(DEV_MODE) --dev-mode-port=$(DEV_MODE_PORT) --standalone-mode=$(STANDALONE_MODE)
go run ./cmd/main.go --port=$(PORT) --static-assets-dir=$(STATIC_ASSETS_DIR) --mock-k8s-client=$(MOCK_K8S_CLIENT) --mock-mr-client=$(MOCK_MR_CLIENT) --dev-mode=$(DEV_MODE) --dev-mode-port=$(DEV_MODE_PORT) --standalone-mode=$(STANDALONE_MODE) --log-level=$(LOG_LEVEL)

##@ Dependencies

Expand Down
5 changes: 5 additions & 0 deletions clients/ui/bff/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ If you want to use a different port, mock kubernetes client or model registry cl
```shell
make run PORT=8000 MOCK_K8S_CLIENT=true MOCK_MR_CLIENT=true
```
If you want to change the log level on deployment, add the LOG_LEVEL argument when running, supported levels are: ERROR, WARN, INFO, DEBUG. The default level is INFO.
```shell
# Run with debug logging
make run LOG_LEVEL=DEBUG
```

# Building and Deploying

Expand Down
30 changes: 29 additions & 1 deletion clients/ui/bff/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"flag"
"fmt"
"os/signal"
"strings"
"syscall"

"github.com/kubeflow/model-registry/ui/bff/internal/api"
Expand All @@ -26,9 +27,12 @@ func main() {
flag.IntVar(&cfg.DevModePort, "dev-mode-port", getEnvAsInt("DEV_MODE_PORT", 8080), "Use port when in development mode")
flag.BoolVar(&cfg.StandaloneMode, "standalone-mode", false, "Use standalone mode for enabling endpoints in standalone mode")
flag.StringVar(&cfg.StaticAssetsDir, "static-assets-dir", "./static", "Configure frontend static assets root directory")
flag.StringVar(&cfg.LogLevel, "log-level", getEnvAsString("LOG_LEVEL", "info"), "Sets server log level, possible values: debug, info, warn, error, fatal")
flag.Parse()

logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: getLogLevelFromString(cfg.LogLevel),
}))

app, err := api.NewApp(cfg, logger)
if err != nil {
Expand Down Expand Up @@ -88,3 +92,27 @@ func getEnvAsInt(name string, defaultVal int) int {
}
return defaultVal
}

func getEnvAsString(name string, defaultVal string) string {
if value, exists := os.LookupEnv(name); exists {
return value
}
return defaultVal
}

func getLogLevelFromString(level string) slog.Level {
switch strings.ToLower(level) {
case "debug":
return slog.LevelDebug
case "info":
return slog.LevelInfo
case "warn":
return slog.LevelWarn
case "error":
return slog.LevelError
case "fatal":
return slog.LevelError

}
return slog.LevelInfo
}
2 changes: 1 addition & 1 deletion clients/ui/bff/internal/api/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,5 +138,5 @@ func (app *App) Routes() http.Handler {
http.ServeFile(w, r, path.Join(app.config.StaticAssetsDir, "index.html"))
})

return app.RecoverPanic(app.enableCORS(app.InjectUserHeaders(appMux)))
return app.RecoverPanic(app.EnableTelemetry(app.enableCORS(app.InjectUserHeaders(appMux))))
}
3 changes: 2 additions & 1 deletion clients/ui/bff/internal/api/healthcheck__handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"github.com/kubeflow/model-registry/ui/bff/internal/config"
"github.com/kubeflow/model-registry/ui/bff/internal/constants"
"github.com/kubeflow/model-registry/ui/bff/internal/mocks"
"github.com/kubeflow/model-registry/ui/bff/internal/models"
"github.com/kubeflow/model-registry/ui/bff/internal/repositories"
Expand All @@ -26,7 +27,7 @@ func TestHealthCheckHandler(t *testing.T) {

rr := httptest.NewRecorder()
req, err := http.NewRequest(http.MethodGet, HealthCheckPath, nil)
ctx := context.WithValue(req.Context(), KubeflowUserIdKey, mocks.KubeflowUserIDHeaderValue)
ctx := context.WithValue(req.Context(), constants.KubeflowUserIdKey, mocks.KubeflowUserIDHeaderValue)
req = req.WithContext(ctx)
assert.NoError(t, err)

Expand Down
3 changes: 2 additions & 1 deletion clients/ui/bff/internal/api/healthcheck_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package api
import (
"errors"
"github.com/julienschmidt/httprouter"
"github.com/kubeflow/model-registry/ui/bff/internal/constants"
"net/http"
)

func (app *App) HealthcheckHandler(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {

userId, ok := r.Context().Value(KubeflowUserIdKey).(string)
userId, ok := r.Context().Value(constants.KubeflowUserIdKey).(string)
if !ok || userId == "" {
app.serverErrorResponse(w, r, errors.New("failed to retrieve kubeflow-userid from context"))
return
Expand Down
100 changes: 63 additions & 37 deletions clients/ui/bff/internal/api/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,15 @@ import (
"context"
"errors"
"fmt"
"net/http"
"strings"

"github.com/google/uuid"
"github.com/julienschmidt/httprouter"
"github.com/kubeflow/model-registry/ui/bff/internal/config"
"github.com/kubeflow/model-registry/ui/bff/internal/constants"
"github.com/kubeflow/model-registry/ui/bff/internal/integrations"
)

type contextKey string

const (
ModelRegistryHttpClientKey contextKey = "ModelRegistryHttpClientKey"
NamespaceHeaderParameterKey contextKey = "namespace"

//Kubeflow authorization operates using custom authentication headers:
// Note: The functionality for `kubeflow-groups` is not fully operational at Kubeflow platform at this time
// but it's supported on Model Registry BFF
KubeflowUserIdKey contextKey = "kubeflowUserId" // kubeflow-userid :contains the user's email address
KubeflowUserIDHeader = "kubeflow-userid"
KubeflowUserGroupsKey contextKey = "kubeflowUserGroups" // kubeflow-groups : Holds a comma-separated list of user groups
KubeflowUserGroupsIdHeader = "kubeflow-groups"
"log/slog"
"net/http"
"runtime/debug"
"strings"
)

func (app *App) RecoverPanic(next http.Handler) http.Handler {
Expand All @@ -33,6 +21,7 @@ func (app *App) RecoverPanic(next http.Handler) http.Handler {
if err := recover(); err != nil {
w.Header().Set("Connection", "close")
app.serverErrorResponse(w, r, fmt.Errorf("%s", err))
app.logger.Error("Recover from panic: " + string(debug.Stack()))
}
}()

Expand All @@ -49,8 +38,8 @@ func (app *App) InjectUserHeaders(next http.Handler) http.Handler {
return
}

userIdHeader := r.Header.Get(KubeflowUserIDHeader)
userGroupsHeader := r.Header.Get(KubeflowUserGroupsIdHeader)
userIdHeader := r.Header.Get(constants.KubeflowUserIDHeader)
userGroupsHeader := r.Header.Get(constants.KubeflowUserGroupsIdHeader)
//`kubeflow-userid`: Contains the user's email address.
if userIdHeader == "" {
app.badRequestResponse(w, r, errors.New("missing required header: kubeflow-userid"))
Expand All @@ -70,8 +59,8 @@ func (app *App) InjectUserHeaders(next http.Handler) http.Handler {
}

ctx := r.Context()
ctx = context.WithValue(ctx, KubeflowUserIdKey, userIdHeader)
ctx = context.WithValue(ctx, KubeflowUserGroupsKey, userGroups)
ctx = context.WithValue(ctx, constants.KubeflowUserIdKey, userIdHeader)
ctx = context.WithValue(ctx, constants.KubeflowUserGroupsKey, userGroups)

next.ServeHTTP(w, r.WithContext(ctx))
})
Expand All @@ -87,35 +76,72 @@ func (app *App) enableCORS(next http.Handler) http.Handler {
})
}

func (app *App) EnableTelemetry(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Adds a unique id to the context to allow tracing of requests
traceId := uuid.NewString()
ctx := context.WithValue(r.Context(), constants.TraceIdKey, traceId)

// logger will only be nil in tests.
if app.logger != nil {
traceLogger := app.logger.With(slog.String("trace_id", traceId))
ctx = context.WithValue(ctx, constants.TraceLoggerKey, traceLogger)

if traceLogger.Enabled(ctx, slog.LevelDebug) {
cloneBody, err := integrations.CloneBody(r)
if err != nil {
traceLogger.Debug("Error reading request body for debug logging", "error", err)
}
////TODO (Alex) Log headers, BUT we must ensure we don't log confidential data like tokens etc.
traceLogger.Debug("Incoming HTTP request", "method", r.Method, "url", r.URL.String(), "body", cloneBody)
}
}

next.ServeHTTP(w, r.WithContext(ctx))
})
}

func (app *App) AttachRESTClient(next func(http.ResponseWriter, *http.Request, httprouter.Params)) httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {

modelRegistryID := ps.ByName(ModelRegistryId)

namespace, ok := r.Context().Value(NamespaceHeaderParameterKey).(string)
namespace, ok := r.Context().Value(constants.NamespaceHeaderParameterKey).(string)
if !ok || namespace == "" {
app.badRequestResponse(w, r, fmt.Errorf("missing namespace in the context"))
}

modelRegistryBaseURL, err := resolveModelRegistryURL(namespace, modelRegistryID, app.kubernetesClient, app.config)
modelRegistryBaseURL, err := resolveModelRegistryURL(r.Context(), namespace, modelRegistryID, app.kubernetesClient, app.config)
if err != nil {
app.notFoundResponse(w, r)
return
}

client, err := integrations.NewHTTPClient(modelRegistryID, modelRegistryBaseURL)
// Set up a child logger for the rest client that automatically adds the request id to all statements for
// tracing.
restClientLogger := app.logger
traceId, ok := r.Context().Value(constants.TraceIdKey).(string)
if app.logger != nil {
if ok {
restClientLogger = app.logger.With(slog.String("trace_id", traceId))
} else {
app.logger.Warn("Failed to set trace_id for tracing")
}
}

client, err := integrations.NewHTTPClient(restClientLogger, modelRegistryID, modelRegistryBaseURL)
if err != nil {
app.serverErrorResponse(w, r, fmt.Errorf("failed to create Kubernetes client: %v", err))
return
}
ctx := context.WithValue(r.Context(), ModelRegistryHttpClientKey, client)
ctx := context.WithValue(r.Context(), constants.ModelRegistryHttpClientKey, client)
next(w, r.WithContext(ctx), ps)
}
}

func resolveModelRegistryURL(namespace string, serviceName string, client integrations.KubernetesClientInterface, config config.EnvConfig) (string, error) {
func resolveModelRegistryURL(sessionCtx context.Context, namespace string, serviceName string, client integrations.KubernetesClientInterface, config config.EnvConfig) (string, error) {

serviceDetails, err := client.GetServiceDetailsByName(namespace, serviceName)
serviceDetails, err := client.GetServiceDetailsByName(sessionCtx, namespace, serviceName)
if err != nil {
return "", err
}
Expand All @@ -131,13 +157,13 @@ func resolveModelRegistryURL(namespace string, serviceName string, client integr

func (app *App) AttachNamespace(next func(http.ResponseWriter, *http.Request, httprouter.Params)) httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
namespace := r.URL.Query().Get(string(NamespaceHeaderParameterKey))
namespace := r.URL.Query().Get(string(constants.NamespaceHeaderParameterKey))
if namespace == "" {
app.badRequestResponse(w, r, fmt.Errorf("missing required query parameter: %s", NamespaceHeaderParameterKey))
app.badRequestResponse(w, r, fmt.Errorf("missing required query parameter: %s", constants.NamespaceHeaderParameterKey))
return
}

ctx := context.WithValue(r.Context(), NamespaceHeaderParameterKey, namespace)
ctx := context.WithValue(r.Context(), constants.NamespaceHeaderParameterKey, namespace)
r = r.WithContext(ctx)

next(w, r, ps)
Expand All @@ -146,19 +172,19 @@ func (app *App) AttachNamespace(next func(http.ResponseWriter, *http.Request, ht

func (app *App) PerformSARonGetListServicesByNamespace(next func(http.ResponseWriter, *http.Request, httprouter.Params)) httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
user, ok := r.Context().Value(KubeflowUserIdKey).(string)
user, ok := r.Context().Value(constants.KubeflowUserIdKey).(string)
if !ok || user == "" {
app.badRequestResponse(w, r, fmt.Errorf("missing user in context"))
return
}
namespace, ok := r.Context().Value(NamespaceHeaderParameterKey).(string)
namespace, ok := r.Context().Value(constants.NamespaceHeaderParameterKey).(string)
if !ok || namespace == "" {
app.badRequestResponse(w, r, fmt.Errorf("missing namespace in context"))
return
}

var userGroups []string
if groups, ok := r.Context().Value(KubeflowUserGroupsKey).([]string); ok {
if groups, ok := r.Context().Value(constants.KubeflowUserGroupsKey).([]string); ok {
userGroups = groups
} else {
userGroups = []string{}
Expand All @@ -181,13 +207,13 @@ func (app *App) PerformSARonGetListServicesByNamespace(next func(http.ResponseWr
func (app *App) PerformSARonSpecificService(next func(http.ResponseWriter, *http.Request, httprouter.Params)) httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {

user, ok := r.Context().Value(KubeflowUserIdKey).(string)
user, ok := r.Context().Value(constants.KubeflowUserIdKey).(string)
if !ok || user == "" {
app.badRequestResponse(w, r, fmt.Errorf("missing user in context"))
return
}

namespace, ok := r.Context().Value(NamespaceHeaderParameterKey).(string)
namespace, ok := r.Context().Value(constants.NamespaceHeaderParameterKey).(string)
if !ok || namespace == "" {
app.badRequestResponse(w, r, fmt.Errorf("missing namespace in context"))
return
Expand All @@ -200,7 +226,7 @@ func (app *App) PerformSARonSpecificService(next func(http.ResponseWriter, *http
}

var userGroups []string
if groups, ok := r.Context().Value(KubeflowUserGroupsKey).([]string); ok {
if groups, ok := r.Context().Value(constants.KubeflowUserGroupsKey).([]string); ok {
userGroups = groups
} else {
userGroups = []string{}
Expand Down
5 changes: 3 additions & 2 deletions clients/ui/bff/internal/api/model_registry_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"fmt"
"github.com/julienschmidt/httprouter"
"github.com/kubeflow/model-registry/ui/bff/internal/constants"
"github.com/kubeflow/model-registry/ui/bff/internal/models"
"net/http"
)
Expand All @@ -11,12 +12,12 @@ type ModelRegistryListEnvelope Envelope[[]models.ModelRegistryModel, None]

func (app *App) ModelRegistryHandler(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {

namespace, ok := r.Context().Value(NamespaceHeaderParameterKey).(string)
namespace, ok := r.Context().Value(constants.NamespaceHeaderParameterKey).(string)
if !ok || namespace == "" {
app.badRequestResponse(w, r, fmt.Errorf("missing namespace in the context"))
}

registries, err := app.repositories.ModelRegistry.GetAllModelRegistries(app.kubernetesClient, namespace)
registries, err := app.repositories.ModelRegistry.GetAllModelRegistries(r.Context(), app.kubernetesClient, namespace)
if err != nil {
app.serverErrorResponse(w, r, err)
return
Expand Down
5 changes: 4 additions & 1 deletion clients/ui/bff/internal/api/model_registry_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/kubeflow/model-registry/ui/bff/internal/constants"
"github.com/kubeflow/model-registry/ui/bff/internal/mocks"
"github.com/kubeflow/model-registry/ui/bff/internal/models"
"github.com/kubeflow/model-registry/ui/bff/internal/repositories"
. "github.com/onsi/ginkgo/v2"
Expand All @@ -28,7 +30,8 @@ var _ = Describe("TestModelRegistryHandler", func() {
requestPath := fmt.Sprintf(" %s?namespace=kubeflow", ModelRegistryListPath)
req, err := http.NewRequest(http.MethodGet, requestPath, nil)

ctx := context.WithValue(req.Context(), NamespaceHeaderParameterKey, "kubeflow")
ctx := mocks.NewMockSessionContext(req.Context())
ctx = context.WithValue(ctx, constants.NamespaceHeaderParameterKey, "kubeflow")
req = req.WithContext(ctx)

Expect(err).NotTo(HaveOccurred())
Expand Down
Loading

0 comments on commit 6374fb7

Please sign in to comment.