From b9b06b00bf19657344656ee26933425140b8787c Mon Sep 17 00:00:00 2001 From: Daniel Stenberg Date: Thu, 3 Aug 2006 22:57:04 +0000 Subject: This is now a working example using libevent and curl_multi_socket() for really fast treatment of many simultaneous transfers --- hiper/hipev.c | 314 ++++++++++++++++++---------------------------------------- 1 file changed, 98 insertions(+), 216 deletions(-) diff --git a/hiper/hipev.c b/hiper/hipev.c index e99f9086a..936dfeebb 100644 --- a/hiper/hipev.c +++ b/hiper/hipev.c @@ -10,8 +10,7 @@ * Connect N connections. Z are idle, and X are active. Transfer as fast as * possible. * - * Run for a specific amount of time (10 secs for now). Output detailed timing - * information. + * Output detailed timing information. * * Uses libevent. * @@ -50,16 +49,6 @@ when using asynch supported libcurl. */ #define IDLE_TIME 10 -struct ourfdset { - /* __fds_bits is what the Linux glibc headers use when they declare the - fd_set struct so by using this we can actually avoid the typecase for the - FD_SET() macro usage but it would hardly be portable */ - char __fds_bits[NCONNECTIONS/8]; -}; -#define FD2_ZERO(x) memset(x, 0, sizeof(struct ourfdset)) - -typedef struct ourfdset fd2_set; - struct globalinfo { size_t dlcounter; }; @@ -73,6 +62,8 @@ struct connection { char error[CURL_ERROR_SIZE]; }; +/* this is the struct associated with each file descriptor libcurl tells us + it is dealing with */ struct fdinfo { /* create a link list of fdinfo structs */ struct fdinfo *next; @@ -91,15 +82,64 @@ static struct fdinfo *allsocks; static int running_handles; +/* we have the timerevent global so that when the final socket-based event is + done, we can remove the timerevent as well */ +static struct event timerevent; + /* called from libevent on action on a particular socket ("event") */ static void eventcallback(int fd, short type, void *userp) { struct fdinfo *fdp = (struct fdinfo *)userp; + CURLMcode rc; + + fprintf(stderr, "EVENT callback type %d\n", type); + + /* tell libcurl to deal with the transfer associated with this socket */ + do { + rc = curl_multi_socket(fdp->multi, fd, fdp->running_handles); + } while (rc == CURLM_CALL_MULTI_PERFORM); + + if(rc) { + fprintf(stderr, "curl_multi_socket() returned %d\n", (int)rc); + } + + fprintf(stderr, "running_handles: %d\n", *fdp->running_handles); + if(!*fdp->running_handles) { + /* last transfer is complete, kill pending timeout */ + fprintf(stderr, "last transfer done, kill timeout\n"); + if(evtimer_pending(&timerevent, NULL)) + evtimer_del(&timerevent); + } +} + +/* called from libevent when our timer event expires */ +static void timercallback(int fd, short type, void *userp) +{ + (void)fd; /* not used for this */ + (void)type; /* ignored in here */ + CURLM *multi_handle = (CURLM *)userp; + long timeout_ms; + struct timeval timeout; + int running_handles; + CURLMcode rc; - fprintf(stderr, "EVENT callback\n"); + fprintf(stderr, "EVENT timeout\n"); /* tell libcurl to deal with the transfer associated with this socket */ - curl_multi_socket(fdp->multi, fd, fdp->running_handles); + do { + rc = curl_multi_socket(multi_handle, CURL_SOCKET_TIMEOUT, + &running_handles); + } while (rc == CURLM_CALL_MULTI_PERFORM); + + if(running_handles) { + /* Get the current timeout value from libcurl and set a new timeout */ + curl_multi_timeout(multi_handle, &timeout_ms); + + /* convert ms to timeval */ + timeout.tv_sec = timeout_ms/1000; + timeout.tv_usec = (timeout_ms%1000)*1000; + evtimer_add(&timerevent, &timeout); + } } static void remsock(struct fdinfo *f) @@ -108,6 +148,9 @@ static void remsock(struct fdinfo *f) /* did not find socket to remove! */ return; + if(f->evset) + event_del(&f->ev); + if(f->prev) f->prev->next = f->next; if(f->next) @@ -128,16 +171,22 @@ static void setsock(struct fdinfo *fdp, curl_socket_t s, CURL *easy, /* first remove the existing event if the old setup was used */ event_del(&fdp->ev); - /* now use and add the current socket setup */ + /* now use and add the current socket setup to libevent. The EV_PERSIST is + the key here as otherwise libevent will automatically remove the event + when it occurs the first time */ event_set(&fdp->ev, fdp->sockfd, (action&CURL_POLL_IN?EV_READ:0)| - (action&CURL_POLL_OUT?EV_WRITE:0), + (action&CURL_POLL_OUT?EV_WRITE:0)| EV_PERSIST, eventcallback, fdp); fdp->evset=1; fprintf(stderr, "event_add() for fd %d\n", s); - event_add(&fdp->ev, NULL); /* no timeout */ + + /* We don't use any socket-specific timeout but intead we use a single + global one. This is (mostly) because libcurl doesn't expose any + particular socket- based timeout value. */ + event_add(&fdp->ev, NULL); } static void addsock(curl_socket_t s, CURL *easy, int action, CURLM *multi) @@ -162,55 +211,17 @@ static void addsock(curl_socket_t s, CURL *easy, int action, CURLM *multi) curl_multi_assign(multi, s, fdp); } -static void fdinfo2fdset(fd2_set *fdread, fd2_set *fdwrite, int *maxfd) -{ - struct fdinfo *fdp = allsocks; - int writable=0; - - FD2_ZERO(fdread); - FD2_ZERO(fdwrite); - - *maxfd = 0; - -#if 0 - printf("Wait for: "); -#endif - - while(fdp) { - if(fdp->action & CURL_POLL_IN) { - FD_SET(fdp->sockfd, (fd_set *)fdread); - } - if(fdp->action & CURL_POLL_OUT) { - FD_SET(fdp->sockfd, (fd_set *)fdwrite); - writable++; - } - -#if 0 - printf("%d (%s%s) ", - fdp->sockfd, - (fdp->action & CURL_POLL_IN)?"r":"", - (fdp->action & CURL_POLL_OUT)?"w":""); -#endif - - if(fdp->sockfd > *maxfd) - *maxfd = fdp->sockfd; - - fdp = fdp->next; - } -#if 0 - if(writable) - printf("Check for %d writable sockets\n", writable); -#endif -} - /* on port 8999 we run a fork enabled sws that supports 'idle' and 'stream' */ #define PORT "8999" -#define HOST "192.168.1.13" +#define HOST "127.0.0.1" #define URL_IDLE "http://" HOST ":" PORT "/1000" +#if 1 #define URL_ACTIVE "http://" HOST ":" PORT "/1001" - +#else +#define URL_ACTIVE "http://localhost/" +#endif static int socket_callback(CURL *easy, /* easy handle */ curl_socket_t s, /* socket */ @@ -219,15 +230,24 @@ static int socket_callback(CURL *easy, /* easy handle */ void *socketp) /* socket pointer */ { struct fdinfo *fdp = (struct fdinfo *)socketp; + char *whatstr[]={ + "none", + "IN", + "OUT", + "INOUT", + "REMOVE"}; - fprintf(stderr, "socket %d easy %p what %d\n", s, easy, what); + fprintf(stderr, "socket %d easy %p what %s\n", s, easy, + whatstr[what]); if(what == CURL_POLL_REMOVE) remsock(fdp); else { if(!fdp) { /* not previously known, add it and set association */ - printf("Add info for socket %d (%d)\n", s, what); + printf("Add info for socket %d %s%s\n", s, + what&CURL_POLL_IN?"READ":"", + what&CURL_POLL_OUT?"WRITE":"" ); addsock(s, easy, what, cbp); } else { @@ -250,135 +270,19 @@ writecallback(void *ptr, size_t size, size_t nmemb, void *data) c->dlcounter += realsize; c->global->dlcounter += realsize; -#if 1 printf("%02d: %d, total %d\n", c->id, c->dlcounter, c->global->dlcounter); -#endif - return realsize; -} - -/* return the diff between two timevals, in us */ -static long tvdiff(struct timeval *newer, struct timeval *older) -{ - return (newer->tv_sec-older->tv_sec)*1000000+ - (newer->tv_usec-older->tv_usec); -} - - -/* store the start time of the program in this variable */ -static struct timeval timer; -static void timer_start(void) -{ - /* capture the time of the start moment */ - gettimeofday(&timer, NULL); -} - -static struct timeval cont; /* at this moment we continued */ - -int still_running; /* keep number of running handles */ - -struct conncount { - long time_us; - long laps; - long maxtime; -}; - -static struct timeval timerpause; -static void timer_pause(void) -{ - /* capture the time of the pause moment */ - gettimeofday(&timerpause, NULL); - - /* If we have a previous continue (all times except the first), we can now - store the time for a whole "lap" */ - if(cont.tv_sec) { - long lap; - - lap = tvdiff(&timerpause, &cont); - } -} - -static long paused; /* amount of us we have been pausing */ - -static void timer_continue(void) -{ - /* Capture the time of the restored operation moment, now calculate how long - time we were paused and added that to the 'paused' variable. - */ - gettimeofday(&cont, NULL); - - paused += tvdiff(&cont, &timerpause); -} - -static long total; /* amount of us from start to stop */ -static void timer_total(void) -{ - struct timeval stop; - /* Capture the time of the operation stopped moment, now calculate how long - time we were running and how much of that pausing. - */ - gettimeofday(&stop, NULL); - - total = tvdiff(&stop, &timer); + return realsize; } struct globalinfo info; struct connection *conns; -long selects; -long timeouts; - -long multi_socket; -long performalive; -long performselect; -long topselect; - int num_total; int num_idle; int num_active; -static void report(void) -{ - int i; - long active = total - paused; - long numdl = 0; - - for(i=0; i < num_total; i++) { - if(conns[i].dlcounter) - numdl++; - } - - printf("Summary from %d simultanoues transfers (%d active)\n", - num_total, num_active); - printf("%d out of %d connections provided data\n", numdl, num_total); - - printf("Total time: %ldus paused: %ldus curl_multi_socket(): %ldus\n", - total, paused, active); - - printf("%d calls to select() " - "Average time: %dus\n", - selects, paused/selects); - printf(" Average number of readable connections per select() return: %d\n", - performselect/selects); - - printf(" Max number of readable connections for a single select() " - "return: %d\n", - topselect); - - printf("%ld calls to multi_socket(), " - "Average time: %ldus\n", - multi_socket, active/multi_socket); - - printf("%ld select() timeouts\n", timeouts); - - printf("Downloaded %ld bytes in %ld bytes/sec, %ld usec/byte\n", - info.dlcounter, - info.dlcounter/(total/1000000), - total/info.dlcounter); - -} - int main(int argc, char **argv) { CURLM *multi_handle; @@ -387,11 +291,11 @@ int main(int argc, char **argv) CURLMcode mcode = CURLM_OK; int rc; int i; - fd2_set fdsizecheck; int selectmaxamount; struct fdinfo *fdp; char act; long timeout_ms; + struct timeval timeout; memset(&info, 0, sizeof(struct globalinfo)); @@ -462,45 +366,25 @@ int main(int argc, char **argv) curl_multi_setopt(multi_handle, CURLMOPT_SOCKETFUNCTION, socket_callback); curl_multi_setopt(multi_handle, CURLMOPT_SOCKETDATA, multi_handle); - /* we start the action by calling *socket() right away */ + /* we start the action by calling *socket_all() */ while(CURLM_CALL_MULTI_PERFORM == curl_multi_socket_all(multi_handle, &running_handles)); - /* event_dispatch() isn't good enough for us, since we need a global timeout - to occur after a given time of inactivity - */ - - /* get the timeout value from libcurl */ + /* Since we need a global timeout to occur after a given time of inactivity, + we add a single timeout-event. Get the timeout value from libcurl */ curl_multi_timeout(multi_handle, &timeout_ms); + /* convert ms to timeval */ + timeout.tv_sec = timeout_ms/1000; + timeout.tv_usec = (timeout_ms%1000)*1000; + evtimer_set(&timerevent, timercallback, multi_handle); + evtimer_add(&timerevent, &timeout); - while(running_handles) { - struct timeval timeout; + /* event_dispatch() runs the event main loop. It ends when no events are + left to wait for. */ - /* convert ms to timeval */ - timeout.tv_sec = timeout_ms/1000; - timeout.tv_usec = (timeout_ms%1000)*1000; - - event_loopexit(&timeout); - - /* The event_loopexit() function may have taken a while and it may or may - not have invoked libcurl calls during that time. During those calls, - the timeout situation might very well have changed, so we check the - timeout time again to see if we really need to call curl_multi_socket() - at this point! */ - - /* get the timeout value from libcurl */ - curl_multi_timeout(multi_handle, &timeout_ms); + event_dispatch(); - if(timeout_ms <= 0) { - /* no time left */ - curl_multi_socket(multi_handle, CURL_SOCKET_TIMEOUT, &running_handles); - - /* and get the new timeout value again */ - curl_multi_timeout(multi_handle, &timeout_ms); - } - } - - if(still_running != num_total) { + { /* something made connections fail, extract the reason and tell */ int msgs_left; struct connection *cptr; @@ -508,10 +392,10 @@ int main(int argc, char **argv) if (msg->msg == CURLMSG_DONE) { curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &cptr); - printf("%d => (%d) %s", cptr->id, msg->data.result, cptr->error); + printf("%d => (%d) %s\n", + cptr->id, msg->data.result, cptr->error); } } - } curl_multi_cleanup(multi_handle); @@ -520,7 +404,5 @@ int main(int argc, char **argv) for(i=0; i< num_total; i++) curl_easy_cleanup(conns[i].e); - report(); - return code; } -- cgit v1.2.3