From a9f57c7d00dda448fa05dcc1f74e75d6eaca4af4 Mon Sep 17 00:00:00 2001 From: Ravi Suhag Date: Tue, 18 Jun 2024 17:05:23 -0500 Subject: [PATCH] refactor: rename collection to collector (#81) --- app/server.go | 6 +++--- {collection => collector}/collector.go | 2 +- {collection => collector}/mock.go | 2 +- {collection => collector}/service.go | 2 +- {collection => collector}/service_test.go | 2 +- services/grpc/handler.go | 8 ++++---- services/grpc/handler_test.go | 6 +++--- services/grpc/service.go | 6 +++--- services/rest/handler.go | 10 +++++----- services/rest/service.go | 6 +++--- services/rest/websocket/handler.go | 10 +++++----- services/rest/websocket/handler_test.go | 6 +++--- services/services.go | 6 +++--- worker/worker.go | 6 +++--- worker/worker_test.go | 8 ++++---- 15 files changed, 43 insertions(+), 43 deletions(-) rename {collection => collector}/collector.go (95%) rename {collection => collector}/mock.go (92%) rename {collection => collector}/service.go (94%) rename {collection => collector}/service_test.go (96%) diff --git a/app/server.go b/app/server.go index c43708bd..5efaa093 100644 --- a/app/server.go +++ b/app/server.go @@ -11,7 +11,7 @@ import ( "time" pubsubsdk "cloud.google.com/go/pubsub" - "github.com/raystack/raccoon/collection" + "github.com/raystack/raccoon/collector" "github.com/raystack/raccoon/config" "github.com/raystack/raccoon/logger" "github.com/raystack/raccoon/metrics" @@ -30,7 +30,7 @@ type Publisher interface { // StartServer starts the server func StartServer(ctx context.Context, cancel context.CancelFunc) { - bufferChannel := make(chan collection.CollectRequest, config.Worker.ChannelSize) + bufferChannel := make(chan collector.CollectRequest, config.Worker.ChannelSize) httpServices := services.Create(bufferChannel) logger.Info("Start Server -->") httpServices.Start(ctx, cancel) @@ -50,7 +50,7 @@ func StartServer(ctx context.Context, cancel context.CancelFunc) { go shutDownServer(ctx, cancel, httpServices, bufferChannel, workerPool, publisher) } -func shutDownServer(ctx context.Context, cancel context.CancelFunc, httpServices services.Services, bufferChannel chan collection.CollectRequest, workerPool *worker.Pool, pub Publisher) { +func shutDownServer(ctx context.Context, cancel context.CancelFunc, httpServices services.Services, bufferChannel chan collector.CollectRequest, workerPool *worker.Pool, pub Publisher) { signalChan := make(chan os.Signal) signal.Notify(signalChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) for { diff --git a/collection/collector.go b/collector/collector.go similarity index 95% rename from collection/collector.go rename to collector/collector.go index 716637b2..041c3992 100644 --- a/collection/collector.go +++ b/collector/collector.go @@ -1,4 +1,4 @@ -package collection +package collector import ( "context" diff --git a/collection/mock.go b/collector/mock.go similarity index 92% rename from collection/mock.go rename to collector/mock.go index 443a7ec9..9f6c4142 100644 --- a/collection/mock.go +++ b/collector/mock.go @@ -1,4 +1,4 @@ -package collection +package collector import ( "context" diff --git a/collection/service.go b/collector/service.go similarity index 94% rename from collection/service.go rename to collector/service.go index 7eb1434e..0fed828c 100644 --- a/collection/service.go +++ b/collector/service.go @@ -1,4 +1,4 @@ -package collection +package collector import ( "context" diff --git a/collection/service_test.go b/collector/service_test.go similarity index 96% rename from collection/service_test.go rename to collector/service_test.go index c668b329..8b3f8d4c 100644 --- a/collection/service_test.go +++ b/collector/service_test.go @@ -1,4 +1,4 @@ -package collection +package collector import ( "reflect" diff --git a/services/grpc/handler.go b/services/grpc/handler.go index 1b021d65..e027c9f8 100644 --- a/services/grpc/handler.go +++ b/services/grpc/handler.go @@ -5,7 +5,7 @@ import ( "errors" "time" - "github.com/raystack/raccoon/collection" + "github.com/raystack/raccoon/collector" "github.com/raystack/raccoon/config" "github.com/raystack/raccoon/identification" "github.com/raystack/raccoon/logger" @@ -15,7 +15,7 @@ import ( ) type Handler struct { - C collection.Collector + C collector.Collector pb.UnimplementedEventServiceServer } @@ -49,7 +49,7 @@ func (h *Handler) SendEvent(ctx context.Context, req *pb.SendEventRequest) (*pb. h.sendEventCounters(req.Events, identifier.Group) responseChannel := make(chan *pb.SendEventResponse, 1) - h.C.Collect(ctx, &collection.CollectRequest{ + h.C.Collect(ctx, &collector.CollectRequest{ ConnectionIdentifier: identifier, TimeConsumed: timeConsumed, SendEventRequest: req, @@ -59,7 +59,7 @@ func (h *Handler) SendEvent(ctx context.Context, req *pb.SendEventRequest) (*pb. } -func (h *Handler) Ack(responseChannel chan *pb.SendEventResponse, reqGuid, connGroup string) collection.AckFunc { +func (h *Handler) Ack(responseChannel chan *pb.SendEventResponse, reqGuid, connGroup string) collector.AckFunc { switch config.Event.Ack { case config.Asynchronous: responseChannel <- &pb.SendEventResponse{ diff --git a/services/grpc/handler_test.go b/services/grpc/handler_test.go index 996f1a01..3ed2cdd9 100644 --- a/services/grpc/handler_test.go +++ b/services/grpc/handler_test.go @@ -5,7 +5,7 @@ import ( "reflect" "testing" - "github.com/raystack/raccoon/collection" + "github.com/raystack/raccoon/collector" "github.com/raystack/raccoon/config" "github.com/raystack/raccoon/logger" "github.com/raystack/raccoon/metrics" @@ -23,7 +23,7 @@ func (v void) Write(_ []byte) (int, error) { func TestHandler_SendEvent(t *testing.T) { type fields struct { - C collection.Collector + C collector.Collector UnimplementedEventServiceServer pb.UnimplementedEventServiceServer } type args struct { @@ -33,7 +33,7 @@ func TestHandler_SendEvent(t *testing.T) { logger.SetOutput(void{}) metrics.SetVoid() - collector := new(collection.MockCollector) + collector := new(collector.MockCollector) ctx := context.Background() meta := metadata.MD{} meta.Set(config.ServerWs.ConnGroupHeader, "group") diff --git a/services/grpc/service.go b/services/grpc/service.go index cc12dd98..fa2bc1f4 100644 --- a/services/grpc/service.go +++ b/services/grpc/service.go @@ -5,18 +5,18 @@ import ( "fmt" "net" - "github.com/raystack/raccoon/collection" + "github.com/raystack/raccoon/collector" "github.com/raystack/raccoon/config" pb "github.com/raystack/raccoon/proto" "google.golang.org/grpc" ) type Service struct { - Collector collection.Collector + Collector collector.Collector s *grpc.Server } -func NewGRPCService(c collection.Collector) *Service { +func NewGRPCService(c collector.Collector) *Service { server := grpc.NewServer() pb.RegisterEventServiceServer(server, &Handler{C: c}) return &Service{ diff --git a/services/rest/handler.go b/services/rest/handler.go index ce87f6e8..66e7b735 100644 --- a/services/rest/handler.go +++ b/services/rest/handler.go @@ -7,7 +7,7 @@ import ( "net/http" "time" - "github.com/raystack/raccoon/collection" + "github.com/raystack/raccoon/collector" "github.com/raystack/raccoon/config" "github.com/raystack/raccoon/deserialization" "github.com/raystack/raccoon/identification" @@ -28,10 +28,10 @@ type serDe struct { } type Handler struct { serDeMap map[string]*serDe - collector collection.Collector + collector collector.Collector } -func NewHandler(collector collection.Collector) *Handler { +func NewHandler(collector collector.Collector) *Handler { serDeMap := make(map[string]*serDe) serDeMap[ContentJSON] = &serDe{ serializer: serialization.SerializeJSON, @@ -128,7 +128,7 @@ func (h *Handler) RESTAPIHandler(rw http.ResponseWriter, r *http.Request) { h.sendEventCounters(req.Events, identifier.Group) resChannel := make(chan struct{}, 1) - h.collector.Collect(r.Context(), &collection.CollectRequest{ + h.collector.Collect(r.Context(), &collector.CollectRequest{ ConnectionIdentifier: identifier, TimeConsumed: timeConsumed, SendEventRequest: req, @@ -137,7 +137,7 @@ func (h *Handler) RESTAPIHandler(rw http.ResponseWriter, r *http.Request) { <-resChannel } -func (h *Handler) Ack(rw http.ResponseWriter, resChannel chan struct{}, s serialization.SerializeFunc, reqGuid string, connGroup string) collection.AckFunc { +func (h *Handler) Ack(rw http.ResponseWriter, resChannel chan struct{}, s serialization.SerializeFunc, reqGuid string, connGroup string) collector.AckFunc { res := &Response{ SendEventResponse: &pb.SendEventResponse{}, } diff --git a/services/rest/service.go b/services/rest/service.go index 5942cc45..e762da50 100644 --- a/services/rest/service.go +++ b/services/rest/service.go @@ -6,7 +6,7 @@ import ( "time" "github.com/gorilla/mux" - "github.com/raystack/raccoon/collection" + "github.com/raystack/raccoon/collector" "github.com/raystack/raccoon/config" "github.com/raystack/raccoon/metrics" "github.com/raystack/raccoon/middleware" @@ -15,11 +15,11 @@ import ( ) type Service struct { - Collector collection.Collector + Collector collector.Collector s *http.Server } -func NewRestService(c collection.Collector) *Service { +func NewRestService(c collector.Collector) *Service { pingChannel := make(chan connection.Conn, config.ServerWs.ServerMaxConn) wh := websocket.NewHandler(pingChannel, c) go websocket.Pinger(pingChannel, config.ServerWs.PingerSize, config.ServerWs.PingInterval, config.ServerWs.WriteWaitInterval) diff --git a/services/rest/websocket/handler.go b/services/rest/websocket/handler.go index bdd68e32..561d6d94 100644 --- a/services/rest/websocket/handler.go +++ b/services/rest/websocket/handler.go @@ -6,7 +6,7 @@ import ( "time" "github.com/gorilla/websocket" - "github.com/raystack/raccoon/collection" + "github.com/raystack/raccoon/collector" "github.com/raystack/raccoon/config" "github.com/raystack/raccoon/deserialization" "github.com/raystack/raccoon/logger" @@ -23,7 +23,7 @@ type serDe struct { type Handler struct { upgrader *connection.Upgrader serdeMap map[int]*serDe - collector collection.Collector + collector collector.Collector PingChannel chan connection.Conn } @@ -41,7 +41,7 @@ func getSerDeMap() map[int]*serDe { return serDeMap } -func NewHandler(pingC chan connection.Conn, collector collection.Collector) *Handler { +func NewHandler(pingC chan connection.Conn, collector collector.Collector) *Handler { ugConfig := connection.UpgraderConfig{ ReadBufferSize: config.ServerWs.ReadBufferSize, WriteBufferSize: config.ServerWs.WriteBufferSize, @@ -115,7 +115,7 @@ func (h *Handler) HandlerWSEvents(w http.ResponseWriter, r *http.Request) { metrics.Increment("batches_read_total", map[string]string{"status": "success", "conn_group": conn.Identifier.Group, "reason": "NA"}) h.sendEventCounters(payload.Events, conn.Identifier.Group) - h.collector.Collect(r.Context(), &collection.CollectRequest{ + h.collector.Collect(r.Context(), &collector.CollectRequest{ ConnectionIdentifier: conn.Identifier, TimeConsumed: timeConsumed, SendEventRequest: payload, @@ -124,7 +124,7 @@ func (h *Handler) HandlerWSEvents(w http.ResponseWriter, r *http.Request) { } } -func (h *Handler) Ack(conn connection.Conn, resChannel chan AckInfo, s serialization.SerializeFunc, messageType int, reqGuid string, timeConsumed time.Time) collection.AckFunc { +func (h *Handler) Ack(conn connection.Conn, resChannel chan AckInfo, s serialization.SerializeFunc, messageType int, reqGuid string, timeConsumed time.Time) collector.AckFunc { switch config.Event.Ack { case config.Asynchronous: writeSuccessResponse(conn, s, messageType, reqGuid) diff --git a/services/rest/websocket/handler_test.go b/services/rest/websocket/handler_test.go index 4b9797b6..d2f4107b 100644 --- a/services/rest/websocket/handler_test.go +++ b/services/rest/websocket/handler_test.go @@ -10,7 +10,7 @@ import ( "github.com/gorilla/mux" "github.com/gorilla/websocket" - "github.com/raystack/raccoon/collection" + "github.com/raystack/raccoon/collector" "github.com/raystack/raccoon/logger" "github.com/raystack/raccoon/metrics" pb "github.com/raystack/raccoon/proto" @@ -66,7 +66,7 @@ func TestNewHandler(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := NewHandler(tt.args.pingC, &collection.MockCollector{}); !reflect.DeepEqual(got, tt.want) { + if got := NewHandler(tt.args.pingC, &collector.MockCollector{}); !reflect.DeepEqual(got, tt.want) { t.Errorf("NewHandler() = %v, want %v", got, tt.want) } }) @@ -188,7 +188,7 @@ func TestHandler_GETHandlerWSEvents(t *testing.T) { } func getRouter(hlr *Handler) http.Handler { - collector := new(collection.MockCollector) + collector := new(collector.MockCollector) collector.On("Collect", mock.Anything, mock.Anything).Return(nil) router := mux.NewRouter() subRouter := router.PathPrefix("/api/v1").Subrouter() diff --git a/services/services.go b/services/services.go index 94dc8bf3..aa60774a 100644 --- a/services/services.go +++ b/services/services.go @@ -4,7 +4,7 @@ import ( "context" "net/http" - "github.com/raystack/raccoon/collection" + "github.com/raystack/raccoon/collector" "github.com/raystack/raccoon/logger" "github.com/raystack/raccoon/services/grpc" "github.com/raystack/raccoon/services/pprof" @@ -43,8 +43,8 @@ func (s *Services) Shutdown(ctx context.Context) { } } -func Create(b chan collection.CollectRequest) Services { - c := collection.NewChannelCollector(b) +func Create(b chan collector.CollectRequest) Services { + c := collector.NewChannelCollector(b) return Services{ b: []bootstrapper{ grpc.NewGRPCService(c), diff --git a/worker/worker.go b/worker/worker.go index bebe2cd4..e56a2c98 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -5,7 +5,7 @@ import ( "sync" "time" - "github.com/raystack/raccoon/collection" + "github.com/raystack/raccoon/collector" "github.com/raystack/raccoon/logger" "github.com/raystack/raccoon/metrics" pb "github.com/raystack/raccoon/proto" @@ -25,13 +25,13 @@ type Producer interface { type Pool struct { Size int deliveryChannelSize int - EventsChannel <-chan collection.CollectRequest + EventsChannel <-chan collector.CollectRequest producer Producer wg sync.WaitGroup } // CreateWorkerPool create new Pool struct given size and EventsChannel worker. -func CreateWorkerPool(size int, eventsChannel <-chan collection.CollectRequest, deliveryChannelSize int, producer Producer) *Pool { +func CreateWorkerPool(size int, eventsChannel <-chan collector.CollectRequest, deliveryChannelSize int, producer Producer) *Pool { return &Pool{ Size: size, deliveryChannelSize: deliveryChannelSize, diff --git a/worker/worker_test.go b/worker/worker_test.go index 89b9b101..7853dc4e 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -5,7 +5,7 @@ import ( "testing" "time" - "github.com/raystack/raccoon/collection" + "github.com/raystack/raccoon/collector" "github.com/raystack/raccoon/identification" pb "github.com/raystack/raccoon/proto" "github.com/stretchr/testify/assert" @@ -14,7 +14,7 @@ import ( ) func TestWorker(t *testing.T) { - request := &collection.CollectRequest{ + request := &collector.CollectRequest{ ConnectionIdentifier: identification.Identifier{ ID: "12345", Group: "viewer", @@ -31,7 +31,7 @@ func TestWorker(t *testing.T) { m.On("Timing", "processing.latency", mock.Anything, "") m.On("Count", "kafka_messages_delivered_total", 0, "success=true") m.On("Count", "kafka_messages_delivered_total", 0, "success=false") - bc := make(chan collection.CollectRequest, 2) + bc := make(chan collector.CollectRequest, 2) worker := Pool{ Size: 1, deliveryChannelSize: 0, @@ -54,7 +54,7 @@ func TestWorker(t *testing.T) { t.Run("Flush", func(t *testing.T) { t.Run("Should block until all messages is processed", func(t *testing.T) { kp := mockKafkaPublisher{} - bc := make(chan collection.CollectRequest, 2) + bc := make(chan collector.CollectRequest, 2) m := &mockMetric{} m.On("Timing", "processing.latency", mock.Anything, "") m.On("Count", "kafka_messages_delivered_total", 0, "success=false")