From 6faf8ca8e970d86d0a108c6890cdf1977331e1d5 Mon Sep 17 00:00:00 2001 From: andrewlalis Date: Sat, 5 Jul 2025 14:31:46 -0400 Subject: [PATCH] Got websockets working with http1transport. --- dub.json | 5 +- dub.selections.json | 4 +- examples/.gitignore | 1 + examples/simple-example.d | 17 ++- source/handy_http_websockets/connection.d | 23 ++- source/handy_http_websockets/frame.d | 9 +- source/handy_http_websockets/handler.d | 169 +++++++++++++++++++-- source/handy_http_websockets/manager.d | 170 ---------------------- source/handy_http_websockets/package.d | 1 - 9 files changed, 200 insertions(+), 199 deletions(-) create mode 100644 examples/.gitignore delete mode 100644 source/handy_http_websockets/manager.d diff --git a/dub.json b/dub.json index 6f2d45c..8a4ace2 100644 --- a/dub.json +++ b/dub.json @@ -4,9 +4,8 @@ ], "copyright": "Copyright © 2025, Andrew Lalis", "dependencies": { - "handy-http-primitives": "~>1.6", - "slf4d": "~>4", - "photon": "~>0.10" + "handy-http-primitives": "~>1.7", + "slf4d": "~>4" }, "description": "Websocket implementation for Handy-Http.", "license": "CC0", diff --git a/dub.selections.json b/dub.selections.json index 092b1d6..3da2f38 100644 --- a/dub.selections.json +++ b/dub.selections.json @@ -1,9 +1,7 @@ { "fileVersion": 1, "versions": { - "handy-http-primitives": "1.6.0", - "photon": "0.11.0", - "sharded-map": "2.7.0", + "handy-http-primitives": "1.7.0", "slf4d": "4.1.1", "streams": "3.6.0" } diff --git a/examples/.gitignore b/examples/.gitignore new file mode 100644 index 0000000..b95e9b3 --- /dev/null +++ b/examples/.gitignore @@ -0,0 +1 @@ +simple-example \ No newline at end of file diff --git a/examples/simple-example.d b/examples/simple-example.d index adaa2b2..86b9461 100755 --- a/examples/simple-example.d +++ b/examples/simple-example.d @@ -1,6 +1,6 @@ #!/usr/bin/env dub /+ dub.sdl: - dependency "handy-http-transport" version="~>1.1" + dependency "handy-http-transport" path="../../transport" dependency "handy-http-websockets" path="../" +/ @@ -16,6 +16,7 @@ import handy_http_transport; import handy_http_primitives; import handy_http_websockets; import slf4d; +import slf4d.default_provider; import core.thread; class MyMessageHandler : WebSocketMessageHandler { @@ -23,18 +24,18 @@ class MyMessageHandler : WebSocketMessageHandler { override void onConnectionEstablished(WebSocketConnection conn, in ServerHttpRequest req) { info("Connection established."); - import photon : go; - go(() { - while (!closed) { + this.closed = false; + new Thread(() { + while (!this.closed) { info("Broadcasting..."); - webSocketManager.broadcast("BROADCAST TEST!"); + conn.getRequestHandler().broadcast("BROADCAST TEST!"); Thread.sleep(seconds(5)); } - }); + }).start(); } override void onTextMessage(WebSocketTextMessage msg) { - infoF!"Got a text message: %s"(msg.payload); + infoF!"Got a text message from connection %s: %s"(msg.conn.id, msg.payload); msg.conn.sendTextMessage("test"); } @@ -76,5 +77,5 @@ void main() { }); // Start the server with all default settings. - new Http1Transport(handler).start(); + new TaskPoolHttp1Transport(handler).start(); } diff --git a/source/handy_http_websockets/connection.d b/source/handy_http_websockets/connection.d index b2d3a3d..86c6b9a 100644 --- a/source/handy_http_websockets/connection.d +++ b/source/handy_http_websockets/connection.d @@ -7,6 +7,7 @@ import std.socket; import handy_http_websockets.components; import handy_http_websockets.frame; +import handy_http_websockets.handler; /** * All the data that represents a WebSocket connection tracked by the @@ -29,8 +30,19 @@ class WebSocketConnection { */ private WebSocketMessageHandler messageHandler; - this(WebSocketMessageHandler messageHandler, InputStream!ubyte inputStream, OutputStream!ubyte outputStream) { + /** + * The HTTP request handler that created and owns this connection. + */ + private WebSocketRequestHandler requestHandler; + + this( + WebSocketMessageHandler messageHandler, + WebSocketRequestHandler requestHandler, + InputStream!ubyte inputStream, + OutputStream!ubyte outputStream + ) { this.messageHandler = messageHandler; + this.requestHandler = requestHandler; this.inputStream = inputStream; this.outputStream = outputStream; this.id = randomUUID(); @@ -44,6 +56,15 @@ class WebSocketConnection { return this.messageHandler; } + /** + * Gets the `WebSocketRequestHandler` that owns this connection. Use this + * to interact with the set of connections managed by that handler. + * Returns: The request handler. + */ + WebSocketRequestHandler getRequestHandler() { + return this.requestHandler; + } + /** * Sends a text message to the connected client. * Params: diff --git a/source/handy_http_websockets/frame.d b/source/handy_http_websockets/frame.d index 571dc5e..f7cf0ba 100644 --- a/source/handy_http_websockets/frame.d +++ b/source/handy_http_websockets/frame.d @@ -106,6 +106,11 @@ void sendWebSocketPongFrame(S)(S stream, ubyte[] pingPayload) if (isByteOutputSt * frame = The frame to write. */ void sendWebSocketFrame(S)(S stream, WebSocketFrame frame) if (isByteOutputStream!S) { + traceF!"Sending websocket frame: opcode=%s, final=%s, payload.length=%d"( + frame.opcode, + frame.finalFragment, + frame.payload.length + ); static if (isPointerToStream!S) { S ptr = stream; } else { @@ -160,7 +165,7 @@ WebSocketFrame receiveWebSocketFrame(S)(S stream) if (isByteInputStream!S) { immutable ubyte maskAndLength = readDataOrThrow!(ubyte)(ptr); immutable bool payloadMasked = (maskAndLength & 128) > 0; immutable ubyte initialPayloadLength = maskAndLength & 127; - debugF!"Websocket data frame Mask bit = %s, Initial payload length = %d"(payloadMasked, initialPayloadLength); + traceF!"Websocket data frame Mask bit = %s, Initial payload length = %d"(payloadMasked, initialPayloadLength); size_t payloadLength = readPayloadLength(initialPayloadLength, ptr); if (isControlFrame && payloadLength > 125) { throw new WebSocketException("Control frame payload is too large."); @@ -168,7 +173,7 @@ WebSocketFrame receiveWebSocketFrame(S)(S stream) if (isByteInputStream!S) { ubyte[4] maskingKey; if (payloadMasked) maskingKey = readDataOrThrow!(ubyte[4])(ptr); - debugF!"Receiving websocket frame: (FIN=%s,OP=%d,MASK=%s,LENGTH=%d)"( + traceF!"Receiving websocket frame: (FIN=%s,OP=%d,MASK=%s,LENGTH=%d)"( finalFragment, opcode, payloadMasked, diff --git a/source/handy_http_websockets/handler.d b/source/handy_http_websockets/handler.d index 3b738ff..35bbc29 100644 --- a/source/handy_http_websockets/handler.d +++ b/source/handy_http_websockets/handler.d @@ -1,12 +1,15 @@ module handy_http_websockets.handler; +import core.sync.rwmutex : ReadWriteMutex; +import std.uuid; + import handy_http_primitives; import slf4d; import streams; import handy_http_websockets.components; import handy_http_websockets.connection; -import handy_http_websockets.manager : webSocketManager; +import handy_http_websockets.frame; /** * An HTTP request handler implementation that's used as the entrypoint for @@ -17,6 +20,9 @@ import handy_http_websockets.manager : webSocketManager; class WebSocketRequestHandler : HttpRequestHandler { private WebSocketMessageHandler messageHandler; + private shared WebSocketConnection[UUID] connections; + private shared ReadWriteMutex connectionsMutex; + /** * Constructs a request handler that will use the given message handler to * deal with events from any websocket connections that are established. @@ -25,6 +31,7 @@ class WebSocketRequestHandler : HttpRequestHandler { */ this(WebSocketMessageHandler messageHandler) { this.messageHandler = messageHandler; + this.connectionsMutex = new shared ReadWriteMutex(); } /** @@ -39,20 +46,168 @@ class WebSocketRequestHandler : HttpRequestHandler { void handle(ref ServerHttpRequest request, ref ServerHttpResponse response) { auto verification = verifyWebSocketRequest(request); if (verification == RequestVerificationResponse.INVALID_HTTP_METHOD) { + warnF!"Received a %s request to a websocket request handler. Only GET requests are expected."( + request.method + ); sendErrorResponse(response, HttpStatus.METHOD_NOT_ALLOWED, "Only GET requests are allowed."); return; } if (verification == RequestVerificationResponse.MISSING_KEY) { + warn("Received a request to a websocket request handler, but is " ~ + "missing the required \"Sec-WebSocket-Key\" header."); sendErrorResponse(response, HttpStatus.BAD_REQUEST, "Missing Sec-WebSocket-Key header."); return; } sendSwitchingProtocolsResponse(request, response); - webSocketManager.addConnection(new WebSocketConnection( + addConnection(new WebSocketConnection( messageHandler, + this, request.inputStream, response.outputStream ), request); } + + /** + * Gets the number of active connections. + * Returns: The number of active connections. + */ + size_t connectionCount() { + synchronized(connectionsMutex.reader) { + return connections.length; + } + } + + /** + * Adds a connection to the manager and starts listening for messages. + * Params: + * conn = The connection to add. + * request = The HTTP request that initiated the connection. + */ + void addConnection(WebSocketConnection conn, in ServerHttpRequest request) { + import core.thread; + synchronized(connectionsMutex.writer) { + connections[conn.id] = cast(shared(WebSocketConnection)) conn; + } + new Thread(() => connectionHandler(conn)).start(); + conn.getMessageHandler().onConnectionEstablished(conn, request); + debugF!"Added websocket connection: %s"(conn.id.toString()); + } + + /** + * Removes a websocket connection from the manager and closes it. This is + * called automatically if the client sends a CLOSE frame, but you can also + * call it yourself. + * Params: + * conn = The connection to remove. + */ + void removeConnection(WebSocketConnection conn) { + synchronized(connectionsMutex.writer) { + connections.remove(conn.id); + } + conn.close(); + debugF!"Removed websocket connection: %s"(conn.id.toString()); + } + + /** + * Broadcasts a message to all connected clients. + * Params: + * text = The text to send to all clients. + */ + void broadcast(string text) { + synchronized(connectionsMutex.reader) { + debugF!"Broadcasting %d-length text message to %d clients."(text.length, connections.length); + foreach (id, conn; connections) { + try { + (cast(WebSocketConnection) conn).sendTextMessage(text); + } catch (WebSocketException e) { + warnF!"Failed to broadcast to client %s: %s"(id.toString(), e.msg); + } + } + } + } + + /** + * Broadcasts a binary message to all connected clients. + * Params: + * data = The binary data to send to all clients. + */ + void broadcast(ubyte[] data) { + synchronized(connectionsMutex.reader) { + debugF!"Broadcasting %d bytes of binary data to %d clients."(data.length, connections.length); + foreach (id, conn; connections) { + try { + (cast(WebSocketConnection) conn).sendBinaryMessage(data); + } catch (WebSocketException e) { + warnF!"Failed to broadcast to client %s: %s"(id.toString(), e.msg); + } + } + } + } + + /** + * Internal routine that runs in a fiber, and handles an individual websocket + * connection by listening for messages. + * Params: + * conn = The connection to handle. + */ + private void connectionHandler(WebSocketConnection conn) { + debugF!"Started routine to monitor websocket connection %s."(conn.id.toString()); + bool running = true; + while (running) { + try { + WebSocketFrame f = receiveWebSocketFrame(conn.inputStream); + switch (f.opcode) { + case WebSocketFrameOpcode.CONNECTION_CLOSE: + removeConnection(conn); + running = false; + break; + case WebSocketFrameOpcode.PING: + sendWebSocketPongFrame(conn.outputStream, f.payload); + break; + case WebSocketFrameOpcode.TEXT_FRAME: + case WebSocketFrameOpcode.BINARY_FRAME: + handleClientDataFrame(conn, f); + break; + case WebSocketFrameOpcode.CONTINUATION: + warn("Got websocket CONTINUATION frame when not expecting one. Ignoring."); + break; + default: + break; + } + } catch (Exception e) { + error(e); + running = false; + } + } + debugF!"Routine to monitor websocket connection %s has ended."(conn.id.toString()); + } + + /** + * Handles a websocket data frame (text or binary). + * Params: + * conn = The connection from which the frame was received. + * f = The frame that was received. + */ + private void handleClientDataFrame(WebSocketConnection conn, WebSocketFrame f) { + bool isText = f.opcode == WebSocketFrameOpcode.TEXT_FRAME; + ubyte[] payload = f.payload.dup; + while (!f.finalFragment) { + f = receiveWebSocketFrame(conn.inputStream); + if (f.opcode != WebSocketFrameOpcode.CONTINUATION) { + removeConnection(conn); + warnF!"Received invalid websocket frame opcode %s when expecting a CONTINUATION frame."( + f.opcode + ); + return; + } + payload ~= f.payload; + } + if (isText) { + conn.getMessageHandler().onTextMessage(WebSocketTextMessage(conn, cast(string) payload)); + } else { + conn.getMessageHandler().onBinaryMessage(WebSocketBinaryMessage(conn, payload)); + } + } } private enum RequestVerificationResponse { @@ -73,15 +228,7 @@ private RequestVerificationResponse verifyWebSocketRequest(in ServerHttpRequest private void sendErrorResponse(ref ServerHttpResponse response, HttpStatus status, string msg) { response.status = status; - ubyte[] data = cast(ubyte[]) msg; - import std.conv : to; - response.headers.add("Content-Type", "text/plain"); - response.headers.add("Content-Length", data.length.to!string); - auto result = response.outputStream.writeToStream(data); - if (result.hasError) { - StreamError e = result.error; - warnF!"Failed to send HTTP error response: %s"(e.message); - } + response.writeBodyString(msg); } private void sendSwitchingProtocolsResponse(in ServerHttpRequest request, ref ServerHttpResponse response) { diff --git a/source/handy_http_websockets/manager.d b/source/handy_http_websockets/manager.d deleted file mode 100644 index e6486e4..0000000 --- a/source/handy_http_websockets/manager.d +++ /dev/null @@ -1,170 +0,0 @@ -module handy_http_websockets.manager; - -import core.thread.osthread : Thread; -import core.sync.rwmutex : ReadWriteMutex; -import std.uuid; -import streams; -import slf4d; -import photon : go; -import handy_http_primitives.request : ServerHttpRequest; - -import handy_http_websockets.connection; -import handy_http_websockets.components; -import handy_http_websockets.frame; - -/** - * Global singleton websocket manager that handles all websocket connections. - * Generally, the `addConnection` method will be called by a `WebSocketRequestHandler` - * that you've registered in your server, so users will most often use the - * manager to access the set of connected clients, and broadcast messages to - * them. - */ -__gshared WebSocketManager webSocketManager; - -static this() { - webSocketManager = new WebSocketManager(); -} - -/** - * The websocket manager is responsible for managing all websocket connections. - */ -class WebSocketManager { - private WebSocketConnection[UUID] connections; - private ReadWriteMutex connectionsMutex; - - this() { - connectionsMutex = new ReadWriteMutex(); - } - - /** - * Adds a connection to the manager and starts listening for messages. - * Usually only called by a `WebSocketRequestHandler`. - * Params: - * conn = The connection to add. - * request = The HTTP request that initiated the connection. - */ - void addConnection(WebSocketConnection conn, in ServerHttpRequest request) { - synchronized(connectionsMutex.writer) { - connections[conn.id] = conn; - } - go(() => connectionHandler(conn)); - conn.getMessageHandler().onConnectionEstablished(conn, request); - debugF!"Added websocket connection: %s"(conn.id.toString()); - } - - /** - * Removes a websocket connection from the manager and closes it. This is - * called automatically if the client sends a CLOSE frame, but you can also - * call it yourself. - * Params: - * conn = - */ - void removeConnection(WebSocketConnection conn) { - synchronized(connectionsMutex.writer) { - connections.remove(conn.id); - } - conn.close(); - debugF!"Removed websocket connection: %s"(conn.id.toString()); - } - - /** - * Broadcasts a message to all connected clients. - * Params: - * text = The text to send to all clients. - */ - void broadcast(string text) { - debugF!"Broadcasting %d-length text message to all clients."(text.length); - synchronized(connectionsMutex.reader) { - foreach (id, conn; connections) { - try { - conn.sendTextMessage(text); - } catch (WebSocketException e) { - warnF!"Failed to broadcast to client %s: %s"(id.toString(), e.msg); - } - } - } - } - - /** - * Broadcasts a binary message to all connected clients. - * Params: - * data = The binary data to send to all clients. - */ - void broadcast(ubyte[] data) { - debugF!"Broadcasting %d bytes of binary data to all clients."(data.length); - synchronized(connectionsMutex.reader) { - foreach (id, conn; connections) { - try { - conn.sendBinaryMessage(data); - } catch (WebSocketException e) { - warnF!"Failed to broadcast to client %s: %s"(id.toString(), e.msg); - } - } - } - } -} - -/** - * Internal routine that runs in a fiber, and handles an individual websocket - * connection by listening for messages. - * Params: - * conn = The connection to handle. - */ -private void connectionHandler(WebSocketConnection conn) { - traceF!"Started routine to monitor websocket connection %s."(conn.id.toString()); - bool running = true; - while (running) { - try { - WebSocketFrame f = receiveWebSocketFrame(conn.inputStream); - switch (f.opcode) { - case WebSocketFrameOpcode.CONNECTION_CLOSE: - webSocketManager.removeConnection(conn); - running = false; - break; - case WebSocketFrameOpcode.PING: - sendWebSocketPongFrame(conn.outputStream, f.payload); - break; - case WebSocketFrameOpcode.TEXT_FRAME: - case WebSocketFrameOpcode.BINARY_FRAME: - handleClientDataFrame(conn, f); - break; - case WebSocketFrameOpcode.CONTINUATION: - warn("Got websocket CONTINUATION frame when not expecting one. Ignoring."); - break; - default: - break; - } - } catch (Exception e) { - error(e); - running = false; - } - } - traceF!"Routine to monitor websocket connection %s has ended."(conn.id.toString()); -} - -/** - * Handles a websocket data frame (text or binary). - * Params: - * conn = The connection from which the frame was received. - * f = The frame that was received. - */ -private void handleClientDataFrame(WebSocketConnection conn, WebSocketFrame f) { - bool isText = f.opcode == WebSocketFrameOpcode.TEXT_FRAME; - ubyte[] payload = f.payload.dup; - while (!f.finalFragment) { - f = receiveWebSocketFrame(conn.inputStream); - if (f.opcode != WebSocketFrameOpcode.CONTINUATION) { - webSocketManager.removeConnection(conn); - warnF!"Received invalid websocket frame opcode %s when expecting a CONTINUATION frame."( - f.opcode - ); - return; - } - payload ~= f.payload; - } - if (isText) { - conn.getMessageHandler().onTextMessage(WebSocketTextMessage(conn, cast(string) payload)); - } else { - conn.getMessageHandler().onBinaryMessage(WebSocketBinaryMessage(conn, payload)); - } -} diff --git a/source/handy_http_websockets/package.d b/source/handy_http_websockets/package.d index 29d973b..901e4a1 100644 --- a/source/handy_http_websockets/package.d +++ b/source/handy_http_websockets/package.d @@ -3,5 +3,4 @@ module handy_http_websockets; public import handy_http_websockets.components; public import handy_http_websockets.connection; public import handy_http_websockets.handler; -public import handy_http_websockets.manager; public import handy_http_websockets.frame : WebSocketCloseStatusCode;