From 7bb5c282ffe48c36baf9152e76204245a38fc241 Mon Sep 17 00:00:00 2001 From: Tyler Smith Date: Mon, 19 Dec 2016 02:46:12 -0800 Subject: [PATCH] REFACTOR: Make API Gateway closable. --- api/gateway.go | 95 +++++++++++++++++++++--------------------------- openbazaard.go | 97 ++++++++++++++++++++++++-------------------------- 2 files changed, 87 insertions(+), 105 deletions(-) diff --git a/api/gateway.go b/api/gateway.go index 72c99c4b2b..78e7587f9e 100644 --- a/api/gateway.go +++ b/api/gateway.go @@ -1,29 +1,35 @@ package api import ( - manet "gx/ipfs/QmPpRcbNUXauP3zWZ1NJMLWpe4QnmEHrd2ba2D3yqWznw7/go-multiaddr-net" - "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess" "net" "net/http" "time" "github.com/OpenBazaar/openbazaar-go/core" "github.com/OpenBazaar/openbazaar-go/repo" - "github.com/ipfs/go-ipfs/commands" "github.com/ipfs/go-ipfs/core/corehttp" "github.com/op/go-logging" ) var log = logging.MustGetLogger("api") -func makeHandler(n *core.OpenBazaarNode, ctx commands.Context, authCookie http.Cookie, l net.Listener, config repo.APIConfig, options ...corehttp.ServeOption) (http.Handler, error) { +// Gateway represents an HTTP API gateway +type Gateway struct { + listener net.Listener + handler http.Handler + config repo.APIConfig + shutdownCh chan struct{} +} + +// NewGateway instantiates a new `Gateway` +func NewGateway(n *core.OpenBazaarNode, authCookie http.Cookie, l net.Listener, config repo.APIConfig, options ...corehttp.ServeOption) (*Gateway, error) { topMux := http.NewServeMux() restAPI, err := newJsonAPIHandler(n, authCookie, config) if err != nil { return nil, err } - wsAPI, err := newWSAPIHandler(n, ctx, config.Authenticated, authCookie, config.Username, config.Password) + wsAPI, err := newWSAPIHandler(n, n.Context, config.Authenticated, authCookie, config.Username, config.Password) if err != nil { return nil, err } @@ -35,66 +41,47 @@ func makeHandler(n *core.OpenBazaarNode, ctx commands.Context, authCookie http.C mux := topMux for _, option := range options { - var err error mux, err = option(n.IpfsNode, l, mux) if err != nil { return nil, err } } - return topMux, nil -} -func Serve(cb chan<- bool, node *core.OpenBazaarNode, ctx commands.Context, authCookie http.Cookie, lis net.Listener, config repo.APIConfig, options ...corehttp.ServeOption) error { - handler, err := makeHandler(node, ctx, authCookie, lis, config, options...) - cb <- true - if err != nil { - return err - } + return &Gateway{ + listener: l, + handler: topMux, + config: config, + shutdownCh: make(chan struct{}), + }, nil +} - addr, err := manet.FromNetAddr(lis.Addr()) - if err != nil { - return err - } +// Close shutsdown the Gateway listener +func (g *Gateway) Close() error { + log.Infof("server at %s terminating...", g.listener.Addr()) - // If the server exits beforehand - var serverError error - serverExited := make(chan struct{}) + // Print shutdown message every few seconds if we're taking too long + go func() { + select { + case <-g.shutdownCh: + return + case <-time.After(5 * time.Second): + log.Infof("waiting for server at %s to terminate...", g.listener.Addr()) - node.IpfsNode.Process().Go(func(p goprocess.Process) { - if config.SSL { - serverError = http.ListenAndServeTLS(lis.Addr().String(), config.SSLCert, config.SSLKey, handler) - } else { - serverError = http.Serve(lis, handler) } - close(serverExited) - }) - - // Wait for server to exit - select { - case <-serverExited: + }() - // If node being closed before server exits, close server - case <-node.IpfsNode.Process().Closing(): - log.Infof("server at %s terminating...", addr) - if config.SSL { - close(serverExited) - } else { - lis.Close() - } + // Shutdown the listener + close(g.shutdownCh) + return g.listener.Close() +} - outer: - for { - // Wait until server exits - select { - case <-serverExited: - // If the server exited as we are closing, we really do not care about errors - serverError = nil - break outer - case <-time.After(5 * time.Second): - log.Infof("waiting for server at %s to terminate...", addr) - } - } +// Serve begins listening on the configured address +func (g *Gateway) Serve() error { + var err error + if g.config.SSL { + err = http.ListenAndServeTLS(g.listener.Addr().String(), g.config.SSLCert, g.config.SSLKey, g.handler) + } else { + err = http.Serve(g.listener, g.handler) } - log.Infof("server at %s terminated", addr) - return serverError + return err } diff --git a/openbazaard.go b/openbazaard.go index 9a8ea99d8c..75792ceeb6 100644 --- a/openbazaard.go +++ b/openbazaard.go @@ -111,6 +111,8 @@ var decryptDatabase DecryptDatabase var parser = flags.NewParser(nil, flags.Default) +var ErrNoGateways = errors.New("No gateway addresses configured") + func main() { c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) @@ -505,46 +507,40 @@ func (x *Start) Execute(args []string) error { UserAgent: USERAGENT, } - var gwErrc <-chan error - var cb <-chan bool - if len(cfg.Addresses.Gateway) > 0 { - if (apiConfig.SSL && apiConfig.SSLCert == "") || (apiConfig.SSL && apiConfig.SSLKey == "") { - return errors.New("SSL cert and key files must be set when SSL is enabled") - } - err, cb, gwErrc = serveHTTPGateway(core.Node, authCookie, *apiConfig) - if err != nil { - log.Error(err) - return err - } + if len(cfg.Addresses.Gateway) <= 0 { + return ErrNoGateways + } + if (apiConfig.SSL && apiConfig.SSLCert == "") || (apiConfig.SSL && apiConfig.SSLKey == "") { + return errors.New("SSL cert and key files must be set when SSL is enabled") } - /* Wait for gateway to start before starting the network service. - This way the websocket channel we pass into the service gets created first. - FIXME: There has to be a better way */ - for b := range cb { - if b == true { - core.Node.Service = service.New(core.Node, ctx, sqliteDB) - MR := ret.NewMessageRetriever(sqliteDB, ctx, nd, core.Node.Service, 16, core.Node.SendOfflineAck) - go MR.Run() - core.Node.MessageRetriever = MR - PR := rep.NewPointerRepublisher(nd, sqliteDB) - go PR.Run() - core.Node.PointerRepublisher = PR - if !x.DisableWallet { - MR.Wait() - TL := lis.NewTransactionListener(core.Node.Datastore, core.Node.Broadcast, core.Node.Wallet.Params()) - wallet.AddTransactionListener(TL.OnTransactionReceived) - log.Info("Starting bitcoin wallet...") - go wallet.Start() - } - core.Node.UpdateFollow() - core.Node.SeedNode() - } - break + gateway, err := newHTTPGateway(core.Node, authCookie, *apiConfig) + if err != nil { + log.Error(err) + return err } - for err := range gwErrc { - fmt.Println(err) + core.Node.Service = service.New(core.Node, ctx, sqliteDB) + MR := ret.NewMessageRetriever(sqliteDB, ctx, nd, core.Node.Service, 16, core.Node.SendOfflineAck) + go MR.Run() + core.Node.MessageRetriever = MR + PR := rep.NewPointerRepublisher(nd, sqliteDB) + go PR.Run() + core.Node.PointerRepublisher = PR + if !x.DisableWallet { + MR.Wait() + TL := lis.NewTransactionListener(core.Node.Datastore, core.Node.Broadcast, core.Node.Wallet.Params()) + wallet.AddTransactionListener(TL.OnTransactionReceived) + log.Info("Starting bitcoin wallet...") + go wallet.Start() + } + core.Node.UpdateFollow() + core.Node.SeedNode() + + // Start gateway + err = gateway.Serve() + if err != nil { + log.Error(err) } return nil @@ -596,37 +592,40 @@ func (d *DummyListener) Close() error { } // Collects options, creates listener, prints status message and starts serving requests -func serveHTTPGateway(node *core.OpenBazaarNode, authCookie http.Cookie, config repo.APIConfig) (error, <-chan bool, <-chan error) { +func newHTTPGateway(node *core.OpenBazaarNode, authCookie http.Cookie, config repo.APIConfig) (*api.Gateway, error) { + // Get API configuration cfg, err := node.Context.GetConfig() if err != nil { - return err, nil, nil + return nil, err } + // Create a network listener gatewayMaddr, err := ma.NewMultiaddr(cfg.Addresses.Gateway) if err != nil { - return fmt.Errorf("serveHTTPGateway: invalid gateway address: %q (err: %s)", cfg.Addresses.Gateway, err), nil, nil + return nil, fmt.Errorf("newHTTPGateway: invalid gateway address: %q (err: %s)", cfg.Addresses.Gateway, err) } var gwLis manet.Listener if config.SSL { netAddr, err := manet.ToNetAddr(gatewayMaddr) if err != nil { - return err, nil, nil + return nil, err } gwLis, err = manet.WrapNetListener(&DummyListener{netAddr}) if err != nil { - return err, nil, nil + return nil, err } } else { gwLis, err = manet.Listen(gatewayMaddr) if err != nil { - return fmt.Errorf("serveHTTPGateway: manet.Listen(%s) failed: %s", gatewayMaddr, err), nil, nil + return nil, fmt.Errorf("newHTTPGateway: manet.Listen(%s) failed: %s", gatewayMaddr, err) } } + // We might have listened to /tcp/0 - let's see what we are listing on gatewayMaddr = gwLis.Multiaddr() - log.Infof("Gateway/API server listening on %s\n", gatewayMaddr) + // Setup an options slice var opts = []corehttp.ServeOption{ corehttp.MetricsCollectionOption("gateway"), corehttp.CommandsROOption(node.Context), @@ -640,15 +639,11 @@ func serveHTTPGateway(node *core.OpenBazaarNode, authCookie http.Cookie, config } if err != nil { - return fmt.Errorf("serveHTTPGateway: ConstructNode() failed: %s", err), nil, nil + return nil, fmt.Errorf("newHTTPGateway: ConstructNode() failed: %s", err) } - errc := make(chan error) - cb := make(chan bool) - go func() { - errc <- api.Serve(cb, node, node.Context, authCookie, gwLis.NetListener(), config, opts...) - close(errc) - }() - return nil, cb, errc + + // Create and return an API gateway + return api.NewGateway(node, authCookie, gwLis.NetListener(), config, opts...) } /* Returns the directory to store repo data in.