Skip to content
This repository has been archived by the owner on Jun 21, 2022. It is now read-only.

PMM-5194 Tunnels #643

Draft
wants to merge 18 commits into
base: main
Choose a base branch
from
5 changes: 3 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ env:
# what we used for the latest PMM release
- GO_VERSION=1.15.x
# what will be used for PMM releases soon
# - GO_VERSION=1.16.x
- GO_VERSION=1.16.x
# the latest HEAD
- GO_VERSION=tip
# TODO enable when 1.15.x is removed
# - GO_VERSION=tip

matrix:
fast_finish: true
Expand Down
6 changes: 3 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ required = [

[[constraint]]
name = "github.com/percona/pmm"
branch = "PMM-2.0"
# branch = "PMM-2.0"
branch = "PMM-5194-tunnels"

[[constraint]]
name = "github.com/percona-platform/saas"
Expand All @@ -24,7 +25,7 @@ required = [
name = "github.com/percona-platform/dbaas-api"
branch = "main"

# to prevent unexpected downgrades; see https://github.com/percona/exporter_shared/releases/tag/v0.6.0
[[constraint]]
name = "github.com/percona/exporter_shared"
version = "0.7.2"
# # to prevent unexpected downgrades; see https://github.com/percona/exporter_shared/releases/tag/v0.6.0
# [[constraint]]
# name = "github.com/percona/exporter_shared"
# version = "0.7.2"
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
version: '3.7'
services:
pmm-managed-server:
image: ${PMM_SERVER_IMAGE:-perconalab/pmm-server:dev-latest}
image: perconalab/pmm-server-fb:PR-1459-bf4d9f9
container_name: pmm-managed-server
hostname: pmm-managed-server
environment:
Expand Down
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func runGRPCServer(ctx context.Context, deps *gRPCServerDeps) {
inventorypb.RegisterNodesServer(gRPCServer, inventorygrpc.NewNodesServer(nodesSvc))
inventorypb.RegisterServicesServer(gRPCServer, inventorygrpc.NewServicesServer(servicesSvc))
inventorypb.RegisterAgentsServer(gRPCServer, inventorygrpc.NewAgentsServer(agentsSvc))
inventorypb.RegisterTunnelsServer(gRPCServer, inventory.NewTunnelsService(deps.db, deps.agentsRegistry))

nodeSvc := management.NewNodeService(deps.db, deps.agentsRegistry)
serviceSvc := management.NewServiceService(deps.db, deps.agentsRegistry, deps.vmdb)
Expand Down Expand Up @@ -276,6 +277,7 @@ func runHTTP1Server(ctx context.Context, deps *http1ServerDeps) {
inventorypb.RegisterNodesHandlerFromEndpoint,
inventorypb.RegisterServicesHandlerFromEndpoint,
inventorypb.RegisterAgentsHandlerFromEndpoint,
inventorypb.RegisterTunnelsHandlerFromEndpoint,

managementpb.RegisterNodeHandlerFromEndpoint,
managementpb.RegisterServiceHandlerFromEndpoint,
Expand Down
28 changes: 28 additions & 0 deletions models/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,12 +413,15 @@ var databaseSchema = [][]string{
PRIMARY KEY (id)
)`,
},

25: {
`ALTER TABLE agents ADD COLUMN mongo_db_tls_options JSONB`,
},

26: {
`ALTER TABLE ia_rules ALTER COLUMN channel_ids DROP NOT NULL`,
},

27: {
`CREATE TABLE backup_locations (
id VARCHAR NOT NULL,
Expand All @@ -436,9 +439,26 @@ var databaseSchema = [][]string{
UNIQUE (name)
)`,
},

28: {
`ALTER TABLE agents ADD COLUMN disabled_collectors VARCHAR[]`,
},

29: {
`CREATE TABLE tunnels (
tunnel_id VARCHAR NOT NULL,
listen_agent_id VARCHAR NOT NULL,
listen_port INTEGER NOT NULL CHECK (listen_port > 0),
connect_agent_id VARCHAR NOT NULL,
connect_port INTEGER NOT NULL CHECK (connect_port > 0),
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL,

PRIMARY KEY (tunnel_id),
FOREIGN KEY (listen_agent_id) REFERENCES agents (agent_id),
FOREIGN KEY (connect_agent_id) REFERENCES agents (agent_id)
)`,
},
}

// ^^^ Avoid default values in schema definition. ^^^
Expand Down Expand Up @@ -524,9 +544,17 @@ func SetupDB(sqlDB *sql.DB, params *SetupDBParams) (*reform.DB, error) {
}

queries := databaseSchema[version]
if len(queries) == 0 {
return fmt.Errorf("no queries in version %d: missed migration number?", version)
}

queries = append(queries, fmt.Sprintf(`INSERT INTO schema_migrations (id) VALUES (%d)`, version))
for _, q := range queries {
q = strings.TrimSpace(q)
if q == "" {
return fmt.Errorf("empty query in version %d: missed migration number?", version)
}

if _, err = tx.Exec(q); err != nil {
return errors.Wrapf(err, "failed to execute statement:\n%s", q)
}
Expand Down
127 changes: 127 additions & 0 deletions models/tunnel_helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// pmm-managed
// Copyright (C) 2017 Percona LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

package models

import (
"github.com/google/uuid"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"gopkg.in/reform.v1"
)

func checkUniqueTunnelID(q *reform.Querier, id string) error {
if id == "" {
panic("empty Tunnel ID")
}

tunnel := &Tunnel{TunnelID: id}
switch err := q.Reload(tunnel); err {
case nil:
return status.Errorf(codes.AlreadyExists, "Tunnel with ID %q already exists.", id)
case reform.ErrNoRows:
return nil
default:
return errors.WithStack(err)
}
}

// FindTunnels returns Tunnels for given pmm-agent, or all, if pmmAgentID is empty.
func FindTunnels(q *reform.Querier, pmmAgentID string) ([]*Tunnel, error) {
var args []interface{}
tail := "ORDER BY tunnel_id"
if pmmAgentID != "" {
// TODO check that agent exist
args = []interface{}{pmmAgentID, pmmAgentID}
tail = "WHERE listen_agent_id = $1 OR connect_agent_id = $2 " + tail
}

structs, err := q.SelectAllFrom(TunnelTable, tail, args...)
if err != nil {
return nil, errors.WithStack(err)
}

tunnels := make([]*Tunnel, len(structs))
for i, s := range structs {
tunnels[i] = s.(*Tunnel)
}

return tunnels, nil
}

// FindTunnelByID finds Tunnel by ID.
func FindTunnelByID(q *reform.Querier, id string) (*Tunnel, error) {
if id == "" {
return nil, status.Error(codes.InvalidArgument, "Empty Tunnel ID.")
}

tunnel := &Tunnel{TunnelID: id}
switch err := q.Reload(tunnel); err {
case nil:
return tunnel, nil
case reform.ErrNoRows:
return nil, status.Errorf(codes.NotFound, "Tunnel with ID %q not found.", id)
default:
return nil, errors.WithStack(err)
}
}

// CreateTunnelParams TODO.
type CreateTunnelParams struct {
ListenAgentID string
ListenPort uint16
ConnectAgentID string
ConnectPort uint16
}

// CreateTunnel creates Tunnel.
func CreateTunnel(q *reform.Querier, params *CreateTunnelParams) (*Tunnel, error) {
id := "/tunnel_id/" + uuid.New().String()
if err := checkUniqueTunnelID(q, id); err != nil {
return nil, err
}

// TODO check that agents exist
// TODO check that ports > 0

row := &Tunnel{
TunnelID: id,
ListenAgentID: params.ListenAgentID,
ListenPort: params.ListenPort,
ConnectAgentID: params.ConnectAgentID,
ConnectPort: params.ConnectPort,
}

if err := q.Insert(row); err != nil {
return nil, errors.WithStack(err)
}

return row, nil
}

// RemoveTunnel removes Tunnel by ID.
func RemoveTunnel(q *reform.Querier, id string, mode RemoveMode) (*Tunnel, error) {
// TODO find agents
// TODO cascade delete

t := &Tunnel{TunnelID: id}
if err := q.Delete(t); err != nil {
return nil, errors.Wrap(err, "failed to delete Tunnel")
}

return t, nil
}
64 changes: 64 additions & 0 deletions models/tunnel_model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// pmm-managed
// Copyright (C) 2017 Percona LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

package models

import (
"time"

"gopkg.in/reform.v1"
)

//go:generate reform

//reform:tunnels

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚫 [golangci-lint] reported by reviewdog 🐶
comment on exported type Tunnel should be of the form "Tunnel ..." (with optional leading article) (golint)

type Tunnel struct {
TunnelID string `reform:"tunnel_id,pk"`
ListenAgentID string `reform:"listen_agent_id"`
ListenPort uint16 `reform:"listen_port"`
ConnectAgentID string `reform:"connect_agent_id"`
ConnectPort uint16 `reform:"connect_port"`
CreatedAt time.Time `reform:"created_at"`
UpdatedAt time.Time `reform:"updated_at"`
}

// BeforeInsert implements reform.BeforeInserter interface.
func (s *Tunnel) BeforeInsert() error {
now := Now()
s.CreatedAt = now
s.UpdatedAt = now
return nil
}

// BeforeUpdate implements reform.BeforeUpdater interface.
func (s *Tunnel) BeforeUpdate() error {
s.UpdatedAt = Now()
return nil
}

// AfterFind implements reform.AfterFinder interface.
func (s *Tunnel) AfterFind() error {
s.CreatedAt = s.CreatedAt.UTC()
s.UpdatedAt = s.UpdatedAt.UTC()
return nil
}

// check interfaces
var (
_ reform.BeforeInserter = (*Tunnel)(nil)
_ reform.BeforeUpdater = (*Tunnel)(nil)
_ reform.AfterFinder = (*Tunnel)(nil)
)
Loading