Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor workflows into a proper go library #14

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 17 additions & 35 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,26 +56,24 @@ import (
"log/slog"
"os"

"connectrpc.com/connect"
"github.com/google/uuid"
"github.com/tilebox/tilebox-go/grpc"
"github.com/tilebox/tilebox-go/protogen/go/workflows/v1/workflowsv1connect"
"github.com/tilebox/tilebox-go/workflows/v1"
)

const serverURL = "https://api.tilebox.com"

type HelloTask struct {
Name string
}

func main() {
ctx := context.Background()

jobsClient := clientFromConfig(serverURL, os.Getenv("TILEBOX_API_KEY"))
jobs := workflows.NewJobService(jobsClient)
jobs := workflows.NewJobService(
workflows.NewJobClient(
workflows.WithAPIKey(os.Getenv("TILEBOX_API_KEY")),
),
)

job, err := jobs.Submit(ctx, "hello-world", workflows.DefaultClusterSlug,
job, err := jobs.Submit(ctx, "hello-world", "testing-4qgCk4qHH85qR7", 0,
&HelloTask{
Name: "Tilebox",
},
Expand All @@ -87,16 +85,6 @@ func main() {

slog.InfoContext(ctx, "Job submitted", "job_id", uuid.Must(uuid.FromBytes(job.GetId().GetUuid())))
}

func clientFromConfig(serverURL, authToken string) workflowsv1connect.JobServiceClient {
return workflowsv1connect.NewJobServiceClient(
grpc.RetryHTTPClient(), serverURL, connect.WithInterceptors(
grpc.NewAddAuthTokenInterceptor(func() string {
return authToken
})),
)
}

```

### Running a Worker
Expand All @@ -111,14 +99,9 @@ import (
"log/slog"
"os"

"connectrpc.com/connect"
"github.com/tilebox/tilebox-go/grpc"
"github.com/tilebox/tilebox-go/protogen/go/workflows/v1/workflowsv1connect"
"github.com/tilebox/tilebox-go/workflows/v1"
)

const serverURL = "https://api.tilebox.com"

type HelloTask struct {
Name string
}
Expand All @@ -130,10 +113,18 @@ func (t *HelloTask) Execute(context.Context) error {
}

func main() {
taskClient := clientFromConfig(serverURL, os.Getenv("TILEBOX_API_KEY"))
runner := workflows.NewTaskRunner(taskClient)
runner, err := workflows.NewTaskRunner(
workflows.NewTaskClient(
workflows.WithAPIKey(os.Getenv("TILEBOX_API_KEY")),
),
workflows.WithCluster("testing-4qgCk4qHH85qR7"),
)
if err != nil {
slog.Error("failed to create task runner", "error", err)
return
}

err := runner.RegisterTasks(
err = runner.RegisterTasks(
&HelloTask{},
)
if err != nil {
Expand All @@ -143,13 +134,4 @@ func main() {

runner.Run(context.Background())
}

func clientFromConfig(serverURL, authToken string) workflowsv1connect.TaskServiceClient {
return workflowsv1connect.NewTaskServiceClient(
grpc.RetryHTTPClient(), serverURL, connect.WithInterceptors(
grpc.NewAddAuthTokenInterceptor(func() string {
return authToken
})),
)
}
```
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/klauspost/compress v1.17.7 // indirect
github.com/remychantenay/slog-otel v1.3.0 // indirect
github.com/stretchr/testify v1.9.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.48.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ github.com/pkg/profile v1.7.0 h1:hnbDkaNWPCLMO9wGLdBFTIZvzDrDfBM2072E1S9gJkA=
github.com/pkg/profile v1.7.0/go.mod h1:8Uer0jas47ZQMJ7VD+OHknK4YDY07LPUC6dEvqDjvNo=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/remychantenay/slog-otel v1.3.0 h1:mppL97agkmwR416lKzltRQ9QRhrPdxwVidt0AnI3Ts4=
github.com/remychantenay/slog-otel v1.3.0/go.mod h1:L2VAe6WOMAk/kRzzuv2B/rWe/IDXAhUNae0919b4kHU=
github.com/rs/cors v1.10.1 h1:L0uuZVXIKlI1SShY2nhFfo44TYvDPQ1w4oFkUJNfhyo=
github.com/rs/cors v1.10.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
Expand Down
54 changes: 25 additions & 29 deletions observability/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
adapter "github.com/axiomhq/axiom-go/adapters/slog"
"github.com/axiomhq/axiom-go/axiom"
axiotel "github.com/axiomhq/axiom-go/axiom/otel"
slogotel "github.com/remychantenay/slog-otel"
workflowsv1 "github.com/tilebox/tilebox-go/protogen/go/workflows/v1"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
Expand All @@ -21,53 +21,49 @@ import (

var propagator = propagation.TraceContext{}

// AxiomLogHandler returns an Axiom handler for slog.
func AxiomLogHandler(dataset, token string, level slog.Level) (*adapter.Handler, error) {
// NewAxiomLogger returns a slog.Logger that logs to Axiom.
// It also returns a shutdown function that should be called when the logger is no longer needed, to ensure
// all logs are flushed.
func NewAxiomLogger(dataset, token string, level slog.Level) (*slog.Logger, func(), error) {
noShutdown := func() {}
client, err := axiom.NewClient(axiom.SetToken(token))
if err != nil {
return nil, err
return nil, noShutdown, err
}

return adapter.New(
axiomHandler, err := adapter.New(
adapter.SetDataset(dataset),
adapter.SetClient(client),
adapter.SetLevel(level),
)
}
if err != nil {
return nil, noShutdown, err
}

// AxiomTraceExporter returns an Axiom OpenTelemetry trace exporter.
func AxiomTraceExporter(ctx context.Context, dataset, token string) (trace.SpanExporter, error) {
return axiotel.TraceExporter(ctx, dataset, axiotel.SetToken(token))
return slog.New(slogotel.OtelHandler{Next: axiomHandler}), axiomHandler.Close, nil
}

func SetupOtelTracing(serviceName, serviceVersion string, exporters ...trace.SpanExporter) func(ctx context.Context) {
tp := tracerProvider(serviceName, serviceVersion, exporters)
otel.SetTracerProvider(tp)

shutDownFunc := func(ctx context.Context) {
_ = tp.Shutdown(ctx)
func NewAxiomTracerProvider(ctx context.Context, dataset, token, serviceName, serviceVersion string) (oteltrace.TracerProvider, func(), error) {
noShutdown := func() {}
exporter, err := axiotel.TraceExporter(ctx, dataset, axiotel.SetToken(token))
if err != nil {
return nil, noShutdown, err
}

return shutDownFunc
}

// tracerProvider configures and returns a new OpenTelemetry tracer provider.
func tracerProvider(serviceName, serviceVersion string, exporters []trace.SpanExporter) *trace.TracerProvider {
rs := resource.NewWithAttributes(
traceResource := resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(serviceName),
semconv.ServiceVersionKey.String(serviceVersion),
)

opts := []trace.TracerProviderOption{
trace.WithResource(rs),
}

for _, exporter := range exporters {
opts = append(opts, trace.WithBatcher(exporter, trace.WithMaxQueueSize(10*1024)))
provider := trace.NewTracerProvider(
trace.WithResource(traceResource),
trace.WithBatcher(exporter, trace.WithMaxQueueSize(10*1024)),
)
shutdown := func() {
_ = provider.Shutdown(ctx)
}

return trace.NewTracerProvider(opts...)
return provider, shutdown, nil
}

// generateTraceParent generates a random traceparent.
Expand Down
103 changes: 103 additions & 0 deletions workflows/v1/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package workflows

import (
"context"
"net"
"net/http"
"strings"

"connectrpc.com/connect"
"github.com/tilebox/tilebox-go/grpc"
"github.com/tilebox/tilebox-go/protogen/go/workflows/v1/workflowsv1connect"
)

// clientConfig contains the configuration for a gRPC client to a workflows service.
type clientConfig struct {
httpClient connect.HTTPClient
url string
apiKey string
connectOptions []connect.ClientOption
}

// ClientOption is an interface for configuring a client. Using such options helpers is a
// quite common pattern in Go, as it allows for optional parameters in constructors.
// This concrete implementation here is inspired by how libraries such as axiom-go and connect do their
// configuration.
type ClientOption func(*clientConfig)

func WithHTTPClient(httpClient connect.HTTPClient) ClientOption {
return func(cfg *clientConfig) {
cfg.httpClient = httpClient
}
}

func WithURL(url string) ClientOption {
return func(cfg *clientConfig) {
cfg.url = url
}
}

func WithAPIKey(apiKey string) ClientOption {
return func(cfg *clientConfig) {
cfg.apiKey = apiKey
}
}

func WithConnectClientOptions(options ...connect.ClientOption) ClientOption {
return func(cfg *clientConfig) {
cfg.connectOptions = append(cfg.connectOptions, options...)
}
}

func newClientConfig(options []ClientOption) *clientConfig {
cfg := &clientConfig{
url: "https://api.tilebox.com",
}
for _, option := range options {
option(cfg)
}

// if no http client is set by the user, we use a default one
if cfg.httpClient == nil {
// if the URL looks like an HTTP URL, we use a retrying HTTP client
if strings.HasPrefix(cfg.url, "https://") || strings.HasPrefix(cfg.url, "http://") {
cfg.httpClient = grpc.RetryHTTPClient()
} else { // we connect to a unix socket
dial := func(context.Context, string, string) (net.Conn, error) {
return net.Dial("unix", cfg.url)
}
transport := &http.Transport{DialContext: dial}
cfg.httpClient = &http.Client{Transport: transport}
}
}

return cfg
}

func NewTaskClient(options ...ClientOption) workflowsv1connect.TaskServiceClient {
cfg := newClientConfig(options)

return workflowsv1connect.NewTaskServiceClient(
cfg.httpClient,
cfg.url,
connect.WithClientOptions(cfg.connectOptions...),
connect.WithInterceptors(
grpc.NewAddAuthTokenInterceptor(func() string {
return cfg.apiKey
})),
)
}

func NewJobClient(options ...ClientOption) workflowsv1connect.JobServiceClient {
cfg := newClientConfig(options)

return workflowsv1connect.NewJobServiceClient(
cfg.httpClient,
cfg.url,
connect.WithClientOptions(cfg.connectOptions...),
connect.WithInterceptors(
grpc.NewAddAuthTokenInterceptor(func() string {
return cfg.apiKey
})),
)
}
36 changes: 34 additions & 2 deletions workflows/v1/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,47 @@ import (
"google.golang.org/protobuf/proto"
)

type jobServiceConfig struct {
tracerProvider trace.TracerProvider
tracerName string
}

type JobServiceOption func(*jobServiceConfig)

func WithJobServiceTracerProvider(tracerProvider trace.TracerProvider) JobServiceOption {
return func(cfg *jobServiceConfig) {
cfg.tracerProvider = tracerProvider
}
}

func WithJobServiceTracerName(tracerName string) JobServiceOption {
return func(cfg *jobServiceConfig) {
cfg.tracerName = tracerName
}
}

type JobService struct {
client workflowsv1connect.JobServiceClient
tracer trace.Tracer
}

func NewJobService(client workflowsv1connect.JobServiceClient) *JobService {
func newJobServiceConfig(options []JobServiceOption) *jobServiceConfig {
cfg := &jobServiceConfig{
tracerProvider: otel.GetTracerProvider(), // use the global tracer provider by default
tracerName: "tilebox.com/observability", // the default tracer name we use
}
for _, option := range options {
option(cfg)
}

return cfg
}

func NewJobService(client workflowsv1connect.JobServiceClient, options ...JobServiceOption) *JobService {
cfg := newJobServiceConfig(options)
return &JobService{
client: client,
tracer: otel.Tracer("tilebox.com/observability"),
tracer: cfg.tracerProvider.Tracer(cfg.tracerName),
}
}

Expand Down
Loading
Loading