websockets/source/handy_http_websockets/handler.d

254 lines
9.4 KiB
D

module handy_http_websockets.handler;
import core.sync.rwmutex : ReadWriteMutex;
import std.uuid;
import handy_http_primitives;
import slf4d;
import streams;
import handy_http_websockets.components;
import handy_http_websockets.connection;
import handy_http_websockets.frame;
/**
* An HTTP request handler implementation that's used as the entrypoint for
* clients that want to establish a websocket connection. It will verify the
* websocket request, and if successful, send back a SWITCHING_PROTOCOLS
* response, and register a new websocket connection.
*/
class WebSocketRequestHandler : HttpRequestHandler {
private WebSocketMessageHandler messageHandler;
private shared WebSocketConnection[UUID] connections;
private shared ReadWriteMutex connectionsMutex;
/**
* Constructs a request handler that will use the given message handler to
* deal with events from any websocket connections that are established.
* Params:
* messageHandler = The message handler to use.
*/
this(WebSocketMessageHandler messageHandler) {
this.messageHandler = messageHandler;
this.connectionsMutex = new shared ReadWriteMutex();
}
/**
* Handles an incoming HTTP request and tries to establish a websocket
* connection by first verifying the request, then sending a switching-
* protocols response, and finally registering the new connection with the
* websocket manager.
* Params:
* request = The request to read from.
* response = The response to write to.
*/
void handle(ref ServerHttpRequest request, ref ServerHttpResponse response) {
auto verification = verifyWebSocketRequest(request);
if (verification == RequestVerificationResponse.INVALID_HTTP_METHOD) {
warnF!"Received a %s request to a websocket request handler. Only GET requests are expected."(
request.method
);
sendErrorResponse(response, HttpStatus.METHOD_NOT_ALLOWED, "Only GET requests are allowed.");
return;
}
if (verification == RequestVerificationResponse.MISSING_KEY) {
warn("Received a request to a websocket request handler, but is " ~
"missing the required \"Sec-WebSocket-Key\" header.");
sendErrorResponse(response, HttpStatus.BAD_REQUEST, "Missing Sec-WebSocket-Key header.");
return;
}
sendSwitchingProtocolsResponse(request, response);
addConnection(new WebSocketConnection(
messageHandler,
this,
request.inputStream,
response.outputStream
), request);
}
/**
* Gets the number of active connections.
* Returns: The number of active connections.
*/
size_t connectionCount() {
synchronized(connectionsMutex.reader) {
return connections.length;
}
}
/**
* Adds a connection to the manager and starts listening for messages.
* Params:
* conn = The connection to add.
* request = The HTTP request that initiated the connection.
*/
void addConnection(WebSocketConnection conn, in ServerHttpRequest request) {
import core.thread;
synchronized(connectionsMutex.writer) {
connections[conn.id] = cast(shared(WebSocketConnection)) conn;
}
new Thread(() => connectionHandler(conn)).start();
conn.getMessageHandler().onConnectionEstablished(conn, request);
debugF!"Added websocket connection: %s"(conn.id.toString());
}
/**
* Removes a websocket connection from the manager and closes it. This is
* called automatically if the client sends a CLOSE frame, but you can also
* call it yourself.
* Params:
* conn = The connection to remove.
*/
void removeConnection(WebSocketConnection conn) {
synchronized(connectionsMutex.writer) {
connections.remove(conn.id);
}
conn.close();
debugF!"Removed websocket connection: %s"(conn.id.toString());
}
/**
* Broadcasts a message to all connected clients.
* Params:
* text = The text to send to all clients.
*/
void broadcast(string text) {
synchronized(connectionsMutex.reader) {
debugF!"Broadcasting %d-length text message to %d clients."(text.length, connections.length);
foreach (id, conn; connections) {
try {
(cast(WebSocketConnection) conn).sendTextMessage(text);
} catch (WebSocketException e) {
warnF!"Failed to broadcast to client %s: %s"(id.toString(), e.msg);
}
}
}
}
/**
* Broadcasts a binary message to all connected clients.
* Params:
* data = The binary data to send to all clients.
*/
void broadcast(ubyte[] data) {
synchronized(connectionsMutex.reader) {
debugF!"Broadcasting %d bytes of binary data to %d clients."(data.length, connections.length);
foreach (id, conn; connections) {
try {
(cast(WebSocketConnection) conn).sendBinaryMessage(data);
} catch (WebSocketException e) {
warnF!"Failed to broadcast to client %s: %s"(id.toString(), e.msg);
}
}
}
}
/**
* Internal routine that runs in a fiber, and handles an individual websocket
* connection by listening for messages.
* Params:
* conn = The connection to handle.
*/
private void connectionHandler(WebSocketConnection conn) {
debugF!"Started routine to monitor websocket connection %s."(conn.id.toString());
bool running = true;
while (running) {
try {
WebSocketFrame f = receiveWebSocketFrame(conn.inputStream);
switch (f.opcode) {
case WebSocketFrameOpcode.CONNECTION_CLOSE:
removeConnection(conn);
running = false;
break;
case WebSocketFrameOpcode.PING:
sendWebSocketPongFrame(conn.outputStream, f.payload);
break;
case WebSocketFrameOpcode.TEXT_FRAME:
case WebSocketFrameOpcode.BINARY_FRAME:
handleClientDataFrame(conn, f);
break;
case WebSocketFrameOpcode.CONTINUATION:
warn("Got websocket CONTINUATION frame when not expecting one. Ignoring.");
break;
default:
break;
}
} catch (Exception e) {
error(e);
running = false;
}
}
debugF!"Routine to monitor websocket connection %s has ended."(conn.id.toString());
}
/**
* Handles a websocket data frame (text or binary).
* Params:
* conn = The connection from which the frame was received.
* f = The frame that was received.
*/
private void handleClientDataFrame(WebSocketConnection conn, WebSocketFrame f) {
bool isText = f.opcode == WebSocketFrameOpcode.TEXT_FRAME;
ubyte[] payload = f.payload.dup;
while (!f.finalFragment) {
f = receiveWebSocketFrame(conn.inputStream);
if (f.opcode != WebSocketFrameOpcode.CONTINUATION) {
removeConnection(conn);
warnF!"Received invalid websocket frame opcode %s when expecting a CONTINUATION frame."(
f.opcode
);
return;
}
payload ~= f.payload;
}
if (isText) {
conn.getMessageHandler().onTextMessage(WebSocketTextMessage(conn, cast(string) payload));
} else {
conn.getMessageHandler().onBinaryMessage(WebSocketBinaryMessage(conn, payload));
}
}
}
private enum RequestVerificationResponse {
INVALID_HTTP_METHOD,
MISSING_KEY,
VALID
}
private RequestVerificationResponse verifyWebSocketRequest(in ServerHttpRequest r) {
if (r.method != HttpMethod.GET) {
return RequestVerificationResponse.INVALID_HTTP_METHOD;
}
if ("Sec-WebSocket-Key" !in r.headers || r.headers["Sec-WebSocket-Key"].length == 0) {
return RequestVerificationResponse.MISSING_KEY;
}
return RequestVerificationResponse.VALID;
}
private void sendErrorResponse(ref ServerHttpResponse response, HttpStatus status, string msg) {
response.status = status;
response.writeBodyString(msg);
}
private void sendSwitchingProtocolsResponse(in ServerHttpRequest request, ref ServerHttpResponse response) {
string key = request.headers["Sec-WebSocket-Key"][0];
response.status = HttpStatus.SWITCHING_PROTOCOLS;
response.headers.add("Upgrade", "websocket");
response.headers.add("Connection", "Upgrade");
response.headers.add("Sec-WebSocket-Accept", generateWebSocketAcceptHeader(key));
response.outputStream.writeToStream([]); // Trigger this to flush the response.
}
private string generateWebSocketAcceptHeader(string key) {
import std.digest.sha : sha1Of;
import std.base64;
ubyte[20] hash = sha1Of(key ~ "258EAFA5-E914-47DA-95CA-C5AB0DC85B11");
return Base64.encode(hash);
}
unittest {
string result = generateWebSocketAcceptHeader("dGhlIHNhbXBsZSBub25jZQ==");
assert(result == "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=");
}