Skip to content

Commit

Permalink
Merge pull request #6 from matth-/CreateTopics
Browse files Browse the repository at this point in the history
add CreateTopics request
  • Loading branch information
adamflott authored Feb 7, 2019
2 parents 23f4ee1 + a511524 commit d51fa99
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 2 deletions.
20 changes: 20 additions & 0 deletions Network/Kafka.hs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,26 @@ metadata request = withAnyHandle $ flip metadata' request
metadata' :: Kafka m => Handle -> MetadataRequest -> m MetadataResponse
metadata' h request = makeRequest h $ MetadataRR request


createTopic :: Kafka m => CreateTopicsRequest -> m CreateTopicsResponse
createTopic request = withAnyHandle $ flip createTopic' request

createTopic' ::
Kafka m => Handle -> CreateTopicsRequest -> m CreateTopicsResponse
createTopic' h request = makeRequest h $ TopicsRR request

createTopicsRequest ::
TopicName
-> Partition
-> ReplicationFactor
-> [(Partition, Replicas)]
-> [(KafkaString, Metadata)]
-> CreateTopicsRequest
createTopicsRequest topic partition replication_factor replica_assignment config =
CreateTopicsReq
([(topic, partition, replication_factor, replica_assignment, config)], defaultRequestTimeout)


getTopicPartitionLeader :: Kafka m => TopicName -> Partition -> m Broker
getTopicPartitionLeader t p = do
let s = stateTopicMetadata . at t
Expand Down
22 changes: 21 additions & 1 deletion Network/Kafka/Protocol.hs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ data ReqResp a where
ProduceRR :: MonadIO m => ProduceRequest -> ReqResp (m ProduceResponse)
FetchRR :: MonadIO m => FetchRequest -> ReqResp (m FetchResponse)
OffsetRR :: MonadIO m => OffsetRequest -> ReqResp (m OffsetResponse)
TopicsRR :: MonadIO m => CreateTopicsRequest -> ReqResp (m CreateTopicsResponse)

doRequest' :: (Deserializable a, MonadIO m) => CorrelationId -> Handle -> Request -> m (Either String a)
doRequest' correlationId h r = do
Expand All @@ -51,6 +52,7 @@ doRequest clientId correlationId h (MetadataRR req) = doRequest' correlationId h
doRequest clientId correlationId h (ProduceRR req) = doRequest' correlationId h $ Request (correlationId, clientId, ProduceRequest req)
doRequest clientId correlationId h (FetchRR req) = doRequest' correlationId h $ Request (correlationId, clientId, FetchRequest req)
doRequest clientId correlationId h (OffsetRR req) = doRequest' correlationId h $ Request (correlationId, clientId, OffsetRequest req)
doRequest clientId correlationId h (TopicsRR req) = doRequest' correlationId h $ Request (correlationId, clientId, CreateTopicsRequest req)

class Serializable a where
serialize :: a -> Put
Expand All @@ -72,6 +74,7 @@ data RequestMessage = MetadataRequest MetadataRequest
| OffsetCommitRequest OffsetCommitRequest
| OffsetFetchRequest OffsetFetchRequest
| GroupCoordinatorRequest GroupCoordinatorRequest
| CreateTopicsRequest CreateTopicsRequest
deriving (Show, Generic, Eq)

newtype MetadataRequest = MetadataReq [TopicName] deriving (Show, Eq, Serializable, Generic, Deserializable)
Expand Down Expand Up @@ -99,6 +102,10 @@ newtype FetchResponse =
FetchResp { _fetchResponseFields :: [(TopicName, [(Partition, KafkaError, Offset, MessageSet)])] }
deriving (Show, Eq, Serializable, Deserializable, Generic)

newtype CreateTopicsResponse =
TopicsResp { _topicsResponseFields :: [(TopicName, KafkaError)] }
deriving (Show, Eq, Deserializable, Serializable, Generic)

newtype MetadataResponse = MetadataResp { _metadataResponseFields :: ([Broker], [TopicMetadata]) } deriving (Show, Eq, Deserializable, Generic)
newtype Broker = Broker { _brokerFields :: (NodeId, Host, Port) } deriving (Show, Eq, Ord, Deserializable, Generic)
newtype NodeId = NodeId { _nodeId :: Int32 } deriving (Show, Eq, Deserializable, Num, Integral, Ord, Real, Enum, Generic)
Expand Down Expand Up @@ -167,10 +174,13 @@ data ResponseMessage = MetadataResponse MetadataResponse
| OffsetCommitResponse OffsetCommitResponse
| OffsetFetchResponse OffsetFetchResponse
| GroupCoordinatorResponse GroupCoordinatorResponse
| CreateTopicsResponse CreateTopicsResponse
deriving (Show, Eq, Generic)

newtype GroupCoordinatorRequest = GroupCoordinatorReq ConsumerGroup deriving (Show, Eq, Serializable, Generic)
newtype ReplicationFactor = ReplicationFactor Int16 deriving (Show, Eq, Num, Integral, Ord, Real, Enum, Serializable, Deserializable, Generic)

newtype GroupCoordinatorRequest = GroupCoordinatorReq ConsumerGroup deriving (Show, Eq, Serializable, Generic)
newtype CreateTopicsRequest = CreateTopicsReq ([(TopicName, Partition, ReplicationFactor, [(Partition, Replicas)], [(KafkaString, Metadata)])], Timeout) deriving (Show, Eq, Serializable, Generic)
newtype OffsetCommitRequest = OffsetCommitReq (ConsumerGroup, [(TopicName, [(Partition, Offset, Time, Metadata)])]) deriving (Show, Eq, Serializable, Generic)
newtype OffsetFetchRequest = OffsetFetchReq (ConsumerGroup, [(TopicName, [Partition])]) deriving (Show, Eq, Serializable, Generic)
newtype ConsumerGroup = ConsumerGroup KafkaString deriving (Show, Eq, Serializable, Deserializable, IsString, Generic)
Expand All @@ -194,6 +204,8 @@ errorKafka OffsetMetadataTooLargeCode = 12
errorKafka OffsetsLoadInProgressCode = 14
errorKafka ConsumerCoordinatorNotAvailableCode = 15
errorKafka NotCoordinatorForConsumerCode = 16
errorKafka TopicAlreadyExists = 36
errorKafka UnsupportedCompressionType = 76

data KafkaError = NoError -- ^ @0@ No error--it worked!
| Unknown -- ^ @-1@ An unexpected server error
Expand All @@ -212,6 +224,8 @@ data KafkaError = NoError -- ^ @0@ No error--it worked!
| OffsetsLoadInProgressCode -- ^ @14@ The broker returns this error code for an offset fetch request if it is still loading offsets (after a leader change for that offsets topic partition).
| ConsumerCoordinatorNotAvailableCode -- ^ @15@ The broker returns this error code for consumer metadata requests or offset commit requests if the offsets topic has not yet been created.
| NotCoordinatorForConsumerCode -- ^ @16@ The broker returns this error code if it receives an offset fetch or commit request for a consumer group that it is not a coordinator for.
| TopicAlreadyExists -- ^@36@ Topic with this name already exists.
| UnsupportedCompressionType -- ^@76@ The requesting client does not support the compression type of given partition.
deriving (Bounded, Enum, Eq, Generic, Show)

instance Serializable KafkaError where
Expand All @@ -238,6 +252,8 @@ instance Deserializable KafkaError where
14 -> return OffsetsLoadInProgressCode
15 -> return ConsumerCoordinatorNotAvailableCode
16 -> return NotCoordinatorForConsumerCode
36 -> return TopicAlreadyExists
76 -> return UnsupportedCompressionType
_ -> fail $ "invalid error code: " ++ show x

instance Exception KafkaError
Expand Down Expand Up @@ -269,6 +285,7 @@ apiKey MetadataRequest{} = ApiKey 3
apiKey OffsetCommitRequest{} = ApiKey 8
apiKey OffsetFetchRequest{} = ApiKey 9
apiKey GroupCoordinatorRequest{} = ApiKey 10
apiKey CreateTopicsRequest{} = ApiKey 19

instance Serializable RequestMessage where
serialize (ProduceRequest r) = serialize r
Expand All @@ -278,6 +295,7 @@ instance Serializable RequestMessage where
serialize (OffsetCommitRequest r) = serialize r
serialize (OffsetFetchRequest r) = serialize r
serialize (GroupCoordinatorRequest r) = serialize r
serialize (CreateTopicsRequest r) = serialize r

instance Serializable Int64 where serialize = putWord64be . fromIntegral
instance Serializable Int32 where serialize = putWord32be . fromIntegral
Expand Down Expand Up @@ -524,6 +542,8 @@ makeLenses ''Message
makeLenses ''Key
makeLenses ''Value

makeLenses ''CreateTopicsResponse

makePrisms ''ResponseMessage

-- * Composed lenses
Expand Down
10 changes: 9 additions & 1 deletion test/tests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import Control.Monad.Trans (liftIO)
import Network.Kafka
import Network.Kafka.Consumer
import Network.Kafka.Producer
import Network.Kafka.Protocol (ProduceResponse(..), KafkaError(..), CompressionCodec(..))
import Network.Kafka.Protocol (ProduceResponse(..), KafkaError(..), CompressionCodec(..), CreateTopicsResponse(..))
import Test.Tasty
import Test.Tasty.Hspec
import Test.Tasty.QuickCheck
Expand Down Expand Up @@ -129,5 +129,13 @@ specs = do
use stateAddresses
result `shouldBe` fmap NE.nub result

describe "create topics" $
it "create topics with multiple partitions" $ do
let t = "milena-test-13-partitions"
result <- run $ do
stateAddresses %= NE.cons ("localhost", 9092)
createTopic (createTopicsRequest t 13 1 [] [])
result `shouldBe` (Right $ TopicsResp [(t, NoError)])

prop :: Testable prop => String -> prop -> SpecWith ()
prop s = it s . property

0 comments on commit d51fa99

Please sign in to comment.