Skip to content

Commit

Permalink
Refactor workflows into a proper go library (#14)
Browse files Browse the repository at this point in the history
* Add option mechanism

* Make task runner logger configurable

* TaskRunner config

* Lint issues

* Observability library improvements

* Unix socket support

* Add options to NewJobService function

* Fix runner tests

* Update readme
  • Loading branch information
lukasbindreiter authored May 13, 2024
1 parent 74c2299 commit c443055
Show file tree
Hide file tree
Showing 8 changed files with 323 additions and 123 deletions.
52 changes: 17 additions & 35 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,26 +56,24 @@ import (
"log/slog"
"os"

"connectrpc.com/connect"
"github.com/google/uuid"
"github.com/tilebox/tilebox-go/grpc"
"github.com/tilebox/tilebox-go/protogen/go/workflows/v1/workflowsv1connect"
"github.com/tilebox/tilebox-go/workflows/v1"
)

const serverURL = "https://api.tilebox.com"

type HelloTask struct {
Name string
}

func main() {
ctx := context.Background()

jobsClient := clientFromConfig(serverURL, os.Getenv("TILEBOX_API_KEY"))
jobs := workflows.NewJobService(jobsClient)
jobs := workflows.NewJobService(
workflows.NewJobClient(
workflows.WithAPIKey(os.Getenv("TILEBOX_API_KEY")),
),
)

job, err := jobs.Submit(ctx, "hello-world", workflows.DefaultClusterSlug,
job, err := jobs.Submit(ctx, "hello-world", "testing-4qgCk4qHH85qR7", 0,
&HelloTask{
Name: "Tilebox",
},
Expand All @@ -87,16 +85,6 @@ func main() {

slog.InfoContext(ctx, "Job submitted", "job_id", uuid.Must(uuid.FromBytes(job.GetId().GetUuid())))
}

func clientFromConfig(serverURL, authToken string) workflowsv1connect.JobServiceClient {
return workflowsv1connect.NewJobServiceClient(
grpc.RetryHTTPClient(), serverURL, connect.WithInterceptors(
grpc.NewAddAuthTokenInterceptor(func() string {
return authToken
})),
)
}

```

### Running a Worker
Expand All @@ -111,14 +99,9 @@ import (
"log/slog"
"os"

"connectrpc.com/connect"
"github.com/tilebox/tilebox-go/grpc"
"github.com/tilebox/tilebox-go/protogen/go/workflows/v1/workflowsv1connect"
"github.com/tilebox/tilebox-go/workflows/v1"
)

const serverURL = "https://api.tilebox.com"

type HelloTask struct {
Name string
}
Expand All @@ -130,10 +113,18 @@ func (t *HelloTask) Execute(context.Context) error {
}

func main() {
taskClient := clientFromConfig(serverURL, os.Getenv("TILEBOX_API_KEY"))
runner := workflows.NewTaskRunner(taskClient)
runner, err := workflows.NewTaskRunner(
workflows.NewTaskClient(
workflows.WithAPIKey(os.Getenv("TILEBOX_API_KEY")),
),
workflows.WithCluster("testing-4qgCk4qHH85qR7"),
)
if err != nil {
slog.Error("failed to create task runner", "error", err)
return
}

err := runner.RegisterTasks(
err = runner.RegisterTasks(
&HelloTask{},
)
if err != nil {
Expand All @@ -143,13 +134,4 @@ func main() {

runner.Run(context.Background())
}

func clientFromConfig(serverURL, authToken string) workflowsv1connect.TaskServiceClient {
return workflowsv1connect.NewTaskServiceClient(
grpc.RetryHTTPClient(), serverURL, connect.WithInterceptors(
grpc.NewAddAuthTokenInterceptor(func() string {
return authToken
})),
)
}
```
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/klauspost/compress v1.17.7 // indirect
github.com/remychantenay/slog-otel v1.3.0 // indirect
github.com/stretchr/testify v1.9.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.48.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ github.com/pkg/profile v1.7.0 h1:hnbDkaNWPCLMO9wGLdBFTIZvzDrDfBM2072E1S9gJkA=
github.com/pkg/profile v1.7.0/go.mod h1:8Uer0jas47ZQMJ7VD+OHknK4YDY07LPUC6dEvqDjvNo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/remychantenay/slog-otel v1.3.0 h1:mppL97agkmwR416lKzltRQ9QRhrPdxwVidt0AnI3Ts4=
github.com/remychantenay/slog-otel v1.3.0/go.mod h1:L2VAe6WOMAk/kRzzuv2B/rWe/IDXAhUNae0919b4kHU=
github.com/rs/cors v1.10.1 h1:L0uuZVXIKlI1SShY2nhFfo44TYvDPQ1w4oFkUJNfhyo=
github.com/rs/cors v1.10.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
Expand Down
54 changes: 25 additions & 29 deletions observability/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
adapter "github.com/axiomhq/axiom-go/adapters/slog"
"github.com/axiomhq/axiom-go/axiom"
axiotel "github.com/axiomhq/axiom-go/axiom/otel"
slogotel "github.com/remychantenay/slog-otel"
workflowsv1 "github.com/tilebox/tilebox-go/protogen/go/workflows/v1"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
Expand All @@ -21,53 +21,49 @@ import (

var propagator = propagation.TraceContext{}

// AxiomLogHandler returns an Axiom handler for slog.
func AxiomLogHandler(dataset, token string, level slog.Level) (*adapter.Handler, error) {
// NewAxiomLogger returns a slog.Logger that logs to Axiom.
// It also returns a shutdown function that should be called when the logger is no longer needed, to ensure
// all logs are flushed.
func NewAxiomLogger(dataset, token string, level slog.Level) (*slog.Logger, func(), error) {
noShutdown := func() {}
client, err := axiom.NewClient(axiom.SetToken(token))
if err != nil {
return nil, err
return nil, noShutdown, err
}

return adapter.New(
axiomHandler, err := adapter.New(
adapter.SetDataset(dataset),
adapter.SetClient(client),
adapter.SetLevel(level),
)
}
if err != nil {
return nil, noShutdown, err
}

// AxiomTraceExporter returns an Axiom OpenTelemetry trace exporter.
func AxiomTraceExporter(ctx context.Context, dataset, token string) (trace.SpanExporter, error) {
return axiotel.TraceExporter(ctx, dataset, axiotel.SetToken(token))
return slog.New(slogotel.OtelHandler{Next: axiomHandler}), axiomHandler.Close, nil
}

func SetupOtelTracing(serviceName, serviceVersion string, exporters ...trace.SpanExporter) func(ctx context.Context) {
tp := tracerProvider(serviceName, serviceVersion, exporters)
otel.SetTracerProvider(tp)

shutDownFunc := func(ctx context.Context) {
_ = tp.Shutdown(ctx)
func NewAxiomTracerProvider(ctx context.Context, dataset, token, serviceName, serviceVersion string) (oteltrace.TracerProvider, func(), error) {
noShutdown := func() {}
exporter, err := axiotel.TraceExporter(ctx, dataset, axiotel.SetToken(token))
if err != nil {
return nil, noShutdown, err
}

return shutDownFunc
}

// tracerProvider configures and returns a new OpenTelemetry tracer provider.
func tracerProvider(serviceName, serviceVersion string, exporters []trace.SpanExporter) *trace.TracerProvider {
rs := resource.NewWithAttributes(
traceResource := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(serviceName),
semconv.ServiceVersionKey.String(serviceVersion),
)

opts := []trace.TracerProviderOption{
trace.WithResource(rs),
}

for _, exporter := range exporters {
opts = append(opts, trace.WithBatcher(exporter, trace.WithMaxQueueSize(10*1024)))
provider := trace.NewTracerProvider(
trace.WithResource(traceResource),
trace.WithBatcher(exporter, trace.WithMaxQueueSize(10*1024)),
)
shutdown := func() {
_ = provider.Shutdown(ctx)
}

return trace.NewTracerProvider(opts...)
return provider, shutdown, nil
}

// generateTraceParent generates a random traceparent.
Expand Down
103 changes: 103 additions & 0 deletions workflows/v1/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package workflows

import (
"context"
"net"
"net/http"
"strings"

"connectrpc.com/connect"
"github.com/tilebox/tilebox-go/grpc"
"github.com/tilebox/tilebox-go/protogen/go/workflows/v1/workflowsv1connect"
)

// clientConfig contains the configuration for a gRPC client to a workflows service.
type clientConfig struct {
httpClient connect.HTTPClient
url string
apiKey string
connectOptions []connect.ClientOption
}

// ClientOption is an interface for configuring a client. Using such options helpers is a
// quite common pattern in Go, as it allows for optional parameters in constructors.
// This concrete implementation here is inspired by how libraries such as axiom-go and connect do their
// configuration.
type ClientOption func(*clientConfig)

func WithHTTPClient(httpClient connect.HTTPClient) ClientOption {
return func(cfg *clientConfig) {
cfg.httpClient = httpClient
}
}

func WithURL(url string) ClientOption {
return func(cfg *clientConfig) {
cfg.url = url
}
}

func WithAPIKey(apiKey string) ClientOption {
return func(cfg *clientConfig) {
cfg.apiKey = apiKey
}
}

func WithConnectClientOptions(options ...connect.ClientOption) ClientOption {
return func(cfg *clientConfig) {
cfg.connectOptions = append(cfg.connectOptions, options...)
}
}

func newClientConfig(options []ClientOption) *clientConfig {
cfg := &clientConfig{
url: "https://api.tilebox.com",
}
for _, option := range options {
option(cfg)
}

// if no http client is set by the user, we use a default one
if cfg.httpClient == nil {
// if the URL looks like an HTTP URL, we use a retrying HTTP client
if strings.HasPrefix(cfg.url, "https://") || strings.HasPrefix(cfg.url, "http://") {
cfg.httpClient = grpc.RetryHTTPClient()
} else { // we connect to a unix socket
dial := func(context.Context, string, string) (net.Conn, error) {
return net.Dial("unix", cfg.url)
}
transport := &http.Transport{DialContext: dial}
cfg.httpClient = &http.Client{Transport: transport}
}
}

return cfg
}

func NewTaskClient(options ...ClientOption) workflowsv1connect.TaskServiceClient {
cfg := newClientConfig(options)

return workflowsv1connect.NewTaskServiceClient(
cfg.httpClient,
cfg.url,
connect.WithClientOptions(cfg.connectOptions...),
connect.WithInterceptors(
grpc.NewAddAuthTokenInterceptor(func() string {
return cfg.apiKey
})),
)
}

func NewJobClient(options ...ClientOption) workflowsv1connect.JobServiceClient {
cfg := newClientConfig(options)

return workflowsv1connect.NewJobServiceClient(
cfg.httpClient,
cfg.url,
connect.WithClientOptions(cfg.connectOptions...),
connect.WithInterceptors(
grpc.NewAddAuthTokenInterceptor(func() string {
return cfg.apiKey
})),
)
}
36 changes: 34 additions & 2 deletions workflows/v1/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,47 @@ import (
"google.golang.org/protobuf/proto"
)

type jobServiceConfig struct {
tracerProvider trace.TracerProvider
tracerName string
}

type JobServiceOption func(*jobServiceConfig)

func WithJobServiceTracerProvider(tracerProvider trace.TracerProvider) JobServiceOption {
return func(cfg *jobServiceConfig) {
cfg.tracerProvider = tracerProvider
}
}

func WithJobServiceTracerName(tracerName string) JobServiceOption {
return func(cfg *jobServiceConfig) {
cfg.tracerName = tracerName
}
}

type JobService struct {
client workflowsv1connect.JobServiceClient
tracer trace.Tracer
}

func NewJobService(client workflowsv1connect.JobServiceClient) *JobService {
func newJobServiceConfig(options []JobServiceOption) *jobServiceConfig {
cfg := &jobServiceConfig{
tracerProvider: otel.GetTracerProvider(), // use the global tracer provider by default
tracerName: "tilebox.com/observability", // the default tracer name we use
}
for _, option := range options {
option(cfg)
}

return cfg
}

func NewJobService(client workflowsv1connect.JobServiceClient, options ...JobServiceOption) *JobService {
cfg := newJobServiceConfig(options)
return &JobService{
client: client,
tracer: otel.Tracer("tilebox.com/observability"),
tracer: cfg.tracerProvider.Tracer(cfg.tracerName),
}
}

Expand Down
Loading

0 comments on commit c443055

Please sign in to comment.