Skip to content

Commit

Permalink
Merge pull request #7 from adambaumeister/api
Browse files Browse the repository at this point in the history
Api
  • Loading branch information
adambaumeister authored Dec 26, 2018
2 parents 2053ad4 + 4cf6f56 commit 99808d8
Show file tree
Hide file tree
Showing 11 changed files with 213 additions and 23 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,14 @@ export SQL_PASSWORD=your_sql_pw_here

In the future, an installation script will be packaged for most systems but for now, you will need to create your own systemd or init scripts to start it.

# Monitoring and utilities
The goflow binary doubles as a client interface, and a JSON API is started at the same time as the daemon.

The API listens on localhost by default, but this can be tuned (see the configuration example).

The Goflow API is not for retrieving flow data, but performing ongoing maintenence and ops on the collector itself.

Goflow help displays a list of options.
```bash
./goflow help
```
33 changes: 28 additions & 5 deletions api/api.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,49 @@
package api

import (
"encoding/json"
"fmt"
"github.com/adambaumeister/goflow/config"
"log"
"net/http"
)

type API struct {
c chan string
c chan string
config *config.GlobalConfig
}

func Start() {
type JsonMessage struct {
Msg string
}

func Start(gc *config.GlobalConfig) {
a := API{}
a.config = gc

http.HandleFunc("/", a.getHandler)
http.HandleFunc("/test", a.Test)
log.Fatal(http.ListenAndServe(":8080", nil))
http.HandleFunc("/status", a.Test)
log.Fatal(http.ListenAndServe(a.config.Api, nil))

}

func (a *API) getHandler(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "API works!")
}

func (a *API) Test(w http.ResponseWriter, r *http.Request) {
TestNFFrontend()
var s string
b := a.config.GetBackends()
for _, be := range b {
s = s + be.Test() + "\n"
}
jm := JsonMessage{
Msg: s,
}
j, err := json.Marshal(jm)
if err != nil {
fmt.Println("error:", err)
}

w.Write(j)
}
71 changes: 71 additions & 0 deletions api/commands.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package api

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
)

type Commands struct {
Paths map[string]Path
HelpString string
}

type Path interface {
Get()
Help() string
}

func (c *Commands) Get() {
s := "Goflow command line client help\n------------------\n\n"
for cmd, p := range c.Paths {
s = s + fmt.Sprintf("%v : %v\n", cmd, p.Help())
}
fmt.Printf(s)
}
func (c *Commands) Help() string {
return "Display this help message."
}

/*
HTTP Path do a call to the API to present their results
*/
type HttpPath struct {
Url string
Data []byte
HelpString string
}

func (p *HttpPath) Get() {
resp, err := http.Get("http://127.0.0.1:8880" + p.Url)
if err != nil {
panic(err)
}
jm := JsonMessage{}
body, _ := ioutil.ReadAll(resp.Body)
json.Unmarshal(body, &jm)
fmt.Printf("%v\n", jm.Msg)
}

func (p *HttpPath) Help() string {
return p.HelpString
}

func (c *Commands) Parse() {
c.Paths = make(map[string]Path)

// Setup the routes
testPath := HttpPath{
Url: "/status",
HelpString: "Displays the status of configured Backends",
}

c.Paths["status"] = &testPath
c.Paths["help"] = c
if len(os.Args) > 1 {
c.Paths[os.Args[1]].Get()
os.Exit(0)
}
}
2 changes: 1 addition & 1 deletion backends/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import "github.com/adambaumeister/goflow/fields"
//
type Backend interface {
Init()
Test()
Test() string
Configure(map[string]string)
Add(map[uint16]fields.Value)
}
2 changes: 1 addition & 1 deletion backends/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

type Dump struct{}

func (b Dump) Test() {}
func (b Dump) Test() string { return "Works!" }
func (b Dump) Init() {}
func (b Dump) Configure(config map[string]string) {}

Expand Down
34 changes: 30 additions & 4 deletions backends/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
MySQL Backend
*/
const USE_QUERY = "USE %v"
const TEST_QUERY = "SELECT count(last_switched)/TIMESTAMPDIFF(SECOND, MIN(last_switched), MAX(last_switched)) AS fps, MAX(last_switched), MIN(last_switched) FROM goflow_records;"

type Mysql struct {
Dbname string
Expand Down Expand Up @@ -202,6 +203,15 @@ func (b *Mysql) Configure(config map[string]string) {
b.Server = config["SQL_SERVER"]
}

func (b *Mysql) connect() *sql.DB {
b.Dbpass = os.Getenv("SQL_PASSWORD")
db, err := sql.Open("mysql", fmt.Sprintf("%v:%v@tcp(%v:3306)/%v", b.Dbuser, b.Dbpass, b.Server, b.Dbname))
if err != nil {
panic(err.Error())
}
return db
}

func (b *Mysql) Init() {

b.CheckQuery = "SHOW COLUMNS IN goflow_records;"
Expand All @@ -213,7 +223,7 @@ func (b *Mysql) Init() {
b.AlterColQuery = "ALTER TABLE goflow_records MODIFY COLUMN %v"

b.Dbpass = os.Getenv("SQL_PASSWORD")
db, err := sql.Open("mysql", fmt.Sprintf("%v:%v@tcp(%v:3306)/%v", b.Dbuser, b.Dbpass, b.Server, b.Dbname))
db := b.connect()
b.db = db
s := Schema{
columnIndex: make(map[uint16]Column),
Expand All @@ -234,7 +244,7 @@ func (b *Mysql) Init() {
b.schema = &s

// Open doesn't open a connection. Validate DSN data:
err = db.Ping()
err := db.Ping()
if err != nil {
panic(err.Error())
}
Expand All @@ -250,11 +260,27 @@ func (b *Mysql) Init() {
b.CheckSchema()
}

func (b *Mysql) Test() {
err := b.db.Ping()
func (b *Mysql) Test() string {
db := b.connect()
err := db.Ping()
if err != nil {
panic(err.Error())
}

var (
minday sql.NullString
maxday sql.NullString
fps sql.NullString
)
rows, err := db.Query(TEST_QUERY)
for rows.Next() {
err := rows.Scan(&fps, &minday, &maxday)
if err != nil {
panic(err.Error())
}
}

return fmt.Sprintf("MYSQL status: Flows/second: %v, Last Flow received: %v", fps.String, maxday.String)
}

func (b *Mysql) CheckSchema() {
Expand Down
59 changes: 53 additions & 6 deletions backends/timescale/timescaledb.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ import (
"strings"
)

const SIZE_QUERY = "SELECT * FROM hypertable_relation_size('goflow_records');"
const TEST_QUERY = `SELECT MIN(last_switched) as minday, MAX(last_switched) as maxday, count(last_switched)
, count(last_switched)/
( EXTRACT(hour FROM MAX(last_switched) - MIN(last_switched))*60*60
+ EXTRACT(minutes FROM MAX(last_switched) - MIN(last_switched))*60
+ EXTRACT(seconds FROM MAX(last_switched) - MIN(last_switched))
) AS fps
FROM goflow_records;`

type Tsdb struct {
Dbname string
Dbpass string
Expand Down Expand Up @@ -201,6 +210,14 @@ func (b *Tsdb) Configure(config map[string]string) {
b.Server = config["SQL_SERVER"]
}

func (b *Tsdb) connect() *sql.DB {
b.Dbpass = os.Getenv("SQL_PASSWORD")
db, err := sql.Open("postgres", fmt.Sprintf("user=%v password=%v host=%v dbname=%v", b.Dbuser, b.Dbpass, b.Server, b.Dbname))
if err != nil {
panic(err.Error())
}
return db
}
func (b *Tsdb) Init() {

b.CheckQuery = "select column_name, data_type, character_maximum_length from INFORMATION_SCHEMA.COLUMNS where table_name = 'goflow_records';"
Expand All @@ -211,9 +228,6 @@ func (b *Tsdb) Init() {
b.DropQuery = "DROP TABLE goflow_records"
b.AlterColQuery = "ALTER TABLE goflow_records MODIFY COLUMN %v"

b.Dbpass = os.Getenv("SQL_PASSWORD")
db, err := sql.Open("postgres", fmt.Sprintf("user=%v password=%v host=%v dbname=%v", b.Dbuser, b.Dbpass, b.Server, b.Dbname))
b.db = db
s := Schema{
columnIndex: make(map[uint16]Column),
}
Expand All @@ -230,10 +244,12 @@ func (b *Tsdb) Init() {
s.AddBinaryColumn(fields.IPV6_DST_ADDR, "dst_ipv6", "inet", "DEFAULT NULL")
InitQuery := s.GetColumnStrings(b.InitQuery)

db := b.connect()
b.db = db
b.schema = &s

// Open doesn't open a connection. Validate DSN data:
err = db.Ping()
err := db.Ping()
if err != nil {
panic(err.Error())
}
Expand All @@ -256,11 +272,42 @@ func (b *Tsdb) Init() {
b.CheckSchema()
}

func (b *Tsdb) Test() {
err := b.db.Ping()
func (b *Tsdb) Test() string {
db := b.connect()
err := db.Ping()
if err != nil {
panic(err.Error())
}

var (
table_bytes sql.NullString
index_bytes sql.NullString
toast_bytes sql.NullString
total_bytes sql.NullString

minday sql.NullString
maxday sql.NullString
totalflows sql.NullString
fps sql.NullString
)

rows, err := db.Query(SIZE_QUERY)
for rows.Next() {
err := rows.Scan(&table_bytes, &index_bytes, &toast_bytes, &total_bytes)
if err != nil {
panic(err.Error())
}
}
rows, err = db.Query(TEST_QUERY)
for rows.Next() {
err := rows.Scan(&minday, &maxday, &totalflows, &fps)
if err != nil {
panic(err.Error())
}
}

return fmt.Sprintf("Timescale DB Status: Table size: %v Bytes, Index: %v Bytes, Flows/second: %v", table_bytes.String, index_bytes.String, fps.String)

}

func (b *Tsdb) CheckSchema() {
Expand Down
5 changes: 2 additions & 3 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ backends:
SQL_SERVER: 52.62.226.159
SQL_USERNAME: testgoflow
SQL_DB: testgoflow
dumper:
type: dump
config:
frontends:
netflow:
config:
bindaddr: 192.168.1.164
bindport: 9999
backend: rdssql

api: 127.0.0.1:8880
8 changes: 6 additions & 2 deletions config/global.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"fmt"
"github.com/adambaumeister/goflow/backends"
"github.com/adambaumeister/goflow/backends/mysql"
"github.com/adambaumeister/goflow/backends/timescale"
Expand All @@ -13,6 +14,7 @@ import (
type GlobalConfig struct {
Frontends map[string]FrontendConfig
Backends map[string]BackendConfig
Api string
}

type FrontendConfig struct {
Expand All @@ -39,7 +41,7 @@ func Read(filename string) GlobalConfig {
return gc
}

func (gc *GlobalConfig) getBackends() map[string]backends.Backend {
func (gc *GlobalConfig) GetBackends() map[string]backends.Backend {
bm := make(map[string]backends.Backend)
for n, bc := range gc.Backends {
switch bc.Type {
Expand All @@ -55,6 +57,8 @@ func (gc *GlobalConfig) getBackends() map[string]backends.Backend {
b := backends.Dump{}
b.Configure(gc.Backends[n].Config)
bm[n] = &b
default:
panic(fmt.Sprintf("Error: Invalid backend type %v", bc.Type))
}
}
return bm
Expand All @@ -66,7 +70,7 @@ func (gc GlobalConfig) GetFrontends() []frontends.Frontend {
Maps frontends to backends in the same run
*/
var r []frontends.Frontend
bm := gc.getBackends()
bm := gc.GetBackends()
for n, fields := range gc.Frontends {
switch n {
case "netflow":
Expand Down
Loading

0 comments on commit 99808d8

Please sign in to comment.