Skip to content

Commit

Permalink
Merge pull request #1 in OSC/franz from oss_review to master
Browse files Browse the repository at this point in the history
* commit '8270c4a60be2f18eb1289eda8674032416d2bca4':
  fix(ossreview): Added checks for NULL ptrs in consumer/producer creation
  feature(init): Porting project over for OSS review
  • Loading branch information
patrick-boueri committed May 15, 2019
2 parents 5e439a0 + 8270c4a commit 5361131
Show file tree
Hide file tree
Showing 19 changed files with 536 additions and 0 deletions.
29 changes: 29 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
BSD 3-Clause License

Copyright (c) 2019, Uptake
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.

* Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
37 changes: 37 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
PACKAGE = fRanz
INSTALLDIR = $(HOME)/.$(PACKAGE)/local
OS = $(shell uname -s)

.PHONY: install smoke test docs roxygen pdf version clean distclean cleanRcpp unlock

install: unlock clean cleanRcpp
# Install fRanz R package
Rscript -e 'if (!"Rcpp" %in% rownames(installed.packages())) {install.packages("Rcpp", repos = "https://cran.rstudio.com")}' && \
Rscript -e "Rcpp::compileAttributes(file.path(getwd(),'$(PACKAGE)'))" && \
Rscript -e 'if (!"devtools" %in% rownames(installed.packages())) {install.packages("devtools", repos = "https://cran.rstudio.com")}' && \
Rscript -e "devtools::install(file.path(getwd(),'$(PACKAGE)'), force = TRUE, upgrade = FALSE)"

cleanRcpp:
rm -f fRanz/R/RcppExport.R fRanz/src/RcppExport.cpp

clean:
# Remove cpp object files
find $(PACKAGE)/src -name '*.o' -delete
find $(PACKAGE)/src -name '*.so' -delete

distclean: clean cleanRcpp

unlock:
# Remove 00LOCK-cpproll directory
for libpath in $$(Rscript -e "noquote(paste(.libPaths(), collapse = ' '))"); do \
echo "Unlocking $$libpath..." && \
rm -rf $$libpath/00LOCK-$(PACKAGE); \
done

docs roxygen:
# Regenerate documentation with roxygen
Rscript -e "roxygen2::roxygenize('$(PACKAGE)')"
test:
# Run unit tests
Rscript -e "devtools::test('$(PACKAGE)')"

52 changes: 52 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# fRranz: An R Kafka Client
**THIS PROJECT IS STILL ALPHA CURRENTLY -- Check back often!!**


![](doc/sticker/fRanz.png)



## What is fRanz

fRanz is an open source R kafka client that allows users to read and write messages from kafka. It leverages the stability and performance of [librdkafka](https://github.com/edenhill/librdkafka) and implements ididiomatic R workflows ontop of it.


## Example of sending and reading a message

```r
library(fRanz)

BROKER_HOST <- 'localhost'
BROKER_PORT <- 9092
TOPIC_NAME <- 'myTestTopic'

# KafkaBroker
broker <- KafkaBroker$new(host=BROKER_HOST, port=BROKER_PORT)

# KafkaProducer
producer <- KafkaProducer$new(brokers = list(broker))
producer$produce(topic = TOPIC_NAME,
key = "myKey",
value = "My First Message")
# Number of messages successfuly sent is returned
# [1] 1


#KafkaConsumer
consumer <- KafkaConsumer$new(brokers = list(broker), groupId = "test", extraOptions=list(`auto.offset.reset`="earliest"))
consumer$subscribe(topics = c(TOPIC_NAME))
result <- consumer$consume(topic=TOPIC_NAME)

result
# Consumed messages are returned in a list(list(key,val)) format
# [[1]]
# [[1]]$key
# [1] "myKey"
#
# [[1]]$payload
# [1] "My First Message"

```



Binary file added doc/sticker/fRanz.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
10 changes: 10 additions & 0 deletions doc/sticker/generate_sticker.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
library(hexSticker)
library(showtext)
font_add_google("Reenie Beanie", "reenie")
showtext_auto()
hexSticker::sticker("the_thinker.png",
package = "fRanz",
p_color = "black",
h_fill = "#ABA63E",
p_family = "reenie",
filename = "fRanz.png")
Binary file added doc/sticker/the_thinker.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
20 changes: 20 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
version: '3'

services:
zookeeper:
image: confluentinc/cp-zookeeper:5.0.0
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181

kafka:
image: confluentinc/cp-kafka:5.0.0
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

48 changes: 48 additions & 0 deletions end_to_end.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
library(fRanz)

# system call to start a local kafka instance
system("docker-compose up -d --build")

BROKER_HOST <- 'localhost'
BROKER_PORT <- 9092
TOPIC_NAME <- 'myTestTopic'

# KafkaBroker
broker <- KafkaBroker$new(host=BROKER_HOST, port=BROKER_PORT)

# KafkaProducer
producer <- KafkaProducer$new(brokers = list(broker))
producer$produce(topic = TOPIC_NAME,
key = "myKey",
value = "My First Message")



#KafkaConsumer
consumer <- KafkaConsumer$new(brokers = list(broker), groupId = "test", extraOptions=list(`auto.offset.reset`="earliest"))

consumer$subscribe(topics = c(TOPIC_NAME))

result <- consumer$consume(topic=TOPIC_NAME)

result


#### Multiple messages

producer <- KafkaProducer$new(brokers = list(broker))
producer$produce(topic = TOPIC_NAME,
key = "mySecondKey",
value = "My Second Message")

producer$produce(topic = TOPIC_NAME,
key = "myThirdKey",
value = "My Third Message")

consumer <- KafkaConsumer$new(brokers = list(broker), groupId = "test", extraOptions=list(`auto.offset.reset`="earliest"))

consumer$subscribe(topics = c(TOPIC_NAME))

result <- consumer$consume(topic=TOPIC_NAME)

result
21 changes: 21 additions & 0 deletions fRanz/DESCRIPTION
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
Package: fRanz
Type: Package
Title: An R Kafka Client
Version: 0.1.0
Date: 2019-05-13
Authors@R: c(
person("Patrick", "Boueri", email = "patrick.boueri@uptake.com", role = c("aut")),
person("Mike", "Jermann", email = "mike.jermann@uptake.com", role = c("cre"))
)
Maintainer: Patrick Boueri <patrick.boueri@uptake.com>
Description: An R Kafka Client
License: BSD_3_clause + file LICENSE
Imports:
jsonlite,
Rcpp,
R6,
methods
LinkingTo: Rcpp
RoxygenNote: 6.0.1
URL: https://github.com/UptakeOpenSource/fRanz
BugReports: https://github.com/UptakeOpenSource/fRanz/issues
29 changes: 29 additions & 0 deletions fRanz/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
BSD 3-Clause License

Copyright (c) 2019, Uptake
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.

* Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
3 changes: 3 additions & 0 deletions fRanz/NAMESPACE
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
useDynLib(fRanz, .registration=TRUE)
exportPattern("^[[:alpha:]]+")
import(methods, Rcpp)
36 changes: 36 additions & 0 deletions fRanz/R/KafkaBroker.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#' @title KafkaBroker
#' @name Kafka Broker
#' @description TDB
#' @importFrom R6 R6Class
KafkaBroker <- R6::R6Class(
classname = "KafkaBroker"
, public = list(
initialize = function(host
, port) {
#TODO: Validate
private$host <- host
private$port <- port
}

, getHost = function() {
private$host
}

, getPort = function() {
private$port
}

, getHostPort = function(){
return(paste0(private$host,":",private$port))
}
, getTopics = function() {
#TODO: Implement
return(NULL)
}
)
, private = list(
host = NULL
, port = NULL
, partitions = NULL
)
)
40 changes: 40 additions & 0 deletions fRanz/R/KafkaConsumer.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#' @title Kakfa Consumer
#' @name KafkaConsumer
#' @description TDB
#' @importFrom R6 R6Class
KafkaConsumer <- R6::R6Class(
classname = "KafkaConsumer"
, public = list(
initialize = function(brokers
, groupId
, extraOptions = list()) {
#TODO: Assert broker class
private$brokers <- brokers
brokerList <- unlist(lapply(brokers,function(x) x$getHostPort()))
private$consumerPtr <- GetRdConsumer(c("metadata.broker.list", "group.id",names(extraOptions))
,c(brokerList, groupId,unlist(extraOptions,use.names = FALSE)))
}

, subscribe = function(topics) {
for (topic in topics) {
result <- RdSubscribe(private$consumerPtr, topic)
if (result == 0) {
private$topics <- c(private$topics, topic)
}
}
}

, consume = function(topic, numResults=100) {
Filter(function(msg) !is.null(msg), KafkaConsume(private$consumerPtr, numResults))
}

, getTopics = function() {
Reduce(c, lapply(brokers, function(broker) {broker$getTopics()}))
}
)
, private = list(
brokers = NULL
, topics = NULL
, consumerPtr = NULL
)
)
37 changes: 37 additions & 0 deletions fRanz/R/KafkaProducer.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#' @title Kakfa Producer
#' @name KafkaProducer
#' @description TDB
#' @importFrom R6 R6Class
KafkaProducer <- R6::R6Class(
classname = "KafkaProducer"
, public = list(
initialize = function(brokers,extraOptions=list()) {
#TODO: Assert broker class
private$brokers <- brokers
brokerList <- unlist(lapply(brokers,function(x) x$getHostPort()))
private$producerPtr <- GetRdProducer(c("metadata.broker.list",names(extraOptions))
,c(brokerList,unlist(extraOptions,use.names = FALSE)))
}

# Produce single message to topic
, produce = function(topic
, value
, key) {
KafkaProduce(private$producerPtr
,topic
,0
,key
,value)
}

, getTopics = function() {
#TODO: Get this working
Reduce(c, lapply(brokers, function(broker) {broker$getTopics()}))
}

)
, private = list(
brokers = NULL
, producerPtr = NULL
)
)
Loading

0 comments on commit 5361131

Please sign in to comment.