diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..c1ee67f --- /dev/null +++ b/LICENSE @@ -0,0 +1,29 @@ +BSD 3-Clause License + +Copyright (c) 2019, Uptake +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
\ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..a778cb6 --- /dev/null +++ b/Makefile @@ -0,0 +1,37 @@ +PACKAGE = fRanz +INSTALLDIR = $(HOME)/.$(PACKAGE)/local +OS = $(shell uname -s) + +.PHONY: install smoke test docs roxygen pdf version clean distclean cleanRcpp unlock + +install: unlock clean cleanRcpp + # Install fRanz R package + Rscript -e 'if (!"Rcpp" %in% rownames(installed.packages())) {install.packages("Rcpp", repos = "https://cran.rstudio.com")}' && \ + Rscript -e "Rcpp::compileAttributes(file.path(getwd(),'$(PACKAGE)'))" && \ + Rscript -e 'if (!"devtools" %in% rownames(installed.packages())) {install.packages("devtools", repos = "https://cran.rstudio.com")}' && \ + Rscript -e "devtools::install(file.path(getwd(),'$(PACKAGE)'), force = TRUE, upgrade = FALSE)" + +cleanRcpp: + rm -f fRanz/R/RcppExport.R fRanz/src/RcppExport.cpp + +clean: + # Remove cpp object files + find $(PACKAGE)/src -name '*.o' -delete + find $(PACKAGE)/src -name '*.so' -delete + +distclean: clean cleanRcpp + +unlock: + # Remove 00LOCK-cpproll directory + for libpath in $$(Rscript -e "noquote(paste(.libPaths(), collapse = ' '))"); do \ + echo "Unlocking $$libpath..." && \ + rm -rf $$libpath/00LOCK-$(PACKAGE); \ + done + +docs roxygen: + # Regenerate documentation with roxygen + Rscript -e "roxygen2::roxygenize('$(PACKAGE)')" +test: + # Run unit tests + Rscript -e "devtools::test('$(PACKAGE)')" + diff --git a/README.md b/README.md new file mode 100644 index 0000000..51e71e0 --- /dev/null +++ b/README.md @@ -0,0 +1,52 @@ +# fRanz: An R Kafka Client +**THIS PROJECT IS CURRENTLY IN ALPHA -- Check back often!!** + + +![](doc/sticker/fRanz.png) + + + +## What is fRanz + +fRanz is an open source R Kafka client that allows users to read messages from and write messages to Kafka. It leverages the stability and performance of [librdkafka](https://github.com/edenhill/librdkafka) and implements idiomatic R workflows on top of it. 
+ + +## Example of sending and reading a message + +```r +library(fRanz) + +BROKER_HOST <- 'localhost' +BROKER_PORT <- 9092 +TOPIC_NAME <- 'myTestTopic' + +# KafkaBroker +broker <- KafkaBroker$new(host=BROKER_HOST, port=BROKER_PORT) + +# KafkaProducer +producer <- KafkaProducer$new(brokers = list(broker)) +producer$produce(topic = TOPIC_NAME, + key = "myKey", + value = "My First Message") +# Number of messages successfully sent is returned +# [1] 1 + + +# KafkaConsumer +consumer <- KafkaConsumer$new(brokers = list(broker), groupId = "test", extraOptions=list(`auto.offset.reset`="earliest")) +consumer$subscribe(topics = c(TOPIC_NAME)) +result <- consumer$consume(topic=TOPIC_NAME) + +result +# Consumed messages are returned in a list(list(key, payload)) format +# [[1]] +# [[1]]$key +# [1] "myKey" +# +# [[1]]$payload +# [1] "My First Message" + +``` + + + diff --git a/doc/sticker/fRanz.png b/doc/sticker/fRanz.png new file mode 100644 index 0000000..14a8060 Binary files /dev/null and b/doc/sticker/fRanz.png differ diff --git a/doc/sticker/generate_sticker.R b/doc/sticker/generate_sticker.R new file mode 100644 index 0000000..58fa141 --- /dev/null +++ b/doc/sticker/generate_sticker.R @@ -0,0 +1,10 @@ +library(hexSticker) +library(showtext) +font_add_google("Reenie Beanie", "reenie") +showtext_auto() +hexSticker::sticker("the_thinker.png", + package = "fRanz", + p_color = "black", + h_fill = "#ABA63E", + p_family = "reenie", + filename = "fRanz.png") \ No newline at end of file diff --git a/doc/sticker/the_thinker.png b/doc/sticker/the_thinker.png new file mode 100644 index 0000000..26b4cd1 Binary files /dev/null and b/doc/sticker/the_thinker.png differ diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..d261213 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,20 @@ +version: '3' + +services: + zookeeper: + image: confluentinc/cp-zookeeper:5.0.0 + ports: + - "2181:2181" + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + + kafka: + image: 
confluentinc/cp-kafka:5.0.0 + ports: + - "9092:9092" + environment: + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 + KAFKA_BROKER_ID: 1 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + diff --git a/end_to_end.R b/end_to_end.R new file mode 100644 index 0000000..06f8452 --- /dev/null +++ b/end_to_end.R @@ -0,0 +1,48 @@ +library(fRanz) + +# system call to start a local kafka instance +system("docker-compose up -d --build") + +BROKER_HOST <- 'localhost' +BROKER_PORT <- 9092 +TOPIC_NAME <- 'myTestTopic' + +# KafkaBroker +broker <- KafkaBroker$new(host=BROKER_HOST, port=BROKER_PORT) + +# KafkaProducer +producer <- KafkaProducer$new(brokers = list(broker)) +producer$produce(topic = TOPIC_NAME, + key = "myKey", + value = "My First Message") + + + +#KafkaConsumer +consumer <- KafkaConsumer$new(brokers = list(broker), groupId = "test", extraOptions=list(`auto.offset.reset`="earliest")) + +consumer$subscribe(topics = c(TOPIC_NAME)) + +result <- consumer$consume(topic=TOPIC_NAME) + +result + + +#### Multiple messages + +producer <- KafkaProducer$new(brokers = list(broker)) +producer$produce(topic = TOPIC_NAME, + key = "mySecondKey", + value = "My Second Message") + +producer$produce(topic = TOPIC_NAME, + key = "myThirdKey", + value = "My Third Message") + +consumer <- KafkaConsumer$new(brokers = list(broker), groupId = "test", extraOptions=list(`auto.offset.reset`="earliest")) + +consumer$subscribe(topics = c(TOPIC_NAME)) + +result <- consumer$consume(topic=TOPIC_NAME) + +result \ No newline at end of file diff --git a/fRanz/DESCRIPTION b/fRanz/DESCRIPTION new file mode 100644 index 0000000..9f19528 --- /dev/null +++ b/fRanz/DESCRIPTION @@ -0,0 +1,21 @@ +Package: fRanz +Type: Package +Title: An R Kafka Client +Version: 0.1.0 +Date: 2019-05-13 +Authors@R: c( + person("Patrick", "Boueri", email = "patrick.boueri@uptake.com", role = c("aut")), + person("Mike", "Jermann", email = "mike.jermann@uptake.com", role = c("cre")) + ) 
+Maintainer: Patrick Boueri +Description: An R Kafka Client +License: BSD_3_clause + file LICENSE +Imports: + jsonlite, + Rcpp, + R6, + methods +LinkingTo: Rcpp +RoxygenNote: 6.0.1 +URL: https://github.com/UptakeOpenSource/fRanz +BugReports: https://github.com/UptakeOpenSource/fRanz/issues diff --git a/fRanz/LICENSE b/fRanz/LICENSE new file mode 100644 index 0000000..c1ee67f --- /dev/null +++ b/fRanz/LICENSE @@ -0,0 +1,29 @@ +BSD 3-Clause License + +Copyright (c) 2019, Uptake +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
\ No newline at end of file diff --git a/fRanz/NAMESPACE b/fRanz/NAMESPACE new file mode 100644 index 0000000..f6b1d95 --- /dev/null +++ b/fRanz/NAMESPACE @@ -0,0 +1,3 @@ +useDynLib(fRanz, .registration=TRUE) +exportPattern("^[[:alpha:]]+") +import(methods, Rcpp) diff --git a/fRanz/R/KafkaBroker.R b/fRanz/R/KafkaBroker.R new file mode 100644 index 0000000..cbe5500 --- /dev/null +++ b/fRanz/R/KafkaBroker.R @@ -0,0 +1,36 @@ +#' @title Kafka Broker +#' @name KafkaBroker +#' @description R6 class that stores the host and port of one Kafka broker and formats them as a "host:port" string. +#' @importFrom R6 R6Class +KafkaBroker <- R6::R6Class( + classname = "KafkaBroker" + , public = list( + initialize = function(host + , port) { + #TODO: Validate + private$host <- host + private$port <- port + } + + , getHost = function() { + private$host + } + + , getPort = function() { + private$port + } + + , getHostPort = function(){ + return(paste0(private$host,":",private$port)) + } + , getTopics = function() { + #TODO: Implement + return(NULL) + } + ) + , private = list( + host = NULL + , port = NULL + , partitions = NULL + ) +) diff --git a/fRanz/R/KafkaConsumer.R b/fRanz/R/KafkaConsumer.R new file mode 100644 index 0000000..532f331 --- /dev/null +++ b/fRanz/R/KafkaConsumer.R @@ -0,0 +1,40 @@ +#' @title Kafka Consumer +#' @name KafkaConsumer +#' @description R6 class wrapping a native consumer handle: connects to the given brokers under a consumer group, subscribes to topics and returns consumed messages as a list of list(key, payload). +#' @importFrom R6 R6Class +KafkaConsumer <- R6::R6Class( + classname = "KafkaConsumer" + , public = list( + initialize = function(brokers + , groupId + , extraOptions = list()) { + #TODO: Assert broker class + private$brokers <- brokers + brokerList <- paste(unlist(lapply(brokers,function(x) x$getHostPort())), collapse = ",") # one comma-separated value keeps keys and values aligned for any number of brokers + private$consumerPtr <- GetRdConsumer(c("metadata.broker.list", "group.id",names(extraOptions)) + ,c(brokerList, groupId,unlist(extraOptions,use.names = FALSE))) + } + + , subscribe = function(topics) { + # librdkafka replaces the whole subscription on every call, so send all topics in one request + result <- RdSubscribe(private$consumerPtr, topics) + if (result == 0) { + private$topics <- topics + } + invisible(result) + } + + , consume = function(topic, numResults=100) { + 
Filter(function(msg) !is.null(msg), KafkaConsume(private$consumerPtr, numResults)) + } + + , getTopics = function() { + Reduce(c, lapply(private$brokers, function(broker) {broker$getTopics()})) + } + ) + , private = list( + brokers = NULL + , topics = NULL + , consumerPtr = NULL + ) +) diff --git a/fRanz/R/KafkaProducer.R b/fRanz/R/KafkaProducer.R new file mode 100644 index 0000000..412e391 --- /dev/null +++ b/fRanz/R/KafkaProducer.R @@ -0,0 +1,37 @@ +#' @title Kafka Producer +#' @name KafkaProducer +#' @description R6 class wrapping a native producer handle that sends keyed messages to a topic on the given brokers. +#' @importFrom R6 R6Class +KafkaProducer <- R6::R6Class( + classname = "KafkaProducer" + , public = list( + initialize = function(brokers,extraOptions=list()) { + #TODO: Assert broker class + private$brokers <- brokers + brokerList <- paste(unlist(lapply(brokers,function(x) x$getHostPort())), collapse = ",") # one comma-separated value keeps keys and values aligned for any number of brokers + private$producerPtr <- GetRdProducer(c("metadata.broker.list",names(extraOptions)) + ,c(brokerList,unlist(extraOptions,use.names = FALSE))) + } + + # Produce single message to topic (the partition passed to KafkaProduce is fixed to 0) + , produce = function(topic + , value + , key) { + KafkaProduce(private$producerPtr + ,topic + ,0 + ,key + ,value) + } + + , getTopics = function() { + #TODO: Get this working + Reduce(c, lapply(private$brokers, function(broker) {broker$getTopics()})) + } + + ) + , private = list( + brokers = NULL + , producerPtr = NULL + ) +) diff --git a/fRanz/src/Makevars b/fRanz/src/Makevars new file mode 100644 index 0000000..e557180 --- /dev/null +++ b/fRanz/src/Makevars @@ -0,0 +1,23 @@ +INSTALLDIR = $(HOME)/.fRanz/librdkafka +LIBRDKAFKADIR = $(PWD)/../inst/librdkafka-0.11.6 + +PKG_LIBS = -L$(INSTALLDIR)/src-cpp -lrdkafka++ +PKG_CXXFLAGS = -std=c++11 -I$(INSTALLDIR)/src-cpp + +.PHONY: all install_librdkadka + +all: install_librdkadka + +install_librdkadka: + if [ ! 
-s $(INSTALLDIR)/src-cpp/librdkafka++.a ] ; \ + then \ + mkdir -p $(INSTALLDIR) && \ + cd $(shell dirname $(LIBRDKAFKADIR)) && \ + tar xzf $(LIBRDKAFKADIR).tar.gz && \ + cd $(LIBRDKAFKADIR) && \ + ./configure && \ + $(MAKE) && \ + $(MAKE) install && \ + mv * $(INSTALLDIR) ; \ + fi + diff --git a/fRanz/src/consumer.cpp b/fRanz/src/consumer.cpp new file mode 100644 index 0000000..52027e4 --- /dev/null +++ b/fRanz/src/consumer.cpp @@ -0,0 +1,77 @@ +#include +#include +#include "utils.h" +#include +#include +#include +#include +#include +#include +#include +#include + +//////////////////////////////////////////////////////////////////////////////////////// +//' @title GetRdConsumer +//' @name GetRdConsumer +//' @description Creates a Kafka consumer from parallel vectors of configuration keys and values and returns it as an external pointer; raises an R error if creation fails. +//' @export +// [[Rcpp::export]] +SEXP GetRdConsumer(Rcpp::StringVector keys, Rcpp::StringVector values) { + std::string errstr; + auto conf = MakeKafkaConfig(keys,values); /* NOTE(review): conf is never released in this function -- confirm whether create() takes ownership */ + RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr); + if(!consumer) { + Rcpp::stop("Consumer creation failed with error: " + errstr); + } + Rcpp::XPtr p(consumer, true) ; + return p; +} + +//' @title RdSubscribe +//' @name RdSubscribe +//' @description Subscribes the consumer to the given topic names in a single call and returns the resulting error code as an integer. +//' @export +// [[Rcpp::export]] +int RdSubscribe(SEXP consumerPtr, const Rcpp::StringVector Rtopics) { + Rcpp::XPtr consumer(consumerPtr); + std::vector topics(Rtopics.size()); + for (int i = 0; i < Rtopics.size(); i++){ + topics[i] = Rtopics(i); + } + RdKafka::ErrorCode resp; + resp = consumer->subscribe(topics); + return static_cast(resp); +} + +//' @title KafkaConsume +//' @name KafkaConsume +//' @description Polls the consumer up to numResults times and returns a list with one list(key, payload) entry per received message; polling stops at the first result that is not ERR_NO_ERROR. +//' @export +// [[Rcpp::export]] +Rcpp::List KafkaConsume(SEXP consumerPtr, int numResults) { + Rcpp::XPtr consumer(consumerPtr); + + Rcpp::List messages(numResults); + for(int i = 0; i < numResults; i++) { + RdKafka::Message *msg = consumer->consume(10000); /* 10000 is the per-call timeout (presumably milliseconds -- confirm). NOTE(review): msg is never deleted in this function -- looks like a leak, confirm ownership in the librdkafka docs */ + switch(msg->err()){ + case RdKafka::ERR_NO_ERROR:{ + printf("Message %.*s\n", + static_cast(msg->len()), + 
static_cast(msg->payload())); + Rcpp::List message = Rcpp::List::create(Rcpp::Named("key") = *msg->key(), /* NOTE(review): key() is dereferenced without a null check -- confirm it cannot be null for messages produced without a key */ + Rcpp::Named("payload") = static_cast(msg->payload())); /* NOTE(review): payload is used as a C string here although the printf above bounds it with len() -- confirm it is NUL-terminated */ + messages[i] = message; + break; + } + default:{ + /* Any result other than ERR_NO_ERROR: report it and stop polling; slots not filled so far keep their initial value */ + printf("Consume failed: %s", msg->errstr().c_str()); + goto exit_loop; + } + } + } + exit_loop:; + + return messages; +} diff --git a/fRanz/src/producer.cpp b/fRanz/src/producer.cpp new file mode 100644 index 0000000..b260187 --- /dev/null +++ b/fRanz/src/producer.cpp @@ -0,0 +1,51 @@ +#include +#include +#include "utils.h" +#include +#include +#include +#include +#include +#include + +//' @title Kafka Producer +//' @name GetRdProducer +//' @description Creates a Kafka producer from parallel vectors of configuration keys and values and returns it as an external pointer; raises an R error if creation fails. +//' @export +// [[Rcpp::export]] +SEXP GetRdProducer(Rcpp::StringVector keys, Rcpp::StringVector values) { + std::string errstr; + auto conf = MakeKafkaConfig(keys,values); /* NOTE(review): conf is never released in this function -- confirm whether create() takes ownership */ + RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr); + if(!producer) { + Rcpp::stop("Producer creation failed with error: " + errstr); + } + Rcpp::XPtr p(producer, true) ; + return p; +} + +//' @title Produce to a topic +//' @name KafkaProduce +//' @description Sends one message (key and value) to the given topic and partition and returns the resulting error code as an integer. +//' @export +// [[Rcpp::export]] +int KafkaProduce(SEXP producer_pointer, + SEXP topic, + Rcpp::IntegerVector partition, + SEXP key, + SEXP value) { + Rcpp::XPtr producer(producer_pointer); + std::string s_topic = Rcpp::as(topic); + std::string s_value = Rcpp::as(value); + std::string s_key = Rcpp::as(key); + + RdKafka::ErrorCode resp = producer->produce( + s_topic, partition[0], RdKafka::Producer::RK_MSG_COPY, /* only the first element of partition is used */ + const_cast(s_value.c_str()),s_value.size(), + const_cast(s_key.c_str()), s_key.size(), + 0, NULL + ); + producer->flush(0); /* NOTE(review): flush is given a timeout of 0 -- confirm the queued message is actually delivered before this returns */ + + return static_cast(resp); +} diff --git a/fRanz/src/utils.cpp b/fRanz/src/utils.cpp new file mode 100644 index 0000000..8144693 --- /dev/null +++ b/fRanz/src/utils.cpp @@ -0,0 +1,19 @@ +#include +#include + + + +RdKafka::Conf* 
MakeKafkaConfig(Rcpp::StringVector keys, Rcpp::StringVector values) { /* Builds a CONF_GLOBAL configuration by applying each keys[i]/values[i] pair; returns the raw pointer to the caller */ + std::string errstr; + RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); + for(int i = 0; i < keys.size(); i ++){ + std::string temp_key = Rcpp::as< std::string >(keys[i]); + std::string temp_value = Rcpp::as< std::string >(values[i]); /* NOTE(review): values is indexed up to keys.size() with no length check -- assumes both vectors have the same length, confirm against callers */ + if(conf->set(temp_key,temp_value,errstr) != + RdKafka::Conf::CONF_OK){ + throw std::invalid_argument(errstr); /* a rejected setting aborts with the message reported by set(); conf is not released on this path */ + } + } + return conf; +} + diff --git a/fRanz/src/utils.h b/fRanz/src/utils.h new file mode 100644 index 0000000..c4b9156 --- /dev/null +++ b/fRanz/src/utils.h @@ -0,0 +1,4 @@ +#include +#include + +RdKafka::Conf* MakeKafkaConfig(Rcpp::StringVector keys, Rcpp::StringVector values); \ No newline at end of file