Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

添加Zookeeper支持 #28

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .idea/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions .idea/gochat.iml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions .idea/modules.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions .idea/vcs.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 15 additions & 6 deletions api/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,21 @@ var RpcLogicObj *RpcLogic

func InitLogicRpcClient() {
once.Do(func() {
d := client.NewEtcdV3Discovery(
config.Conf.Common.CommonEtcd.BasePath,
config.Conf.Common.CommonEtcd.ServerPathLogic,
[]string{config.Conf.Common.CommonEtcd.Host},
nil,
)
var d client.ServiceDiscovery
if config.Conf.Common.Registy == "etcd" {
d = client.NewEtcdV3Discovery(
config.Conf.Common.CommonEtcd.BasePath,
config.Conf.Common.CommonEtcd.ServerPathLogic,
[]string{config.Conf.Common.CommonEtcd.Host},
nil,
)
}
if config.Conf.Common.Registy == "zookeeper" {
d = client.NewZookeeperDiscovery(config.Conf.Common.CommonZookeeper.BasePath,
config.Conf.Common.CommonZookeeper.ServerPathLogic,
[]string{config.Conf.Common.CommonZookeeper.Host},
nil)
}
LogicRpcClient = client.NewXClient(config.Conf.Common.CommonEtcd.ServerPathLogic, client.Failtry, client.RandomSelect, d, client.DefaultOption)
RpcLogicObj = new(RpcLogic)
})
Expand Down
12 changes: 10 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ type CommonEtcd struct {
ServerPathLogic string `mapstructure:"serverPathLogic"`
ServerPathConnect string `mapstructure:"serverPathConnect"`
}
type CommonZookeeper struct {
Host string `mapstructure:"host"`
BasePath string `mapstructure:"basePath"`
ServerPathLogic string `mapstructure:"serverPathLogic"`
ServerPathConnect string `mapstructure:"serverPathConnect"`
}

type CommonRedis struct {
RedisAddress string `mapstructure:"redisAddress"`
Expand All @@ -139,8 +145,10 @@ type CommonRedis struct {
}

type Common struct {
CommonEtcd CommonEtcd `mapstructure:"common-etcd"`
CommonRedis CommonRedis `mapstructure:"common-redis"`
Registy string `mapstructure:"registy"`
CommonEtcd CommonEtcd `mapstructure:"common-etcd"`
CommonZookeeper CommonZookeeper `mapstructure:"common-zookeeper"`
CommonRedis CommonRedis `mapstructure:"common-redis"`
}

type ConnectBase struct {
Expand Down
8 changes: 8 additions & 0 deletions config/dev/common.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
registy = "zookeeper"

[common-etcd]
host = "127.0.0.1:2379"
basePath = "/gochat_srv"
serverPathLogic = "LogicRpc"
serverPathConnect = "ConnectRpc"

[common-zookeeper]
host = "127.0.0.1:2181"
basePath = "/gochat_srv"
serverPathLogic = "LogicRpc"
serverPathConnect = "ConnectRpc"

[common-redis]
redisAddress = "127.0.0.1:6379"
redisPassword = ""
Expand Down
8 changes: 8 additions & 0 deletions config/prod/common.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
registy = "zookeeper"

[common-etcd]
host = "127.0.0.1:2379"
basePath = "/gochat_srv"
serverPathLogic = "LogicRpc"
serverPathConnect = "ConnectRpc"

[common-zookeeper]
host = "127.0.0.1:2181"
basePath = "/gochat_srv"
serverPathLogic = "LogicRpc"
serverPathConnect = "ConnectRpc"

[common-redis]
redisAddress = "127.0.0.1:6379"
redisPassword = ""
Expand Down
59 changes: 43 additions & 16 deletions connect/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,21 @@ type RpcConnect struct {

func (c *Connect) InitLogicRpcClient() (err error) {
once.Do(func() {
d := client.NewEtcdV3Discovery(
config.Conf.Common.CommonEtcd.BasePath,
config.Conf.Common.CommonEtcd.ServerPathLogic,
[]string{config.Conf.Common.CommonEtcd.Host},
nil,
)
var d client.ServiceDiscovery
if config.Conf.Common.Registy == "etcd" {
d = client.NewEtcdV3Discovery(
config.Conf.Common.CommonEtcd.BasePath,
config.Conf.Common.CommonEtcd.ServerPathLogic,
[]string{config.Conf.Common.CommonEtcd.Host},
nil,
)
}
if config.Conf.Common.Registy == "zookeeper" {
d = client.NewZookeeperDiscovery(config.Conf.Common.CommonZookeeper.BasePath,
config.Conf.Common.CommonZookeeper.ServerPathLogic,
[]string{config.Conf.Common.CommonZookeeper.Host},
nil)
}
logicRpcClient = client.NewXClient(config.Conf.Common.CommonEtcd.ServerPathLogic, client.Failtry, client.RandomSelect, d, client.DefaultOption)
})
if logicRpcClient == nil {
Expand Down Expand Up @@ -168,16 +177,34 @@ func (c *Connect) createConnectTcpRpcServer(network string, addr string) {
}

func addRegistryPlugin(s *server.Server, network string, addr string) {
r := &serverplugin.EtcdV3RegisterPlugin{
ServiceAddress: network + "@" + addr,
EtcdServers: []string{config.Conf.Common.CommonEtcd.Host},
BasePath: config.Conf.Common.CommonEtcd.BasePath,
Metrics: metrics.NewRegistry(),
UpdateInterval: time.Minute,

if config.Conf.Common.Registy == "etcd" {
r := &serverplugin.EtcdV3RegisterPlugin{
ServiceAddress: network + "@" + addr,
EtcdServers: []string{config.Conf.Common.CommonEtcd.Host},
BasePath: config.Conf.Common.CommonEtcd.BasePath,
Metrics: metrics.NewRegistry(),
UpdateInterval: time.Minute,
}
err := r.Start()
if err != nil {
logrus.Fatal(err)
}
s.Plugins.Add(r)
}
err := r.Start()
if err != nil {
logrus.Fatal(err)
if config.Conf.Common.Registy == "zookeeper" {
r := &serverplugin.ZooKeeperRegisterPlugin{
ServiceAddress: network + "@" + addr,
ZooKeeperServers: []string{config.Conf.Common.CommonZookeeper.Host},
BasePath: config.Conf.Common.CommonZookeeper.BasePath,
Metrics: metrics.NewRegistry(),
UpdateInterval: time.Minute,
}
err := r.Start()
if err != nil {
logrus.Fatal(err)
}
s.Plugins.Add(r)
}
s.Plugins.Add(r)

}
Binary file modified db/gochat.sqlite3
Binary file not shown.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/bwmarrin/snowflake v0.3.0
github.com/gin-gonic/gin v1.4.0
github.com/go-redis/redis v6.15.2+incompatible
github.com/google/uuid v1.1.1 // indirect
github.com/gorilla/websocket v1.4.0
github.com/jinzhu/gorm v1.9.10
github.com/pkg/errors v0.8.1
Expand Down
39 changes: 28 additions & 11 deletions logic/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,35 @@ func (logic *Logic) createRpcServer(network string, addr string) {
}

func (logic *Logic) addRegistryPlugin(s *server.Server, network string, addr string) {
r := &serverplugin.EtcdV3RegisterPlugin{
ServiceAddress: network + "@" + addr,
EtcdServers: []string{config.Conf.Common.CommonEtcd.Host},
BasePath: config.Conf.Common.CommonEtcd.BasePath,
Metrics: metrics.NewRegistry(),
UpdateInterval: time.Minute,
}
err := r.Start()
if err != nil {
logrus.Fatal(err)
if config.Conf.Common.Registy == "etcd" {
r := &serverplugin.EtcdV3RegisterPlugin{
ServiceAddress: network + "@" + addr,
EtcdServers: []string{config.Conf.Common.CommonEtcd.Host},
BasePath: config.Conf.Common.CommonEtcd.BasePath,
Metrics: metrics.NewRegistry(),
UpdateInterval: time.Minute,
}
err := r.Start()
if err != nil {
logrus.Fatal(err)
}
s.Plugins.Add(r)
}
if config.Conf.Common.Registy == "zookeeper" {
r := &serverplugin.ZooKeeperRegisterPlugin{
ServiceAddress: network + "@" + addr,
ZooKeeperServers: []string{config.Conf.Common.CommonZookeeper.Host},
BasePath: config.Conf.Common.CommonZookeeper.BasePath,
Metrics: metrics.NewRegistry(),
UpdateInterval: time.Minute,
}
err := r.Start()
if err != nil {
logrus.Fatal(err)
}
s.Plugins.Add(r)
}
s.Plugins.Add(r)

}

func (logic *Logic) RedisPublishChannel(serverId string, toUserId int, msg []byte) (err error) {
Expand Down
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"gochat/logic"
"gochat/site"
"gochat/task"

//"gochat/task"
"os"
"os/signal"
"syscall"
Expand Down
29 changes: 26 additions & 3 deletions task/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,31 @@ import (
var RpcConnectClientList map[string]client.XClient

func (task *Task) InitConnectRpcClient() (err error) {
etcdConfig := config.Conf.Common.CommonEtcd
d := client.NewEtcdV3Discovery(etcdConfig.BasePath, etcdConfig.ServerPathConnect, []string{etcdConfig.Host}, nil)
//etcdConfig := config.Conf.Common.CommonEtcd
//zookkperConfig := config.Conf.Common.CommonZookeeper
////d := client.NewEtcdV3Discovery(etcdConfig.BasePath, etcdConfig.ServerPathConnect, []string{etcdConfig.Host}, nil)
//d := client.NewZookeeperDiscovery(zookkperConfig.BasePath, zookkperConfig.ServerPathConnect, []string{zookkperConfig.Host}, nil)


var d client.ServiceDiscovery
var ServerPathConnect string
if config.Conf.Common.Registy == "etcd" {
ServerPathConnect=config.Conf.Common.CommonEtcd.ServerPathConnect
d = client.NewEtcdV3Discovery(
config.Conf.Common.CommonEtcd.BasePath,
config.Conf.Common.CommonEtcd.ServerPathConnect,
[]string{config.Conf.Common.CommonEtcd.Host},
nil,
)
}
if config.Conf.Common.Registy == "zookeeper" {
ServerPathConnect=config.Conf.Common.CommonZookeeper.ServerPathConnect
d = client.NewZookeeperDiscovery(config.Conf.Common.CommonZookeeper.BasePath,
config.Conf.Common.CommonZookeeper.ServerPathConnect,
[]string{config.Conf.Common.CommonZookeeper.Host},
nil)
}

if len(d.GetServices()) <= 0 {
logrus.Panicf("no etcd server find!")
}
Expand All @@ -35,7 +58,7 @@ func (task *Task) InitConnectRpcClient() (err error) {
}
d := client.NewPeer2PeerDiscovery(connectConf.Key, "")
//under serverId
RpcConnectClientList[serverId] = client.NewXClient(etcdConfig.ServerPathConnect, client.Failtry, client.RandomSelect, d, client.DefaultOption)
RpcConnectClientList[serverId] = client.NewXClient(ServerPathConnect, client.Failtry, client.RandomSelect, d, client.DefaultOption)
logrus.Infof("InitConnectRpcClient addr %s, v %+v", connectConf.Key, RpcConnectClientList[serverId])
}
return
Expand Down