From 565d18c89be0af9a60432e03b4ee9126c9ae9288 Mon Sep 17 00:00:00 2001 From: Andrew Lalis Date: Thu, 7 Jul 2022 00:55:26 +0200 Subject: [PATCH] Added chunk data message and improved player connection flow a bit. --- .../aos2_client/CommunicationHandler.java | 21 ++++++++ .../main/java/nl/andrewl/aos_core/Net.java | 6 +-- .../java/nl/andrewl/aos_core/model/World.java | 14 +++-- .../aos_core/net/ChunkDataMessage.java | 16 ++++++ .../aos_core/net/ChunkHashMessage.java | 7 ++- .../nl/andrewl/aos_core/net/TcpReceiver.java | 17 ++++-- .../nl/andrewl/aos_core/net/UdpReceiver.java | 11 +++- .../ClientCommunicationHandler.java | 46 ++++++++++++---- .../java/nl/andrewl/aos2_server/Server.java | 52 ++++++++++++++++--- 9 files changed, 159 insertions(+), 31 deletions(-) create mode 100644 core/src/main/java/nl/andrewl/aos_core/net/ChunkDataMessage.java diff --git a/client/src/main/java/nl/andrewl/aos2_client/CommunicationHandler.java b/client/src/main/java/nl/andrewl/aos2_client/CommunicationHandler.java index c1243dd..625b981 100644 --- a/client/src/main/java/nl/andrewl/aos2_client/CommunicationHandler.java +++ b/client/src/main/java/nl/andrewl/aos2_client/CommunicationHandler.java @@ -13,6 +13,11 @@ import java.net.DatagramSocket; import java.net.InetAddress; import java.net.Socket; +/** + * Class which handles the client's communication with the server. This + * involves establishing a TCP and UDP connection, and providing generic + * methods for sending messages and processing those we receive. + */ public class CommunicationHandler { private Socket socket; private DatagramSocket datagramSocket; @@ -46,6 +51,15 @@ public class CommunicationHandler { } } + public void shutdown() { + try { + socket.close(); + datagramSocket.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + public void sendMessage(Message msg) { try { Net.write(msg, out); @@ -64,6 +78,13 @@ public class CommunicationHandler { } } + /** + * Establishes a UDP "connection" to the server, after we've already + * obtained our {@link CommunicationHandler#clientId} from our TCP + * connection. This continuously sends {@link DatagramInit} packets until + * the server responds with an echo of that packet. + * @throws IOException If an error occurs. + */ private void establishDatagramConnection() throws IOException { datagramSocket = new DatagramSocket(); boolean connectionEstablished = false; diff --git a/core/src/main/java/nl/andrewl/aos_core/Net.java b/core/src/main/java/nl/andrewl/aos_core/Net.java index 29bc5ab..71a3d22 100644 --- a/core/src/main/java/nl/andrewl/aos_core/Net.java +++ b/core/src/main/java/nl/andrewl/aos_core/Net.java @@ -1,9 +1,6 @@ package nl.andrewl.aos_core; -import nl.andrewl.aos_core.net.ChunkHashMessage; -import nl.andrewl.aos_core.net.ConnectAcceptMessage; -import nl.andrewl.aos_core.net.ConnectRejectMessage; -import nl.andrewl.aos_core.net.ConnectRequestMessage; +import nl.andrewl.aos_core.net.*; import nl.andrewl.aos_core.net.udp.DatagramInit; import nl.andrewl.record_net.Message; import nl.andrewl.record_net.Serializer; @@ -28,6 +25,7 @@ public final class Net { serializer.registerType(3, ConnectRejectMessage.class); serializer.registerType(4, DatagramInit.class); serializer.registerType(5, ChunkHashMessage.class); + serializer.registerType(6, ChunkDataMessage.class); } public static ExtendedDataInputStream getInputStream(InputStream in) { diff --git a/core/src/main/java/nl/andrewl/aos_core/model/World.java b/core/src/main/java/nl/andrewl/aos_core/model/World.java index 950b401..bb9fb42 100644 --- a/core/src/main/java/nl/andrewl/aos_core/model/World.java +++ b/core/src/main/java/nl/andrewl/aos_core/model/World.java @@ -1,15 +1,21 @@ package nl.andrewl.aos_core.model; import org.joml.Vector3i; +import org.joml.Vector3ic; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; public class World { - private final Map chunkMap = new HashMap<>(); - private final Set players = new HashSet<>(); + private final Map chunkMap = new HashMap<>(); + + public void addChunk(Chunk chunk) { + chunkMap.put(chunk.getPosition(), chunk); + } + + public Map getChunkMap() { + return chunkMap; + } public byte getBlockAt(int x, int y, int z) { int chunkX = x / Chunk.SIZE; diff --git a/core/src/main/java/nl/andrewl/aos_core/net/ChunkDataMessage.java b/core/src/main/java/nl/andrewl/aos_core/net/ChunkDataMessage.java new file mode 100644 index 0000000..8560519 --- /dev/null +++ b/core/src/main/java/nl/andrewl/aos_core/net/ChunkDataMessage.java @@ -0,0 +1,16 @@ +package nl.andrewl.aos_core.net; + +import nl.andrewl.aos_core.model.Chunk; +import nl.andrewl.record_net.Message; + +/** + * A message containing all the information about a chunk, to send to a client. + */ +public record ChunkDataMessage( + int cx, int cy, int cz, + byte[] blocks +) implements Message { + public ChunkDataMessage(Chunk chunk) { + this(chunk.getPosition().x, chunk.getPosition().y, chunk.getPosition().z, chunk.getBlocks()); + } +} diff --git a/core/src/main/java/nl/andrewl/aos_core/net/ChunkHashMessage.java b/core/src/main/java/nl/andrewl/aos_core/net/ChunkHashMessage.java index b043c10..34cae45 100644 --- a/core/src/main/java/nl/andrewl/aos_core/net/ChunkHashMessage.java +++ b/core/src/main/java/nl/andrewl/aos_core/net/ChunkHashMessage.java @@ -1,5 +1,6 @@ package nl.andrewl.aos_core.net; +import nl.andrewl.aos_core.model.Chunk; import nl.andrewl.record_net.Message; /** @@ -14,4 +15,8 @@ import nl.andrewl.record_net.Message; public record ChunkHashMessage( int cx, int cy, int cz, long hash -) implements Message {} +) implements Message { + public ChunkHashMessage(Chunk chunk) { + this(chunk.getPosition().x, chunk.getPosition().y, chunk.getPosition().z, chunk.blockHash()); + } +} diff --git a/core/src/main/java/nl/andrewl/aos_core/net/TcpReceiver.java b/core/src/main/java/nl/andrewl/aos_core/net/TcpReceiver.java index da207b1..1511089 100644 --- a/core/src/main/java/nl/andrewl/aos_core/net/TcpReceiver.java +++ b/core/src/main/java/nl/andrewl/aos_core/net/TcpReceiver.java @@ -12,28 +12,37 @@ import java.util.function.Consumer; public class TcpReceiver implements Runnable { private final ExtendedDataInputStream in; private final Consumer messageConsumer; + private Runnable shutdownHook; public TcpReceiver(ExtendedDataInputStream in, Consumer messageConsumer) { this.in = in; this.messageConsumer = messageConsumer; } + public TcpReceiver withShutdownHook(Runnable shutdownHook) { + this.shutdownHook = shutdownHook; + return this; + } + @Override public void run() { - while (true) { + boolean running = true; + while (running) { try { Message msg = Net.read(in); messageConsumer.accept(msg); } catch (SocketException e) { if (e.getMessage().equals("Socket closed")) { - return; + running = false; + } else { + e.printStackTrace(); } - e.printStackTrace(); } catch (EOFException e) { - return; + running = false; } catch (IOException e) { e.printStackTrace(); } } + if (shutdownHook != null) shutdownHook.run(); } } diff --git a/core/src/main/java/nl/andrewl/aos_core/net/UdpReceiver.java b/core/src/main/java/nl/andrewl/aos_core/net/UdpReceiver.java index 27b3e44..aed09fe 100644 --- a/core/src/main/java/nl/andrewl/aos_core/net/UdpReceiver.java +++ b/core/src/main/java/nl/andrewl/aos_core/net/UdpReceiver.java @@ -9,6 +9,10 @@ import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.SocketException; +/** + * A runnable that receives UDP packets from a datagram socket and relays the + * messages to a {@link UdpMessageHandler}. + */ public class UdpReceiver implements Runnable { public static final short MAX_PACKET_SIZE = 1400; @@ -31,14 +35,17 @@ public class UdpReceiver implements Runnable { handler.handle(msg, packet); } catch (SocketException e) { if (e.getMessage().equals("Socket closed")) { - return; + System.out.println("Socket closed!"); + break; } e.printStackTrace(); } catch (EOFException e) { - return; + System.out.println("EOF!"); + break; } catch (IOException e) { e.printStackTrace(); } } + System.out.println("UDP receiver shut down."); } } diff --git a/server/src/main/java/nl/andrewl/aos2_server/ClientCommunicationHandler.java b/server/src/main/java/nl/andrewl/aos2_server/ClientCommunicationHandler.java index 7000906..b7c34b8 100644 --- a/server/src/main/java/nl/andrewl/aos2_server/ClientCommunicationHandler.java +++ b/server/src/main/java/nl/andrewl/aos2_server/ClientCommunicationHandler.java @@ -2,10 +2,7 @@ package nl.andrewl.aos2_server; import nl.andrewl.aos_core.Net; import nl.andrewl.aos_core.model.Player; -import nl.andrewl.aos_core.net.ConnectAcceptMessage; -import nl.andrewl.aos_core.net.ConnectRejectMessage; -import nl.andrewl.aos_core.net.ConnectRequestMessage; -import nl.andrewl.aos_core.net.TcpReceiver; +import nl.andrewl.aos_core.net.*; import nl.andrewl.record_net.Message; import nl.andrewl.record_net.util.ExtendedDataInputStream; import nl.andrewl.record_net.util.ExtendedDataOutputStream; @@ -16,6 +13,14 @@ import java.net.DatagramSocket; import java.net.InetAddress; import java.net.Socket; +/** + * Component which manages the establishing and maintenance of a connection + * to a single client. This involves waiting for the client to send their + * first {@link ConnectRequestMessage}, so that we can respond with either a + * {@link ConnectRejectMessage} or {@link ConnectAcceptMessage}. If the player + * is accepted, we proceed to register the player and begin receiving messages + * from them. + */ public class ClientCommunicationHandler { private final Server server; private final Socket socket; @@ -33,18 +38,22 @@ public class ClientCommunicationHandler { this.datagramSocket = datagramSocket; this.in = Net.getInputStream(socket.getInputStream()); this.out = Net.getOutputStream(socket.getOutputStream()); - establishConnection(); - new Thread(new TcpReceiver(in, this::handleTcpMessage)).start(); } public void shutdown() { try { - socket.close(); + if (!socket.isClosed()) socket.close(); } catch (IOException e) { e.printStackTrace(); } } + /** + * Used to set UDP port once we know it, since the client first sends their + * connection request, then we accept, and then the client begins + * the UDP communication. + * @param port The client's port. + */ public void setClientUdpPort(int port) { this.clientUdpPort = port; } @@ -53,7 +62,7 @@ public class ClientCommunicationHandler { System.out.println("Message received from client " + player.getUsername() + ": " + msg); } - private void establishConnection() throws IOException { + public void establishConnection() throws IOException { socket.setSoTimeout(1000); boolean connectionEstablished = false; int attempts = 0; @@ -64,10 +73,21 @@ public class ClientCommunicationHandler { // Try to set the TCP timeout back to 0 now that we've got the correct request. socket.setSoTimeout(0); this.clientAddress = socket.getInetAddress(); - System.out.println("Player connected: " + connectMsg.username()); connectionEstablished = true; this.player = server.registerPlayer(this, connectMsg.username()); Net.write(new ConnectAcceptMessage(player.getId()), out); + System.out.println("Sent connect accept message."); + + System.out.println("Sending world data..."); + for (var chunk : server.getWorld().getChunkMap().values()) { + sendTcpMessage(new ChunkDataMessage(chunk)); + } + System.out.println("Sent all world data."); + + // Initiate a TCP receiver thread to accept incoming messages from the client. + TcpReceiver tcpReceiver = new TcpReceiver(in, this::handleTcpMessage) + .withShutdownHook(() -> server.deregisterPlayer(this.player)); + new Thread(tcpReceiver).start(); } } catch (IOException e) { e.printStackTrace(); @@ -85,6 +105,14 @@ public class ClientCommunicationHandler { } } + public void sendTcpMessage(Message msg) { + try { + Net.write(msg, out); + } catch (IOException e) { + e.printStackTrace(); + } + } + public void sendDatagramPacket(Message msg) { try { sendDatagramPacket(Net.write(msg)); diff --git a/server/src/main/java/nl/andrewl/aos2_server/Server.java b/server/src/main/java/nl/andrewl/aos2_server/Server.java index 365f82e..b5075a3 100644 --- a/server/src/main/java/nl/andrewl/aos2_server/Server.java +++ b/server/src/main/java/nl/andrewl/aos2_server/Server.java @@ -1,16 +1,18 @@ package nl.andrewl.aos2_server; +import nl.andrewl.aos_core.model.Chunk; import nl.andrewl.aos_core.model.Player; +import nl.andrewl.aos_core.model.World; import nl.andrewl.aos_core.net.UdpReceiver; import nl.andrewl.aos_core.net.udp.DatagramInit; import nl.andrewl.record_net.Message; import java.io.IOException; import java.net.*; +import java.util.Arrays; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; +import java.util.concurrent.ForkJoinPool; public class Server implements Runnable { private final ServerSocket serverSocket; @@ -18,18 +20,29 @@ public class Server implements Runnable { private volatile boolean running; private int nextClientId = 1; - private final Set clientHandlers; private final Map players; private final Map playerClientHandlers; + private final World world; public Server() throws IOException { this.serverSocket = new ServerSocket(24464, 5); this.serverSocket.setReuseAddress(true); this.datagramSocket = new DatagramSocket(24464); this.datagramSocket.setReuseAddress(true); - this.clientHandlers = new HashSet<>(); this.players = new HashMap<>(); this.playerClientHandlers = new HashMap<>(); + + // Generate world. TODO: do this elsewhere. + this.world = new World(); + for (int x = -5; x <= 5; x++) { + for (int y = 0; y <= 3; y++) { + for (int z = -3; z <= 3; z++) { + Chunk chunk = new Chunk(x, y, z); + Arrays.fill(chunk.getBlocks(), (byte) 40); + world.addChunk(chunk); + } + } + } } @Override @@ -40,8 +53,15 @@ public class Server implements Runnable { while (running) { acceptClientConnection(); } + for (var player : players.values()) { + deregisterPlayer(player); + } datagramSocket.close(); - for (var handler : clientHandlers) handler.shutdown(); + try { + serverSocket.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } } public void handleUdpMessage(Message msg, DatagramPacket packet) { @@ -63,15 +83,33 @@ public class Server implements Runnable { return player; } + public synchronized void deregisterPlayer(Player player) { + ClientCommunicationHandler handler = playerClientHandlers.get(player.getId()); + handler.shutdown(); + players.remove(player.getId()); + playerClientHandlers.remove(player.getId()); + System.out.println("Deregistered player " + player.getUsername() + " with id " + player.getId()); + } + public ClientCommunicationHandler getHandler(int id) { return playerClientHandlers.get(id); } + public World getWorld() { + return world; + } + private void acceptClientConnection() { try { Socket clientSocket = serverSocket.accept(); - ClientCommunicationHandler handler = new ClientCommunicationHandler(this, clientSocket, datagramSocket); - clientHandlers.add(handler); + var handler = new ClientCommunicationHandler(this, clientSocket, datagramSocket); + ForkJoinPool.commonPool().submit(() -> { + try { + handler.establishConnection(); + } catch (IOException e) { + e.printStackTrace(); + } + }); } catch (IOException e) { if (e instanceof SocketException && !this.running && e.getMessage().equalsIgnoreCase("Socket closed")) { return; // Ignore this exception, since it is expected on shutdown.