aboutsummaryrefslogtreecommitdiff
path: root/tests/http_pipe.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/http_pipe.py')
-rwxr-xr-xtests/http_pipe.py447
1 files changed, 447 insertions, 0 deletions
diff --git a/tests/http_pipe.py b/tests/http_pipe.py
new file mode 100755
index 000000000..67185be8c
--- /dev/null
+++ b/tests/http_pipe.py
@@ -0,0 +1,447 @@
+#!/usr/bin/python
+
+# Copyright 2012 Google Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Modified by Linus Nielsen Feltzing for inclusion in the libcurl test
+# framework
+#
+import SocketServer
+import argparse
+import re
+import select
+import socket
+import time
+import pprint
+import os
+
+INFO_MESSAGE = '''
+This is a test server to test the libcurl pipelining functionality.
+It is a modified version if Google's HTTP pipelining test server. More
+information can be found here:
+
+http://dev.chromium.org/developers/design-documents/network-stack/http-pipelining
+
+Source code can be found here:
+
+http://code.google.com/p/http-pipelining-test/
+'''
+MAX_REQUEST_SIZE = 1024 # bytes
+MIN_POLL_TIME = 0.01 # seconds. Minimum time to poll, in order to prevent
+ # excessive looping because Python refuses to poll for
+ # small timeouts.
+SEND_BUFFER_TIME = 0.5 # seconds
+TIMEOUT = 30 # seconds
+
+
+class Error(Exception):
+ pass
+
+
+class RequestTooLargeError(Error):
+ pass
+
+
+class ServeIndexError(Error):
+ pass
+
+
+class UnexpectedMethodError(Error):
+ pass
+
+
+class RequestParser(object):
+ """Parses an input buffer looking for HTTP GET requests."""
+
+ global logfile
+
+ LOOKING_FOR_GET = 1
+ READING_HEADERS = 2
+
+ HEADER_RE = re.compile('([^:]+):(.*)\n')
+ REQUEST_RE = re.compile('([^ ]+) ([^ ]+) HTTP/(\d+)\.(\d+)\n')
+
+ def __init__(self):
+ """Initializer."""
+ self._buffer = ""
+ self._pending_headers = {}
+ self._pending_request = ""
+ self._state = self.LOOKING_FOR_GET
+ self._were_all_requests_http_1_1 = True
+ self._valid_requests = []
+
+ def ParseAdditionalData(self, data):
+ """Finds HTTP requests in |data|.
+
+ Args:
+ data: (String) Newly received input data from the socket.
+
+ Returns:
+ (List of Tuples)
+ (String) The request path.
+ (Map of String to String) The header name and value.
+
+ Raises:
+ RequestTooLargeError: If the request exceeds MAX_REQUEST_SIZE.
+ UnexpectedMethodError: On a non-GET method.
+ Error: On a programming error.
+ """
+ logfile = open('log/server.input', 'a')
+ logfile.write(data)
+ logfile.close()
+ self._buffer += data.replace('\r', '')
+ should_continue_parsing = True
+ while should_continue_parsing:
+ if self._state == self.LOOKING_FOR_GET:
+ should_continue_parsing = self._DoLookForGet()
+ elif self._state == self.READING_HEADERS:
+ should_continue_parsing = self._DoReadHeader()
+ else:
+ raise Error('Unexpected state: ' + self._state)
+ if len(self._buffer) > MAX_REQUEST_SIZE:
+ raise RequestTooLargeError(
+ 'Request is at least %d bytes' % len(self._buffer))
+ valid_requests = self._valid_requests
+ self._valid_requests = []
+ return valid_requests
+
+ @property
+ def were_all_requests_http_1_1(self):
+ return self._were_all_requests_http_1_1
+
+ def _DoLookForGet(self):
+ """Tries to parse an HTTTP request line.
+
+ Returns:
+ (Boolean) True if a request was found.
+
+ Raises:
+ UnexpectedMethodError: On a non-GET method.
+ """
+ m = self.REQUEST_RE.match(self._buffer)
+ if not m:
+ return False
+ method, path, http_major, http_minor = m.groups()
+
+ if method != 'GET':
+ raise UnexpectedMethodError('Unexpected method: ' + method)
+ if path in ['/', '/index.htm', '/index.html']:
+ raise ServeIndexError()
+
+ if http_major != '1' or http_minor != '1':
+ self._were_all_requests_http_1_1 = False
+
+# print method, path
+
+ self._pending_request = path
+ self._buffer = self._buffer[m.end():]
+ self._state = self.READING_HEADERS
+ return True
+
+ def _DoReadHeader(self):
+ """Tries to parse a HTTP header.
+
+ Returns:
+ (Boolean) True if it found the end of the request or a HTTP header.
+ """
+ if self._buffer.startswith('\n'):
+ self._buffer = self._buffer[1:]
+ self._state = self.LOOKING_FOR_GET
+ self._valid_requests.append((self._pending_request,
+ self._pending_headers))
+ self._pending_headers = {}
+ self._pending_request = ""
+ return True
+
+ m = self.HEADER_RE.match(self._buffer)
+ if not m:
+ return False
+
+ header = m.group(1).lower()
+ value = m.group(2).strip().lower()
+ if header not in self._pending_headers:
+ self._pending_headers[header] = value
+ self._buffer = self._buffer[m.end():]
+ return True
+
+
+class ResponseBuilder(object):
+ """Builds HTTP responses for a list of accumulated requests."""
+
+ def __init__(self):
+ """Initializer."""
+ self._max_pipeline_depth = 0
+ self._requested_paths = []
+ self._processed_end = False
+ self._were_all_requests_http_1_1 = True
+
+ def QueueRequests(self, requested_paths, were_all_requests_http_1_1):
+ """Adds requests to the queue of requests.
+
+ Args:
+ requested_paths: (List of Strings) Requested paths.
+ """
+ self._requested_paths.extend(requested_paths)
+ self._were_all_requests_http_1_1 = were_all_requests_http_1_1
+
+ def Chunkify(self, data, chunksize):
+ """ Divides a string into chunks
+ """
+ return [hex(chunksize)[2:] + "\r\n" + data[i:i+chunksize] + "\r\n" for i in range(0, len(data), chunksize)]
+
+ def BuildResponses(self):
+ """Converts the queue of requests into responses.
+
+ Returns:
+ (String) Buffer containing all of the responses.
+ """
+ result = ""
+ self._max_pipeline_depth = max(self._max_pipeline_depth,
+ len(self._requested_paths))
+ for path, headers in self._requested_paths:
+ if path == '/verifiedserver':
+ body = "WE ROOLZ: {}\r\n".format(os.getpid());
+ result += self._BuildResponse(
+ '200 OK', ['Server: Apache',
+ 'Content-Length: {}'.format(len(body)),
+ 'Cache-Control: no-store'], body)
+
+ elif path == '/alphabet.txt':
+ body = 'abcdefghijklmnopqrstuvwxyz'
+ result += self._BuildResponse(
+ '200 OK', ['Server: Apache',
+ 'Content-Length: 26',
+ 'Cache-Control: no-store'], body)
+
+ elif path == '/reverse.txt':
+ body = 'zyxwvutsrqponmlkjihgfedcba'
+ result += self._BuildResponse(
+ '200 OK', ['Content-Length: 26', 'Cache-Control: no-store'], body)
+
+ elif path == '/chunked.txt':
+ body = ('7\r\nchunked\r\n'
+ '8\r\nencoding\r\n'
+ '2\r\nis\r\n'
+ '3\r\nfun\r\n'
+ '0\r\n\r\n')
+ result += self._BuildResponse(
+ '200 OK', ['Transfer-Encoding: chunked', 'Cache-Control: no-store'],
+ body)
+
+ elif path == '/cached.txt':
+ body = 'azbycxdwevfugthsirjqkplomn'
+ result += self._BuildResponse(
+ '200 OK', ['Content-Length: 26', 'Cache-Control: max-age=60'], body)
+
+ elif path == '/connection_close.txt':
+ body = 'azbycxdwevfugthsirjqkplomn'
+ result += self._BuildResponse(
+ '200 OK', ['Content-Length: 26', 'Cache-Control: max-age=60', 'Connection: close'], body)
+ self._processed_end = True
+
+ elif path == '/1k.txt':
+ str = '0123456789abcdef'
+ body = ''.join([str for num in xrange(64)])
+ result += self._BuildResponse(
+ '200 OK', ['Server: Apache',
+ 'Content-Length: 1024',
+ 'Cache-Control: max-age=60'], body)
+
+ elif path == '/10k.txt':
+ str = '0123456789abcdef'
+ body = ''.join([str for num in xrange(640)])
+ result += self._BuildResponse(
+ '200 OK', ['Server: Apache',
+ 'Content-Length: 10240',
+ 'Cache-Control: max-age=60'], body)
+
+ elif path == '/100k.txt':
+ str = '0123456789abcdef'
+ body = ''.join([str for num in xrange(6400)])
+ result += self._BuildResponse(
+ '200 OK',
+ ['Server: Apache',
+ 'Content-Length: 102400',
+ 'Cache-Control: max-age=60'],
+ body)
+
+ elif path == '/100k_chunked.txt':
+ str = '0123456789abcdef'
+ moo = ''.join([str for num in xrange(6400)])
+ body = self.Chunkify(moo, 20480)
+ body.append('0\r\n\r\n')
+ body = ''.join(body)
+
+ result += self._BuildResponse(
+ '200 OK', ['Transfer-Encoding: chunked', 'Cache-Control: no-store'], body)
+
+ elif path == '/stats.txt':
+ results = {
+ 'max_pipeline_depth': self._max_pipeline_depth,
+ 'were_all_requests_http_1_1': int(self._were_all_requests_http_1_1),
+ }
+ body = ','.join(['%s:%s' % (k, v) for k, v in results.items()])
+ result += self._BuildResponse(
+ '200 OK',
+ ['Content-Length: %s' % len(body), 'Cache-Control: no-store'], body)
+ self._processed_end = True
+
+ else:
+ result += self._BuildResponse('404 Not Found', ['Content-Length: 7'], 'Go away')
+ if self._processed_end:
+ break
+ self._requested_paths = []
+ return result
+
+ def WriteError(self, status, error):
+ """Returns an HTTP response for the specified error.
+
+ Args:
+ status: (String) Response code and descrtion (e.g. "404 Not Found")
+
+ Returns:
+ (String) Text of HTTP response.
+ """
+ return self._BuildResponse(
+ status, ['Connection: close', 'Content-Type: text/plain'], error)
+
+ @property
+ def processed_end(self):
+ return self._processed_end
+
+ def _BuildResponse(self, status, headers, body):
+ """Builds an HTTP response.
+
+ Args:
+ status: (String) Response code and descrtion (e.g. "200 OK")
+ headers: (List of Strings) Headers (e.g. "Connection: close")
+ body: (String) Response body.
+
+ Returns:
+ (String) Text of HTTP response.
+ """
+ return ('HTTP/1.1 %s\r\n'
+ '%s\r\n'
+ '\r\n'
+ '%s' % (status, '\r\n'.join(headers), body))
+
+
+class PipelineRequestHandler(SocketServer.BaseRequestHandler):
+ """Called on an incoming TCP connection."""
+
+ def _GetTimeUntilTimeout(self):
+ return self._start_time + TIMEOUT - time.time()
+
+ def _GetTimeUntilNextSend(self):
+ if not self._last_queued_time:
+ return TIMEOUT
+ return self._last_queued_time + SEND_BUFFER_TIME - time.time()
+
+ def handle(self):
+ self._request_parser = RequestParser()
+ self._response_builder = ResponseBuilder()
+ self._last_queued_time = 0
+ self._num_queued = 0
+ self._num_written = 0
+ self._send_buffer = ""
+ self._start_time = time.time()
+ try:
+ poller = select.epoll(sizehint=1)
+ poller.register(self.request.fileno(), select.EPOLLIN)
+ while not self._response_builder.processed_end or self._send_buffer:
+
+ time_left = self._GetTimeUntilTimeout()
+ time_until_next_send = self._GetTimeUntilNextSend()
+ max_poll_time = min(time_left, time_until_next_send) + MIN_POLL_TIME
+
+ events = None
+ if max_poll_time > 0:
+ if self._send_buffer:
+ poller.modify(self.request.fileno(),
+ select.EPOLLIN | select.EPOLLOUT)
+ else:
+ poller.modify(self.request.fileno(), select.EPOLLIN)
+ events = poller.poll(timeout=max_poll_time)
+
+ if self._GetTimeUntilTimeout() <= 0:
+ return
+
+ if self._GetTimeUntilNextSend() <= 0:
+ self._send_buffer += self._response_builder.BuildResponses()
+ self._num_written = self._num_queued
+ self._last_queued_time = 0
+
+ for fd, mode in events:
+ if mode & select.EPOLLIN:
+ new_data = self.request.recv(MAX_REQUEST_SIZE, socket.MSG_DONTWAIT)
+ if not new_data:
+ return
+ new_requests = self._request_parser.ParseAdditionalData(new_data)
+ self._response_builder.QueueRequests(
+ new_requests, self._request_parser.were_all_requests_http_1_1)
+ self._num_queued += len(new_requests)
+ self._last_queued_time = time.time()
+ elif mode & select.EPOLLOUT:
+ num_bytes_sent = self.request.send(self._send_buffer[0:4096])
+ self._send_buffer = self._send_buffer[num_bytes_sent:]
+ time.sleep(0.05)
+ else:
+ return
+
+ except RequestTooLargeError as e:
+ self.request.send(self._response_builder.WriteError(
+ '413 Request Entity Too Large', e))
+ raise
+ except UnexpectedMethodError as e:
+ self.request.send(self._response_builder.WriteError(
+ '405 Method Not Allowed', e))
+ raise
+ except ServeIndexError:
+ self.request.send(self._response_builder.WriteError(
+ '200 OK', INFO_MESSAGE))
+ except Exception as e:
+ print e
+ self.request.close()
+
+
+class PipelineServer(SocketServer.ForkingMixIn, SocketServer.TCPServer):
+ pass
+
+
+parser = argparse.ArgumentParser()
+parser.add_argument("--port", action="store", default=0,
+ type=int, help="port to listen on")
+parser.add_argument("--verbose", action="store", default=0,
+ type=int, help="verbose output")
+parser.add_argument("--pidfile", action="store", default=0,
+ help="file name for the PID")
+parser.add_argument("--logfile", action="store", default=0,
+ help="file name for the log")
+parser.add_argument("--srcdir", action="store", default=0,
+ help="test directory")
+parser.add_argument("--id", action="store", default=0,
+ help="server ID")
+parser.add_argument("--ipv4", action="store_true", default=0,
+ help="IPv4 flag")
+args = parser.parse_args()
+
+if args.pidfile:
+ pid = os.getpid()
+ f = open(args.pidfile, 'w')
+ f.write('{}'.format(pid))
+ f.close()
+
+server = PipelineServer(('0.0.0.0', args.port), PipelineRequestHandler)
+server.allow_reuse_address = True
+server.serve_forever()