Compare commits
9 Commits
Author | SHA1 | Date |
---|---|---|
|
18851ac786 | |
|
eb43a1be78 | |
|
de96a4a45b | |
|
08bb1b58af | |
|
602667879f | |
|
ae1194e159 | |
|
fd42b11c8b | |
|
d1263a5991 | |
|
d5a09c0421 |
.gitea/workflows
README.mddub.jsondub.selections.jsonintegration-tests
source/handy_http_transport/http1
|
@ -28,5 +28,5 @@ jobs:
|
|||
compiler: ldc-latest
|
||||
|
||||
- name: http1-test
|
||||
working-directory: integration-tests
|
||||
working-directory: integration-tests/http1-basic
|
||||
run: dub run --single http1-test.d
|
||||
|
|
|
@ -10,7 +10,7 @@ implemented so far.
|
|||
|
||||
## HTTP/1.1
|
||||
|
||||
Use the `Http1Transport` implementation of `HttpTransport` to serve content
|
||||
Use the `TaskPoolHttp1Transport` implementation of `HttpTransport` to serve content
|
||||
using the HTTP/1.1 protocol. See the example below:
|
||||
|
||||
```d
|
||||
|
@ -26,7 +26,7 @@ class MyHandler : HttpRequestHandler {
|
|||
}
|
||||
|
||||
void main() {
|
||||
HttpTransport tp = new Http1Transport(new MyHandler(), 8080);
|
||||
HttpTransport tp = new TaskPoolHttp1Transport(new MyHandler(), 8080);
|
||||
tp.start();
|
||||
}
|
||||
```
|
3
dub.json
3
dub.json
|
@ -4,8 +4,7 @@
|
|||
],
|
||||
"copyright": "Copyright © 2024, Andrew Lalis",
|
||||
"dependencies": {
|
||||
"handy-http-primitives": "~>1.6",
|
||||
"photon": "~>0.11",
|
||||
"handy-http-primitives": "~>1.8",
|
||||
"streams": "~>3.6",
|
||||
"slf4d": "~>4.0"
|
||||
},
|
||||
|
|
|
@ -1,10 +1,8 @@
|
|||
{
|
||||
"fileVersion": 1,
|
||||
"versions": {
|
||||
"handy-http-primitives": "1.6.0",
|
||||
"photon": "0.11.0",
|
||||
"sharded-map": "2.7.0",
|
||||
"slf4d": "4.0.1",
|
||||
"handy-http-primitives": "1.8.0",
|
||||
"slf4d": "4.1.1",
|
||||
"streams": "3.6.0"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/+ dub.sdl:
|
||||
dependency "handy-http-transport" path="../"
|
||||
dependency "handy-http-transport" path="../../"
|
||||
dependency "requests" version="~>2.1"
|
||||
+/
|
||||
|
||||
|
@ -18,14 +18,13 @@ import requests;
|
|||
import core.thread;
|
||||
|
||||
int main() {
|
||||
auto loggingProvider = new shared DefaultProvider(true, Levels.INFO);
|
||||
auto loggingProvider = DefaultProvider.builder()
|
||||
.withRootLoggingLevel(Levels.INFO)
|
||||
.withConsoleSerializer(true, 48)
|
||||
.build();
|
||||
configureLoggingProvider(loggingProvider);
|
||||
|
||||
HttpTransport transport = new Http1Transport(HttpRequestHandler.of((ref ServerHttpRequest request, ref ServerHttpResponse response) {
|
||||
response.headers.add("Content-Type", "text/plain");
|
||||
response.headers.add("Content-Length", "13");
|
||||
response.outputStream.writeToStream(cast(ubyte[]) "Hello, world!");
|
||||
}));
|
||||
HttpTransport transport = new TaskPoolHttp1Transport(HttpRequestHandler.of(&handleRequest));
|
||||
Thread thread = transport.startInNewThread();
|
||||
scope(exit) {
|
||||
transport.stop();
|
||||
|
@ -35,6 +34,7 @@ int main() {
|
|||
Thread.sleep(msecs(100)); // Wait for the server to start.
|
||||
|
||||
// Send a simple GET request to the server.
|
||||
info("Sending GET request to http://localhost:8080");
|
||||
auto content = getContent("http://localhost:8080");
|
||||
ubyte[] data = content.data;
|
||||
if (data.length != 13 || (cast(string) data) != "Hello, world!") {
|
||||
|
@ -45,3 +45,9 @@ int main() {
|
|||
info("Test completed successfully.");
|
||||
return 0;
|
||||
}
|
||||
|
||||
void handleRequest(ref ServerHttpRequest request, ref ServerHttpResponse response) {
|
||||
response.headers.add("Content-Type", "text/plain");
|
||||
response.headers.add("Content-Length", "13");
|
||||
response.outputStream.writeToStream(cast(ubyte[]) "Hello, world!");
|
||||
}
|
|
@ -0,0 +1 @@
|
|||
http1-speed-test
|
|
@ -0,0 +1,28 @@
|
|||
/+ dub.sdl:
|
||||
dependency "handy-http-transport" path="../../"
|
||||
+/
|
||||
module integration_tests.http1_speed_test;
|
||||
|
||||
import handy_http_primitives;
|
||||
import handy_http_transport;
|
||||
import slf4d;
|
||||
import slf4d.default_provider;
|
||||
|
||||
void main() {
|
||||
auto loggingProvider = DefaultProvider.builder()
|
||||
.withRootLoggingLevel(Levels.ERROR)
|
||||
.withConsoleSerializer(true, 48)
|
||||
.build();
|
||||
configureLoggingProvider(loggingProvider);
|
||||
HttpTransport transport;
|
||||
transport = new TaskPoolHttp1Transport(HttpRequestHandler.of(
|
||||
(ref ServerHttpRequest request, ref ServerHttpResponse response) {
|
||||
if (request.method == HttpMethod.DELETE) {
|
||||
transport.stop();
|
||||
}
|
||||
response.headers.add("Content-Type", "text/plain");
|
||||
response.headers.add("Content-Length", "13");
|
||||
response.outputStream.writeToStream(cast(ubyte[]) "Hello, world!");
|
||||
}));
|
||||
transport.start();
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
#!/usr/bin/env bash
|
||||
|
||||
# This script runs all integration tests.
|
||||
|
||||
set -e -o pipefail
|
||||
|
||||
cd http1-basic
|
||||
dub build --single http1-test.d
|
||||
./http1-test
|
||||
cd ..
|
||||
|
||||
# cd http1-speed-test
|
||||
# dub build --single --build=release http1-speed-test.d
|
||||
|
|
@ -0,0 +1,122 @@
|
|||
module handy_http_transport.http1.epoll;
|
||||
|
||||
import core.sys.posix.sys.socket;
|
||||
import core.sys.linux.epoll;
|
||||
import core.sys.posix.netinet.in_;
|
||||
import core.sys.posix.fcntl;
|
||||
import core.sys.posix.unistd;
|
||||
|
||||
import core.stdc.errno;
|
||||
|
||||
extern(C) {
|
||||
int accept4(int sockfd, sockaddr *addr, socklen_t *addrlen, int flags);
|
||||
}
|
||||
|
||||
import handy_http_transport.interfaces;
|
||||
import handy_http_transport.http1;
|
||||
import handy_http_primitives;
|
||||
import slf4d;
|
||||
|
||||
class Http1EpollTransport : Http1Transport {
|
||||
|
||||
this(HttpRequestHandler requestHandler, ushort port) {
|
||||
super(requestHandler, port);
|
||||
}
|
||||
|
||||
override void start() {
|
||||
super.start();
|
||||
// Create the server socket.
|
||||
enum SOCK_NONBLOCK = 0x4000;
|
||||
int listenFd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
|
||||
sockaddr_in serverAddress;
|
||||
serverAddress.sin_family = AF_INET;
|
||||
serverAddress.sin_port = htons(port);
|
||||
serverAddress.sin_addr.s_addr = INADDR_ANY;
|
||||
|
||||
if (bind(listenFd, cast(sockaddr*) &serverAddress, serverAddress.sizeof) == -1) {
|
||||
errorF!"Failed to bind socket: %d"(errno);
|
||||
close(listenFd);
|
||||
return;
|
||||
}
|
||||
|
||||
if (listen(listenFd, SOMAXCONN) == -1) {
|
||||
errorF!"Failed to listen on socket: %d"(errno);
|
||||
close(listenFd);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
int epollFd = epoll_create1(0);
|
||||
if (epollFd == -1) {
|
||||
errorF!"Failed to create epoll instance: %d"(errno);
|
||||
return;
|
||||
}
|
||||
|
||||
epoll_event event;
|
||||
epoll_event[64] events;
|
||||
event.events = EPOLLIN | EPOLLET;
|
||||
event.data.fd = listenFd;
|
||||
if (epoll_ctl(epollFd, EPOLL_CTL_ADD, listenFd, &event) == -1) {
|
||||
errorF!"Failed to add listen socket to epoll: %d"(errno);
|
||||
close(listenFd);
|
||||
close(epollFd);
|
||||
return;
|
||||
}
|
||||
|
||||
infoF!"Server listening on port %d."(port);
|
||||
|
||||
while (true) {
|
||||
int eventCount = epoll_wait(epollFd, &event, 64, 5000);
|
||||
if (eventCount == -1) {
|
||||
errorF!"Epoll wait failed: %d"(errno);
|
||||
break;
|
||||
}
|
||||
|
||||
for (int i = 0; i < eventCount; i++) {
|
||||
if (events[i].data.fd == listenFd) {
|
||||
// New incoming connection.
|
||||
while (true) {
|
||||
sockaddr_in clientAddress;
|
||||
socklen_t clientAddressLength = clientAddress.sizeof;
|
||||
int clientFd = accept4(
|
||||
listenFd,
|
||||
cast(sockaddr*) &clientAddress,
|
||||
&clientAddressLength,
|
||||
SOCK_NONBLOCK
|
||||
);
|
||||
if (clientFd == -1) {
|
||||
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||
// No more connections to accept.
|
||||
break;
|
||||
} else {
|
||||
errorF!"Failed to accept connection: %d"(errno);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Add the client socket to epoll's listening list.
|
||||
event.events = EPOLLIN | EPOLLET;
|
||||
event.data.fd = clientFd;
|
||||
if (epoll_ctl(epollFd, EPOLL_CTL_ADD, clientFd, &event) == -1) {
|
||||
errorF!"Failed to add client socket to epoll: %d"(errno);
|
||||
close(clientFd);
|
||||
}
|
||||
|
||||
infoF!"Accepted new connection from %s:%d."(
|
||||
inet_ntoa(clientAddress.sin_addr),
|
||||
ntohs(clientAddress.sin_port)
|
||||
);
|
||||
}
|
||||
} else {
|
||||
// Event on an existing client socket.
|
||||
|
||||
int clientFd = events[i].data.fd;
|
||||
|
||||
if (events[i].events & EPOLLIN) {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -4,3 +4,4 @@
|
|||
module handy_http_transport.http1;
|
||||
|
||||
public import handy_http_transport.http1.transport;
|
||||
public import handy_http_transport.http1.task_pool;
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
module handy_http_transport.http1.task_pool;
|
||||
|
||||
import std.socket;
|
||||
import std.parallelism;
|
||||
|
||||
import handy_http_transport.http1.transport;
|
||||
import handy_http_primitives;
|
||||
import slf4d;
|
||||
|
||||
/**
|
||||
* An implementation of Http1Transport which uses D's standard library
|
||||
* parallelization, where each incoming client request is turned into a task
|
||||
* and submitted to the standard task pool.
|
||||
*/
|
||||
class TaskPoolHttp1Transport : Http1Transport {
|
||||
this(HttpRequestHandler requestHandler, ushort port = 8080) {
|
||||
super(requestHandler, port);
|
||||
}
|
||||
|
||||
override void runServer() {
|
||||
Socket serverSocket = new TcpSocket();
|
||||
serverSocket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1);
|
||||
serverSocket.bind(new InternetAddress("127.0.0.1", port));
|
||||
serverSocket.listen(1024);
|
||||
|
||||
while (super.isRunning) {
|
||||
try {
|
||||
Socket clientSocket = serverSocket.accept();
|
||||
auto t = task!handleClient(clientSocket, requestHandler);
|
||||
taskPool().put(t);
|
||||
} catch (SocketAcceptException e) {
|
||||
warn("Failed to accept socket connection.", e);
|
||||
}
|
||||
}
|
||||
serverSocket.close();
|
||||
}
|
||||
|
||||
override void stop() {
|
||||
super.stop();
|
||||
// Send a dummy request to cause the server's blocking accept() call to end.
|
||||
try {
|
||||
Socket dummySocket = new TcpSocket(new InternetAddress("127.0.0.1", port));
|
||||
dummySocket.shutdown(SocketShutdown.BOTH);
|
||||
dummySocket.close();
|
||||
} catch (SocketOSException e) {
|
||||
warn("Failed to send empty request to stop server.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
unittest {
|
||||
import slf4d.default_provider;
|
||||
auto logProvider = DefaultProvider.builder().withRootLoggingLevel(Levels.DEBUG).build();
|
||||
configureLoggingProvider(logProvider);
|
||||
|
||||
HttpRequestHandler handler = HttpRequestHandler.of(
|
||||
(ref ServerHttpRequest request, ref ServerHttpResponse response) {
|
||||
response.status = HttpStatus.OK;
|
||||
response.writeBodyString("Testing");
|
||||
});
|
||||
testHttp1Transport(new TaskPoolHttp1Transport(handler));
|
||||
}
|
|
@ -1,6 +1,7 @@
|
|||
module handy_http_transport.http1.transport;
|
||||
|
||||
import std.socket;
|
||||
import core.atomic : atomicStore, atomicLoad;
|
||||
|
||||
import handy_http_transport.interfaces;
|
||||
import handy_http_transport.helpers;
|
||||
|
@ -11,76 +12,93 @@ import handy_http_primitives.address;
|
|||
|
||||
import streams;
|
||||
import slf4d;
|
||||
import photon;
|
||||
|
||||
/**
|
||||
* The HTTP/1.1 transport protocol implementation, using Dimitry Olshansky's
|
||||
* Photon fiber scheduling library for concurrency.
|
||||
* Base class for HTTP/1.1 transport, where different subclasses can define
|
||||
* how the actual socket communication works (threadpool / epoll/ etc).
|
||||
*/
|
||||
class Http1Transport : HttpTransport {
|
||||
private Socket serverSocket;
|
||||
private HttpRequestHandler requestHandler;
|
||||
private const ushort port;
|
||||
abstract class Http1Transport : HttpTransport {
|
||||
protected HttpRequestHandler requestHandler;
|
||||
protected immutable ushort port;
|
||||
private bool running = false;
|
||||
|
||||
/**
|
||||
* Constructs a new Http1Transport server instance.
|
||||
* Params:
|
||||
* requestHandler = The request handler to use for all requests.
|
||||
* port = The port to bind to.
|
||||
*/
|
||||
this(HttpRequestHandler requestHandler, ushort port = 8080) {
|
||||
assert(requestHandler !is null);
|
||||
this.serverSocket = new TcpSocket();
|
||||
this.requestHandler = requestHandler;
|
||||
this.port = port;
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts the server. Internally, this starts the Photon event loop and
|
||||
* accepts incoming connections in a separate fiber. Then, clients are
|
||||
* handled in their own separate fiber (think "coroutine").
|
||||
*/
|
||||
bool isRunning() {
|
||||
return atomicLoad(running);
|
||||
}
|
||||
|
||||
void start() {
|
||||
debugF!"Starting server on port %d."(port);
|
||||
startloop();
|
||||
go(() => runServer());
|
||||
runFibers();
|
||||
infoF!"Starting Http1Transport server on port %d."(port);
|
||||
atomicStore(running, true);
|
||||
runServer();
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops the server. This will mark the server as no longer running, so
|
||||
* no more connections will be accepted.
|
||||
*/
|
||||
protected abstract void runServer();
|
||||
|
||||
void stop() {
|
||||
debugF!"Stopping server on port %d."(port);
|
||||
this.running = false;
|
||||
// Send a dummy request to cause the server's blocking accept() call to end.
|
||||
try {
|
||||
Socket dummySocket = new TcpSocket(this.serverSocket.localAddress());
|
||||
dummySocket.shutdown(SocketShutdown.BOTH);
|
||||
dummySocket.close();
|
||||
} catch (SocketOSException e) {
|
||||
warn("Failed to send empty request to stop server.", e);
|
||||
}
|
||||
infoF!"Stopping Http1Transport server on port %d."(port);
|
||||
atomicStore(running, false);
|
||||
}
|
||||
}
|
||||
|
||||
private void runServer() {
|
||||
this.running = true;
|
||||
serverSocket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1);
|
||||
serverSocket.bind(new InternetAddress("127.0.0.1", port));
|
||||
traceF!"Bound to %s."(serverSocket.localAddress().toString());
|
||||
serverSocket.listen(100);
|
||||
while (running) {
|
||||
try {
|
||||
Socket clientSocket = serverSocket.accept();
|
||||
go(() => handleClient(clientSocket, requestHandler));
|
||||
} catch (SocketAcceptException e) {
|
||||
warn("Failed to accept socket connection.", e);
|
||||
version(unittest) {
|
||||
/**
|
||||
* A generic test to ensure that any Http1Transport implementation behaves
|
||||
* properly to start & stop, and process requests when running.
|
||||
*
|
||||
* It's assumed that the given transport is configured to run on localhost,
|
||||
* port 8080, and return a standard 200 OK empty response to all requests.
|
||||
* Params:
|
||||
* transport = The transport implementation to test.
|
||||
*/
|
||||
void testHttp1Transport(Http1Transport transport) {
|
||||
import core.thread;
|
||||
import std.string;
|
||||
infoF!"Testing Http1Transport implementation: %s"(transport);
|
||||
|
||||
Thread thread = transport.startInNewThread();
|
||||
Thread.sleep(msecs(100));
|
||||
|
||||
Socket clientSocket1 = new TcpSocket(new InternetAddress(8080));
|
||||
const requestBody = "POST /users HTTP/1.1\r\n" ~
|
||||
"Host: example.com\r\n" ~
|
||||
"Content-Type: text/plain\r\n" ~
|
||||
"Content-Length: 13\r\n" ~
|
||||
"\r\n" ~
|
||||
"Hello, world!";
|
||||
ptrdiff_t bytesSent = clientSocket1.send(requestBody);
|
||||
assert(bytesSent == requestBody.length, "Couldn't send the full request body to the server.");
|
||||
|
||||
ubyte[8192] buffer;
|
||||
size_t totalBytesReceived = 0;
|
||||
ptrdiff_t bytesReceived;
|
||||
do {
|
||||
bytesReceived = clientSocket1.receive(buffer[totalBytesReceived .. $]);
|
||||
if (bytesReceived == Socket.ERROR) {
|
||||
assert(false, "Socket error when attempting to receive a response from the HttpTransport server.");
|
||||
}
|
||||
}
|
||||
this.serverSocket.shutdown(SocketShutdown.BOTH);
|
||||
this.serverSocket.close();
|
||||
totalBytesReceived += bytesReceived;
|
||||
} while (bytesReceived > 0);
|
||||
|
||||
string httpResponseContent = cast(string) buffer[0 .. totalBytesReceived];
|
||||
string[] parts = httpResponseContent.split("\r\n\r\n");
|
||||
assert(parts.length > 0, "HTTP 1.1 response is missing required status and headers section.");
|
||||
string[] headerLines = parts[0].split("\r\n");
|
||||
assert(headerLines.length > 0, "HTTP 1.1 response is missing required status line.");
|
||||
string statusLine = headerLines[0];
|
||||
string[] statusLineParts = statusLine.split(" ");
|
||||
assert(statusLineParts[0] == "HTTP/1.1");
|
||||
assert(statusLineParts[1] == "200");
|
||||
assert(statusLineParts[2] == "OK");
|
||||
|
||||
info("Testing is complete. Stopping the server.");
|
||||
transport.stop();
|
||||
thread.join();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -94,31 +112,32 @@ class Http1Transport : HttpTransport {
|
|||
* requestHandler = The request handler that will handle the received HTTP request.
|
||||
*/
|
||||
void handleClient(Socket clientSocket, HttpRequestHandler requestHandler) {
|
||||
auto inputStream = SocketInputStream(clientSocket);
|
||||
auto bufferedInput = bufferedInputStreamFor!(8192)(inputStream);
|
||||
SocketInputStream* inputStream = new SocketInputStream(clientSocket);
|
||||
BufferedInputStream!(SocketInputStream*, 8192)* bufferedInput
|
||||
= new BufferedInputStream!(SocketInputStream*, 8192)(inputStream);
|
||||
// Get remote address from the socket.
|
||||
import handy_http_primitives.address;
|
||||
ClientAddress addr = getAddress(clientSocket);
|
||||
debugF!"Handling client request from %s."(addr.toString());
|
||||
auto result = readHttpRequest(&bufferedInput, addr);
|
||||
debug_("Finished reading HTTP request from client.");
|
||||
traceF!"Got request from client: %s"(addr.toString());
|
||||
auto result = readHttpRequest(bufferedInput, addr);
|
||||
if (result.hasError) {
|
||||
if (result.error.code != -1) {
|
||||
// Only warn if we didn't read an empty request.
|
||||
warnF!"Failed to read HTTP request: %s"(result.error.message);
|
||||
warnF!"Failed to read request: %s"(result.error.message);
|
||||
}
|
||||
inputStream.closeStream();
|
||||
return;
|
||||
}
|
||||
scope ServerHttpRequest request = result.request;
|
||||
scope ServerHttpResponse response;
|
||||
SocketOutputStream outputStream = SocketOutputStream(clientSocket);
|
||||
SocketOutputStream* outputStream = new SocketOutputStream(clientSocket);
|
||||
response.outputStream = outputStreamObjectFor(HttpResponseOutputStream!(SocketOutputStream*)(
|
||||
&outputStream,
|
||||
outputStream,
|
||||
&response
|
||||
));
|
||||
try {
|
||||
requestHandler.handle(request, response);
|
||||
debugF!"%s %s -> %d %s"(request.method, request.url, response.status.code, response.status.text);
|
||||
} catch (Exception e) {
|
||||
error("Exception thrown while handling request.", e);
|
||||
} catch (Throwable t) {
|
||||
|
|
Loading…
Reference in New Issue