Skip to content

Commit

Permalink
Merge pull request #10 from adambaumeister/maxage
Browse files Browse the repository at this point in the history
Maxage
  • Loading branch information
adambaumeister authored Dec 27, 2018
2 parents 0644f75 + de6fc79 commit 6686708
Show file tree
Hide file tree
Showing 11 changed files with 125 additions and 18 deletions.
25 changes: 17 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,24 +1,33 @@
# **This is pre-release software. Use at your own risk!**
# Description
<p align="center">
<img src="https://i.imgur.com/HdIxEOB.png">
</p>
This is a very early release of Goflow, a golang-based netflow collector with a flexible backend.

![Imgur](https://i.imgur.com/HdIxEOB.png)
A list of upcoming features can be found under the issue tracker for this project.

This is a very early release of Goflow, a golang-based netflow collector with a flexible backend.
Currently supported frontend/backend combinations are

Right now, there are many features not implemented and only one supported frontend/backend (netflow/mysql, respectively).
Frontend | Backend
------ | ------ |
Netflow | Mysql
Netflow | Timescaledb

A list of upcoming features can be found under the issue tracker for this project.

# Prereqs

For this release, you need a running mysql server and the following details from it
You need a running backend and associated connection information;

- IP address or hostname
- Database name
- Server fqdn
- Username
- Password
- Database name

For Mysql, you could use the free tier of Amazon RDS to get started.

A cost-effective option is Amazon RDS (https://aws.amazon.com/rds/) if you don't have spare servers to run a mysql instance on. Keep in mind that SSL for the backend is not yet implemented so uhhh did I mention this is pre-release software?
See the SETUP.md instructions within the backend directories in this repo for help
specific to each backend. (i.e ./backends/timescale/SETUP.MD)

# Installation
Goflow requires two files to run;
Expand Down
1 change: 1 addition & 0 deletions backends/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ type Backend interface {
Test() string
Configure(map[string]string)
Add(map[uint16]fields.Value)
Prune(string)
}
1 change: 1 addition & 0 deletions backends/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type Dump struct{}
func (b Dump) Test() string { return "Works!" }
func (b Dump) Init() {}
func (b Dump) Configure(config map[string]string) {}
func (b Dump) Prune(interval string) {}

func (b Dump) Add(values map[uint16]fields.Value) {
var sl []string
Expand Down
12 changes: 11 additions & 1 deletion backends/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ MySQL Backend
*/
const USE_QUERY = "USE %v"
const TEST_QUERY = "SELECT count(last_switched)/TIMESTAMPDIFF(SECOND, MIN(last_switched), MAX(last_switched)) AS fps, MAX(last_switched), MIN(last_switched) FROM goflow_records;"
const PRUNE_QUERY = "DELETE FROM goflow_records WHERE last_switched <= DATE_SUB(CURDATE(), INTERVAL %v DAY)"

type Mysql struct {
Dbname string
Expand Down Expand Up @@ -279,10 +280,19 @@ func (b *Mysql) Test() string {
panic(err.Error())
}
}

db.Close()
return fmt.Sprintf("MYSQL status: Flows/second: %v, Last Flow received: %v", fps.String, maxday.String)
}

func (b *Mysql) Prune(interval string) {
db := b.connect()
fmt.Printf("Pruning old data..\n")
_, err := db.Exec(fmt.Sprintf(PRUNE_QUERY, interval))
if err != nil {
panic(err.Error())
}
}

func (b *Mysql) CheckSchema() {
/*
Validates the SQL Schema matches
Expand Down
15 changes: 13 additions & 2 deletions backends/timescale/timescaledb.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ const TEST_QUERY = `SELECT MIN(last_switched) as minday, MAX(last_switched) as m
) AS fps
FROM goflow_records;`

const PRUNE_QUERY = "DELETE FROM goflow_records WHERE last_switched <= NOW() - INTERVAL '%v DAYS';"

type Tsdb struct {
Dbname string
Dbpass string
Expand Down Expand Up @@ -233,9 +235,9 @@ func (b *Tsdb) Init() {
}
datetimec := s.AddIntColumn(fields.TIMESTAMP, "last_switched", "TIMESTAMPTZ", "NOT NULL")
datetimec.Wrap = "to_timestamp(%v)"
s.AddIntColumn(fields.IPV4_SRC_ADDR, "src_ip", "integer", "DEFAULT NULL")
s.AddBinaryColumn(fields.IPV4_SRC_ADDR, "src_ip", "inet", "DEFAULT NULL")
s.AddIntColumn(fields.L4_SRC_PORT, "src_port", "integer", "NOT NULL")
s.AddIntColumn(fields.IPV4_DST_ADDR, "dst_ip", "integer", "DEFAULT NULL")
s.AddBinaryColumn(fields.IPV4_DST_ADDR, "dst_ip", "inet", "DEFAULT NULL")
s.AddIntColumn(fields.L4_DST_PORT, "dst_port", "integer", "NOT NULL")
s.AddIntColumn(fields.IN_BYTES, "in_bytes", "integer", "NOT NULL")
s.AddIntColumn(fields.IN_PKTS, "in_pkts", "integer", "NOT NULL")
Expand Down Expand Up @@ -310,6 +312,15 @@ func (b *Tsdb) Test() string {

}

func (b *Tsdb) Prune(interval string) {
db := b.connect()
_, err := db.Exec(fmt.Sprintf(PRUNE_QUERY, interval))
if err != nil {
fmt.Printf(fmt.Sprintf(PRUNE_QUERY, interval))
panic(err.Error())
}
}

func (b *Tsdb) CheckSchema() {
/*
Validates the SQL Schema matches
Expand Down
13 changes: 8 additions & 5 deletions config.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
backends:
rdssql:
type: mysql
type: timescale
config:
#SQL_SERVER: ec2-13-211-228-101.ap-southeast-2.compute.amazonaws.com
#SQL_USERNAME: remoteuser
SQL_SERVER: 52.62.226.159
SQL_USERNAME: testgoflow
SQL_SERVER: ec2-54-252-192-35.ap-southeast-2.compute.amazonaws.com
SQL_USERNAME: remoteuser
#SQL_SERVER: 52.62.226.159
#SQL_USERNAME: testgoflow
SQL_DB: testgoflow
frontends:
netflow:
Expand All @@ -14,4 +14,7 @@ frontends:
bindport: 9999
backend: rdssql

utilities:
max_age: 180

api: 127.0.0.1:8880
17 changes: 17 additions & 0 deletions config/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/adambaumeister/goflow/backends/mysql"
"github.com/adambaumeister/goflow/backends/timescale"
"github.com/adambaumeister/goflow/frontends"
"github.com/adambaumeister/goflow/utils"
"gopkg.in/yaml.v2"
"io/ioutil"
"log"
Expand All @@ -15,6 +16,7 @@ type GlobalConfig struct {
Frontends map[string]FrontendConfig
Backends map[string]BackendConfig
Api string
Utilities map[string]string
}

type FrontendConfig struct {
Expand Down Expand Up @@ -64,6 +66,21 @@ func (gc *GlobalConfig) GetBackends() map[string]backends.Backend {
return bm
}

func (gc *GlobalConfig) GetUtilities() map[string]utils.Utility {
um := make(map[string]utils.Utility)
for n, c := range gc.Utilities {
switch n {
case "max_age":
utility := utils.MaxAge{
MaxAgeDays: c,
}
utility.SetBackends(gc.GetBackends())
um[n] = &utility
}
}
return um
}

func (gc GlobalConfig) GetFrontends() []frontends.Frontend {
/*
Returns all the configured Frontends as frontend.Frontend objects
Expand Down
9 changes: 8 additions & 1 deletion config_example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,11 @@ frontends:
### API config
###
# Specify bindaddr for API
api: 127.0.0.1:8880
api: 127.0.0.1:8880

###
### Utility Config
###
utilities:
# Max_age is the length of time - in days - to store flow records for. Flows are pruned one daily based on this number.
max_age: 180
9 changes: 8 additions & 1 deletion goflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,17 @@ func main() {

gc := config.Read("config.yml")

fmt.Printf("Starting threads...")
fmt.Printf("Starting Frontends...")
fe := gc.GetFrontends()
for _, f := range fe {
go f.Start()
}
fmt.Printf("[ OK ]\n")
fmt.Printf("Starting utilities...")
utilities := gc.GetUtilities()
for _, u := range utilities {
go u.Run()
}
fmt.Printf("[ OK ]\n")
api.Start(&gc)
}
33 changes: 33 additions & 0 deletions utils/MaxAge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package utils

import (
"fmt"
"github.com/adambaumeister/goflow/backends"
"time"
)

/*
MaxAge
Using exposed "prune" methods from backends and associated GlobalConfig, deletes flow records.
*/
type MaxAge struct {
// String because it's going into a query string anyway
MaxAgeDays string
backends map[string]backends.Backend
}

// Set the backends to operate on
func (m *MaxAge) SetBackends(b map[string]backends.Backend) {
m.backends = b
}

func (m *MaxAge) Run() {
for {
for _, be := range m.backends {
be.Prune(m.MaxAgeDays)
}
fmt.Printf("Pruning configured backends...")
// This is really gross, we should have some sort of internal crontab-like thing for utilities
time.Sleep(86400 * time.Second)
}
}
8 changes: 8 additions & 0 deletions utils/default.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package utils

import "github.com/adambaumeister/goflow/backends"

type Utility interface {
SetBackends(map[string]backends.Backend)
Run()
}

0 comments on commit 6686708

Please sign in to comment.