diff --git a/integration-tests/http1-basic/http1-test.d b/integration-tests/http1-basic/http1-test.d index 7ef69a8..49a41d9 100755 --- a/integration-tests/http1-basic/http1-test.d +++ b/integration-tests/http1-basic/http1-test.d @@ -24,7 +24,7 @@ int main() { .build(); configureLoggingProvider(loggingProvider); - HttpTransport transport = new Http1Transport(HttpRequestHandler.of(&handleRequest)); + HttpTransport transport = new TaskPoolHttp1Transport(HttpRequestHandler.of(&handleRequest)); Thread thread = transport.startInNewThread(); scope(exit) { transport.stop(); diff --git a/integration-tests/http1-speed-test/.gitignore b/integration-tests/http1-speed-test/.gitignore new file mode 100644 index 0000000..99a2287 --- /dev/null +++ b/integration-tests/http1-speed-test/.gitignore @@ -0,0 +1 @@ +http1-speed-test diff --git a/integration-tests/http1-speed-test/http1-speed-test.d b/integration-tests/http1-speed-test/http1-speed-test.d old mode 100644 new mode 100755 index 5568b61..7ce36d1 --- a/integration-tests/http1-speed-test/http1-speed-test.d +++ b/integration-tests/http1-speed-test/http1-speed-test.d @@ -1,6 +1,28 @@ +/+ dub.sdl: + dependency "handy-http-transport" path="../../" ++/ module integration_tests.http1_speed_test; +import handy_http_primitives; +import handy_http_transport; +import slf4d; +import slf4d.default_provider; void main() { - + auto loggingProvider = DefaultProvider.builder() + .withRootLoggingLevel(Levels.ERROR) + .withConsoleSerializer(true, 48) + .build(); + configureLoggingProvider(loggingProvider); + HttpTransport transport; + transport = new TaskPoolHttp1Transport(HttpRequestHandler.of( + (ref ServerHttpRequest request, ref ServerHttpResponse response) { + if (request.method == HttpMethod.DELETE) { + transport.stop(); + } + response.headers.add("Content-Type", "text/plain"); + response.headers.add("Content-Length", "13"); + response.outputStream.writeToStream(cast(ubyte[]) "Hello, world!"); + })); + transport.start(); } diff --git a/integration-tests/run_all.sh b/integration-tests/run_all.sh index 6934c10..f72b5b4 100755 --- a/integration-tests/run_all.sh +++ b/integration-tests/run_all.sh @@ -7,5 +7,8 @@ set -e -o pipefail cd http1-basic dub build --single http1-test.d ./http1-test - cd .. + +# cd http1-speed-test +# dub build --single --build=release http1-speed-test.d + diff --git a/source/handy_http_transport/http1/epoll.d b/source/handy_http_transport/http1/epoll.d new file mode 100644 index 0000000..8d8d547 --- /dev/null +++ b/source/handy_http_transport/http1/epoll.d @@ -0,0 +1,125 @@ +module handy_http_transport.http1.epoll; + +import core.sys.posix.sys.socket; +import core.sys.linux.epoll; +import core.sys.posix.netinet.in_; +import core.sys.posix.fcntl; +import core.sys.posix.unistd; + +import core.stdc.errno; + +extern(C) { + int accept4(int sockfd, sockaddr *addr, socklen_t *addrlen, int flags); +} + +import handy_http_transport.interfaces; +import slf4d; + +class Http1EpollTransport : HttpTransport { + private immutable ushort port; + + this(ushort port) { + this.port = port; + } + + void start() { + + // Create the server socket. + enum SOCK_NONBLOCK = 0x4000; + int listenFd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0); + sockaddr_in serverAddress; + serverAddress.sin_family = AF_INET; + serverAddress.sin_port = htons(port); + serverAddress.sin_addr.s_addr = INADDR_ANY; + + if (bind(listenFd, cast(sockaddr*) &serverAddress, serverAddress.sizeof) == -1) { + errorF!"Failed to bind socket: %d"(errno); + close(listenFd); + return; + } + + if (listen(listenFd, SOMAXCONN) == -1) { + errorF!"Failed to listen on socket: %d"(errno); + close(listenFd); + return; + } + + + int epollFd = epoll_create1(0); + if (epollFd == -1) { + errorF!"Failed to create epoll instance: %d"(errno); + return; + } + + epoll_event event; + epoll_event[64] events; + event.events = EPOLLIN | EPOLLET; + event.data.fd = listenFd; + if (epoll_ctl(epollFd, EPOLL_CTL_ADD, listenFd, &event) == -1) { + errorF!"Failed to add listen socket to epoll: %d"(errno); + close(listenFd); + close(epollFd); + return; + } + + infoF!"Server listening on port %d."(port); + + while (true) { + int eventCount = epoll_wait(epollFd, &event, 64, 5000); + if (eventCount == -1) { + errorF!"Epoll wait failed: %d"(errno); + break; + } + + for (int i = 0; i < eventCount; i++) { + if (events[i].data.fd == listenFd) { + // New incoming connection. + while (true) { + sockaddr_in clientAddress; + socklen_t clientAddressLength = clientAddress.sizeof; + int clientFd = accept4( + listenFd, + cast(sockaddr*) &clientAddress, + &clientAddressLength, + SOCK_NONBLOCK + ); + if (clientFd == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + // No more connections to accept. + break; + } else { + errorF!"Failed to accept connection: %d"(errno); + break; + } + } + + // Add the client socket to epoll's listening list. + event.events = EPOLLIN | EPOLLET; + event.data.fd = clientFd; + if (epoll_ctl(epollFd, EPOLL_CTL_ADD, clientFd, &event) == -1) { + errorF!"Failed to add client socket to epoll: %d"(errno); + close(clientFd); + } + + infoF!"Accepted new connection from %s:%d."( + inet_ntoa(clientAddress.sin_addr), + ntohs(clientAddress.sin_port) + ); + } + } else { + // Event on an existing client socket. + + int clientFd = events[i].data.fd; + + if (events[i].events & EPOLLIN) { + + } + } + } + } + } + + void stop() { + + } +} \ No newline at end of file diff --git a/source/handy_http_transport/http1/package.d b/source/handy_http_transport/http1/package.d index 728f598..5118177 100644 --- a/source/handy_http_transport/http1/package.d +++ b/source/handy_http_transport/http1/package.d @@ -4,3 +4,4 @@ module handy_http_transport.http1; public import handy_http_transport.http1.transport; +public import handy_http_transport.http1.task_pool; diff --git a/source/handy_http_transport/http1/task_pool.d b/source/handy_http_transport/http1/task_pool.d new file mode 100644 index 0000000..6cf9035 --- /dev/null +++ b/source/handy_http_transport/http1/task_pool.d @@ -0,0 +1,49 @@ +module handy_http_transport.http1.task_pool; + +import std.socket; +import std.parallelism; + +import handy_http_transport.http1.transport; +import handy_http_primitives; +import slf4d; + +/** + * An implementation of Http1Transport which uses D's standard library + * parallelization, where each incoming client request is turned into a task + * and submitted to the standard task pool. + */ +class TaskPoolHttp1Transport : Http1Transport { + this(HttpRequestHandler requestHandler, ushort port = 8080) { + super(requestHandler, port); + } + + override void runServer() { + Socket serverSocket = new TcpSocket(); + serverSocket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1); + serverSocket.bind(new InternetAddress("127.0.0.1", port)); + serverSocket.listen(1024); + + while (super.isRunning) { + try { + Socket clientSocket = serverSocket.accept(); + auto t = task!handleClient(clientSocket, requestHandler); + taskPool().put(t); + } catch (SocketAcceptException e) { + warn("Failed to accept socket connection.", e); + } + } + serverSocket.close(); + } + + override void stop() { + super.stop(); + // Send a dummy request to cause the server's blocking accept() call to end. + try { + Socket dummySocket = new TcpSocket(new InternetAddress("127.0.0.1", port)); + dummySocket.shutdown(SocketShutdown.BOTH); + dummySocket.close(); + } catch (SocketOSException e) { + warn("Failed to send empty request to stop server.", e); + } + } +} \ No newline at end of file diff --git a/source/handy_http_transport/http1/transport.d b/source/handy_http_transport/http1/transport.d index 176cea2..7435492 100644 --- a/source/handy_http_transport/http1/transport.d +++ b/source/handy_http_transport/http1/transport.d @@ -1,6 +1,7 @@ module handy_http_transport.http1.transport; import std.socket; +import core.atomic : atomicStore, atomicLoad; import handy_http_transport.interfaces; import handy_http_transport.helpers; @@ -14,73 +15,33 @@ import slf4d; import photon; /** - * The HTTP/1.1 transport protocol implementation, using Dimitry Olshansky's - * Photon fiber scheduling library for concurrency. + * Base class for HTTP/1.1 transport, where different subclasses can define + * how the actual socket communication works (threadpool / epoll/ etc). */ -class Http1Transport : HttpTransport { - private Socket serverSocket; - private HttpRequestHandler requestHandler; - private const ushort port; +abstract class Http1Transport : HttpTransport { + protected HttpRequestHandler requestHandler; + protected immutable ushort port; private bool running = false; - /** - * Constructs a new Http1Transport server instance. - * Params: - * requestHandler = The request handler to use for all requests. - * port = The port to bind to. - */ this(HttpRequestHandler requestHandler, ushort port = 8080) { assert(requestHandler !is null); - this.serverSocket = new TcpSocket(); this.requestHandler = requestHandler; this.port = port; } - /** - * Starts the server. Internally, this starts the Photon event loop and - * accepts incoming connections in a separate fiber. Then, clients are - * handled in their own separate fiber (think "coroutine"). - */ + bool isRunning() { + return atomicLoad(running); + } + void start() { - debugF!"Starting server on port %d."(port); - startloop(); - go(() => runServer()); - runFibers(); + atomicStore(running, true); + runServer(); } - /** - * Stops the server. This will mark the server as no longer running, so - * no more connections will be accepted. - */ + protected abstract void runServer(); + void stop() { - debugF!"Stopping server on port %d."(port); - this.running = false; - // Send a dummy request to cause the server's blocking accept() call to end. - try { - Socket dummySocket = new TcpSocket(this.serverSocket.localAddress()); - dummySocket.shutdown(SocketShutdown.BOTH); - dummySocket.close(); - } catch (SocketOSException e) { - warn("Failed to send empty request to stop server.", e); - } - } - - private void runServer() { - this.running = true; - serverSocket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1); - serverSocket.bind(new InternetAddress("127.0.0.1", port)); - traceF!"Bound to %s."(serverSocket.localAddress().toString()); - serverSocket.listen(100); - while (running) { - try { - Socket clientSocket = serverSocket.accept(); - go(() => handleClient(clientSocket, requestHandler)); - } catch (SocketAcceptException e) { - warn("Failed to accept socket connection.", e); - } - } - this.serverSocket.shutdown(SocketShutdown.BOTH); - this.serverSocket.close(); + atomicStore(running, false); } }