Compare commits
No commits in common. "main" and "v1.0.0" have entirely different histories.
|
@ -1,32 +0,0 @@
|
||||||
name: Build and Test Module
|
|
||||||
on:
|
|
||||||
push:
|
|
||||||
paths:
|
|
||||||
- 'source/**'
|
|
||||||
- '.gitea/workflows/ci.yaml'
|
|
||||||
pull_request:
|
|
||||||
types: [opened, reopened, synchronize]
|
|
||||||
jobs:
|
|
||||||
build-and-test:
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
steps:
|
|
||||||
- uses: actions/checkout@v4
|
|
||||||
- name: Setup DLang
|
|
||||||
uses: dlang-community/setup-dlang@v2
|
|
||||||
with:
|
|
||||||
compiler: ldc-latest
|
|
||||||
- name: Build and Test
|
|
||||||
run: dub -q test
|
|
||||||
|
|
||||||
integration-tests:
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
steps:
|
|
||||||
- uses: actions/checkout@v4
|
|
||||||
- name: Setup DLang
|
|
||||||
uses: dlang-community/setup-dlang@v2
|
|
||||||
with:
|
|
||||||
compiler: ldc-latest
|
|
||||||
|
|
||||||
- name: http1-test
|
|
||||||
working-directory: integration-tests/http1-basic
|
|
||||||
run: dub run --single http1-test.d
|
|
12
README.md
12
README.md
|
@ -1,16 +1,10 @@
|
||||||
# http-transport
|
# http-transport
|
||||||
|
|
||||||
This library provides implementations of various versions of HTTP transport,
|
Implementations of HTTP transport protocols, compatible with other Handy-Http components.
|
||||||
acting as a "glue" for connecting clients and servers. Practically speaking,
|
|
||||||
the handy-http-transport library provides HTTP server implementations you can
|
|
||||||
use interchangeably with other handy-http libraries.
|
|
||||||
|
|
||||||
For now, see the section on HTTP/1.1, as that's the only HTTP version
|
|
||||||
implemented so far.
|
|
||||||
|
|
||||||
## HTTP/1.1
|
## HTTP/1.1
|
||||||
|
|
||||||
Use the `TaskPoolHttp1Transport` implementation of `HttpTransport` to serve content
|
Use the `Http1Transport` implementation of `HttpTransport` to serve content
|
||||||
using the HTTP/1.1 protocol. See the example below:
|
using the HTTP/1.1 protocol. See the example below:
|
||||||
|
|
||||||
```d
|
```d
|
||||||
|
@ -26,7 +20,7 @@ class MyHandler : HttpRequestHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
void main() {
|
void main() {
|
||||||
HttpTransport tp = new TaskPoolHttp1Transport(new MyHandler(), 8080);
|
HttpTransport tp = new Http1Transport(new MyHandler(), 8080);
|
||||||
tp.start();
|
tp.start();
|
||||||
}
|
}
|
||||||
```
|
```
|
6
dub.json
6
dub.json
|
@ -4,9 +4,9 @@
|
||||||
],
|
],
|
||||||
"copyright": "Copyright © 2024, Andrew Lalis",
|
"copyright": "Copyright © 2024, Andrew Lalis",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"handy-http-primitives": "~>1.8",
|
"handy-http-primitives": "~>1.0.0",
|
||||||
"streams": "~>3.6",
|
"photon": "~>0.10.2",
|
||||||
"slf4d": "~>4.0"
|
"streams": "~>3.5.0"
|
||||||
},
|
},
|
||||||
"description": "Implementations of HTTP transport protocols.",
|
"description": "Implementations of HTTP transport protocols.",
|
||||||
"license": "CC0",
|
"license": "CC0",
|
||||||
|
|
|
@ -1,8 +1,9 @@
|
||||||
{
|
{
|
||||||
"fileVersion": 1,
|
"fileVersion": 1,
|
||||||
"versions": {
|
"versions": {
|
||||||
"handy-http-primitives": "1.8.0",
|
"handy-http-primitives": "1.0.0",
|
||||||
"slf4d": "4.1.1",
|
"photon": "0.10.2",
|
||||||
"streams": "3.6.0"
|
"sharded-map": "2.7.0",
|
||||||
|
"streams": "3.5.0"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1 +0,0 @@
|
||||||
http1-test
|
|
|
@ -1,53 +0,0 @@
|
||||||
/+ dub.sdl:
|
|
||||||
dependency "handy-http-transport" path="../../"
|
|
||||||
dependency "requests" version="~>2.1"
|
|
||||||
+/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This tests the basic HTTP functionality of the Http1Transport implementation
|
|
||||||
* by starting a server, sending a request, and checking the response.
|
|
||||||
*/
|
|
||||||
module integration_tests.http1_test;
|
|
||||||
|
|
||||||
import handy_http_primitives;
|
|
||||||
import handy_http_transport;
|
|
||||||
import slf4d;
|
|
||||||
import slf4d.default_provider;
|
|
||||||
import requests;
|
|
||||||
|
|
||||||
import core.thread;
|
|
||||||
|
|
||||||
int main() {
|
|
||||||
auto loggingProvider = DefaultProvider.builder()
|
|
||||||
.withRootLoggingLevel(Levels.INFO)
|
|
||||||
.withConsoleSerializer(true, 48)
|
|
||||||
.build();
|
|
||||||
configureLoggingProvider(loggingProvider);
|
|
||||||
|
|
||||||
HttpTransport transport = new TaskPoolHttp1Transport(HttpRequestHandler.of(&handleRequest));
|
|
||||||
Thread thread = transport.startInNewThread();
|
|
||||||
scope(exit) {
|
|
||||||
transport.stop();
|
|
||||||
thread.join();
|
|
||||||
}
|
|
||||||
info("Started server in another thread.");
|
|
||||||
Thread.sleep(msecs(100)); // Wait for the server to start.
|
|
||||||
|
|
||||||
// Send a simple GET request to the server.
|
|
||||||
info("Sending GET request to http://localhost:8080");
|
|
||||||
auto content = getContent("http://localhost:8080");
|
|
||||||
ubyte[] data = content.data;
|
|
||||||
if (data.length != 13 || (cast(string) data) != "Hello, world!") {
|
|
||||||
error("Received unexpected content: " ~ cast(string) data);
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
info("Test completed successfully.");
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void handleRequest(ref ServerHttpRequest request, ref ServerHttpResponse response) {
|
|
||||||
response.headers.add("Content-Type", "text/plain");
|
|
||||||
response.headers.add("Content-Length", "13");
|
|
||||||
response.outputStream.writeToStream(cast(ubyte[]) "Hello, world!");
|
|
||||||
}
|
|
|
@ -1 +0,0 @@
|
||||||
http1-speed-test
|
|
|
@ -1,28 +0,0 @@
|
||||||
/+ dub.sdl:
|
|
||||||
dependency "handy-http-transport" path="../../"
|
|
||||||
+/
|
|
||||||
module integration_tests.http1_speed_test;
|
|
||||||
|
|
||||||
import handy_http_primitives;
|
|
||||||
import handy_http_transport;
|
|
||||||
import slf4d;
|
|
||||||
import slf4d.default_provider;
|
|
||||||
|
|
||||||
void main() {
|
|
||||||
auto loggingProvider = DefaultProvider.builder()
|
|
||||||
.withRootLoggingLevel(Levels.ERROR)
|
|
||||||
.withConsoleSerializer(true, 48)
|
|
||||||
.build();
|
|
||||||
configureLoggingProvider(loggingProvider);
|
|
||||||
HttpTransport transport;
|
|
||||||
transport = new TaskPoolHttp1Transport(HttpRequestHandler.of(
|
|
||||||
(ref ServerHttpRequest request, ref ServerHttpResponse response) {
|
|
||||||
if (request.method == HttpMethod.DELETE) {
|
|
||||||
transport.stop();
|
|
||||||
}
|
|
||||||
response.headers.add("Content-Type", "text/plain");
|
|
||||||
response.headers.add("Content-Length", "13");
|
|
||||||
response.outputStream.writeToStream(cast(ubyte[]) "Hello, world!");
|
|
||||||
}));
|
|
||||||
transport.start();
|
|
||||||
}
|
|
|
@ -1,14 +0,0 @@
|
||||||
#!/usr/bin/env bash
|
|
||||||
|
|
||||||
# This script runs all integration tests.
|
|
||||||
|
|
||||||
set -e -o pipefail
|
|
||||||
|
|
||||||
cd http1-basic
|
|
||||||
dub build --single http1-test.d
|
|
||||||
./http1-test
|
|
||||||
cd ..
|
|
||||||
|
|
||||||
# cd http1-speed-test
|
|
||||||
# dub build --single --build=release http1-speed-test.d
|
|
||||||
|
|
|
@ -1,103 +0,0 @@
|
||||||
module handy_http_transport.helpers;
|
|
||||||
|
|
||||||
import streams;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Helper function to consume string content from an input stream until a
|
|
||||||
* certain target pattern of characters is encountered.
|
|
||||||
* Params:
|
|
||||||
* inputStream = The stream to read from.
|
|
||||||
* target = The target at which to stop reading.
|
|
||||||
* Returns: The string that was read, or a stream error.
|
|
||||||
*/
|
|
||||||
Either!(string, "value", StreamError, "error") consumeUntil(S)(
|
|
||||||
S inputStream,
|
|
||||||
string target
|
|
||||||
) if (isByteInputStream!S) {
|
|
||||||
ubyte[1024] buffer;
|
|
||||||
size_t idx;
|
|
||||||
while (true) {
|
|
||||||
auto result = inputStream.readFromStream(buffer[idx .. idx + 1]);
|
|
||||||
if (result.hasError) return Either!(string, "value", StreamError, "error")(result.error);
|
|
||||||
if (result.count != 1) return Either!(string, "value", StreamError, "error")(
|
|
||||||
StreamError("Failed to read a single element", 0)
|
|
||||||
);
|
|
||||||
idx++;
|
|
||||||
if (idx >= target.length && buffer[idx - target.length .. idx] == target) {
|
|
||||||
return Either!(string, "value", StreamError, "error")(
|
|
||||||
cast(string) buffer[0 .. idx - target.length].idup
|
|
||||||
);
|
|
||||||
} else if (idx >= buffer.length) {
|
|
||||||
return Either!(string, "value", StreamError, "error")(
|
|
||||||
StreamError("Couldn't find target \"" ~ target ~ "\" after reading 1024 bytes.", 1)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Internal helper function to get the first index of a character in a string.
|
|
||||||
* Params:
|
|
||||||
* s = The string to look in.
|
|
||||||
* c = The character to look for.
|
|
||||||
* offset = An optional offset to look from.
|
|
||||||
* Returns: The index of the character, or -1.
|
|
||||||
*/
|
|
||||||
ptrdiff_t indexOf(string s, char c, size_t offset = 0) {
|
|
||||||
for (size_t i = offset; i < s.length; i++) {
|
|
||||||
if (s[i] == c) return i;
|
|
||||||
}
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Internal helper function that returns the slice of a string excluding any
|
|
||||||
* preceding or trailing spaces.
|
|
||||||
* Params:
|
|
||||||
* s = The string to strip.
|
|
||||||
* Returns: The slice of the string that has been stripped.
|
|
||||||
*/
|
|
||||||
string stripSpaces(string s) {
|
|
||||||
if (s.length == 0) return s;
|
|
||||||
ptrdiff_t startIdx = 0;
|
|
||||||
while (startIdx < s.length && s[startIdx] == ' ') startIdx++;
|
|
||||||
s = s[startIdx .. $];
|
|
||||||
if (s.length == 0) return "";
|
|
||||||
ptrdiff_t endIdx = cast(ptrdiff_t) s.length - 1;
|
|
||||||
while (s[endIdx] == ' ' && endIdx >= 0) endIdx--;
|
|
||||||
return s[0 .. endIdx + 1];
|
|
||||||
}
|
|
||||||
|
|
||||||
unittest {
|
|
||||||
assert(stripSpaces("") == "");
|
|
||||||
assert(stripSpaces(" ") == "");
|
|
||||||
assert(stripSpaces("test") == "test");
|
|
||||||
assert(stripSpaces(" test") == "test");
|
|
||||||
assert(stripSpaces(" test string ") == "test string");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Helper function to append an unsigned integer value to a char buffer. It is
|
|
||||||
* assumed that there's enough space to write the value.
|
|
||||||
* Params:
|
|
||||||
* value = The value to append.
|
|
||||||
* buffer = The buffer to append to.
|
|
||||||
* idx = A reference to a variable tracking the next writable index in the buffer.
|
|
||||||
*/
|
|
||||||
void writeUIntToBuffer(uint value, char[] buffer, ref size_t idx) {
|
|
||||||
const size_t startIdx = idx;
|
|
||||||
while (true) {
|
|
||||||
ubyte remainder = value % 10;
|
|
||||||
value /= 10;
|
|
||||||
buffer[idx++] = cast(char) ('0' + remainder);
|
|
||||||
if (value == 0) break;
|
|
||||||
}
|
|
||||||
// Swap the characters to proper order.
|
|
||||||
for (size_t i = 0; i < (idx - startIdx) / 2; i++) {
|
|
||||||
size_t p1 = i + startIdx;
|
|
||||||
size_t p2 = idx - i - 1;
|
|
||||||
char tmp = buffer[p1];
|
|
||||||
buffer[p1] = buffer[p2];
|
|
||||||
buffer[p2] = tmp;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,122 +0,0 @@
|
||||||
module handy_http_transport.http1.epoll;
|
|
||||||
|
|
||||||
import core.sys.posix.sys.socket;
|
|
||||||
import core.sys.linux.epoll;
|
|
||||||
import core.sys.posix.netinet.in_;
|
|
||||||
import core.sys.posix.fcntl;
|
|
||||||
import core.sys.posix.unistd;
|
|
||||||
|
|
||||||
import core.stdc.errno;
|
|
||||||
|
|
||||||
extern(C) {
|
|
||||||
int accept4(int sockfd, sockaddr *addr, socklen_t *addrlen, int flags);
|
|
||||||
}
|
|
||||||
|
|
||||||
import handy_http_transport.interfaces;
|
|
||||||
import handy_http_transport.http1;
|
|
||||||
import handy_http_primitives;
|
|
||||||
import slf4d;
|
|
||||||
|
|
||||||
class Http1EpollTransport : Http1Transport {
|
|
||||||
|
|
||||||
this(HttpRequestHandler requestHandler, ushort port) {
|
|
||||||
super(requestHandler, port);
|
|
||||||
}
|
|
||||||
|
|
||||||
override void start() {
|
|
||||||
super.start();
|
|
||||||
// Create the server socket.
|
|
||||||
enum SOCK_NONBLOCK = 0x4000;
|
|
||||||
int listenFd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
|
|
||||||
sockaddr_in serverAddress;
|
|
||||||
serverAddress.sin_family = AF_INET;
|
|
||||||
serverAddress.sin_port = htons(port);
|
|
||||||
serverAddress.sin_addr.s_addr = INADDR_ANY;
|
|
||||||
|
|
||||||
if (bind(listenFd, cast(sockaddr*) &serverAddress, serverAddress.sizeof) == -1) {
|
|
||||||
errorF!"Failed to bind socket: %d"(errno);
|
|
||||||
close(listenFd);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (listen(listenFd, SOMAXCONN) == -1) {
|
|
||||||
errorF!"Failed to listen on socket: %d"(errno);
|
|
||||||
close(listenFd);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
int epollFd = epoll_create1(0);
|
|
||||||
if (epollFd == -1) {
|
|
||||||
errorF!"Failed to create epoll instance: %d"(errno);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
epoll_event event;
|
|
||||||
epoll_event[64] events;
|
|
||||||
event.events = EPOLLIN | EPOLLET;
|
|
||||||
event.data.fd = listenFd;
|
|
||||||
if (epoll_ctl(epollFd, EPOLL_CTL_ADD, listenFd, &event) == -1) {
|
|
||||||
errorF!"Failed to add listen socket to epoll: %d"(errno);
|
|
||||||
close(listenFd);
|
|
||||||
close(epollFd);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
infoF!"Server listening on port %d."(port);
|
|
||||||
|
|
||||||
while (true) {
|
|
||||||
int eventCount = epoll_wait(epollFd, &event, 64, 5000);
|
|
||||||
if (eventCount == -1) {
|
|
||||||
errorF!"Epoll wait failed: %d"(errno);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int i = 0; i < eventCount; i++) {
|
|
||||||
if (events[i].data.fd == listenFd) {
|
|
||||||
// New incoming connection.
|
|
||||||
while (true) {
|
|
||||||
sockaddr_in clientAddress;
|
|
||||||
socklen_t clientAddressLength = clientAddress.sizeof;
|
|
||||||
int clientFd = accept4(
|
|
||||||
listenFd,
|
|
||||||
cast(sockaddr*) &clientAddress,
|
|
||||||
&clientAddressLength,
|
|
||||||
SOCK_NONBLOCK
|
|
||||||
);
|
|
||||||
if (clientFd == -1) {
|
|
||||||
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
|
||||||
// No more connections to accept.
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
errorF!"Failed to accept connection: %d"(errno);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add the client socket to epoll's listening list.
|
|
||||||
event.events = EPOLLIN | EPOLLET;
|
|
||||||
event.data.fd = clientFd;
|
|
||||||
if (epoll_ctl(epollFd, EPOLL_CTL_ADD, clientFd, &event) == -1) {
|
|
||||||
errorF!"Failed to add client socket to epoll: %d"(errno);
|
|
||||||
close(clientFd);
|
|
||||||
}
|
|
||||||
|
|
||||||
infoF!"Accepted new connection from %s:%d."(
|
|
||||||
inet_ntoa(clientAddress.sin_addr),
|
|
||||||
ntohs(clientAddress.sin_port)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Event on an existing client socket.
|
|
||||||
|
|
||||||
int clientFd = events[i].data.fd;
|
|
||||||
|
|
||||||
if (events[i].events & EPOLLIN) {
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -3,5 +3,3 @@
|
||||||
*/
|
*/
|
||||||
module handy_http_transport.http1;
|
module handy_http_transport.http1;
|
||||||
|
|
||||||
public import handy_http_transport.http1.transport;
|
|
||||||
public import handy_http_transport.http1.task_pool;
|
|
||||||
|
|
|
@ -1,62 +0,0 @@
|
||||||
module handy_http_transport.http1.task_pool;
|
|
||||||
|
|
||||||
import std.socket;
|
|
||||||
import std.parallelism;
|
|
||||||
|
|
||||||
import handy_http_transport.http1.transport;
|
|
||||||
import handy_http_primitives;
|
|
||||||
import slf4d;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* An implementation of Http1Transport which uses D's standard library
|
|
||||||
* parallelization, where each incoming client request is turned into a task
|
|
||||||
* and submitted to the standard task pool.
|
|
||||||
*/
|
|
||||||
class TaskPoolHttp1Transport : Http1Transport {
|
|
||||||
this(HttpRequestHandler requestHandler, ushort port = 8080) {
|
|
||||||
super(requestHandler, port);
|
|
||||||
}
|
|
||||||
|
|
||||||
override void runServer() {
|
|
||||||
Socket serverSocket = new TcpSocket();
|
|
||||||
serverSocket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1);
|
|
||||||
serverSocket.bind(new InternetAddress("127.0.0.1", port));
|
|
||||||
serverSocket.listen(1024);
|
|
||||||
|
|
||||||
while (super.isRunning) {
|
|
||||||
try {
|
|
||||||
Socket clientSocket = serverSocket.accept();
|
|
||||||
auto t = task!handleClient(clientSocket, requestHandler);
|
|
||||||
taskPool().put(t);
|
|
||||||
} catch (SocketAcceptException e) {
|
|
||||||
warn("Failed to accept socket connection.", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
serverSocket.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
override void stop() {
|
|
||||||
super.stop();
|
|
||||||
// Send a dummy request to cause the server's blocking accept() call to end.
|
|
||||||
try {
|
|
||||||
Socket dummySocket = new TcpSocket(new InternetAddress("127.0.0.1", port));
|
|
||||||
dummySocket.shutdown(SocketShutdown.BOTH);
|
|
||||||
dummySocket.close();
|
|
||||||
} catch (SocketOSException e) {
|
|
||||||
warn("Failed to send empty request to stop server.", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
unittest {
|
|
||||||
import slf4d.default_provider;
|
|
||||||
auto logProvider = DefaultProvider.builder().withRootLoggingLevel(Levels.DEBUG).build();
|
|
||||||
configureLoggingProvider(logProvider);
|
|
||||||
|
|
||||||
HttpRequestHandler handler = HttpRequestHandler.of(
|
|
||||||
(ref ServerHttpRequest request, ref ServerHttpResponse response) {
|
|
||||||
response.status = HttpStatus.OK;
|
|
||||||
response.writeBodyString("Testing");
|
|
||||||
});
|
|
||||||
testHttp1Transport(new TaskPoolHttp1Transport(handler));
|
|
||||||
}
|
|
|
@ -1,104 +1,56 @@
|
||||||
module handy_http_transport.http1.transport;
|
module handy_http_transport.http1.transport;
|
||||||
|
|
||||||
import std.socket;
|
import std.socket;
|
||||||
import core.atomic : atomicStore, atomicLoad;
|
|
||||||
|
|
||||||
import handy_http_transport.interfaces;
|
import handy_http_transport.interfaces;
|
||||||
import handy_http_transport.helpers;
|
|
||||||
import handy_http_transport.response_output_stream;
|
|
||||||
|
|
||||||
import handy_http_primitives;
|
import handy_http_primitives;
|
||||||
import handy_http_primitives.address;
|
|
||||||
|
|
||||||
import streams;
|
import streams;
|
||||||
import slf4d;
|
import photon;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Base class for HTTP/1.1 transport, where different subclasses can define
|
* The HTTP/1.1 transport protocol implementation, using Dimitry Olshansky's
|
||||||
* how the actual socket communication works (threadpool / epoll/ etc).
|
* Photon fiber scheduling library for concurrency.
|
||||||
*/
|
*/
|
||||||
abstract class Http1Transport : HttpTransport {
|
class Http1Transport : HttpTransport {
|
||||||
protected HttpRequestHandler requestHandler;
|
private Socket serverSocket;
|
||||||
protected immutable ushort port;
|
private HttpRequestHandler requestHandler;
|
||||||
|
private const ushort port;
|
||||||
private bool running = false;
|
private bool running = false;
|
||||||
|
|
||||||
this(HttpRequestHandler requestHandler, ushort port = 8080) {
|
this(HttpRequestHandler requestHandler, ushort port = 8080) {
|
||||||
assert(requestHandler !is null);
|
assert(requestHandler !is null);
|
||||||
|
this.serverSocket = new TcpSocket();
|
||||||
this.requestHandler = requestHandler;
|
this.requestHandler = requestHandler;
|
||||||
this.port = port;
|
this.port = port;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isRunning() {
|
|
||||||
return atomicLoad(running);
|
|
||||||
}
|
|
||||||
|
|
||||||
void start() {
|
void start() {
|
||||||
infoF!"Starting Http1Transport server on port %d."(port);
|
startloop();
|
||||||
atomicStore(running, true);
|
go(() => runServer());
|
||||||
runServer();
|
runFibers();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract void runServer();
|
|
||||||
|
|
||||||
void stop() {
|
void stop() {
|
||||||
infoF!"Stopping Http1Transport server on port %d."(port);
|
this.running = false;
|
||||||
atomicStore(running, false);
|
this.serverSocket.shutdown(SocketShutdown.BOTH);
|
||||||
|
this.serverSocket.close();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
version(unittest) {
|
private void runServer() {
|
||||||
/**
|
this.running = true;
|
||||||
* A generic test to ensure that any Http1Transport implementation behaves
|
serverSocket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1);
|
||||||
* properly to start & stop, and process requests when running.
|
serverSocket.bind(new InternetAddress("127.0.0.1", port));
|
||||||
*
|
serverSocket.listen(100);
|
||||||
* It's assumed that the given transport is configured to run on localhost,
|
while (running) {
|
||||||
* port 8080, and return a standard 200 OK empty response to all requests.
|
try {
|
||||||
* Params:
|
Socket clientSocket = serverSocket.accept();
|
||||||
* transport = The transport implementation to test.
|
go(() => handleClient(clientSocket, requestHandler));
|
||||||
*/
|
} catch (SocketAcceptException e) {
|
||||||
void testHttp1Transport(Http1Transport transport) {
|
import std.stdio;
|
||||||
import core.thread;
|
stderr.writefln!"Failed to accept socket connection: %s"(e);
|
||||||
import std.string;
|
|
||||||
infoF!"Testing Http1Transport implementation: %s"(transport);
|
|
||||||
|
|
||||||
Thread thread = transport.startInNewThread();
|
|
||||||
Thread.sleep(msecs(100));
|
|
||||||
|
|
||||||
Socket clientSocket1 = new TcpSocket(new InternetAddress(8080));
|
|
||||||
const requestBody = "POST /users HTTP/1.1\r\n" ~
|
|
||||||
"Host: example.com\r\n" ~
|
|
||||||
"Content-Type: text/plain\r\n" ~
|
|
||||||
"Content-Length: 13\r\n" ~
|
|
||||||
"\r\n" ~
|
|
||||||
"Hello, world!";
|
|
||||||
ptrdiff_t bytesSent = clientSocket1.send(requestBody);
|
|
||||||
assert(bytesSent == requestBody.length, "Couldn't send the full request body to the server.");
|
|
||||||
|
|
||||||
ubyte[8192] buffer;
|
|
||||||
size_t totalBytesReceived = 0;
|
|
||||||
ptrdiff_t bytesReceived;
|
|
||||||
do {
|
|
||||||
bytesReceived = clientSocket1.receive(buffer[totalBytesReceived .. $]);
|
|
||||||
if (bytesReceived == Socket.ERROR) {
|
|
||||||
assert(false, "Socket error when attempting to receive a response from the HttpTransport server.");
|
|
||||||
}
|
}
|
||||||
totalBytesReceived += bytesReceived;
|
}
|
||||||
} while (bytesReceived > 0);
|
|
||||||
|
|
||||||
string httpResponseContent = cast(string) buffer[0 .. totalBytesReceived];
|
|
||||||
string[] parts = httpResponseContent.split("\r\n\r\n");
|
|
||||||
assert(parts.length > 0, "HTTP 1.1 response is missing required status and headers section.");
|
|
||||||
string[] headerLines = parts[0].split("\r\n");
|
|
||||||
assert(headerLines.length > 0, "HTTP 1.1 response is missing required status line.");
|
|
||||||
string statusLine = headerLines[0];
|
|
||||||
string[] statusLineParts = statusLine.split(" ");
|
|
||||||
assert(statusLineParts[0] == "HTTP/1.1");
|
|
||||||
assert(statusLineParts[1] == "200");
|
|
||||||
assert(statusLineParts[2] == "OK");
|
|
||||||
|
|
||||||
info("Testing is complete. Stopping the server.");
|
|
||||||
transport.stop();
|
|
||||||
thread.join();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -112,108 +64,29 @@ version(unittest) {
|
||||||
* requestHandler = The request handler that will handle the received HTTP request.
|
* requestHandler = The request handler that will handle the received HTTP request.
|
||||||
*/
|
*/
|
||||||
void handleClient(Socket clientSocket, HttpRequestHandler requestHandler) {
|
void handleClient(Socket clientSocket, HttpRequestHandler requestHandler) {
|
||||||
SocketInputStream* inputStream = new SocketInputStream(clientSocket);
|
auto inputStream = SocketInputStream(clientSocket);
|
||||||
BufferedInputStream!(SocketInputStream*, 8192)* bufferedInput
|
auto bufferedInput = bufferedInputStreamFor!(8192)(inputStream);
|
||||||
= new BufferedInputStream!(SocketInputStream*, 8192)(inputStream);
|
auto result = readHttpRequest(&bufferedInput);
|
||||||
// Get remote address from the socket.
|
|
||||||
import handy_http_primitives.address;
|
|
||||||
ClientAddress addr = getAddress(clientSocket);
|
|
||||||
traceF!"Got request from client: %s"(addr.toString());
|
|
||||||
auto result = readHttpRequest(bufferedInput, addr);
|
|
||||||
if (result.hasError) {
|
if (result.hasError) {
|
||||||
if (result.error.code != -1) {
|
import std.stdio;
|
||||||
// Only warn if we didn't read an empty request.
|
stderr.writeln("Failed to read HTTP request: " ~ result.error.message);
|
||||||
warnF!"Failed to read request: %s"(result.error.message);
|
|
||||||
}
|
|
||||||
inputStream.closeStream();
|
inputStream.closeStream();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
scope ServerHttpRequest request = result.request;
|
scope ServerHttpRequest request = result.request;
|
||||||
scope ServerHttpResponse response;
|
scope ServerHttpResponse response;
|
||||||
SocketOutputStream* outputStream = new SocketOutputStream(clientSocket);
|
SocketOutputStream outputStream = SocketOutputStream(clientSocket);
|
||||||
response.outputStream = outputStreamObjectFor(HttpResponseOutputStream!(SocketOutputStream*)(
|
response.outputStream = outputStreamObjectFor(HttpResponseOutputStream!(SocketOutputStream*)(
|
||||||
outputStream,
|
&outputStream,
|
||||||
&response
|
&response
|
||||||
));
|
));
|
||||||
try {
|
try {
|
||||||
requestHandler.handle(request, response);
|
requestHandler.handle(request, response);
|
||||||
debugF!"%s %s -> %d %s"(request.method, request.url, response.status.code, response.status.text);
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
error("Exception thrown while handling request.", e);
|
import std.stdio;
|
||||||
} catch (Throwable t) {
|
stderr.writeln("Exception thrown while handling request: " ~ e.msg);
|
||||||
errorF!"Throwable error while handling request: %s"(t.msg);
|
|
||||||
throw t;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (response.status != HttpStatus.SWITCHING_PROTOCOLS) {
|
|
||||||
inputStream.closeStream();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test case where we use a local socket pair to test the full handleClient
|
|
||||||
// workflow from the HttpRequestHandler's point of view.
|
|
||||||
unittest {
|
|
||||||
Socket[2] sockets = socketPair();
|
|
||||||
Socket clientSocket = sockets[0];
|
|
||||||
Socket serverSocket = sockets[1];
|
|
||||||
const requestContent =
|
|
||||||
"POST /data HTTP/1.1\r\n" ~
|
|
||||||
"Content-Type: application/json\r\n" ~
|
|
||||||
"Content-Length: 22\r\n" ~
|
|
||||||
"\r\n" ~
|
|
||||||
"{\"x\": 5, \"flag\": true}";
|
|
||||||
clientSocket.send(cast(ubyte[]) requestContent);
|
|
||||||
|
|
||||||
class TestHandler : HttpRequestHandler {
|
|
||||||
import std.conv;
|
|
||||||
|
|
||||||
void handle(ref ServerHttpRequest request, ref ServerHttpResponse response) {
|
|
||||||
assert(request.headers["Content-Type"] == ["application/json"]);
|
|
||||||
assert("Content-Length" in request.headers && request.headers["Content-Length"].length > 0);
|
|
||||||
ulong contentLength = request.headers["Content-Length"][0].to!ulong;
|
|
||||||
assert(contentLength == 22);
|
|
||||||
ubyte[22] bodyBuffer;
|
|
||||||
auto readResult = request.inputStream.readFromStream(bodyBuffer);
|
|
||||||
assert(readResult.hasCount && readResult.count == 22);
|
|
||||||
assert(cast(string) bodyBuffer == "{\"x\": 5, \"flag\": true}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
handleClient(serverSocket, new TestHandler());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Gets a ClientAddress value from a socket's address information.
|
|
||||||
* Params:
|
|
||||||
* socket = The socket to get address information for.
|
|
||||||
* Returns: The address that was obtained.
|
|
||||||
*/
|
|
||||||
ClientAddress getAddress(Socket socket) {
|
|
||||||
try {
|
|
||||||
Address addr = socket.remoteAddress();
|
|
||||||
if (auto a = cast(InternetAddress) addr) {
|
|
||||||
union U {
|
|
||||||
ubyte[4] bytes;
|
|
||||||
uint intValue;
|
|
||||||
}
|
|
||||||
U u;
|
|
||||||
u.intValue = a.addr();
|
|
||||||
return ClientAddress.ofIPv4(IPv4InternetAddress(
|
|
||||||
u.bytes,
|
|
||||||
a.port()
|
|
||||||
));
|
|
||||||
} else if (auto a = cast(Internet6Address) addr) {
|
|
||||||
return ClientAddress.ofIPv6(IPv6InternetAddress(
|
|
||||||
a.addr(),
|
|
||||||
a.port()
|
|
||||||
));
|
|
||||||
} else if (auto a = cast(UnixAddress) addr) {
|
|
||||||
return ClientAddress.ofUnixSocket(UnixSocketAddress(a.path()));
|
|
||||||
} else {
|
|
||||||
return ClientAddress(ClientAddressType.UNKNOWN);
|
|
||||||
}
|
|
||||||
} catch (SocketOSException e) {
|
|
||||||
return ClientAddress(ClientAddressType.UNKNOWN);
|
|
||||||
}
|
}
|
||||||
|
inputStream.closeStream();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Alias for the result of the `readHttpRequest` function which parses HTTP requests.
|
/// Alias for the result of the `readHttpRequest` function which parses HTTP requests.
|
||||||
|
@ -223,22 +96,15 @@ alias HttpRequestParseResult = Either!(ServerHttpRequest, "request", StreamError
|
||||||
* Parses an HTTP/1.1 request from a byte input stream.
|
* Parses an HTTP/1.1 request from a byte input stream.
|
||||||
* Params:
|
* Params:
|
||||||
* inputStream = The byte input stream to read from.
|
* inputStream = The byte input stream to read from.
|
||||||
* addr = The client address, used in constructed the http request struct.
|
|
||||||
* Returns: Either the request which was parsed, or a stream error.
|
* Returns: Either the request which was parsed, or a stream error.
|
||||||
*/
|
*/
|
||||||
HttpRequestParseResult readHttpRequest(S)(S inputStream, in ClientAddress addr) if (isByteInputStream!S) {
|
HttpRequestParseResult readHttpRequest(S)(S inputStream) if (isByteInputStream!S) {
|
||||||
auto methodStr = consumeUntil(inputStream, " ");
|
import handy_http_primitives.address;
|
||||||
if (methodStr.hasError) {
|
|
||||||
if (methodStr.error.code == 0) {
|
|
||||||
// Set a custom code to indicate an empty request.
|
|
||||||
return HttpRequestParseResult(StreamError(methodStr.error.message, -1));
|
|
||||||
}
|
|
||||||
return HttpRequestParseResult(methodStr.error);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
auto methodStr = consumeUntil(inputStream, " ");
|
||||||
|
if (methodStr.hasError) return HttpRequestParseResult(methodStr.error);
|
||||||
auto urlStr = consumeUntil(inputStream, " ");
|
auto urlStr = consumeUntil(inputStream, " ");
|
||||||
if (urlStr.hasError) return HttpRequestParseResult(urlStr.error);
|
if (urlStr.hasError) return HttpRequestParseResult(urlStr.error);
|
||||||
|
|
||||||
auto versionStr = consumeUntil(inputStream, "\r\n");
|
auto versionStr = consumeUntil(inputStream, "\r\n");
|
||||||
if (versionStr.hasError) return HttpRequestParseResult(versionStr.error);
|
if (versionStr.hasError) return HttpRequestParseResult(versionStr.error);
|
||||||
|
|
||||||
|
@ -252,67 +118,21 @@ HttpRequestParseResult readHttpRequest(S)(S inputStream, in ClientAddress addr)
|
||||||
auto headersResult = parseHeaders(inputStream);
|
auto headersResult = parseHeaders(inputStream);
|
||||||
if (headersResult.hasError) return HttpRequestParseResult(headersResult.error);
|
if (headersResult.hasError) return HttpRequestParseResult(headersResult.error);
|
||||||
|
|
||||||
auto queryParams = parseQueryParameters(urlStr.value);
|
|
||||||
|
|
||||||
import std.uri : decode; // TODO: Remove dependency on phobos for this?
|
|
||||||
|
|
||||||
return HttpRequestParseResult(ServerHttpRequest(
|
return HttpRequestParseResult(ServerHttpRequest(
|
||||||
httpVersion,
|
httpVersion,
|
||||||
addr,
|
ClientAddress.init, // TODO: Get this from the socket, if possible?
|
||||||
methodStr.value,
|
methodStr.value,
|
||||||
decode(urlStr.value),
|
urlStr.value,
|
||||||
headersResult.headers,
|
headersResult.headers,
|
||||||
queryParams,
|
|
||||||
inputStreamObjectFor(inputStream)
|
inputStreamObjectFor(inputStream)
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
unittest {
|
|
||||||
import streams;
|
|
||||||
|
|
||||||
auto makeStream(string text) {
|
|
||||||
return arrayInputStreamFor(cast(ubyte[]) text);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Basic HTTP request.
|
|
||||||
ArrayInputStream!ubyte s1 = makeStream(
|
|
||||||
"GET /test?x=5 HTTP/1.1\r\n" ~
|
|
||||||
"Accept: text/plain\r\n" ~
|
|
||||||
"\r\n"
|
|
||||||
);
|
|
||||||
auto r1 = readHttpRequest(&s1, ClientAddress.unknown());
|
|
||||||
assert(r1.hasRequest);
|
|
||||||
assert(r1.request.httpVersion == HttpVersion.V1);
|
|
||||||
assert(r1.request.method == HttpMethod.GET);
|
|
||||||
assert(r1.request.url == "/test?x=5");
|
|
||||||
const r1ExpectedHeaders = ["Accept": ["text/plain"]];
|
|
||||||
assert(r1.request.headers == r1ExpectedHeaders);
|
|
||||||
assert(r1.request.clientAddress == ClientAddress.unknown());
|
|
||||||
|
|
||||||
// POST request with body. Test that the body is read correctly.
|
|
||||||
ArrayInputStream!ubyte s2 = makeStream(
|
|
||||||
"POST /data HTTP/1.1\r\n" ~
|
|
||||||
"Content-Type: text/plain\r\n" ~
|
|
||||||
"Content-Length: 12\r\n" ~
|
|
||||||
"\r\n" ~
|
|
||||||
"Hello world!"
|
|
||||||
);
|
|
||||||
auto r2 = readHttpRequest(&s2, ClientAddress.unknown());
|
|
||||||
assert(r2.hasRequest);
|
|
||||||
assert(r2.request.method == HttpMethod.POST);
|
|
||||||
ubyte[12] r2BodyBuffer;
|
|
||||||
StreamResult r2BodyReadResult = s2.readFromStream(r2BodyBuffer);
|
|
||||||
assert(r2BodyReadResult.count == 12);
|
|
||||||
assert(cast(string) r2BodyBuffer == "Hello world!");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Parses HTTP headers from an input stream, and returns them as an associative
|
* Parses HTTP headers from an input stream, and returns them as an associative
|
||||||
* array mapping header names to their list of values.
|
* array mapping header names to their list of values.
|
||||||
* Params:
|
* Params:
|
||||||
* inputStream = The byte input stream to read from. Note that this stream
|
* inputStream = The byte input stream to read from.
|
||||||
* should be passed as a pointer / reference, values will be
|
|
||||||
* consumed from the stream.
|
|
||||||
* Returns: Either the headers, or a stream error.
|
* Returns: Either the headers, or a stream error.
|
||||||
*/
|
*/
|
||||||
Either!(string[][string], "headers", StreamError, "error") parseHeaders(S)(S inputStream) if (isByteInputStream!S) {
|
Either!(string[][string], "headers", StreamError, "error") parseHeaders(S)(S inputStream) if (isByteInputStream!S) {
|
||||||
|
@ -338,41 +158,179 @@ Either!(string[][string], "headers", StreamError, "error") parseHeaders(S)(S inp
|
||||||
return Either!(string[][string], "headers", StreamError, "error")(headers);
|
return Either!(string[][string], "headers", StreamError, "error")(headers);
|
||||||
}
|
}
|
||||||
|
|
||||||
unittest {
|
/**
|
||||||
import streams;
|
* Helper function to consume string content from an input stream until a
|
||||||
|
* certain target pattern of characters is encountered.
|
||||||
|
* Params:
|
||||||
|
* inputStream = The stream to read from.
|
||||||
|
* target = The target at which to stop reading.
|
||||||
|
* Returns: The string that was read, or a stream error.
|
||||||
|
*/
|
||||||
|
private Either!(string, "value", StreamError, "error") consumeUntil(S)(
|
||||||
|
S inputStream,
|
||||||
|
string target
|
||||||
|
) if (isByteInputStream!S) {
|
||||||
|
ubyte[1024] buffer;
|
||||||
|
size_t idx;
|
||||||
|
while (true) {
|
||||||
|
auto result = inputStream.readFromStream(buffer[idx .. idx + 1]);
|
||||||
|
if (result.hasError) return Either!(string, "value", StreamError, "error")(result.error);
|
||||||
|
if (result.count != 1) return Either!(string, "value", StreamError, "error")(
|
||||||
|
StreamError("Failed to read a single element", 1)
|
||||||
|
);
|
||||||
|
idx++;
|
||||||
|
if (idx >= target.length && buffer[idx - target.length .. idx] == target) {
|
||||||
|
return Either!(string, "value", StreamError, "error")(
|
||||||
|
cast(string) buffer[0 .. idx - target.length].idup
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if (idx >= buffer.length) {
|
||||||
|
return Either!(string, "value", StreamError, "error")(
|
||||||
|
StreamError("Couldn't find target \"" ~ target ~ "\" after reading 1024 bytes.", 1)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
auto makeStream(string text) {
|
/**
|
||||||
return arrayInputStreamFor(cast(ubyte[]) text);
|
* Internal helper function to get the first index of a character in a string.
|
||||||
|
* Params:
|
||||||
|
* s = The string to look in.
|
||||||
|
* c = The character to look for.
|
||||||
|
* offset = An optional offset to look from.
|
||||||
|
* Returns: The index of the character, or -1.
|
||||||
|
*/
|
||||||
|
private ptrdiff_t indexOf(string s, char c, size_t offset = 0) {
|
||||||
|
for (size_t i = offset; i < s.length; i++) {
|
||||||
|
if (s[i] == c) return i;
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Internal helper function that returns the slice of a string excluding any
|
||||||
|
* preceding or trailing spaces.
|
||||||
|
* Params:
|
||||||
|
* s = The string to strip.
|
||||||
|
* Returns: The slice of the string that has been stripped.
|
||||||
|
*/
|
||||||
|
private string stripSpaces(string s) {
|
||||||
|
if (s.length == 0) return s;
|
||||||
|
ptrdiff_t startIdx = 0;
|
||||||
|
while (s[startIdx] == ' ' && startIdx < s.length) startIdx++;
|
||||||
|
s = s[startIdx .. $];
|
||||||
|
ptrdiff_t endIdx = s.length - 1;
|
||||||
|
while (s[endIdx] == ' ' && endIdx >= 0) endIdx--;
|
||||||
|
return s[0 .. endIdx + 1];
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper function to append an unsigned integer value to a char buffer. It is
|
||||||
|
* assumed that there's enough space to write the value.
|
||||||
|
* Params:
|
||||||
|
* value = The value to append.
|
||||||
|
* buffer = The buffer to append to.
|
||||||
|
* idx = A reference to a variable tracking the next writable index in the buffer.
|
||||||
|
*/
|
||||||
|
private void writeUIntToBuffer(uint value, char[] buffer, ref size_t idx) {
|
||||||
|
const size_t startIdx = idx;
|
||||||
|
while (true) {
|
||||||
|
ubyte remainder = value % 10;
|
||||||
|
value /= 10;
|
||||||
|
buffer[idx++] = cast(char) ('0' + remainder);
|
||||||
|
if (value == 0) break;
|
||||||
|
}
|
||||||
|
// Swap the characters to proper order.
|
||||||
|
for (size_t i = 0; i < (idx - startIdx) / 2; i++) {
|
||||||
|
size_t p1 = i + startIdx;
|
||||||
|
size_t p2 = idx - i - 1;
|
||||||
|
char tmp = buffer[p1];
|
||||||
|
buffer[p1] = buffer[p2];
|
||||||
|
buffer[p2] = tmp;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A wrapper around a byte output stream that's used for writing HTTP response
|
||||||
|
* content. It keeps a reference to the `ServerHttpResponse` so that when a
|
||||||
|
* handler writes data to the stream, it'll flush the HTTP response status and
|
||||||
|
* headers beforehand.
|
||||||
|
*/
|
||||||
|
struct HttpResponseOutputStream(S) if (isByteOutputStream!S) {
|
||||||
|
/// The underlying output stream to write to.
|
||||||
|
private S outputStream;
|
||||||
|
/// A pointer to the HTTP response that this stream is for.
|
||||||
|
private ServerHttpResponse* response;
|
||||||
|
/// Flag that keeps track of if the HTTP status and headers were written.
|
||||||
|
private bool headersFlushed = false;
|
||||||
|
|
||||||
|
this(S outputStream, ServerHttpResponse* response) {
|
||||||
|
this.outputStream = outputStream;
|
||||||
|
this.response = response;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Basic valid headers.
|
/**
|
||||||
auto s1 = makeStream("Content-Type: application/json\r\n\r\n");
|
* Writes the given data to the stream. If the referenced HTTP response's
|
||||||
auto r1 = parseHeaders(&s1);
|
* status and headers haven't yet been written, they will be written first.
|
||||||
assert(r1.hasHeaders);
|
* Params:
|
||||||
assert("Content-Type" in r1.headers);
|
* buffer = The buffer containing data to write.
|
||||||
assert(r1.headers["Content-Type"] == ["application/json"]);
|
* Returns: The result of writing. If status and headers are written, the
|
||||||
|
* number of bytes written will include that in addition to the buffer size.
|
||||||
|
*/
|
||||||
|
StreamResult writeToStream(ubyte[] buffer) {
|
||||||
|
uint bytesWritten = 0;
|
||||||
|
if (!headersFlushed) {
|
||||||
|
auto result = writeHeaders();
|
||||||
|
if (result.hasError) return result;
|
||||||
|
bytesWritten += result.count;
|
||||||
|
headersFlushed = true;
|
||||||
|
}
|
||||||
|
auto result = outputStream.writeToStream(buffer);
|
||||||
|
if (result.hasError) return result;
|
||||||
|
return StreamResult(result.count + bytesWritten);
|
||||||
|
}
|
||||||
|
|
||||||
// Multiple headers.
|
/**
|
||||||
auto s2 = makeStream("Accept: text, json, image\r\nContent-Length: 1234\r\n\r\n");
|
* Writes HTTP/1.1 status line and headers to the underlying output stream,
|
||||||
auto r2 = parseHeaders(&s2);
|
* which is done before any body content can be written.
|
||||||
assert(r2.hasHeaders);
|
* Returns: The stream result of writing.
|
||||||
assert("Accept" in r2.headers);
|
*/
|
||||||
assert(r2.headers["Accept"] == ["text, json, image"]);
|
StreamResult writeHeaders() {
|
||||||
assert(r2.headers["Content-Length"] == ["1234"]);
|
// TODO: Come up with a better way of writing headers than string concatenation.
|
||||||
|
size_t idx = 0;
|
||||||
|
char[6] statusCodeBuffer; // Normal HTTP codes are 3 digits, but this leaves room for extensions.
|
||||||
|
writeUIntToBuffer(response.status.code, statusCodeBuffer, idx);
|
||||||
|
|
||||||
// Basic invalid header string.
|
string statusAndHeaders = "HTTP/1.1 "
|
||||||
auto s3 = makeStream("Invalid headers");
|
~ cast(string) statusCodeBuffer[0..idx]
|
||||||
auto r3 = parseHeaders(&s3);
|
~ " " ~ response.status.text
|
||||||
assert(r3.hasError);
|
~ "\r\n";
|
||||||
|
foreach (headerName; response.headers.keys) {
|
||||||
// No trailing \r\n
|
string headerLine = headerName ~ ": ";
|
||||||
auto s4 = makeStream("Content-Type: application/json");
|
string[] headerValues = response.headers.getAll(headerName);
|
||||||
auto r4 = parseHeaders(&s4);
|
for (size_t i = 0; i < headerValues.length; i++) {
|
||||||
assert(r4.hasError);
|
headerLine ~= headerValues[i];
|
||||||
|
if (i + 1 < headerValues.length) {
|
||||||
// Empty headers.
|
headerLine ~= ", ";
|
||||||
auto s5 = makeStream("\r\n");
|
}
|
||||||
auto r5 = parseHeaders(&s5);
|
}
|
||||||
assert(r5.hasHeaders);
|
headerLine ~= "\r\n";
|
||||||
assert(r5.headers.length == 0);
|
statusAndHeaders ~= headerLine;
|
||||||
|
}
|
||||||
|
statusAndHeaders ~= "\r\n"; // Trailing CLRF before the body.
|
||||||
|
return outputStream.writeToStream(cast(ubyte[]) statusAndHeaders);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
unittest {
|
||||||
|
class TestHandler : HttpRequestHandler {
|
||||||
|
void handle(ref ServerHttpRequest request, ref ServerHttpResponse response) {
|
||||||
|
response.status = HttpStatus.OK;
|
||||||
|
response.headers.add("Content-Type", "application/json");
|
||||||
|
response.outputStream.writeToStream(cast(ubyte[]) "{\"a\": 1}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
HttpTransport tp = new Http1Transport(new TestHandler(), 8080);
|
||||||
|
tp.start();
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,2 @@
|
||||||
module handy_http_transport.http2;
|
module handy_http_transport.http2;
|
||||||
|
|
||||||
// Not yet implemented.
|
|
||||||
|
|
|
@ -4,21 +4,3 @@ interface HttpTransport {
|
||||||
void start();
|
void start();
|
||||||
void stop();
|
void stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
import core.thread;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Starts a new thread to run an HTTP transport implementation in, separate
|
|
||||||
* from the calling thread. This is useful for running a server in the
|
|
||||||
* background, like for integration tests.
|
|
||||||
* Params:
|
|
||||||
* transport = The transport implementation to start.
|
|
||||||
* Returns: The thread that was started.
|
|
||||||
*/
|
|
||||||
Thread startInNewThread(HttpTransport transport) {
|
|
||||||
Thread t = new Thread(() {
|
|
||||||
transport.start();
|
|
||||||
});
|
|
||||||
t.start();
|
|
||||||
return t;
|
|
||||||
}
|
|
||||||
|
|
|
@ -2,6 +2,3 @@ module handy_http_transport;
|
||||||
|
|
||||||
public import handy_http_transport.http1;
|
public import handy_http_transport.http1;
|
||||||
public import handy_http_transport.http2;
|
public import handy_http_transport.http2;
|
||||||
|
|
||||||
public import handy_http_transport.interfaces;
|
|
||||||
public import handy_http_transport.response_output_stream;
|
|
||||||
|
|
|
@ -1,124 +0,0 @@
|
||||||
module handy_http_transport.response_output_stream;
|
|
||||||
|
|
||||||
import handy_http_transport.helpers : writeUIntToBuffer;
|
|
||||||
import handy_http_primitives : ServerHttpResponse;
|
|
||||||
import streams;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A wrapper around a byte output stream that's used for writing HTTP response
|
|
||||||
* content. It keeps a reference to the `ServerHttpResponse` so that when a
|
|
||||||
* handler writes data to the stream, it'll flush the HTTP response status and
|
|
||||||
* headers beforehand.
|
|
||||||
*/
|
|
||||||
struct HttpResponseOutputStream(S) if (isByteOutputStream!S) {
|
|
||||||
/// The underlying output stream to write to.
|
|
||||||
private S outputStream;
|
|
||||||
/// A pointer to the HTTP response that this stream is for.
|
|
||||||
private ServerHttpResponse* response;
|
|
||||||
/// Flag that keeps track of if the HTTP status and headers were written.
|
|
||||||
private bool headersFlushed = false;
|
|
||||||
|
|
||||||
this(S outputStream, ServerHttpResponse* response) {
|
|
||||||
this.outputStream = outputStream;
|
|
||||||
this.response = response;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Writes the given data to the stream. If the referenced HTTP response's
|
|
||||||
* status and headers haven't yet been written, they will be written first.
|
|
||||||
* Params:
|
|
||||||
* buffer = The buffer containing data to write.
|
|
||||||
* Returns: The result of writing. If status and headers are written, the
|
|
||||||
* number of bytes written will include that in addition to the buffer size.
|
|
||||||
*/
|
|
||||||
StreamResult writeToStream(ubyte[] buffer) {
|
|
||||||
uint bytesWritten = 0;
|
|
||||||
if (!headersFlushed) {
|
|
||||||
auto result = writeHeaders();
|
|
||||||
if (result.hasError) return result;
|
|
||||||
bytesWritten += result.count;
|
|
||||||
headersFlushed = true;
|
|
||||||
}
|
|
||||||
auto result = outputStream.writeToStream(buffer);
|
|
||||||
if (result.hasError) return result;
|
|
||||||
return StreamResult(result.count + bytesWritten);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Writes HTTP/1.1 status line and headers to the underlying output stream,
|
|
||||||
* which is done before any body content can be written.
|
|
||||||
* Returns: The stream result of writing.
|
|
||||||
*/
|
|
||||||
StreamResult writeHeaders() {
|
|
||||||
size_t idx = 0;
|
|
||||||
char[6] statusCodeBuffer; // Normal HTTP codes are 3 digits, but this leaves room for extensions.
|
|
||||||
writeUIntToBuffer(response.status.code, statusCodeBuffer, idx);
|
|
||||||
// Write the status line.
|
|
||||||
StreamResult r = outputStream.writeToStream(cast(ubyte[]) "HTTP/1.1 ");
|
|
||||||
if (r.hasError) return r;
|
|
||||||
size_t writeCount = r.count;
|
|
||||||
r = outputStream.writeToStream(cast(ubyte[]) statusCodeBuffer[0..idx]);
|
|
||||||
if (r.hasError) return r;
|
|
||||||
writeCount += r.count;
|
|
||||||
r = outputStream.writeToStream([' ']);
|
|
||||||
if (r.hasError) return r;
|
|
||||||
writeCount += r.count;
|
|
||||||
r = outputStream.writeToStream(cast(ubyte[]) response.status.text);
|
|
||||||
if (r.hasError) return r;
|
|
||||||
writeCount += r.count;
|
|
||||||
r = outputStream.writeToStream(['\r', '\n']);
|
|
||||||
if (r.hasError) return r;
|
|
||||||
writeCount += r.count;
|
|
||||||
|
|
||||||
foreach (headerName; response.headers.keys) {
|
|
||||||
// Write the header name.
|
|
||||||
r = outputStream.writeToStream(cast(ubyte[]) headerName);
|
|
||||||
if (r.hasError) return r;
|
|
||||||
writeCount += r.count;
|
|
||||||
r = outputStream.writeToStream([':', ' ']);
|
|
||||||
if (r.hasError) return r;
|
|
||||||
writeCount += r.count;
|
|
||||||
// Write the comma-separated list of values.
|
|
||||||
string[] headerValues = response.headers.getAll(headerName);
|
|
||||||
for (size_t i = 0; i < headerValues.length; i++) {
|
|
||||||
r = outputStream.writeToStream(cast(ubyte[]) headerValues[i]);
|
|
||||||
if (r.hasError) return r;
|
|
||||||
writeCount += r.count;
|
|
||||||
if (i + 1 < headerValues.length) {
|
|
||||||
r = outputStream.writeToStream([',', ' ']);
|
|
||||||
if (r.hasError) return r;
|
|
||||||
writeCount += r.count;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
r = outputStream.writeToStream(['\r', '\n']);
|
|
||||||
if (r.hasError) return r;
|
|
||||||
writeCount += r.count;
|
|
||||||
}
|
|
||||||
r = outputStream.writeToStream(['\r', '\n']); // Trailing CLRF before the body.
|
|
||||||
if (r.hasError) return r;
|
|
||||||
writeCount += r.count;
|
|
||||||
return StreamResult(cast(uint) writeCount);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test basic functionality for writing a standard response with headers and a
|
|
||||||
// body.
|
|
||||||
unittest {
|
|
||||||
import handy_http_primitives.response;
|
|
||||||
|
|
||||||
ArrayOutputStream!ubyte os;
|
|
||||||
ServerHttpResponse resp;
|
|
||||||
resp.status = HttpStatus.OK;
|
|
||||||
resp.headers.add("Content-Type", "text/plain");
|
|
||||||
auto httpOut = HttpResponseOutputStream!(ArrayOutputStream!ubyte*)(&os, &resp);
|
|
||||||
resp.outputStream = outputStreamObjectFor(httpOut);
|
|
||||||
StreamResult r = resp.outputStream.writeToStream(cast(ubyte[]) "Hello world!");
|
|
||||||
const expectedOutput =
|
|
||||||
"HTTP/1.1 200 OK\r\n" ~
|
|
||||||
"Content-Type: text/plain\r\n" ~
|
|
||||||
"\r\n" ~
|
|
||||||
"Hello world!";
|
|
||||||
assert(os.toArray() == expectedOutput);
|
|
||||||
assert(r.hasCount);
|
|
||||||
assert(r.count == os.toArray().length);
|
|
||||||
}
|
|
Loading…
Reference in New Issue