diff --git a/mqtt-auth/pom.xml b/mqtt-auth/pom.xml index 5a0966a..64905cc 100644 --- a/mqtt-auth/pom.xml +++ b/mqtt-auth/pom.xml @@ -5,7 +5,7 @@ mqtt-wk cn.wizzer - 1.0.2-netty + 1.0.3-netty 4.0.0 jar diff --git a/mqtt-broker/pom.xml b/mqtt-broker/pom.xml index 13e22f7..112d2cd 100644 --- a/mqtt-broker/pom.xml +++ b/mqtt-broker/pom.xml @@ -5,7 +5,7 @@ mqtt-wk cn.wizzer - 1.0.2-netty + 1.0.3-netty 4.0.0 jar diff --git a/mqtt-broker/src/main/java/cn/wizzer/iot/mqtt/server/broker/cluster/RedisCluster.java b/mqtt-broker/src/main/java/cn/wizzer/iot/mqtt/server/broker/cluster/RedisCluster.java index 1b11705..44dc2eb 100644 --- a/mqtt-broker/src/main/java/cn/wizzer/iot/mqtt/server/broker/cluster/RedisCluster.java +++ b/mqtt-broker/src/main/java/cn/wizzer/iot/mqtt/server/broker/cluster/RedisCluster.java @@ -1,12 +1,12 @@ package cn.wizzer.iot.mqtt.server.broker.cluster; -import cn.hutool.core.util.HexUtil; import cn.wizzer.iot.mqtt.server.broker.config.BrokerProperties; import cn.wizzer.iot.mqtt.server.broker.internal.InternalMessage; import cn.wizzer.iot.mqtt.server.common.subscribe.SubscribeStore; import cn.wizzer.iot.mqtt.server.store.message.MessageIdService; import cn.wizzer.iot.mqtt.server.store.session.SessionStoreService; import cn.wizzer.iot.mqtt.server.store.subscribe.SubscribeStoreService; +import com.alibaba.fastjson.JSONObject; import io.netty.channel.Channel; import io.netty.channel.ChannelId; import io.netty.channel.group.ChannelGroup; @@ -16,7 +16,6 @@ import org.nutz.integration.jedis.pubsub.PubSubService; import org.nutz.ioc.loader.annotation.Inject; import org.nutz.ioc.loader.annotation.IocBean; -import org.nutz.lang.Lang; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,13 +51,13 @@ public void init() { @Override public void onMessage(String channel, String message) { - InternalMessage internalMessage = Lang.fromBytes(HexUtil.decodeHex(message), InternalMessage.class); + InternalMessage internalMessage = JSONObject.parseObject(message, InternalMessage.class); this.sendPublishMessage(internalMessage.getClientId(), internalMessage.getTopic(), MqttQoS.valueOf(internalMessage.getMqttQoS()), internalMessage.getMessageBytes(), internalMessage.isRetain(), internalMessage.isDup()); } @Async public void sendMessage(InternalMessage internalMessage) { - pubSubService.fire(CLUSTER_TOPIC, HexUtil.encodeHexStr(Lang.toBytes(internalMessage))); + pubSubService.fire(CLUSTER_TOPIC, JSONObject.toJSONString(internalMessage)); } private void sendPublishMessage(String clientId, String topic, MqttQoS mqttQoS, byte[] messageBytes, boolean retain, boolean dup) { @@ -73,7 +72,7 @@ private void sendPublishMessage(String clientId, String topic, MqttQoS mqttQoS, new MqttPublishVariableHeader(topic, 0), ByteBuffer.wrap(messageBytes)); LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}", subscribeStore.getClientId(), topic, respQoS.value()); ChannelId channelId = channelIdMap.get(sessionStoreService.get(subscribeStore.getClientId()).getChannelId()); - if(channelId!=null) { + if (channelId != null) { Channel channel = channelGroup.find(channelId); if (channel != null) channel.writeAndFlush(publishMessage); } @@ -85,7 +84,7 @@ private void sendPublishMessage(String clientId, String topic, MqttQoS mqttQoS, new MqttPublishVariableHeader(topic, messageId), ByteBuffer.wrap(messageBytes)); LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}, messageId: {}", subscribeStore.getClientId(), topic, respQoS.value(), messageId); ChannelId channelId = channelIdMap.get(sessionStoreService.get(subscribeStore.getClientId()).getChannelId()); - if(channelId!=null) { + if (channelId != null) { Channel channel = channelGroup.find(channelId); if (channel != null) channel.writeAndFlush(publishMessage); } @@ -97,7 +96,7 @@ private void sendPublishMessage(String clientId, String topic, MqttQoS mqttQoS, new MqttPublishVariableHeader(topic, messageId), ByteBuffer.wrap(messageBytes)); LOGGER.debug("PUBLISH - clientId: {}, topic: {}, Qos: {}, messageId: {}", subscribeStore.getClientId(), topic, respQoS.value(), messageId); ChannelId channelId = channelIdMap.get(sessionStoreService.get(subscribeStore.getClientId()).getChannelId()); - if(channelId!=null) { + if (channelId != null) { Channel channel = channelGroup.find(channelId); if (channel != null) channel.writeAndFlush(publishMessage); } diff --git a/mqtt-broker/src/main/java/cn/wizzer/iot/mqtt/server/broker/service/KafkaService.java b/mqtt-broker/src/main/java/cn/wizzer/iot/mqtt/server/broker/service/KafkaService.java index e573cfb..e7efc75 100644 --- a/mqtt-broker/src/main/java/cn/wizzer/iot/mqtt/server/broker/service/KafkaService.java +++ b/mqtt-broker/src/main/java/cn/wizzer/iot/mqtt/server/broker/service/KafkaService.java @@ -1,8 +1,8 @@ package cn.wizzer.iot.mqtt.server.broker.service; -import cn.hutool.core.util.HexUtil; import cn.wizzer.iot.mqtt.server.broker.config.BrokerProperties; import cn.wizzer.iot.mqtt.server.broker.internal.InternalMessage; +import com.alibaba.fastjson.JSONObject; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -29,7 +29,7 @@ public class KafkaService { public void send(InternalMessage internalMessage) { try { //消息体转换为Hex字符串进行转发 - ProducerRecord data = new ProducerRecord<>(brokerProperties.getProducerTopic(), internalMessage.getTopic(), HexUtil.encodeHexStr(internalMessage.getMessageBytes())); + ProducerRecord data = new ProducerRecord<>(brokerProperties.getProducerTopic(), internalMessage.getTopic(), JSONObject.toJSONString(internalMessage)); kafkaProducer.send(data, new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { diff --git a/mqtt-common/pom.xml b/mqtt-common/pom.xml index 9ffe06f..b68c560 100644 --- a/mqtt-common/pom.xml +++ b/mqtt-common/pom.xml @@ -5,7 +5,7 @@ mqtt-wk cn.wizzer - 1.0.2-netty + 1.0.3-netty 4.0.0 diff --git a/mqtt-store/pom.xml b/mqtt-store/pom.xml index e9c8e2f..4def4ef 100644 --- a/mqtt-store/pom.xml +++ b/mqtt-store/pom.xml @@ -5,7 +5,7 @@ mqtt-wk cn.wizzer - 1.0.2-netty + 1.0.3-netty 4.0.0 jar diff --git a/mqtt-zoo/mqtt-test-kafka/pom.xml b/mqtt-zoo/mqtt-test-kafka/pom.xml index ed46930..343d835 100644 --- a/mqtt-zoo/mqtt-test-kafka/pom.xml +++ b/mqtt-zoo/mqtt-test-kafka/pom.xml @@ -4,14 +4,14 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> cn.wizzer mqtt-test-kafka - 1.0.0-netty + 1.0.3-netty 4.0.0 jar - 1.0.0-netty + 1.0.3-netty 2.3-SNAPSHOT 4.1.28.Final - 1.2.47 + 1.2.49 4.1.2 2.0.0 1.7.25 diff --git a/mqtt-zoo/mqtt-test-kafka/src/main/java/cn/wizzer/iot/mqtt/server/test/InternalMessage.java b/mqtt-zoo/mqtt-test-kafka/src/main/java/cn/wizzer/iot/mqtt/server/test/InternalMessage.java new file mode 100644 index 0000000..c6797ce --- /dev/null +++ b/mqtt-zoo/mqtt-test-kafka/src/main/java/cn/wizzer/iot/mqtt/server/test/InternalMessage.java @@ -0,0 +1,77 @@ +package cn.wizzer.iot.mqtt.server.test; + +import java.io.Serializable; +/** + * 内部消息 + */ +public class InternalMessage implements Serializable { + + private static final long serialVersionUID = -1L; + + //当前频道clientId + private String clientId; + + private String topic; + + private int mqttQoS; + + private byte[] messageBytes; + + private boolean retain; + + private boolean dup; + + public String getClientId() { + return clientId; + } + + public InternalMessage setClientId(String clientId) { + this.clientId = clientId; + return this; + } + + public String getTopic() { + return topic; + } + + public InternalMessage setTopic(String topic) { + this.topic = topic; + return this; + } + + public int getMqttQoS() { + return mqttQoS; + } + + public InternalMessage setMqttQoS(int mqttQoS) { + this.mqttQoS = mqttQoS; + return this; + } + + public byte[] getMessageBytes() { + return messageBytes; + } + + public InternalMessage setMessageBytes(byte[] messageBytes) { + this.messageBytes = messageBytes; + return this; + } + + public boolean isRetain() { + return retain; + } + + public InternalMessage setRetain(boolean retain) { + this.retain = retain; + return this; + } + + public boolean isDup() { + return dup; + } + + public InternalMessage setDup(boolean dup) { + this.dup = dup; + return this; + } +} diff --git a/mqtt-zoo/mqtt-test-kafka/src/main/java/cn/wizzer/iot/mqtt/server/test/KafkaLauncher.java b/mqtt-zoo/mqtt-test-kafka/src/main/java/cn/wizzer/iot/mqtt/server/test/KafkaLauncher.java index 533acbe..b745a90 100644 --- a/mqtt-zoo/mqtt-test-kafka/src/main/java/cn/wizzer/iot/mqtt/server/test/KafkaLauncher.java +++ b/mqtt-zoo/mqtt-test-kafka/src/main/java/cn/wizzer/iot/mqtt/server/test/KafkaLauncher.java @@ -1,6 +1,6 @@ package cn.wizzer.iot.mqtt.server.test; -import cn.hutool.core.util.HexUtil; +import com.alibaba.fastjson.JSONObject; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -8,7 +8,6 @@ import org.nutz.ioc.impl.PropertiesProxy; import org.nutz.ioc.loader.annotation.Inject; import org.nutz.ioc.loader.annotation.IocBean; -import org.nutz.json.Json; import org.nutz.log.Log; import org.nutz.log.Logs; import org.nutz.mvc.annotation.Modules; @@ -44,15 +43,15 @@ public Properties getProperties() { } public void init() { - KafkaConsumer kafkaConsumer=new KafkaConsumer(getProperties()); + KafkaConsumer kafkaConsumer = new KafkaConsumer(getProperties()); //kafka消费消息,接收MQTT发来的消息 kafkaConsumer.subscribe(Arrays.asList(conf.get("mqttwk.broker.kafka.producer.topic"))); - int sum=0; + int sum = 0; while (true) { ConsumerRecords records = kafkaConsumer.poll(Duration.ofMillis(500)); for (ConsumerRecord record : records) { - log.debugf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), new String(HexUtil.decodeHex(record.value()))); - log.debugf("总计收到 %s条",++sum); + log.debugf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), JSONObject.parseObject(record.value(), InternalMessage.class)); + log.debugf("总计收到 %s条", ++sum); } } diff --git a/pom.xml b/pom.xml index f41a9b5..d026445 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ cn.wizzer mqtt-wk pom - 1.0.2-netty + 1.0.3-netty MqttWk mqtt-common @@ -15,10 +15,10 @@ mqtt-store - 1.0.2-netty + 1.0.3-netty 2.3-SNAPSHOT 4.1.28.Final - 1.2.47 + 1.2.49 4.1.2 2.0.0 UTF-8