diff options
author | Daniel Stenberg <daniel@haxx.se> | 2020-04-16 13:20:52 +0200 |
---|---|---|
committer | Daniel Stenberg <daniel@haxx.se> | 2020-04-20 08:09:33 +0200 |
commit | 5811beba391baefad41cd6f8f4fa4e3862098813 (patch) | |
tree | 572fc2d609408300d6d4946476d5f2f41d09e96e | |
parent | d1a2816b4128faa8ebc50ce93285c7364652856e (diff) |
mqtt: improve the state machine
To handle PUBLISH before SUBACK and more.
Updated the existing tests and added three new ones.
Reported-by: Christoph Krey
Bug: https://curl.haxx.se/mail/lib-2020-04/0021.html
Closes #5246
-rw-r--r-- | lib/mqtt.c | 180 | ||||
-rw-r--r-- | lib/mqtt.h | 24 | ||||
-rw-r--r-- | tests/data/Makefile.inc | 2 | ||||
-rw-r--r-- | tests/data/test1190 | 2 | ||||
-rw-r--r-- | tests/data/test1191 | 2 | ||||
-rw-r--r-- | tests/data/test1192 | 2 | ||||
-rw-r--r-- | tests/data/test1193 | 2 | ||||
-rw-r--r-- | tests/data/test1194 | 59 | ||||
-rw-r--r-- | tests/data/test1195 | 63 | ||||
-rw-r--r-- | tests/data/test1196 | 62 | ||||
-rw-r--r-- | tests/server/mqttd.c | 103 |
11 files changed, 402 insertions, 99 deletions
diff --git a/lib/mqtt.c b/lib/mqtt.c index 3e244694d..35c1b3e83 100644 --- a/lib/mqtt.c +++ b/lib/mqtt.c @@ -51,8 +51,8 @@ #define MQTT_MSG_SUBACK 0x90 #define MQTT_MSG_DISCONNECT 0xe0 -#define MQTT_CONNACK_LEN 4 -#define MQTT_SUBACK_LEN 5 +#define MQTT_CONNACK_LEN 2 +#define MQTT_SUBACK_LEN 3 #define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */ /* @@ -194,13 +194,9 @@ static CURLcode mqtt_verify_connack(struct connectdata *conn) } /* verify CONNACK */ - if(readbuf[0] != MQTT_MSG_CONNACK || - readbuf[1] != 0x02 || - readbuf[2] != 0x00 || - readbuf[3] != 0x00) { - failf(data, "Expected %02x%02x%02x%02x but got %02x%02x%02x%02x", - MQTT_MSG_CONNACK, 0x02, 0x00, 0x00, - readbuf[0], readbuf[1], readbuf[2], readbuf[3]); + if(readbuf[0] != 0x00 || readbuf[1] != 0x00) { + failf(data, "Expected %02x%02x but got %02x%02x", + 0x00, 0x00, readbuf[0], readbuf[1]); result = CURLE_WEIRD_SERVER_REPLY; } @@ -285,6 +281,9 @@ fail: return result; } +/* + * Called when the first byte was already read. + */ static CURLcode mqtt_verify_suback(struct connectdata *conn) { CURLcode result; @@ -307,11 +306,9 @@ static CURLcode mqtt_verify_suback(struct connectdata *conn) } /* verify SUBACK */ - if(readbuf[0] != MQTT_MSG_SUBACK || - readbuf[1] != 0x03 || - readbuf[2] != ((mqtt->packetid >> 8) & 0xff) || - readbuf[3] != (mqtt->packetid & 0xff) || - readbuf[4] != 0x00) + if(readbuf[0] != ((mqtt->packetid >> 8) & 0xff) || + readbuf[1] != (mqtt->packetid & 0xff) || + readbuf[2] != 0x00) result = CURLE_WEIRD_SERVER_REPLY; fail: @@ -377,67 +374,97 @@ static size_t mqtt_decode_len(unsigned char *buf, mult *= 128; } - *lenbytes = i; + if(lenbytes) + *lenbytes = i; return len; } +#ifdef CURLDEBUG +static const char *statenames[]={ + "MQTT_FIRST", + "MQTT_REMAINING_LENGTH", + "MQTT_CONNACK", + "MQTT_SUBACK", + "MQTT_SUBACK_COMING", + "MQTT_PUBWAIT", + "MQTT_PUB_REMAIN" +}; +#endif + +/* The only way to change state */ +static void mqstate(struct connectdata *conn, + enum mqttstate state, + enum mqttstate nextstate) /* used if state == FIRST */ +{ + struct mqtt_conn *mqtt = &conn->proto.mqtt; +#ifdef CURLDEBUG + infof(conn->data, "%s (from %s) (next is %s)\n", + statenames[state], + statenames[mqtt->state], + (state == MQTT_FIRST)? statenames[nextstate] : ""); +#endif + mqtt->state = state; + if(state == MQTT_FIRST) + mqtt->nextstate = nextstate; +} + + /* for the publish packet */ #define MQTT_HEADER_LEN 5 /* max 5 bytes */ static CURLcode mqtt_read_publish(struct connectdata *conn, bool *done) { - CURLcode result; + CURLcode result = CURLE_OK; curl_socket_t sockfd = conn->sock[FIRSTSOCKET]; ssize_t nread; struct Curl_easy *data = conn->data; unsigned char *pkt = (unsigned char *)data->state.buffer; - size_t remlen, lenbytes; + size_t remlen; struct mqtt_conn *mqtt = &conn->proto.mqtt; struct MQTT *mq = data->req.protop; + unsigned char packet; switch(mqtt->state) { - case MQTT_SUBWAIT: - /* Read the initial byte and the entire Remaining Length field - in this state */ - result = Curl_read(conn, sockfd, (char *)&pkt[mq->npacket], 1, &nread); + MQTT_SUBACK_COMING: + case MQTT_SUBACK_COMING: + result = mqtt_verify_suback(conn); if(result) + break; + + mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT); + break; + + case MQTT_SUBACK: + case MQTT_PUBWAIT: + /* we are expecting PUBLISH or SUBACK */ + packet = mq->firstbyte & 0xf0; + if(packet == MQTT_MSG_PUBLISH) + mqstate(conn, MQTT_PUB_REMAIN, MQTT_NOSTATE); + else if(packet == MQTT_MSG_SUBACK) { + mqstate(conn, MQTT_SUBACK_COMING, MQTT_NOSTATE); + goto MQTT_SUBACK_COMING; + } + else if(packet == MQTT_MSG_DISCONNECT) { + infof(data, "Got DISCONNECT\n"); + *done = TRUE; goto end; - if(data->set.verbose) - Curl_debug(data, CURLINFO_HEADER_IN, (char *)&pkt[mq->npacket], 1); - /* we are expecting a PUBLISH message */ - if(!mq->npacket && ((pkt[0] & 0xf0) != MQTT_MSG_PUBLISH)) { - if(pkt[0] == MQTT_MSG_DISCONNECT) { - infof(data, "Got DISCONNECT\n"); - *done = TRUE; - goto end; - } + } + else { result = CURLE_WEIRD_SERVER_REPLY; goto end; } - else if((mq->npacket >= 1) && !(pkt[mq->npacket] & 0x80)) - /* as long as the high bit is set in the length byte, we read one more - byte, then get the remainder of the PUBLISH */ - mqtt->state = MQTT_SUB_REMAIN; - mq->npacket++; - if(mqtt->state == MQTT_SUBWAIT) - return result; /* -- switched state -- */ - - /* remember the first byte */ - mq->firstbyte = pkt[0]; - - remlen = mqtt_decode_len(&pkt[1], 4, &lenbytes); - + remlen = mq->remaining_length; infof(data, "Remaining length: %zd bytes\n", remlen); Curl_pgrsSetDownloadSize(data, remlen); data->req.bytecount = 0; data->req.size = remlen; mq->npacket = remlen; /* get this many bytes */ /* FALLTHROUGH */ - case MQTT_SUB_REMAIN: { + case MQTT_PUB_REMAIN: { /* read rest of packet, but no more. Cap to buffer size */ struct SingleRequest *k = &data->req; size_t rest = mq->npacket; @@ -450,6 +477,11 @@ static CURLcode mqtt_read_publish(struct connectdata *conn, } goto end; } + if(!nread) { + infof(data, "server disconnected\n"); + result = CURLE_PARTIAL_FILE; + goto end; + } if(data->set.verbose) Curl_debug(data, CURLINFO_DATA_IN, (char *)pkt, (size_t)nread); @@ -465,7 +497,7 @@ static CURLcode mqtt_read_publish(struct connectdata *conn, if(!mq->npacket) /* no more PUBLISH payload, back to subscribe wait state */ - mqtt->state = MQTT_SUBWAIT; + mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT); break; } default: @@ -481,7 +513,6 @@ static CURLcode mqtt_do(struct connectdata *conn, bool *done) { CURLcode result = CURLE_OK; struct Curl_easy *data = conn->data; - struct mqtt_conn *mqtt = &conn->proto.mqtt; *done = FALSE; /* unconditionally */ @@ -490,7 +521,7 @@ static CURLcode mqtt_do(struct connectdata *conn, bool *done) failf(data, "Error %d sending MQTT CONN request", result); return result; } - mqtt->state = MQTT_CONNACK; + mqstate(conn, MQTT_FIRST, MQTT_CONNACK); return CURLE_OK; } @@ -500,6 +531,10 @@ static CURLcode mqtt_doing(struct connectdata *conn, bool *done) struct mqtt_conn *mqtt = &conn->proto.mqtt; struct Curl_easy *data = conn->data; struct MQTT *mq = data->req.protop; + ssize_t nread; + curl_socket_t sockfd = conn->sock[FIRSTSOCKET]; + unsigned char *pkt = (unsigned char *)data->state.buffer; + unsigned char byte; *done = FALSE; @@ -512,7 +547,41 @@ static CURLcode mqtt_doing(struct connectdata *conn, bool *done) return result; } + infof(data, "mqtt_doing: state [%d]\n", (int) mqtt->state); switch(mqtt->state) { + case MQTT_FIRST: + /* Read the initial byte only */ + result = Curl_read(conn, sockfd, (char *)&mq->firstbyte, 1, &nread); + if(result) + break; + if(data->set.verbose) + Curl_debug(data, CURLINFO_HEADER_IN, (char *)&mq->firstbyte, 1); + /* remember the first byte */ + mq->npacket = 0; + mqstate(conn, MQTT_REMAINING_LENGTH, MQTT_NOSTATE); + /* FALLTHROUGH */ + case MQTT_REMAINING_LENGTH: + do { + result = Curl_read(conn, sockfd, (char *)&byte, 1, &nread); + if(result) + break; + if(data->set.verbose) + Curl_debug(data, CURLINFO_HEADER_IN, (char *)&byte, 1); + pkt[mq->npacket++] = byte; + } while((byte & 0x80) && (mq->npacket < 4)); + mq->remaining_length = mqtt_decode_len(&pkt[0], mq->npacket, NULL); + mq->npacket = 0; + if(mq->remaining_length) { + mqstate(conn, mqtt->nextstate, MQTT_NOSTATE); + break; + } + mqstate(conn, MQTT_FIRST, MQTT_FIRST); + + if(mq->firstbyte == MQTT_MSG_DISCONNECT) { + infof(data, "Got DISCONNECT\n"); + *done = TRUE; + } + break; case MQTT_CONNACK: result = mqtt_verify_connack(conn); if(result) @@ -524,24 +593,19 @@ static CURLcode mqtt_doing(struct connectdata *conn, bool *done) result = mqtt_disconnect(conn); *done = TRUE; } + mqtt->nextstate = MQTT_FIRST; } else { result = mqtt_subscribe(conn); - if(!result) - mqtt->state = MQTT_SUBACK; + if(!result) { + mqstate(conn, MQTT_FIRST, MQTT_SUBACK); + } } break; case MQTT_SUBACK: - result = mqtt_verify_suback(conn); - if(result) - break; - - mqtt->state = MQTT_SUBWAIT; - break; - - case MQTT_SUBWAIT: - case MQTT_SUB_REMAIN: + case MQTT_PUBWAIT: + case MQTT_PUB_REMAIN: result = mqtt_read_publish(conn, done); if(result) break; diff --git a/lib/mqtt.h b/lib/mqtt.h index b5e447be5..155fbd60a 100644 --- a/lib/mqtt.h +++ b/lib/mqtt.h @@ -26,13 +26,22 @@ extern const struct Curl_handler Curl_handler_mqtt; #endif +enum mqttstate { + MQTT_FIRST, /* 0 */ + MQTT_REMAINING_LENGTH, /* 1 */ + MQTT_CONNACK, /* 2 */ + MQTT_SUBACK, /* 3 */ + MQTT_SUBACK_COMING, /* 4 - the SUBACK remainder */ + MQTT_PUBWAIT, /* 5 - wait for publish */ + MQTT_PUB_REMAIN, /* 6 - wait for the remainder of the publish */ + + MQTT_NOSTATE = 99 /* never an actual state */ +}; + struct mqtt_conn { - enum { - MQTT_CONNACK, - MQTT_SUBACK, - MQTT_SUBWAIT, /* wait for subscribe response */ - MQTT_SUB_REMAIN /* wait for the remainder of the subscribe response */ - } state; + enum mqttstate state; + enum mqttstate nextstate; /* switch to this after remaining length is + done */ unsigned int packetid; }; @@ -41,9 +50,10 @@ struct MQTT { char *sendleftovers; size_t nsend; /* size of sendleftovers */ - /* when receving a PUBLISH */ + /* when receving */ size_t npacket; /* byte counter */ unsigned char firstbyte; + size_t remaining_length; }; #endif /* HEADER_CURL_MQTT_H */ diff --git a/tests/data/Makefile.inc b/tests/data/Makefile.inc index 099940410..425a0c02f 100644 --- a/tests/data/Makefile.inc +++ b/tests/data/Makefile.inc @@ -139,7 +139,7 @@ test1160 test1161 test1162 test1163 test1164 test1165 test1166 test1167 \ \ test1170 test1171 test1172 test1173 test1174 test1175 test1176 test1177 \ \ -test1190 test1191 test1192 test1193 \ +test1190 test1191 test1192 test1193 test1194 test1195 test1196 \ \ test1200 test1201 test1202 test1203 test1204 test1205 test1206 test1207 \ test1208 test1209 test1210 test1211 test1212 test1213 test1214 test1215 \ diff --git a/tests/data/test1190 b/tests/data/test1190 index 491f2b843..007a15013 100644 --- a/tests/data/test1190 +++ b/tests/data/test1190 @@ -46,7 +46,7 @@ s/^(.* 00044d5154540402003c000c6375726c).*/$1/ </strippart> <protocol> client CONNECT 18 00044d5154540402003c000c6375726c -server CONACK 2 20020000 +server CONNACK 2 20020000 client SUBSCRIBE 9 000100043131393000 server SUBACK 3 9003000100 server PUBLISH c 300c00043131393068656c6c6f0a diff --git a/tests/data/test1191 b/tests/data/test1191 index fc8c68bb2..a36bc3113 100644 --- a/tests/data/test1191 +++ b/tests/data/test1191 @@ -42,7 +42,7 @@ s/^(.* 00044d5154540402003c000c6375726c).*/$1/ </strippart> <protocol> client CONNECT 18 00044d5154540402003c000c6375726c -server CONACK 2 20020000 +server CONNACK 2 20020000 client PUBLISH f 000431313931736f6d657468696e67 client DISCONNECT 0 e000 </protocol> diff --git a/tests/data/test1192 b/tests/data/test1192 index 92b96c3fd..691c7783f 100644 --- a/tests/data/test1192 +++ b/tests/data/test1192 @@ -46,7 +46,7 @@ s/^(.* 00044d5154540402003c000c6375726c).*/$1/ </strippart> <protocol> client CONNECT 18 00044d5154540402003c000c6375726c -server CONACK 2 20020000 +server CONNACK 2 20020000 client SUBSCRIBE 80af3131393200 server SUBACK 3 9003000100 server PUBLISH 80d 308df3131393268656c6c6f0a diff --git a/tests/data/test1193 b/tests/data/test1193 index 479ed5fe3..8da9abb21 100644 --- a/tests/data/test1193 +++ b/tests/data/test1193 @@ -64,7 +64,7 @@ s/^(.* 00044d5154540402003c000c6375726c).*/$1/ </strippart> <protocol> client CONNECT 18 00044d5154540402003c000c6375726c -server CONACK 2 20020000 +server CONNACK 2 20020000 client PUBLISH 7c2 000431313933313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839303132333435363738393031323334353637383930313233343536373839 client DISCONNECT 0 e000 </protocol> diff --git a/tests/data/test1194 b/tests/data/test1194 new file mode 100644 index 000000000..497891add --- /dev/null +++ b/tests/data/test1194 @@ -0,0 +1,59 @@ +<testcase> +<info> +<keywords> +MQTT +MQTT SUBSCRIBE +</keywords> +</info> + +# +# Server-side +<reply> +<data nocheck="yes"> +hello +</data> +<datacheck hex="yes"> +00 04 31 31 39 30 68 65 6c 6c 6f 5b 4c 46 5d 0a +</datacheck> +<servercmd> +PUBLISH-before-SUBACK TRUE +</servercmd> +</reply> + +# +# Client-side +<client> +<features> +mqtt +</features> +<server> +mqtt +</server> +<name> +MQTT SUBSCRIBE with PUBLISH befoire SUBACK +</name> +<command option="binary-trace"> +mqtt://%HOSTIP:%MQTTPORT/1194 +</command> +</client> + +# +# Verify data after the test has been "shot" +<verify> +# These are hexadecimal protocol dumps from the client +# +# Strip out the random part of the client id from the CONNECT message +# before comparison +<strippart> +s/^(.* 00044d5154540402003c000c6375726c).*/$1/ +</strippart> +<protocol> +client CONNECT 18 00044d5154540402003c000c6375726c +server CONNACK 2 20020000 +client SUBSCRIBE 9 000100043131393400 +server PUBLISH c 300c00043131393468656c6c6f0a +server SUBACK 3 9003000100 +server DISCONNECT 0 e000 +</protocol> +</verify> +</testcase> diff --git a/tests/data/test1195 b/tests/data/test1195 new file mode 100644 index 000000000..0dfaccd53 --- /dev/null +++ b/tests/data/test1195 @@ -0,0 +1,63 @@ +<testcase> +<info> +<keywords> +MQTT +MQTT SUBSCRIBE +</keywords> +</info> + +# +# Server-side +<reply> +<data nocheck="yes"> +hello +</data> +<datacheck hex="yes"> +00 04 31 31 39 30 68 65 6c 6c 6f 5b 4c 46 5d 0a +</datacheck> +<servercmd> +PUBLISH-before-SUBACK TRUE +short-PUBLISH TRUE +</servercmd> +</reply> + +# +# Client-side +<client> +<features> +mqtt +</features> +<server> +mqtt +</server> +<name> +MQTT SUBSCRIBE with short PUBLISH +</name> +<command option="binary-trace"> +mqtt://%HOSTIP:%MQTTPORT/1195 +</command> +</client> + +# +# Verify data after the test has been "shot" +<verify> +# These are hexadecimal protocol dumps from the client +# +# Strip out the random part of the client id from the CONNECT message +# before comparison +<strippart> +s/^(.* 00044d5154540402003c000c6375726c).*/$1/ +</strippart> +<protocol> +client CONNECT 18 00044d5154540402003c000c6375726c +server CONNACK 2 20020000 +client SUBSCRIBE 9 000100043131393500 +server PUBLISH c 300c00043131393568656c6c +</protocol> + +# 18 is CURLE_PARTIAL_FILE +<errorcode> +18 +</errorcode> +</verify> +</testcase> diff --git a/tests/data/test1196 b/tests/data/test1196 new file mode 100644 index 000000000..c07efd927 --- /dev/null +++ b/tests/data/test1196 @@ -0,0 +1,62 @@ +<testcase> +<info> +<keywords> +MQTT +MQTT SUBSCRIBE +</keywords> +</info> + +# +# Server-side +<reply> +<data nocheck="yes"> +hello +</data> +<datacheck hex="yes"> +00 04 31 31 39 30 68 65 6c 6c 6f 5b 4c 46 5d 0a +</datacheck> + +# error 1 - "Connection Refused, unacceptable protocol version" +<servercmd> +error-CONNACK 1 +</servercmd> +</reply> + +# +# Client-side +<client> +<features> +mqtt +</features> +<server> +mqtt +</server> +<name> +MQTT with error in CONNACK +</name> +<command option="binary-trace"> +mqtt://%HOSTIP:%MQTTPORT/1196 +</command> +</client> + +# +# Verify data after the test has been "shot" +<verify> +# These are hexadecimal protocol dumps from the client +# +# Strip out the random part of the client id from the CONNECT message +# before comparison +<strippart> +s/^(.* 00044d5154540402003c000c6375726c).*/$1/ +</strippart> +<protocol> +client CONNECT 18 00044d5154540402003c000c6375726c +server CONNACK 2 20020001 +</protocol> + +# 8 is CURLE_WEIRD_SERVER_REPLY +<errorcode> +8 +</errorcode> +</verify> +</testcase> diff --git a/tests/server/mqttd.c b/tests/server/mqttd.c index db5723cdd..6785b0014 100644 --- a/tests/server/mqttd.c +++ b/tests/server/mqttd.c @@ -104,6 +104,10 @@ struct configurable { unsigned char version; /* initial version byte in the request must match this */ + bool publish_before_suback; + bool short_publish; + unsigned char error_connack; + int testnum; }; #define REQUEST_DUMP "log/server.input" @@ -124,6 +128,10 @@ static void resetdefaults(void) { logmsg("Reset to defaults"); config.version = CONFIG_VERSION; + config.publish_before_suback = FALSE; + config.short_publish = FALSE; + config.error_connack = 0; + config.testnum = 0; } static unsigned char byteval(char *value) @@ -147,10 +155,29 @@ static void getconfig(void) config.version = byteval(value); logmsg("version [%d] set", config.version); } + else if(!strcmp(key, "PUBLISH-before-SUBACK")) { + logmsg("PUBLISH-before-SUBACK set"); + config.publish_before_suback = TRUE; + } + else if(!strcmp(key, "short-PUBLISH")) { + logmsg("short-PUBLISH set"); + config.short_publish = TRUE; + } + else if(!strcmp(key, "error-CONNACK")) { + config.error_connack = byteval(value); + logmsg("error-CONNACK = %d", config.error_connack); + } + else if(!strcmp(key, "Testnum")) { + config.testnum = atoi(value); + logmsg("testnum = %d", config.testnum); + } } } fclose(fp); } + else { + logmsg("No config file '%s' to read", configfile); + } } static void loghex(unsigned char *buffer, ssize_t len) @@ -209,11 +236,17 @@ static int connack(FILE *dump, curl_socket_t fd) MQTT_MSG_CONNACK, 0x02, 0x00, 0x00 }; - ssize_t rc = swrite(fd, (char *)packet, sizeof(packet)); - if(rc == sizeof(packet)) { - logmsg("WROTE %d bytes [CONACK]", rc); + ssize_t rc; + + packet[3] = config.error_connack; + + rc = swrite(fd, (char *)packet, sizeof(packet)); + if(rc > 0) { + logmsg("WROTE %d bytes [CONNACK]", rc); loghex(packet, rc); - logprotocol(FROM_SERVER, "CONACK", 2, dump, packet, sizeof(packet)); + logprotocol(FROM_SERVER, "CONNACK", 2, dump, packet, sizeof(packet)); + } + if(rc == sizeof(packet)) { return 0; } return 1; @@ -360,6 +393,7 @@ static int publish(FILE *dump, size_t payloadindex; ssize_t remaininglength = topiclen + 2 + payloadlen; ssize_t packetlen; + ssize_t sendamount; ssize_t rc; char rembuffer[4]; int encodedlen; @@ -385,13 +419,18 @@ static int publish(FILE *dump, payloadindex = 3 + topiclen + encodedlen; memcpy(&packet[payloadindex], payload, payloadlen); - rc = swrite(fd, (char *)packet, packetlen); - if(rc == packetlen) { + sendamount = packetlen; + if(config.short_publish) + sendamount -= 2; + + rc = swrite(fd, (char *)packet, sendamount); + if(rc > 0) { logmsg("WROTE %d bytes [PUBLISH]", rc); loghex(packet, rc); logprotocol(FROM_SERVER, "PUBLISH", remaininglength, dump, packet, rc); - return 0; } + if(rc == packetlen) + return 0; return 1; } @@ -459,6 +498,11 @@ static curl_socket_t mqttit(curl_socket_t fd) getconfig(); + testno = config.testnum; + + if(testno) + logmsg("Found test number %ld", testno); + do { /* get the fixed header */ rc = fixedheader(fd, &byte, &remaining_length, &bytes); @@ -506,8 +550,10 @@ static curl_socket_t mqttit(curl_socket_t fd) } } else if(byte == MQTT_MSG_SUBSCRIBE) { - char *testnop; - + FILE *stream; + int error; + char *data; + size_t datalen; logprotocol(FROM_CLIENT, "SUBSCRIBE", remaining_length, dump, buffer, rc); logmsg("Incoming SUBSCRIBE"); @@ -533,26 +579,25 @@ static curl_socket_t mqttit(curl_socket_t fd) /* there's a QoS byte (two bits) after the topic */ logmsg("SUBSCRIBE to '%s' [%d]", topic, packet_id); - if(suback(dump, fd, packet_id)) { - logmsg("failed sending SUBACK"); - goto end; - } - testnop = strrchr(topic, '/'); - if(!testnop) - testnop = topic; - else - testnop++; /* pass the slash */ - testno = strtol(testnop, NULL, 10); - if(testno) { - FILE *stream; - int error; - char *data; - size_t datalen; - logmsg("Found test number %ld", testno); - stream = test2fopen(testno); - error = getpart(&data, &datalen, "reply", "data", stream); - if(!error) - publish(dump, fd, packet_id, topic, data, datalen); + stream = test2fopen(testno); + error = getpart(&data, &datalen, "reply", "data", stream); + if(!error) { + if(!config.publish_before_suback) { + if(suback(dump, fd, packet_id)) { + logmsg("failed sending SUBACK"); + goto end; + } + } + if(publish(dump, fd, packet_id, topic, data, datalen)) { + logmsg("PUBLISH failed"); + goto end; + } + if(config.publish_before_suback) { + if(suback(dump, fd, packet_id)) { + logmsg("failed sending SUBACK"); + goto end; + } + } } else { char *def = (char *)"this is random payload yes yes it is"; |