Added websocket files and working implementation!

This commit is contained in:
Andrew Lalis 2025-03-23 21:16:34 -04:00
parent 89a16ce6a8
commit fe5c31cd2f
7 changed files with 175 additions and 4 deletions

View File

@ -5,7 +5,8 @@
"copyright": "Copyright © 2025, Andrew Lalis",
"dependencies": {
"handy-http-primitives": "~>1.5",
"slf4d": "~>3"
"slf4d": "~>3",
"photon": "~>0.10"
},
"description": "mplementati",
"license": "CC0",

View File

@ -2,6 +2,8 @@
"fileVersion": 1,
"versions": {
"handy-http-primitives": "1.5.0",
"photon": "0.10.2",
"sharded-map": "2.7.0",
"slf4d": "3.0.1",
"streams": "3.6.0"
}

52
examples/simple-example.d Executable file
View File

@ -0,0 +1,52 @@
#!/usr/bin/env dub
/+ dub.sdl:
dependency "handy-http-transport" version="~>1.1"
dependency "handy-http-websockets" path="../"
+/
module examples.simple_example;
import handy_http_transport;
import handy_http_primitives;
import handy_http_websockets;
import slf4d;
import core.thread;
class MyMessageHandler : WebSocketMessageHandler {
private bool closed = false;
override void onConnectionEstablished(WebSocketConnection conn) {
info("Connection established.");
import photon : go;
go(() {
while (!closed) {
info("Broadcasting...");
webSocketManager.broadcast("BROADCAST TEST!");
Thread.sleep(seconds(5));
}
});
}
override void onTextMessage(WebSocketTextMessage msg) {
infoF!"Got a text message: %s"(msg.payload);
msg.conn.sendTextMessage("test");
}
override void onBinaryMessage(WebSocketBinaryMessage msg) {
infoF!"Got a binary message: %s"(msg.payload);
}
override void onCloseMessage(WebSocketCloseMessage msg) {
infoF!"Got a close message: %d - %s"(msg.statusCode, msg.message);
}
override void onConnectionClosed(WebSocketConnection conn) {
info("Connection closed.");
closed = true;
}
}
void main() {
HttpRequestHandler handler = new WebSocketRequestHandler(new MyMessageHandler());
new Http1Transport(handler).start();
}

View File

@ -19,10 +19,10 @@ class WebSocketConnection {
immutable UUID id;
/// Stream for reading from the client.
private InputStream!ubyte inputStream;
InputStream!ubyte inputStream;
/// Stream for writing to the client.
private OutputStream!ubyte outputStream;
OutputStream!ubyte outputStream;
/**
* The message handler that is called to handle this connection's events.

View File

@ -5,6 +5,8 @@ import slf4d;
import streams;
import handy_http_websockets.components;
import handy_http_websockets.connection;
import handy_http_websockets.manager : webSocketManager;
/**
* An HTTP request handler implementation that's used as the entrypoint for
@ -30,7 +32,11 @@ class WebSocketRequestHandler : HttpRequestHandler {
return;
}
sendSwitchingProtocolsResponse(request, response);
webSocketManager.addConnection(new WebSocketConnection(
messageHandler,
request.inputStream,
response.outputStream
));
}
}
@ -69,6 +75,7 @@ private void sendSwitchingProtocolsResponse(in ServerHttpRequest request, ref Se
response.headers.add("Upgrade", "websocket");
response.headers.add("Connection", "Upgrade");
response.headers.add("Sec-WebSocket-Accept", generateWebSocketAcceptHeader(key));
response.outputStream.writeToStream([]); // Trigger this to flush the response.
}
private string generateWebSocketAcceptHeader(string key) {

View File

@ -0,0 +1,107 @@
module handy_http_websockets.manager;
import core.thread.osthread : Thread;
import core.sync.rwmutex : ReadWriteMutex;
import std.uuid;
import streams;
import slf4d;
import photon : go;
import handy_http_websockets.connection;
import handy_http_websockets.components;
import handy_http_websockets.frame;
__gshared WebSocketManager webSocketManager;
static this() {
webSocketManager = new WebSocketManager();
}
class WebSocketManager {
private WebSocketConnection[UUID] connections;
private ReadWriteMutex connectionsMutex;
this() {
connectionsMutex = new ReadWriteMutex();
}
void addConnection(WebSocketConnection conn) {
synchronized(connectionsMutex.writer) {
connections[conn.id] = conn;
}
go(() => connectionHandler(conn));
conn.getMessageHandler().onConnectionEstablished(conn);
}
void removeConnection(WebSocketConnection conn) {
synchronized(connectionsMutex.writer) {
connections.remove(conn.id);
}
conn.close();
}
void broadcast(string text) {
synchronized(connectionsMutex.reader) {
foreach (id, conn; connections) {
try {
conn.sendTextMessage(text);
} catch (WebSocketException e) {
warnF!"Failed to broadcast to client %s."(id.toString());
}
}
}
}
}
void connectionHandler(WebSocketConnection conn) {
infoF!"Started routine to monitor websocket connection %s."(conn.id.toString());
bool running = true;
while (running) {
try {
WebSocketFrame f = receiveWebSocketFrame(conn.inputStream);
switch (f.opcode) {
case WebSocketFrameOpcode.CONNECTION_CLOSE:
webSocketManager.removeConnection(conn);
running = false;
break;
case WebSocketFrameOpcode.PING:
sendWebSocketPongFrame(conn.outputStream, f.payload);
break;
case WebSocketFrameOpcode.TEXT_FRAME:
case WebSocketFrameOpcode.BINARY_FRAME:
handleClientDataFrame(conn, f);
break;
case WebSocketFrameOpcode.CONTINUATION:
warn("Got websocket CONTINUATION frame when not expecting one. Ignoring.");
break;
default:
break;
}
} catch (Exception e) {
error(e);
running = false;
}
}
infoF!"Routine to monitor websocket connection %s has ended."(conn.id.toString());
}
void handleClientDataFrame(WebSocketConnection conn, WebSocketFrame f) {
bool isText = f.opcode == WebSocketFrameOpcode.TEXT_FRAME;
ubyte[] payload = f.payload.dup;
while (!f.finalFragment) {
f = receiveWebSocketFrame(conn.inputStream);
if (f.opcode != WebSocketFrameOpcode.CONTINUATION) {
webSocketManager.removeConnection(conn);
warnF!"Received invalid websocket frame opcode %s when expecting a CONTINUATION frame."(
f.opcode
);
return;
}
payload ~= f.payload;
}
if (isText) {
conn.getMessageHandler().onTextMessage(WebSocketTextMessage(conn, cast(string) payload));
} else {
conn.getMessageHandler().onBinaryMessage(WebSocketBinaryMessage(conn, payload));
}
}

View File

@ -2,4 +2,6 @@ module handy_http_websockets;
public import handy_http_websockets.components;
public import handy_http_websockets.connection;
public import handy_http_websockets.handler;
public import handy_http_websockets.manager;
public import handy_http_websockets.frame : WebSocketCloseStatusCode;