diff options
author | Bjorn Stenberg <bjorn@haxx.se> | 2020-04-14 11:19:12 +0200 |
---|---|---|
committer | Daniel Stenberg <daniel@haxx.se> | 2020-04-14 13:03:40 +0200 |
commit | 2522903b792ac5a802f780df60dc4647c58e2477 (patch) | |
tree | 4ccf24997d616fce58a798cbb3bcad3557e976db /lib | |
parent | 8909865191072b6fc3e040ab0caccc2ec09d8763 (diff) |
mqtt: add new experimental protocol
Closes #5173
Diffstat (limited to 'lib')
-rw-r--r-- | lib/Makefile.inc | 101 | ||||
-rw-r--r-- | lib/curl_config.h.cmake | 3 | ||||
-rw-r--r-- | lib/mqtt.c | 561 | ||||
-rw-r--r-- | lib/mqtt.h | 49 | ||||
-rw-r--r-- | lib/url.c | 5 | ||||
-rw-r--r-- | lib/urldata.h | 3 | ||||
-rw-r--r-- | lib/version.c | 3 |
7 files changed, 672 insertions, 53 deletions
diff --git a/lib/Makefile.inc b/lib/Makefile.inc index 46ded90bb..e3cf41891 100644 --- a/lib/Makefile.inc +++ b/lib/Makefile.inc @@ -20,71 +20,66 @@ # ########################################################################### -LIB_VAUTH_CFILES = vauth/vauth.c vauth/cleartext.c vauth/cram.c \ - vauth/digest.c vauth/digest_sspi.c vauth/krb5_gssapi.c \ - vauth/krb5_sspi.c vauth/ntlm.c vauth/ntlm_sspi.c vauth/oauth2.c \ - vauth/spnego_gssapi.c vauth/spnego_sspi.c +LIB_VAUTH_CFILES = vauth/cleartext.c vauth/cram.c vauth/digest.c \ + vauth/digest_sspi.c vauth/krb5_gssapi.c vauth/krb5_sspi.c vauth/ntlm.c \ + vauth/ntlm_sspi.c vauth/oauth2.c vauth/spnego_gssapi.c vauth/spnego_sspi.c \ + vauth/vauth.c -LIB_VAUTH_HFILES = vauth/vauth.h vauth/digest.h vauth/ntlm.h +LIB_VAUTH_HFILES = vauth/digest.h vauth/ntlm.h vauth/vauth.h -LIB_VTLS_CFILES = vtls/openssl.c vtls/gtls.c vtls/vtls.c vtls/nss.c \ - vtls/mbedtls_threadlock.c vtls/wolfssl.c vtls/schannel.c \ - vtls/schannel_verify.c vtls/sectransp.c vtls/gskit.c vtls/mbedtls.c \ - vtls/mesalink.c vtls/bearssl.c +LIB_VTLS_CFILES = vtls/bearssl.c vtls/gskit.c vtls/gtls.c vtls/mbedtls.c \ + vtls/mbedtls_threadlock.c vtls/mesalink.c vtls/nss.c vtls/openssl.c \ + vtls/schannel.c vtls/schannel_verify.c vtls/sectransp.c vtls/vtls.c \ + vtls/wolfssl.c -LIB_VTLS_HFILES = vtls/openssl.h vtls/vtls.h vtls/gtls.h vtls/nssg.h \ - vtls/mbedtls_threadlock.h vtls/wolfssl.h vtls/schannel.h \ - vtls/sectransp.h vtls/gskit.h vtls/mbedtls.h vtls/mesalink.h \ - vtls/bearssl.h +LIB_VTLS_HFILES = vtls/bearssl.h vtls/gskit.h vtls/gtls.h vtls/mbedtls.h \ + vtls/mbedtls_threadlock.h vtls/mesalink.h vtls/nssg.h vtls/openssl.h \ + vtls/schannel.h vtls/sectransp.h vtls/vtls.h vtls/wolfssl.h LIB_VQUIC_CFILES = vquic/ngtcp2.c vquic/quiche.c LIB_VQUIC_HFILES = vquic/ngtcp2.h vquic/quiche.h -LIB_VSSH_CFILES = vssh/libssh2.c vssh/libssh.c vssh/wolfssh.c +LIB_VSSH_CFILES = vssh/libssh.c vssh/libssh2.c vssh/wolfssh.c LIB_VSSH_HFILES = vssh/ssh.h -LIB_CFILES = file.c timeval.c base64.c hostip.c progress.c formdata.c \ - cookie.c http.c sendf.c ftp.c url.c dict.c if2ip.c speedcheck.c \ - ldap.c version.c getenv.c escape.c mprintf.c telnet.c netrc.c \ - getinfo.c transfer.c strcase.c easy.c security.c curl_fnmatch.c \ - fileinfo.c ftplistparser.c wildcard.c krb5.c memdebug.c http_chunks.c \ - strtok.c connect.c llist.c hash.c multi.c content_encoding.c share.c \ - http_digest.c md4.c md5.c http_negotiate.c inet_pton.c strtoofft.c \ - strerror.c amigaos.c hostasyn.c hostip4.c hostip6.c hostsyn.c \ - inet_ntop.c parsedate.c select.c tftp.c splay.c strdup.c socks.c \ - curl_addrinfo.c socks_gssapi.c socks_sspi.c \ - curl_sspi.c slist.c nonblock.c curl_memrchr.c imap.c pop3.c smtp.c \ - pingpong.c rtsp.c curl_threads.c warnless.c hmac.c curl_rtmp.c \ - openldap.c curl_gethostname.c gopher.c idn_win32.c \ - http_proxy.c non-ascii.c asyn-ares.c asyn-thread.c curl_gssapi.c \ - http_ntlm.c curl_ntlm_wb.c curl_ntlm_core.c curl_sasl.c rand.c \ - curl_multibyte.c hostcheck.c conncache.c dotdot.c \ - x509asn1.c http2.c smb.c curl_endian.c curl_des.c system_win32.c \ - mime.c sha256.c setopt.c curl_path.c curl_ctype.c curl_range.c psl.c \ - doh.c urlapi.c curl_get_line.c altsvc.c socketpair.c rename.c +LIB_CFILES = altsvc.c amigaos.c asyn-ares.c asyn-thread.c base64.c \ + conncache.c connect.c content_encoding.c cookie.c curl_addrinfo.c \ + curl_ctype.c curl_des.c curl_endian.c curl_fnmatch.c curl_get_line.c \ + curl_gethostname.c curl_gssapi.c curl_memrchr.c curl_multibyte.c \ + curl_ntlm_core.c curl_ntlm_wb.c curl_path.c curl_range.c curl_rtmp.c \ + curl_sasl.c curl_sspi.c curl_threads.c dict.c dotdot.c easy.c escape.c \ + file.c fileinfo.c formdata.c ftp.c url.c ftplistparser.c getenv.c getinfo.c \ + gopher.c hash.c hmac.c hostasyn.c hostcheck.c hostip.c hostip4.c hostip6.c \ + hostsyn.c http.c http2.c http_chunks.c http_digest.c http_negotiate.c \ + http_ntlm.c http_proxy.c idn_win32.c if2ip.c imap.c inet_ntop.c inet_pton.c \ + krb5.c ldap.c llist.c md4.c md5.c memdebug.c mime.c mprintf.c mqtt.c \ + multi.c netrc.c non-ascii.c nonblock.c openldap.c parsedate.c pingpong.c \ + pop3.c progress.c psl.c doh.c rand.c rename.c rtsp.c security.c select.c \ + sendf.c setopt.c sha256.c share.c slist.c smb.c smtp.c socketpair.c socks.c \ + socks_gssapi.c socks_sspi.c speedcheck.c splay.c strcase.c strdup.c \ + strerror.c strtok.c strtoofft.c system_win32.c telnet.c tftp.c timeval.c \ + transfer.c urlapi.c version.c warnless.c wildcard.c x509asn1.c -LIB_HFILES = arpa_telnet.h netrc.h file.h timeval.h hostip.h progress.h \ - formdata.h cookie.h http.h sendf.h ftp.h url.h dict.h if2ip.h \ - speedcheck.h urldata.h curl_ldap.h escape.h telnet.h getinfo.h \ - strcase.h curl_sec.h memdebug.h http_chunks.h curl_fnmatch.h \ - wildcard.h fileinfo.h ftplistparser.h strtok.h connect.h llist.h \ - hash.h content_encoding.h share.h curl_md4.h curl_md5.h http_digest.h \ - http_negotiate.h inet_pton.h amigaos.h strtoofft.h strerror.h \ - inet_ntop.h curlx.h curl_memory.h curl_setup.h transfer.h select.h \ - easyif.h multiif.h parsedate.h tftp.h sockaddr.h splay.h strdup.h \ - socks.h curl_base64.h curl_addrinfo.h curl_sspi.h \ - slist.h nonblock.h curl_memrchr.h imap.h pop3.h smtp.h pingpong.h \ - rtsp.h curl_threads.h warnless.h curl_hmac.h curl_rtmp.h \ - curl_gethostname.h gopher.h http_proxy.h non-ascii.h asyn.h \ - http_ntlm.h curl_gssapi.h curl_ntlm_wb.h curl_ntlm_core.h \ - curl_sasl.h curl_multibyte.h hostcheck.h conncache.h \ - curl_setup_once.h multihandle.h setup-vms.h dotdot.h \ - x509asn1.h http2.h sigpipe.h smb.h curl_endian.h curl_des.h \ - curl_printf.h system_win32.h rand.h mime.h curl_sha256.h setopt.h \ - curl_path.h curl_ctype.h curl_range.h psl.h doh.h urlapi-int.h \ - curl_get_line.h altsvc.h quic.h socketpair.h rename.h +LIB_HFILES = altsvc.h amigaos.h arpa_telnet.h asyn.h conncache.h connect.h \ + content_encoding.h cookie.h curl_addrinfo.h curl_base64.h curl_ctype.h \ + curl_des.h curl_endian.h curl_fnmatch.h curl_get_line.h curl_gethostname.h \ + curl_gssapi.h curl_hmac.h curl_ldap.h curl_md4.h curl_md5.h curl_memory.h \ + curl_memrchr.h curl_multibyte.h curl_ntlm_core.h curl_ntlm_wb.h curl_path.h \ + curl_printf.h curl_range.h curl_rtmp.h curl_sasl.h curl_sec.h curl_setup.h \ + curl_setup_once.h curl_sha256.h curl_sspi.h curl_threads.h curlx.h dict.h \ + dotdot.h easyif.h escape.h file.h fileinfo.h formdata.h ftp.h url.h \ + ftplistparser.h getinfo.h gopher.h hash.h hostcheck.h hostip.h http.h \ + http2.h http_chunks.h http_digest.h http_negotiate.h http_ntlm.h \ + http_proxy.h if2ip.h imap.h inet_ntop.h inet_pton.h llist.h memdebug.h \ + mime.h mqtt.h multihandle.h multiif.h netrc.h non-ascii.h nonblock.h \ + parsedate.h pingpong.h pop3.h progress.h psl.h doh.h quic.h rand.h rename.h \ + rtsp.h select.h sendf.h setopt.h setup-vms.h share.h sigpipe.h slist.h \ + smb.h smtp.h sockaddr.h socketpair.h socks.h speedcheck.h splay.h strcase.h \ + strdup.h strerror.h strtok.h strtoofft.h system_win32.h telnet.h tftp.h \ + timeval.h transfer.h urlapi-int.h urldata.h warnless.h wildcard.h \ + x509asn1.h LIB_RCFILES = libcurl.rc diff --git a/lib/curl_config.h.cmake b/lib/curl_config.h.cmake index 24b693eec..57a86e50a 100644 --- a/lib/curl_config.h.cmake +++ b/lib/curl_config.h.cmake @@ -63,6 +63,9 @@ /* to disable LDAPS */ #cmakedefine CURL_DISABLE_LDAPS 1 +/* to enable MQTT */ +#undef CURL_ENABLE_MQTT + /* to disable POP3 */ #cmakedefine CURL_DISABLE_POP3 1 diff --git a/lib/mqtt.c b/lib/mqtt.c new file mode 100644 index 000000000..3e244694d --- /dev/null +++ b/lib/mqtt.c @@ -0,0 +1,561 @@ +/*************************************************************************** + * _ _ ____ _ + * Project ___| | | | _ \| | + * / __| | | | |_) | | + * | (__| |_| | _ <| |___ + * \___|\___/|_| \_\_____| + * + * Copyright (C) 2020, Daniel Stenberg, <daniel@haxx.se>, et al. + * Copyright (C) 2019, Björn Stenberg, <bjorn@haxx.se> + * + * This software is licensed as described in the file COPYING, which + * you should have received as part of this distribution. The terms + * are also available at https://curl.haxx.se/docs/copyright.html. + * + * You may opt to use, copy, modify, merge, publish, distribute and/or sell + * copies of the Software, and permit persons to whom the Software is + * furnished to do so, under the terms of the COPYING file. + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY + * KIND, either express or implied. + * + ***************************************************************************/ + +#include "curl_setup.h" + +#ifdef CURL_ENABLE_MQTT + +#include "urldata.h" +#include <curl/curl.h> +#include "transfer.h" +#include "sendf.h" +#include "progress.h" +#include "mqtt.h" +#include "select.h" +#include "strdup.h" +#include "url.h" +#include "escape.h" +#include "warnless.h" +#include "curl_printf.h" +#include "curl_memory.h" +#include "multiif.h" +#include "rand.h" + +/* The last #include file should be: */ +#include "memdebug.h" + +#define MQTT_MSG_CONNECT 0x10 +#define MQTT_MSG_CONNACK 0x20 +#define MQTT_MSG_PUBLISH 0x30 +#define MQTT_MSG_SUBSCRIBE 0x82 +#define MQTT_MSG_SUBACK 0x90 +#define MQTT_MSG_DISCONNECT 0xe0 + +#define MQTT_CONNACK_LEN 4 +#define MQTT_SUBACK_LEN 5 +#define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */ + +/* + * Forward declarations. + */ + +static CURLcode mqtt_do(struct connectdata *conn, bool *done); +static CURLcode mqtt_doing(struct connectdata *conn, bool *done); +static int mqtt_getsock(struct connectdata *conn, curl_socket_t *sock); +static CURLcode mqtt_setup_conn(struct connectdata *conn); + +/* + * MQTT protocol handler. + */ + +const struct Curl_handler Curl_handler_mqtt = { + "MQTT", /* scheme */ + mqtt_setup_conn, /* setup_connection */ + mqtt_do, /* do_it */ + ZERO_NULL, /* done */ + ZERO_NULL, /* do_more */ + ZERO_NULL, /* connect_it */ + ZERO_NULL, /* connecting */ + mqtt_doing, /* doing */ + ZERO_NULL, /* proto_getsock */ + mqtt_getsock, /* doing_getsock */ + ZERO_NULL, /* domore_getsock */ + ZERO_NULL, /* perform_getsock */ + ZERO_NULL, /* disconnect */ + ZERO_NULL, /* readwrite */ + ZERO_NULL, /* connection_check */ + PORT_MQTT, /* defport */ + CURLPROTO_MQTT, /* protocol */ + PROTOPT_NONE /* flags */ +}; + +static CURLcode mqtt_setup_conn(struct connectdata *conn) +{ + /* allocate the HTTP-specific struct for the Curl_easy, only to survive + during this request */ + struct MQTT *mq; + struct Curl_easy *data = conn->data; + DEBUGASSERT(data->req.protop == NULL); + + mq = calloc(1, sizeof(struct MQTT)); + if(!mq) + return CURLE_OUT_OF_MEMORY; + data->req.protop = mq; + return CURLE_OK; +} + +static CURLcode mqtt_send(struct connectdata *conn, + char *buf, size_t len) +{ + CURLcode result = CURLE_OK; + curl_socket_t sockfd = conn->sock[FIRSTSOCKET]; + struct Curl_easy *data = conn->data; + struct MQTT *mq = data->req.protop; + ssize_t n; + result = Curl_write(conn, sockfd, buf, len, &n); + if(!result && data->set.verbose) + Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n); + if(len != (size_t)n) { + size_t nsend = len - n; + char *sendleftovers = Curl_memdup(&buf[n], nsend); + if(!sendleftovers) + return CURLE_OUT_OF_MEMORY; + mq->sendleftovers = sendleftovers; + mq->nsend = nsend; + } + return result; +} + +/* Generic function called by the multi interface to figure out what socket(s) + to wait for and for what actions during the DOING and PROTOCONNECT + states */ +static int mqtt_getsock(struct connectdata *conn, + curl_socket_t *sock) +{ + sock[0] = conn->sock[FIRSTSOCKET]; + return GETSOCK_READSOCK(FIRSTSOCKET); +} + +static CURLcode mqtt_connect(struct connectdata *conn) +{ + CURLcode result = CURLE_OK; + const size_t client_id_offset = 14; + const size_t packetlen = client_id_offset + MQTT_CLIENTID_LEN; + char client_id[MQTT_CLIENTID_LEN + 1] = "curl"; + const size_t curl_len = strlen("curl"); + char packet[32] = { + MQTT_MSG_CONNECT, /* packet type */ + 0x00, /* remaining length */ + 0x00, 0x04, /* protocol length */ + 'M','Q','T','T', /* protocol name */ + 0x04, /* protocol level */ + 0x02, /* CONNECT flag: CleanSession */ + 0x00, 0x3c, /* keep-alive 0 = disabled */ + 0x00, 0x00 /* payload1 length */ + }; + packet[1] = (packetlen - 2) & 0x7f; + packet[client_id_offset - 1] = MQTT_CLIENTID_LEN; + + result = Curl_rand_hex(conn->data, (unsigned char *)&client_id[curl_len], + MQTT_CLIENTID_LEN - curl_len + 1); + memcpy(&packet[client_id_offset], client_id, MQTT_CLIENTID_LEN); + infof(conn->data, "Using client id '%s'\n", client_id); + if(!result) + result = mqtt_send(conn, packet, packetlen); + return result; +} + +static CURLcode mqtt_disconnect(struct connectdata *conn) +{ + CURLcode result = CURLE_OK; + result = mqtt_send(conn, (char *)"\xe0\x00", 2); + return result; +} + +static CURLcode mqtt_verify_connack(struct connectdata *conn) +{ + CURLcode result; + curl_socket_t sockfd = conn->sock[FIRSTSOCKET]; + unsigned char readbuf[MQTT_CONNACK_LEN]; + ssize_t nread; + struct Curl_easy *data = conn->data; + + result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread); + if(result) + goto fail; + + if(data->set.verbose) + Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread); + + /* fixme */ + if(nread < MQTT_CONNACK_LEN) { + result = CURLE_WEIRD_SERVER_REPLY; + goto fail; + } + + /* verify CONNACK */ + if(readbuf[0] != MQTT_MSG_CONNACK || + readbuf[1] != 0x02 || + readbuf[2] != 0x00 || + readbuf[3] != 0x00) { + failf(data, "Expected %02x%02x%02x%02x but got %02x%02x%02x%02x", + MQTT_MSG_CONNACK, 0x02, 0x00, 0x00, + readbuf[0], readbuf[1], readbuf[2], readbuf[3]); + result = CURLE_WEIRD_SERVER_REPLY; + } + +fail: + return result; +} + +static CURLcode mqtt_get_topic(struct connectdata *conn, + char **topic, size_t *topiclen) +{ + CURLcode result = CURLE_OK; + char *path = conn->data->state.up.path; + + if(strlen(path) > 1) { + result = Curl_urldecode(conn->data, path + 1, 0, topic, topiclen, FALSE); + } + else { + failf(conn->data, "Error: No topic specified."); + result = CURLE_URL_MALFORMAT; + } + return result; +} + + +static int mqtt_encode_len(char *buf, size_t len) +{ + unsigned char encoded; + int i; + + for(i = 0; (len > 0) && (i<4); i++) { + encoded = len % 0x80; + len /= 0x80; + if(len) + encoded |= 0x80; + buf[i] = encoded; + } + + return i; +} + +static CURLcode mqtt_subscribe(struct connectdata *conn) +{ + CURLcode result = CURLE_OK; + char *topic = NULL; + size_t topiclen; + unsigned char *packet = NULL; + size_t packetlen; + char encodedsize[4]; + size_t n; + + result = mqtt_get_topic(conn, &topic, &topiclen); + if(result) + goto fail; + + conn->proto.mqtt.packetid++; + + packetlen = topiclen + 5; /* packetid + topic (has a two byte length field) + + 2 bytes topic length + QoS byte */ + n = mqtt_encode_len((char *)encodedsize, packetlen); + packetlen += n + 1; /* add one for the control packet type byte */ + + packet = malloc(packetlen); + if(!packet) { + result = CURLE_OUT_OF_MEMORY; + goto fail; + } + + packet[0] = MQTT_MSG_SUBSCRIBE; + memcpy(&packet[1], encodedsize, n); + packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff; + packet[2 + n] = conn->proto.mqtt.packetid & 0xff; + packet[3 + n] = (topiclen >> 8) & 0xff; + packet[4 + n ] = topiclen & 0xff; + memcpy(&packet[5 + n], topic, topiclen); + packet[5 + n + topiclen] = 0; /* QoS zero */ + + result = mqtt_send(conn, (char *)packet, packetlen); + +fail: + free(topic); + free(packet); + return result; +} + +static CURLcode mqtt_verify_suback(struct connectdata *conn) +{ + CURLcode result; + curl_socket_t sockfd = conn->sock[FIRSTSOCKET]; + unsigned char readbuf[MQTT_SUBACK_LEN]; + ssize_t nread; + struct mqtt_conn *mqtt = &conn->proto.mqtt; + + result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread); + if(result) + goto fail; + + if(conn->data->set.verbose) + Curl_debug(conn->data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread); + + /* fixme */ + if(nread < MQTT_SUBACK_LEN) { + result = CURLE_WEIRD_SERVER_REPLY; + goto fail; + } + + /* verify SUBACK */ + if(readbuf[0] != MQTT_MSG_SUBACK || + readbuf[1] != 0x03 || + readbuf[2] != ((mqtt->packetid >> 8) & 0xff) || + readbuf[3] != (mqtt->packetid & 0xff) || + readbuf[4] != 0x00) + result = CURLE_WEIRD_SERVER_REPLY; + +fail: + return result; +} + +static CURLcode mqtt_publish(struct connectdata *conn) +{ + CURLcode result; + char *payload = conn->data->set.postfields; + size_t payloadlen = (size_t)conn->data->set.postfieldsize; + char *topic = NULL; + size_t topiclen; + unsigned char *pkt = NULL; + size_t i = 0; + size_t remaininglength; + size_t encodelen; + char encodedbytes[4]; + + result = mqtt_get_topic(conn, &topic, &topiclen); + if(result) + goto fail; + + remaininglength = payloadlen + 2 + topiclen; + encodelen = mqtt_encode_len(encodedbytes, remaininglength); + + /* add the control byte and the encoded remaining length */ + pkt = malloc(remaininglength + 1 + encodelen); + if(!pkt) { + result = CURLE_OUT_OF_MEMORY; + goto fail; + } + + /* assemble packet */ + pkt[i++] = MQTT_MSG_PUBLISH; + memcpy(&pkt[i], encodedbytes, encodelen); + i += encodelen; + pkt[i++] = (topiclen >> 8) & 0xff; + pkt[i++] = (topiclen & 0xff); + memcpy(&pkt[i], topic, topiclen); + i += topiclen; + memcpy(&pkt[i], payload, payloadlen); + i += payloadlen; + result = mqtt_send(conn, (char *)pkt, i); + +fail: + free(pkt); + free(topic); + return result; +} + +static size_t mqtt_decode_len(unsigned char *buf, + size_t buflen, size_t *lenbytes) +{ + size_t len = 0; + size_t mult = 1; + size_t i; + unsigned char encoded = 128; + + for(i = 0; (i < buflen) && (encoded & 128); i++) { + encoded = buf[i]; + len += (encoded & 127) * mult; + mult *= 128; + } + + *lenbytes = i; + + return len; +} + +/* for the publish packet */ +#define MQTT_HEADER_LEN 5 /* max 5 bytes */ + +static CURLcode mqtt_read_publish(struct connectdata *conn, + bool *done) +{ + CURLcode result; + curl_socket_t sockfd = conn->sock[FIRSTSOCKET]; + ssize_t nread; + struct Curl_easy *data = conn->data; + unsigned char *pkt = (unsigned char *)data->state.buffer; + size_t remlen, lenbytes; + struct mqtt_conn *mqtt = &conn->proto.mqtt; + struct MQTT *mq = data->req.protop; + + switch(mqtt->state) { + case MQTT_SUBWAIT: + /* Read the initial byte and the entire Remaining Length field + in this state */ + result = Curl_read(conn, sockfd, (char *)&pkt[mq->npacket], 1, &nread); + if(result) + goto end; + if(data->set.verbose) + Curl_debug(data, CURLINFO_HEADER_IN, (char *)&pkt[mq->npacket], 1); + /* we are expecting a PUBLISH message */ + if(!mq->npacket && ((pkt[0] & 0xf0) != MQTT_MSG_PUBLISH)) { + if(pkt[0] == MQTT_MSG_DISCONNECT) { + infof(data, "Got DISCONNECT\n"); + *done = TRUE; + goto end; + } + result = CURLE_WEIRD_SERVER_REPLY; + goto end; + } + else if((mq->npacket >= 1) && !(pkt[mq->npacket] & 0x80)) + /* as long as the high bit is set in the length byte, we read one more + byte, then get the remainder of the PUBLISH */ + mqtt->state = MQTT_SUB_REMAIN; + mq->npacket++; + if(mqtt->state == MQTT_SUBWAIT) + return result; + + /* -- switched state -- */ + + /* remember the first byte */ + mq->firstbyte = pkt[0]; + + remlen = mqtt_decode_len(&pkt[1], 4, &lenbytes); + + infof(data, "Remaining length: %zd bytes\n", remlen); + Curl_pgrsSetDownloadSize(data, remlen); + data->req.bytecount = 0; + data->req.size = remlen; + mq->npacket = remlen; /* get this many bytes */ + /* FALLTHROUGH */ + case MQTT_SUB_REMAIN: { + /* read rest of packet, but no more. Cap to buffer size */ + struct SingleRequest *k = &data->req; + size_t rest = mq->npacket; + if(rest > (size_t)data->set.buffer_size) + rest = (size_t)data->set.buffer_size; + result = Curl_read(conn, sockfd, (char *)pkt, rest, &nread); + if(result) { + if(CURLE_AGAIN == result) { + infof(data, "EEEE AAAAGAIN\n"); + } + goto end; + } + if(data->set.verbose) + Curl_debug(data, CURLINFO_DATA_IN, (char *)pkt, (size_t)nread); + + mq->npacket -= nread; + k->bytecount += nread; + Curl_pgrsSetDownloadCounter(data, k->bytecount); + + /* if QoS is set, message contains packet id */ + + result = Curl_client_write(conn, CLIENTWRITE_BODY, (char *)pkt, nread); + if(result) + goto end; + + if(!mq->npacket) + /* no more PUBLISH payload, back to subscribe wait state */ + mqtt->state = MQTT_SUBWAIT; + break; + } + default: + DEBUGASSERT(NULL); /* illegal state */ + result = CURLE_WEIRD_SERVER_REPLY; + goto end; + } + end: + return result; +} + +static CURLcode mqtt_do(struct connectdata *conn, bool *done) +{ + CURLcode result = CURLE_OK; + struct Curl_easy *data = conn->data; + struct mqtt_conn *mqtt = &conn->proto.mqtt; + + *done = FALSE; /* unconditionally */ + + result = mqtt_connect(conn); + if(result) { + failf(data, "Error %d sending MQTT CONN request", result); + return result; + } + mqtt->state = MQTT_CONNACK; + return CURLE_OK; +} + +static CURLcode mqtt_doing(struct connectdata *conn, bool *done) +{ + CURLcode result = CURLE_OK; + struct mqtt_conn *mqtt = &conn->proto.mqtt; + struct Curl_easy *data = conn->data; + struct MQTT *mq = data->req.protop; + + *done = FALSE; + + if(mq->nsend) { + /* send the remainder of an outgoing packet */ + char *ptr = mq->sendleftovers; + result = mqtt_send(conn, mq->sendleftovers, mq->nsend); + free(ptr); + if(result) + return result; + } + + switch(mqtt->state) { + case MQTT_CONNACK: + result = mqtt_verify_connack(conn); + if(result) + break; + + if(conn->data->set.httpreq == HTTPREQ_POST) { + result = mqtt_publish(conn); + if(!result) { + result = mqtt_disconnect(conn); + *done = TRUE; + } + } + else { + result = mqtt_subscribe(conn); + if(!result) + mqtt->state = MQTT_SUBACK; + } + break; + + case MQTT_SUBACK: + result = mqtt_verify_suback(conn); + if(result) + break; + + mqtt->state = MQTT_SUBWAIT; + break; + + case MQTT_SUBWAIT: + case MQTT_SUB_REMAIN: + result = mqtt_read_publish(conn, done); + if(result) + break; + break; + + default: + failf(conn->data, "State not handled yet"); + *done = TRUE; + break; + } + + if(result == CURLE_AGAIN) + result = CURLE_OK; + return result; +} + +#endif /* CURL_ENABLE_MQTT */ diff --git a/lib/mqtt.h b/lib/mqtt.h new file mode 100644 index 000000000..b5e447be5 --- /dev/null +++ b/lib/mqtt.h @@ -0,0 +1,49 @@ +#ifndef HEADER_CURL_MQTT_H +#define HEADER_CURL_MQTT_H +/*************************************************************************** + * _ _ ____ _ + * Project ___| | | | _ \| | + * / __| | | | |_) | | + * | (__| |_| | _ <| |___ + * \___|\___/|_| \_\_____| + * + * Copyright (C) 2019 - 2020, Björn Stenberg, <bjorn@haxx.se> + * + * This software is licensed as described in the file COPYING, which + * you should have received as part of this distribution. The terms + * are also available at https://curl.haxx.se/docs/copyright.html. + * + * You may opt to use, copy, modify, merge, publish, distribute and/or sell + * copies of the Software, and permit persons to whom the Software is + * furnished to do so, under the terms of the COPYING file. + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY + * KIND, either express or implied. + * + ***************************************************************************/ + +#ifdef CURL_ENABLE_MQTT +extern const struct Curl_handler Curl_handler_mqtt; +#endif + +struct mqtt_conn { + enum { + MQTT_CONNACK, + MQTT_SUBACK, + MQTT_SUBWAIT, /* wait for subscribe response */ + MQTT_SUB_REMAIN /* wait for the remainder of the subscribe response */ + } state; + unsigned int packetid; +}; + +/* protocol-specific transfer-related data */ +struct MQTT { + char *sendleftovers; + size_t nsend; /* size of sendleftovers */ + + /* when receving a PUBLISH */ + size_t npacket; /* byte counter */ + unsigned char firstbyte; +}; + +#endif /* HEADER_CURL_MQTT_H */ @@ -114,6 +114,7 @@ bool curl_win32_idn_to_ascii(const char *in, char **out); #include "http_ntlm.h" #include "curl_rtmp.h" #include "gopher.h" +#include "mqtt.h" #include "http_proxy.h" #include "conncache.h" #include "multihandle.h" @@ -232,6 +233,10 @@ static const struct Curl_handler * const protocols[] = { &Curl_handler_gopher, #endif +#ifdef CURL_ENABLE_MQTT + &Curl_handler_mqtt, +#endif + #ifdef USE_LIBRTMP &Curl_handler_rtmp, &Curl_handler_rtmpt, diff --git a/lib/urldata.h b/lib/urldata.h index 2a36c1147..6e426a29f 100644 --- a/lib/urldata.h +++ b/lib/urldata.h @@ -49,6 +49,7 @@ #define PORT_RTMPT PORT_HTTP #define PORT_RTMPS PORT_HTTPS #define PORT_GOPHER 70 +#define PORT_MQTT 1883 #define DICT_MATCH "/MATCH:" #define DICT_MATCH2 "/M:" @@ -128,6 +129,7 @@ typedef ssize_t (Curl_recv)(struct connectdata *conn, /* connection data */ #include "http.h" #include "rtsp.h" #include "smb.h" +#include "mqtt.h" #include "wildcard.h" #include "multihandle.h" #include "quic.h" @@ -1081,6 +1083,7 @@ struct connectdata { struct smb_conn smbc; void *rtmp; struct ldapconninfo *ldapc; + struct mqtt_conn mqtt; } proto; int cselect_bits; /* bitmask of socket events */ diff --git a/lib/version.c b/lib/version.c index 4d7c2d0a3..47204e881 100644 --- a/lib/version.c +++ b/lib/version.c @@ -271,6 +271,9 @@ static const char * const protocols[] = { "ldaps", #endif #endif +#ifdef CURL_ENABLE_MQTT + "mqtt", +#endif #ifndef CURL_DISABLE_POP3 "pop3", #endif |