Merge pull request #15 from adambaumeister/kafka
Kafka
adambaumeister authored Jan 1, 2019
2 parents f31ceef + 5529060 commit 59464a9
Showing 10 changed files with 297 additions and 16 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -13,6 +13,7 @@ Frontend | Backend
------ | ------ |
Netflow | MySQL
Netflow | TimescaleDB
Netflow | Apache Kafka


# Prereqs
29 changes: 29 additions & 0 deletions backends/kafka/backend_test.go
@@ -0,0 +1,29 @@
package kafka

import (
"fmt"
"github.com/adambaumeister/goflow/backends"
"testing"
"time"
)

const BENCH_MAX = 10

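// Prefixed with "Dont" so `go test` skips it - Go only runs functions named
// TestXxx. Rename to TestBackend to exercise a live broker; KAFKA_SERVER must
// then come from the environment, since the config map below omits it.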
func DontTestBackend(t *testing.T) {
fmt.Printf("Testing KAFKA Backend.\n")
k := Kafka{}
config := map[string]string{
"TEST_MODE": "true",
"KAFKA_TOPIC": "test",
"SASL_USER": "admin",
}

k.Configure(config)
k.Init()
i := 0
for i < BENCH_MAX {
k.Add(backends.GetTestFlow())
i++
}
time.Sleep(2 * time.Second)
}
167 changes: 167 additions & 0 deletions backends/kafka/kafka.go
@@ -0,0 +1,167 @@
package kafka

import (
"crypto/tls"
"encoding/json"
"fmt"
"github.com/Shopify/sarama"
"github.com/adambaumeister/goflow/fields"
"os"
)

type Kafka struct {
server string
kconfig *sarama.Config
testMode bool
tc chan string
topic string

producer sarama.AsyncProducer
}

type JsonFlow struct {
Src_ip string
Dst_ip string
Src_port int
Dst_port int
Protocol int
In_bytes int
In_packets int
Src_ip6 string
Dst_ip6 string
}

func (j *JsonFlow) route(values map[uint16]fields.Value) {
// There's probably a nicer way of doing this.
for f, v := range values {
switch f {
case fields.IPV4_SRC_ADDR:
j.Src_ip = v.ToString()
case fields.IPV4_DST_ADDR:
j.Dst_ip = v.ToString()
case fields.L4_SRC_PORT:
j.Src_port = v.ToInt()
case fields.L4_DST_PORT:
j.Dst_port = v.ToInt()
case fields.PROTOCOL:
j.Protocol = v.ToInt()
case fields.IN_BYTES:
j.In_bytes = v.ToInt()
case fields.IN_PKTS:
j.In_packets = v.ToInt()
case fields.IPV6_SRC_ADDR:
j.Src_ip6 = v.ToString()
case fields.IPV6_DST_ADDR:
j.Dst_ip6 = v.ToString()
}
}
}

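// Prune is a no-op for the Kafka backend; message retention is handled
// broker-side.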
func (b *Kafka) Prune(string) {
}

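// Status re-runs Init, so a connection failure panics there rather than
// being reported in the returned string.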
func (b *Kafka) Status() string {
b.Init()
return "Kafka connection looks ok.\n"
}

func (b *Kafka) Configure(config map[string]string) {

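// Precedence: optional keys take environment > config file > defaults below;
// required keys take config file first, then environment, and panic if both
// are missing.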
// Config required - no defaults
cr := []string{
"KAFKA_SERVER",
"KAFKA_TOPIC",
}
// Config defaults - optional arguments that fall back to these values
cd := map[string]string{
"SSL": "true",
"SSL_VERIFY": "false",
"TEST_MODE": "true",
"SASL_USER": "",
"SASL_PASSWORD": "",
}

// Overwrite the defaults with the real values
for k, v := range config {
cd[k] = v
}

// Overwrite any values with those set in the environment, if present
for k := range cd {
if len(os.Getenv(k)) > 0 {
cd[k] = os.Getenv(k)
}
}

c := sarama.NewConfig()
for _, v := range cr {
if _, ok := config[v]; !ok {
if len(os.Getenv(v)) > 0 {
config[v] = os.Getenv(v)
} else {
panic(fmt.Sprintf("Invalid Kafka Configuration. Missing %v", v))
}
}
}
b.server = config["KAFKA_SERVER"]
b.topic = config["KAFKA_TOPIC"]

tlsConfig := tls.Config{}
if cd["SSL"] == "true" {
c.Net.TLS.Enable = true
if cd["SSL_VERIFY"] != "true" {
tlsConfig.InsecureSkipVerify = true
}
c.Net.TLS.Config = &tlsConfig
}

if cd["SASL_USER"] != "" {
c.Net.SASL.Enable = true
c.Net.SASL.User = cd["SASL_USER"]
c.Net.SASL.Password = cd["SASL_PASSWORD"]
}
b.kconfig = c

if cd["TEST_MODE"] == "true" {
b.testMode = true
}

}

// Init connects to the Kafka cluster by starting the async producer.
func (b *Kafka) Init() {

config := b.kconfig
//config.Producer.Return.Successes = true
producer, err := sarama.NewAsyncProducer([]string{b.server}, config)
if err != nil {
panic(err)
}

b.producer = producer
}

func (b *Kafka) Add(values map[uint16]fields.Value) {
producer := b.producer

jf := JsonFlow{}
jf.route(values)

s, err := json.Marshal(jf)
if err != nil {
panic(fmt.Sprintf("Failed to marshal JSON Flow: %v", err))
}

// Poll the producer.Errors() channel; if it's empty, queue the next message.
// Because Add is called for every flow, an error surfaces on the next call after it occurs.
select {
case err := <-producer.Errors():
panic(fmt.Sprintf("Failed to produce message: %v", err))
default:
producer.Input() <- &sarama.ProducerMessage{Topic: b.topic, Key: nil, Value: sarama.ByteEncoder(s)}
}
// In test mode a sleep here would stop `go test` from exiting before the
// async producer flushes; the test currently sleeps in the caller instead.
if b.testMode {
//time.Sleep(2 * time.Second)
}
}
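Aside: to see what Add() actually puts on the topic, a minimal sarama consumer can read the flows back. The sketch below is not part of this commit; it assumes a plaintext broker on localhost:9092 and the test topic, and prints each JSON-encoded flow as it arrives.

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	// Assumed local test broker; adjust to match KAFKA_SERVER.
	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// Read partition 0 of the "test" topic from the oldest offset.
	pc, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
	if err != nil {
		panic(err)
	}
	defer pc.Close()

	// Each message value is one JSON-encoded flow (see JsonFlow above).
	for msg := range pc.Messages() {
		fmt.Printf("offset %d: %s\n", msg.Offset, msg.Value)
	}
}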
41 changes: 41 additions & 0 deletions backends/setup_scripts/kafka.sh
@@ -0,0 +1,41 @@
#!/usr/bin/env bash
## Start a test Kafka instance
## This is for testing only; it does not build a production-ready Kafka instance.
## Make sure you change the below if you're actually using this!
SSLPW=changeme!

apt install default-jre
wget http://mirror.ventraip.net.au/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
tar -xzf kafka_2.11-2.1.0.tgz
cd kafka_2.11-2.1.0
# -daemon so the script isn't blocked by ZooKeeper running in the foreground
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# Trusting a single CA certificate in Java takes a surprising number of keytool steps.
#Step 1
keytool -keystore server.keystore.jks -alias localhost -validity 365 -keyalg RSA -genkey
#Step 2
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
#Step 3
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:$SSLPW
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
printf " !! If you're using a server with less than 1G of memory\n";
printf " !! change line 29 in bin/kafka-server-start.sh to set a smaller heap\n";
printf " !!\n";
printf " !! Also, if you're running on AWS, set advertised.listeners=PLAINTEXT://[ your actual server's name ]:9092 in config/server.properties.\n"
printf "!!\n";
printf "!! For SSL Support, add the following snippet:\n\n";
echo "listeners=PLAINTEXT://:9092, SSL://:9093
ssl.keystore.location=/home/ubuntu/kafka_2.11-2.1.0/server.keystore.jks
ssl.keystore.password=$SSLPW
ssl.key.password=$SSLPW
ssl.truststore.location=/home/ubuntu/kafka_2.11-2.1.0/server.truststore.jks
ssl.truststore.password=$SSLPW
"




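Aside: once the SSL listener is up, a quick TLS handshake confirms the keystore wiring. The sketch below is not part of this commit; the address assumes the SSL listener on port 9093 configured above, and InsecureSkipVerify mirrors the backend's SSL_VERIFY=false default.

package main

import (
	"crypto/tls"
	"fmt"
)

func main() {
	// Handshake against the broker's SSL listener (assumed local, port 9093).
	conf := &tls.Config{
		InsecureSkipVerify: true, // self-signed CA, as in SSL_VERIFY=false
	}
	conn, err := tls.Dial("tcp", "localhost:9093", conf)
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	fmt.Printf("TLS handshake OK, negotiated version 0x%x\n", conn.ConnectionState().Version)
}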
9 changes: 7 additions & 2 deletions config.yml
@@ -2,17 +2,22 @@ backends:
rdssql:
type: timescale
config:
-SQL_SERVER: ec2-13-239-40-194.ap-southeast-2.compute.amazonaws.com
+SQL_SERVER: ec2-13-210-246-197.ap-southeast-2.compute.amazonaws.com
SQL_USERNAME: remoteuser
#SQL_SERVER: 52.62.226.159
#SQL_USERNAME: testgoflow
SQL_DB: testgoflow

dumper:
type: dump
config:

frontends:
netflow:
config:
bindaddr: 192.168.1.158
bindport: 9999
-backend: rdssql
+backend: dumper

utilities:
max_age: 180
5 changes: 5 additions & 0 deletions config/global.go
@@ -3,6 +3,7 @@ package config
import (
"fmt"
"github.com/adambaumeister/goflow/backends"
"github.com/adambaumeister/goflow/backends/kafka"
"github.com/adambaumeister/goflow/backends/mysql"
"github.com/adambaumeister/goflow/backends/timescale"
"github.com/adambaumeister/goflow/frontends"
@@ -59,6 +60,10 @@ func (gc *GlobalConfig) GetBackends() map[string]backends.Backend {
b := backends.Dump{}
b.Configure(gc.Backends[n].Config)
bm[n] = &b
case "kafka":
b := kafka.Kafka{}
b.Configure(gc.Backends[n].Config)
bm[n] = &b
default:
panic(fmt.Sprintf("Error: Invalid backend type %v", bc.Type))
}
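Aside: the interface a backend must satisfy is not shown in this diff, but it can be inferred from the Kafka type's method set. A sketch of what backends.Backend presumably looks like (the authoritative definition lives in the backends package; fields.Value comes from github.com/adambaumeister/goflow/fields):

// Inferred from the Kafka backend's methods - not authoritative.
type Backend interface {
	Configure(config map[string]string)
	Init()
	Add(values map[uint16]fields.Value)
	Prune(string)
	Status() string
}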
17 changes: 15 additions & 2 deletions config_example.yml
@@ -2,6 +2,7 @@
### Backend configuration
###
backends:
# Key is the identifying name
rdssql:
# Currently supported types:
# - mysql
@@ -16,16 +17,28 @@ backends:
dumper:
type: dump
config:
# Kafka example
kafka:
type: kafka
config:
# Required
KAFKA_SERVER: [your-server]:[listening-port]
KAFKA_TOPIC: [your-topic]
# Optional
SASL_USER: [SASL User]
SSL: true
SSL_VERIFY: false
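# Note: the optional keys above can also be set as environment variables,
# which take precedence over this file; the required keys fall back to the
# environment only when omitted here (see Configure in backends/kafka/kafka.go).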

###
### Frontend config
###
frontends:
netflow:
config:
# Bindaddr: IP address on local system to listen for netflow
bindaddr: 192.168.1.164
bindaddr: [local address]
# Bindport: Port to listen on
bindport: 9999
bindport: [local port]
# Backend to store data received by this frontend
backend: rdssql

