
Commit

Merge pull request #5 from pavelkucera/compression
proof of concept: compression support
adamflott authored Jun 13, 2017
2 parents 811eae3 + 5f2cb78 commit f4a1c0c
Showing 4 changed files with 125 additions and 16 deletions.
18 changes: 13 additions & 5 deletions Network/Kafka/Producer.hs
@@ -1,4 +1,5 @@
{-# LANGUAGE FlexibleContexts #-}

module Network.Kafka.Producer where

import Data.Bits ((.&.))
@@ -33,13 +34,20 @@ produceRequest ra ti ts =

-- | Send messages to partition calculated by 'partitionAndCollate'.
produceMessages :: Kafka m => [TopicAndMessage] -> m [ProduceResponse]
produceMessages tams = do
m <- fmap (fmap groupMessagesToSet) <$> partitionAndCollate tams
produceMessages = prod (groupMessagesToSet NoCompression)

-- | Send compressed messages to partition calculated by 'partitionAndCollate'.
produceCompressedMessages :: Kafka m => CompressionCodec -> [TopicAndMessage] -> m [ProduceResponse]
produceCompressedMessages c = prod (groupMessagesToSet c)

prod :: Kafka m => ([TopicAndMessage] -> MessageSet) -> [TopicAndMessage] -> m [ProduceResponse]
prod g tams = do
m <- fmap (fmap g) <$> partitionAndCollate tams
mapM (uncurry send) $ fmap M.toList <$> M.toList m

-- | Create a protocol message set from a list of messages.
groupMessagesToSet :: [TopicAndMessage] -> MessageSet
groupMessagesToSet xs = MessageSet $ msm <$> xs
groupMessagesToSet :: CompressionCodec -> [TopicAndMessage] -> MessageSet
groupMessagesToSet c xs = MessageSet c $ msm <$> xs
where msm = MessageSetMember (Offset (-1)) . _tamMessage

-- | Group messages together with the leader they should be sent to.
@@ -99,7 +107,7 @@ defaultMessageKey = Key Nothing

-- | Default: @0@
defaultMessageAttributes :: Attributes
defaultMessageAttributes = 0
defaultMessageAttributes = Attributes NoCompression

-- | Construct a message from a string of bytes using default attributes.
makeMessage :: ByteString -> Message
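
For reference, a minimal usage sketch of the new entry point, modelled on the package README's producer example; the client id, broker address, topic, and payloads below are placeholders, not part of this patch:

{-# LANGUAGE OverloadedStrings #-}
module Main where

import qualified Data.ByteString.Char8 as B
import Network.Kafka
import Network.Kafka.Producer
import Network.Kafka.Protocol (CompressionCodec(..))

main :: IO ()
main = do
  -- Placeholder client id, broker address, and topic.
  let run = runKafka $ mkKafkaState "compression-example" ("localhost", 9092)
      byteMessages = fmap (TopicAndMessage "milena-test" . makeMessage . B.pack)
  -- Same call shape as produceMessages, but the message set goes out gzip-compressed.
  result <- run . produceCompressedMessages Gzip $ byteMessages ["hello", "world"]
  print result
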
92 changes: 86 additions & 6 deletions Network/Kafka/Protocol.hs
@@ -14,6 +14,7 @@ import Control.Exception (Exception)
import Control.Lens
import Control.Monad (replicateM, liftM2, liftM3, liftM4, liftM5, unless)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Bits ((.&.))
import Data.ByteString.Char8 (ByteString)
import Data.ByteString.Lens (unpackedChars)
import Data.Digest.CRC32
@@ -25,6 +26,8 @@ import System.IO
import Numeric.Lens
import Prelude hiding ((.), id)
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy.Char8 as LB (fromStrict, toStrict)
import qualified Codec.Compression.GZip as GZip (compress, decompress)
import qualified Network

data ReqResp a where
@@ -142,8 +145,8 @@ newtype Timeout =
newtype Partition =
Partition Int32 deriving (Show, Eq, Serializable, Deserializable, Num, Integral, Ord, Real, Enum)

newtype MessageSet =
MessageSet { _messageSetMembers :: [MessageSetMember] } deriving (Show, Eq)
data MessageSet = MessageSet { _codec :: CompressionCodec, _messageSetMembers :: [MessageSetMember] }
deriving (Show, Eq)
data MessageSetMember =
MessageSetMember { _setOffset :: Offset, _setMessage :: Message } deriving (Show, Eq)

@@ -153,9 +156,11 @@ newtype Message =
Message { _messageFields :: (Crc, MagicByte, Attributes, Key, Value) }
deriving (Show, Eq, Deserializable)

data CompressionCodec = NoCompression | Gzip deriving (Show, Eq)

newtype Crc = Crc Int32 deriving (Show, Eq, Serializable, Deserializable, Num, Integral, Ord, Real, Enum)
newtype MagicByte = MagicByte Int8 deriving (Show, Eq, Serializable, Deserializable, Num, Integral, Ord, Real, Enum)
newtype Attributes = Attributes Int8 deriving (Show, Eq, Serializable, Deserializable, Num, Integral, Ord, Real, Enum)
data Attributes = Attributes { _compressionCodec :: CompressionCodec } deriving (Show, Eq)

newtype Key = Key { _keyBytes :: Maybe KafkaBytes } deriving (Show, Eq)
newtype Value = Value { _valueBytes :: Maybe KafkaBytes } deriving (Show, Eq)
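
With these type changes, Attributes wraps a CompressionCodec instead of a raw Int8, and every MessageSet records the codec it should be serialized with. A small illustrative fragment, written as it would sit inside Network.Kafka.Protocol itself; the names emptyPlainSet, emptyGzipSet, and defaultAttrs are illustrative only:

-- NoCompression reproduces the old wire behaviour (attribute bits set to 0);
-- Gzip marks the whole set for compression during serialization.
emptyPlainSet, emptyGzipSet :: MessageSet
emptyPlainSet = MessageSet NoCompression []
emptyGzipSet  = MessageSet Gzip []

defaultAttrs :: Attributes
defaultAttrs = Attributes NoCompression  -- same value the producer's new default uses
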
@@ -299,12 +304,36 @@ instance Serializable KafkaString where
putByteString bs

instance Serializable MessageSet where
serialize (MessageSet ms) = do
let bytes = runPut $ mapM_ serialize ms
serialize (MessageSet codec messageSet) = do
let bytes = runPut $ mapM_ serialize (compress codec messageSet)
l = fromIntegral (B.length bytes) :: Int32
serialize l
putByteString bytes

where compress :: CompressionCodec -> [MessageSetMember] -> [MessageSetMember]
compress NoCompression ms = ms
compress c ms = [MessageSetMember (Offset (-1)) (message c ms)]

message :: CompressionCodec -> [MessageSetMember] -> Message
message c ms = Message (0, 0, Attributes c, Key Nothing, value (compressor c) ms)

compressor :: CompressionCodec -> (ByteString -> ByteString)
compressor c = case c of
Gzip -> LB.toStrict . GZip.compress . LB.fromStrict
_ -> fail "Unsupported compression codec"

value :: (ByteString -> ByteString) -> [MessageSetMember] -> Value
value c ms = Value . Just . KBytes $ c (runPut $ mapM_ serialize ms)

instance Serializable Attributes where
serialize = serialize . bits
where bits :: Attributes -> Int8
bits = codecValue . _compressionCodec

codecValue :: CompressionCodec -> Int8
codecValue NoCompression = 0
codecValue Gzip = 1

instance Serializable KafkaBytes where
serialize (KBytes bs) = do
let l = fromIntegral (B.length bs) :: Int32
@@ -343,14 +372,51 @@ instance Deserializable MessageSet where
deserialize = do
l <- deserialize :: Get Int32
ms <- isolate (fromIntegral l) getMembers
return $ MessageSet ms

decompressed <- mapM decompress ms

return $ MessageSet NoCompression (concat decompressed)

where getMembers :: Get [MessageSetMember]
getMembers = do
wasEmpty <- isEmpty
if wasEmpty
then return []
else liftM2 (:) deserialize getMembers <|> (remaining >>= getBytes >> return [])

decompress :: MessageSetMember -> Get [MessageSetMember]
decompress m = if isCompressed m
then decompressSetMember m
else return [m]

isCompressed :: MessageSetMember -> Bool
isCompressed = messageCompressed . _setMessage

messageCompressed :: Message -> Bool
messageCompressed (Message (_, _, att, _, _)) = _compressionCodec att /= NoCompression

decompressSetMember :: MessageSetMember -> Get [MessageSetMember]
decompressSetMember (MessageSetMember _ (Message (_, _, att, _, Value v))) = case v of
Just bytes -> decompressMessage (decompressor att) (_kafkaByteString bytes)
Nothing -> fail "Expecting a compressed message set, empty data set received"

decompressor :: Attributes -> (ByteString -> ByteString)
decompressor att = case _compressionCodec att of
Gzip -> LB.toStrict . GZip.decompress . LB.fromStrict
_ -> fail "Unsupported compression codec."

decompressMessage :: (ByteString -> ByteString) -> ByteString -> Get [MessageSetMember]
decompressMessage f = getDecompressedMembers . f

getDecompressedMembers :: ByteString -> Get [MessageSetMember]
getDecompressedMembers "" = return [] -- a compressed empty message
getDecompressedMembers val = do
let res = runGetPartial deserialize val :: Result MessageSetMember
case res of
Fail err _ -> fail err
Partial _ -> fail "Could not consume all available data"
Done v vv -> fmap (v :) (getDecompressedMembers vv)

instance Deserializable MessageSetMember where
deserialize = do
o <- deserialize
@@ -364,6 +430,20 @@ instance Deserializable Leader where
let l = Leader $ if x == -1 then Nothing else Just x
return l

instance Deserializable Attributes where
deserialize = do
i <- deserialize :: Get Int8
codec <- case compressionCodecFromValue i of
Just c -> return c
Nothing -> fail $ "Unknown compression codec value found in: " ++ show i
return $ Attributes codec

compressionCodecFromValue :: Int8 -> Maybe CompressionCodec
compressionCodecFromValue i | eq 1 = Just Gzip
| eq 0 = Just NoCompression
| otherwise = Nothing
where eq y = i .&. y == y

instance Deserializable KafkaBytes where
deserialize = do
l <- deserialize :: Get Int32
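
In short: on serialization a non-trivial codec collapses the member list into a single wrapper member whose value is the gzip-compressed serialization of the inner set, and on deserialization any member whose attributes advertise a codec is expanded back out. The compressor/decompressor pair above is just gzip over strict ByteStrings via zlib's lazy interface; a standalone sketch of that round trip (the helper names gzipStrict and gunzipStrict are illustrative, not part of the patch):

import qualified Codec.Compression.GZip as GZip
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy.Char8 as LB (fromStrict, toStrict)

-- The same strict <-> lazy shuffle the instances use around zlib's gzip functions.
gzipStrict, gunzipStrict :: B.ByteString -> B.ByteString
gzipStrict   = LB.toStrict . GZip.compress   . LB.fromStrict
gunzipStrict = LB.toStrict . GZip.decompress . LB.fromStrict

main :: IO ()
main = do
  let inner = B.pack "bytes of a serialized inner message set"  -- stand-in for runPut output
  print (gunzipStrict (gzipStrict inner) == inner)              -- prints True
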
3 changes: 2 additions & 1 deletion milena.cabal
@@ -52,7 +52,8 @@ library
resource-pool >=0.2.3.2 && <0.3,
lifted-base >=0.2.3.6 && <0.3,
murmur-hash >=0.1.0.8 && <0.2,
semigroups >=0.16.2.2 && <0.19
semigroups >=0.16.2.2 && <0.19,
zlib >=0.6.1.2 && <0.7

test-suite test
default-language: Haskell2010
28 changes: 24 additions & 4 deletions test/tests.hs
@@ -11,7 +11,7 @@ import Control.Monad.Trans (liftIO)
import Network.Kafka
import Network.Kafka.Consumer
import Network.Kafka.Producer
import Network.Kafka.Protocol (ProduceResponse(..), KafkaError(..))
import Network.Kafka.Protocol (ProduceResponse(..), KafkaError(..), CompressionCodec(..))
import Test.Tasty
import Test.Tasty.Hspec
import Test.Tasty.QuickCheck
@@ -37,11 +37,16 @@ specs = do
result <- run . produceMessages $ byteMessages ms
result `shouldSatisfy` isRight

prop "can produce multiple messages" $ \(ms, ms') -> do
prop "can produce compressed messages" $ \ms -> do
result <- run . produceCompressedMessages Gzip $ byteMessages ms
result `shouldSatisfy` isRight

prop "can produce multiple messages" $ \(ms, ms', ms'') -> do
result <- run $ do
r1 <- produceMessages $ byteMessages ms
r2 <- produceMessages $ byteMessages ms'
return $ r1 ++ r2
r3 <- produceCompressedMessages Gzip $ byteMessages ms''
return $ r1 ++ r2 ++ r3
result `shouldSatisfy` isRight

prop "can fetch messages" $ do
@@ -58,7 +63,7 @@ specs = do

case getPartitionByKey (B.pack key) info of
Just PartitionAndLeader { _palLeader = leader, _palPartition = partition } -> do
let payload = [(TopicAndPartition topic partition, groupMessagesToSet messages)]
let payload = [(TopicAndPartition topic partition, groupMessagesToSet NoCompression messages)]
s = stateBrokers . at leader
[(_topicName, [(_, NoError, offset)])] <- _produceResponseFields <$> send leader payload
broker <- findMetadataOrElse [topic] s (KafkaInvalidBroker leader)
@@ -69,6 +74,21 @@ specs = do

result `shouldBe` Right (tamPayload <$> messages)

prop "can roundtrip compressed messages" $ \(NonEmpty ms) -> do
let messages = byteMessages ms
result <- run $ do
requireAllAcks
produceResps <- produceCompressedMessages Gzip messages

case map _produceResponseFields produceResps of
[[(_topicName, [(partition, NoError, offset)])]] -> do
resp <- fetch offset partition topic
return $ fmap tamPayload . fetchMessages $ resp

_ -> fail "Unexpected produce response"

result `shouldBe` Right (tamPayload <$> messages)

prop "can roundtrip keyed messages" $ \(NonEmpty ms) key -> do
let keyBytes = B.pack key
messages = fmap (TopicAndMessage topic . makeKeyedMessage keyBytes . B.pack) ms
