Skip to content

Commit

Permalink
less memory utilisation on received msgs (#79)
Browse files Browse the repository at this point in the history
* less memory utilisation on received msgs

making use of scatter/gather io rather than malloc

* better output on examples to show whats occuring

* put topic len inside cb, no change in cb size

* less memory used on recv & less memcpy

* dont print log for every message

* retry read on interrupt
  • Loading branch information
sshanks-kx authored Jan 20, 2025
1 parent 4873680 commit c0a2714
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 63 deletions.
11 changes: 8 additions & 3 deletions examples/consumer.q
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
\l ../q/mqtt.q

// consumer topics
topics:`topic1`topic2

// Define the table schema to handle incoming messages
.mqtt.tab:([]topic:`symbol$();
msg_sent:`timestamp$();
Expand All @@ -11,10 +14,12 @@ cbfn:{[topic;msg]
data:";" vs msg;
.mqtt.tab,:(`$topic;"P"$data 0;.z.p;"S"$data 1)}

.mqtt.msgrcvd:{cbfn[x;y];0N!"Message received"}
.mqtt.msgrcvd:{cbfn[x;y];}

// Connect and subscribe
//.mqtt.conn[`$"tcp://host.docker.internal:1883";`rcv;()!()];
.mqtt.conn[`$"tcp://localhost:1883";`rcv;()!()];
.mqtt.sub[`topic1];
.mqtt.sub[`topic2];

-1"Configured topics: ",","sv string topics;
-1"Populating .mqtt.tab with each received message";
.mqtt.sub each topics;
11 changes: 6 additions & 5 deletions examples/producer.q
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
\l ../q/mqtt.q

// Initialize a counter to allow us to stop after a set number of published messages
n:0
n:1
topics:`topic1`topic2
// Connect to the host
//.mqtt.conn[`$"tcp://host.docker.internal:1883";`src;()!()]
.mqtt.conn[`$"tcp://localhost:1883";`src;()!()]
// Set up a timed message publisher
.z.ts:{if[n>=199;system"t 0"];
.mqtt.pub[`topic1;string[.z.p],";","topic1_",string n];
.mqtt.pub[`topic2;string[.z.p],";","topic2_",string n];
.z.ts:{if[n>=200;0N!"Finished sending ",(string n)," messages to each topic";system"t 0"];
.mqtt.pubx[;string[.z.p],";","topic1_",string n;0;0b] each topics;
n+:1}

-1"Type `\\t 100` to publish a message every 100ms up to 200 messages, to stop type `\\t 0`";
-1"Configured topics: ",","sv string topics;
-1"Type `\\t 100` to publish a message to each topic every 100ms (up to 200 messages), to stop type `\\t 0`";
159 changes: 104 additions & 55 deletions src/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -368,16 +368,14 @@ EXP K unsub(K topic){

// Callback data structure
typedef struct CallbackDataStr{
union{
char reserved[8]; // reserve space for flags, aligned to 64 bit word
enum{
enum{
MSG_TYPE_SEND = 9876, // arbitrary uncommon value
MSG_TYPE_RCVD,
MSG_TYPE_DISCONN
} msg_type;
} header;
unsigned int topic_len;
union{
long size;
long payload_len;
MQTTClient_deliveryToken dt;
} body;
// Start of dynamic data
Expand All @@ -389,23 +387,59 @@ static void msgsent(void* context, MQTTClient_deliveryToken dt){
// Body contains: <dt>
CallbackData msg;
msg.body.dt = dt;
msg.header.msg_type = MSG_TYPE_SEND;
msg.msg_type = MSG_TYPE_SEND;
send(spair[1], &msg, sizeof(CallbackData), 0);
}

static char* getSysError(char* buf,int len){
buf[0]=0;
#ifdef _WIN32
FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, 0,
WSAGetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), buf,
len, 0);
#else
strerror_r(errno,buf,len);
#endif
return buf;
}

static int msgrcvd(void* context, char* topic, int unused, MQTTClient_message* mq_msg){
// Body contains: <topic_len><topic><payload>
(void)unused;(void)context;
long topic_len = strlen(topic)+1;
long msg_size = sizeof(CallbackData) + topic_len + mq_msg->payloadlen;
CallbackData* msg = malloc(msg_size);
msg->body.size = topic_len + mq_msg->payloadlen;
msg->header.msg_type = MSG_TYPE_RCVD;
char* p = (char*)&(msg[1]);
memcpy(p, topic, topic_len);
memcpy(p += topic_len, mq_msg->payload, mq_msg->payloadlen);
send(spair[1], (char*)msg, msg_size, 0);
free(msg);
#ifdef _WIN32
WSABUF buffers[3];
DWORD bytesSent=0;
#else
struct iovec iov[3];
#endif
unsigned int topic_len=strlen(topic);
CallbackData msg;
msg.msg_type = MSG_TYPE_RCVD;
msg.topic_len = topic_len;
msg.body.payload_len = topic_len + mq_msg->payloadlen;
#ifdef _WIN32
buffers[0].buf=&msg;
buffers[0].len=sizeof(CallbackData);
buffers[1].buf=topic;
buffers[1].len=topic_len;
buffers[2].buf=mq_msg->payload;
buffers[2].len=mq_msg->payloadlen;
if(SOCKET_ERROR==WSASend(spair[1],buffers,3,&bytesSent,0,NULL,NULL)){
char buf[256];
fprintf(stderr, "WSASend error: %s\n", getSysError(buf,sizeof(buf)));
}
#else
iov[0].iov_base=&msg;
iov[0].iov_len=sizeof(CallbackData);
iov[1].iov_base=topic;
iov[1].iov_len=topic_len;
iov[2].iov_base=mq_msg->payload;
iov[2].iov_len=mq_msg->payloadlen;
if(-1==writev(spair[1],iov,sizeof(iov)/sizeof(struct iovec))){
char buf[256];
fprintf(stderr, "send error: %s\n", getSysError(buf,sizeof(buf)));
}
#endif
MQTTClient_freeMessage(&mq_msg);
MQTTClient_free(topic);
return 1;
Expand All @@ -415,8 +449,7 @@ static void disconn(void* context, char* cause){
(void)context;(void)cause;
// Body contains: <>
CallbackData msg;
msg.body.size = 0;
msg.header.msg_type = MSG_TYPE_DISCONN;
msg.msg_type = MSG_TYPE_DISCONN;
send(spair[1], &msg, sizeof(CallbackData), 0);
}

Expand All @@ -434,29 +467,10 @@ static void qmsgsent(MQTTClient_deliveryToken p){
pr0(k(0, (char*)".mqtt.msgsent", kj(p), (K)0));
}

static void qmsgrcvd(char* p,long sz){
K topic = kp(p);
p += topic->n+1;
K msg = kpn(p, sz - (topic->n+1));
pr0(k(0, (char*)".mqtt.msgrcvd", topic, msg, (K)0));
}

static void qdisconn(){
pr0(k(0, (char*)".mqtt.disconn", ktn(0,0), (K)0));
}

static char* getSysError(char* buff,int len){
buff[0]=0;
#ifdef _WIN32
FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, 0,
WSAGetLastError(), MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), buff,
len, 0);
#else
strerror_r(errno,buff,len);
#endif
return buff;
}

/* Socketpair initialization, callback definition and clean up functionality
* detach function initialized at exit, socketpair start issues handled
* callback function set to loop on socketpair connection
Expand All @@ -465,35 +479,70 @@ K mqttCallback(int fd){
CallbackData cb_data;
long rc = recv(fd, (char*)&cb_data, sizeof(cb_data), 0);
if (rc < (long)sizeof(cb_data)){
char buff[256];
fprintf(stderr, "recv(%li) error: %s\n", rc, getSysError(buff,sizeof(buff)));
char buf[256];
fprintf(stderr, "recv(%li) error: %s\n", rc, getSysError(buf,sizeof(buf)));
return (K)0;
}
switch (cb_data.header.msg_type){
switch (cb_data.msg_type){
case MSG_TYPE_SEND:
qmsgsent(cb_data.body.dt);
break;
case MSG_TYPE_RCVD:{
const long expected = cb_data.body.size;
long actual;
char* body = malloc(expected);
for (rc = 0, actual = 0;
actual < expected && rc >= 0;
actual += rc = recv(fd, body + actual, expected - actual, 0));
if (rc < 0){
char buff[256];
fprintf(stderr, "recv(%li) error: %s, expected: %li, actual: %li\n", rc, getSysError(buff,sizeof(buff)), expected, actual);
K topic = ktn(KC,cb_data.topic_len);
K msg = ktn(KC, cb_data.body.payload_len-cb_data.topic_len);
#ifdef _WIN32
DWORD actual=0;
DWORD flags=MSG_WAITALL;
WSABUF buffers[2];
buffers[0].buf=&(kG(topic));
buffers[0].len=cb_data.topic_len;
buffers[1].buf=&(kG(msg));
buffers[1].len=msg->n;
if (WSARecv(fd, buffers, 2, &actual, &flags, NULL, NULL) == SOCKET_ERROR) {
char buf[256];
fprintf(stderr, "WSARecv error: %s\n", getSysError(buf,sizeof(buf)));
}
#else
ssize_t actual=0,c=0,iov_c=2;
struct iovec iov[2];
iov[0].iov_base=&(kG(topic));
iov[0].iov_len=cb_data.topic_len;
iov[1].iov_base=&(kG(msg));
iov[1].iov_len=msg->n;
while(actual<cb_data.body.payload_len){
c=readv(fd,iov,iov_c);
if(c<=0){
if(EINTR==errno)
continue;
else{
char buf[256];
fprintf(stderr, "readv error: %s\n", getSysError(buf,sizeof(buf)));
break;
}
}
actual+=c;
if(actual<cb_data.topic_len){
iov[0].iov_base=&(kG(topic)[actual]);
iov[0].iov_len=cb_data.topic_len-actual;
}else{
iov_c=1;
iov[0].iov_base=&(kG(msg)[actual-cb_data.topic_len]);
iov[0].iov_len=msg->n-(actual-cb_data.topic_len);
}
}
#endif
if(actual==cb_data.body.payload_len)
pr0(k(0, (char*)".mqtt.msgrcvd", topic, msg, (K)0));
else{
r0(topic);r0(msg);
}
else
qmsgrcvd(body, actual);
free(body);
break;
}
case MSG_TYPE_DISCONN:
qdisconn();
break;
default:
fprintf(stderr, "mqttCallback - invalid callback type: %u\n", cb_data.header.msg_type);
fprintf(stderr, "mqttCallback - invalid callback type: %u\n", cb_data.msg_type);
}
return (K)0;
}
Expand All @@ -515,8 +564,8 @@ EXP K init(K UNUSED(X)){
if(!(0==validinit))
return 0;
if(dumb_socketpair(spair,1) == SOCKET_ERROR){
char buff[256];
fprintf(stderr,"Init failed. socketpair: %s\n", getSysError(buff,sizeof(buff)));
char buf[256];
fprintf(stderr,"Init failed. socketpair: %s\n", getSysError(buf,sizeof(buf)));
return 0;
}
pr0(sd1(spair[0], &mqttCallback));
Expand Down

0 comments on commit c0a2714

Please sign in to comment.