Compare commits

...

18 Commits
v1.0.1 ... main

Author SHA1 Message Date
Andrew Lalis eb43a1be78 Added heap-allocation to allow long-lived websocket connections.
Build and Test Module / build-and-test (push) Successful in 13s Details
Build and Test Module / integration-tests (push) Successful in 18s Details
2025-07-05 14:27:56 -04:00
Andrew Lalis de96a4a45b Added basic standard HTTP 1 test.
Build and Test Module / build-and-test (push) Successful in 14s Details
Build and Test Module / integration-tests (push) Successful in 17s Details
2025-07-01 22:42:09 -04:00
Andrew Lalis 08bb1b58af Updated primitives to 1.3.0 2025-07-01 22:13:10 -04:00
Andrew Lalis 602667879f Updated readme. 2025-07-01 21:07:41 -04:00
Andrew Lalis ae1194e159 Updated readme, updated epoll.
Build and Test Module / build-and-test (push) Successful in 11s Details
Build and Test Module / integration-tests (push) Successful in 17s Details
2025-06-29 16:42:25 -04:00
Andrew Lalis fd42b11c8b Added integration tests, simpler implementation which uses task pool.
Build and Test Module / build-and-test (push) Successful in 14s Details
Build and Test Module / integration-tests (push) Successful in 20s Details
2025-06-29 15:06:14 -04:00
Andrew Lalis d1263a5991 Added tests.
Build and Test Module / build-and-test (push) Successful in 21s Details
Build and Test Module / integration-tests (push) Successful in 23s Details
2025-06-29 11:42:36 -04:00
Andrew Lalis d5a09c0421 Upgrade slf4d and fix integration test. 2025-06-23 22:07:25 -04:00
Andrew Lalis a02d1e4dec Fixed unit tests.
Build and Test Module / build-and-test (push) Successful in 12s Details
Build and Test Module / integration-tests (push) Successful in 18s Details
2025-06-22 22:38:04 -04:00
Andrew Lalis 49d0ebfed0 Cleaned up http1-test.
Build and Test Module / integration-tests (push) Successful in 20s Details
Build and Test Module / build-and-test (push) Has been cancelled Details
2025-06-22 22:06:39 -04:00
Andrew Lalis bbd1c05e62 Added integration test. 2025-06-22 22:00:14 -04:00
Andrew Lalis 93d983424e Introduced more logging calls.
Build and Test Module / build-and-test (push) Successful in 35s Details
2025-05-29 22:52:43 -04:00
Andrew Lalis 905e3f93f8 Upgrade primitives to 1.6
Build and Test Module / build-and-test (push) Successful in 12s Details
2025-03-26 14:34:41 -04:00
Andrew Lalis c2cd2cfc5c upgraded dependencies
Build and Test Module / build-and-test (push) Successful in 11s Details
2025-03-23 20:03:55 -04:00
Andrew Lalis 4709f8b00c updated dependency version strings, and added test for response output stream.
Build and Test Module / build-and-test (push) Successful in 12s Details
2025-03-13 19:42:31 -04:00
Andrew Lalis b06cb7547f Added more tests.
Build and Test Module / build-and-test (push) Successful in 13s Details
2025-03-06 21:55:04 -05:00
Andrew Lalis 4d352a0ffa Upgraded dependencies, and simplified response output stream.
Build and Test Module / build-and-test (push) Successful in 11s Details
2025-03-05 20:13:45 -05:00
Andrew Lalis b260ddfe8c Added public imports for packages. 2025-01-14 17:57:01 -05:00
17 changed files with 630 additions and 74 deletions

32
.gitea/workflows/ci.yaml Normal file
View File

@ -0,0 +1,32 @@
name: Build and Test Module
on:
push:
paths:
- 'source/**'
- '.gitea/workflows/ci.yaml'
pull_request:
types: [opened, reopened, synchronize]
jobs:
build-and-test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup DLang
uses: dlang-community/setup-dlang@v2
with:
compiler: ldc-latest
- name: Build and Test
run: dub -q test
integration-tests:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup DLang
uses: dlang-community/setup-dlang@v2
with:
compiler: ldc-latest
- name: http1-test
working-directory: integration-tests/http1-basic
run: dub run --single http1-test.d

View File

@ -10,7 +10,7 @@ implemented so far.
## HTTP/1.1
Use the `Http1Transport` implementation of `HttpTransport` to serve content
Use the `TaskPoolHttp1Transport` implementation of `HttpTransport` to serve content
using the HTTP/1.1 protocol. See the example below:
```d
@ -26,7 +26,7 @@ class MyHandler : HttpRequestHandler {
}
void main() {
HttpTransport tp = new Http1Transport(new MyHandler(), 8080);
HttpTransport tp = new TaskPoolHttp1Transport(new MyHandler(), 8080);
tp.start();
}
```

View File

@ -4,9 +4,10 @@
],
"copyright": "Copyright © 2024, Andrew Lalis",
"dependencies": {
"handy-http-primitives": "~>1.0.0",
"photon": "~>0.10.2",
"streams": "~>3.5.0"
"handy-http-primitives": "~>1.6",
"photon": "~>0.11",
"streams": "~>3.6",
"slf4d": "~>4.0"
},
"description": "Implementations of HTTP transport protocols.",
"license": "CC0",

View File

@ -1,9 +1,10 @@
{
"fileVersion": 1,
"versions": {
"handy-http-primitives": "1.0.0",
"photon": "0.10.2",
"handy-http-primitives": "1.7.0",
"photon": "0.11.0",
"sharded-map": "2.7.0",
"streams": "3.5.0"
"slf4d": "4.1.1",
"streams": "3.6.0"
}
}

View File

@ -0,0 +1 @@
http1-test

View File

@ -0,0 +1,53 @@
/+ dub.sdl:
dependency "handy-http-transport" path="../../"
dependency "requests" version="~>2.1"
+/
/**
* This tests the basic HTTP functionality of the Http1Transport implementation
* by starting a server, sending a request, and checking the response.
*/
module integration_tests.http1_test;
import handy_http_primitives;
import handy_http_transport;
import slf4d;
import slf4d.default_provider;
import requests;
import core.thread;
int main() {
auto loggingProvider = DefaultProvider.builder()
.withRootLoggingLevel(Levels.INFO)
.withConsoleSerializer(true, 48)
.build();
configureLoggingProvider(loggingProvider);
HttpTransport transport = new TaskPoolHttp1Transport(HttpRequestHandler.of(&handleRequest));
Thread thread = transport.startInNewThread();
scope(exit) {
transport.stop();
thread.join();
}
info("Started server in another thread.");
Thread.sleep(msecs(100)); // Wait for the server to start.
// Send a simple GET request to the server.
info("Sending GET request to http://localhost:8080");
auto content = getContent("http://localhost:8080");
ubyte[] data = content.data;
if (data.length != 13 || (cast(string) data) != "Hello, world!") {
error("Received unexpected content: " ~ cast(string) data);
return 1;
}
info("Test completed successfully.");
return 0;
}
void handleRequest(ref ServerHttpRequest request, ref ServerHttpResponse response) {
response.headers.add("Content-Type", "text/plain");
response.headers.add("Content-Length", "13");
response.outputStream.writeToStream(cast(ubyte[]) "Hello, world!");
}

View File

@ -0,0 +1 @@
http1-speed-test

View File

@ -0,0 +1,28 @@
/+ dub.sdl:
dependency "handy-http-transport" path="../../"
+/
module integration_tests.http1_speed_test;
import handy_http_primitives;
import handy_http_transport;
import slf4d;
import slf4d.default_provider;
void main() {
auto loggingProvider = DefaultProvider.builder()
.withRootLoggingLevel(Levels.ERROR)
.withConsoleSerializer(true, 48)
.build();
configureLoggingProvider(loggingProvider);
HttpTransport transport;
transport = new TaskPoolHttp1Transport(HttpRequestHandler.of(
(ref ServerHttpRequest request, ref ServerHttpResponse response) {
if (request.method == HttpMethod.DELETE) {
transport.stop();
}
response.headers.add("Content-Type", "text/plain");
response.headers.add("Content-Length", "13");
response.outputStream.writeToStream(cast(ubyte[]) "Hello, world!");
}));
transport.start();
}

14
integration-tests/run_all.sh Executable file
View File

@ -0,0 +1,14 @@
#!/usr/bin/env bash
# This script runs all integration tests.
set -e -o pipefail
cd http1-basic
dub build --single http1-test.d
./http1-test
cd ..
# cd http1-speed-test
# dub build --single --build=release http1-speed-test.d

View File

@ -20,15 +20,14 @@ Either!(string, "value", StreamError, "error") consumeUntil(S)(
auto result = inputStream.readFromStream(buffer[idx .. idx + 1]);
if (result.hasError) return Either!(string, "value", StreamError, "error")(result.error);
if (result.count != 1) return Either!(string, "value", StreamError, "error")(
StreamError("Failed to read a single element", 1)
StreamError("Failed to read a single element", 0)
);
idx++;
if (idx >= target.length && buffer[idx - target.length .. idx] == target) {
return Either!(string, "value", StreamError, "error")(
cast(string) buffer[0 .. idx - target.length].idup
);
}
if (idx >= buffer.length) {
} else if (idx >= buffer.length) {
return Either!(string, "value", StreamError, "error")(
StreamError("Couldn't find target \"" ~ target ~ "\" after reading 1024 bytes.", 1)
);
@ -61,13 +60,22 @@ ptrdiff_t indexOf(string s, char c, size_t offset = 0) {
string stripSpaces(string s) {
if (s.length == 0) return s;
ptrdiff_t startIdx = 0;
while (s[startIdx] == ' ' && startIdx < s.length) startIdx++;
while (startIdx < s.length && s[startIdx] == ' ') startIdx++;
s = s[startIdx .. $];
ptrdiff_t endIdx = s.length - 1;
if (s.length == 0) return "";
ptrdiff_t endIdx = cast(ptrdiff_t) s.length - 1;
while (s[endIdx] == ' ' && endIdx >= 0) endIdx--;
return s[0 .. endIdx + 1];
}
unittest {
assert(stripSpaces("") == "");
assert(stripSpaces(" ") == "");
assert(stripSpaces("test") == "test");
assert(stripSpaces(" test") == "test");
assert(stripSpaces(" test string ") == "test string");
}
/**
* Helper function to append an unsigned integer value to a char buffer. It is
* assumed that there's enough space to write the value.

View File

@ -0,0 +1,122 @@
module handy_http_transport.http1.epoll;
import core.sys.posix.sys.socket;
import core.sys.linux.epoll;
import core.sys.posix.netinet.in_;
import core.sys.posix.fcntl;
import core.sys.posix.unistd;
import core.stdc.errno;
extern(C) {
int accept4(int sockfd, sockaddr *addr, socklen_t *addrlen, int flags);
}
import handy_http_transport.interfaces;
import handy_http_transport.http1;
import handy_http_primitives;
import slf4d;
class Http1EpollTransport : Http1Transport {
this(HttpRequestHandler requestHandler, ushort port) {
super(requestHandler, port);
}
override void start() {
super.start();
// Create the server socket.
enum SOCK_NONBLOCK = 0x4000;
int listenFd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
sockaddr_in serverAddress;
serverAddress.sin_family = AF_INET;
serverAddress.sin_port = htons(port);
serverAddress.sin_addr.s_addr = INADDR_ANY;
if (bind(listenFd, cast(sockaddr*) &serverAddress, serverAddress.sizeof) == -1) {
errorF!"Failed to bind socket: %d"(errno);
close(listenFd);
return;
}
if (listen(listenFd, SOMAXCONN) == -1) {
errorF!"Failed to listen on socket: %d"(errno);
close(listenFd);
return;
}
int epollFd = epoll_create1(0);
if (epollFd == -1) {
errorF!"Failed to create epoll instance: %d"(errno);
return;
}
epoll_event event;
epoll_event[64] events;
event.events = EPOLLIN | EPOLLET;
event.data.fd = listenFd;
if (epoll_ctl(epollFd, EPOLL_CTL_ADD, listenFd, &event) == -1) {
errorF!"Failed to add listen socket to epoll: %d"(errno);
close(listenFd);
close(epollFd);
return;
}
infoF!"Server listening on port %d."(port);
while (true) {
int eventCount = epoll_wait(epollFd, &event, 64, 5000);
if (eventCount == -1) {
errorF!"Epoll wait failed: %d"(errno);
break;
}
for (int i = 0; i < eventCount; i++) {
if (events[i].data.fd == listenFd) {
// New incoming connection.
while (true) {
sockaddr_in clientAddress;
socklen_t clientAddressLength = clientAddress.sizeof;
int clientFd = accept4(
listenFd,
cast(sockaddr*) &clientAddress,
&clientAddressLength,
SOCK_NONBLOCK
);
if (clientFd == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// No more connections to accept.
break;
} else {
errorF!"Failed to accept connection: %d"(errno);
break;
}
}
// Add the client socket to epoll's listening list.
event.events = EPOLLIN | EPOLLET;
event.data.fd = clientFd;
if (epoll_ctl(epollFd, EPOLL_CTL_ADD, clientFd, &event) == -1) {
errorF!"Failed to add client socket to epoll: %d"(errno);
close(clientFd);
}
infoF!"Accepted new connection from %s:%d."(
inet_ntoa(clientAddress.sin_addr),
ntohs(clientAddress.sin_port)
);
}
} else {
// Event on an existing client socket.
int clientFd = events[i].data.fd;
if (events[i].events & EPOLLIN) {
}
}
}
}
}
}

View File

@ -3,3 +3,5 @@
*/
module handy_http_transport.http1;
public import handy_http_transport.http1.transport;
public import handy_http_transport.http1.task_pool;

View File

@ -0,0 +1,62 @@
module handy_http_transport.http1.task_pool;
import std.socket;
import std.parallelism;
import handy_http_transport.http1.transport;
import handy_http_primitives;
import slf4d;
/**
* An implementation of Http1Transport which uses D's standard library
* parallelization, where each incoming client request is turned into a task
* and submitted to the standard task pool.
*/
class TaskPoolHttp1Transport : Http1Transport {
this(HttpRequestHandler requestHandler, ushort port = 8080) {
super(requestHandler, port);
}
override void runServer() {
Socket serverSocket = new TcpSocket();
serverSocket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1);
serverSocket.bind(new InternetAddress("127.0.0.1", port));
serverSocket.listen(1024);
while (super.isRunning) {
try {
Socket clientSocket = serverSocket.accept();
auto t = task!handleClient(clientSocket, requestHandler);
taskPool().put(t);
} catch (SocketAcceptException e) {
warn("Failed to accept socket connection.", e);
}
}
serverSocket.close();
}
override void stop() {
super.stop();
// Send a dummy request to cause the server's blocking accept() call to end.
try {
Socket dummySocket = new TcpSocket(new InternetAddress("127.0.0.1", port));
dummySocket.shutdown(SocketShutdown.BOTH);
dummySocket.close();
} catch (SocketOSException e) {
warn("Failed to send empty request to stop server.", e);
}
}
}
unittest {
import slf4d.default_provider;
auto logProvider = DefaultProvider.builder().withRootLoggingLevel(Levels.DEBUG).build();
configureLoggingProvider(logProvider);
HttpRequestHandler handler = HttpRequestHandler.of(
(ref ServerHttpRequest request, ref ServerHttpResponse response) {
response.status = HttpStatus.OK;
response.writeBodyString("Testing");
});
testHttp1Transport(new TaskPoolHttp1Transport(handler));
}

View File

@ -1,6 +1,7 @@
module handy_http_transport.http1.transport;
import std.socket;
import core.atomic : atomicStore, atomicLoad;
import handy_http_transport.interfaces;
import handy_http_transport.helpers;
@ -10,51 +11,94 @@ import handy_http_primitives;
import handy_http_primitives.address;
import streams;
import photon;
import slf4d;
/**
* The HTTP/1.1 transport protocol implementation, using Dimitry Olshansky's
* Photon fiber scheduling library for concurrency.
* Base class for HTTP/1.1 transport, where different subclasses can define
* how the actual socket communication works (threadpool / epoll/ etc).
*/
class Http1Transport : HttpTransport {
private Socket serverSocket;
private HttpRequestHandler requestHandler;
private const ushort port;
abstract class Http1Transport : HttpTransport {
protected HttpRequestHandler requestHandler;
protected immutable ushort port;
private bool running = false;
this(HttpRequestHandler requestHandler, ushort port = 8080) {
assert(requestHandler !is null);
this.serverSocket = new TcpSocket();
this.requestHandler = requestHandler;
this.port = port;
}
void start() {
startloop();
go(() => runServer());
runFibers();
bool isRunning() {
return atomicLoad(running);
}
void start() {
infoF!"Starting Http1Transport server on port %d."(port);
atomicStore(running, true);
runServer();
}
protected abstract void runServer();
void stop() {
this.running = false;
this.serverSocket.shutdown(SocketShutdown.BOTH);
this.serverSocket.close();
infoF!"Stopping Http1Transport server on port %d."(port);
atomicStore(running, false);
}
}
private void runServer() {
this.running = true;
serverSocket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1);
serverSocket.bind(new InternetAddress("127.0.0.1", port));
serverSocket.listen(100);
while (running) {
try {
Socket clientSocket = serverSocket.accept();
go(() => handleClient(clientSocket, requestHandler));
} catch (SocketAcceptException e) {
import std.stdio;
stderr.writefln!"Failed to accept socket connection: %s"(e);
version(unittest) {
/**
* A generic test to ensure that any Http1Transport implementation behaves
* properly to start & stop, and process requests when running.
*
* It's assumed that the given transport is configured to run on localhost,
* port 8080, and return a standard 200 OK empty response to all requests.
* Params:
* transport = The transport implementation to test.
*/
void testHttp1Transport(Http1Transport transport) {
import core.thread;
import std.string;
infoF!"Testing Http1Transport implementation: %s"(transport);
Thread thread = transport.startInNewThread();
Thread.sleep(msecs(100));
Socket clientSocket1 = new TcpSocket(new InternetAddress(8080));
const requestBody = "POST /users HTTP/1.1\r\n" ~
"Host: example.com\r\n" ~
"Content-Type: text/plain\r\n" ~
"Content-Length: 13\r\n" ~
"\r\n" ~
"Hello, world!";
ptrdiff_t bytesSent = clientSocket1.send(requestBody);
assert(bytesSent == requestBody.length, "Couldn't send the full request body to the server.");
ubyte[8192] buffer;
size_t totalBytesReceived = 0;
ptrdiff_t bytesReceived;
do {
bytesReceived = clientSocket1.receive(buffer[totalBytesReceived .. $]);
if (bytesReceived == Socket.ERROR) {
assert(false, "Socket error when attempting to receive a response from the HttpTransport server.");
}
}
totalBytesReceived += bytesReceived;
} while (bytesReceived > 0);
string httpResponseContent = cast(string) buffer[0 .. totalBytesReceived];
string[] parts = httpResponseContent.split("\r\n\r\n");
assert(parts.length > 0, "HTTP 1.1 response is missing required status and headers section.");
string[] headerLines = parts[0].split("\r\n");
assert(headerLines.length > 0, "HTTP 1.1 response is missing required status line.");
string statusLine = headerLines[0];
string[] statusLineParts = statusLine.split(" ");
assert(statusLineParts[0] == "HTTP/1.1");
assert(statusLineParts[1] == "200");
assert(statusLineParts[2] == "OK");
info("Testing is complete. Stopping the server.");
transport.stop();
thread.join();
}
}
@ -68,32 +112,73 @@ class Http1Transport : HttpTransport {
* requestHandler = The request handler that will handle the received HTTP request.
*/
void handleClient(Socket clientSocket, HttpRequestHandler requestHandler) {
auto inputStream = SocketInputStream(clientSocket);
auto bufferedInput = bufferedInputStreamFor!(8192)(inputStream);
SocketInputStream* inputStream = new SocketInputStream(clientSocket);
BufferedInputStream!(SocketInputStream*, 8192)* bufferedInput
= new BufferedInputStream!(SocketInputStream*, 8192)(inputStream);
// Get remote address from the socket.
import handy_http_primitives.address;
ClientAddress addr = getAddress(clientSocket);
auto result = readHttpRequest(&bufferedInput, addr);
traceF!"Got request from client: %s"(addr.toString());
auto result = readHttpRequest(bufferedInput, addr);
if (result.hasError) {
import std.stdio;
stderr.writeln("Failed to read HTTP request: " ~ result.error.message);
if (result.error.code != -1) {
// Only warn if we didn't read an empty request.
warnF!"Failed to read request: %s"(result.error.message);
}
inputStream.closeStream();
return;
}
scope ServerHttpRequest request = result.request;
scope ServerHttpResponse response;
SocketOutputStream outputStream = SocketOutputStream(clientSocket);
SocketOutputStream* outputStream = new SocketOutputStream(clientSocket);
response.outputStream = outputStreamObjectFor(HttpResponseOutputStream!(SocketOutputStream*)(
&outputStream,
outputStream,
&response
));
try {
requestHandler.handle(request, response);
debugF!"%s %s -> %d %s"(request.method, request.url, response.status.code, response.status.text);
} catch (Exception e) {
import std.stdio;
stderr.writeln("Exception thrown while handling request: " ~ e.msg);
error("Exception thrown while handling request.", e);
} catch (Throwable t) {
errorF!"Throwable error while handling request: %s"(t.msg);
throw t;
}
inputStream.closeStream();
if (response.status != HttpStatus.SWITCHING_PROTOCOLS) {
inputStream.closeStream();
}
}
// Test case where we use a local socket pair to test the full handleClient
// workflow from the HttpRequestHandler's point of view.
unittest {
Socket[2] sockets = socketPair();
Socket clientSocket = sockets[0];
Socket serverSocket = sockets[1];
const requestContent =
"POST /data HTTP/1.1\r\n" ~
"Content-Type: application/json\r\n" ~
"Content-Length: 22\r\n" ~
"\r\n" ~
"{\"x\": 5, \"flag\": true}";
clientSocket.send(cast(ubyte[]) requestContent);
class TestHandler : HttpRequestHandler {
import std.conv;
void handle(ref ServerHttpRequest request, ref ServerHttpResponse response) {
assert(request.headers["Content-Type"] == ["application/json"]);
assert("Content-Length" in request.headers && request.headers["Content-Length"].length > 0);
ulong contentLength = request.headers["Content-Length"][0].to!ulong;
assert(contentLength == 22);
ubyte[22] bodyBuffer;
auto readResult = request.inputStream.readFromStream(bodyBuffer);
assert(readResult.hasCount && readResult.count == 22);
assert(cast(string) bodyBuffer == "{\"x\": 5, \"flag\": true}");
}
}
handleClient(serverSocket, new TestHandler());
}
/**
@ -143,9 +228,17 @@ alias HttpRequestParseResult = Either!(ServerHttpRequest, "request", StreamError
*/
HttpRequestParseResult readHttpRequest(S)(S inputStream, in ClientAddress addr) if (isByteInputStream!S) {
auto methodStr = consumeUntil(inputStream, " ");
if (methodStr.hasError) return HttpRequestParseResult(methodStr.error);
if (methodStr.hasError) {
if (methodStr.error.code == 0) {
// Set a custom code to indicate an empty request.
return HttpRequestParseResult(StreamError(methodStr.error.message, -1));
}
return HttpRequestParseResult(methodStr.error);
}
auto urlStr = consumeUntil(inputStream, " ");
if (urlStr.hasError) return HttpRequestParseResult(urlStr.error);
auto versionStr = consumeUntil(inputStream, "\r\n");
if (versionStr.hasError) return HttpRequestParseResult(versionStr.error);
@ -159,6 +252,8 @@ HttpRequestParseResult readHttpRequest(S)(S inputStream, in ClientAddress addr)
auto headersResult = parseHeaders(inputStream);
if (headersResult.hasError) return HttpRequestParseResult(headersResult.error);
auto queryParams = parseQueryParameters(urlStr.value);
import std.uri : decode; // TODO: Remove dependency on phobos for this?
return HttpRequestParseResult(ServerHttpRequest(
@ -167,15 +262,57 @@ HttpRequestParseResult readHttpRequest(S)(S inputStream, in ClientAddress addr)
methodStr.value,
decode(urlStr.value),
headersResult.headers,
queryParams,
inputStreamObjectFor(inputStream)
));
}
unittest {
import streams;
auto makeStream(string text) {
return arrayInputStreamFor(cast(ubyte[]) text);
}
// Basic HTTP request.
ArrayInputStream!ubyte s1 = makeStream(
"GET /test?x=5 HTTP/1.1\r\n" ~
"Accept: text/plain\r\n" ~
"\r\n"
);
auto r1 = readHttpRequest(&s1, ClientAddress.unknown());
assert(r1.hasRequest);
assert(r1.request.httpVersion == HttpVersion.V1);
assert(r1.request.method == HttpMethod.GET);
assert(r1.request.url == "/test?x=5");
const r1ExpectedHeaders = ["Accept": ["text/plain"]];
assert(r1.request.headers == r1ExpectedHeaders);
assert(r1.request.clientAddress == ClientAddress.unknown());
// POST request with body. Test that the body is read correctly.
ArrayInputStream!ubyte s2 = makeStream(
"POST /data HTTP/1.1\r\n" ~
"Content-Type: text/plain\r\n" ~
"Content-Length: 12\r\n" ~
"\r\n" ~
"Hello world!"
);
auto r2 = readHttpRequest(&s2, ClientAddress.unknown());
assert(r2.hasRequest);
assert(r2.request.method == HttpMethod.POST);
ubyte[12] r2BodyBuffer;
StreamResult r2BodyReadResult = s2.readFromStream(r2BodyBuffer);
assert(r2BodyReadResult.count == 12);
assert(cast(string) r2BodyBuffer == "Hello world!");
}
/**
* Parses HTTP headers from an input stream, and returns them as an associative
* array mapping header names to their list of values.
* Params:
* inputStream = The byte input stream to read from.
* inputStream = The byte input stream to read from. Note that this stream
* should be passed as a pointer / reference, values will be
* consumed from the stream.
* Returns: Either the headers, or a stream error.
*/
Either!(string[][string], "headers", StreamError, "error") parseHeaders(S)(S inputStream) if (isByteInputStream!S) {
@ -202,14 +339,40 @@ Either!(string[][string], "headers", StreamError, "error") parseHeaders(S)(S inp
}
unittest {
class TestHandler : HttpRequestHandler {
void handle(ref ServerHttpRequest request, ref ServerHttpResponse response) {
response.status = HttpStatus.OK;
response.headers.add("Content-Type", "application/json");
response.outputStream.writeToStream(cast(ubyte[]) "{\"a\": 1}");
}
import streams;
auto makeStream(string text) {
return arrayInputStreamFor(cast(ubyte[]) text);
}
HttpTransport tp = new Http1Transport(new TestHandler(), 8080);
tp.start();
// Basic valid headers.
auto s1 = makeStream("Content-Type: application/json\r\n\r\n");
auto r1 = parseHeaders(&s1);
assert(r1.hasHeaders);
assert("Content-Type" in r1.headers);
assert(r1.headers["Content-Type"] == ["application/json"]);
// Multiple headers.
auto s2 = makeStream("Accept: text, json, image\r\nContent-Length: 1234\r\n\r\n");
auto r2 = parseHeaders(&s2);
assert(r2.hasHeaders);
assert("Accept" in r2.headers);
assert(r2.headers["Accept"] == ["text, json, image"]);
assert(r2.headers["Content-Length"] == ["1234"]);
// Basic invalid header string.
auto s3 = makeStream("Invalid headers");
auto r3 = parseHeaders(&s3);
assert(r3.hasError);
// No trailing \r\n
auto s4 = makeStream("Content-Type: application/json");
auto r4 = parseHeaders(&s4);
assert(r4.hasError);
// Empty headers.
auto s5 = makeStream("\r\n");
auto r5 = parseHeaders(&s5);
assert(r5.hasHeaders);
assert(r5.headers.length == 0);
}

View File

@ -4,3 +4,21 @@ interface HttpTransport {
void start();
void stop();
}
import core.thread;
/**
* Starts a new thread to run an HTTP transport implementation in, separate
* from the calling thread. This is useful for running a server in the
* background, like for integration tests.
* Params:
* transport = The transport implementation to start.
* Returns: The thread that was started.
*/
Thread startInNewThread(HttpTransport transport) {
Thread t = new Thread(() {
transport.start();
});
t.start();
return t;
}

View File

@ -2,3 +2,6 @@ module handy_http_transport;
public import handy_http_transport.http1;
public import handy_http_transport.http2;
public import handy_http_transport.interfaces;
public import handy_http_transport.response_output_stream;

View File

@ -50,28 +50,75 @@ struct HttpResponseOutputStream(S) if (isByteOutputStream!S) {
* Returns: The stream result of writing.
*/
StreamResult writeHeaders() {
// TODO: Come up with a better way of writing headers than string concatenation.
size_t idx = 0;
char[6] statusCodeBuffer; // Normal HTTP codes are 3 digits, but this leaves room for extensions.
writeUIntToBuffer(response.status.code, statusCodeBuffer, idx);
string statusAndHeaders = "HTTP/1.1 "
~ cast(string) statusCodeBuffer[0..idx]
~ " " ~ response.status.text
~ "\r\n";
// Write the status line.
StreamResult r = outputStream.writeToStream(cast(ubyte[]) "HTTP/1.1 ");
if (r.hasError) return r;
size_t writeCount = r.count;
r = outputStream.writeToStream(cast(ubyte[]) statusCodeBuffer[0..idx]);
if (r.hasError) return r;
writeCount += r.count;
r = outputStream.writeToStream([' ']);
if (r.hasError) return r;
writeCount += r.count;
r = outputStream.writeToStream(cast(ubyte[]) response.status.text);
if (r.hasError) return r;
writeCount += r.count;
r = outputStream.writeToStream(['\r', '\n']);
if (r.hasError) return r;
writeCount += r.count;
foreach (headerName; response.headers.keys) {
string headerLine = headerName ~ ": ";
// Write the header name.
r = outputStream.writeToStream(cast(ubyte[]) headerName);
if (r.hasError) return r;
writeCount += r.count;
r = outputStream.writeToStream([':', ' ']);
if (r.hasError) return r;
writeCount += r.count;
// Write the comma-separated list of values.
string[] headerValues = response.headers.getAll(headerName);
for (size_t i = 0; i < headerValues.length; i++) {
headerLine ~= headerValues[i];
r = outputStream.writeToStream(cast(ubyte[]) headerValues[i]);
if (r.hasError) return r;
writeCount += r.count;
if (i + 1 < headerValues.length) {
headerLine ~= ", ";
r = outputStream.writeToStream([',', ' ']);
if (r.hasError) return r;
writeCount += r.count;
}
}
headerLine ~= "\r\n";
statusAndHeaders ~= headerLine;
r = outputStream.writeToStream(['\r', '\n']);
if (r.hasError) return r;
writeCount += r.count;
}
statusAndHeaders ~= "\r\n"; // Trailing CLRF before the body.
return outputStream.writeToStream(cast(ubyte[]) statusAndHeaders);
r = outputStream.writeToStream(['\r', '\n']); // Trailing CLRF before the body.
if (r.hasError) return r;
writeCount += r.count;
return StreamResult(cast(uint) writeCount);
}
}
// Test basic functionality for writing a standard response with headers and a
// body.
unittest {
import handy_http_primitives.response;
ArrayOutputStream!ubyte os;
ServerHttpResponse resp;
resp.status = HttpStatus.OK;
resp.headers.add("Content-Type", "text/plain");
auto httpOut = HttpResponseOutputStream!(ArrayOutputStream!ubyte*)(&os, &resp);
resp.outputStream = outputStreamObjectFor(httpOut);
StreamResult r = resp.outputStream.writeToStream(cast(ubyte[]) "Hello world!");
const expectedOutput =
"HTTP/1.1 200 OK\r\n" ~
"Content-Type: text/plain\r\n" ~
"\r\n" ~
"Hello world!";
assert(os.toArray() == expectedOutput);
assert(r.hasCount);
assert(r.count == os.toArray().length);
}