-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathactivity.go
85 lines (65 loc) · 2.12 KB
/
activity.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package KafkaActivity
// Imports all of the flowGo binaries
import (
	"fmt"

	"github.com/Shopify/sarama"
	"github.com/TIBCOSoftware/flogo-lib/core/activity"
	"github.com/TIBCOSoftware/flogo-lib/logger"
	"github.com/optiopay/kafka"
	"github.com/optiopay/kafka/proto"
)
// log is the default package logger
var log = logger.GetLogger("activity-tibco-kafka")

// Input setting names and the fixed partition every message is published to.
const (
	topic     = "topic"   // name of the activity's topic input
	message   = "message" // name of the activity's message input
	partition = 0         // all messages are produced to partition 0
)

// kafkaAddrs lists the NBCU Kafka dev brokers to dial.
// FIX: the original listed "ushapld00119la:9092" twice; dialing the same
// broker address twice is pointless, so the duplicate entry was removed.
// NOTE(review): these addresses are hardcoded — they should probably come
// from activity configuration; confirm with the deployment setup.
var kafkaAddrs = []string{"ushapld00119la:9092"}
// KafkaActivity is a Kafka Activity implementation
type KafkaActivity struct {
	// metadata describes the activity's inputs/outputs to the Flogo engine.
	metadata *activity.Metadata
	// syncProducerMap is populated with an empty map in NewActivity but is
	// never read anywhere in this file — presumably intended as a producer
	// cache keyed by broker or topic; TODO confirm against other callers.
	// NOTE(review): a pointer to a map is unnecessary in Go — maps are
	// already reference types; a plain map[string]sarama.SyncProducer
	// would do (changing it would also require updating NewActivity).
	syncProducerMap *map[string]sarama.SyncProducer
}
// init create & register activity
// func init() {
// md := activity.NewMetadata(jsonMetadata)
// activity.Register(&KafkaActivity{metadata: md})
// }
// NewActivity creates a new activity
func NewActivity(metadata *activity.Metadata) activity.Activity {
log.Debug("KafkaActivity NewActivity")
pKafkaActivity := &KafkaActivity{metadata: metadata}
producers := make(map[string]sarama.SyncProducer)
pKafkaActivity.syncProducerMap = &producers
return pKafkaActivity
}
// Metadata implements activity.Activity.Metadata
func (a *KafkaActivity) Metadata() *activity.Metadata {
return a.metadata
}
// Eval implements activity.Activity.Eval
func (a *KafkaActivity) Eval(context activity.Context) (done bool, err error) {
topicInput := context.GetInput(topic).(string)
messageInput := context.GetInput(message).(string)
conf := kafka.NewBrokerConf("NBCU-FloGo-Client")
conf.AllowTopicCreation = true
// connect to kafka cluster
broker, err := kafka.Dial(kafkaAddrs, conf)
if err != nil {
log.Error("cannot connect to kafka cluster:", err)
}
defer broker.Close()
// Connect & Send Message to Kafka
producer := broker.Producer(kafka.NewProducerConf())
msg := &proto.Message{Value: []byte(messageInput)}
resp, err := producer.Produce(topicInput, partition, msg)
if err != nil {
log.Error("Error sending message to Kafka broker:", err)
}
// if log.IsEnabledFor(log.DEBUG) {
log.Debug("Response:", resp)
// }
return true, nil
}