From a51152460f8d8d3456c29d7412a1b32eb2383eb9 Mon Sep 17 00:00:00 2001 From: Matthieu Morel Date: Thu, 7 Feb 2019 15:21:19 +0100 Subject: [PATCH] add CreateTopics request --- Network/Kafka.hs | 20 ++++++++++++++++++++ Network/Kafka/Protocol.hs | 22 +++++++++++++++++++++- test/tests.hs | 10 +++++++++- 3 files changed, 50 insertions(+), 2 deletions(-) diff --git a/Network/Kafka.hs b/Network/Kafka.hs index 18ed34c..87597fe 100644 --- a/Network/Kafka.hs +++ b/Network/Kafka.hs @@ -183,6 +183,26 @@ metadata request = withAnyHandle $ flip metadata' request metadata' :: Kafka m => Handle -> MetadataRequest -> m MetadataResponse metadata' h request = makeRequest h $ MetadataRR request + +createTopic :: Kafka m => CreateTopicsRequest -> m CreateTopicsResponse +createTopic request = withAnyHandle $ flip createTopic' request + +createTopic' :: + Kafka m => Handle -> CreateTopicsRequest -> m CreateTopicsResponse +createTopic' h request = makeRequest h $ TopicsRR request + +createTopicsRequest :: + TopicName + -> Partition + -> ReplicationFactor + -> [(Partition, Replicas)] + -> [(KafkaString, Metadata)] + -> CreateTopicsRequest +createTopicsRequest topic partition replication_factor replica_assignment config = + CreateTopicsReq + ([(topic, partition, replication_factor, replica_assignment, config)], defaultRequestTimeout) + + getTopicPartitionLeader :: Kafka m => TopicName -> Partition -> m Broker getTopicPartitionLeader t p = do let s = stateTopicMetadata . at t diff --git a/Network/Kafka/Protocol.hs b/Network/Kafka/Protocol.hs index 756670a..9d9200a 100644 --- a/Network/Kafka/Protocol.hs +++ b/Network/Kafka/Protocol.hs @@ -30,6 +30,7 @@ data ReqResp a where ProduceRR :: MonadIO m => ProduceRequest -> ReqResp (m ProduceResponse) FetchRR :: MonadIO m => FetchRequest -> ReqResp (m FetchResponse) OffsetRR :: MonadIO m => OffsetRequest -> ReqResp (m OffsetResponse) + TopicsRR :: MonadIO m => CreateTopicsRequest -> ReqResp (m CreateTopicsResponse) doRequest' :: (Deserializable a, MonadIO m) => CorrelationId -> Handle -> Request -> m (Either String a) doRequest' correlationId h r = do @@ -51,6 +52,7 @@ doRequest clientId correlationId h (MetadataRR req) = doRequest' correlationId h doRequest clientId correlationId h (ProduceRR req) = doRequest' correlationId h $ Request (correlationId, clientId, ProduceRequest req) doRequest clientId correlationId h (FetchRR req) = doRequest' correlationId h $ Request (correlationId, clientId, FetchRequest req) doRequest clientId correlationId h (OffsetRR req) = doRequest' correlationId h $ Request (correlationId, clientId, OffsetRequest req) +doRequest clientId correlationId h (TopicsRR req) = doRequest' correlationId h $ Request (correlationId, clientId, CreateTopicsRequest req) class Serializable a where serialize :: a -> Put @@ -72,6 +74,7 @@ data RequestMessage = MetadataRequest MetadataRequest | OffsetCommitRequest OffsetCommitRequest | OffsetFetchRequest OffsetFetchRequest | GroupCoordinatorRequest GroupCoordinatorRequest + | CreateTopicsRequest CreateTopicsRequest deriving (Show, Generic, Eq) newtype MetadataRequest = MetadataReq [TopicName] deriving (Show, Eq, Serializable, Generic, Deserializable) @@ -99,6 +102,10 @@ newtype FetchResponse = FetchResp { _fetchResponseFields :: [(TopicName, [(Partition, KafkaError, Offset, MessageSet)])] } deriving (Show, Eq, Serializable, Deserializable, Generic) +newtype CreateTopicsResponse = + TopicsResp { _topicsResponseFields :: [(TopicName, KafkaError)] } + deriving (Show, Eq, Deserializable, Serializable, Generic) + newtype MetadataResponse = MetadataResp { _metadataResponseFields :: ([Broker], [TopicMetadata]) } deriving (Show, Eq, Deserializable, Generic) newtype Broker = Broker { _brokerFields :: (NodeId, Host, Port) } deriving (Show, Eq, Ord, Deserializable, Generic) newtype NodeId = NodeId { _nodeId :: Int32 } deriving (Show, Eq, Deserializable, Num, Integral, Ord, Real, Enum, Generic) @@ -167,10 +174,13 @@ data ResponseMessage = MetadataResponse MetadataResponse | OffsetCommitResponse OffsetCommitResponse | OffsetFetchResponse OffsetFetchResponse | GroupCoordinatorResponse GroupCoordinatorResponse + | CreateTopicsResponse CreateTopicsResponse deriving (Show, Eq, Generic) -newtype GroupCoordinatorRequest = GroupCoordinatorReq ConsumerGroup deriving (Show, Eq, Serializable, Generic) +newtype ReplicationFactor = ReplicationFactor Int16 deriving (Show, Eq, Num, Integral, Ord, Real, Enum, Serializable, Deserializable, Generic) +newtype GroupCoordinatorRequest = GroupCoordinatorReq ConsumerGroup deriving (Show, Eq, Serializable, Generic) +newtype CreateTopicsRequest = CreateTopicsReq ([(TopicName, Partition, ReplicationFactor, [(Partition, Replicas)], [(KafkaString, Metadata)])], Timeout) deriving (Show, Eq, Serializable, Generic) newtype OffsetCommitRequest = OffsetCommitReq (ConsumerGroup, [(TopicName, [(Partition, Offset, Time, Metadata)])]) deriving (Show, Eq, Serializable, Generic) newtype OffsetFetchRequest = OffsetFetchReq (ConsumerGroup, [(TopicName, [Partition])]) deriving (Show, Eq, Serializable, Generic) newtype ConsumerGroup = ConsumerGroup KafkaString deriving (Show, Eq, Serializable, Deserializable, IsString, Generic) @@ -194,6 +204,8 @@ errorKafka OffsetMetadataTooLargeCode = 12 errorKafka OffsetsLoadInProgressCode = 14 errorKafka ConsumerCoordinatorNotAvailableCode = 15 errorKafka NotCoordinatorForConsumerCode = 16 +errorKafka TopicAlreadyExists = 36 +errorKafka UnsupportedCompressionType = 76 data KafkaError = NoError -- ^ @0@ No error--it worked! | Unknown -- ^ @-1@ An unexpected server error @@ -212,6 +224,8 @@ data KafkaError = NoError -- ^ @0@ No error--it worked! | OffsetsLoadInProgressCode -- ^ @14@ The broker returns this error code for an offset fetch request if it is still loading offsets (after a leader change for that offsets topic partition). | ConsumerCoordinatorNotAvailableCode -- ^ @15@ The broker returns this error code for consumer metadata requests or offset commit requests if the offsets topic has not yet been created. | NotCoordinatorForConsumerCode -- ^ @16@ The broker returns this error code if it receives an offset fetch or commit request for a consumer group that it is not a coordinator for. + | TopicAlreadyExists -- ^@36@ Topic with this name already exists. + | UnsupportedCompressionType -- ^@76@ The requesting client does not support the compression type of given partition. deriving (Bounded, Enum, Eq, Generic, Show) instance Serializable KafkaError where @@ -238,6 +252,8 @@ instance Deserializable KafkaError where 14 -> return OffsetsLoadInProgressCode 15 -> return ConsumerCoordinatorNotAvailableCode 16 -> return NotCoordinatorForConsumerCode + 36 -> return TopicAlreadyExists + 76 -> return UnsupportedCompressionType _ -> fail $ "invalid error code: " ++ show x instance Exception KafkaError @@ -269,6 +285,7 @@ apiKey MetadataRequest{} = ApiKey 3 apiKey OffsetCommitRequest{} = ApiKey 8 apiKey OffsetFetchRequest{} = ApiKey 9 apiKey GroupCoordinatorRequest{} = ApiKey 10 +apiKey CreateTopicsRequest{} = ApiKey 19 instance Serializable RequestMessage where serialize (ProduceRequest r) = serialize r @@ -278,6 +295,7 @@ instance Serializable RequestMessage where serialize (OffsetCommitRequest r) = serialize r serialize (OffsetFetchRequest r) = serialize r serialize (GroupCoordinatorRequest r) = serialize r + serialize (CreateTopicsRequest r) = serialize r instance Serializable Int64 where serialize = putWord64be . fromIntegral instance Serializable Int32 where serialize = putWord32be . fromIntegral @@ -524,6 +542,8 @@ makeLenses ''Message makeLenses ''Key makeLenses ''Value +makeLenses ''CreateTopicsResponse + makePrisms ''ResponseMessage -- * Composed lenses diff --git a/test/tests.hs b/test/tests.hs index 625d8ba..c4796d4 100644 --- a/test/tests.hs +++ b/test/tests.hs @@ -11,7 +11,7 @@ import Control.Monad.Trans (liftIO) import Network.Kafka import Network.Kafka.Consumer import Network.Kafka.Producer -import Network.Kafka.Protocol (ProduceResponse(..), KafkaError(..), CompressionCodec(..)) +import Network.Kafka.Protocol (ProduceResponse(..), KafkaError(..), CompressionCodec(..), CreateTopicsResponse(..)) import Test.Tasty import Test.Tasty.Hspec import Test.Tasty.QuickCheck @@ -129,5 +129,13 @@ specs = do use stateAddresses result `shouldBe` fmap NE.nub result + describe "create topics" $ + it "create topics with multiple partitions" $ do + let t = "milena-test-13-partitions" + result <- run $ do + stateAddresses %= NE.cons ("localhost", 9092) + createTopic (createTopicsRequest t 13 1 [] []) + result `shouldBe` (Right $ TopicsResp [(t, NoError)]) + prop :: Testable prop => String -> prop -> SpecWith () prop s = it s . property