From fe5c31cd2fae320ffa5a40f0b3b1d4ae258374a3 Mon Sep 17 00:00:00 2001 From: andrewlalis Date: Sun, 23 Mar 2025 21:16:34 -0400 Subject: [PATCH] Added websocket files and working implementation! --- dub.json | 3 +- dub.selections.json | 2 + examples/simple-example.d | 52 +++++++++++ source/handy_http_websockets/connection.d | 4 +- source/handy_http_websockets/handler.d | 9 +- source/handy_http_websockets/manager.d | 107 ++++++++++++++++++++++ source/handy_http_websockets/package.d | 2 + 7 files changed, 175 insertions(+), 4 deletions(-) create mode 100755 examples/simple-example.d create mode 100644 source/handy_http_websockets/manager.d diff --git a/dub.json b/dub.json index 9e24b6c..6204828 100644 --- a/dub.json +++ b/dub.json @@ -5,7 +5,8 @@ "copyright": "Copyright © 2025, Andrew Lalis", "dependencies": { "handy-http-primitives": "~>1.5", - "slf4d": "~>3" + "slf4d": "~>3", + "photon": "~>0.10" }, "description": "mplementati", "license": "CC0", diff --git a/dub.selections.json b/dub.selections.json index 67c9609..0098d19 100644 --- a/dub.selections.json +++ b/dub.selections.json @@ -2,6 +2,8 @@ "fileVersion": 1, "versions": { "handy-http-primitives": "1.5.0", + "photon": "0.10.2", + "sharded-map": "2.7.0", "slf4d": "3.0.1", "streams": "3.6.0" } diff --git a/examples/simple-example.d b/examples/simple-example.d new file mode 100755 index 0000000..20d4d66 --- /dev/null +++ b/examples/simple-example.d @@ -0,0 +1,52 @@ +#!/usr/bin/env dub +/+ dub.sdl: + dependency "handy-http-transport" version="~>1.1" + dependency "handy-http-websockets" path="../" ++/ + +module examples.simple_example; + +import handy_http_transport; +import handy_http_primitives; +import handy_http_websockets; +import slf4d; +import core.thread; + +class MyMessageHandler : WebSocketMessageHandler { + private bool closed = false; + + override void onConnectionEstablished(WebSocketConnection conn) { + info("Connection established."); + import photon : go; + go(() { + while (!closed) { + info("Broadcasting..."); + webSocketManager.broadcast("BROADCAST TEST!"); + Thread.sleep(seconds(5)); + } + }); + } + + override void onTextMessage(WebSocketTextMessage msg) { + infoF!"Got a text message: %s"(msg.payload); + msg.conn.sendTextMessage("test"); + } + + override void onBinaryMessage(WebSocketBinaryMessage msg) { + infoF!"Got a binary message: %s"(msg.payload); + } + + override void onCloseMessage(WebSocketCloseMessage msg) { + infoF!"Got a close message: %d - %s"(msg.statusCode, msg.message); + } + + override void onConnectionClosed(WebSocketConnection conn) { + info("Connection closed."); + closed = true; + } +} + +void main() { + HttpRequestHandler handler = new WebSocketRequestHandler(new MyMessageHandler()); + new Http1Transport(handler).start(); +} diff --git a/source/handy_http_websockets/connection.d b/source/handy_http_websockets/connection.d index 9e0232d..d0b3523 100644 --- a/source/handy_http_websockets/connection.d +++ b/source/handy_http_websockets/connection.d @@ -19,10 +19,10 @@ class WebSocketConnection { immutable UUID id; /// Stream for reading from the client. - private InputStream!ubyte inputStream; + InputStream!ubyte inputStream; /// Stream for writing to the client. - private OutputStream!ubyte outputStream; + OutputStream!ubyte outputStream; /** * The message handler that is called to handle this connection's events. diff --git a/source/handy_http_websockets/handler.d b/source/handy_http_websockets/handler.d index 8a05e15..c257fa2 100644 --- a/source/handy_http_websockets/handler.d +++ b/source/handy_http_websockets/handler.d @@ -5,6 +5,8 @@ import slf4d; import streams; import handy_http_websockets.components; +import handy_http_websockets.connection; +import handy_http_websockets.manager : webSocketManager; /** * An HTTP request handler implementation that's used as the entrypoint for @@ -30,7 +32,11 @@ class WebSocketRequestHandler : HttpRequestHandler { return; } sendSwitchingProtocolsResponse(request, response); - + webSocketManager.addConnection(new WebSocketConnection( + messageHandler, + request.inputStream, + response.outputStream + )); } } @@ -69,6 +75,7 @@ private void sendSwitchingProtocolsResponse(in ServerHttpRequest request, ref Se response.headers.add("Upgrade", "websocket"); response.headers.add("Connection", "Upgrade"); response.headers.add("Sec-WebSocket-Accept", generateWebSocketAcceptHeader(key)); + response.outputStream.writeToStream([]); // Trigger this to flush the response. } private string generateWebSocketAcceptHeader(string key) { diff --git a/source/handy_http_websockets/manager.d b/source/handy_http_websockets/manager.d new file mode 100644 index 0000000..c441ccc --- /dev/null +++ b/source/handy_http_websockets/manager.d @@ -0,0 +1,107 @@ +module handy_http_websockets.manager; + +import core.thread.osthread : Thread; +import core.sync.rwmutex : ReadWriteMutex; +import std.uuid; +import streams; +import slf4d; +import photon : go; + +import handy_http_websockets.connection; +import handy_http_websockets.components; +import handy_http_websockets.frame; + +__gshared WebSocketManager webSocketManager; + +static this() { + webSocketManager = new WebSocketManager(); +} + +class WebSocketManager { + private WebSocketConnection[UUID] connections; + private ReadWriteMutex connectionsMutex; + + this() { + connectionsMutex = new ReadWriteMutex(); + } + + void addConnection(WebSocketConnection conn) { + synchronized(connectionsMutex.writer) { + connections[conn.id] = conn; + } + go(() => connectionHandler(conn)); + conn.getMessageHandler().onConnectionEstablished(conn); + } + + void removeConnection(WebSocketConnection conn) { + synchronized(connectionsMutex.writer) { + connections.remove(conn.id); + } + conn.close(); + } + + void broadcast(string text) { + synchronized(connectionsMutex.reader) { + foreach (id, conn; connections) { + try { + conn.sendTextMessage(text); + } catch (WebSocketException e) { + warnF!"Failed to broadcast to client %s."(id.toString()); + } + } + } + } +} + +void connectionHandler(WebSocketConnection conn) { + infoF!"Started routine to monitor websocket connection %s."(conn.id.toString()); + bool running = true; + while (running) { + try { + WebSocketFrame f = receiveWebSocketFrame(conn.inputStream); + switch (f.opcode) { + case WebSocketFrameOpcode.CONNECTION_CLOSE: + webSocketManager.removeConnection(conn); + running = false; + break; + case WebSocketFrameOpcode.PING: + sendWebSocketPongFrame(conn.outputStream, f.payload); + break; + case WebSocketFrameOpcode.TEXT_FRAME: + case WebSocketFrameOpcode.BINARY_FRAME: + handleClientDataFrame(conn, f); + break; + case WebSocketFrameOpcode.CONTINUATION: + warn("Got websocket CONTINUATION frame when not expecting one. Ignoring."); + break; + default: + break; + } + } catch (Exception e) { + error(e); + running = false; + } + } + infoF!"Routine to monitor websocket connection %s has ended."(conn.id.toString()); +} + +void handleClientDataFrame(WebSocketConnection conn, WebSocketFrame f) { + bool isText = f.opcode == WebSocketFrameOpcode.TEXT_FRAME; + ubyte[] payload = f.payload.dup; + while (!f.finalFragment) { + f = receiveWebSocketFrame(conn.inputStream); + if (f.opcode != WebSocketFrameOpcode.CONTINUATION) { + webSocketManager.removeConnection(conn); + warnF!"Received invalid websocket frame opcode %s when expecting a CONTINUATION frame."( + f.opcode + ); + return; + } + payload ~= f.payload; + } + if (isText) { + conn.getMessageHandler().onTextMessage(WebSocketTextMessage(conn, cast(string) payload)); + } else { + conn.getMessageHandler().onBinaryMessage(WebSocketBinaryMessage(conn, payload)); + } +} diff --git a/source/handy_http_websockets/package.d b/source/handy_http_websockets/package.d index 38fdba6..29d973b 100644 --- a/source/handy_http_websockets/package.d +++ b/source/handy_http_websockets/package.d @@ -2,4 +2,6 @@ module handy_http_websockets; public import handy_http_websockets.components; public import handy_http_websockets.connection; +public import handy_http_websockets.handler; +public import handy_http_websockets.manager; public import handy_http_websockets.frame : WebSocketCloseStatusCode;