/**
 * This module defines input and output ranges that map onto sockets, to enable
 * easy reading and writing of data using all the benefits of ranges. It also
 * defines the interfaces used by HttpRequest and HttpResponse to allow
 * pluggable range implementations, useful for testing.
 */
module http_primitives.ranges;

import std.exception;
import std.format;
import std.algorithm : min;
import std.socket;
import std.stdio;

/**
 * An input range for reading chunks of bytes from a Socket.
 *
 * Each call to `popFront()` performs one `receive` into a fixed-size buffer,
 * and `front()` exposes whatever that call received.
 */
struct SocketInputRange(size_t BufferSize) {
    /// The internal socket that serves as this range's data source.
    Socket socket;
    /// The internal fixed-size buffer (stored inline in this struct).
    ubyte[BufferSize] buffer;
    /// The index representing the end (exclusive) of the data in the buffer.
    size_t bufferIdx = 0;
    /// Internal flag used to mark the socket as closed and thus this range empty.
    bool closed = false;

    /**
     * Constructs the range around a socket.
     * Params:
     *   socket = The socket to read from.
     *   initialRead = Whether to immediately call `popFront()` so that
     *                 `front()` has data available right after construction.
     * Throws: `SocketRangeException` if the initial read fails.
     */
    this(Socket socket, bool initialRead = true) {
        this.socket = socket;
        if (initialRead) {
            popFront();
        }
    }

    /**
     * Determines whether this socket input range is empty, which is true only
     * if we determine that the socket has been closed. Note that calling
     * `front()` may return an empty slice of the buffer if no data has been
     * received yet.
     * Returns: True if the socket has been closed and no more data can be read.
     */
    bool empty() {
        return closed;
    }

    /**
     * Gets a slice to the data currently held in this range's buffer, which
     * may be empty (length of 0), even if `empty` returns false.
     * Returns: A slice to the data currently in this range's buffer.
     */
    ubyte[] front() {
        return buffer[0 .. bufferIdx];
    }

    /**
     * Discards the current contents of this range's buffer, and attempts to
     * receive more data from the socket if it's still alive. Warning! This
     * method will BLOCK if the underlying socket is blocking!
     * Throws: `SocketRangeException` if receiving from the socket fails.
     */
    void popFront() {
        // Drop the previous chunk up front, so that `front()` can never expose
        // stale bytes once the socket is closed or a read has failed.
        bufferIdx = 0;
        if (!socket.isAlive) {
            closed = true;
            return;
        }
        const ptrdiff_t bytesRead = socket.receive(buffer);
        if (bytesRead == Socket.ERROR) {
            closed = true;
            throw new SocketRangeException(lastSocketError());
        }
        if (bytesRead == 0) {
            // A zero-length read means the remote side has closed the connection.
            closed = true;
            return;
        }
        bufferIdx = bytesRead;
    }
}

/**
 * An output range for writing to a Socket. Serves as an output range for both
 * `ubyte` and `ubyte[]`, by using a template-defined buffer size.
 */
struct SocketOutputRange(size_t BufferSize) {
    // With an empty buffer, `put` could never make progress.
    static assert(BufferSize > 0, "SocketOutputRange requires a non-zero BufferSize.");

    /// The internal socket that's written to.
    Socket socket;
    /// The buffer to which data is first written before flushing to the socket.
    ubyte[BufferSize] buffer;
    /// The index of the buffer at which new data is written.
    size_t bufferIdx = 0;

    /**
     * Writes an array of bytes to the range. If this range's internal buffer
     * becomes full as a result of this method call, it will `flush()` and
     * write to the underlying socket.
     * Params:
     *   bytes = The bytes to write.
     * Throws: `SocketRangeException` if flushing to the underlying socket fails.
     */
    void put(ubyte[] bytes) {
        size_t dataIdx = 0;
        while (dataIdx < bytes.length) {
            // Copy as much as fits into the remaining buffer space.
            const size_t bytesLeftToSend = bytes.length - dataIdx;
            const size_t bufferSpace = BufferSize - bufferIdx;
            const size_t bytesToCopy = min(bufferSpace, bytesLeftToSend);
            buffer[bufferIdx .. (bufferIdx + bytesToCopy)] = bytes[dataIdx .. (dataIdx + bytesToCopy)];
            dataIdx += bytesToCopy;
            bufferIdx += bytesToCopy;
            if (bufferIdx == BufferSize) {
                flush();
            }
        }
    }

    /**
     * Writes a single byte to the range. If this range's internal buffer
     * becomes full as a result of this method call, it will `flush()` and
     * write to the underlying socket.
     * Params:
     *   singleByte = The byte to write.
     * Throws: `SocketRangeException` if flushing to the underlying socket fails.
     */
    void put(ubyte singleByte) {
        buffer[bufferIdx++] = singleByte;
        if (bufferIdx == BufferSize) {
            flush();
        }
    }

    /**
     * Writes a value to the range as a big-endian (network byte order) set of
     * bytes. Only works for integrals, chars, booleans, and float/double. See
     * `std.bitmanip.nativeToBigEndian` for details on the conversion.
     * Params:
     *   value = The value to write.
     * Throws: `SocketRangeException` if flushing to the underlying socket fails.
     */
    void put(T)(const T value) {
        import std.bitmanip : nativeToBigEndian;
        auto bytes = nativeToBigEndian(value);
        // Slice explicitly so that the `ubyte[]` overload is an exact match and
        // this template can never be selected again for the static array.
        this.put(bytes[]);
    }

    /**
     * Flushes any data in the buffer to the underlying socket. A single
     * `send` may transmit only part of the data, so sending is repeated until
     * the whole buffer has been written.
     * Throws: `SocketRangeException` if sending data fails, or if the socket
     * stops accepting data before everything has been sent.
     */
    void flush() {
        size_t sentSoFar = 0;
        while (sentSoFar < bufferIdx) {
            const ptrdiff_t bytesSent = socket.send(buffer[sentSoFar .. bufferIdx]);
            if (bytesSent == Socket.ERROR) {
                throw new SocketRangeException(lastSocketError());
            }
            if (bytesSent == 0) {
                // No progress was made; bail out rather than spin forever.
                throw new SocketRangeException(
                    format!"Failed to send all %d bytes. Only sent %d."(bufferIdx, sentSoFar)
                );
            }
            sentSoFar += bytesSent;
        }
        bufferIdx = 0;
    }
}

/**
 * An exception representing a socket IO error.
 */
class SocketRangeException : Exception {
    mixin basicExceptionCtors;
}

version(unittest) {
    /**
     * A convenience for unit tests, this test instance contains initialized
     * sockets and ranges with a configured buffer size.
     */
    struct TestInstance(size_t outBufferSize, size_t inBufferSize) {
        Socket outputSocket;
        Socket inputSocket;
        SocketOutputRange!(outBufferSize) outputRange;
        SocketInputRange!(inBufferSize) inputRange;

        /// Creates a connected socket pair and wraps each end in a range.
        static TestInstance create() {
            Socket[2] pair = socketPair();
            return TestInstance(
                pair[0],
                pair[1],
                SocketOutputRange!(outBufferSize)(pair[0]),
                // No initial read, so that construction never blocks.
                SocketInputRange!(inBufferSize)(pair[1], false)
            );
        }
    }

    alias StandardTestInstance = TestInstance!(4096, 4096);
}

// Test basic reading and writing.
unittest {
    auto t = StandardTestInstance.create();
    t.outputRange.put(cast(ubyte) 42);
    assert(t.outputRange.bufferIdx == 1); // Assert that the byte was put into the range's buffer.
    t.outputRange.flush();
    assert(t.outputRange.bufferIdx == 0); // Assert that the data was written and the buffer reset.
    t.outputSocket.close();

    assert(!t.inputRange.empty); // The input range should initially not be empty because the socket is not detected as dead yet.
    assert(t.inputRange.front.length == 0); // Because the test instance's input range has `initialRead` as false, no data has been read yet.
    t.inputRange.popFront();
    assert(!t.inputRange.empty);
    assert(t.inputRange.front.length == 1);
    assert(t.inputRange.front[0] == 42);
    t.inputSocket.close();
    t.inputRange.popFront(); // We need to attempt to read once more to determine if the socket has closed.
    assert(t.inputRange.empty); // Assert that the input range is indeed empty now.
    assert(t.inputRange.front.length == 0); // No stale data may remain visible once the range is empty.
}

// Test reading and writing big chunks that exceed the size limits.
unittest {
    import std.array : Appender;

    auto t = TestInstance!(128, 128).create();
    // Build the payload in memory so that the test does not depend on the
    // working directory. Its size is not a multiple of the buffer size, so the
    // final partial chunk is exercised as well.
    auto content = new ubyte[](5000);
    foreach (i, ref b; content) {
        b = cast(ubyte) (i % 251);
    }
    assert(content.length > 4096);

    // Write the entire chunk of data to the output.
    t.outputRange.put(content);
    t.outputRange.flush();
    t.outputSocket.close();

    // Now read and append the data to an appender so we can check it.
    Appender!(ubyte[]) app;
    while (!t.inputRange.empty) {
        app ~= t.inputRange.front;
        t.inputRange.popFront();
    }
    assert(app[] == content);
    t.inputSocket.close();
}

// Test reading and writing non-byte types (integral, bool, etc.)
unittest {
    import std.bitmanip : bigEndianToNative;

    auto t = StandardTestInstance.create();

    // A long is written as 8 big-endian bytes.
    long value = 123_456_789_000;
    t.outputRange.put(value);
    t.outputRange.flush();
    t.inputRange.popFront();
    assert(t.inputRange.front.length == 8);
    ubyte[8] bytes = t.inputRange.front[0..8];
    long readValue = bigEndianToNative!long(bytes);
    assert(readValue == value);

    // A bool is written as a single byte.
    bool bValue = false;
    t.outputRange.put(bValue);
    t.outputRange.flush();
    t.inputRange.popFront();
    assert(t.inputRange.front.length == 1);
    ubyte[1] bytes1 = t.inputRange.front[0..1];
    bool readBValue = bigEndianToNative!bool(bytes1);
    assert(readBValue == bValue);

    t.outputSocket.close();
    t.inputSocket.close();
}

// Polymorphic OOP-Style ranges:

/**
 * An interface for an output range to which bytes, and some other types
 * convertible to bytes, may be written. The underlying implementation is
 * likely buffered, so call `flush()` to write the buffered data once ready.
 */
interface ResponseOutputRange {
    /// Writes a single byte to the range.
    void put(ubyte singleByte);
    /// Writes a slice of bytes to the range.
    void put(ubyte[] bytes);
    /**
     * Writes a value that is convertible to bytes.
     * NOTE(review): template members of an interface are not virtual, so this
     * declaration is presumably only usable through the concrete classes
     * below, not through an interface reference — confirm.
     */
    void put(T)(const T value);
    /// Writes out any buffered data.
    void flush();
}

/**
 * An interface for an input range from which chunks of bytes can be read.
 */
interface RequestInputRange {
    /// Returns: True once no more data can be read from the range.
    bool empty();
    /// Returns: The current chunk of bytes.
    ubyte[] front();
    /// Discards the current chunk and advances the range.
    void popFront();
}

/**
 * A `ResponseOutputRange` that delegates every call to an internal
 * `SocketOutputRange` with the given buffer size.
 */
class SocketResponseOutputRange(size_t BufferSize) : ResponseOutputRange {
    private SocketOutputRange!BufferSize outputRange;

    /**
     * Params:
     *   socket = The socket that data is written to.
     */
    this(Socket socket) {
        this.outputRange = SocketOutputRange!BufferSize(socket);
    }

    void put(ubyte singleByte) {
        outputRange.put(singleByte);
    }

    void put(ubyte[] bytes) {
        outputRange.put(bytes);
    }

    void put(T)(const T value) {
        outputRange.put!(T)(value);
    }

    void flush() {
        outputRange.flush();
    }
}

/**
 * A `RequestInputRange` that delegates every call to an internal
 * `SocketInputRange` with the given buffer size.
 */
class SocketRequestInputRange(size_t BufferSize) : RequestInputRange {
    private SocketInputRange!BufferSize inputRange;

    /**
     * Params:
     *   socket = The socket that data is read from.
     *   initialRead = Forwarded to the `SocketInputRange` constructor.
     */
    this(Socket socket, bool initialRead = true) {
        this.inputRange = SocketInputRange!BufferSize(socket, initialRead);
    }

    bool empty() {
        return inputRange.empty();
    }

    ubyte[] front() {
        return inputRange.front();
    }

    void popFront() {
        inputRange.popFront();
    }
}

/**
 * An output range that simply writes to an internal buffer, which is useful
 * for inspecting the data written to an HTTP response, for example.
 */
class ArrayResponseOutputRange : ResponseOutputRange {
    import std.array;

    /// Accumulates every byte that is written to this range.
    Appender!(ubyte[]) app;

    void put(ubyte singleByte) {
        app ~= singleByte;
    }

    void put(ubyte[] bytes) {
        app ~= bytes;
    }

    /// Appends the big-endian byte representation of the given value.
    void put(T)(const T value) {
        import std.bitmanip : nativeToBigEndian;
        auto bytes = nativeToBigEndian(value);
        app ~= bytes[0..T.sizeof];
    }

    void flush() {
        // Do nothing.
    }

    /// Returns: All of the bytes written to this range so far.
    ubyte[] data() {
        return app[];
    }
}

// Test basic operations of the ArrayResponseOutputRange
unittest {
    import std.bitmanip : bigEndianToNative;

    scope r = new ArrayResponseOutputRange();
    r.put(cast(ubyte) 1);
    assert(r.data.length == 1);
    assert(r.data[0] == 1);
    r.put!ulong(42);
    assert(r.data.length == ubyte.sizeof + ulong.sizeof, format!"%d"(r.data.length));
    ubyte[ulong.sizeof] bytes = r.data[1..ulong.sizeof + ubyte.sizeof];
    ulong value = bigEndianToNative!ulong(bytes);
    assert(value == 42);

    scope r2 = new ArrayResponseOutputRange();
    ubyte[] data = [1, 2, 3, 4, 5];
    r2.put(data);
    assert(r2.data.length == data.length);
    assert(r2.data == data);
}

/**
 * An input range that simply supplies data from an internal buffer, which is
 * useful for validating HTTP request logic against pre-written requests, for
 * example. The whole buffer is served as a single chunk.
 */
class ArrayRequestInputRange : RequestInputRange {
    /// The data to serve; set to null once it has been popped.
    ubyte[] data;
    /// Whether the single chunk of data has been consumed already.
    bool popped = false;

    /**
     * Params:
     *   data = The bytes that this range will supply as its only chunk.
     */
    this(ubyte[] data) {
        this.data = data;
    }

    /**
     * Returns: True only after the single chunk has been popped. Before that
     * the range still has its data to offer, so it is not empty.
     */
    bool empty() {
        return popped;
    }

    /// Returns: The data, or an empty slice once it has been popped.
    ubyte[] front() {
        if (popped) return [];
        return data;
    }

    /// Consumes the single chunk of data, after which the range is empty.
    void popFront() {
        popped = true;
        this.data = null;
    }
}

// Test that the ArrayRequestInputRange serves its data exactly once.
unittest {
    ubyte[] payload = [10, 20, 30];
    auto r = new ArrayRequestInputRange(payload);
    assert(!r.empty);
    assert(r.front == payload);
    r.popFront();
    assert(r.empty);
    assert(r.front.length == 0);
}