From 5811beba391baefad41cd6f8f4fa4e3862098813 Mon Sep 17 00:00:00 2001 From: Daniel Stenberg Date: Thu, 16 Apr 2020 13:20:52 +0200 Subject: mqtt: improve the state machine To handle PUBLISH before SUBACK and more. Updated the existing tests and added three new ones. Reported-by: Christoph Krey Bug: https://curl.haxx.se/mail/lib-2020-04/0021.html Closes #5246 --- tests/server/mqttd.c | 103 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 74 insertions(+), 29 deletions(-) (limited to 'tests/server/mqttd.c') diff --git a/tests/server/mqttd.c b/tests/server/mqttd.c index db5723cdd..6785b0014 100644 --- a/tests/server/mqttd.c +++ b/tests/server/mqttd.c @@ -104,6 +104,10 @@ struct configurable { unsigned char version; /* initial version byte in the request must match this */ + bool publish_before_suback; + bool short_publish; + unsigned char error_connack; + int testnum; }; #define REQUEST_DUMP "log/server.input" @@ -124,6 +128,10 @@ static void resetdefaults(void) { logmsg("Reset to defaults"); config.version = CONFIG_VERSION; + config.publish_before_suback = FALSE; + config.short_publish = FALSE; + config.error_connack = 0; + config.testnum = 0; } static unsigned char byteval(char *value) @@ -147,10 +155,29 @@ static void getconfig(void) config.version = byteval(value); logmsg("version [%d] set", config.version); } + else if(!strcmp(key, "PUBLISH-before-SUBACK")) { + logmsg("PUBLISH-before-SUBACK set"); + config.publish_before_suback = TRUE; + } + else if(!strcmp(key, "short-PUBLISH")) { + logmsg("short-PUBLISH set"); + config.short_publish = TRUE; + } + else if(!strcmp(key, "error-CONNACK")) { + config.error_connack = byteval(value); + logmsg("error-CONNACK = %d", config.error_connack); + } + else if(!strcmp(key, "Testnum")) { + config.testnum = atoi(value); + logmsg("testnum = %d", config.testnum); + } } } fclose(fp); } + else { + logmsg("No config file '%s' to read", configfile); + } } static void loghex(unsigned char *buffer, ssize_t len) @@ -209,11 +236,17 @@ static int connack(FILE *dump, curl_socket_t fd) MQTT_MSG_CONNACK, 0x02, 0x00, 0x00 }; - ssize_t rc = swrite(fd, (char *)packet, sizeof(packet)); - if(rc == sizeof(packet)) { - logmsg("WROTE %d bytes [CONACK]", rc); + ssize_t rc; + + packet[3] = config.error_connack; + + rc = swrite(fd, (char *)packet, sizeof(packet)); + if(rc > 0) { + logmsg("WROTE %d bytes [CONNACK]", rc); loghex(packet, rc); - logprotocol(FROM_SERVER, "CONACK", 2, dump, packet, sizeof(packet)); + logprotocol(FROM_SERVER, "CONNACK", 2, dump, packet, sizeof(packet)); + } + if(rc == sizeof(packet)) { return 0; } return 1; @@ -360,6 +393,7 @@ static int publish(FILE *dump, size_t payloadindex; ssize_t remaininglength = topiclen + 2 + payloadlen; ssize_t packetlen; + ssize_t sendamount; ssize_t rc; char rembuffer[4]; int encodedlen; @@ -385,13 +419,18 @@ static int publish(FILE *dump, payloadindex = 3 + topiclen + encodedlen; memcpy(&packet[payloadindex], payload, payloadlen); - rc = swrite(fd, (char *)packet, packetlen); - if(rc == packetlen) { + sendamount = packetlen; + if(config.short_publish) + sendamount -= 2; + + rc = swrite(fd, (char *)packet, sendamount); + if(rc > 0) { logmsg("WROTE %d bytes [PUBLISH]", rc); loghex(packet, rc); logprotocol(FROM_SERVER, "PUBLISH", remaininglength, dump, packet, rc); - return 0; } + if(rc == packetlen) + return 0; return 1; } @@ -459,6 +498,11 @@ static curl_socket_t mqttit(curl_socket_t fd) getconfig(); + testno = config.testnum; + + if(testno) + logmsg("Found test number %ld", testno); + do { /* get the fixed header */ rc = fixedheader(fd, &byte, &remaining_length, &bytes); @@ -506,8 +550,10 @@ static curl_socket_t mqttit(curl_socket_t fd) } } else if(byte == MQTT_MSG_SUBSCRIBE) { - char *testnop; - + FILE *stream; + int error; + char *data; + size_t datalen; logprotocol(FROM_CLIENT, "SUBSCRIBE", remaining_length, dump, buffer, rc); logmsg("Incoming SUBSCRIBE"); @@ -533,26 +579,25 @@ static curl_socket_t mqttit(curl_socket_t fd) /* there's a QoS byte (two bits) after the topic */ logmsg("SUBSCRIBE to '%s' [%d]", topic, packet_id); - if(suback(dump, fd, packet_id)) { - logmsg("failed sending SUBACK"); - goto end; - } - testnop = strrchr(topic, '/'); - if(!testnop) - testnop = topic; - else - testnop++; /* pass the slash */ - testno = strtol(testnop, NULL, 10); - if(testno) { - FILE *stream; - int error; - char *data; - size_t datalen; - logmsg("Found test number %ld", testno); - stream = test2fopen(testno); - error = getpart(&data, &datalen, "reply", "data", stream); - if(!error) - publish(dump, fd, packet_id, topic, data, datalen); + stream = test2fopen(testno); + error = getpart(&data, &datalen, "reply", "data", stream); + if(!error) { + if(!config.publish_before_suback) { + if(suback(dump, fd, packet_id)) { + logmsg("failed sending SUBACK"); + goto end; + } + } + if(publish(dump, fd, packet_id, topic, data, datalen)) { + logmsg("PUBLISH failed"); + goto end; + } + if(config.publish_before_suback) { + if(suback(dump, fd, packet_id)) { + logmsg("failed sending SUBACK"); + goto end; + } + } } else { char *def = (char *)"this is random payload yes yes it is"; -- cgit v1.2.3