Skip to content

Commit

Permalink
Create a dispatcher framework for managing request lifecycle. (#1508)
Browse files Browse the repository at this point in the history
  • Loading branch information
keyurva authored Jan 24, 2025
1 parent 43c4b00 commit 3933206
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 23 deletions.
18 changes: 16 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ import (
"github.com/datacommonsorg/mixer/internal/server/cache"
"github.com/datacommonsorg/mixer/internal/server/datasource"
"github.com/datacommonsorg/mixer/internal/server/datasources"
"github.com/datacommonsorg/mixer/internal/server/dispatcher"
"github.com/datacommonsorg/mixer/internal/server/healthcheck"
"github.com/datacommonsorg/mixer/internal/server/remote"
"github.com/datacommonsorg/mixer/internal/server/spanner"
"github.com/datacommonsorg/mixer/internal/server/v3/observation"
"github.com/datacommonsorg/mixer/internal/sqldb"
"github.com/datacommonsorg/mixer/internal/store"
"github.com/datacommonsorg/mixer/internal/store/bigtable"
Expand Down Expand Up @@ -278,9 +280,21 @@ func main() {
sources = append(sources, &ds)
}

// Create server object
// DataSources
dataSources := datasources.NewDataSources(sources)
mixerServer := server.NewMixerServer(store, metadata, c, mapsClient, dataSources)

// Processors
processors := []*dispatcher.Processor{}
if *enableV3 {
var calculationProcessor dispatcher.Processor = &observation.CalculationProcessor{}
processors = append(processors, &calculationProcessor)
}

// Dispatcher
dispatcher := dispatcher.NewDispatcher(processors, dataSources)

// Create server object
mixerServer := server.NewMixerServer(store, metadata, c, mapsClient, dispatcher)
pbs.RegisterMixerServer(srv, mixerServer)

// Subscribe to branch cache update
Expand Down
154 changes: 154 additions & 0 deletions internal/server/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dispatcher

import (
"context"

"github.com/datacommonsorg/mixer/internal/server/datasources"
"google.golang.org/protobuf/proto"

pbv2 "github.com/datacommonsorg/mixer/internal/proto/v2"
)

// RequestType represents the type of request.
type RequestType string

const (
TypeNode RequestType = "Node"
TypeNodeSearch RequestType = "NodeSearch"
TypeObservation RequestType = "Observation"
TypeResolve RequestType = "Resolve"
)

// RequestContext holds the context for a given request.

// NOTE: We are using the base proto.Message for requests and responses.
// Other options were using generics or going with different *RequestContext struct for each type of request.
// The downside of using a base type is that it needs casting wherever it it used.
// The upside is that we only have one context object
// and if there aren't many processors that deal with the RequestContext, casting in a small number of places is ok.
// We can revisit and use a different approach if this proves to be cumbersome.
type RequestContext struct {
context.Context
Type RequestType
OriginalRequest proto.Message
CurrentRequest proto.Message
CurrentResponse proto.Message
}

// Processor interface defines methods for performing pre and post processing operations.
type Processor interface {
PreProcess(*RequestContext) error
PostProcess(*RequestContext) error
}

// Dispatcher struct handles requests by dispatching requests to various processors and datasources as appropriate.
type Dispatcher struct {
processors []*Processor
sources *datasources.DataSources
}

func NewDispatcher(processors []*Processor, sources *datasources.DataSources) *Dispatcher {
return &Dispatcher{
processors: processors,
sources: sources,
}
}

// handle handles a request lifecycle - pre-processing, core handling and post-processing.
func (dispatcher *Dispatcher) handle(requestContext *RequestContext, handler func(context.Context, proto.Message) (proto.Message, error)) (proto.Message, error) {
for _, processor := range dispatcher.processors {
if err := (*processor).PreProcess(requestContext); err != nil {
return nil, err
}
}

response, err := handler(requestContext.Context, requestContext.CurrentRequest)
if err != nil {
return nil, err
}

requestContext.CurrentResponse = response

for _, processor := range dispatcher.processors {
if err := (*processor).PostProcess(requestContext); err != nil {
return nil, err
}
}

return requestContext.CurrentResponse, nil
}

func (dispatcher *Dispatcher) Node(ctx context.Context, in *pbv2.NodeRequest) (*pbv2.NodeResponse, error) {
requestContext := newRequestContext(ctx, in, TypeNode)

response, err := dispatcher.handle(requestContext, func(ctx context.Context, request proto.Message) (proto.Message, error) {
return dispatcher.sources.Node(ctx, request.(*pbv2.NodeRequest))
})

if err != nil {
return nil, err
}
return response.(*pbv2.NodeResponse), nil
}

func (dispatcher *Dispatcher) Observation(ctx context.Context, in *pbv2.ObservationRequest) (*pbv2.ObservationResponse, error) {
requestContext := newRequestContext(ctx, in, TypeObservation)

response, err := dispatcher.handle(requestContext, func(ctx context.Context, request proto.Message) (proto.Message, error) {
return dispatcher.sources.Observation(ctx, request.(*pbv2.ObservationRequest))
})

if err != nil {
return nil, err
}
return response.(*pbv2.ObservationResponse), nil
}

func (dispatcher *Dispatcher) NodeSearch(ctx context.Context, in *pbv2.NodeSearchRequest) (*pbv2.NodeSearchResponse, error) {
requestContext := newRequestContext(ctx, in, TypeNodeSearch)

response, err := dispatcher.handle(requestContext, func(ctx context.Context, request proto.Message) (proto.Message, error) {
return dispatcher.sources.NodeSearch(ctx, request.(*pbv2.NodeSearchRequest))
})

if err != nil {
return nil, err
}
return response.(*pbv2.NodeSearchResponse), nil
}

func (dispatcher *Dispatcher) Resolve(ctx context.Context, in *pbv2.ResolveRequest) (*pbv2.ResolveResponse, error) {
requestContext := newRequestContext(ctx, in, TypeResolve)

response, err := dispatcher.handle(requestContext, func(ctx context.Context, request proto.Message) (proto.Message, error) {
return dispatcher.sources.Resolve(ctx, request.(*pbv2.ResolveRequest))
})

if err != nil {
return nil, err
}
return response.(*pbv2.ResolveResponse), nil
}

func newRequestContext(ctx context.Context, request proto.Message, requestType RequestType) *RequestContext {
return &RequestContext{
Context: ctx,
Type: requestType,
OriginalRequest: proto.Clone(request),
CurrentRequest: request,
}
}
8 changes: 4 additions & 4 deletions internal/server/handler_v3.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,26 @@ import (
func (s *Server) V3Node(ctx context.Context, in *pbv2.NodeRequest) (
*pbv2.NodeResponse, error,
) {
return s.dataSources.Node(ctx, in)
return s.dispatcher.Node(ctx, in)
}

// V3Observation implements API for mixer.V3Observation.
func (s *Server) V3Observation(ctx context.Context, in *pbv2.ObservationRequest) (
*pbv2.ObservationResponse, error,
) {
return s.dataSources.Observation(ctx, in)
return s.dispatcher.Observation(ctx, in)
}

// V3NodeSearch implements API for mixer.V3NodeSearch.
func (s *Server) V3NodeSearch(ctx context.Context, in *pbv2.NodeSearchRequest) (
*pbv2.NodeSearchResponse, error,
) {
return s.dataSources.NodeSearch(ctx, in)
return s.dispatcher.NodeSearch(ctx, in)
}

// V3Resolve implements API for mixer.V3Resolve.
func (s *Server) V3Resolve(ctx context.Context, in *pbv2.ResolveRequest) (
*pbv2.ResolveResponse, error,
) {
return s.dataSources.Resolve(ctx, in)
return s.dispatcher.Resolve(ctx, in)
}
28 changes: 14 additions & 14 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/datacommonsorg/mixer/internal/parser/mcf"
dcpubsub "github.com/datacommonsorg/mixer/internal/pubsub"
"github.com/datacommonsorg/mixer/internal/server/cache"
"github.com/datacommonsorg/mixer/internal/server/datasources"
"github.com/datacommonsorg/mixer/internal/server/dispatcher"
"github.com/datacommonsorg/mixer/internal/server/resource"
"github.com/datacommonsorg/mixer/internal/store"
"github.com/datacommonsorg/mixer/internal/store/bigtable"
Expand All @@ -42,12 +42,12 @@ import (

// Server holds resources for a mixer server
type Server struct {
store *store.Store
metadata *resource.Metadata
cachedata atomic.Pointer[cache.Cache]
mapsClient *maps.Client
httpClient *http.Client
dataSources *datasources.DataSources
store *store.Store
metadata *resource.Metadata
cachedata atomic.Pointer[cache.Cache]
mapsClient *maps.Client
httpClient *http.Client
dispatcher *dispatcher.Dispatcher
}

func (s *Server) updateBranchTable(ctx context.Context, branchTableName string) error {
Expand Down Expand Up @@ -149,15 +149,15 @@ func NewMixerServer(
metadata *resource.Metadata,
cachedata *cache.Cache,
mapsClient *maps.Client,
dataSources *datasources.DataSources,
dispatcher *dispatcher.Dispatcher,
) *Server {
s := &Server{
store: store,
metadata: metadata,
cachedata: atomic.Pointer[cache.Cache]{},
mapsClient: mapsClient,
httpClient: &http.Client{},
dataSources: dataSources,
store: store,
metadata: metadata,
cachedata: atomic.Pointer[cache.Cache]{},
mapsClient: mapsClient,
httpClient: &http.Client{},
dispatcher: dispatcher,
}
s.cachedata.Store(cachedata)
return s
Expand Down
50 changes: 50 additions & 0 deletions internal/server/v3/observation/calculation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package observation

import (
"log"

"github.com/datacommonsorg/mixer/internal/server/datasources"
"github.com/datacommonsorg/mixer/internal/server/dispatcher"
)

// CalculationProcessor implements the dispatcher.Processor interface for performing calculations.
type CalculationProcessor struct {
// Set and use datasources if needed.
_ *datasources.DataSources
}

func (processor *CalculationProcessor) PreProcess(requestContext *dispatcher.RequestContext) error {
switch requestContext.Type {
case dispatcher.TypeObservation:
log.Printf("Pre-processing observation request.")
return nil
default:
log.Printf("NOT pre-processing request of type: %s", requestContext.Type)
return nil
}
}

func (processor *CalculationProcessor) PostProcess(requestContext *dispatcher.RequestContext) error {
switch requestContext.Type {
case dispatcher.TypeObservation:
log.Printf("Post-processing observation request.")
return nil
default:
log.Printf("NOT post-processing request of type: %s", requestContext.Type)
return nil
}
}
18 changes: 15 additions & 3 deletions test/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@ import (
"github.com/datacommonsorg/mixer/internal/server/cache"
"github.com/datacommonsorg/mixer/internal/server/datasource"
"github.com/datacommonsorg/mixer/internal/server/datasources"
"github.com/datacommonsorg/mixer/internal/server/dispatcher"
"github.com/datacommonsorg/mixer/internal/server/remote"
"github.com/datacommonsorg/mixer/internal/server/resource"
"github.com/datacommonsorg/mixer/internal/server/spanner"
"github.com/datacommonsorg/mixer/internal/server/v3/observation"
"github.com/datacommonsorg/mixer/internal/sqldb"
"github.com/datacommonsorg/mixer/internal/store"
"github.com/datacommonsorg/mixer/internal/store/bigtable"
Expand Down Expand Up @@ -203,7 +205,17 @@ func setupInternal(
}

dataSources := datasources.NewDataSources(sources)
return newClient(st, tables, metadata, c, mapsClient, dataSources)
// Processors
processors := []*dispatcher.Processor{}
if enableV3 {
var calculationProcessor dispatcher.Processor = &observation.CalculationProcessor{}
processors = append(processors, &calculationProcessor)
}

// Dispatcher
dispatcher := dispatcher.NewDispatcher(processors, dataSources)

return newClient(st, tables, metadata, c, mapsClient, dispatcher)
}

// SetupBqOnly creates local server and client with access to BigQuery only.
Expand Down Expand Up @@ -243,9 +255,9 @@ func newClient(
metadata *resource.Metadata,
cachedata *cache.Cache,
mapsClient *maps.Client,
dataSources *datasources.DataSources,
dispatcher *dispatcher.Dispatcher,
) (pbs.MixerClient, error) {
mixerServer := server.NewMixerServer(mixerStore, metadata, cachedata, mapsClient, dataSources)
mixerServer := server.NewMixerServer(mixerStore, metadata, cachedata, mapsClient, dispatcher)
srv := grpc.NewServer()
pbs.RegisterMixerServer(srv, mixerServer)
reflection.Register(srv)
Expand Down

0 comments on commit 3933206

Please sign in to comment.