-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathproducer.js
89 lines (78 loc) · 2.29 KB
/
producer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
const Kafka = require("node-rdkafka");
const { configFromPath } = require("./util");
/**
 * Builds the configuration object handed to the Kafka producer constructor.
 *
 * The result always contains "bootstrap.servers" and `dr_msg_cb: true`
 * (node-rdkafka's switch for emitting delivery reports). When the input
 * defines "security.protocol" as an own property, the SASL credentials,
 * protocol and mechanism settings are copied across as well.
 *
 * @param {Object} config - Client settings keyed by librdkafka property name.
 * @returns {Object} Configuration map for the producer.
 */
function createConfigMap(config) {
  const configMap = { "bootstrap.servers": config["bootstrap.servers"] };

  // Borrow hasOwnProperty from Object.prototype instead of calling it on the
  // config itself: a config built with Object.create(null), or one carrying a
  // key named "hasOwnProperty", would otherwise throw or give a wrong answer.
  if (Object.prototype.hasOwnProperty.call(config, "security.protocol")) {
    configMap["sasl.username"] = config["sasl.username"];
    configMap["sasl.password"] = config["sasl.password"];
    configMap["security.protocol"] = config["security.protocol"];
    configMap["sasl.mechanisms"] = config["sasl.mechanisms"];
  }

  configMap.dr_msg_cb = true;
  return configMap;
}
/**
 * Creates a Kafka producer from the given settings and connects it.
 *
 * @param {Object} config - Client settings; converted with createConfigMap().
 * @param {Function} onDeliveryReport - Listener registered for the producer's
 *   "delivery-report" event.
 * @returns {Promise<Object>} Resolves with the producer once it emits
 *   "ready"; rejects with the error when it emits "event.error" or when the
 *   connection attempt fails.
 */
function createProducer(config, onDeliveryReport) {
  const producer = new Kafka.Producer(createConfigMap(config));

  return new Promise((resolve, reject) => {
    producer
      .on("ready", () => resolve(producer))
      .on("delivery-report", onDeliveryReport)
      .on("event.error", (err) => {
        console.warn("event.error", err);
        // Has no effect once the promise has already settled.
        reject(err);
      });

    // A failed connection attempt is reported through connect()'s callback;
    // without one the promise could stay pending forever. Success is still
    // signalled by the "ready" event above, so only the error is handled here.
    producer.connect({}, (err) => {
      if (err) {
        reject(err);
      }
    });
  });
}
/**
 * Produces ten randomly generated purchase events to the "purchases" topic.
 *
 * Loads the client configuration from the file named by the first
 * command-line argument (via configFromPath), connects a producer, sends
 * events whose key is a random user and whose value is a random item, then
 * flushes and disconnects. Each delivery report is logged to the console.
 *
 * If no configuration path was given, prints a usage message and exits the
 * process with status 1.
 *
 * @returns {Promise<void>} Resolves once the flush has been requested; the
 *   flush and disconnect themselves complete asynchronously afterwards.
 */
async function produceExample() {
  // argv[0] is the node binary and argv[1] the script path, so the first
  // user-supplied argument sits at index 2.
  if (process.argv.length < 3) {
    console.log(
      "Please provide the configuration file path as the command line argument"
    );
    process.exit(1);
  }
  const configPath = process.argv[2];
  const config = await configFromPath(configPath);

  const topic = "purchases";
  const users = [
    "eabara",
    "jsmith",
    "sgarcia",
    "jbernard",
    "htanaka",
    "awalther",
  ];
  const items = ["book", "alarm clock", "t-shirts", "gift card", "batteries"];

  const producer = await createProducer(config, (err, report) => {
    if (err) {
      console.warn("Error producing", err);
      return;
    }
    const { topic: reportTopic, key, value } = report;
    // Pad the key so the logged columns line up.
    const k = key.toString().padEnd(10, " ");
    console.log(
      `Produced event to topic ${reportTopic}: key = ${k} value = ${value}`
    );
  });

  const numEvents = 10;
  for (let idx = 0; idx < numEvents; ++idx) {
    const key = users[Math.floor(Math.random() * users.length)];
    const value = Buffer.from(items[Math.floor(Math.random() * items.length)]);
    // Partition -1 leaves the partition choice to the client's partitioner.
    producer.produce(topic, -1, value, key);
  }

  // Allow up to 10 seconds for queued messages to be delivered, then
  // disconnect regardless of the outcome.
  producer.flush(10000, (err) => {
    if (err) {
      console.warn("Error flushing", err);
    }
    producer.disconnect();
  });
}
// Script entry point: run the example, print whatever it resolves with, and
// exit with status 1 if anything along the way fails.
void (async () => {
  try {
    const result = await produceExample();
    console.log(result);
  } catch (err) {
    console.error(`Something went wrong:\n${err}`);
    process.exit(1);
  }
})();