-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathservice.go
86 lines (76 loc) · 1.94 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package natsrpc
import (
"context"
"fmt"
)
var _ ServiceInterface = (*Service)(nil)
type ServerInterface interface {
Encoder
Remove(string) bool
}
// Service 服务
type Service struct {
sd ServiceDesc // 描述
val interface{} // 值
server ServerInterface // rpc
methods map[string]MethodDesc // 方法集合
opt ServiceOptions
}
// Name 名字
func (s *Service) Name() string {
return joinSubject(s.opt.namespace, s.sd.ServiceName, s.opt.id)
}
// Close 关闭
// 会取消所有订阅
func (s *Service) Close() bool {
return s.server.Remove(s.Name())
}
// NewService 创建服务
func NewService(server ServerInterface, sd ServiceDesc, i interface{}, options ServiceOptions) (*Service, error) {
methods := map[string]MethodDesc{}
for _, md := range sd.Methods {
if _, ok := methods[md.MethodName]; ok {
return nil, fmt.Errorf("service [%s] duplicate method [%s]", sd.ServiceName, md.MethodName)
}
methods[md.MethodName] = md
}
s := &Service{
methods: methods,
sd: sd,
val: i,
server: server,
opt: options,
}
return s, nil
}
func (s *Service) Call(ctx context.Context, methodName string, b []byte, interceptor Interceptor) ([]byte, error) {
m, ok := s.methods[methodName]
if !ok {
return nil, ErrNoMethod
}
req := m.NewRequest()
if err := s.server.Decode(b, req); err != nil {
return nil, err
}
resp, err := s.call(ctx, m, req, interceptor)
if err != nil {
return nil, err
}
if !m.IsPublish {
if resp == nil {
return nil, nil
}
return s.server.Encode(resp)
}
return nil, nil
}
func (s *Service) call(ctx context.Context, m MethodDesc, req interface{}, interceptor Interceptor) (interface{}, error) {
if interceptor == nil {
return m.Handler(s.val, ctx, req)
} else {
invoker := func(ctx1 context.Context, req1 interface{}) (interface{}, error) {
return m.Handler(s.val, ctx1, req1)
}
return interceptor(ctx, m.MethodName, req, invoker)
}
}