aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Stenberg <daniel@haxx.se>2006-08-03 22:57:04 +0000
committerDaniel Stenberg <daniel@haxx.se>2006-08-03 22:57:04 +0000
commitb9b06b00bf19657344656ee26933425140b8787c (patch)
treec6887a58500a423233b0c4b38ca57caafc41c2b9
parent51f258d1034171173771a4705afee323560dcca3 (diff)
This is now a working example using libevent and curl_multi_socket() for really
fast treatment of many simultaneous transfers
-rw-r--r--hiper/hipev.c314
1 files changed, 98 insertions, 216 deletions
diff --git a/hiper/hipev.c b/hiper/hipev.c
index e99f9086a..936dfeebb 100644
--- a/hiper/hipev.c
+++ b/hiper/hipev.c
@@ -10,8 +10,7 @@
* Connect N connections. Z are idle, and X are active. Transfer as fast as
* possible.
*
- * Run for a specific amount of time (10 secs for now). Output detailed timing
- * information.
+ * Output detailed timing information.
*
* Uses libevent.
*
@@ -50,16 +49,6 @@
when using asynch supported libcurl. */
#define IDLE_TIME 10
-struct ourfdset {
- /* __fds_bits is what the Linux glibc headers use when they declare the
- fd_set struct so by using this we can actually avoid the typecase for the
- FD_SET() macro usage but it would hardly be portable */
- char __fds_bits[NCONNECTIONS/8];
-};
-#define FD2_ZERO(x) memset(x, 0, sizeof(struct ourfdset))
-
-typedef struct ourfdset fd2_set;
-
struct globalinfo {
size_t dlcounter;
};
@@ -73,6 +62,8 @@ struct connection {
char error[CURL_ERROR_SIZE];
};
+/* this is the struct associated with each file descriptor libcurl tells us
+ it is dealing with */
struct fdinfo {
/* create a link list of fdinfo structs */
struct fdinfo *next;
@@ -91,15 +82,64 @@ static struct fdinfo *allsocks;
static int running_handles;
+/* we have the timerevent global so that when the final socket-based event is
+ done, we can remove the timerevent as well */
+static struct event timerevent;
+
/* called from libevent on action on a particular socket ("event") */
static void eventcallback(int fd, short type, void *userp)
{
struct fdinfo *fdp = (struct fdinfo *)userp;
+ CURLMcode rc;
+
+ fprintf(stderr, "EVENT callback type %d\n", type);
+
+ /* tell libcurl to deal with the transfer associated with this socket */
+ do {
+ rc = curl_multi_socket(fdp->multi, fd, fdp->running_handles);
+ } while (rc == CURLM_CALL_MULTI_PERFORM);
+
+ if(rc) {
+ fprintf(stderr, "curl_multi_socket() returned %d\n", (int)rc);
+ }
+
+ fprintf(stderr, "running_handles: %d\n", *fdp->running_handles);
+ if(!*fdp->running_handles) {
+ /* last transfer is complete, kill pending timeout */
+ fprintf(stderr, "last transfer done, kill timeout\n");
+ if(evtimer_pending(&timerevent, NULL))
+ evtimer_del(&timerevent);
+ }
+}
+
+/* called from libevent when our timer event expires */
+static void timercallback(int fd, short type, void *userp)
+{
+ (void)fd; /* not used for this */
+ (void)type; /* ignored in here */
+ CURLM *multi_handle = (CURLM *)userp;
+ long timeout_ms;
+ struct timeval timeout;
+ int running_handles;
+ CURLMcode rc;
- fprintf(stderr, "EVENT callback\n");
+ fprintf(stderr, "EVENT timeout\n");
/* tell libcurl to deal with the transfer associated with this socket */
- curl_multi_socket(fdp->multi, fd, fdp->running_handles);
+ do {
+ rc = curl_multi_socket(multi_handle, CURL_SOCKET_TIMEOUT,
+ &running_handles);
+ } while (rc == CURLM_CALL_MULTI_PERFORM);
+
+ if(running_handles) {
+ /* Get the current timeout value from libcurl and set a new timeout */
+ curl_multi_timeout(multi_handle, &timeout_ms);
+
+ /* convert ms to timeval */
+ timeout.tv_sec = timeout_ms/1000;
+ timeout.tv_usec = (timeout_ms%1000)*1000;
+ evtimer_add(&timerevent, &timeout);
+ }
}
static void remsock(struct fdinfo *f)
@@ -108,6 +148,9 @@ static void remsock(struct fdinfo *f)
/* did not find socket to remove! */
return;
+ if(f->evset)
+ event_del(&f->ev);
+
if(f->prev)
f->prev->next = f->next;
if(f->next)
@@ -128,16 +171,22 @@ static void setsock(struct fdinfo *fdp, curl_socket_t s, CURL *easy,
/* first remove the existing event if the old setup was used */
event_del(&fdp->ev);
- /* now use and add the current socket setup */
+ /* now use and add the current socket setup to libevent. The EV_PERSIST is
+ the key here as otherwise libevent will automatically remove the event
+ when it occurs the first time */
event_set(&fdp->ev, fdp->sockfd,
(action&CURL_POLL_IN?EV_READ:0)|
- (action&CURL_POLL_OUT?EV_WRITE:0),
+ (action&CURL_POLL_OUT?EV_WRITE:0)| EV_PERSIST,
eventcallback, fdp);
fdp->evset=1;
fprintf(stderr, "event_add() for fd %d\n", s);
- event_add(&fdp->ev, NULL); /* no timeout */
+
+ /* We don't use any socket-specific timeout but intead we use a single
+ global one. This is (mostly) because libcurl doesn't expose any
+ particular socket- based timeout value. */
+ event_add(&fdp->ev, NULL);
}
static void addsock(curl_socket_t s, CURL *easy, int action, CURLM *multi)
@@ -162,55 +211,17 @@ static void addsock(curl_socket_t s, CURL *easy, int action, CURLM *multi)
curl_multi_assign(multi, s, fdp);
}
-static void fdinfo2fdset(fd2_set *fdread, fd2_set *fdwrite, int *maxfd)
-{
- struct fdinfo *fdp = allsocks;
- int writable=0;
-
- FD2_ZERO(fdread);
- FD2_ZERO(fdwrite);
-
- *maxfd = 0;
-
-#if 0
- printf("Wait for: ");
-#endif
-
- while(fdp) {
- if(fdp->action & CURL_POLL_IN) {
- FD_SET(fdp->sockfd, (fd_set *)fdread);
- }
- if(fdp->action & CURL_POLL_OUT) {
- FD_SET(fdp->sockfd, (fd_set *)fdwrite);
- writable++;
- }
-
-#if 0
- printf("%d (%s%s) ",
- fdp->sockfd,
- (fdp->action & CURL_POLL_IN)?"r":"",
- (fdp->action & CURL_POLL_OUT)?"w":"");
-#endif
-
- if(fdp->sockfd > *maxfd)
- *maxfd = fdp->sockfd;
-
- fdp = fdp->next;
- }
-#if 0
- if(writable)
- printf("Check for %d writable sockets\n", writable);
-#endif
-}
-
/* on port 8999 we run a fork enabled sws that supports 'idle' and 'stream' */
#define PORT "8999"
-#define HOST "192.168.1.13"
+#define HOST "127.0.0.1"
#define URL_IDLE "http://" HOST ":" PORT "/1000"
+#if 1
#define URL_ACTIVE "http://" HOST ":" PORT "/1001"
-
+#else
+#define URL_ACTIVE "http://localhost/"
+#endif
static int socket_callback(CURL *easy, /* easy handle */
curl_socket_t s, /* socket */
@@ -219,15 +230,24 @@ static int socket_callback(CURL *easy, /* easy handle */
void *socketp) /* socket pointer */
{
struct fdinfo *fdp = (struct fdinfo *)socketp;
+ char *whatstr[]={
+ "none",
+ "IN",
+ "OUT",
+ "INOUT",
+ "REMOVE"};
- fprintf(stderr, "socket %d easy %p what %d\n", s, easy, what);
+ fprintf(stderr, "socket %d easy %p what %s\n", s, easy,
+ whatstr[what]);
if(what == CURL_POLL_REMOVE)
remsock(fdp);
else {
if(!fdp) {
/* not previously known, add it and set association */
- printf("Add info for socket %d (%d)\n", s, what);
+ printf("Add info for socket %d %s%s\n", s,
+ what&CURL_POLL_IN?"READ":"",
+ what&CURL_POLL_OUT?"WRITE":"" );
addsock(s, easy, what, cbp);
}
else {
@@ -250,135 +270,19 @@ writecallback(void *ptr, size_t size, size_t nmemb, void *data)
c->dlcounter += realsize;
c->global->dlcounter += realsize;
-#if 1
printf("%02d: %d, total %d\n",
c->id, c->dlcounter, c->global->dlcounter);
-#endif
- return realsize;
-}
-
-/* return the diff between two timevals, in us */
-static long tvdiff(struct timeval *newer, struct timeval *older)
-{
- return (newer->tv_sec-older->tv_sec)*1000000+
- (newer->tv_usec-older->tv_usec);
-}
-
-
-/* store the start time of the program in this variable */
-static struct timeval timer;
-static void timer_start(void)
-{
- /* capture the time of the start moment */
- gettimeofday(&timer, NULL);
-}
-
-static struct timeval cont; /* at this moment we continued */
-
-int still_running; /* keep number of running handles */
-
-struct conncount {
- long time_us;
- long laps;
- long maxtime;
-};
-
-static struct timeval timerpause;
-static void timer_pause(void)
-{
- /* capture the time of the pause moment */
- gettimeofday(&timerpause, NULL);
-
- /* If we have a previous continue (all times except the first), we can now
- store the time for a whole "lap" */
- if(cont.tv_sec) {
- long lap;
-
- lap = tvdiff(&timerpause, &cont);
- }
-}
-
-static long paused; /* amount of us we have been pausing */
-
-static void timer_continue(void)
-{
- /* Capture the time of the restored operation moment, now calculate how long
- time we were paused and added that to the 'paused' variable.
- */
- gettimeofday(&cont, NULL);
-
- paused += tvdiff(&cont, &timerpause);
-}
-
-static long total; /* amount of us from start to stop */
-static void timer_total(void)
-{
- struct timeval stop;
- /* Capture the time of the operation stopped moment, now calculate how long
- time we were running and how much of that pausing.
- */
- gettimeofday(&stop, NULL);
-
- total = tvdiff(&stop, &timer);
+ return realsize;
}
struct globalinfo info;
struct connection *conns;
-long selects;
-long timeouts;
-
-long multi_socket;
-long performalive;
-long performselect;
-long topselect;
-
int num_total;
int num_idle;
int num_active;
-static void report(void)
-{
- int i;
- long active = total - paused;
- long numdl = 0;
-
- for(i=0; i < num_total; i++) {
- if(conns[i].dlcounter)
- numdl++;
- }
-
- printf("Summary from %d simultanoues transfers (%d active)\n",
- num_total, num_active);
- printf("%d out of %d connections provided data\n", numdl, num_total);
-
- printf("Total time: %ldus paused: %ldus curl_multi_socket(): %ldus\n",
- total, paused, active);
-
- printf("%d calls to select() "
- "Average time: %dus\n",
- selects, paused/selects);
- printf(" Average number of readable connections per select() return: %d\n",
- performselect/selects);
-
- printf(" Max number of readable connections for a single select() "
- "return: %d\n",
- topselect);
-
- printf("%ld calls to multi_socket(), "
- "Average time: %ldus\n",
- multi_socket, active/multi_socket);
-
- printf("%ld select() timeouts\n", timeouts);
-
- printf("Downloaded %ld bytes in %ld bytes/sec, %ld usec/byte\n",
- info.dlcounter,
- info.dlcounter/(total/1000000),
- total/info.dlcounter);
-
-}
-
int main(int argc, char **argv)
{
CURLM *multi_handle;
@@ -387,11 +291,11 @@ int main(int argc, char **argv)
CURLMcode mcode = CURLM_OK;
int rc;
int i;
- fd2_set fdsizecheck;
int selectmaxamount;
struct fdinfo *fdp;
char act;
long timeout_ms;
+ struct timeval timeout;
memset(&info, 0, sizeof(struct globalinfo));
@@ -462,45 +366,25 @@ int main(int argc, char **argv)
curl_multi_setopt(multi_handle, CURLMOPT_SOCKETFUNCTION, socket_callback);
curl_multi_setopt(multi_handle, CURLMOPT_SOCKETDATA, multi_handle);
- /* we start the action by calling *socket() right away */
+ /* we start the action by calling *socket_all() */
while(CURLM_CALL_MULTI_PERFORM == curl_multi_socket_all(multi_handle,
&running_handles));
- /* event_dispatch() isn't good enough for us, since we need a global timeout
- to occur after a given time of inactivity
- */
-
- /* get the timeout value from libcurl */
+ /* Since we need a global timeout to occur after a given time of inactivity,
+ we add a single timeout-event. Get the timeout value from libcurl */
curl_multi_timeout(multi_handle, &timeout_ms);
+ /* convert ms to timeval */
+ timeout.tv_sec = timeout_ms/1000;
+ timeout.tv_usec = (timeout_ms%1000)*1000;
+ evtimer_set(&timerevent, timercallback, multi_handle);
+ evtimer_add(&timerevent, &timeout);
- while(running_handles) {
- struct timeval timeout;
+ /* event_dispatch() runs the event main loop. It ends when no events are
+ left to wait for. */
- /* convert ms to timeval */
- timeout.tv_sec = timeout_ms/1000;
- timeout.tv_usec = (timeout_ms%1000)*1000;
-
- event_loopexit(&timeout);
-
- /* The event_loopexit() function may have taken a while and it may or may
- not have invoked libcurl calls during that time. During those calls,
- the timeout situation might very well have changed, so we check the
- timeout time again to see if we really need to call curl_multi_socket()
- at this point! */
-
- /* get the timeout value from libcurl */
- curl_multi_timeout(multi_handle, &timeout_ms);
+ event_dispatch();
- if(timeout_ms <= 0) {
- /* no time left */
- curl_multi_socket(multi_handle, CURL_SOCKET_TIMEOUT, &running_handles);
-
- /* and get the new timeout value again */
- curl_multi_timeout(multi_handle, &timeout_ms);
- }
- }
-
- if(still_running != num_total) {
+ {
/* something made connections fail, extract the reason and tell */
int msgs_left;
struct connection *cptr;
@@ -508,10 +392,10 @@ int main(int argc, char **argv)
if (msg->msg == CURLMSG_DONE) {
curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &cptr);
- printf("%d => (%d) %s", cptr->id, msg->data.result, cptr->error);
+ printf("%d => (%d) %s\n",
+ cptr->id, msg->data.result, cptr->error);
}
}
-
}
curl_multi_cleanup(multi_handle);
@@ -520,7 +404,5 @@ int main(int argc, char **argv)
for(i=0; i< num_total; i++)
curl_easy_cleanup(conns[i].e);
- report();
-
return code;
}