Added integration tests, simpler implementation which uses task pool.
This commit is contained in:
parent
d1263a5991
commit
fd42b11c8b
|
@ -24,7 +24,7 @@ int main() {
|
||||||
.build();
|
.build();
|
||||||
configureLoggingProvider(loggingProvider);
|
configureLoggingProvider(loggingProvider);
|
||||||
|
|
||||||
HttpTransport transport = new Http1Transport(HttpRequestHandler.of(&handleRequest));
|
HttpTransport transport = new TaskPoolHttp1Transport(HttpRequestHandler.of(&handleRequest));
|
||||||
Thread thread = transport.startInNewThread();
|
Thread thread = transport.startInNewThread();
|
||||||
scope(exit) {
|
scope(exit) {
|
||||||
transport.stop();
|
transport.stop();
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
http1-speed-test
|
|
@ -1,6 +1,28 @@
|
||||||
|
/+ dub.sdl:
|
||||||
|
dependency "handy-http-transport" path="../../"
|
||||||
|
+/
|
||||||
module integration_tests.http1_speed_test;
|
module integration_tests.http1_speed_test;
|
||||||
|
|
||||||
|
import handy_http_primitives;
|
||||||
|
import handy_http_transport;
|
||||||
|
import slf4d;
|
||||||
|
import slf4d.default_provider;
|
||||||
|
|
||||||
void main() {
|
void main() {
|
||||||
|
auto loggingProvider = DefaultProvider.builder()
|
||||||
|
.withRootLoggingLevel(Levels.ERROR)
|
||||||
|
.withConsoleSerializer(true, 48)
|
||||||
|
.build();
|
||||||
|
configureLoggingProvider(loggingProvider);
|
||||||
|
HttpTransport transport;
|
||||||
|
transport = new TaskPoolHttp1Transport(HttpRequestHandler.of(
|
||||||
|
(ref ServerHttpRequest request, ref ServerHttpResponse response) {
|
||||||
|
if (request.method == HttpMethod.DELETE) {
|
||||||
|
transport.stop();
|
||||||
|
}
|
||||||
|
response.headers.add("Content-Type", "text/plain");
|
||||||
|
response.headers.add("Content-Length", "13");
|
||||||
|
response.outputStream.writeToStream(cast(ubyte[]) "Hello, world!");
|
||||||
|
}));
|
||||||
|
transport.start();
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,5 +7,8 @@ set -e -o pipefail
|
||||||
cd http1-basic
|
cd http1-basic
|
||||||
dub build --single http1-test.d
|
dub build --single http1-test.d
|
||||||
./http1-test
|
./http1-test
|
||||||
|
|
||||||
cd ..
|
cd ..
|
||||||
|
|
||||||
|
# cd http1-speed-test
|
||||||
|
# dub build --single --build=release http1-speed-test.d
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,125 @@
|
||||||
|
module handy_http_transport.http1.epoll;
|
||||||
|
|
||||||
|
import core.sys.posix.sys.socket;
|
||||||
|
import core.sys.linux.epoll;
|
||||||
|
import core.sys.posix.netinet.in_;
|
||||||
|
import core.sys.posix.fcntl;
|
||||||
|
import core.sys.posix.unistd;
|
||||||
|
|
||||||
|
import core.stdc.errno;
|
||||||
|
|
||||||
|
extern(C) {
|
||||||
|
int accept4(int sockfd, sockaddr *addr, socklen_t *addrlen, int flags);
|
||||||
|
}
|
||||||
|
|
||||||
|
import handy_http_transport.interfaces;
|
||||||
|
import slf4d;
|
||||||
|
|
||||||
|
class Http1EpollTransport : HttpTransport {
|
||||||
|
private immutable ushort port;
|
||||||
|
|
||||||
|
this(ushort port) {
|
||||||
|
this.port = port;
|
||||||
|
}
|
||||||
|
|
||||||
|
void start() {
|
||||||
|
|
||||||
|
// Create the server socket.
|
||||||
|
enum SOCK_NONBLOCK = 0x4000;
|
||||||
|
int listenFd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
|
||||||
|
sockaddr_in serverAddress;
|
||||||
|
serverAddress.sin_family = AF_INET;
|
||||||
|
serverAddress.sin_port = htons(port);
|
||||||
|
serverAddress.sin_addr.s_addr = INADDR_ANY;
|
||||||
|
|
||||||
|
if (bind(listenFd, cast(sockaddr*) &serverAddress, serverAddress.sizeof) == -1) {
|
||||||
|
errorF!"Failed to bind socket: %d"(errno);
|
||||||
|
close(listenFd);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (listen(listenFd, SOMAXCONN) == -1) {
|
||||||
|
errorF!"Failed to listen on socket: %d"(errno);
|
||||||
|
close(listenFd);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int epollFd = epoll_create1(0);
|
||||||
|
if (epollFd == -1) {
|
||||||
|
errorF!"Failed to create epoll instance: %d"(errno);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
epoll_event event;
|
||||||
|
epoll_event[64] events;
|
||||||
|
event.events = EPOLLIN | EPOLLET;
|
||||||
|
event.data.fd = listenFd;
|
||||||
|
if (epoll_ctl(epollFd, EPOLL_CTL_ADD, listenFd, &event) == -1) {
|
||||||
|
errorF!"Failed to add listen socket to epoll: %d"(errno);
|
||||||
|
close(listenFd);
|
||||||
|
close(epollFd);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
infoF!"Server listening on port %d."(port);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
int eventCount = epoll_wait(epollFd, &event, 64, 5000);
|
||||||
|
if (eventCount == -1) {
|
||||||
|
errorF!"Epoll wait failed: %d"(errno);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < eventCount; i++) {
|
||||||
|
if (events[i].data.fd == listenFd) {
|
||||||
|
// New incoming connection.
|
||||||
|
while (true) {
|
||||||
|
sockaddr_in clientAddress;
|
||||||
|
socklen_t clientAddressLength = clientAddress.sizeof;
|
||||||
|
int clientFd = accept4(
|
||||||
|
listenFd,
|
||||||
|
cast(sockaddr*) &clientAddress,
|
||||||
|
&clientAddressLength,
|
||||||
|
SOCK_NONBLOCK
|
||||||
|
);
|
||||||
|
if (clientFd == -1) {
|
||||||
|
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||||
|
// No more connections to accept.
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
errorF!"Failed to accept connection: %d"(errno);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the client socket to epoll's listening list.
|
||||||
|
event.events = EPOLLIN | EPOLLET;
|
||||||
|
event.data.fd = clientFd;
|
||||||
|
if (epoll_ctl(epollFd, EPOLL_CTL_ADD, clientFd, &event) == -1) {
|
||||||
|
errorF!"Failed to add client socket to epoll: %d"(errno);
|
||||||
|
close(clientFd);
|
||||||
|
}
|
||||||
|
|
||||||
|
infoF!"Accepted new connection from %s:%d."(
|
||||||
|
inet_ntoa(clientAddress.sin_addr),
|
||||||
|
ntohs(clientAddress.sin_port)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Event on an existing client socket.
|
||||||
|
|
||||||
|
int clientFd = events[i].data.fd;
|
||||||
|
|
||||||
|
if (events[i].events & EPOLLIN) {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void stop() {
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -4,3 +4,4 @@
|
||||||
module handy_http_transport.http1;
|
module handy_http_transport.http1;
|
||||||
|
|
||||||
public import handy_http_transport.http1.transport;
|
public import handy_http_transport.http1.transport;
|
||||||
|
public import handy_http_transport.http1.task_pool;
|
||||||
|
|
|
@ -0,0 +1,49 @@
|
||||||
|
module handy_http_transport.http1.task_pool;
|
||||||
|
|
||||||
|
import std.socket;
|
||||||
|
import std.parallelism;
|
||||||
|
|
||||||
|
import handy_http_transport.http1.transport;
|
||||||
|
import handy_http_primitives;
|
||||||
|
import slf4d;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An implementation of Http1Transport which uses D's standard library
|
||||||
|
* parallelization, where each incoming client request is turned into a task
|
||||||
|
* and submitted to the standard task pool.
|
||||||
|
*/
|
||||||
|
class TaskPoolHttp1Transport : Http1Transport {
|
||||||
|
this(HttpRequestHandler requestHandler, ushort port = 8080) {
|
||||||
|
super(requestHandler, port);
|
||||||
|
}
|
||||||
|
|
||||||
|
override void runServer() {
|
||||||
|
Socket serverSocket = new TcpSocket();
|
||||||
|
serverSocket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1);
|
||||||
|
serverSocket.bind(new InternetAddress("127.0.0.1", port));
|
||||||
|
serverSocket.listen(1024);
|
||||||
|
|
||||||
|
while (super.isRunning) {
|
||||||
|
try {
|
||||||
|
Socket clientSocket = serverSocket.accept();
|
||||||
|
auto t = task!handleClient(clientSocket, requestHandler);
|
||||||
|
taskPool().put(t);
|
||||||
|
} catch (SocketAcceptException e) {
|
||||||
|
warn("Failed to accept socket connection.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
serverSocket.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
override void stop() {
|
||||||
|
super.stop();
|
||||||
|
// Send a dummy request to cause the server's blocking accept() call to end.
|
||||||
|
try {
|
||||||
|
Socket dummySocket = new TcpSocket(new InternetAddress("127.0.0.1", port));
|
||||||
|
dummySocket.shutdown(SocketShutdown.BOTH);
|
||||||
|
dummySocket.close();
|
||||||
|
} catch (SocketOSException e) {
|
||||||
|
warn("Failed to send empty request to stop server.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,6 +1,7 @@
|
||||||
module handy_http_transport.http1.transport;
|
module handy_http_transport.http1.transport;
|
||||||
|
|
||||||
import std.socket;
|
import std.socket;
|
||||||
|
import core.atomic : atomicStore, atomicLoad;
|
||||||
|
|
||||||
import handy_http_transport.interfaces;
|
import handy_http_transport.interfaces;
|
||||||
import handy_http_transport.helpers;
|
import handy_http_transport.helpers;
|
||||||
|
@ -14,73 +15,33 @@ import slf4d;
|
||||||
import photon;
|
import photon;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The HTTP/1.1 transport protocol implementation, using Dimitry Olshansky's
|
* Base class for HTTP/1.1 transport, where different subclasses can define
|
||||||
* Photon fiber scheduling library for concurrency.
|
* how the actual socket communication works (threadpool / epoll/ etc).
|
||||||
*/
|
*/
|
||||||
class Http1Transport : HttpTransport {
|
abstract class Http1Transport : HttpTransport {
|
||||||
private Socket serverSocket;
|
protected HttpRequestHandler requestHandler;
|
||||||
private HttpRequestHandler requestHandler;
|
protected immutable ushort port;
|
||||||
private const ushort port;
|
|
||||||
private bool running = false;
|
private bool running = false;
|
||||||
|
|
||||||
/**
|
|
||||||
* Constructs a new Http1Transport server instance.
|
|
||||||
* Params:
|
|
||||||
* requestHandler = The request handler to use for all requests.
|
|
||||||
* port = The port to bind to.
|
|
||||||
*/
|
|
||||||
this(HttpRequestHandler requestHandler, ushort port = 8080) {
|
this(HttpRequestHandler requestHandler, ushort port = 8080) {
|
||||||
assert(requestHandler !is null);
|
assert(requestHandler !is null);
|
||||||
this.serverSocket = new TcpSocket();
|
|
||||||
this.requestHandler = requestHandler;
|
this.requestHandler = requestHandler;
|
||||||
this.port = port;
|
this.port = port;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
bool isRunning() {
|
||||||
* Starts the server. Internally, this starts the Photon event loop and
|
return atomicLoad(running);
|
||||||
* accepts incoming connections in a separate fiber. Then, clients are
|
}
|
||||||
* handled in their own separate fiber (think "coroutine").
|
|
||||||
*/
|
|
||||||
void start() {
|
void start() {
|
||||||
debugF!"Starting server on port %d."(port);
|
atomicStore(running, true);
|
||||||
startloop();
|
runServer();
|
||||||
go(() => runServer());
|
|
||||||
runFibers();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
protected abstract void runServer();
|
||||||
* Stops the server. This will mark the server as no longer running, so
|
|
||||||
* no more connections will be accepted.
|
|
||||||
*/
|
|
||||||
void stop() {
|
void stop() {
|
||||||
debugF!"Stopping server on port %d."(port);
|
atomicStore(running, false);
|
||||||
this.running = false;
|
|
||||||
// Send a dummy request to cause the server's blocking accept() call to end.
|
|
||||||
try {
|
|
||||||
Socket dummySocket = new TcpSocket(this.serverSocket.localAddress());
|
|
||||||
dummySocket.shutdown(SocketShutdown.BOTH);
|
|
||||||
dummySocket.close();
|
|
||||||
} catch (SocketOSException e) {
|
|
||||||
warn("Failed to send empty request to stop server.", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void runServer() {
|
|
||||||
this.running = true;
|
|
||||||
serverSocket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1);
|
|
||||||
serverSocket.bind(new InternetAddress("127.0.0.1", port));
|
|
||||||
traceF!"Bound to %s."(serverSocket.localAddress().toString());
|
|
||||||
serverSocket.listen(100);
|
|
||||||
while (running) {
|
|
||||||
try {
|
|
||||||
Socket clientSocket = serverSocket.accept();
|
|
||||||
go(() => handleClient(clientSocket, requestHandler));
|
|
||||||
} catch (SocketAcceptException e) {
|
|
||||||
warn("Failed to accept socket connection.", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
this.serverSocket.shutdown(SocketShutdown.BOTH);
|
|
||||||
this.serverSocket.close();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue