Compare commits
25 Commits
Author | SHA1 | Date |
---|---|---|
|
7ff80c8a9f | |
|
a0d1274bbe | |
|
01d48e9537 | |
|
5e79fba1b4 | |
|
febce5eb8d | |
|
22e8fa9b70 | |
|
18851ac786 | |
|
eb43a1be78 | |
|
de96a4a45b | |
|
08bb1b58af | |
|
602667879f | |
|
ae1194e159 | |
|
fd42b11c8b | |
|
d1263a5991 | |
|
d5a09c0421 | |
|
a02d1e4dec | |
|
49d0ebfed0 | |
|
bbd1c05e62 | |
|
93d983424e | |
|
905e3f93f8 | |
|
c2cd2cfc5c | |
|
4709f8b00c | |
|
b06cb7547f | |
|
4d352a0ffa | |
|
b260ddfe8c |
|
@ -0,0 +1,32 @@
|
|||
name: Build and Test Module
|
||||
on:
|
||||
push:
|
||||
paths:
|
||||
- 'source/**'
|
||||
- '.gitea/workflows/ci.yaml'
|
||||
pull_request:
|
||||
types: [opened, reopened, synchronize]
|
||||
jobs:
|
||||
build-and-test:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- name: Setup DLang
|
||||
uses: dlang-community/setup-dlang@v2
|
||||
with:
|
||||
compiler: ldc-latest
|
||||
- name: Build and Test
|
||||
run: dub -q test
|
||||
|
||||
integration-tests:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- name: Setup DLang
|
||||
uses: dlang-community/setup-dlang@v2
|
||||
with:
|
||||
compiler: ldc-latest
|
||||
|
||||
- name: http1-test
|
||||
working-directory: integration-tests/http1-basic
|
||||
run: dub run --single http1-test.d
|
6
LICENSE
6
LICENSE
|
@ -1 +1,5 @@
|
|||
Handy-Http by Andrew Lalis is marked with CC0 1.0 Universal. To view a copy of this license, visit https://creativecommons.org/publicdomain/zero/1.0/
|
||||
Copyright 2025 Andrew Lalis
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so.
|
||||
|
||||
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
|
|
@ -10,7 +10,7 @@ implemented so far.
|
|||
|
||||
## HTTP/1.1
|
||||
|
||||
Use the `Http1Transport` implementation of `HttpTransport` to serve content
|
||||
Use the `TaskPoolHttp1Transport` implementation of `HttpTransport` to serve content
|
||||
using the HTTP/1.1 protocol. See the example below:
|
||||
|
||||
```d
|
||||
|
@ -26,7 +26,7 @@ class MyHandler : HttpRequestHandler {
|
|||
}
|
||||
|
||||
void main() {
|
||||
HttpTransport tp = new Http1Transport(new MyHandler(), 8080);
|
||||
HttpTransport tp = new TaskPoolHttp1Transport(new MyHandler());
|
||||
tp.start();
|
||||
}
|
||||
```
|
6
dub.json
6
dub.json
|
@ -4,9 +4,9 @@
|
|||
],
|
||||
"copyright": "Copyright © 2024, Andrew Lalis",
|
||||
"dependencies": {
|
||||
"handy-http-primitives": "~>1.0.0",
|
||||
"photon": "~>0.10.2",
|
||||
"streams": "~>3.5.0"
|
||||
"handy-http-primitives": "~>1.8",
|
||||
"streams": "~>3.6",
|
||||
"slf4d": "~>4.0"
|
||||
},
|
||||
"description": "Implementations of HTTP transport protocols.",
|
||||
"license": "CC0",
|
||||
|
|
|
@ -1,9 +1,8 @@
|
|||
{
|
||||
"fileVersion": 1,
|
||||
"versions": {
|
||||
"handy-http-primitives": "1.0.0",
|
||||
"photon": "0.10.2",
|
||||
"sharded-map": "2.7.0",
|
||||
"streams": "3.5.0"
|
||||
"handy-http-primitives": "1.8.0",
|
||||
"slf4d": "4.1.1",
|
||||
"streams": "3.6.0"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
http1-test
|
|
@ -0,0 +1,53 @@
|
|||
/+ dub.sdl:
|
||||
dependency "handy-http-transport" path="../../"
|
||||
dependency "requests" version="~>2.1"
|
||||
+/
|
||||
|
||||
/**
|
||||
* This tests the basic HTTP functionality of the Http1Transport implementation
|
||||
* by starting a server, sending a request, and checking the response.
|
||||
*/
|
||||
module integration_tests.http1_test;
|
||||
|
||||
import handy_http_primitives;
|
||||
import handy_http_transport;
|
||||
import slf4d;
|
||||
import slf4d.default_provider;
|
||||
import requests;
|
||||
|
||||
import core.thread;
|
||||
|
||||
int main() {
|
||||
auto loggingProvider = DefaultProvider.builder()
|
||||
.withRootLoggingLevel(Levels.INFO)
|
||||
.withConsoleSerializer(true, 48)
|
||||
.build();
|
||||
configureLoggingProvider(loggingProvider);
|
||||
|
||||
HttpTransport transport = new TaskPoolHttp1Transport(HttpRequestHandler.of(&handleRequest));
|
||||
Thread thread = transport.startInNewThread();
|
||||
scope(exit) {
|
||||
transport.stop();
|
||||
thread.join();
|
||||
}
|
||||
info("Started server in another thread.");
|
||||
Thread.sleep(msecs(100)); // Wait for the server to start.
|
||||
|
||||
// Send a simple GET request to the server.
|
||||
info("Sending GET request to http://localhost:8080");
|
||||
auto content = getContent("http://localhost:8080");
|
||||
ubyte[] data = content.data;
|
||||
if (data.length != 13 || (cast(string) data) != "Hello, world!") {
|
||||
error("Received unexpected content: " ~ cast(string) data);
|
||||
return 1;
|
||||
}
|
||||
|
||||
info("Test completed successfully.");
|
||||
return 0;
|
||||
}
|
||||
|
||||
void handleRequest(ref ServerHttpRequest request, ref ServerHttpResponse response) {
|
||||
response.headers.add("Content-Type", "text/plain");
|
||||
response.headers.add("Content-Length", "13");
|
||||
response.outputStream.writeToStream(cast(ubyte[]) "Hello, world!");
|
||||
}
|
|
@ -0,0 +1 @@
|
|||
http1-speed-test
|
|
@ -0,0 +1,28 @@
|
|||
/+ dub.sdl:
|
||||
dependency "handy-http-transport" path="../../"
|
||||
+/
|
||||
module integration_tests.http1_speed_test;
|
||||
|
||||
import handy_http_primitives;
|
||||
import handy_http_transport;
|
||||
import slf4d;
|
||||
import slf4d.default_provider;
|
||||
|
||||
void main() {
|
||||
auto loggingProvider = DefaultProvider.builder()
|
||||
.withRootLoggingLevel(Levels.ERROR)
|
||||
.withConsoleSerializer(true, 48)
|
||||
.build();
|
||||
configureLoggingProvider(loggingProvider);
|
||||
HttpTransport transport;
|
||||
transport = new TaskPoolHttp1Transport(HttpRequestHandler.of(
|
||||
(ref ServerHttpRequest request, ref ServerHttpResponse response) {
|
||||
if (request.method == HttpMethod.DELETE) {
|
||||
transport.stop();
|
||||
}
|
||||
response.headers.add("Content-Type", "text/plain");
|
||||
response.headers.add("Content-Length", "13");
|
||||
response.outputStream.writeToStream(cast(ubyte[]) "Hello, world!");
|
||||
}));
|
||||
transport.start();
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
#!/usr/bin/env bash
|
||||
|
||||
# This script runs all integration tests.
|
||||
|
||||
set -e -o pipefail
|
||||
|
||||
cd http1-basic
|
||||
dub build --single http1-test.d
|
||||
./http1-test
|
||||
cd ..
|
||||
|
||||
# cd http1-speed-test
|
||||
# dub build --single --build=release http1-speed-test.d
|
||||
|
|
@ -20,15 +20,14 @@ Either!(string, "value", StreamError, "error") consumeUntil(S)(
|
|||
auto result = inputStream.readFromStream(buffer[idx .. idx + 1]);
|
||||
if (result.hasError) return Either!(string, "value", StreamError, "error")(result.error);
|
||||
if (result.count != 1) return Either!(string, "value", StreamError, "error")(
|
||||
StreamError("Failed to read a single element", 1)
|
||||
StreamError("Failed to read a single element", 0)
|
||||
);
|
||||
idx++;
|
||||
if (idx >= target.length && buffer[idx - target.length .. idx] == target) {
|
||||
return Either!(string, "value", StreamError, "error")(
|
||||
cast(string) buffer[0 .. idx - target.length].idup
|
||||
);
|
||||
}
|
||||
if (idx >= buffer.length) {
|
||||
} else if (idx >= buffer.length) {
|
||||
return Either!(string, "value", StreamError, "error")(
|
||||
StreamError("Couldn't find target \"" ~ target ~ "\" after reading 1024 bytes.", 1)
|
||||
);
|
||||
|
@ -61,13 +60,22 @@ ptrdiff_t indexOf(string s, char c, size_t offset = 0) {
|
|||
string stripSpaces(string s) {
|
||||
if (s.length == 0) return s;
|
||||
ptrdiff_t startIdx = 0;
|
||||
while (s[startIdx] == ' ' && startIdx < s.length) startIdx++;
|
||||
while (startIdx < s.length && s[startIdx] == ' ') startIdx++;
|
||||
s = s[startIdx .. $];
|
||||
ptrdiff_t endIdx = s.length - 1;
|
||||
if (s.length == 0) return "";
|
||||
ptrdiff_t endIdx = cast(ptrdiff_t) s.length - 1;
|
||||
while (s[endIdx] == ' ' && endIdx >= 0) endIdx--;
|
||||
return s[0 .. endIdx + 1];
|
||||
}
|
||||
|
||||
unittest {
|
||||
assert(stripSpaces("") == "");
|
||||
assert(stripSpaces(" ") == "");
|
||||
assert(stripSpaces("test") == "test");
|
||||
assert(stripSpaces(" test") == "test");
|
||||
assert(stripSpaces(" test string ") == "test string");
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper function to append an unsigned integer value to a char buffer. It is
|
||||
* assumed that there's enough space to write the value.
|
||||
|
|
|
@ -0,0 +1,122 @@
|
|||
module handy_http_transport.http1.epoll;
|
||||
|
||||
import core.sys.posix.sys.socket;
|
||||
import core.sys.linux.epoll;
|
||||
import core.sys.posix.netinet.in_;
|
||||
import core.sys.posix.fcntl;
|
||||
import core.sys.posix.unistd;
|
||||
|
||||
import core.stdc.errno;
|
||||
|
||||
extern(C) {
|
||||
int accept4(int sockfd, sockaddr *addr, socklen_t *addrlen, int flags);
|
||||
}
|
||||
|
||||
import handy_http_transport.interfaces;
|
||||
import handy_http_transport.http1;
|
||||
import handy_http_primitives;
|
||||
import slf4d;
|
||||
|
||||
class Http1EpollTransport : Http1Transport {
|
||||
|
||||
this(HttpRequestHandler requestHandler, ushort port) {
|
||||
super(requestHandler, port);
|
||||
}
|
||||
|
||||
override void start() {
|
||||
super.start();
|
||||
// Create the server socket.
|
||||
enum SOCK_NONBLOCK = 0x4000;
|
||||
int listenFd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
|
||||
sockaddr_in serverAddress;
|
||||
serverAddress.sin_family = AF_INET;
|
||||
serverAddress.sin_port = htons(port);
|
||||
serverAddress.sin_addr.s_addr = INADDR_ANY;
|
||||
|
||||
if (bind(listenFd, cast(sockaddr*) &serverAddress, serverAddress.sizeof) == -1) {
|
||||
errorF!"Failed to bind socket: %d"(errno);
|
||||
close(listenFd);
|
||||
return;
|
||||
}
|
||||
|
||||
if (listen(listenFd, SOMAXCONN) == -1) {
|
||||
errorF!"Failed to listen on socket: %d"(errno);
|
||||
close(listenFd);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
int epollFd = epoll_create1(0);
|
||||
if (epollFd == -1) {
|
||||
errorF!"Failed to create epoll instance: %d"(errno);
|
||||
return;
|
||||
}
|
||||
|
||||
epoll_event event;
|
||||
epoll_event[64] events;
|
||||
event.events = EPOLLIN | EPOLLET;
|
||||
event.data.fd = listenFd;
|
||||
if (epoll_ctl(epollFd, EPOLL_CTL_ADD, listenFd, &event) == -1) {
|
||||
errorF!"Failed to add listen socket to epoll: %d"(errno);
|
||||
close(listenFd);
|
||||
close(epollFd);
|
||||
return;
|
||||
}
|
||||
|
||||
infoF!"Server listening on port %d."(port);
|
||||
|
||||
while (true) {
|
||||
int eventCount = epoll_wait(epollFd, &event, 64, 5000);
|
||||
if (eventCount == -1) {
|
||||
errorF!"Epoll wait failed: %d"(errno);
|
||||
break;
|
||||
}
|
||||
|
||||
for (int i = 0; i < eventCount; i++) {
|
||||
if (events[i].data.fd == listenFd) {
|
||||
// New incoming connection.
|
||||
while (true) {
|
||||
sockaddr_in clientAddress;
|
||||
socklen_t clientAddressLength = clientAddress.sizeof;
|
||||
int clientFd = accept4(
|
||||
listenFd,
|
||||
cast(sockaddr*) &clientAddress,
|
||||
&clientAddressLength,
|
||||
SOCK_NONBLOCK
|
||||
);
|
||||
if (clientFd == -1) {
|
||||
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||
// No more connections to accept.
|
||||
break;
|
||||
} else {
|
||||
errorF!"Failed to accept connection: %d"(errno);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Add the client socket to epoll's listening list.
|
||||
event.events = EPOLLIN | EPOLLET;
|
||||
event.data.fd = clientFd;
|
||||
if (epoll_ctl(epollFd, EPOLL_CTL_ADD, clientFd, &event) == -1) {
|
||||
errorF!"Failed to add client socket to epoll: %d"(errno);
|
||||
close(clientFd);
|
||||
}
|
||||
|
||||
infoF!"Accepted new connection from %s:%d."(
|
||||
inet_ntoa(clientAddress.sin_addr),
|
||||
ntohs(clientAddress.sin_port)
|
||||
);
|
||||
}
|
||||
} else {
|
||||
// Event on an existing client socket.
|
||||
|
||||
int clientFd = events[i].data.fd;
|
||||
|
||||
if (events[i].events & EPOLLIN) {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -3,3 +3,5 @@
|
|||
*/
|
||||
module handy_http_transport.http1;
|
||||
|
||||
public import handy_http_transport.http1.transport;
|
||||
public import handy_http_transport.http1.task_pool;
|
||||
|
|
|
@ -0,0 +1,115 @@
|
|||
module handy_http_transport.http1.task_pool;
|
||||
|
||||
import std.socket;
|
||||
import std.parallelism;
|
||||
|
||||
import handy_http_transport.http1.transport;
|
||||
import handy_http_primitives;
|
||||
import slf4d;
|
||||
|
||||
/**
|
||||
* Configuration options to provide when creating a new Http1Transport
|
||||
* instance.
|
||||
*/
|
||||
struct Http1TransportConfig {
|
||||
/// The host address to bind to.
|
||||
string host;
|
||||
/// The port to bind to.
|
||||
ushort port;
|
||||
/// The number of workers to use in the task pool.
|
||||
size_t workerCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines the default configuration options if none are provided. They are:
|
||||
* * Host address 127.0.0.1
|
||||
* * Port 8080
|
||||
* * Worker count of 5.
|
||||
* Returns: The default configuration.
|
||||
*/
|
||||
Http1TransportConfig defaultConfig() {
|
||||
return Http1TransportConfig(
|
||||
"127.0.0.1",
|
||||
8080,
|
||||
5
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* An implementation of Http1Transport which uses D's standard library
|
||||
* parallelization, where each incoming client request is turned into a task
|
||||
* and submitted to the standard task pool.
|
||||
*/
|
||||
class TaskPoolHttp1Transport : Http1Transport {
|
||||
private TaskPool httpTaskPool;
|
||||
private immutable Http1TransportConfig config;
|
||||
|
||||
/**
|
||||
* Creates a new transport instance using a std.parallelism TaskPool for
|
||||
* handling requests.
|
||||
* Params:
|
||||
* requestHandler = The handler to call for each incoming request.
|
||||
* workerCount = The number of workers to use in the task pool.
|
||||
* port = The port.
|
||||
*/
|
||||
this(HttpRequestHandler requestHandler, in Http1TransportConfig config = defaultConfig()) {
|
||||
super(requestHandler, config.port);
|
||||
this.config = config;
|
||||
this.httpTaskPool = new TaskPool(config.workerCount);
|
||||
}
|
||||
|
||||
override void runServer() {
|
||||
Socket serverSocket = new TcpSocket();
|
||||
serverSocket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1);
|
||||
serverSocket.bind(parseAddress(config.host, config.port));
|
||||
debugF!"Bound the server socket to %s"(serverSocket.localAddress);
|
||||
serverSocket.listen(1024);
|
||||
debug_("Server is now listening.");
|
||||
|
||||
while (super.isRunning) {
|
||||
try {
|
||||
trace("Waiting to accept a new socket.");
|
||||
Socket clientSocket = serverSocket.accept();
|
||||
trace("Accepted a new socket.");
|
||||
auto t = task!handleClient(clientSocket, requestHandler);
|
||||
this.httpTaskPool.put(t);
|
||||
trace("Added handleClient() task to the task pool.");
|
||||
} catch (SocketAcceptException e) {
|
||||
warn("Failed to accept socket connection.", e);
|
||||
}
|
||||
}
|
||||
serverSocket.close();
|
||||
this.httpTaskPool.stop();
|
||||
}
|
||||
|
||||
override void stop() {
|
||||
super.stop();
|
||||
// Send a dummy request to cause the server's blocking accept() call to end.
|
||||
try {
|
||||
Socket dummySocket = new TcpSocket(new InternetAddress("127.0.0.1", port));
|
||||
dummySocket.shutdown(SocketShutdown.BOTH);
|
||||
dummySocket.close();
|
||||
} catch (SocketOSException e) {
|
||||
warn("Failed to send empty request to stop server.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
unittest {
|
||||
import slf4d.default_provider;
|
||||
auto logProvider = DefaultProvider.builder().withRootLoggingLevel(Levels.DEBUG).build();
|
||||
configureLoggingProvider(logProvider);
|
||||
|
||||
HttpRequestHandler handler = HttpRequestHandler.of(
|
||||
(ref ServerHttpRequest request, ref ServerHttpResponse response) {
|
||||
response.status = HttpStatus.OK;
|
||||
});
|
||||
testHttp1Transport(new TaskPoolHttp1Transport(handler));
|
||||
|
||||
HttpRequestHandler handler2 = HttpRequestHandler.of(
|
||||
(ref ServerHttpRequest request, ref ServerHttpResponse response) {
|
||||
response.status = HttpStatus.OK;
|
||||
response.writeBodyString("Testing");
|
||||
});
|
||||
testHttp1Transport(new TaskPoolHttp1Transport(handler2));
|
||||
}
|
|
@ -1,6 +1,7 @@
|
|||
module handy_http_transport.http1.transport;
|
||||
|
||||
import std.socket;
|
||||
import core.atomic : atomicStore, atomicLoad;
|
||||
|
||||
import handy_http_transport.interfaces;
|
||||
import handy_http_transport.helpers;
|
||||
|
@ -10,51 +11,97 @@ import handy_http_primitives;
|
|||
import handy_http_primitives.address;
|
||||
|
||||
import streams;
|
||||
import photon;
|
||||
import slf4d;
|
||||
|
||||
/**
|
||||
* The HTTP/1.1 transport protocol implementation, using Dimitry Olshansky's
|
||||
* Photon fiber scheduling library for concurrency.
|
||||
* Base class for HTTP/1.1 transport, where different subclasses can define
|
||||
* how the actual socket communication works (threadpool / epoll/ etc).
|
||||
*/
|
||||
class Http1Transport : HttpTransport {
|
||||
private Socket serverSocket;
|
||||
private HttpRequestHandler requestHandler;
|
||||
private const ushort port;
|
||||
abstract class Http1Transport : HttpTransport {
|
||||
protected HttpRequestHandler requestHandler;
|
||||
protected immutable ushort port;
|
||||
private bool running = false;
|
||||
|
||||
this(HttpRequestHandler requestHandler, ushort port = 8080) {
|
||||
assert(requestHandler !is null);
|
||||
this.serverSocket = new TcpSocket();
|
||||
this.requestHandler = requestHandler;
|
||||
this.port = port;
|
||||
}
|
||||
|
||||
void start() {
|
||||
startloop();
|
||||
go(() => runServer());
|
||||
runFibers();
|
||||
bool isRunning() {
|
||||
return atomicLoad(running);
|
||||
}
|
||||
|
||||
void start() {
|
||||
infoF!"Starting Http1Transport server on port %d."(port);
|
||||
atomicStore(running, true);
|
||||
runServer();
|
||||
}
|
||||
|
||||
protected abstract void runServer();
|
||||
|
||||
void stop() {
|
||||
this.running = false;
|
||||
this.serverSocket.shutdown(SocketShutdown.BOTH);
|
||||
this.serverSocket.close();
|
||||
infoF!"Stopping Http1Transport server on port %d."(port);
|
||||
atomicStore(running, false);
|
||||
}
|
||||
}
|
||||
|
||||
private void runServer() {
|
||||
this.running = true;
|
||||
serverSocket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1);
|
||||
serverSocket.bind(new InternetAddress("127.0.0.1", port));
|
||||
serverSocket.listen(100);
|
||||
while (running) {
|
||||
try {
|
||||
Socket clientSocket = serverSocket.accept();
|
||||
go(() => handleClient(clientSocket, requestHandler));
|
||||
} catch (SocketAcceptException e) {
|
||||
import std.stdio;
|
||||
stderr.writefln!"Failed to accept socket connection: %s"(e);
|
||||
version(unittest) {
|
||||
/**
|
||||
* A generic test to ensure that any Http1Transport implementation behaves
|
||||
* properly to start & stop, and process requests when running.
|
||||
*
|
||||
* It's assumed that the given transport is configured to run on localhost,
|
||||
* port 8080, and return a standard 200 OK empty response to all requests.
|
||||
* Params:
|
||||
* transport = The transport implementation to test.
|
||||
*/
|
||||
void testHttp1Transport(Http1Transport transport) {
|
||||
import core.thread;
|
||||
import std.string;
|
||||
infoF!"Testing Http1Transport implementation: %s"(transport);
|
||||
|
||||
Thread thread = transport.startInNewThread();
|
||||
Thread.sleep(msecs(100));
|
||||
|
||||
Socket clientSocket1 = new TcpSocket(new InternetAddress(8080));
|
||||
const requestBody = "POST /users HTTP/1.1\r\n" ~
|
||||
"Host: example.com\r\n" ~
|
||||
"Content-Type: text/plain\r\n" ~
|
||||
"Content-Length: 13\r\n" ~
|
||||
"\r\n" ~
|
||||
"Hello, world!";
|
||||
ptrdiff_t bytesSent = clientSocket1.send(requestBody);
|
||||
assert(bytesSent == requestBody.length, "Couldn't send the full request body to the server.");
|
||||
|
||||
ubyte[8192] buffer;
|
||||
size_t totalBytesReceived = 0;
|
||||
ptrdiff_t bytesReceived;
|
||||
do {
|
||||
bytesReceived = clientSocket1.receive(buffer[totalBytesReceived .. $]);
|
||||
if (bytesReceived == Socket.ERROR) {
|
||||
assert(false, "Socket error when attempting to receive a response from the HttpTransport server.");
|
||||
}
|
||||
}
|
||||
totalBytesReceived += bytesReceived;
|
||||
} while (bytesReceived > 0);
|
||||
|
||||
string httpResponseContent = cast(string) buffer[0 .. totalBytesReceived];
|
||||
string[] parts = httpResponseContent.split("\r\n\r\n");
|
||||
assert(parts.length > 0, "HTTP 1.1 response is missing required status and headers section:\n\n" ~ httpResponseContent);
|
||||
string[] headerLines = parts[0].split("\r\n");
|
||||
assert(headerLines.length > 0, "HTTP 1.1 response is missing required status line.");
|
||||
string statusLine = headerLines[0];
|
||||
string[] statusLineParts = statusLine.split(" ");
|
||||
assert(statusLineParts[0] == "HTTP/1.1");
|
||||
assert(
|
||||
statusLineParts[1] == "200",
|
||||
format!"Expected status line's HTTP code to be 200, but it was \"%s\"."(statusLineParts[1])
|
||||
);
|
||||
assert(statusLineParts[2] == "OK");
|
||||
|
||||
info("Testing is complete. Stopping the server.");
|
||||
transport.stop();
|
||||
thread.join();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -68,32 +115,83 @@ class Http1Transport : HttpTransport {
|
|||
* requestHandler = The request handler that will handle the received HTTP request.
|
||||
*/
|
||||
void handleClient(Socket clientSocket, HttpRequestHandler requestHandler) {
|
||||
auto inputStream = SocketInputStream(clientSocket);
|
||||
auto bufferedInput = bufferedInputStreamFor!(8192)(inputStream);
|
||||
SocketInputStream* inputStream = new SocketInputStream(clientSocket);
|
||||
BufferedInputStream!(SocketInputStream*, 8192)* bufferedInput
|
||||
= new BufferedInputStream!(SocketInputStream*, 8192)(inputStream);
|
||||
// Get remote address from the socket.
|
||||
import handy_http_primitives.address;
|
||||
ClientAddress addr = getAddress(clientSocket);
|
||||
auto result = readHttpRequest(&bufferedInput, addr);
|
||||
traceF!"Got request from client: %s"(addr.toString());
|
||||
auto result = readHttpRequest(bufferedInput, addr);
|
||||
if (result.hasError) {
|
||||
import std.stdio;
|
||||
stderr.writeln("Failed to read HTTP request: " ~ result.error.message);
|
||||
if (result.error.code != -1) {
|
||||
// Only warn if we didn't read an empty request.
|
||||
warnF!"Failed to read request: %s"(result.error.message);
|
||||
}
|
||||
inputStream.closeStream();
|
||||
return;
|
||||
}
|
||||
scope ServerHttpRequest request = result.request;
|
||||
scope ServerHttpResponse response;
|
||||
SocketOutputStream outputStream = SocketOutputStream(clientSocket);
|
||||
response.outputStream = outputStreamObjectFor(HttpResponseOutputStream!(SocketOutputStream*)(
|
||||
&outputStream,
|
||||
&response
|
||||
));
|
||||
SocketOutputStream* outputStream = new SocketOutputStream(clientSocket);
|
||||
HttpResponseOutputStream!(SocketOutputStream*) responseOutputStream
|
||||
= HttpResponseOutputStream!(SocketOutputStream*)(
|
||||
outputStream,
|
||||
&response
|
||||
);
|
||||
response.outputStream = outputStreamObjectFor(&responseOutputStream);
|
||||
try {
|
||||
requestHandler.handle(request, response);
|
||||
debugF!"%s %s -> %d %s"(request.method, request.url, response.status.code, response.status.text);
|
||||
// If the response's headers aren't flushed yet, write them now.
|
||||
if (!responseOutputStream.areHeadersFlushed()) {
|
||||
trace("Flushing response headers because they weren't flushed by the request handler.");
|
||||
auto writeResult = responseOutputStream.writeHeaders();
|
||||
if (writeResult.hasError) {
|
||||
errorF!"Failed to write response headers: %s"(writeResult.error.message);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
import std.stdio;
|
||||
stderr.writeln("Exception thrown while handling request: " ~ e.msg);
|
||||
error("Exception thrown while handling request.", e);
|
||||
} catch (Throwable t) {
|
||||
errorF!"Throwable error while handling request: %s"(t.msg);
|
||||
throw t;
|
||||
}
|
||||
inputStream.closeStream();
|
||||
|
||||
if (response.status != HttpStatus.SWITCHING_PROTOCOLS) {
|
||||
inputStream.closeStream();
|
||||
}
|
||||
}
|
||||
|
||||
// Test case where we use a local socket pair to test the full handleClient
|
||||
// workflow from the HttpRequestHandler's point of view.
|
||||
unittest {
|
||||
Socket[2] sockets = socketPair();
|
||||
Socket clientSocket = sockets[0];
|
||||
Socket serverSocket = sockets[1];
|
||||
const requestContent =
|
||||
"POST /data HTTP/1.1\r\n" ~
|
||||
"Content-Type: application/json\r\n" ~
|
||||
"Content-Length: 22\r\n" ~
|
||||
"\r\n" ~
|
||||
"{\"x\": 5, \"flag\": true}";
|
||||
clientSocket.send(cast(ubyte[]) requestContent);
|
||||
|
||||
class TestHandler : HttpRequestHandler {
|
||||
import std.conv;
|
||||
|
||||
void handle(ref ServerHttpRequest request, ref ServerHttpResponse response) {
|
||||
assert(request.headers["Content-Type"] == ["application/json"]);
|
||||
assert("Content-Length" in request.headers && request.headers["Content-Length"].length > 0);
|
||||
ulong contentLength = request.headers["Content-Length"][0].to!ulong;
|
||||
assert(contentLength == 22);
|
||||
ubyte[22] bodyBuffer;
|
||||
auto readResult = request.inputStream.readFromStream(bodyBuffer);
|
||||
assert(readResult.hasCount && readResult.count == 22);
|
||||
assert(cast(string) bodyBuffer == "{\"x\": 5, \"flag\": true}");
|
||||
}
|
||||
}
|
||||
handleClient(serverSocket, new TestHandler());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -143,9 +241,17 @@ alias HttpRequestParseResult = Either!(ServerHttpRequest, "request", StreamError
|
|||
*/
|
||||
HttpRequestParseResult readHttpRequest(S)(S inputStream, in ClientAddress addr) if (isByteInputStream!S) {
|
||||
auto methodStr = consumeUntil(inputStream, " ");
|
||||
if (methodStr.hasError) return HttpRequestParseResult(methodStr.error);
|
||||
if (methodStr.hasError) {
|
||||
if (methodStr.error.code == 0) {
|
||||
// Set a custom code to indicate an empty request.
|
||||
return HttpRequestParseResult(StreamError(methodStr.error.message, -1));
|
||||
}
|
||||
return HttpRequestParseResult(methodStr.error);
|
||||
}
|
||||
|
||||
auto urlStr = consumeUntil(inputStream, " ");
|
||||
if (urlStr.hasError) return HttpRequestParseResult(urlStr.error);
|
||||
|
||||
auto versionStr = consumeUntil(inputStream, "\r\n");
|
||||
if (versionStr.hasError) return HttpRequestParseResult(versionStr.error);
|
||||
|
||||
|
@ -159,6 +265,8 @@ HttpRequestParseResult readHttpRequest(S)(S inputStream, in ClientAddress addr)
|
|||
auto headersResult = parseHeaders(inputStream);
|
||||
if (headersResult.hasError) return HttpRequestParseResult(headersResult.error);
|
||||
|
||||
auto queryParams = parseQueryParameters(urlStr.value);
|
||||
|
||||
import std.uri : decode; // TODO: Remove dependency on phobos for this?
|
||||
|
||||
return HttpRequestParseResult(ServerHttpRequest(
|
||||
|
@ -167,15 +275,57 @@ HttpRequestParseResult readHttpRequest(S)(S inputStream, in ClientAddress addr)
|
|||
methodStr.value,
|
||||
decode(urlStr.value),
|
||||
headersResult.headers,
|
||||
queryParams,
|
||||
inputStreamObjectFor(inputStream)
|
||||
));
|
||||
}
|
||||
|
||||
unittest {
|
||||
import streams;
|
||||
|
||||
auto makeStream(string text) {
|
||||
return arrayInputStreamFor(cast(ubyte[]) text);
|
||||
}
|
||||
|
||||
// Basic HTTP request.
|
||||
ArrayInputStream!ubyte s1 = makeStream(
|
||||
"GET /test?x=5 HTTP/1.1\r\n" ~
|
||||
"Accept: text/plain\r\n" ~
|
||||
"\r\n"
|
||||
);
|
||||
auto r1 = readHttpRequest(&s1, ClientAddress.unknown());
|
||||
assert(r1.hasRequest);
|
||||
assert(r1.request.httpVersion == HttpVersion.V1);
|
||||
assert(r1.request.method == HttpMethod.GET);
|
||||
assert(r1.request.url == "/test?x=5");
|
||||
const r1ExpectedHeaders = ["Accept": ["text/plain"]];
|
||||
assert(r1.request.headers == r1ExpectedHeaders);
|
||||
assert(r1.request.clientAddress == ClientAddress.unknown());
|
||||
|
||||
// POST request with body. Test that the body is read correctly.
|
||||
ArrayInputStream!ubyte s2 = makeStream(
|
||||
"POST /data HTTP/1.1\r\n" ~
|
||||
"Content-Type: text/plain\r\n" ~
|
||||
"Content-Length: 12\r\n" ~
|
||||
"\r\n" ~
|
||||
"Hello world!"
|
||||
);
|
||||
auto r2 = readHttpRequest(&s2, ClientAddress.unknown());
|
||||
assert(r2.hasRequest);
|
||||
assert(r2.request.method == HttpMethod.POST);
|
||||
ubyte[12] r2BodyBuffer;
|
||||
StreamResult r2BodyReadResult = s2.readFromStream(r2BodyBuffer);
|
||||
assert(r2BodyReadResult.count == 12);
|
||||
assert(cast(string) r2BodyBuffer == "Hello world!");
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses HTTP headers from an input stream, and returns them as an associative
|
||||
* array mapping header names to their list of values.
|
||||
* Params:
|
||||
* inputStream = The byte input stream to read from.
|
||||
* inputStream = The byte input stream to read from. Note that this stream
|
||||
* should be passed as a pointer / reference, values will be
|
||||
* consumed from the stream.
|
||||
* Returns: Either the headers, or a stream error.
|
||||
*/
|
||||
Either!(string[][string], "headers", StreamError, "error") parseHeaders(S)(S inputStream) if (isByteInputStream!S) {
|
||||
|
@ -202,14 +352,40 @@ Either!(string[][string], "headers", StreamError, "error") parseHeaders(S)(S inp
|
|||
}
|
||||
|
||||
unittest {
|
||||
class TestHandler : HttpRequestHandler {
|
||||
void handle(ref ServerHttpRequest request, ref ServerHttpResponse response) {
|
||||
response.status = HttpStatus.OK;
|
||||
response.headers.add("Content-Type", "application/json");
|
||||
response.outputStream.writeToStream(cast(ubyte[]) "{\"a\": 1}");
|
||||
}
|
||||
import streams;
|
||||
|
||||
auto makeStream(string text) {
|
||||
return arrayInputStreamFor(cast(ubyte[]) text);
|
||||
}
|
||||
|
||||
HttpTransport tp = new Http1Transport(new TestHandler(), 8080);
|
||||
tp.start();
|
||||
// Basic valid headers.
|
||||
auto s1 = makeStream("Content-Type: application/json\r\n\r\n");
|
||||
auto r1 = parseHeaders(&s1);
|
||||
assert(r1.hasHeaders);
|
||||
assert("Content-Type" in r1.headers);
|
||||
assert(r1.headers["Content-Type"] == ["application/json"]);
|
||||
|
||||
// Multiple headers.
|
||||
auto s2 = makeStream("Accept: text, json, image\r\nContent-Length: 1234\r\n\r\n");
|
||||
auto r2 = parseHeaders(&s2);
|
||||
assert(r2.hasHeaders);
|
||||
assert("Accept" in r2.headers);
|
||||
assert(r2.headers["Accept"] == ["text, json, image"]);
|
||||
assert(r2.headers["Content-Length"] == ["1234"]);
|
||||
|
||||
// Basic invalid header string.
|
||||
auto s3 = makeStream("Invalid headers");
|
||||
auto r3 = parseHeaders(&s3);
|
||||
assert(r3.hasError);
|
||||
|
||||
// No trailing \r\n
|
||||
auto s4 = makeStream("Content-Type: application/json");
|
||||
auto r4 = parseHeaders(&s4);
|
||||
assert(r4.hasError);
|
||||
|
||||
// Empty headers.
|
||||
auto s5 = makeStream("\r\n");
|
||||
auto r5 = parseHeaders(&s5);
|
||||
assert(r5.hasHeaders);
|
||||
assert(r5.headers.length == 0);
|
||||
}
|
||||
|
|
|
@ -4,3 +4,21 @@ interface HttpTransport {
|
|||
void start();
|
||||
void stop();
|
||||
}
|
||||
|
||||
import core.thread;
|
||||
|
||||
/**
|
||||
* Starts a new thread to run an HTTP transport implementation in, separate
|
||||
* from the calling thread. This is useful for running a server in the
|
||||
* background, like for integration tests.
|
||||
* Params:
|
||||
* transport = The transport implementation to start.
|
||||
* Returns: The thread that was started.
|
||||
*/
|
||||
Thread startInNewThread(HttpTransport transport) {
|
||||
Thread t = new Thread(() {
|
||||
transport.start();
|
||||
});
|
||||
t.start();
|
||||
return t;
|
||||
}
|
||||
|
|
|
@ -2,3 +2,6 @@ module handy_http_transport;
|
|||
|
||||
public import handy_http_transport.http1;
|
||||
public import handy_http_transport.http2;
|
||||
|
||||
public import handy_http_transport.interfaces;
|
||||
public import handy_http_transport.response_output_stream;
|
||||
|
|
|
@ -3,6 +3,7 @@ module handy_http_transport.response_output_stream;
|
|||
import handy_http_transport.helpers : writeUIntToBuffer;
|
||||
import handy_http_primitives : ServerHttpResponse;
|
||||
import streams;
|
||||
import slf4d;
|
||||
|
||||
/**
|
||||
* A wrapper around a byte output stream that's used for writing HTTP response
|
||||
|
@ -23,6 +24,15 @@ struct HttpResponseOutputStream(S) if (isByteOutputStream!S) {
|
|||
this.response = response;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determines if the HTTP response headers have been flushed to the
|
||||
* underlying output stream.
|
||||
* Returns: `true` if the headers have been flushed, `false` otherwise.
|
||||
*/
|
||||
bool areHeadersFlushed() const {
|
||||
return headersFlushed;
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes the given data to the stream. If the referenced HTTP response's
|
||||
* status and headers haven't yet been written, they will be written first.
|
||||
|
@ -37,7 +47,6 @@ struct HttpResponseOutputStream(S) if (isByteOutputStream!S) {
|
|||
auto result = writeHeaders();
|
||||
if (result.hasError) return result;
|
||||
bytesWritten += result.count;
|
||||
headersFlushed = true;
|
||||
}
|
||||
auto result = outputStream.writeToStream(buffer);
|
||||
if (result.hasError) return result;
|
||||
|
@ -50,28 +59,87 @@ struct HttpResponseOutputStream(S) if (isByteOutputStream!S) {
|
|||
* Returns: The stream result of writing.
|
||||
*/
|
||||
StreamResult writeHeaders() {
|
||||
// TODO: Come up with a better way of writing headers than string concatenation.
|
||||
if (headersFlushed) {
|
||||
return StreamResult(0); // No need to write again.
|
||||
}
|
||||
debug_("Flushing HTTP status and headers to the output stream.");
|
||||
headersFlushed = true;
|
||||
size_t idx = 0;
|
||||
char[6] statusCodeBuffer; // Normal HTTP codes are 3 digits, but this leaves room for extensions.
|
||||
writeUIntToBuffer(response.status.code, statusCodeBuffer, idx);
|
||||
|
||||
string statusAndHeaders = "HTTP/1.1 "
|
||||
~ cast(string) statusCodeBuffer[0..idx]
|
||||
~ " " ~ response.status.text
|
||||
~ "\r\n";
|
||||
// Write the status line.
|
||||
StreamResult r = outputStream.writeToStream(cast(ubyte[]) "HTTP/1.1 ");
|
||||
if (r.hasError) return r;
|
||||
size_t writeCount = r.count;
|
||||
traceF!"Wrote HTTP version. Bytes written: %d."(writeCount);
|
||||
r = outputStream.writeToStream(cast(ubyte[]) statusCodeBuffer[0..idx]);
|
||||
if (r.hasError) return r;
|
||||
writeCount += r.count;
|
||||
traceF!"Wrote status code. Bytes written: %d."(writeCount);
|
||||
r = outputStream.writeToStream([' ']);
|
||||
if (r.hasError) return r;
|
||||
writeCount += r.count;
|
||||
r = outputStream.writeToStream(cast(ubyte[]) response.status.text);
|
||||
if (r.hasError) return r;
|
||||
writeCount += r.count;
|
||||
r = outputStream.writeToStream(['\r', '\n']);
|
||||
if (r.hasError) return r;
|
||||
writeCount += r.count;
|
||||
traceF!"Wrote HTTP status line. Bytes written: %d."(writeCount);
|
||||
|
||||
foreach (headerName; response.headers.keys) {
|
||||
string headerLine = headerName ~ ": ";
|
||||
// Write the header name.
|
||||
traceF!"Writing header name: %s"(headerName);
|
||||
r = outputStream.writeToStream(cast(ubyte[]) headerName);
|
||||
if (r.hasError) return r;
|
||||
writeCount += r.count;
|
||||
r = outputStream.writeToStream([':', ' ']);
|
||||
if (r.hasError) return r;
|
||||
writeCount += r.count;
|
||||
// Write the comma-separated list of values.
|
||||
string[] headerValues = response.headers.getAll(headerName);
|
||||
for (size_t i = 0; i < headerValues.length; i++) {
|
||||
headerLine ~= headerValues[i];
|
||||
r = outputStream.writeToStream(cast(ubyte[]) headerValues[i]);
|
||||
if (r.hasError) return r;
|
||||
writeCount += r.count;
|
||||
if (i + 1 < headerValues.length) {
|
||||
headerLine ~= ", ";
|
||||
r = outputStream.writeToStream([',', ' ']);
|
||||
if (r.hasError) return r;
|
||||
writeCount += r.count;
|
||||
}
|
||||
}
|
||||
headerLine ~= "\r\n";
|
||||
statusAndHeaders ~= headerLine;
|
||||
r = outputStream.writeToStream(['\r', '\n']);
|
||||
if (r.hasError) return r;
|
||||
writeCount += r.count;
|
||||
traceF!"Wrote header %s: %s"(headerName, headerValues);
|
||||
}
|
||||
statusAndHeaders ~= "\r\n"; // Trailing CLRF before the body.
|
||||
return outputStream.writeToStream(cast(ubyte[]) statusAndHeaders);
|
||||
r = outputStream.writeToStream(['\r', '\n']); // Trailing CLRF before the body.
|
||||
if (r.hasError) return r;
|
||||
writeCount += r.count;
|
||||
return StreamResult(cast(uint) writeCount);
|
||||
}
|
||||
}
|
||||
|
||||
// Test basic functionality for writing a standard response with headers and a
|
||||
// body.
|
||||
unittest {
|
||||
import handy_http_primitives.response;
|
||||
|
||||
ArrayOutputStream!ubyte os;
|
||||
ServerHttpResponse resp;
|
||||
resp.status = HttpStatus.OK;
|
||||
resp.headers.add("Content-Type", "text/plain");
|
||||
auto httpOut = HttpResponseOutputStream!(ArrayOutputStream!ubyte*)(&os, &resp);
|
||||
resp.outputStream = outputStreamObjectFor(&httpOut);
|
||||
assert(httpOut.areHeadersFlushed() == false);
|
||||
StreamResult r = resp.outputStream.writeToStream(cast(ubyte[]) "Hello world!");
|
||||
const expectedOutput =
|
||||
"HTTP/1.1 200 OK\r\n" ~
|
||||
"Content-Type: text/plain\r\n" ~
|
||||
"\r\n" ~
|
||||
"Hello world!";
|
||||
assert(httpOut.areHeadersFlushed() == true);
|
||||
assert(os.toArray() == expectedOutput);
|
||||
assert(r.hasCount);
|
||||
assert(r.count == os.toArray().length);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue