From 50872e6378ea9d4a37abb7dc48c3b75556d69776 Mon Sep 17 00:00:00 2001 From: bruwbird Date: Wed, 12 Jun 2024 11:25:00 +0900 Subject: [PATCH] cln+lnd: liveness check add a plugin hook in the case of core lightning and an interceptor in the case of lnd, and pre-install a dead/alive monitor for each daemon. --- clightning/clightning.go | 23 +++++++++++++++++++++++ cmd/peerswaplnd/peerswapd/main.go | 19 ++++++++++++++++++- electrum/client.go | 4 ++++ electrum/electrum.go | 1 + electrum/mock/electrum.go | 14 ++++++++++++++ lwk/lwkwallet.go | 11 +++++++++++ wallet/elementsrpcwallet.go | 5 +++++ wallet/wallet.go | 1 + 8 files changed, 77 insertions(+), 1 deletion(-) diff --git a/clightning/clightning.go b/clightning/clightning.go index ea0d9ec2..d4e3113d 100644 --- a/clightning/clightning.go +++ b/clightning/clightning.go @@ -131,6 +131,7 @@ func NewClightningClient(ctx context.Context) (*ClightningClient, <-chan interfa cl.Plugin = glightning.NewPlugin(cl.onInit) err := cl.Plugin.RegisterHooks(&glightning.Hooks{ CustomMsgReceived: cl.OnCustomMsg, + RpcCommand: cl.OnRPCCommand, }) if err != nil { return nil, nil, err @@ -408,6 +409,28 @@ func (cl *ClightningClient) OnCustomMsg(event *glightning.CustomMsgReceivedEvent return event.Continue(), nil } +func (cl *ClightningClient) OnRPCCommand(event *glightning.RpcCommandEvent) (*glightning.RpcCommandResponse, error) { + if cl.gbitcoin != nil { + ok, err := cl.gbitcoin.Ping() + if err != nil { + return event.ReturnError("bitcoin daemon not reachable", 0) + } + if !ok { + return event.ReturnError("bitcoin daemon not reachable", 0) + } + } + if cl.liquidWallet != nil { + ok, err := cl.liquidWallet.Ping() + if err != nil { + return event.ReturnError("liquid daemon not reachable", 0) + } + if !ok { + return event.ReturnError("liquid daemon not reachable", 0) + } + } + return event.Continue(), nil +} + // AddMessageHandler adds a listener for incoming peermessages func (cl *ClightningClient) AddMessageHandler(f func(peerId string, msgType string, payload []byte) error) { cl.msgHandlers = append(cl.msgHandlers, f) diff --git a/cmd/peerswaplnd/peerswapd/main.go b/cmd/peerswaplnd/peerswapd/main.go index 04554373..02ad7587 100644 --- a/cmd/peerswaplnd/peerswapd/main.go +++ b/cmd/peerswaplnd/peerswapd/main.go @@ -397,7 +397,9 @@ func run() error { } defer lis.Close() - grpcSrv := grpc.NewServer() + grpcSrv := grpc.NewServer( + grpc.UnaryInterceptor(livenessCheckInterceptor(liquidRpcWallet)), + ) peerswaprpc.RegisterPeerSwapServer(grpcSrv, peerswaprpcServer) @@ -439,6 +441,21 @@ func run() error { return nil } +func livenessCheckInterceptor(liquidWallet wallet.Wallet) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + if liquidWallet != nil { + ok, err := liquidWallet.Ping() + if err != nil { + return nil, fmt.Errorf("liquid rpc not reachable: %v", err) + } + if !ok { + return nil, errors.New("liquid rpc not reachable") + } + } + return handler(ctx, req) + } +} + func getBitcoinChain(ctx context.Context, li lnrpc.LightningClient) (*chaincfg.Params, error) { gi, err := li.GetInfo(ctx, &lnrpc.GetInfoRequest{}) if err != nil { diff --git a/electrum/client.go b/electrum/client.go index fac23ed9..6e35557e 100644 --- a/electrum/client.go +++ b/electrum/client.go @@ -84,3 +84,7 @@ func (c *electrumClient) GetFee(ctx context.Context, target uint32) (float32, er } return c.client.GetFee(ctx, target) } + +func (c *electrumClient) Ping(ctx context.Context) error { + return c.reconnect(ctx) +} diff --git a/electrum/electrum.go b/electrum/electrum.go index 43dd0c60..332f7208 100644 --- a/electrum/electrum.go +++ b/electrum/electrum.go @@ -12,4 +12,5 @@ type RPC interface { GetRawTransaction(ctx context.Context, txHash string) (string, error) BroadcastTransaction(ctx context.Context, rawTx string) (string, error) GetFee(ctx context.Context, target uint32) (float32, error) + Ping(ctx context.Context) error } diff --git a/electrum/mock/electrum.go b/electrum/mock/electrum.go index b6ff9a0a..44313eea 100644 --- a/electrum/mock/electrum.go +++ b/electrum/mock/electrum.go @@ -100,6 +100,20 @@ func (mr *MockRPCMockRecorder) GetRawTransaction(ctx, txHash any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRawTransaction", reflect.TypeOf((*MockRPC)(nil).GetRawTransaction), ctx, txHash) } +// Ping mocks base method. +func (m *MockRPC) Ping(ctx context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Ping", ctx) + ret0, _ := ret[0].(error) + return ret0 +} + +// Ping indicates an expected call of Ping. +func (mr *MockRPCMockRecorder) Ping(ctx any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockRPC)(nil).Ping), ctx) +} + // SubscribeHeaders mocks base method. func (m *MockRPC) SubscribeHeaders(ctx context.Context) (<-chan *electrum.SubscribeHeadersResult, error) { m.ctrl.T.Helper() diff --git a/lwk/lwkwallet.go b/lwk/lwkwallet.go index e84afd38..6009ae13 100644 --- a/lwk/lwkwallet.go +++ b/lwk/lwkwallet.go @@ -280,3 +280,14 @@ func (r *LWKRpcWallet) SetLabel(txID, address, label string) error { // TODO: call set label return nil } + +func (r *LWKRpcWallet) Ping() (bool, error) { + ctx, cancel := context.WithTimeout(context.Background(), defaultContextTimeout) + defer cancel() + _, err := r.lwkClient.version(ctx) + if err != nil { + return false, errors.New("lwk connection failed: " + err.Error()) + } + err = r.electrumClient.Ping(ctx) + return err == nil, err +} diff --git a/wallet/elementsrpcwallet.go b/wallet/elementsrpcwallet.go index 06bf11ee..6f36cf41 100644 --- a/wallet/elementsrpcwallet.go +++ b/wallet/elementsrpcwallet.go @@ -31,6 +31,7 @@ type RpcClient interface { SendRawTx(txHex string) (string, error) EstimateFee(blocks uint32, mode string) (*gelements.FeeResponse, error) SetLabel(address, label string) error + Ping() (bool, error) } // ElementsRpcWallet uses the elementsd rpc wallet @@ -191,3 +192,7 @@ func satsToAmountString(sats uint64) string { bitcoinAmt := float64(sats) / 100000000 return fmt.Sprintf("%f", bitcoinAmt) } + +func (r *ElementsRpcWallet) Ping() (bool, error) { + return r.rpcClient.Ping() +} diff --git a/wallet/wallet.go b/wallet/wallet.go index b0a016b3..9c2ed177 100644 --- a/wallet/wallet.go +++ b/wallet/wallet.go @@ -22,4 +22,5 @@ type Wallet interface { SendRawTx(rawTx string) (txid string, err error) GetFee(txSize int64) (uint64, error) SetLabel(txID, address, label string) error + Ping() (bool, error) }