Added photon, tagged version of primitives, and some server logic cleanup.

This commit is contained in:
Andrew Lalis 2025-01-09 21:52:11 -05:00
parent 97f544d95b
commit b865247da7
3 changed files with 42 additions and 29 deletions

View File

@ -4,9 +4,8 @@
], ],
"copyright": "Copyright © 2024, Andrew Lalis", "copyright": "Copyright © 2024, Andrew Lalis",
"dependencies": { "dependencies": {
"handy-http-primitives": { "handy-http-primitives": "~>1.0.0",
"path": "../primitives" "photon": "~>0.10.2",
},
"streams": "~>3.5.0" "streams": "~>3.5.0"
}, },
"description": "Implementations of HTTP transport protocols.", "description": "Implementations of HTTP transport protocols.",

View File

@ -1,7 +1,9 @@
{ {
"fileVersion": 1, "fileVersion": 1,
"versions": { "versions": {
"handy-http-primitives": {"path":"../primitives"}, "handy-http-primitives": "1.0.0",
"photon": "0.10.2",
"sharded-map": "2.7.0",
"streams": "3.5.0" "streams": "3.5.0"
} }
} }

View File

@ -1,42 +1,34 @@
module handy_http_transport.http1.transport; module handy_http_transport.http1.transport;
import std.socket; // TODO: Implement this without std.socket? import std.socket;
import std.stdio;
import handy_http_transport.interfaces; import handy_http_transport.interfaces;
import handy_http_primitives; import handy_http_primitives;
import streams; import streams;
import photon;
/** /**
* The HTTP/1.1 transport protocol implementation. * The HTTP/1.1 transport protocol implementation, using Dimitry Olshansky's
* Photon fiber scheduling library for concurrency.
*/ */
class Http1Transport : HttpTransport { class Http1Transport : HttpTransport {
private Socket serverSocket; private Socket serverSocket;
private HttpRequestHandler requestHandler; private HttpRequestHandler requestHandler;
private const ushort port;
private bool running = false; private bool running = false;
this(HttpRequestHandler requestHandler) { this(HttpRequestHandler requestHandler, ushort port = 8080) {
assert(requestHandler !is null);
this.serverSocket = new TcpSocket(); this.serverSocket = new TcpSocket();
this.requestHandler = requestHandler; this.requestHandler = requestHandler;
this.port = port;
} }
void start() { void start() {
this.running = true; startloop();
serverSocket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1); go(() => runServer());
serverSocket.bind(new InternetAddress("127.0.0.1", 8080)); runFibers();
serverSocket.listen(100);
while (running) {
try {
Socket clientSocket = serverSocket.accept();
import core.thread.osthread;
Thread t = new Thread(() => handleClient(clientSocket, requestHandler));
t.start();
} catch (SocketAcceptException e) {
import std.stdio;
stderr.writefln!"Failed to accept socket connection: %s"(e);
}
}
} }
void stop() { void stop() {
@ -44,6 +36,22 @@ class Http1Transport : HttpTransport {
this.serverSocket.shutdown(SocketShutdown.BOTH); this.serverSocket.shutdown(SocketShutdown.BOTH);
this.serverSocket.close(); this.serverSocket.close();
} }
private void runServer() {
this.running = true;
serverSocket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, 1);
serverSocket.bind(new InternetAddress("127.0.0.1", port));
serverSocket.listen(100);
while (running) {
try {
Socket clientSocket = serverSocket.accept();
go(() => handleClient(clientSocket, requestHandler));
} catch (SocketAcceptException e) {
import std.stdio;
stderr.writefln!"Failed to accept socket connection: %s"(e);
}
}
}
} }
/** /**
@ -60,19 +68,23 @@ void handleClient(Socket clientSocket, HttpRequestHandler requestHandler) {
auto bufferedInput = bufferedInputStreamFor!(8192)(inputStream); auto bufferedInput = bufferedInputStreamFor!(8192)(inputStream);
auto result = readHttpRequest(&bufferedInput); auto result = readHttpRequest(&bufferedInput);
if (result.hasError) { if (result.hasError) {
import std.stdio;
stderr.writeln("Failed to read HTTP request: " ~ result.error.message); stderr.writeln("Failed to read HTTP request: " ~ result.error.message);
inputStream.closeStream(); inputStream.closeStream();
return; return;
} }
ServerHttpRequest request = result.request; scope ServerHttpRequest request = result.request;
ServerHttpResponse response; scope ServerHttpResponse response;
SocketOutputStream outputStream = SocketOutputStream(clientSocket); SocketOutputStream outputStream = SocketOutputStream(clientSocket);
response.outputStream = outputStreamObjectFor(HttpResponseOutputStream!(SocketOutputStream*)( response.outputStream = outputStreamObjectFor(HttpResponseOutputStream!(SocketOutputStream*)(
&outputStream, &outputStream,
&response &response
)); ));
if (requestHandler !is null) { try {
requestHandler.handle(request, response); requestHandler.handle(request, response);
} catch (Exception e) {
import std.stdio;
stderr.writeln("Exception thrown while handling request: " ~ e.msg);
} }
inputStream.closeStream(); inputStream.closeStream();
} }
@ -214,7 +226,7 @@ private string stripSpaces(string s) {
/** /**
* Helper function to append an unsigned integer value to a char buffer. It is * Helper function to append an unsigned integer value to a char buffer. It is
* assumed that there's enough space to write value. * assumed that there's enough space to write the value.
* Params: * Params:
* value = The value to append. * value = The value to append.
* buffer = The buffer to append to. * buffer = The buffer to append to.
@ -314,11 +326,11 @@ unittest {
class TestHandler : HttpRequestHandler { class TestHandler : HttpRequestHandler {
void handle(ref ServerHttpRequest request, ref ServerHttpResponse response) { void handle(ref ServerHttpRequest request, ref ServerHttpResponse response) {
response.status = HttpStatus.OK; response.status = HttpStatus.OK;
response.headers.add("ContentType", "application/json"); response.headers.add("Content-Type", "application/json");
response.outputStream.writeToStream(cast(ubyte[]) "{\"a\": 1}"); response.outputStream.writeToStream(cast(ubyte[]) "{\"a\": 1}");
} }
} }
HttpTransport tp = new Http1Transport(new TestHandler()); HttpTransport tp = new Http1Transport(new TestHandler(), 8080);
tp.start(); tp.start();
} }