-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbridge_options.go
147 lines (125 loc) · 3.4 KB
/
bridge_options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package bridge
import (
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"go.uber.org/zap"
)
// BridgeOption is a functional option for the bridge: a function that is
// handed the BridgeConfig being built and mutates it in place.
type BridgeOption func(cfg *BridgeConfig)

// BridgeConfig collects the bridge-level settings written by BridgeOption
// values. Every field is unexported, so code outside this package can only
// change them through the With* option constructors in this file.
type BridgeConfig struct {
	rootTopic       string        // root topic; set by WithRootTopic
	qos             byte          // MQTT QoS level; set by WithQoS
	logger          *zap.Logger   // logger; set by WithLogger
	mqttClient      mqtt.Client   // MQTT client; set by WithMQTTClient
	rateLimit       float64       // rate limit for operations; set by WithRateLimit
	rateBurst       int           // burst size for rate limiting; set by WithRateBurst
	maxConns        int           // maximum concurrent connections; set by WithMaxConnections
	cleanUpInterval time.Duration // clean-up interval; set by WithCleanUpInterval
}
// Default values for the bridge settings. They are only declared in this
// file; presumably they are applied when a bridge is constructed — verify at
// the call site.
const (
	// defaultRootTopic is the default root topic.
	defaultRootTopic = "golain"

	// defaultQoS is the default MQTT QoS level.
	defaultQoS = byte(1)

	// defaultRateLimit is the default number of operations per second.
	defaultRateLimit = 100

	// defaultRateBurst is the default burst size.
	defaultRateBurst = 200

	// defaultConnLimit is the default maximum number of concurrent connections.
	defaultConnLimit = 1000

	// defaultCleanUpInterval is the default clean-up interval.
	defaultCleanUpInterval = time.Minute * 30
)
// WithMQTTClient returns a BridgeOption that stores client in the
// configuration's mqttClient field. The value is stored exactly as given;
// no nil check is performed here.
func WithMQTTClient(client mqtt.Client) BridgeOption {
	setClient := func(c *BridgeConfig) {
		c.mqttClient = client
	}
	return setClient
}
// WithLogger returns a BridgeOption that stores logger in the
// configuration's logger field. The pointer is stored exactly as given;
// no nil check is performed here.
func WithLogger(logger *zap.Logger) BridgeOption {
	setLogger := func(c *BridgeConfig) {
		c.logger = logger
	}
	return setLogger
}
// WithRootTopic returns a BridgeOption that overwrites the configuration's
// rootTopic field with topic. The string is stored unmodified, including
// when it is empty.
func WithRootTopic(topic string) BridgeOption {
	setRootTopic := func(c *BridgeConfig) {
		c.rootTopic = topic
	}
	return setRootTopic
}
// WithCleanUpInterval returns a BridgeOption that overwrites the
// configuration's cleanUpInterval field with interval. The duration is
// stored as given; zero and negative values are not rejected here.
func WithCleanUpInterval(interval time.Duration) BridgeOption {
	setInterval := func(c *BridgeConfig) {
		c.cleanUpInterval = interval
	}
	return setInterval
}
// WithQoS returns a BridgeOption that overwrites the configuration's qos
// field (the MQTT QoS level) with qos. The byte is stored as given; this
// function does not check that it is a valid QoS level.
func WithQoS(qos byte) BridgeOption {
	setQoS := func(c *BridgeConfig) {
		c.qos = qos
	}
	return setQoS
}
// WithRateLimit returns a BridgeOption that overwrites the configuration's
// rateLimit field (the rate limit for operations) with ops. The value is
// stored as given; zero and negative values are not rejected here.
func WithRateLimit(ops float64) BridgeOption {
	setRateLimit := func(c *BridgeConfig) {
		c.rateLimit = ops
	}
	return setRateLimit
}
// WithRateBurst returns a BridgeOption that overwrites the configuration's
// rateBurst field (the burst size for rate limiting) with burst. The value
// is stored as given; zero and negative values are not rejected here.
func WithRateBurst(burst int) BridgeOption {
	setRateBurst := func(c *BridgeConfig) {
		c.rateBurst = burst
	}
	return setRateBurst
}
// WithMaxConnections returns a BridgeOption that overwrites the
// configuration's maxConns field (the maximum number of concurrent
// connections) with max. The value is stored as given; zero and negative
// values are not rejected here.
func WithMaxConnections(max int) BridgeOption {
	setMaxConns := func(c *BridgeConfig) {
		c.maxConns = max
	}
	return setMaxConns
}
// BridgeSessionState enumerates the states a bridge session can be in.
type BridgeSessionState int

// Known session states, numbered from zero in declaration order.
const (
	// BridgeSessionStateActive has value 0 and is rendered as "active".
	BridgeSessionStateActive BridgeSessionState = iota

	// BridgeSessionStateSuspended has value 1 and is rendered as "suspended".
	BridgeSessionStateSuspended

	// BridgeSessionStateClosed has value 2 and is rendered as "closed".
	BridgeSessionStateClosed
)

// String returns the lowercase name of the state ("active", "suspended" or
// "closed"). Any value outside the declared constants, including negative
// values, yields "unknown".
func (s BridgeSessionState) String() string {
	// Lookup table indexed by the state's numeric value.
	names := [...]string{
		BridgeSessionStateActive:    "active",
		BridgeSessionStateSuspended: "suspended",
		BridgeSessionStateClosed:    "closed",
	}
	if s < 0 || int(s) >= len(names) {
		return "unknown"
	}
	return names[s]
}
// SessionOption is a functional option for a session: a function that is
// handed the SessionConfig being built and mutates it in place.
type SessionOption func(cfg *SessionConfig)

// SessionConfig collects the per-session settings written by SessionOption
// values. Its fields are exported, so they can also be set directly.
type SessionConfig struct {
	SessionID   string             // session identifier; set by WithSessionID
	State       BridgeSessionState // session state; set by WithSessionState
	Timeout     time.Duration      // session timeout; set by WithSessionTimeout
	DialTimeout time.Duration      // dial timeout; set by WithDialTimeout
}
// WithSessionID returns a SessionOption that overwrites
// SessionConfig.SessionID with sessionID. The string is stored unmodified,
// including when it is empty.
func WithSessionID(sessionID string) SessionOption {
	setSessionID := func(c *SessionConfig) {
		c.SessionID = sessionID
	}
	return setSessionID
}
// WithSessionState returns a SessionOption that overwrites
// SessionConfig.State with state. The value is stored as given; it is not
// checked against the declared BridgeSessionState constants.
func WithSessionState(state BridgeSessionState) SessionOption {
	setState := func(c *SessionConfig) {
		c.State = state
	}
	return setState
}
// WithDialTimeout returns a SessionOption that overwrites
// SessionConfig.DialTimeout with timeout. The duration is stored as given;
// zero and negative values are not rejected here.
func WithDialTimeout(timeout time.Duration) SessionOption {
	setDialTimeout := func(c *SessionConfig) {
		c.DialTimeout = timeout
	}
	return setDialTimeout
}
// WithSessionTimeout returns a SessionOption that overwrites
// SessionConfig.Timeout with timeout. The duration is stored as given; zero
// and negative values are not rejected here.
func WithSessionTimeout(timeout time.Duration) SessionOption {
	setTimeout := func(c *SessionConfig) {
		c.Timeout = timeout
	}
	return setTimeout
}