From 939276231cc68ced380399224f6fce7428602e66 Mon Sep 17 00:00:00 2001 From: ziyuan Date: Sat, 21 Aug 2021 01:10:34 +0800 Subject: [PATCH] feature: wait for xraycore to start before detect latency --- .gitignore | 7 +++++ gen/gen.go | 78 ++++++++++++++++++++++++++++++++++++++++++------------ 2 files changed, 68 insertions(+), 17 deletions(-) diff --git a/.gitignore b/.gitignore index f36daf8..03fde8c 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ *.dll *.so *.dylib +bin # Test binary, built with `go test -c` *.test @@ -16,3 +17,9 @@ # IDEA .idea + +# Dev +xray-config.json +xray-core.log +xraysub.log +xraysub.exe diff --git a/gen/gen.go b/gen/gen.go index 669b2fe..47de304 100644 --- a/gen/gen.go +++ b/gen/gen.go @@ -1,6 +1,7 @@ package gen import ( + "bytes" "encoding/base64" "encoding/json" "errors" @@ -12,6 +13,7 @@ import ( "github.com/schollz/progressbar/v3" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" + "io" "io/ioutil" "net" "os" @@ -87,7 +89,7 @@ func detectLatency(xCfg *xray.Config) error { if len(xCfg.Outbounds) == 0 { return errors.New("outbounds empty") } - // 根据 outbounds 生成 inbounds 和 routing rules + // generate inbound and routing rules based on outbound to test latency var inbounds []*xray.Inbound var routingRules []*xray.Rule inboundPorts := randomInboundPorts(xCfg.Outbounds) @@ -108,19 +110,29 @@ func detectLatency(xCfg *xray.Config) error { if err != nil { return err } + defer os.RemoveAll(f.Name()) cmd := exec.Command(Cfg.XrayCorePath, "-c", f.Name(), "-format=json") - if err != nil { - return fmt.Errorf("init xray-core command error: %w", err) - } + // for: + // 1. xray-core.log + // 2. buffer for check xray-core status xlf, err := appendXrayCoreLogFile() if err != nil { return fmt.Errorf("create xray-core.log error: %w", err) } - defer util.CheckErr(xlf.Close()) - cmd.Stdout = xlf - cmd.Stderr = xlf + defer xlf.Close() + buf := new(bytes.Buffer) + w := io.MultiWriter(xlf, buf) + + outPipe, err := cmd.StdoutPipe() + util.CheckErr(err) + + go func() { + _, err = io.Copy(w, outPipe) + util.CheckErr(err) + }() + if err = cmd.Start(); err != nil { return fmt.Errorf("exec xray-core error: %w", err) } @@ -131,11 +143,15 @@ func detectLatency(xCfg *xray.Config) error { // start rendering progress bar bar := initProgressBar(xCfg) + if err = checkXrayCoreStatus(buf); err != nil { + return err + } + wg := new(sync.WaitGroup) outboundChan := make(chan *xray.ShadowsocksOutbound) for i := 0; i < Cfg.DetectThreadNumber; i++ { wg.Add(1) - go pingWorker(outboundChan, wg, bar) + go detectWorker(outboundChan, wg, bar) } for _, outbound := range xCfg.Outbounds { outboundChan <- outbound @@ -145,6 +161,22 @@ func detectLatency(xCfg *xray.Config) error { fmt.Println() // filter fasted outbound + fastedOutbound, err := getFastedOutbound(xCfg) + if err != nil { + return err + } + xCfg.Routing.Rules = []*xray.Rule{ + { + Type: "field", + OutboundTag: fastedOutbound.Tag, + Port: "0-65535", + }, + } + + return nil +} + +func getFastedOutbound(xCfg *xray.Config) (*xray.ShadowsocksOutbound, error) { var fastedOutbound *xray.ShadowsocksOutbound for _, outbound := range xCfg.Outbounds { if outbound.Latency == nil { @@ -157,19 +189,31 @@ func detectLatency(xCfg *xray.Config) error { } } if fastedOutbound == nil { - fmt.Println("All nodes detectLatency test failed") + return nil, errors.New("all nodes detectLatency test failed") } else { s := fastedOutbound.Settings.Servers[0] fmt.Printf("Got fastest node: %s:%d\n", s.Address, s.Port) } - xCfg.Routing.Rules = []*xray.Rule{ - { - Type: "field", - OutboundTag: fastedOutbound.Tag, - Port: "0-65535", - }, - } + return fastedOutbound, nil +} +// check xray-core started status +func checkXrayCoreStatus(buf *bytes.Buffer) error { + // wait up to 3 seconds for Xray to start + timeout := time.After(3 * time.Second) +LOOP: + for { + s := buf.String() + select { + case <-timeout: + return errors.New("start xray-core error, please check xray-core's log") + default: + if strings.Contains(s, "started") { + break LOOP + } + time.Sleep(100 * time.Millisecond) + } + } return nil } @@ -194,7 +238,7 @@ func appendXrayCoreLogFile() (*os.File, error) { return f, err } -func pingWorker(oc chan *xray.ShadowsocksOutbound, wg *sync.WaitGroup, bar *progressbar.ProgressBar) { +func detectWorker(oc chan *xray.ShadowsocksOutbound, wg *sync.WaitGroup, bar *progressbar.ProgressBar) { defer wg.Done() for outbound := range oc {