Skip to content

Commit f2f2adf

Browse files
committed
update sql drivers config
1 parent 82ea726 commit f2f2adf

File tree

7 files changed

+247
-11
lines changed

7 files changed

+247
-11
lines changed

go.mod

+3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.22
55
require (
66
github.com/golang-jwt/jwt v3.2.2+incompatible
77
github.com/joho/godotenv v1.5.1
8+
github.com/r3labs/sse/v2 v2.10.0
89
github.com/streadway/amqp v1.0.0
910
go.mongodb.org/mongo-driver v1.11.7
1011
golang.org/x/crypto v0.9.0
@@ -32,6 +33,8 @@ require (
3233
github.com/xdg-go/scram v1.1.1 // indirect
3334
github.com/xdg-go/stringprep v1.0.3 // indirect
3435
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
36+
golang.org/x/net v0.10.0 // indirect
3537
golang.org/x/sync v0.1.0 // indirect
3638
golang.org/x/text v0.9.0 // indirect
39+
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
3740
)

go.sum

+10
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
3737
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
3838
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
3939
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
40+
github.com/r3labs/sse/v2 v2.10.0 h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0=
41+
github.com/r3labs/sse/v2 v2.10.0/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I=
4042
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
4143
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
4244
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
@@ -59,24 +61,32 @@ github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM
5961
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
6062
go.mongodb.org/mongo-driver v1.11.7 h1:LIwYxASDLGUg/8wOhgOOZhX8tQa/9tgZPgzZoVqJvcs=
6163
go.mongodb.org/mongo-driver v1.11.7/go.mod h1:G9TgswdsWjX4tmDA5zfs2+6AEPpYJwqblyjsfuh8oXY=
64+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
6265
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
6366
golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g=
6467
golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
68+
golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
6569
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
70+
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
71+
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
6672
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
6773
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
6874
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
75+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
6976
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
7077
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
7178
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
7279
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
80+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
7381
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
7482
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
7583
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
7684
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
7785
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
7886
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
7987
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
88+
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
89+
gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI=
8090
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
8191
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
8292
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

quickmq/client.go

+94
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package quickmq
2+
3+
import (
4+
"bytes"
5+
"encoding/base64"
6+
"fmt"
7+
"time"
8+
9+
"github.com/ochom/gutils/gttp"
10+
"github.com/ochom/gutils/helpers"
11+
"github.com/r3labs/sse/v2"
12+
)
13+
14+
// Client ...
15+
type Client struct {
16+
url string
17+
username string
18+
password string
19+
queue string
20+
}
21+
22+
// NewClient creates a new quickmq client
23+
func NewClient(quickUrl, queue string) *Client {
24+
url, username, password, err := parseUrl(quickUrl)
25+
if err != nil {
26+
panic(err)
27+
}
28+
29+
return &Client{url: url, username: username, password: password, queue: queue}
30+
}
31+
32+
// publish ...
33+
func (p *Client) publish(body []byte, delay time.Duration) error {
34+
message := map[string]any{
35+
"body": string(body),
36+
"delay": delay,
37+
"queue": p.queue,
38+
}
39+
40+
headers := map[string]string{
41+
"Content-Type": "application/json",
42+
"Authorization": "Basic " + base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", p.username, p.password))),
43+
}
44+
45+
url := fmt.Sprintf("%s/publish", p.url)
46+
res, err := gttp.Post(url, headers, helpers.ToBytes(message), time.Minute)
47+
if err != nil {
48+
return fmt.Errorf("failed to publish message: %s", err.Error())
49+
}
50+
51+
if res.Status != 200 {
52+
return fmt.Errorf("failed to publish message: %s", string(res.Body))
53+
}
54+
55+
return nil
56+
}
57+
58+
// PublishWithDelay ...
59+
func (p *Client) PublishWithDelay(body []byte, delay time.Duration) error {
60+
return p.publish(body, delay)
61+
}
62+
63+
// Publish ...
64+
func (p *Client) Publish(body []byte) error {
65+
return p.publish(body, 0)
66+
}
67+
68+
// Consume consume messages from the channels
69+
func (c *Client) Consume(stop chan bool, workerFunc func([]byte)) error {
70+
events := make(chan *sse.Event)
71+
url := fmt.Sprintf("%s/subscribe?queue=%s", c.url, c.queue)
72+
73+
client := sse.NewClient(url, func(sseClient *sse.Client) {
74+
headers := map[string]string{
75+
"Authorization": "Basic " + base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", c.username, c.password))),
76+
}
77+
sseClient.Headers = headers
78+
})
79+
if err := client.SubscribeChanRaw(events); err != nil {
80+
return err
81+
}
82+
83+
for {
84+
select {
85+
case <-stop:
86+
return fmt.Errorf("stop signal received")
87+
case message := <-events:
88+
if bytes.Equal(message.Data, []byte(`{}`)) {
89+
continue
90+
}
91+
workerFunc(message.Data)
92+
}
93+
}
94+
}

quickmq/client_test.go

+97
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package quickmq
2+
3+
import (
4+
"testing"
5+
"time"
6+
)
7+
8+
func Test_publisher_Publish(t *testing.T) {
9+
type fields struct {
10+
url string
11+
queue string
12+
}
13+
type args struct {
14+
body []byte
15+
}
16+
tests := []struct {
17+
name string
18+
fields fields
19+
args args
20+
wantErr bool
21+
}{
22+
{
23+
name: "test 1",
24+
fields: fields{
25+
url: "quick://admin:admin@localhost:16321",
26+
queue: "TEST",
27+
},
28+
args: args{
29+
body: []byte("test"),
30+
},
31+
wantErr: false,
32+
},
33+
}
34+
for _, tt := range tests {
35+
t.Run(tt.name, func(t *testing.T) {
36+
client := NewClient(tt.fields.url, tt.fields.queue)
37+
if err := client.Publish(tt.args.body); (err != nil) != tt.wantErr {
38+
t.Errorf("publisher.Publish() error = %v, wantErr %v", err, tt.wantErr)
39+
}
40+
})
41+
}
42+
}
43+
44+
func TestClient_Consume(t *testing.T) {
45+
type fields struct {
46+
url string
47+
queue string
48+
}
49+
type args struct {
50+
stop chan bool
51+
workerFunc func([]byte)
52+
}
53+
tests := []struct {
54+
name string
55+
fields fields
56+
args args
57+
wantErr bool
58+
}{
59+
{
60+
name: "test 1",
61+
fields: fields{
62+
url: "quick://admin:admin@localhost:16321",
63+
queue: "TEST",
64+
},
65+
args: args{
66+
stop: make(chan bool),
67+
workerFunc: func(body []byte) {
68+
t.Logf("received message: %v", string(body))
69+
},
70+
},
71+
wantErr: true,
72+
},
73+
}
74+
for _, tt := range tests {
75+
t.Run(tt.name, func(t *testing.T) {
76+
// wait 10 seconds and stop consuming
77+
go func() {
78+
time.Sleep(10 * time.Second)
79+
tt.args.stop <- true
80+
}()
81+
c := NewClient(tt.fields.url, tt.fields.queue)
82+
if err := c.Publish([]byte("test")); err != nil {
83+
t.Errorf("Client.Publish() error = %v", err)
84+
}
85+
if err := c.Publish([]byte("test")); err != nil {
86+
t.Errorf("Client.Publish() error = %v", err)
87+
}
88+
if err := c.Publish([]byte("test")); err != nil {
89+
t.Errorf("Client.Publish() error = %v", err)
90+
}
91+
92+
if err := c.Consume(tt.args.stop, tt.args.workerFunc); (err != nil) != tt.wantErr {
93+
t.Errorf("Client.Consume() error = %v, wantErr %v", err, tt.wantErr)
94+
}
95+
})
96+
}
97+
}

quickmq/quickmq.go

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package quickmq
2+
3+
import (
4+
"fmt"
5+
"net"
6+
"net/url"
7+
)
8+
9+
// quickUrl should be of the format quick://username:password@host:port
10+
func parseUrl(quickUrl string) (newUrl, username, password string, err error) {
11+
u, err := url.Parse(quickUrl)
12+
if err != nil {
13+
return "", "", "", err
14+
}
15+
16+
host, port, err := net.SplitHostPort(u.Host)
17+
if err != nil {
18+
return "", "", "", err
19+
}
20+
21+
username = u.User.Username()
22+
if username == "" {
23+
username = "admin"
24+
}
25+
26+
password, ok := u.User.Password()
27+
if !ok {
28+
password = "admin"
29+
}
30+
31+
newUrl = fmt.Sprintf("http://%s:%s", host, port)
32+
return newUrl, username, password, nil
33+
}

sql/db.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func Conn() *gorm.DB { return connection }
1818

1919
// defaultConfig ...
2020
var config = &Config{
21-
DatabaseType: Sqlite,
21+
Driver: Sqlite,
2222
Url: "gorm.db",
2323
LogLevel: logger.Silent,
2424
MaxIdleConns: 10,
@@ -29,8 +29,8 @@ var config = &Config{
2929
// New initializes the database connection with GORM
3030
func New(configs ...*Config) (err error) {
3131
for _, cfg := range configs {
32-
if cfg.DatabaseType != "" {
33-
config.DatabaseType = cfg.DatabaseType
32+
if cfg.Driver != 0 {
33+
config.Driver = cfg.Driver
3434
}
3535

3636
if cfg.Url != "" {
@@ -58,7 +58,7 @@ func New(configs ...*Config) (err error) {
5858
}
5959

6060
func createInstance() error {
61-
switch config.DatabaseType {
61+
switch config.Driver {
6262
case Postgres:
6363
return createPgInstance()
6464
case MySQL:

sql/types.go

+6-7
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,13 @@ import (
66
"gorm.io/gorm/logger"
77
)
88

9-
// Platform ...
10-
type Platform string
9+
type Driver int
1110

12-
// Platforms ...
11+
// Drivers ...
1312
const (
14-
Postgres Platform = "postgres"
15-
MySQL Platform = "mysql"
16-
Sqlite Platform = "sqlite"
13+
Sqlite Driver = iota
14+
Postgres
15+
MySQL
1716
)
1817

1918
// LogLevels ...
@@ -26,7 +25,7 @@ const (
2625

2726
// Database configuration
2827
type Config struct {
29-
DatabaseType Platform
28+
Driver Driver
3029
Url string
3130
LogLevel logger.LogLevel
3231
MaxIdleConns int

0 commit comments

Comments
 (0)