From 5811beba391baefad41cd6f8f4fa4e3862098813 Mon Sep 17 00:00:00 2001 From: Daniel Stenberg Date: Thu, 16 Apr 2020 13:20:52 +0200 Subject: mqtt: improve the state machine To handle PUBLISH before SUBACK and more. Updated the existing tests and added three new ones. Reported-by: Christoph Krey Bug: https://curl.haxx.se/mail/lib-2020-04/0021.html Closes #5246 --- lib/mqtt.c | 180 +++++++++++++++++++++++++++++++++++++++++-------------------- lib/mqtt.h | 24 ++++++--- 2 files changed, 139 insertions(+), 65 deletions(-) (limited to 'lib') diff --git a/lib/mqtt.c b/lib/mqtt.c index 3e244694d..35c1b3e83 100644 --- a/lib/mqtt.c +++ b/lib/mqtt.c @@ -51,8 +51,8 @@ #define MQTT_MSG_SUBACK 0x90 #define MQTT_MSG_DISCONNECT 0xe0 -#define MQTT_CONNACK_LEN 4 -#define MQTT_SUBACK_LEN 5 +#define MQTT_CONNACK_LEN 2 +#define MQTT_SUBACK_LEN 3 #define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */ /* @@ -194,13 +194,9 @@ static CURLcode mqtt_verify_connack(struct connectdata *conn) } /* verify CONNACK */ - if(readbuf[0] != MQTT_MSG_CONNACK || - readbuf[1] != 0x02 || - readbuf[2] != 0x00 || - readbuf[3] != 0x00) { - failf(data, "Expected %02x%02x%02x%02x but got %02x%02x%02x%02x", - MQTT_MSG_CONNACK, 0x02, 0x00, 0x00, - readbuf[0], readbuf[1], readbuf[2], readbuf[3]); + if(readbuf[0] != 0x00 || readbuf[1] != 0x00) { + failf(data, "Expected %02x%02x but got %02x%02x", + 0x00, 0x00, readbuf[0], readbuf[1]); result = CURLE_WEIRD_SERVER_REPLY; } @@ -285,6 +281,9 @@ fail: return result; } +/* + * Called when the first byte was already read. + */ static CURLcode mqtt_verify_suback(struct connectdata *conn) { CURLcode result; @@ -307,11 +306,9 @@ static CURLcode mqtt_verify_suback(struct connectdata *conn) } /* verify SUBACK */ - if(readbuf[0] != MQTT_MSG_SUBACK || - readbuf[1] != 0x03 || - readbuf[2] != ((mqtt->packetid >> 8) & 0xff) || - readbuf[3] != (mqtt->packetid & 0xff) || - readbuf[4] != 0x00) + if(readbuf[0] != ((mqtt->packetid >> 8) & 0xff) || + readbuf[1] != (mqtt->packetid & 0xff) || + readbuf[2] != 0x00) result = CURLE_WEIRD_SERVER_REPLY; fail: @@ -377,67 +374,97 @@ static size_t mqtt_decode_len(unsigned char *buf, mult *= 128; } - *lenbytes = i; + if(lenbytes) + *lenbytes = i; return len; } +#ifdef CURLDEBUG +static const char *statenames[]={ + "MQTT_FIRST", + "MQTT_REMAINING_LENGTH", + "MQTT_CONNACK", + "MQTT_SUBACK", + "MQTT_SUBACK_COMING", + "MQTT_PUBWAIT", + "MQTT_PUB_REMAIN" +}; +#endif + +/* The only way to change state */ +static void mqstate(struct connectdata *conn, + enum mqttstate state, + enum mqttstate nextstate) /* used if state == FIRST */ +{ + struct mqtt_conn *mqtt = &conn->proto.mqtt; +#ifdef CURLDEBUG + infof(conn->data, "%s (from %s) (next is %s)\n", + statenames[state], + statenames[mqtt->state], + (state == MQTT_FIRST)? statenames[nextstate] : ""); +#endif + mqtt->state = state; + if(state == MQTT_FIRST) + mqtt->nextstate = nextstate; +} + + /* for the publish packet */ #define MQTT_HEADER_LEN 5 /* max 5 bytes */ static CURLcode mqtt_read_publish(struct connectdata *conn, bool *done) { - CURLcode result; + CURLcode result = CURLE_OK; curl_socket_t sockfd = conn->sock[FIRSTSOCKET]; ssize_t nread; struct Curl_easy *data = conn->data; unsigned char *pkt = (unsigned char *)data->state.buffer; - size_t remlen, lenbytes; + size_t remlen; struct mqtt_conn *mqtt = &conn->proto.mqtt; struct MQTT *mq = data->req.protop; + unsigned char packet; switch(mqtt->state) { - case MQTT_SUBWAIT: - /* Read the initial byte and the entire Remaining Length field - in this state */ - result = Curl_read(conn, sockfd, (char *)&pkt[mq->npacket], 1, &nread); + MQTT_SUBACK_COMING: + case MQTT_SUBACK_COMING: + result = mqtt_verify_suback(conn); if(result) + break; + + mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT); + break; + + case MQTT_SUBACK: + case MQTT_PUBWAIT: + /* we are expecting PUBLISH or SUBACK */ + packet = mq->firstbyte & 0xf0; + if(packet == MQTT_MSG_PUBLISH) + mqstate(conn, MQTT_PUB_REMAIN, MQTT_NOSTATE); + else if(packet == MQTT_MSG_SUBACK) { + mqstate(conn, MQTT_SUBACK_COMING, MQTT_NOSTATE); + goto MQTT_SUBACK_COMING; + } + else if(packet == MQTT_MSG_DISCONNECT) { + infof(data, "Got DISCONNECT\n"); + *done = TRUE; goto end; - if(data->set.verbose) - Curl_debug(data, CURLINFO_HEADER_IN, (char *)&pkt[mq->npacket], 1); - /* we are expecting a PUBLISH message */ - if(!mq->npacket && ((pkt[0] & 0xf0) != MQTT_MSG_PUBLISH)) { - if(pkt[0] == MQTT_MSG_DISCONNECT) { - infof(data, "Got DISCONNECT\n"); - *done = TRUE; - goto end; - } + } + else { result = CURLE_WEIRD_SERVER_REPLY; goto end; } - else if((mq->npacket >= 1) && !(pkt[mq->npacket] & 0x80)) - /* as long as the high bit is set in the length byte, we read one more - byte, then get the remainder of the PUBLISH */ - mqtt->state = MQTT_SUB_REMAIN; - mq->npacket++; - if(mqtt->state == MQTT_SUBWAIT) - return result; /* -- switched state -- */ - - /* remember the first byte */ - mq->firstbyte = pkt[0]; - - remlen = mqtt_decode_len(&pkt[1], 4, &lenbytes); - + remlen = mq->remaining_length; infof(data, "Remaining length: %zd bytes\n", remlen); Curl_pgrsSetDownloadSize(data, remlen); data->req.bytecount = 0; data->req.size = remlen; mq->npacket = remlen; /* get this many bytes */ /* FALLTHROUGH */ - case MQTT_SUB_REMAIN: { + case MQTT_PUB_REMAIN: { /* read rest of packet, but no more. Cap to buffer size */ struct SingleRequest *k = &data->req; size_t rest = mq->npacket; @@ -450,6 +477,11 @@ static CURLcode mqtt_read_publish(struct connectdata *conn, } goto end; } + if(!nread) { + infof(data, "server disconnected\n"); + result = CURLE_PARTIAL_FILE; + goto end; + } if(data->set.verbose) Curl_debug(data, CURLINFO_DATA_IN, (char *)pkt, (size_t)nread); @@ -465,7 +497,7 @@ static CURLcode mqtt_read_publish(struct connectdata *conn, if(!mq->npacket) /* no more PUBLISH payload, back to subscribe wait state */ - mqtt->state = MQTT_SUBWAIT; + mqstate(conn, MQTT_FIRST, MQTT_PUBWAIT); break; } default: @@ -481,7 +513,6 @@ static CURLcode mqtt_do(struct connectdata *conn, bool *done) { CURLcode result = CURLE_OK; struct Curl_easy *data = conn->data; - struct mqtt_conn *mqtt = &conn->proto.mqtt; *done = FALSE; /* unconditionally */ @@ -490,7 +521,7 @@ static CURLcode mqtt_do(struct connectdata *conn, bool *done) failf(data, "Error %d sending MQTT CONN request", result); return result; } - mqtt->state = MQTT_CONNACK; + mqstate(conn, MQTT_FIRST, MQTT_CONNACK); return CURLE_OK; } @@ -500,6 +531,10 @@ static CURLcode mqtt_doing(struct connectdata *conn, bool *done) struct mqtt_conn *mqtt = &conn->proto.mqtt; struct Curl_easy *data = conn->data; struct MQTT *mq = data->req.protop; + ssize_t nread; + curl_socket_t sockfd = conn->sock[FIRSTSOCKET]; + unsigned char *pkt = (unsigned char *)data->state.buffer; + unsigned char byte; *done = FALSE; @@ -512,7 +547,41 @@ static CURLcode mqtt_doing(struct connectdata *conn, bool *done) return result; } + infof(data, "mqtt_doing: state [%d]\n", (int) mqtt->state); switch(mqtt->state) { + case MQTT_FIRST: + /* Read the initial byte only */ + result = Curl_read(conn, sockfd, (char *)&mq->firstbyte, 1, &nread); + if(result) + break; + if(data->set.verbose) + Curl_debug(data, CURLINFO_HEADER_IN, (char *)&mq->firstbyte, 1); + /* remember the first byte */ + mq->npacket = 0; + mqstate(conn, MQTT_REMAINING_LENGTH, MQTT_NOSTATE); + /* FALLTHROUGH */ + case MQTT_REMAINING_LENGTH: + do { + result = Curl_read(conn, sockfd, (char *)&byte, 1, &nread); + if(result) + break; + if(data->set.verbose) + Curl_debug(data, CURLINFO_HEADER_IN, (char *)&byte, 1); + pkt[mq->npacket++] = byte; + } while((byte & 0x80) && (mq->npacket < 4)); + mq->remaining_length = mqtt_decode_len(&pkt[0], mq->npacket, NULL); + mq->npacket = 0; + if(mq->remaining_length) { + mqstate(conn, mqtt->nextstate, MQTT_NOSTATE); + break; + } + mqstate(conn, MQTT_FIRST, MQTT_FIRST); + + if(mq->firstbyte == MQTT_MSG_DISCONNECT) { + infof(data, "Got DISCONNECT\n"); + *done = TRUE; + } + break; case MQTT_CONNACK: result = mqtt_verify_connack(conn); if(result) @@ -524,24 +593,19 @@ static CURLcode mqtt_doing(struct connectdata *conn, bool *done) result = mqtt_disconnect(conn); *done = TRUE; } + mqtt->nextstate = MQTT_FIRST; } else { result = mqtt_subscribe(conn); - if(!result) - mqtt->state = MQTT_SUBACK; + if(!result) { + mqstate(conn, MQTT_FIRST, MQTT_SUBACK); + } } break; case MQTT_SUBACK: - result = mqtt_verify_suback(conn); - if(result) - break; - - mqtt->state = MQTT_SUBWAIT; - break; - - case MQTT_SUBWAIT: - case MQTT_SUB_REMAIN: + case MQTT_PUBWAIT: + case MQTT_PUB_REMAIN: result = mqtt_read_publish(conn, done); if(result) break; diff --git a/lib/mqtt.h b/lib/mqtt.h index b5e447be5..155fbd60a 100644 --- a/lib/mqtt.h +++ b/lib/mqtt.h @@ -26,13 +26,22 @@ extern const struct Curl_handler Curl_handler_mqtt; #endif +enum mqttstate { + MQTT_FIRST, /* 0 */ + MQTT_REMAINING_LENGTH, /* 1 */ + MQTT_CONNACK, /* 2 */ + MQTT_SUBACK, /* 3 */ + MQTT_SUBACK_COMING, /* 4 - the SUBACK remainder */ + MQTT_PUBWAIT, /* 5 - wait for publish */ + MQTT_PUB_REMAIN, /* 6 - wait for the remainder of the publish */ + + MQTT_NOSTATE = 99 /* never an actual state */ +}; + struct mqtt_conn { - enum { - MQTT_CONNACK, - MQTT_SUBACK, - MQTT_SUBWAIT, /* wait for subscribe response */ - MQTT_SUB_REMAIN /* wait for the remainder of the subscribe response */ - } state; + enum mqttstate state; + enum mqttstate nextstate; /* switch to this after remaining length is + done */ unsigned int packetid; }; @@ -41,9 +50,10 @@ struct MQTT { char *sendleftovers; size_t nsend; /* size of sendleftovers */ - /* when receving a PUBLISH */ + /* when receving */ size_t npacket; /* byte counter */ unsigned char firstbyte; + size_t remaining_length; }; #endif /* HEADER_CURL_MQTT_H */ -- cgit v1.2.3