aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorBjorn Stenberg <bjorn@haxx.se>2020-04-14 11:19:12 +0200
committerDaniel Stenberg <daniel@haxx.se>2020-04-14 13:03:40 +0200
commit2522903b792ac5a802f780df60dc4647c58e2477 (patch)
tree4ccf24997d616fce58a798cbb3bcad3557e976db /lib
parent8909865191072b6fc3e040ab0caccc2ec09d8763 (diff)
mqtt: add new experimental protocol
Closes #5173
Diffstat (limited to 'lib')
-rw-r--r--lib/Makefile.inc101
-rw-r--r--lib/curl_config.h.cmake3
-rw-r--r--lib/mqtt.c561
-rw-r--r--lib/mqtt.h49
-rw-r--r--lib/url.c5
-rw-r--r--lib/urldata.h3
-rw-r--r--lib/version.c3
7 files changed, 672 insertions, 53 deletions
diff --git a/lib/Makefile.inc b/lib/Makefile.inc
index 46ded90bb..e3cf41891 100644
--- a/lib/Makefile.inc
+++ b/lib/Makefile.inc
@@ -20,71 +20,66 @@
#
###########################################################################
-LIB_VAUTH_CFILES = vauth/vauth.c vauth/cleartext.c vauth/cram.c \
- vauth/digest.c vauth/digest_sspi.c vauth/krb5_gssapi.c \
- vauth/krb5_sspi.c vauth/ntlm.c vauth/ntlm_sspi.c vauth/oauth2.c \
- vauth/spnego_gssapi.c vauth/spnego_sspi.c
+LIB_VAUTH_CFILES = vauth/cleartext.c vauth/cram.c vauth/digest.c \
+ vauth/digest_sspi.c vauth/krb5_gssapi.c vauth/krb5_sspi.c vauth/ntlm.c \
+ vauth/ntlm_sspi.c vauth/oauth2.c vauth/spnego_gssapi.c vauth/spnego_sspi.c \
+ vauth/vauth.c
-LIB_VAUTH_HFILES = vauth/vauth.h vauth/digest.h vauth/ntlm.h
+LIB_VAUTH_HFILES = vauth/digest.h vauth/ntlm.h vauth/vauth.h
-LIB_VTLS_CFILES = vtls/openssl.c vtls/gtls.c vtls/vtls.c vtls/nss.c \
- vtls/mbedtls_threadlock.c vtls/wolfssl.c vtls/schannel.c \
- vtls/schannel_verify.c vtls/sectransp.c vtls/gskit.c vtls/mbedtls.c \
- vtls/mesalink.c vtls/bearssl.c
+LIB_VTLS_CFILES = vtls/bearssl.c vtls/gskit.c vtls/gtls.c vtls/mbedtls.c \
+ vtls/mbedtls_threadlock.c vtls/mesalink.c vtls/nss.c vtls/openssl.c \
+ vtls/schannel.c vtls/schannel_verify.c vtls/sectransp.c vtls/vtls.c \
+ vtls/wolfssl.c
-LIB_VTLS_HFILES = vtls/openssl.h vtls/vtls.h vtls/gtls.h vtls/nssg.h \
- vtls/mbedtls_threadlock.h vtls/wolfssl.h vtls/schannel.h \
- vtls/sectransp.h vtls/gskit.h vtls/mbedtls.h vtls/mesalink.h \
- vtls/bearssl.h
+LIB_VTLS_HFILES = vtls/bearssl.h vtls/gskit.h vtls/gtls.h vtls/mbedtls.h \
+ vtls/mbedtls_threadlock.h vtls/mesalink.h vtls/nssg.h vtls/openssl.h \
+ vtls/schannel.h vtls/sectransp.h vtls/vtls.h vtls/wolfssl.h
LIB_VQUIC_CFILES = vquic/ngtcp2.c vquic/quiche.c
LIB_VQUIC_HFILES = vquic/ngtcp2.h vquic/quiche.h
-LIB_VSSH_CFILES = vssh/libssh2.c vssh/libssh.c vssh/wolfssh.c
+LIB_VSSH_CFILES = vssh/libssh.c vssh/libssh2.c vssh/wolfssh.c
LIB_VSSH_HFILES = vssh/ssh.h
-LIB_CFILES = file.c timeval.c base64.c hostip.c progress.c formdata.c \
- cookie.c http.c sendf.c ftp.c url.c dict.c if2ip.c speedcheck.c \
- ldap.c version.c getenv.c escape.c mprintf.c telnet.c netrc.c \
- getinfo.c transfer.c strcase.c easy.c security.c curl_fnmatch.c \
- fileinfo.c ftplistparser.c wildcard.c krb5.c memdebug.c http_chunks.c \
- strtok.c connect.c llist.c hash.c multi.c content_encoding.c share.c \
- http_digest.c md4.c md5.c http_negotiate.c inet_pton.c strtoofft.c \
- strerror.c amigaos.c hostasyn.c hostip4.c hostip6.c hostsyn.c \
- inet_ntop.c parsedate.c select.c tftp.c splay.c strdup.c socks.c \
- curl_addrinfo.c socks_gssapi.c socks_sspi.c \
- curl_sspi.c slist.c nonblock.c curl_memrchr.c imap.c pop3.c smtp.c \
- pingpong.c rtsp.c curl_threads.c warnless.c hmac.c curl_rtmp.c \
- openldap.c curl_gethostname.c gopher.c idn_win32.c \
- http_proxy.c non-ascii.c asyn-ares.c asyn-thread.c curl_gssapi.c \
- http_ntlm.c curl_ntlm_wb.c curl_ntlm_core.c curl_sasl.c rand.c \
- curl_multibyte.c hostcheck.c conncache.c dotdot.c \
- x509asn1.c http2.c smb.c curl_endian.c curl_des.c system_win32.c \
- mime.c sha256.c setopt.c curl_path.c curl_ctype.c curl_range.c psl.c \
- doh.c urlapi.c curl_get_line.c altsvc.c socketpair.c rename.c
+LIB_CFILES = altsvc.c amigaos.c asyn-ares.c asyn-thread.c base64.c \
+ conncache.c connect.c content_encoding.c cookie.c curl_addrinfo.c \
+ curl_ctype.c curl_des.c curl_endian.c curl_fnmatch.c curl_get_line.c \
+ curl_gethostname.c curl_gssapi.c curl_memrchr.c curl_multibyte.c \
+ curl_ntlm_core.c curl_ntlm_wb.c curl_path.c curl_range.c curl_rtmp.c \
+ curl_sasl.c curl_sspi.c curl_threads.c dict.c dotdot.c easy.c escape.c \
+ file.c fileinfo.c formdata.c ftp.c url.c ftplistparser.c getenv.c getinfo.c \
+ gopher.c hash.c hmac.c hostasyn.c hostcheck.c hostip.c hostip4.c hostip6.c \
+ hostsyn.c http.c http2.c http_chunks.c http_digest.c http_negotiate.c \
+ http_ntlm.c http_proxy.c idn_win32.c if2ip.c imap.c inet_ntop.c inet_pton.c \
+ krb5.c ldap.c llist.c md4.c md5.c memdebug.c mime.c mprintf.c mqtt.c \
+ multi.c netrc.c non-ascii.c nonblock.c openldap.c parsedate.c pingpong.c \
+ pop3.c progress.c psl.c doh.c rand.c rename.c rtsp.c security.c select.c \
+ sendf.c setopt.c sha256.c share.c slist.c smb.c smtp.c socketpair.c socks.c \
+ socks_gssapi.c socks_sspi.c speedcheck.c splay.c strcase.c strdup.c \
+ strerror.c strtok.c strtoofft.c system_win32.c telnet.c tftp.c timeval.c \
+ transfer.c urlapi.c version.c warnless.c wildcard.c x509asn1.c
-LIB_HFILES = arpa_telnet.h netrc.h file.h timeval.h hostip.h progress.h \
- formdata.h cookie.h http.h sendf.h ftp.h url.h dict.h if2ip.h \
- speedcheck.h urldata.h curl_ldap.h escape.h telnet.h getinfo.h \
- strcase.h curl_sec.h memdebug.h http_chunks.h curl_fnmatch.h \
- wildcard.h fileinfo.h ftplistparser.h strtok.h connect.h llist.h \
- hash.h content_encoding.h share.h curl_md4.h curl_md5.h http_digest.h \
- http_negotiate.h inet_pton.h amigaos.h strtoofft.h strerror.h \
- inet_ntop.h curlx.h curl_memory.h curl_setup.h transfer.h select.h \
- easyif.h multiif.h parsedate.h tftp.h sockaddr.h splay.h strdup.h \
- socks.h curl_base64.h curl_addrinfo.h curl_sspi.h \
- slist.h nonblock.h curl_memrchr.h imap.h pop3.h smtp.h pingpong.h \
- rtsp.h curl_threads.h warnless.h curl_hmac.h curl_rtmp.h \
- curl_gethostname.h gopher.h http_proxy.h non-ascii.h asyn.h \
- http_ntlm.h curl_gssapi.h curl_ntlm_wb.h curl_ntlm_core.h \
- curl_sasl.h curl_multibyte.h hostcheck.h conncache.h \
- curl_setup_once.h multihandle.h setup-vms.h dotdot.h \
- x509asn1.h http2.h sigpipe.h smb.h curl_endian.h curl_des.h \
- curl_printf.h system_win32.h rand.h mime.h curl_sha256.h setopt.h \
- curl_path.h curl_ctype.h curl_range.h psl.h doh.h urlapi-int.h \
- curl_get_line.h altsvc.h quic.h socketpair.h rename.h
+LIB_HFILES = altsvc.h amigaos.h arpa_telnet.h asyn.h conncache.h connect.h \
+ content_encoding.h cookie.h curl_addrinfo.h curl_base64.h curl_ctype.h \
+ curl_des.h curl_endian.h curl_fnmatch.h curl_get_line.h curl_gethostname.h \
+ curl_gssapi.h curl_hmac.h curl_ldap.h curl_md4.h curl_md5.h curl_memory.h \
+ curl_memrchr.h curl_multibyte.h curl_ntlm_core.h curl_ntlm_wb.h curl_path.h \
+ curl_printf.h curl_range.h curl_rtmp.h curl_sasl.h curl_sec.h curl_setup.h \
+ curl_setup_once.h curl_sha256.h curl_sspi.h curl_threads.h curlx.h dict.h \
+ dotdot.h easyif.h escape.h file.h fileinfo.h formdata.h ftp.h url.h \
+ ftplistparser.h getinfo.h gopher.h hash.h hostcheck.h hostip.h http.h \
+ http2.h http_chunks.h http_digest.h http_negotiate.h http_ntlm.h \
+ http_proxy.h if2ip.h imap.h inet_ntop.h inet_pton.h llist.h memdebug.h \
+ mime.h mqtt.h multihandle.h multiif.h netrc.h non-ascii.h nonblock.h \
+ parsedate.h pingpong.h pop3.h progress.h psl.h doh.h quic.h rand.h rename.h \
+ rtsp.h select.h sendf.h setopt.h setup-vms.h share.h sigpipe.h slist.h \
+ smb.h smtp.h sockaddr.h socketpair.h socks.h speedcheck.h splay.h strcase.h \
+ strdup.h strerror.h strtok.h strtoofft.h system_win32.h telnet.h tftp.h \
+ timeval.h transfer.h urlapi-int.h urldata.h warnless.h wildcard.h \
+ x509asn1.h
LIB_RCFILES = libcurl.rc
diff --git a/lib/curl_config.h.cmake b/lib/curl_config.h.cmake
index 24b693eec..57a86e50a 100644
--- a/lib/curl_config.h.cmake
+++ b/lib/curl_config.h.cmake
@@ -63,6 +63,9 @@
/* to disable LDAPS */
#cmakedefine CURL_DISABLE_LDAPS 1
+/* to enable MQTT */
+#undef CURL_ENABLE_MQTT
+
/* to disable POP3 */
#cmakedefine CURL_DISABLE_POP3 1
diff --git a/lib/mqtt.c b/lib/mqtt.c
new file mode 100644
index 000000000..3e244694d
--- /dev/null
+++ b/lib/mqtt.c
@@ -0,0 +1,561 @@
+/***************************************************************************
+ * _ _ ____ _
+ * Project ___| | | | _ \| |
+ * / __| | | | |_) | |
+ * | (__| |_| | _ <| |___
+ * \___|\___/|_| \_\_____|
+ *
+ * Copyright (C) 2020, Daniel Stenberg, <daniel@haxx.se>, et al.
+ * Copyright (C) 2019, Björn Stenberg, <bjorn@haxx.se>
+ *
+ * This software is licensed as described in the file COPYING, which
+ * you should have received as part of this distribution. The terms
+ * are also available at https://curl.haxx.se/docs/copyright.html.
+ *
+ * You may opt to use, copy, modify, merge, publish, distribute and/or sell
+ * copies of the Software, and permit persons to whom the Software is
+ * furnished to do so, under the terms of the COPYING file.
+ *
+ * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
+ * KIND, either express or implied.
+ *
+ ***************************************************************************/
+
+#include "curl_setup.h"
+
+#ifdef CURL_ENABLE_MQTT
+
+#include "urldata.h"
+#include <curl/curl.h>
+#include "transfer.h"
+#include "sendf.h"
+#include "progress.h"
+#include "mqtt.h"
+#include "select.h"
+#include "strdup.h"
+#include "url.h"
+#include "escape.h"
+#include "warnless.h"
+#include "curl_printf.h"
+#include "curl_memory.h"
+#include "multiif.h"
+#include "rand.h"
+
+/* The last #include file should be: */
+#include "memdebug.h"
+
+#define MQTT_MSG_CONNECT 0x10
+#define MQTT_MSG_CONNACK 0x20
+#define MQTT_MSG_PUBLISH 0x30
+#define MQTT_MSG_SUBSCRIBE 0x82
+#define MQTT_MSG_SUBACK 0x90
+#define MQTT_MSG_DISCONNECT 0xe0
+
+#define MQTT_CONNACK_LEN 4
+#define MQTT_SUBACK_LEN 5
+#define MQTT_CLIENTID_LEN 12 /* "curl0123abcd" */
+
+/*
+ * Forward declarations.
+ */
+
+static CURLcode mqtt_do(struct connectdata *conn, bool *done);
+static CURLcode mqtt_doing(struct connectdata *conn, bool *done);
+static int mqtt_getsock(struct connectdata *conn, curl_socket_t *sock);
+static CURLcode mqtt_setup_conn(struct connectdata *conn);
+
+/*
+ * MQTT protocol handler.
+ */
+
+const struct Curl_handler Curl_handler_mqtt = {
+ "MQTT", /* scheme */
+ mqtt_setup_conn, /* setup_connection */
+ mqtt_do, /* do_it */
+ ZERO_NULL, /* done */
+ ZERO_NULL, /* do_more */
+ ZERO_NULL, /* connect_it */
+ ZERO_NULL, /* connecting */
+ mqtt_doing, /* doing */
+ ZERO_NULL, /* proto_getsock */
+ mqtt_getsock, /* doing_getsock */
+ ZERO_NULL, /* domore_getsock */
+ ZERO_NULL, /* perform_getsock */
+ ZERO_NULL, /* disconnect */
+ ZERO_NULL, /* readwrite */
+ ZERO_NULL, /* connection_check */
+ PORT_MQTT, /* defport */
+ CURLPROTO_MQTT, /* protocol */
+ PROTOPT_NONE /* flags */
+};
+
+static CURLcode mqtt_setup_conn(struct connectdata *conn)
+{
+ /* allocate the HTTP-specific struct for the Curl_easy, only to survive
+ during this request */
+ struct MQTT *mq;
+ struct Curl_easy *data = conn->data;
+ DEBUGASSERT(data->req.protop == NULL);
+
+ mq = calloc(1, sizeof(struct MQTT));
+ if(!mq)
+ return CURLE_OUT_OF_MEMORY;
+ data->req.protop = mq;
+ return CURLE_OK;
+}
+
+static CURLcode mqtt_send(struct connectdata *conn,
+ char *buf, size_t len)
+{
+ CURLcode result = CURLE_OK;
+ curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
+ struct Curl_easy *data = conn->data;
+ struct MQTT *mq = data->req.protop;
+ ssize_t n;
+ result = Curl_write(conn, sockfd, buf, len, &n);
+ if(!result && data->set.verbose)
+ Curl_debug(data, CURLINFO_HEADER_OUT, buf, (size_t)n);
+ if(len != (size_t)n) {
+ size_t nsend = len - n;
+ char *sendleftovers = Curl_memdup(&buf[n], nsend);
+ if(!sendleftovers)
+ return CURLE_OUT_OF_MEMORY;
+ mq->sendleftovers = sendleftovers;
+ mq->nsend = nsend;
+ }
+ return result;
+}
+
+/* Generic function called by the multi interface to figure out what socket(s)
+ to wait for and for what actions during the DOING and PROTOCONNECT
+ states */
+static int mqtt_getsock(struct connectdata *conn,
+ curl_socket_t *sock)
+{
+ sock[0] = conn->sock[FIRSTSOCKET];
+ return GETSOCK_READSOCK(FIRSTSOCKET);
+}
+
+static CURLcode mqtt_connect(struct connectdata *conn)
+{
+ CURLcode result = CURLE_OK;
+ const size_t client_id_offset = 14;
+ const size_t packetlen = client_id_offset + MQTT_CLIENTID_LEN;
+ char client_id[MQTT_CLIENTID_LEN + 1] = "curl";
+ const size_t curl_len = strlen("curl");
+ char packet[32] = {
+ MQTT_MSG_CONNECT, /* packet type */
+ 0x00, /* remaining length */
+ 0x00, 0x04, /* protocol length */
+ 'M','Q','T','T', /* protocol name */
+ 0x04, /* protocol level */
+ 0x02, /* CONNECT flag: CleanSession */
+ 0x00, 0x3c, /* keep-alive 0 = disabled */
+ 0x00, 0x00 /* payload1 length */
+ };
+ packet[1] = (packetlen - 2) & 0x7f;
+ packet[client_id_offset - 1] = MQTT_CLIENTID_LEN;
+
+ result = Curl_rand_hex(conn->data, (unsigned char *)&client_id[curl_len],
+ MQTT_CLIENTID_LEN - curl_len + 1);
+ memcpy(&packet[client_id_offset], client_id, MQTT_CLIENTID_LEN);
+ infof(conn->data, "Using client id '%s'\n", client_id);
+ if(!result)
+ result = mqtt_send(conn, packet, packetlen);
+ return result;
+}
+
+static CURLcode mqtt_disconnect(struct connectdata *conn)
+{
+ CURLcode result = CURLE_OK;
+ result = mqtt_send(conn, (char *)"\xe0\x00", 2);
+ return result;
+}
+
+static CURLcode mqtt_verify_connack(struct connectdata *conn)
+{
+ CURLcode result;
+ curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
+ unsigned char readbuf[MQTT_CONNACK_LEN];
+ ssize_t nread;
+ struct Curl_easy *data = conn->data;
+
+ result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread);
+ if(result)
+ goto fail;
+
+ if(data->set.verbose)
+ Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
+
+ /* fixme */
+ if(nread < MQTT_CONNACK_LEN) {
+ result = CURLE_WEIRD_SERVER_REPLY;
+ goto fail;
+ }
+
+ /* verify CONNACK */
+ if(readbuf[0] != MQTT_MSG_CONNACK ||
+ readbuf[1] != 0x02 ||
+ readbuf[2] != 0x00 ||
+ readbuf[3] != 0x00) {
+ failf(data, "Expected %02x%02x%02x%02x but got %02x%02x%02x%02x",
+ MQTT_MSG_CONNACK, 0x02, 0x00, 0x00,
+ readbuf[0], readbuf[1], readbuf[2], readbuf[3]);
+ result = CURLE_WEIRD_SERVER_REPLY;
+ }
+
+fail:
+ return result;
+}
+
+static CURLcode mqtt_get_topic(struct connectdata *conn,
+ char **topic, size_t *topiclen)
+{
+ CURLcode result = CURLE_OK;
+ char *path = conn->data->state.up.path;
+
+ if(strlen(path) > 1) {
+ result = Curl_urldecode(conn->data, path + 1, 0, topic, topiclen, FALSE);
+ }
+ else {
+ failf(conn->data, "Error: No topic specified.");
+ result = CURLE_URL_MALFORMAT;
+ }
+ return result;
+}
+
+
+static int mqtt_encode_len(char *buf, size_t len)
+{
+ unsigned char encoded;
+ int i;
+
+ for(i = 0; (len > 0) && (i<4); i++) {
+ encoded = len % 0x80;
+ len /= 0x80;
+ if(len)
+ encoded |= 0x80;
+ buf[i] = encoded;
+ }
+
+ return i;
+}
+
+static CURLcode mqtt_subscribe(struct connectdata *conn)
+{
+ CURLcode result = CURLE_OK;
+ char *topic = NULL;
+ size_t topiclen;
+ unsigned char *packet = NULL;
+ size_t packetlen;
+ char encodedsize[4];
+ size_t n;
+
+ result = mqtt_get_topic(conn, &topic, &topiclen);
+ if(result)
+ goto fail;
+
+ conn->proto.mqtt.packetid++;
+
+ packetlen = topiclen + 5; /* packetid + topic (has a two byte length field)
+ + 2 bytes topic length + QoS byte */
+ n = mqtt_encode_len((char *)encodedsize, packetlen);
+ packetlen += n + 1; /* add one for the control packet type byte */
+
+ packet = malloc(packetlen);
+ if(!packet) {
+ result = CURLE_OUT_OF_MEMORY;
+ goto fail;
+ }
+
+ packet[0] = MQTT_MSG_SUBSCRIBE;
+ memcpy(&packet[1], encodedsize, n);
+ packet[1 + n] = (conn->proto.mqtt.packetid >> 8) & 0xff;
+ packet[2 + n] = conn->proto.mqtt.packetid & 0xff;
+ packet[3 + n] = (topiclen >> 8) & 0xff;
+ packet[4 + n ] = topiclen & 0xff;
+ memcpy(&packet[5 + n], topic, topiclen);
+ packet[5 + n + topiclen] = 0; /* QoS zero */
+
+ result = mqtt_send(conn, (char *)packet, packetlen);
+
+fail:
+ free(topic);
+ free(packet);
+ return result;
+}
+
+static CURLcode mqtt_verify_suback(struct connectdata *conn)
+{
+ CURLcode result;
+ curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
+ unsigned char readbuf[MQTT_SUBACK_LEN];
+ ssize_t nread;
+ struct mqtt_conn *mqtt = &conn->proto.mqtt;
+
+ result = Curl_read(conn, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread);
+ if(result)
+ goto fail;
+
+ if(conn->data->set.verbose)
+ Curl_debug(conn->data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
+
+ /* fixme */
+ if(nread < MQTT_SUBACK_LEN) {
+ result = CURLE_WEIRD_SERVER_REPLY;
+ goto fail;
+ }
+
+ /* verify SUBACK */
+ if(readbuf[0] != MQTT_MSG_SUBACK ||
+ readbuf[1] != 0x03 ||
+ readbuf[2] != ((mqtt->packetid >> 8) & 0xff) ||
+ readbuf[3] != (mqtt->packetid & 0xff) ||
+ readbuf[4] != 0x00)
+ result = CURLE_WEIRD_SERVER_REPLY;
+
+fail:
+ return result;
+}
+
+static CURLcode mqtt_publish(struct connectdata *conn)
+{
+ CURLcode result;
+ char *payload = conn->data->set.postfields;
+ size_t payloadlen = (size_t)conn->data->set.postfieldsize;
+ char *topic = NULL;
+ size_t topiclen;
+ unsigned char *pkt = NULL;
+ size_t i = 0;
+ size_t remaininglength;
+ size_t encodelen;
+ char encodedbytes[4];
+
+ result = mqtt_get_topic(conn, &topic, &topiclen);
+ if(result)
+ goto fail;
+
+ remaininglength = payloadlen + 2 + topiclen;
+ encodelen = mqtt_encode_len(encodedbytes, remaininglength);
+
+ /* add the control byte and the encoded remaining length */
+ pkt = malloc(remaininglength + 1 + encodelen);
+ if(!pkt) {
+ result = CURLE_OUT_OF_MEMORY;
+ goto fail;
+ }
+
+ /* assemble packet */
+ pkt[i++] = MQTT_MSG_PUBLISH;
+ memcpy(&pkt[i], encodedbytes, encodelen);
+ i += encodelen;
+ pkt[i++] = (topiclen >> 8) & 0xff;
+ pkt[i++] = (topiclen & 0xff);
+ memcpy(&pkt[i], topic, topiclen);
+ i += topiclen;
+ memcpy(&pkt[i], payload, payloadlen);
+ i += payloadlen;
+ result = mqtt_send(conn, (char *)pkt, i);
+
+fail:
+ free(pkt);
+ free(topic);
+ return result;
+}
+
+static size_t mqtt_decode_len(unsigned char *buf,
+ size_t buflen, size_t *lenbytes)
+{
+ size_t len = 0;
+ size_t mult = 1;
+ size_t i;
+ unsigned char encoded = 128;
+
+ for(i = 0; (i < buflen) && (encoded & 128); i++) {
+ encoded = buf[i];
+ len += (encoded & 127) * mult;
+ mult *= 128;
+ }
+
+ *lenbytes = i;
+
+ return len;
+}
+
+/* for the publish packet */
+#define MQTT_HEADER_LEN 5 /* max 5 bytes */
+
+static CURLcode mqtt_read_publish(struct connectdata *conn,
+ bool *done)
+{
+ CURLcode result;
+ curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
+ ssize_t nread;
+ struct Curl_easy *data = conn->data;
+ unsigned char *pkt = (unsigned char *)data->state.buffer;
+ size_t remlen, lenbytes;
+ struct mqtt_conn *mqtt = &conn->proto.mqtt;
+ struct MQTT *mq = data->req.protop;
+
+ switch(mqtt->state) {
+ case MQTT_SUBWAIT:
+ /* Read the initial byte and the entire Remaining Length field
+ in this state */
+ result = Curl_read(conn, sockfd, (char *)&pkt[mq->npacket], 1, &nread);
+ if(result)
+ goto end;
+ if(data->set.verbose)
+ Curl_debug(data, CURLINFO_HEADER_IN, (char *)&pkt[mq->npacket], 1);
+ /* we are expecting a PUBLISH message */
+ if(!mq->npacket && ((pkt[0] & 0xf0) != MQTT_MSG_PUBLISH)) {
+ if(pkt[0] == MQTT_MSG_DISCONNECT) {
+ infof(data, "Got DISCONNECT\n");
+ *done = TRUE;
+ goto end;
+ }
+ result = CURLE_WEIRD_SERVER_REPLY;
+ goto end;
+ }
+ else if((mq->npacket >= 1) && !(pkt[mq->npacket] & 0x80))
+ /* as long as the high bit is set in the length byte, we read one more
+ byte, then get the remainder of the PUBLISH */
+ mqtt->state = MQTT_SUB_REMAIN;
+ mq->npacket++;
+ if(mqtt->state == MQTT_SUBWAIT)
+ return result;
+
+ /* -- switched state -- */
+
+ /* remember the first byte */
+ mq->firstbyte = pkt[0];
+
+ remlen = mqtt_decode_len(&pkt[1], 4, &lenbytes);
+
+ infof(data, "Remaining length: %zd bytes\n", remlen);
+ Curl_pgrsSetDownloadSize(data, remlen);
+ data->req.bytecount = 0;
+ data->req.size = remlen;
+ mq->npacket = remlen; /* get this many bytes */
+ /* FALLTHROUGH */
+ case MQTT_SUB_REMAIN: {
+ /* read rest of packet, but no more. Cap to buffer size */
+ struct SingleRequest *k = &data->req;
+ size_t rest = mq->npacket;
+ if(rest > (size_t)data->set.buffer_size)
+ rest = (size_t)data->set.buffer_size;
+ result = Curl_read(conn, sockfd, (char *)pkt, rest, &nread);
+ if(result) {
+ if(CURLE_AGAIN == result) {
+ infof(data, "EEEE AAAAGAIN\n");
+ }
+ goto end;
+ }
+ if(data->set.verbose)
+ Curl_debug(data, CURLINFO_DATA_IN, (char *)pkt, (size_t)nread);
+
+ mq->npacket -= nread;
+ k->bytecount += nread;
+ Curl_pgrsSetDownloadCounter(data, k->bytecount);
+
+ /* if QoS is set, message contains packet id */
+
+ result = Curl_client_write(conn, CLIENTWRITE_BODY, (char *)pkt, nread);
+ if(result)
+ goto end;
+
+ if(!mq->npacket)
+ /* no more PUBLISH payload, back to subscribe wait state */
+ mqtt->state = MQTT_SUBWAIT;
+ break;
+ }
+ default:
+ DEBUGASSERT(NULL); /* illegal state */
+ result = CURLE_WEIRD_SERVER_REPLY;
+ goto end;
+ }
+ end:
+ return result;
+}
+
+static CURLcode mqtt_do(struct connectdata *conn, bool *done)
+{
+ CURLcode result = CURLE_OK;
+ struct Curl_easy *data = conn->data;
+ struct mqtt_conn *mqtt = &conn->proto.mqtt;
+
+ *done = FALSE; /* unconditionally */
+
+ result = mqtt_connect(conn);
+ if(result) {
+ failf(data, "Error %d sending MQTT CONN request", result);
+ return result;
+ }
+ mqtt->state = MQTT_CONNACK;
+ return CURLE_OK;
+}
+
+static CURLcode mqtt_doing(struct connectdata *conn, bool *done)
+{
+ CURLcode result = CURLE_OK;
+ struct mqtt_conn *mqtt = &conn->proto.mqtt;
+ struct Curl_easy *data = conn->data;
+ struct MQTT *mq = data->req.protop;
+
+ *done = FALSE;
+
+ if(mq->nsend) {
+ /* send the remainder of an outgoing packet */
+ char *ptr = mq->sendleftovers;
+ result = mqtt_send(conn, mq->sendleftovers, mq->nsend);
+ free(ptr);
+ if(result)
+ return result;
+ }
+
+ switch(mqtt->state) {
+ case MQTT_CONNACK:
+ result = mqtt_verify_connack(conn);
+ if(result)
+ break;
+
+ if(conn->data->set.httpreq == HTTPREQ_POST) {
+ result = mqtt_publish(conn);
+ if(!result) {
+ result = mqtt_disconnect(conn);
+ *done = TRUE;
+ }
+ }
+ else {
+ result = mqtt_subscribe(conn);
+ if(!result)
+ mqtt->state = MQTT_SUBACK;
+ }
+ break;
+
+ case MQTT_SUBACK:
+ result = mqtt_verify_suback(conn);
+ if(result)
+ break;
+
+ mqtt->state = MQTT_SUBWAIT;
+ break;
+
+ case MQTT_SUBWAIT:
+ case MQTT_SUB_REMAIN:
+ result = mqtt_read_publish(conn, done);
+ if(result)
+ break;
+ break;
+
+ default:
+ failf(conn->data, "State not handled yet");
+ *done = TRUE;
+ break;
+ }
+
+ if(result == CURLE_AGAIN)
+ result = CURLE_OK;
+ return result;
+}
+
+#endif /* CURL_ENABLE_MQTT */
diff --git a/lib/mqtt.h b/lib/mqtt.h
new file mode 100644
index 000000000..b5e447be5
--- /dev/null
+++ b/lib/mqtt.h
@@ -0,0 +1,49 @@
+#ifndef HEADER_CURL_MQTT_H
+#define HEADER_CURL_MQTT_H
+/***************************************************************************
+ * _ _ ____ _
+ * Project ___| | | | _ \| |
+ * / __| | | | |_) | |
+ * | (__| |_| | _ <| |___
+ * \___|\___/|_| \_\_____|
+ *
+ * Copyright (C) 2019 - 2020, Björn Stenberg, <bjorn@haxx.se>
+ *
+ * This software is licensed as described in the file COPYING, which
+ * you should have received as part of this distribution. The terms
+ * are also available at https://curl.haxx.se/docs/copyright.html.
+ *
+ * You may opt to use, copy, modify, merge, publish, distribute and/or sell
+ * copies of the Software, and permit persons to whom the Software is
+ * furnished to do so, under the terms of the COPYING file.
+ *
+ * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
+ * KIND, either express or implied.
+ *
+ ***************************************************************************/
+
+#ifdef CURL_ENABLE_MQTT
+extern const struct Curl_handler Curl_handler_mqtt;
+#endif
+
+struct mqtt_conn {
+ enum {
+ MQTT_CONNACK,
+ MQTT_SUBACK,
+ MQTT_SUBWAIT, /* wait for subscribe response */
+ MQTT_SUB_REMAIN /* wait for the remainder of the subscribe response */
+ } state;
+ unsigned int packetid;
+};
+
+/* protocol-specific transfer-related data */
+struct MQTT {
+ char *sendleftovers;
+ size_t nsend; /* size of sendleftovers */
+
+ /* when receving a PUBLISH */
+ size_t npacket; /* byte counter */
+ unsigned char firstbyte;
+};
+
+#endif /* HEADER_CURL_MQTT_H */
diff --git a/lib/url.c b/lib/url.c
index 4c62b50ec..03c274438 100644
--- a/lib/url.c
+++ b/lib/url.c
@@ -114,6 +114,7 @@ bool curl_win32_idn_to_ascii(const char *in, char **out);
#include "http_ntlm.h"
#include "curl_rtmp.h"
#include "gopher.h"
+#include "mqtt.h"
#include "http_proxy.h"
#include "conncache.h"
#include "multihandle.h"
@@ -232,6 +233,10 @@ static const struct Curl_handler * const protocols[] = {
&Curl_handler_gopher,
#endif
+#ifdef CURL_ENABLE_MQTT
+ &Curl_handler_mqtt,
+#endif
+
#ifdef USE_LIBRTMP
&Curl_handler_rtmp,
&Curl_handler_rtmpt,
diff --git a/lib/urldata.h b/lib/urldata.h
index 2a36c1147..6e426a29f 100644
--- a/lib/urldata.h
+++ b/lib/urldata.h
@@ -49,6 +49,7 @@
#define PORT_RTMPT PORT_HTTP
#define PORT_RTMPS PORT_HTTPS
#define PORT_GOPHER 70
+#define PORT_MQTT 1883
#define DICT_MATCH "/MATCH:"
#define DICT_MATCH2 "/M:"
@@ -128,6 +129,7 @@ typedef ssize_t (Curl_recv)(struct connectdata *conn, /* connection data */
#include "http.h"
#include "rtsp.h"
#include "smb.h"
+#include "mqtt.h"
#include "wildcard.h"
#include "multihandle.h"
#include "quic.h"
@@ -1081,6 +1083,7 @@ struct connectdata {
struct smb_conn smbc;
void *rtmp;
struct ldapconninfo *ldapc;
+ struct mqtt_conn mqtt;
} proto;
int cselect_bits; /* bitmask of socket events */
diff --git a/lib/version.c b/lib/version.c
index 4d7c2d0a3..47204e881 100644
--- a/lib/version.c
+++ b/lib/version.c
@@ -271,6 +271,9 @@ static const char * const protocols[] = {
"ldaps",
#endif
#endif
+#ifdef CURL_ENABLE_MQTT
+ "mqtt",
+#endif
#ifndef CURL_DISABLE_POP3
"pop3",
#endif