Compare commits
No commits in common. "main" and "v1.0.0" have entirely different histories.
|
@ -1,32 +0,0 @@
|
||||||
name: Build and Test Module
|
|
||||||
on:
|
|
||||||
push:
|
|
||||||
paths:
|
|
||||||
- 'source/**'
|
|
||||||
- '.gitea/workflows/ci.yaml'
|
|
||||||
pull_request:
|
|
||||||
types: [opened, reopened, synchronize]
|
|
||||||
jobs:
|
|
||||||
build-and-test:
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
steps:
|
|
||||||
- uses: actions/checkout@v4
|
|
||||||
- name: Setup DLang
|
|
||||||
uses: dlang-community/setup-dlang@v2
|
|
||||||
with:
|
|
||||||
compiler: ldc-latest
|
|
||||||
- name: Build and Test
|
|
||||||
run: dub -q test
|
|
||||||
|
|
||||||
integration-tests:
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
steps:
|
|
||||||
- uses: actions/checkout@v4
|
|
||||||
- name: Setup DLang
|
|
||||||
uses: dlang-community/setup-dlang@v2
|
|
||||||
with:
|
|
||||||
compiler: ldc-latest
|
|
||||||
|
|
||||||
- name: http1-test
|
|
||||||
working-directory: integration-tests/http1-basic
|
|
||||||
run: dub run --single http1-test.d
|
|
6
LICENSE
6
LICENSE
|
@ -1,5 +1 @@
|
||||||
Copyright 2025 Andrew Lalis
|
Handy-Http by Andrew Lalis is marked with CC0 1.0 Universal. To view a copy of this license, visit https://creativecommons.org/publicdomain/zero/1.0/
|
||||||
|
|
||||||
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so.
|
|
||||||
|
|
||||||
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|
12
README.md
12
README.md
|
@ -1,16 +1,10 @@
|
||||||
# http-transport
|
# http-transport
|
||||||
|
|
||||||
This library provides implementations of various versions of HTTP transport,
|
Implementations of HTTP transport protocols, compatible with other Handy-Http components.
|
||||||
acting as a "glue" for connecting clients and servers. Practically speaking,
|
|
||||||
the handy-http-transport library provides HTTP server implementations you can
|
|
||||||
use interchangeably with other handy-http libraries.
|
|
||||||
|
|
||||||
For now, see the section on HTTP/1.1, as that's the only HTTP version
|
|
||||||
implemented so far.
|
|
||||||
|
|
||||||
## HTTP/1.1
|
## HTTP/1.1
|
||||||
|
|
||||||
Use the `TaskPoolHttp1Transport` implementation of `HttpTransport` to serve content
|
Use the `Http1Transport` implementation of `HttpTransport` to serve content
|
||||||
using the HTTP/1.1 protocol. See the example below:
|
using the HTTP/1.1 protocol. See the example below:
|
||||||
|
|
||||||
```d
|
```d
|
||||||
|
@ -26,7 +20,7 @@ class MyHandler : HttpRequestHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
void main() {
|
void main() {
|
||||||
HttpTransport tp = new TaskPoolHttp1Transport(new MyHandler());
|
HttpTransport tp = new Http1Transport(new MyHandler(), 8080);
|
||||||
tp.start();
|
tp.start();
|
||||||
}
|
}
|
||||||
```
|
```
|
7
dub.json
7
dub.json
|
@ -4,10 +4,9 @@
|
||||||
],
|
],
|
||||||
"copyright": "Copyright © 2024, Andrew Lalis",
|
"copyright": "Copyright © 2024, Andrew Lalis",
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"handy-http-primitives": "~>1.8",
|
"handy-http-primitives": "~>1.0.0",
|
||||||
"photon": "~>0.15.0",
|
"photon": "~>0.10.2",
|
||||||
"slf4d": "~>4.0",
|
"streams": "~>3.5.0"
|
||||||
"streams": "~>3.6"
|
|
||||||
},
|
},
|
||||||
"description": "Implementations of HTTP transport protocols.",
|
"description": "Implementations of HTTP transport protocols.",
|
||||||
"license": "CC0",
|
"license": "CC0",
|
||||||
|
|
|
@ -1,10 +1,9 @@
|
||||||
{
|
{
|
||||||
"fileVersion": 1,
|
"fileVersion": 1,
|
||||||
"versions": {
|
"versions": {
|
||||||
"handy-http-primitives": "1.8.0",
|
"handy-http-primitives": "1.0.0",
|
||||||
"photon": "0.15.0",
|
"photon": "0.10.2",
|
||||||
"sharded-map": "2.7.0",
|
"sharded-map": "2.7.0",
|
||||||
"slf4d": "4.1.1",
|
"streams": "3.5.0"
|
||||||
"streams": "3.6.0"
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1 +0,0 @@
|
||||||
http1-test
|
|
|
@ -1,53 +0,0 @@
|
||||||
/+ dub.sdl:
|
|
||||||
dependency "handy-http-transport" path="../../"
|
|
||||||
dependency "requests" version="~>2.1"
|
|
||||||
+/
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This tests the basic HTTP functionality of the Http1Transport implementation
|
|
||||||
* by starting a server, sending a request, and checking the response.
|
|
||||||
*/
|
|
||||||
module integration_tests.http1_test;
|
|
||||||
|
|
||||||
import handy_http_primitives;
|
|
||||||
import handy_http_transport;
|
|
||||||
import slf4d;
|
|
||||||
import slf4d.default_provider;
|
|
||||||
import requests;
|
|
||||||
|
|
||||||
import core.thread;
|
|
||||||
|
|
||||||
int main() {
|
|
||||||
auto loggingProvider = DefaultProvider.builder()
|
|
||||||
.withRootLoggingLevel(Levels.INFO)
|
|
||||||
.withConsoleSerializer(true, 48)
|
|
||||||
.build();
|
|
||||||
configureLoggingProvider(loggingProvider);
|
|
||||||
|
|
||||||
HttpTransport transport = new TaskPoolHttp1Transport(HttpRequestHandler.of(&handleRequest));
|
|
||||||
Thread thread = transport.startInNewThread();
|
|
||||||
scope(exit) {
|
|
||||||
transport.stop();
|
|
||||||
thread.join();
|
|
||||||
}
|
|
||||||
info("Started server in another thread.");
|
|
||||||
Thread.sleep(msecs(100)); // Wait for the server to start.
|
|
||||||
|
|
||||||
// Send a simple GET request to the server.
|
|
||||||
info("Sending GET request to http://localhost:8080");
|
|
||||||
auto content = getContent("http://localhost:8080");
|
|
||||||
ubyte[] data = content.data;
|
|
||||||
if (data.length != 13 || (cast(string) data) != "Hello, world!") {
|
|
||||||
error("Received unexpected content: " ~ cast(string) data);
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
info("Test completed successfully.");
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void handleRequest(ref ServerHttpRequest request, ref ServerHttpResponse response) {
|
|
||||||
response.headers.add("Content-Type", "text/plain");
|
|
||||||
response.headers.add("Content-Length", "13");
|
|
||||||
response.outputStream.writeToStream(cast(ubyte[]) "Hello, world!");
|
|
||||||
}
|
|
|
@ -1 +0,0 @@
|
||||||
http1-speed-test
|
|
|
@ -1,28 +0,0 @@
|
||||||
/+ dub.sdl:
|
|
||||||
dependency "handy-http-transport" path="../../"
|
|
||||||
+/
|
|
||||||
module integration_tests.http1_speed_test;
|
|
||||||
|
|
||||||
import handy_http_primitives;
|
|
||||||
import handy_http_transport;
|
|
||||||
import slf4d;
|
|
||||||
import slf4d.default_provider;
|
|
||||||
|
|
||||||
void main() {
|
|
||||||
auto loggingProvider = DefaultProvider.builder()
|
|
||||||
.withRootLoggingLevel(Levels.ERROR)
|
|
||||||
.withConsoleSerializer(true, 48)
|
|
||||||
.build();
|
|
||||||
configureLoggingProvider(loggingProvider);
|
|
||||||
HttpTransport transport;
|
|
||||||
transport = new TaskPoolHttp1Transport(HttpRequestHandler.of(
|
|
||||||
(ref ServerHttpRequest request, ref ServerHttpResponse response) {
|
|
||||||
if (request.method == HttpMethod.DELETE) {
|
|
||||||
transport.stop();
|
|
||||||
}
|
|
||||||
response.headers.add("Content-Type", "text/plain");
|
|
||||||
response.headers.add("Content-Length", "13");
|
|
||||||
response.outputStream.writeToStream(cast(ubyte[]) "Hello, world!");
|
|
||||||
}));
|
|
||||||
transport.start();
|
|
||||||
}
|
|
|
@ -1,14 +0,0 @@
|
||||||
#!/usr/bin/env bash
|
|
||||||
|
|
||||||
# This script runs all integration tests.
|
|
||||||
|
|
||||||
set -e -o pipefail
|
|
||||||
|
|
||||||
cd http1-basic
|
|
||||||
dub build --single http1-test.d
|
|
||||||
./http1-test
|
|
||||||
cd ..
|
|
||||||
|
|
||||||
# cd http1-speed-test
|
|
||||||
# dub build --single --build=release http1-speed-test.d
|
|
||||||
|
|
|
@ -1,103 +0,0 @@
|
||||||
module handy_http_transport.helpers;
|
|
||||||
|
|
||||||
import streams;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Helper function to consume string content from an input stream until a
|
|
||||||
* certain target pattern of characters is encountered.
|
|
||||||
* Params:
|
|
||||||
* inputStream = The stream to read from.
|
|
||||||
* target = The target at which to stop reading.
|
|
||||||
* Returns: The string that was read, or a stream error.
|
|
||||||
*/
|
|
||||||
Either!(string, "value", StreamError, "error") consumeUntil(S)(
|
|
||||||
S inputStream,
|
|
||||||
string target
|
|
||||||
) if (isByteInputStream!S) {
|
|
||||||
ubyte[1024] buffer;
|
|
||||||
size_t idx;
|
|
||||||
while (true) {
|
|
||||||
auto result = inputStream.readFromStream(buffer[idx .. idx + 1]);
|
|
||||||
if (result.hasError) return Either!(string, "value", StreamError, "error")(result.error);
|
|
||||||
if (result.count != 1) return Either!(string, "value", StreamError, "error")(
|
|
||||||
StreamError("Failed to read a single element", 0)
|
|
||||||
);
|
|
||||||
idx++;
|
|
||||||
if (idx >= target.length && buffer[idx - target.length .. idx] == target) {
|
|
||||||
return Either!(string, "value", StreamError, "error")(
|
|
||||||
cast(string) buffer[0 .. idx - target.length].idup
|
|
||||||
);
|
|
||||||
} else if (idx >= buffer.length) {
|
|
||||||
return Either!(string, "value", StreamError, "error")(
|
|
||||||
StreamError("Couldn't find target \"" ~ target ~ "\" after reading 1024 bytes.", 1)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Internal helper function to get the first index of a character in a string.
|
|
||||||
* Params:
|
|
||||||
* s = The string to look in.
|
|
||||||
* c = The character to look for.
|
|
||||||
* offset = An optional offset to look from.
|
|
||||||
* Returns: The index of the character, or -1.
|
|
||||||
*/
|
|
||||||
ptrdiff_t indexOf(string s, char c, size_t offset = 0) {
|
|
||||||
for (size_t i = offset; i < s.length; i++) {
|
|
||||||
if (s[i] == c) return i;
|
|
||||||
}
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Internal helper function that returns the slice of a string excluding any
|
|
||||||
* preceding or trailing spaces.
|
|
||||||
* Params:
|
|
||||||
* s = The string to strip.
|
|
||||||
* Returns: The slice of the string that has been stripped.
|
|
||||||
*/
|
|
||||||
string stripSpaces(string s) {
|
|
||||||
if (s.length == 0) return s;
|
|
||||||
ptrdiff_t startIdx = 0;
|
|
||||||
while (startIdx < s.length && s[startIdx] == ' ') startIdx++;
|
|
||||||
s = s[startIdx .. $];
|
|
||||||
if (s.length == 0) return "";
|
|
||||||
ptrdiff_t endIdx = cast(ptrdiff_t) s.length - 1;
|
|
||||||
while (s[endIdx] == ' ' && endIdx >= 0) endIdx--;
|
|
||||||
return s[0 .. endIdx + 1];
|
|
||||||
}
|
|
||||||
|
|
||||||
unittest {
|
|
||||||
assert(stripSpaces("") == "");
|
|
||||||
assert(stripSpaces(" ") == "");
|
|
||||||
assert(stripSpaces("test") == "test");
|
|
||||||
assert(stripSpaces(" test") == "test");
|
|
||||||
assert(stripSpaces(" test string ") == "test string");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Helper function to append an unsigned integer value to a char buffer. It is
|
|
||||||
* assumed that there's enough space to write the value.
|
|
||||||
* Params:
|
|
||||||
* value = The value to append.
|
|
||||||
* buffer = The buffer to append to.
|
|
||||||
* idx = A reference to a variable tracking the next writable index in the buffer.
|
|
||||||
*/
|
|
||||||
void writeUIntToBuffer(uint value, char[] buffer, ref size_t idx) {
|
|
||||||
const size_t startIdx = idx;
|
|
||||||
while (true) {
|
|
||||||
ubyte remainder = value % 10;
|
|
||||||
value /= 10;
|
|
||||||
buffer[idx++] = cast(char) ('0' + remainder);
|
|
||||||
if (value == 0) break;
|
|
||||||
}
|
|
||||||
// Swap the characters to proper order.
|
|
||||||
for (size_t i = 0; i < (idx - startIdx) / 2; i++) {
|
|
||||||
size_t p1 = i + startIdx;
|
|
||||||
size_t p2 = idx - i - 1;
|
|
||||||
char tmp = buffer[p1];
|
|
||||||
buffer[p1] = buffer[p2];
|
|
||||||
buffer[p2] = tmp;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,122 +0,0 @@
|
||||||
module handy_http_transport.http1.epoll;
|
|
||||||
|
|
||||||
import core.sys.posix.sys.socket;
|
|
||||||
import core.sys.linux.epoll;
|
|
||||||
import core.sys.posix.netinet.in_;
|
|
||||||
import core.sys.posix.fcntl;
|
|
||||||
import core.sys.posix.unistd;
|
|
||||||
|
|
||||||
import core.stdc.errno;
|
|
||||||
|
|
||||||
extern(C) {
|
|
||||||
int accept4(int sockfd, sockaddr *addr, socklen_t *addrlen, int flags);
|
|
||||||
}
|
|
||||||
|
|
||||||
import handy_http_transport.interfaces;
|
|
||||||
import handy_http_transport.http1;
|
|
||||||
import handy_http_primitives;
|
|
||||||
import slf4d;
|
|
||||||
|
|
||||||
class Http1EpollTransport : Http1Transport {
|
|
||||||
|
|
||||||
this(HttpRequestHandler requestHandler, ushort port) {
|
|
||||||
super(requestHandler, port);
|
|
||||||
}
|
|
||||||
|
|
||||||
override void start() {
|
|
||||||
super.start();
|
|
||||||
// Create the server socket.
|
|
||||||
enum SOCK_NONBLOCK = 0x4000;
|
|
||||||
int listenFd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
|
|
||||||
sockaddr_in serverAddress;
|
|
||||||
serverAddress.sin_family = AF_INET;
|
|
||||||
serverAddress.sin_port = htons(port);
|
|
||||||
serverAddress.sin_addr.s_addr = INADDR_ANY;
|
|
||||||
|
|
||||||
if (bind(listenFd, cast(sockaddr*) &serverAddress, serverAddress.sizeof) == -1) {
|
|
||||||
errorF!"Failed to bind socket: %d"(errno);
|
|
||||||
close(listenFd);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (listen(listenFd, SOMAXCONN) == -1) {
|
|
||||||
errorF!"Failed to listen on socket: %d"(errno);
|
|
||||||
close(listenFd);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
int epollFd = epoll_create1(0);
|
|
||||||
if (epollFd == -1) {
|
|
||||||
errorF!"Failed to create epoll instance: %d"(errno);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
epoll_event event;
|
|
||||||
epoll_event[64] events;
|
|
||||||
event.events = EPOLLIN | EPOLLET;
|
|
||||||
event.data.fd = listenFd;
|
|
||||||
if (epoll_ctl(epollFd, EPOLL_CTL_ADD, listenFd, &event) == -1) {
|
|
||||||
errorF!"Failed to add listen socket to epoll: %d"(errno);
|
|
||||||
close(listenFd);
|
|
||||||
close(epollFd);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
infoF!"Server listening on port %d."(port);
|
|
||||||
|
|
||||||
while (true) {
|
|
||||||
int eventCount = epoll_wait(epollFd, &event, 64, 5000);
|
|
||||||
if (eventCount == -1) {
|
|
||||||
errorF!"Epoll wait failed: %d"(errno);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int i = 0; i < eventCount; i++) {
|
|
||||||
if (events[i].data.fd == listenFd) {
|
|
||||||
// New incoming connection.
|
|
||||||
while (true) {
|
|
||||||
sockaddr_in clientAddress;
|
|
||||||
socklen_t clientAddressLength = clientAddress.sizeof;
|
|
||||||
int clientFd = accept4(
|
|
||||||
listenFd,
|
|
||||||
cast(sockaddr*) &clientAddress,
|
|
||||||
&clientAddressLength,
|
|
||||||
SOCK_NONBLOCK
|
|
||||||
);
|
|
||||||
if (clientFd == -1) {
|
|
||||||
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
|
||||||
// No more connections to accept.
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
errorF!"Failed to accept connection: %d"(errno);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add the client socket to epoll's listening list.
|
|
||||||
event.events = EPOLLIN | EPOLLET;
|
|
||||||
event.data.fd = clientFd;
|
|
||||||
if (epoll_ctl(epollFd, EPOLL_CTL_ADD, clientFd, &event) == -1) {
|
|
||||||
errorF!"Failed to add client socket to epoll: %d"(errno);
|
|
||||||
close(clientFd);
|
|
||||||
}
|
|
||||||
|
|
||||||
infoF!"Accepted new connection from %s:%d."(
|
|
||||||
inet_ntoa(clientAddress.sin_addr),
|
|
||||||
ntohs(clientAddress.sin_port)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// Event on an existing client socket.
|
|
||||||
|
|
||||||
int clientFd = events[i].data.fd;
|
|
||||||
|
|
||||||
if (events[i].events & EPOLLIN) {
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -3,5 +3,3 @@
|
||||||
*/
|
*/
|
||||||
module handy_http_transport.http1;
|
module handy_http_transport.http1;
|
||||||
|
|
||||||
public import handy_http_transport.http1.transport;
|
|
||||||
public import handy_http_transport.http1.task_pool;
|
|
||||||
|
|
|
@ -1,67 +0,0 @@
|
||||||
module handy_http_transport.http1.photon;
|
|
||||||
|
|
||||||
import handy_http_transport.http1.transport;
|
|
||||||
import handy_http_primitives;
|
|
||||||
import slf4d;
|
|
||||||
|
|
||||||
import photon;
|
|
||||||
import std.socket;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* An implementation of Http1Transport which uses Dimitry Olshansky's Photon
|
|
||||||
* library for asynchronous task processing. A main fiber is started which
|
|
||||||
* accepts incoming client sockets, and a fiber is spawned for each client so
|
|
||||||
* its request can be handled asynchronously.
|
|
||||||
*/
|
|
||||||
class PhotonHttp1Transport : Http1Transport {
|
|
||||||
this(HttpRequestHandler handler, ushort port = 8080) {
|
|
||||||
super(handler, port);
|
|
||||||
}
|
|
||||||
|
|
||||||
override void runServer() {
|
|
||||||
initPhoton();
|
|
||||||
go(() {
|
|
||||||
Socket serverSocket = new TcpSocket();
|
|
||||||
serverSocket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1);
|
|
||||||
serverSocket.bind(parseAddress("127.0.0.1", port));
|
|
||||||
debugF!"Bound the server socket to %s"(serverSocket.localAddress);
|
|
||||||
serverSocket.listen(1024);
|
|
||||||
debug_("Server is now listening.");
|
|
||||||
|
|
||||||
while (super.isRunning) {
|
|
||||||
try {
|
|
||||||
trace("Waiting to accept a new socket.");
|
|
||||||
Socket clientSocket = serverSocket.accept();
|
|
||||||
trace("Accepted a new socket.");
|
|
||||||
go(() => handleClient(clientSocket, requestHandler));
|
|
||||||
trace("Added handleClient() task to the task pool.");
|
|
||||||
} catch (SocketAcceptException e) {
|
|
||||||
warn("Failed to accept socket connection.", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
serverSocket.close();
|
|
||||||
});
|
|
||||||
runScheduler();
|
|
||||||
}
|
|
||||||
|
|
||||||
override void stop() {
|
|
||||||
super.stop();
|
|
||||||
// Send a dummy request to cause the server's blocking accept() call to end.
|
|
||||||
try {
|
|
||||||
Socket dummySocket = new TcpSocket(new InternetAddress("127.0.0.1", port));
|
|
||||||
dummySocket.shutdown(SocketShutdown.BOTH);
|
|
||||||
dummySocket.close();
|
|
||||||
} catch (SocketOSException e) {
|
|
||||||
warn("Failed to send empty request to stop server.", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
unittest {
|
|
||||||
testHttp1Transport(new PhotonHttp1Transport(
|
|
||||||
HttpRequestHandler.of((req, resp) {
|
|
||||||
resp.status = HttpStatus.OK;
|
|
||||||
}),
|
|
||||||
8080
|
|
||||||
));
|
|
||||||
}
|
|
|
@ -1,114 +0,0 @@
|
||||||
module handy_http_transport.http1.task_pool;
|
|
||||||
|
|
||||||
import std.socket;
|
|
||||||
import std.parallelism;
|
|
||||||
|
|
||||||
import handy_http_transport.http1.transport;
|
|
||||||
import handy_http_primitives;
|
|
||||||
import slf4d;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Configuration options to provide when creating a new Http1Transport
|
|
||||||
* instance.
|
|
||||||
*/
|
|
||||||
struct Http1TransportConfig {
|
|
||||||
/// The host address to bind to.
|
|
||||||
string host;
|
|
||||||
/// The port to bind to.
|
|
||||||
ushort port;
|
|
||||||
/// The number of workers to use in the task pool.
|
|
||||||
size_t workerCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Defines the default configuration options if none are provided. They are:
|
|
||||||
* * Host address 127.0.0.1
|
|
||||||
* * Port 8080
|
|
||||||
* * Worker count of 5.
|
|
||||||
* Returns: The default configuration.
|
|
||||||
*/
|
|
||||||
Http1TransportConfig defaultConfig() {
|
|
||||||
return Http1TransportConfig(
|
|
||||||
"127.0.0.1",
|
|
||||||
8080,
|
|
||||||
5
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* An implementation of Http1Transport which uses D's standard library
|
|
||||||
* parallelization, where each incoming client request is turned into a task
|
|
||||||
* and submitted to the standard task pool.
|
|
||||||
*/
|
|
||||||
class TaskPoolHttp1Transport : Http1Transport {
|
|
||||||
private TaskPool httpTaskPool;
|
|
||||||
private immutable Http1TransportConfig config;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Creates a new transport instance using a std.parallelism TaskPool for
|
|
||||||
* handling requests.
|
|
||||||
* Params:
|
|
||||||
* requestHandler = The handler to call for each incoming request.
|
|
||||||
* config = The configuration settings for this transport instance.
|
|
||||||
*/
|
|
||||||
this(HttpRequestHandler requestHandler, in Http1TransportConfig config = defaultConfig()) {
|
|
||||||
super(requestHandler, config.port);
|
|
||||||
this.config = config;
|
|
||||||
this.httpTaskPool = new TaskPool(config.workerCount);
|
|
||||||
}
|
|
||||||
|
|
||||||
override void runServer() {
|
|
||||||
Socket serverSocket = new TcpSocket();
|
|
||||||
serverSocket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1);
|
|
||||||
serverSocket.bind(parseAddress(config.host, config.port));
|
|
||||||
debugF!"Bound the server socket to %s"(serverSocket.localAddress);
|
|
||||||
serverSocket.listen(1024);
|
|
||||||
debug_("Server is now listening.");
|
|
||||||
|
|
||||||
while (super.isRunning) {
|
|
||||||
try {
|
|
||||||
trace("Waiting to accept a new socket.");
|
|
||||||
Socket clientSocket = serverSocket.accept();
|
|
||||||
trace("Accepted a new socket.");
|
|
||||||
auto t = task!handleClient(clientSocket, requestHandler);
|
|
||||||
this.httpTaskPool.put(t);
|
|
||||||
trace("Added handleClient() task to the task pool.");
|
|
||||||
} catch (SocketAcceptException e) {
|
|
||||||
warn("Failed to accept socket connection.", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
serverSocket.close();
|
|
||||||
this.httpTaskPool.stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
override void stop() {
|
|
||||||
super.stop();
|
|
||||||
// Send a dummy request to cause the server's blocking accept() call to end.
|
|
||||||
try {
|
|
||||||
Socket dummySocket = new TcpSocket(new InternetAddress("127.0.0.1", port));
|
|
||||||
dummySocket.shutdown(SocketShutdown.BOTH);
|
|
||||||
dummySocket.close();
|
|
||||||
} catch (SocketOSException e) {
|
|
||||||
warn("Failed to send empty request to stop server.", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
unittest {
|
|
||||||
import slf4d.default_provider;
|
|
||||||
auto logProvider = DefaultProvider.builder().withRootLoggingLevel(Levels.DEBUG).build();
|
|
||||||
configureLoggingProvider(logProvider);
|
|
||||||
|
|
||||||
HttpRequestHandler handler = HttpRequestHandler.of(
|
|
||||||
(ref ServerHttpRequest request, ref ServerHttpResponse response) {
|
|
||||||
response.status = HttpStatus.OK;
|
|
||||||
});
|
|
||||||
testHttp1Transport(new TaskPoolHttp1Transport(handler));
|
|
||||||
|
|
||||||
HttpRequestHandler handler2 = HttpRequestHandler.of(
|
|
||||||
(ref ServerHttpRequest request, ref ServerHttpResponse response) {
|
|
||||||
response.status = HttpStatus.OK;
|
|
||||||
response.writeBodyString("Testing");
|
|
||||||
});
|
|
||||||
testHttp1Transport(new TaskPoolHttp1Transport(handler2));
|
|
||||||
}
|
|
|
@ -1,114 +1,56 @@
|
||||||
module handy_http_transport.http1.transport;
|
module handy_http_transport.http1.transport;
|
||||||
|
|
||||||
import std.socket;
|
import std.socket;
|
||||||
import core.atomic : atomicStore, atomicLoad;
|
|
||||||
|
|
||||||
import handy_http_transport.interfaces;
|
import handy_http_transport.interfaces;
|
||||||
import handy_http_transport.helpers;
|
|
||||||
import handy_http_transport.response_output_stream;
|
|
||||||
|
|
||||||
import handy_http_primitives;
|
import handy_http_primitives;
|
||||||
import handy_http_primitives.address;
|
|
||||||
|
|
||||||
import streams;
|
import streams;
|
||||||
import slf4d;
|
import photon;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Base class for HTTP/1.1 transport, where different subclasses can define
|
* The HTTP/1.1 transport protocol implementation, using Dimitry Olshansky's
|
||||||
* how the actual socket communication works (threadpool / epoll/ etc).
|
* Photon fiber scheduling library for concurrency.
|
||||||
*/
|
*/
|
||||||
abstract class Http1Transport : HttpTransport {
|
class Http1Transport : HttpTransport {
|
||||||
protected HttpRequestHandler requestHandler;
|
private Socket serverSocket;
|
||||||
protected immutable ushort port;
|
private HttpRequestHandler requestHandler;
|
||||||
|
private const ushort port;
|
||||||
private bool running = false;
|
private bool running = false;
|
||||||
|
|
||||||
this(HttpRequestHandler requestHandler, ushort port = 8080) {
|
this(HttpRequestHandler requestHandler, ushort port = 8080) {
|
||||||
assert(requestHandler !is null);
|
assert(requestHandler !is null);
|
||||||
|
this.serverSocket = new TcpSocket();
|
||||||
this.requestHandler = requestHandler;
|
this.requestHandler = requestHandler;
|
||||||
this.port = port;
|
this.port = port;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isRunning() {
|
|
||||||
return atomicLoad(running);
|
|
||||||
}
|
|
||||||
|
|
||||||
void start() {
|
void start() {
|
||||||
infoF!"Starting Http1Transport server on port %d."(port);
|
startloop();
|
||||||
atomicStore(running, true);
|
go(() => runServer());
|
||||||
runServer();
|
runFibers();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract void runServer();
|
|
||||||
|
|
||||||
void stop() {
|
void stop() {
|
||||||
infoF!"Stopping Http1Transport server on port %d."(port);
|
this.running = false;
|
||||||
atomicStore(running, false);
|
this.serverSocket.shutdown(SocketShutdown.BOTH);
|
||||||
|
this.serverSocket.close();
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
version(unittest) {
|
private void runServer() {
|
||||||
/**
|
this.running = true;
|
||||||
* A generic test to ensure that any Http1Transport implementation behaves
|
serverSocket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1);
|
||||||
* properly to start & stop, and process requests when running.
|
serverSocket.bind(new InternetAddress("127.0.0.1", port));
|
||||||
*
|
serverSocket.listen(100);
|
||||||
* It's assumed that the given transport is configured to run on localhost,
|
while (running) {
|
||||||
* port 8080, and return a standard 200 OK empty response to all requests.
|
try {
|
||||||
* Params:
|
Socket clientSocket = serverSocket.accept();
|
||||||
* transport = The transport implementation to test.
|
go(() => handleClient(clientSocket, requestHandler));
|
||||||
*/
|
} catch (SocketAcceptException e) {
|
||||||
void testHttp1Transport(Http1Transport transport) {
|
import std.stdio;
|
||||||
import core.thread;
|
stderr.writefln!"Failed to accept socket connection: %s"(e);
|
||||||
import std.string;
|
|
||||||
|
|
||||||
import slf4d.default_provider;
|
|
||||||
auto loggingProvider = new DefaultProvider(Levels.DEBUG);
|
|
||||||
configureLoggingProvider(loggingProvider);
|
|
||||||
|
|
||||||
infoF!"Testing Http1Transport implementation: %s"(transport);
|
|
||||||
|
|
||||||
Thread thread = transport.startInNewThread();
|
|
||||||
Thread.sleep(msecs(100));
|
|
||||||
|
|
||||||
Socket clientSocket1 = new TcpSocket(new InternetAddress(8080));
|
|
||||||
const requestBody = "POST /users HTTP/1.1\r\n" ~
|
|
||||||
"Host: example.com\r\n" ~
|
|
||||||
"Content-Type: text/plain\r\n" ~
|
|
||||||
"Content-Length: 13\r\n" ~
|
|
||||||
"\r\n" ~
|
|
||||||
"Hello, world!";
|
|
||||||
ptrdiff_t bytesSent = clientSocket1.send(requestBody);
|
|
||||||
assert(bytesSent == requestBody.length, "Couldn't send the full request body to the server.");
|
|
||||||
|
|
||||||
ubyte[8192] buffer;
|
|
||||||
size_t totalBytesReceived = 0;
|
|
||||||
ptrdiff_t bytesReceived;
|
|
||||||
do {
|
|
||||||
bytesReceived = clientSocket1.receive(buffer[totalBytesReceived .. $]);
|
|
||||||
if (bytesReceived == Socket.ERROR) {
|
|
||||||
assert(false, "Socket error when attempting to receive a response from the HttpTransport server.");
|
|
||||||
}
|
}
|
||||||
totalBytesReceived += bytesReceived;
|
}
|
||||||
} while (bytesReceived > 0);
|
|
||||||
|
|
||||||
string httpResponseContent = cast(string) buffer[0 .. totalBytesReceived];
|
|
||||||
string[] parts = httpResponseContent.split("\r\n\r\n");
|
|
||||||
assert(parts.length > 0, "HTTP 1.1 response is missing required status and headers section:\n\n" ~ httpResponseContent);
|
|
||||||
string[] headerLines = parts[0].split("\r\n");
|
|
||||||
assert(headerLines.length > 0, "HTTP 1.1 response is missing required status line.");
|
|
||||||
string statusLine = headerLines[0];
|
|
||||||
string[] statusLineParts = statusLine.split(" ");
|
|
||||||
assert(statusLineParts[0] == "HTTP/1.1");
|
|
||||||
assert(
|
|
||||||
statusLineParts[1] == "200",
|
|
||||||
format!"Expected status line's HTTP code to be 200, but it was \"%s\"."(statusLineParts[1])
|
|
||||||
);
|
|
||||||
assert(statusLineParts[2] == "OK");
|
|
||||||
|
|
||||||
info("Testing is complete. Stopping the server.");
|
|
||||||
transport.stop();
|
|
||||||
thread.join();
|
|
||||||
|
|
||||||
resetLoggingState();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -122,118 +64,29 @@ version(unittest) {
|
||||||
* requestHandler = The request handler that will handle the received HTTP request.
|
* requestHandler = The request handler that will handle the received HTTP request.
|
||||||
*/
|
*/
|
||||||
void handleClient(Socket clientSocket, HttpRequestHandler requestHandler) {
|
void handleClient(Socket clientSocket, HttpRequestHandler requestHandler) {
|
||||||
SocketInputStream* inputStream = new SocketInputStream(clientSocket);
|
auto inputStream = SocketInputStream(clientSocket);
|
||||||
BufferedInputStream!(SocketInputStream*, 8192)* bufferedInput
|
auto bufferedInput = bufferedInputStreamFor!(8192)(inputStream);
|
||||||
= new BufferedInputStream!(SocketInputStream*, 8192)(inputStream);
|
auto result = readHttpRequest(&bufferedInput);
|
||||||
// Get remote address from the socket.
|
|
||||||
import handy_http_primitives.address;
|
|
||||||
ClientAddress addr = getAddress(clientSocket);
|
|
||||||
traceF!"Got request from client: %s"(addr.toString());
|
|
||||||
auto result = readHttpRequest(bufferedInput, addr);
|
|
||||||
if (result.hasError) {
|
if (result.hasError) {
|
||||||
if (result.error.code != -1) {
|
import std.stdio;
|
||||||
// Only warn if we didn't read an empty request.
|
stderr.writeln("Failed to read HTTP request: " ~ result.error.message);
|
||||||
warnF!"Failed to read request: %s"(result.error.message);
|
|
||||||
}
|
|
||||||
inputStream.closeStream();
|
inputStream.closeStream();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
scope ServerHttpRequest request = result.request;
|
scope ServerHttpRequest request = result.request;
|
||||||
scope ServerHttpResponse response;
|
scope ServerHttpResponse response;
|
||||||
SocketOutputStream* outputStream = new SocketOutputStream(clientSocket);
|
SocketOutputStream outputStream = SocketOutputStream(clientSocket);
|
||||||
HttpResponseOutputStream!(SocketOutputStream*) responseOutputStream
|
response.outputStream = outputStreamObjectFor(HttpResponseOutputStream!(SocketOutputStream*)(
|
||||||
= HttpResponseOutputStream!(SocketOutputStream*)(
|
&outputStream,
|
||||||
outputStream,
|
&response
|
||||||
&response
|
));
|
||||||
);
|
|
||||||
response.outputStream = outputStreamObjectFor(&responseOutputStream);
|
|
||||||
try {
|
try {
|
||||||
requestHandler.handle(request, response);
|
requestHandler.handle(request, response);
|
||||||
debugF!"%s %s -> %d %s"(request.method, request.url, response.status.code, response.status.text);
|
|
||||||
// If the response's headers aren't flushed yet, write them now.
|
|
||||||
if (!responseOutputStream.areHeadersFlushed()) {
|
|
||||||
trace("Flushing response headers because they weren't flushed by the request handler.");
|
|
||||||
auto writeResult = responseOutputStream.writeHeaders();
|
|
||||||
if (writeResult.hasError) {
|
|
||||||
errorF!"Failed to write response headers: %s"(writeResult.error.message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
error("Exception thrown while handling request.", e);
|
import std.stdio;
|
||||||
} catch (Throwable t) {
|
stderr.writeln("Exception thrown while handling request: " ~ e.msg);
|
||||||
errorF!"Throwable error while handling request: %s"(t.msg);
|
|
||||||
throw t;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (response.status != HttpStatus.SWITCHING_PROTOCOLS) {
|
|
||||||
inputStream.closeStream();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test case where we use a local socket pair to test the full handleClient
|
|
||||||
// workflow from the HttpRequestHandler's point of view.
|
|
||||||
unittest {
|
|
||||||
Socket[2] sockets = socketPair();
|
|
||||||
Socket clientSocket = sockets[0];
|
|
||||||
Socket serverSocket = sockets[1];
|
|
||||||
const requestContent =
|
|
||||||
"POST /data HTTP/1.1\r\n" ~
|
|
||||||
"Content-Type: application/json\r\n" ~
|
|
||||||
"Content-Length: 22\r\n" ~
|
|
||||||
"\r\n" ~
|
|
||||||
"{\"x\": 5, \"flag\": true}";
|
|
||||||
clientSocket.send(cast(ubyte[]) requestContent);
|
|
||||||
|
|
||||||
class TestHandler : HttpRequestHandler {
|
|
||||||
import std.conv;
|
|
||||||
|
|
||||||
void handle(ref ServerHttpRequest request, ref ServerHttpResponse response) {
|
|
||||||
assert(request.headers["Content-Type"] == ["application/json"]);
|
|
||||||
assert("Content-Length" in request.headers && request.headers["Content-Length"].length > 0);
|
|
||||||
ulong contentLength = request.headers["Content-Length"][0].to!ulong;
|
|
||||||
assert(contentLength == 22);
|
|
||||||
ubyte[22] bodyBuffer;
|
|
||||||
auto readResult = request.inputStream.readFromStream(bodyBuffer);
|
|
||||||
assert(readResult.hasCount && readResult.count == 22);
|
|
||||||
assert(cast(string) bodyBuffer == "{\"x\": 5, \"flag\": true}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
handleClient(serverSocket, new TestHandler());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Gets a ClientAddress value from a socket's address information.
|
|
||||||
* Params:
|
|
||||||
* socket = The socket to get address information for.
|
|
||||||
* Returns: The address that was obtained.
|
|
||||||
*/
|
|
||||||
ClientAddress getAddress(Socket socket) {
|
|
||||||
try {
|
|
||||||
Address addr = socket.remoteAddress();
|
|
||||||
if (auto a = cast(InternetAddress) addr) {
|
|
||||||
union U {
|
|
||||||
ubyte[4] bytes;
|
|
||||||
uint intValue;
|
|
||||||
}
|
|
||||||
U u;
|
|
||||||
u.intValue = a.addr();
|
|
||||||
return ClientAddress.ofIPv4(IPv4InternetAddress(
|
|
||||||
u.bytes,
|
|
||||||
a.port()
|
|
||||||
));
|
|
||||||
} else if (auto a = cast(Internet6Address) addr) {
|
|
||||||
return ClientAddress.ofIPv6(IPv6InternetAddress(
|
|
||||||
a.addr(),
|
|
||||||
a.port()
|
|
||||||
));
|
|
||||||
} else if (auto a = cast(UnixAddress) addr) {
|
|
||||||
return ClientAddress.ofUnixSocket(UnixSocketAddress(a.path()));
|
|
||||||
} else {
|
|
||||||
return ClientAddress(ClientAddressType.UNKNOWN);
|
|
||||||
}
|
|
||||||
} catch (SocketOSException e) {
|
|
||||||
return ClientAddress(ClientAddressType.UNKNOWN);
|
|
||||||
}
|
}
|
||||||
|
inputStream.closeStream();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Alias for the result of the `readHttpRequest` function which parses HTTP requests.
|
/// Alias for the result of the `readHttpRequest` function which parses HTTP requests.
|
||||||
|
@ -243,22 +96,15 @@ alias HttpRequestParseResult = Either!(ServerHttpRequest, "request", StreamError
|
||||||
* Parses an HTTP/1.1 request from a byte input stream.
|
* Parses an HTTP/1.1 request from a byte input stream.
|
||||||
* Params:
|
* Params:
|
||||||
* inputStream = The byte input stream to read from.
|
* inputStream = The byte input stream to read from.
|
||||||
* addr = The client address, used in constructed the http request struct.
|
|
||||||
* Returns: Either the request which was parsed, or a stream error.
|
* Returns: Either the request which was parsed, or a stream error.
|
||||||
*/
|
*/
|
||||||
HttpRequestParseResult readHttpRequest(S)(S inputStream, in ClientAddress addr) if (isByteInputStream!S) {
|
HttpRequestParseResult readHttpRequest(S)(S inputStream) if (isByteInputStream!S) {
|
||||||
auto methodStr = consumeUntil(inputStream, " ");
|
import handy_http_primitives.address;
|
||||||
if (methodStr.hasError) {
|
|
||||||
if (methodStr.error.code == 0) {
|
|
||||||
// Set a custom code to indicate an empty request.
|
|
||||||
return HttpRequestParseResult(StreamError(methodStr.error.message, -1));
|
|
||||||
}
|
|
||||||
return HttpRequestParseResult(methodStr.error);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
auto methodStr = consumeUntil(inputStream, " ");
|
||||||
|
if (methodStr.hasError) return HttpRequestParseResult(methodStr.error);
|
||||||
auto urlStr = consumeUntil(inputStream, " ");
|
auto urlStr = consumeUntil(inputStream, " ");
|
||||||
if (urlStr.hasError) return HttpRequestParseResult(urlStr.error);
|
if (urlStr.hasError) return HttpRequestParseResult(urlStr.error);
|
||||||
|
|
||||||
auto versionStr = consumeUntil(inputStream, "\r\n");
|
auto versionStr = consumeUntil(inputStream, "\r\n");
|
||||||
if (versionStr.hasError) return HttpRequestParseResult(versionStr.error);
|
if (versionStr.hasError) return HttpRequestParseResult(versionStr.error);
|
||||||
|
|
||||||
|
@ -272,67 +118,21 @@ HttpRequestParseResult readHttpRequest(S)(S inputStream, in ClientAddress addr)
|
||||||
auto headersResult = parseHeaders(inputStream);
|
auto headersResult = parseHeaders(inputStream);
|
||||||
if (headersResult.hasError) return HttpRequestParseResult(headersResult.error);
|
if (headersResult.hasError) return HttpRequestParseResult(headersResult.error);
|
||||||
|
|
||||||
auto queryParams = parseQueryParameters(urlStr.value);
|
|
||||||
|
|
||||||
import std.uri : decode; // TODO: Remove dependency on phobos for this?
|
|
||||||
|
|
||||||
return HttpRequestParseResult(ServerHttpRequest(
|
return HttpRequestParseResult(ServerHttpRequest(
|
||||||
httpVersion,
|
httpVersion,
|
||||||
addr,
|
ClientAddress.init, // TODO: Get this from the socket, if possible?
|
||||||
methodStr.value,
|
methodStr.value,
|
||||||
decode(urlStr.value),
|
urlStr.value,
|
||||||
headersResult.headers,
|
headersResult.headers,
|
||||||
queryParams,
|
|
||||||
inputStreamObjectFor(inputStream)
|
inputStreamObjectFor(inputStream)
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
unittest {
|
|
||||||
import streams;
|
|
||||||
|
|
||||||
auto makeStream(string text) {
|
|
||||||
return arrayInputStreamFor(cast(ubyte[]) text);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Basic HTTP request.
|
|
||||||
ArrayInputStream!ubyte s1 = makeStream(
|
|
||||||
"GET /test?x=5 HTTP/1.1\r\n" ~
|
|
||||||
"Accept: text/plain\r\n" ~
|
|
||||||
"\r\n"
|
|
||||||
);
|
|
||||||
auto r1 = readHttpRequest(&s1, ClientAddress.unknown());
|
|
||||||
assert(r1.hasRequest);
|
|
||||||
assert(r1.request.httpVersion == HttpVersion.V1);
|
|
||||||
assert(r1.request.method == HttpMethod.GET);
|
|
||||||
assert(r1.request.url == "/test?x=5");
|
|
||||||
const r1ExpectedHeaders = ["Accept": ["text/plain"]];
|
|
||||||
assert(r1.request.headers == r1ExpectedHeaders);
|
|
||||||
assert(r1.request.clientAddress == ClientAddress.unknown());
|
|
||||||
|
|
||||||
// POST request with body. Test that the body is read correctly.
|
|
||||||
ArrayInputStream!ubyte s2 = makeStream(
|
|
||||||
"POST /data HTTP/1.1\r\n" ~
|
|
||||||
"Content-Type: text/plain\r\n" ~
|
|
||||||
"Content-Length: 12\r\n" ~
|
|
||||||
"\r\n" ~
|
|
||||||
"Hello world!"
|
|
||||||
);
|
|
||||||
auto r2 = readHttpRequest(&s2, ClientAddress.unknown());
|
|
||||||
assert(r2.hasRequest);
|
|
||||||
assert(r2.request.method == HttpMethod.POST);
|
|
||||||
ubyte[12] r2BodyBuffer;
|
|
||||||
StreamResult r2BodyReadResult = s2.readFromStream(r2BodyBuffer);
|
|
||||||
assert(r2BodyReadResult.count == 12);
|
|
||||||
assert(cast(string) r2BodyBuffer == "Hello world!");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Parses HTTP headers from an input stream, and returns them as an associative
|
* Parses HTTP headers from an input stream, and returns them as an associative
|
||||||
* array mapping header names to their list of values.
|
* array mapping header names to their list of values.
|
||||||
* Params:
|
* Params:
|
||||||
* inputStream = The byte input stream to read from. Note that this stream
|
* inputStream = The byte input stream to read from.
|
||||||
* should be passed as a pointer / reference, values will be
|
|
||||||
* consumed from the stream.
|
|
||||||
* Returns: Either the headers, or a stream error.
|
* Returns: Either the headers, or a stream error.
|
||||||
*/
|
*/
|
||||||
Either!(string[][string], "headers", StreamError, "error") parseHeaders(S)(S inputStream) if (isByteInputStream!S) {
|
Either!(string[][string], "headers", StreamError, "error") parseHeaders(S)(S inputStream) if (isByteInputStream!S) {
|
||||||
|
@ -358,41 +158,179 @@ Either!(string[][string], "headers", StreamError, "error") parseHeaders(S)(S inp
|
||||||
return Either!(string[][string], "headers", StreamError, "error")(headers);
|
return Either!(string[][string], "headers", StreamError, "error")(headers);
|
||||||
}
|
}
|
||||||
|
|
||||||
unittest {
|
/**
|
||||||
import streams;
|
* Helper function to consume string content from an input stream until a
|
||||||
|
* certain target pattern of characters is encountered.
|
||||||
|
* Params:
|
||||||
|
* inputStream = The stream to read from.
|
||||||
|
* target = The target at which to stop reading.
|
||||||
|
* Returns: The string that was read, or a stream error.
|
||||||
|
*/
|
||||||
|
private Either!(string, "value", StreamError, "error") consumeUntil(S)(
|
||||||
|
S inputStream,
|
||||||
|
string target
|
||||||
|
) if (isByteInputStream!S) {
|
||||||
|
ubyte[1024] buffer;
|
||||||
|
size_t idx;
|
||||||
|
while (true) {
|
||||||
|
auto result = inputStream.readFromStream(buffer[idx .. idx + 1]);
|
||||||
|
if (result.hasError) return Either!(string, "value", StreamError, "error")(result.error);
|
||||||
|
if (result.count != 1) return Either!(string, "value", StreamError, "error")(
|
||||||
|
StreamError("Failed to read a single element", 1)
|
||||||
|
);
|
||||||
|
idx++;
|
||||||
|
if (idx >= target.length && buffer[idx - target.length .. idx] == target) {
|
||||||
|
return Either!(string, "value", StreamError, "error")(
|
||||||
|
cast(string) buffer[0 .. idx - target.length].idup
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if (idx >= buffer.length) {
|
||||||
|
return Either!(string, "value", StreamError, "error")(
|
||||||
|
StreamError("Couldn't find target \"" ~ target ~ "\" after reading 1024 bytes.", 1)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
auto makeStream(string text) {
|
/**
|
||||||
return arrayInputStreamFor(cast(ubyte[]) text);
|
* Internal helper function to get the first index of a character in a string.
|
||||||
|
* Params:
|
||||||
|
* s = The string to look in.
|
||||||
|
* c = The character to look for.
|
||||||
|
* offset = An optional offset to look from.
|
||||||
|
* Returns: The index of the character, or -1.
|
||||||
|
*/
|
||||||
|
private ptrdiff_t indexOf(string s, char c, size_t offset = 0) {
|
||||||
|
for (size_t i = offset; i < s.length; i++) {
|
||||||
|
if (s[i] == c) return i;
|
||||||
|
}
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Internal helper function that returns the slice of a string excluding any
|
||||||
|
* preceding or trailing spaces.
|
||||||
|
* Params:
|
||||||
|
* s = The string to strip.
|
||||||
|
* Returns: The slice of the string that has been stripped.
|
||||||
|
*/
|
||||||
|
private string stripSpaces(string s) {
|
||||||
|
if (s.length == 0) return s;
|
||||||
|
ptrdiff_t startIdx = 0;
|
||||||
|
while (s[startIdx] == ' ' && startIdx < s.length) startIdx++;
|
||||||
|
s = s[startIdx .. $];
|
||||||
|
ptrdiff_t endIdx = s.length - 1;
|
||||||
|
while (s[endIdx] == ' ' && endIdx >= 0) endIdx--;
|
||||||
|
return s[0 .. endIdx + 1];
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper function to append an unsigned integer value to a char buffer. It is
|
||||||
|
* assumed that there's enough space to write the value.
|
||||||
|
* Params:
|
||||||
|
* value = The value to append.
|
||||||
|
* buffer = The buffer to append to.
|
||||||
|
* idx = A reference to a variable tracking the next writable index in the buffer.
|
||||||
|
*/
|
||||||
|
private void writeUIntToBuffer(uint value, char[] buffer, ref size_t idx) {
|
||||||
|
const size_t startIdx = idx;
|
||||||
|
while (true) {
|
||||||
|
ubyte remainder = value % 10;
|
||||||
|
value /= 10;
|
||||||
|
buffer[idx++] = cast(char) ('0' + remainder);
|
||||||
|
if (value == 0) break;
|
||||||
|
}
|
||||||
|
// Swap the characters to proper order.
|
||||||
|
for (size_t i = 0; i < (idx - startIdx) / 2; i++) {
|
||||||
|
size_t p1 = i + startIdx;
|
||||||
|
size_t p2 = idx - i - 1;
|
||||||
|
char tmp = buffer[p1];
|
||||||
|
buffer[p1] = buffer[p2];
|
||||||
|
buffer[p2] = tmp;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A wrapper around a byte output stream that's used for writing HTTP response
|
||||||
|
* content. It keeps a reference to the `ServerHttpResponse` so that when a
|
||||||
|
* handler writes data to the stream, it'll flush the HTTP response status and
|
||||||
|
* headers beforehand.
|
||||||
|
*/
|
||||||
|
struct HttpResponseOutputStream(S) if (isByteOutputStream!S) {
|
||||||
|
/// The underlying output stream to write to.
|
||||||
|
private S outputStream;
|
||||||
|
/// A pointer to the HTTP response that this stream is for.
|
||||||
|
private ServerHttpResponse* response;
|
||||||
|
/// Flag that keeps track of if the HTTP status and headers were written.
|
||||||
|
private bool headersFlushed = false;
|
||||||
|
|
||||||
|
this(S outputStream, ServerHttpResponse* response) {
|
||||||
|
this.outputStream = outputStream;
|
||||||
|
this.response = response;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Basic valid headers.
|
/**
|
||||||
auto s1 = makeStream("Content-Type: application/json\r\n\r\n");
|
* Writes the given data to the stream. If the referenced HTTP response's
|
||||||
auto r1 = parseHeaders(&s1);
|
* status and headers haven't yet been written, they will be written first.
|
||||||
assert(r1.hasHeaders);
|
* Params:
|
||||||
assert("Content-Type" in r1.headers);
|
* buffer = The buffer containing data to write.
|
||||||
assert(r1.headers["Content-Type"] == ["application/json"]);
|
* Returns: The result of writing. If status and headers are written, the
|
||||||
|
* number of bytes written will include that in addition to the buffer size.
|
||||||
|
*/
|
||||||
|
StreamResult writeToStream(ubyte[] buffer) {
|
||||||
|
uint bytesWritten = 0;
|
||||||
|
if (!headersFlushed) {
|
||||||
|
auto result = writeHeaders();
|
||||||
|
if (result.hasError) return result;
|
||||||
|
bytesWritten += result.count;
|
||||||
|
headersFlushed = true;
|
||||||
|
}
|
||||||
|
auto result = outputStream.writeToStream(buffer);
|
||||||
|
if (result.hasError) return result;
|
||||||
|
return StreamResult(result.count + bytesWritten);
|
||||||
|
}
|
||||||
|
|
||||||
// Multiple headers.
|
/**
|
||||||
auto s2 = makeStream("Accept: text, json, image\r\nContent-Length: 1234\r\n\r\n");
|
* Writes HTTP/1.1 status line and headers to the underlying output stream,
|
||||||
auto r2 = parseHeaders(&s2);
|
* which is done before any body content can be written.
|
||||||
assert(r2.hasHeaders);
|
* Returns: The stream result of writing.
|
||||||
assert("Accept" in r2.headers);
|
*/
|
||||||
assert(r2.headers["Accept"] == ["text, json, image"]);
|
StreamResult writeHeaders() {
|
||||||
assert(r2.headers["Content-Length"] == ["1234"]);
|
// TODO: Come up with a better way of writing headers than string concatenation.
|
||||||
|
size_t idx = 0;
|
||||||
|
char[6] statusCodeBuffer; // Normal HTTP codes are 3 digits, but this leaves room for extensions.
|
||||||
|
writeUIntToBuffer(response.status.code, statusCodeBuffer, idx);
|
||||||
|
|
||||||
// Basic invalid header string.
|
string statusAndHeaders = "HTTP/1.1 "
|
||||||
auto s3 = makeStream("Invalid headers");
|
~ cast(string) statusCodeBuffer[0..idx]
|
||||||
auto r3 = parseHeaders(&s3);
|
~ " " ~ response.status.text
|
||||||
assert(r3.hasError);
|
~ "\r\n";
|
||||||
|
foreach (headerName; response.headers.keys) {
|
||||||
// No trailing \r\n
|
string headerLine = headerName ~ ": ";
|
||||||
auto s4 = makeStream("Content-Type: application/json");
|
string[] headerValues = response.headers.getAll(headerName);
|
||||||
auto r4 = parseHeaders(&s4);
|
for (size_t i = 0; i < headerValues.length; i++) {
|
||||||
assert(r4.hasError);
|
headerLine ~= headerValues[i];
|
||||||
|
if (i + 1 < headerValues.length) {
|
||||||
// Empty headers.
|
headerLine ~= ", ";
|
||||||
auto s5 = makeStream("\r\n");
|
}
|
||||||
auto r5 = parseHeaders(&s5);
|
}
|
||||||
assert(r5.hasHeaders);
|
headerLine ~= "\r\n";
|
||||||
assert(r5.headers.length == 0);
|
statusAndHeaders ~= headerLine;
|
||||||
|
}
|
||||||
|
statusAndHeaders ~= "\r\n"; // Trailing CLRF before the body.
|
||||||
|
return outputStream.writeToStream(cast(ubyte[]) statusAndHeaders);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
unittest {
|
||||||
|
class TestHandler : HttpRequestHandler {
|
||||||
|
void handle(ref ServerHttpRequest request, ref ServerHttpResponse response) {
|
||||||
|
response.status = HttpStatus.OK;
|
||||||
|
response.headers.add("Content-Type", "application/json");
|
||||||
|
response.outputStream.writeToStream(cast(ubyte[]) "{\"a\": 1}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
HttpTransport tp = new Http1Transport(new TestHandler(), 8080);
|
||||||
|
tp.start();
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,2 @@
|
||||||
module handy_http_transport.http2;
|
module handy_http_transport.http2;
|
||||||
|
|
||||||
// Not yet implemented.
|
|
||||||
|
|
|
@ -4,21 +4,3 @@ interface HttpTransport {
|
||||||
void start();
|
void start();
|
||||||
void stop();
|
void stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
import core.thread;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Starts a new thread to run an HTTP transport implementation in, separate
|
|
||||||
* from the calling thread. This is useful for running a server in the
|
|
||||||
* background, like for integration tests.
|
|
||||||
* Params:
|
|
||||||
* transport = The transport implementation to start.
|
|
||||||
* Returns: The thread that was started.
|
|
||||||
*/
|
|
||||||
Thread startInNewThread(HttpTransport transport) {
|
|
||||||
Thread t = new Thread(() {
|
|
||||||
transport.start();
|
|
||||||
});
|
|
||||||
t.start();
|
|
||||||
return t;
|
|
||||||
}
|
|
||||||
|
|
|
@ -2,6 +2,3 @@ module handy_http_transport;
|
||||||
|
|
||||||
public import handy_http_transport.http1;
|
public import handy_http_transport.http1;
|
||||||
public import handy_http_transport.http2;
|
public import handy_http_transport.http2;
|
||||||
|
|
||||||
public import handy_http_transport.interfaces;
|
|
||||||
public import handy_http_transport.response_output_stream;
|
|
||||||
|
|
|
@ -1,145 +0,0 @@
|
||||||
module handy_http_transport.response_output_stream;
|
|
||||||
|
|
||||||
import handy_http_transport.helpers : writeUIntToBuffer;
|
|
||||||
import handy_http_primitives : ServerHttpResponse;
|
|
||||||
import streams;
|
|
||||||
import slf4d;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A wrapper around a byte output stream that's used for writing HTTP response
|
|
||||||
* content. It keeps a reference to the `ServerHttpResponse` so that when a
|
|
||||||
* handler writes data to the stream, it'll flush the HTTP response status and
|
|
||||||
* headers beforehand.
|
|
||||||
*/
|
|
||||||
struct HttpResponseOutputStream(S) if (isByteOutputStream!S) {
|
|
||||||
/// The underlying output stream to write to.
|
|
||||||
private S outputStream;
|
|
||||||
/// A pointer to the HTTP response that this stream is for.
|
|
||||||
private ServerHttpResponse* response;
|
|
||||||
/// Flag that keeps track of if the HTTP status and headers were written.
|
|
||||||
private bool headersFlushed = false;
|
|
||||||
|
|
||||||
this(S outputStream, ServerHttpResponse* response) {
|
|
||||||
this.outputStream = outputStream;
|
|
||||||
this.response = response;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Determines if the HTTP response headers have been flushed to the
|
|
||||||
* underlying output stream.
|
|
||||||
* Returns: `true` if the headers have been flushed, `false` otherwise.
|
|
||||||
*/
|
|
||||||
bool areHeadersFlushed() const {
|
|
||||||
return headersFlushed;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Writes the given data to the stream. If the referenced HTTP response's
|
|
||||||
* status and headers haven't yet been written, they will be written first.
|
|
||||||
* Params:
|
|
||||||
* buffer = The buffer containing data to write.
|
|
||||||
* Returns: The result of writing. If status and headers are written, the
|
|
||||||
* number of bytes written will include that in addition to the buffer size.
|
|
||||||
*/
|
|
||||||
StreamResult writeToStream(ubyte[] buffer) {
|
|
||||||
uint bytesWritten = 0;
|
|
||||||
if (!headersFlushed) {
|
|
||||||
auto result = writeHeaders();
|
|
||||||
if (result.hasError) return result;
|
|
||||||
bytesWritten += result.count;
|
|
||||||
}
|
|
||||||
auto result = outputStream.writeToStream(buffer);
|
|
||||||
if (result.hasError) return result;
|
|
||||||
return StreamResult(result.count + bytesWritten);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Writes HTTP/1.1 status line and headers to the underlying output stream,
|
|
||||||
* which is done before any body content can be written.
|
|
||||||
* Returns: The stream result of writing.
|
|
||||||
*/
|
|
||||||
StreamResult writeHeaders() {
|
|
||||||
if (headersFlushed) {
|
|
||||||
return StreamResult(0); // No need to write again.
|
|
||||||
}
|
|
||||||
debug_("Flushing HTTP status and headers to the output stream.");
|
|
||||||
headersFlushed = true;
|
|
||||||
size_t idx = 0;
|
|
||||||
char[6] statusCodeBuffer; // Normal HTTP codes are 3 digits, but this leaves room for extensions.
|
|
||||||
writeUIntToBuffer(response.status.code, statusCodeBuffer, idx);
|
|
||||||
// Write the status line.
|
|
||||||
StreamResult r = outputStream.writeToStream(cast(ubyte[]) "HTTP/1.1 ");
|
|
||||||
if (r.hasError) return r;
|
|
||||||
size_t writeCount = r.count;
|
|
||||||
traceF!"Wrote HTTP version. Bytes written: %d."(writeCount);
|
|
||||||
r = outputStream.writeToStream(cast(ubyte[]) statusCodeBuffer[0..idx]);
|
|
||||||
if (r.hasError) return r;
|
|
||||||
writeCount += r.count;
|
|
||||||
traceF!"Wrote status code. Bytes written: %d."(writeCount);
|
|
||||||
r = outputStream.writeToStream([' ']);
|
|
||||||
if (r.hasError) return r;
|
|
||||||
writeCount += r.count;
|
|
||||||
r = outputStream.writeToStream(cast(ubyte[]) response.status.text);
|
|
||||||
if (r.hasError) return r;
|
|
||||||
writeCount += r.count;
|
|
||||||
r = outputStream.writeToStream(['\r', '\n']);
|
|
||||||
if (r.hasError) return r;
|
|
||||||
writeCount += r.count;
|
|
||||||
traceF!"Wrote HTTP status line. Bytes written: %d."(writeCount);
|
|
||||||
|
|
||||||
foreach (headerName; response.headers.keys) {
|
|
||||||
// Write the header name.
|
|
||||||
traceF!"Writing header name: %s"(headerName);
|
|
||||||
r = outputStream.writeToStream(cast(ubyte[]) headerName);
|
|
||||||
if (r.hasError) return r;
|
|
||||||
writeCount += r.count;
|
|
||||||
r = outputStream.writeToStream([':', ' ']);
|
|
||||||
if (r.hasError) return r;
|
|
||||||
writeCount += r.count;
|
|
||||||
// Write the comma-separated list of values.
|
|
||||||
string[] headerValues = response.headers.getAll(headerName);
|
|
||||||
for (size_t i = 0; i < headerValues.length; i++) {
|
|
||||||
r = outputStream.writeToStream(cast(ubyte[]) headerValues[i]);
|
|
||||||
if (r.hasError) return r;
|
|
||||||
writeCount += r.count;
|
|
||||||
if (i + 1 < headerValues.length) {
|
|
||||||
r = outputStream.writeToStream([',', ' ']);
|
|
||||||
if (r.hasError) return r;
|
|
||||||
writeCount += r.count;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
r = outputStream.writeToStream(['\r', '\n']);
|
|
||||||
if (r.hasError) return r;
|
|
||||||
writeCount += r.count;
|
|
||||||
traceF!"Wrote header %s: %s"(headerName, headerValues);
|
|
||||||
}
|
|
||||||
r = outputStream.writeToStream(['\r', '\n']); // Trailing CLRF before the body.
|
|
||||||
if (r.hasError) return r;
|
|
||||||
writeCount += r.count;
|
|
||||||
return StreamResult(cast(uint) writeCount);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test basic functionality for writing a standard response with headers and a
|
|
||||||
// body.
|
|
||||||
unittest {
|
|
||||||
import handy_http_primitives.response;
|
|
||||||
|
|
||||||
ArrayOutputStream!ubyte os;
|
|
||||||
ServerHttpResponse resp;
|
|
||||||
resp.status = HttpStatus.OK;
|
|
||||||
resp.headers.add("Content-Type", "text/plain");
|
|
||||||
auto httpOut = HttpResponseOutputStream!(ArrayOutputStream!ubyte*)(&os, &resp);
|
|
||||||
resp.outputStream = outputStreamObjectFor(&httpOut);
|
|
||||||
assert(httpOut.areHeadersFlushed() == false);
|
|
||||||
StreamResult r = resp.outputStream.writeToStream(cast(ubyte[]) "Hello world!");
|
|
||||||
const expectedOutput =
|
|
||||||
"HTTP/1.1 200 OK\r\n" ~
|
|
||||||
"Content-Type: text/plain\r\n" ~
|
|
||||||
"\r\n" ~
|
|
||||||
"Hello world!";
|
|
||||||
assert(httpOut.areHeadersFlushed() == true);
|
|
||||||
assert(os.toArray() == expectedOutput);
|
|
||||||
assert(r.hasCount);
|
|
||||||
assert(r.count == os.toArray().length);
|
|
||||||
}
|
|
Loading…
Reference in New Issue