Implemented private channels for N-person direct messaging.

This commit is contained in:
Andrew Lalis 2021-09-08 23:52:27 +02:00
parent 2d8a0967dc
commit 17a372a5b7
41 changed files with 727 additions and 307 deletions

View File

@ -10,9 +10,9 @@ import com.googlecode.lanterna.terminal.Terminal;
import lombok.Getter;
import nl.andrewl.concord_client.event.EventManager;
import nl.andrewl.concord_client.event.handlers.ChannelMovedHandler;
import nl.andrewl.concord_client.event.handlers.ChannelUsersResponseHandler;
import nl.andrewl.concord_client.event.handlers.ChatHistoryResponseHandler;
import nl.andrewl.concord_client.event.handlers.ServerMetaDataHandler;
import nl.andrewl.concord_client.event.handlers.ServerUsersHandler;
import nl.andrewl.concord_client.gui.MainWindow;
import nl.andrewl.concord_client.model.ClientModel;
import nl.andrewl.concord_core.msg.Message;
@ -48,7 +48,7 @@ public class ConcordClient implements Runnable {
// Add event listeners.
this.eventManager.addHandler(MoveToChannel.class, new ChannelMovedHandler());
this.eventManager.addHandler(ChannelUsersResponse.class, new ChannelUsersResponseHandler());
this.eventManager.addHandler(ServerUsers.class, new ServerUsersHandler());
this.eventManager.addHandler(ChatHistoryResponse.class, new ChatHistoryResponseHandler());
this.eventManager.addHandler(Chat.class, (msg, client) -> client.getModel().getChatHistory().addChat(msg));
this.eventManager.addHandler(ServerMetaData.class, new ServerMetaDataHandler());
@ -70,10 +70,9 @@ public class ConcordClient implements Runnable {
this.serializer.writeMessage(new Identification(nickname), this.out);
Message reply = this.serializer.readMessage(this.in);
if (reply instanceof ServerWelcome welcome) {
var model = new ClientModel(welcome.getClientId(), nickname, welcome.getCurrentChannelId(), welcome.getMetaData());
var model = new ClientModel(welcome.getClientId(), nickname, welcome.getCurrentChannelId(), welcome.getCurrentChannelName(), welcome.getMetaData());
// Start fetching initial data for the channel we were initially put into.
this.sendMessage(new ChannelUsersRequest(this.model.getCurrentChannelId()));
this.sendMessage(new ChatHistoryRequest(this.model.getCurrentChannelId(), ""));
this.sendMessage(new ChatHistoryRequest(model.getCurrentChannelId(), ""));
return model;
} else {
throw new IOException("Unexpected response from the server after sending identification message.");

View File

@ -7,7 +7,7 @@ import java.util.List;
import java.util.UUID;
public interface ClientModelListener {
default void channelMoved(UUID oldChannelId, UUID newChannelId) {}
default void channelMoved(UUID oldChannelId, String oldChannelName, UUID newChannelId, String newChannelName) {}
default void usersUpdated(List<UserData> users) {}

View File

@ -2,7 +2,6 @@ package nl.andrewl.concord_client.event.handlers;
import nl.andrewl.concord_client.ConcordClient;
import nl.andrewl.concord_client.event.MessageHandler;
import nl.andrewl.concord_core.msg.types.ChannelUsersRequest;
import nl.andrewl.concord_core.msg.types.ChatHistoryRequest;
import nl.andrewl.concord_core.msg.types.MoveToChannel;
@ -15,8 +14,7 @@ import nl.andrewl.concord_core.msg.types.MoveToChannel;
public class ChannelMovedHandler implements MessageHandler<MoveToChannel> {
@Override
public void handle(MoveToChannel msg, ConcordClient client) throws Exception {
client.getModel().setCurrentChannelId(msg.getChannelId());
client.sendMessage(new ChatHistoryRequest(msg.getChannelId(), ""));
client.sendMessage(new ChannelUsersRequest(msg.getChannelId()));
client.getModel().setCurrentChannel(msg.getId(), msg.getChannelName());
client.sendMessage(new ChatHistoryRequest(msg.getId(), ""));
}
}

View File

@ -6,7 +6,7 @@ import nl.andrewl.concord_core.msg.types.ChatHistoryResponse;
public class ChatHistoryResponseHandler implements MessageHandler<ChatHistoryResponse> {
@Override
public void handle(ChatHistoryResponse msg, ConcordClient client) throws Exception {
public void handle(ChatHistoryResponse msg, ConcordClient client) {
client.getModel().getChatHistory().setChats(msg.getMessages());
}
}

View File

@ -0,0 +1,12 @@
package nl.andrewl.concord_client.event.handlers;
import nl.andrewl.concord_client.ConcordClient;
import nl.andrewl.concord_client.event.MessageHandler;
import nl.andrewl.concord_core.msg.types.ServerUsers;
public class ServerUsersHandler implements MessageHandler<ServerUsers> {
@Override
public void handle(ServerUsers msg, ConcordClient client) {
client.getModel().setKnownUsers(msg.getUsers());
}
}

View File

@ -53,9 +53,7 @@ public class ChannelChatBox extends Panel {
}
public void refreshBorder() {
String name = client.getModel().getServerMetaData().getChannels().stream()
.filter(channelData -> channelData.getId().equals(client.getModel().getCurrentChannelId()))
.findAny().orElseThrow().getName();
String name = client.getModel().getCurrentChannelName();
if (this.chatBorder != null) this.removeComponent(this.chatBorder);
this.chatBorder = Borders.doubleLine("#" + name);
this.chatBorder.setComponent(this.chatList);

View File

@ -43,7 +43,7 @@ public class ServerPanel extends Panel implements ClientModelListener {
}
@Override
public void channelMoved(UUID oldChannelId, UUID newChannelId) {
public void channelMoved(UUID oldChannelId, String oldChannelName, UUID newChannelId, String newChannelName) {
this.getTextGUI().getGUIThread().invokeLater(() -> {
this.channelList.setChannels();
this.channelChatBox.getChatList().clearItems();

View File

@ -5,9 +5,10 @@ import com.googlecode.lanterna.gui2.Direction;
import com.googlecode.lanterna.gui2.LinearLayout;
import com.googlecode.lanterna.gui2.Panel;
import nl.andrewl.concord_client.ConcordClient;
import nl.andrewl.concord_core.msg.types.ChannelUsersResponse;
import nl.andrewl.concord_core.msg.types.MoveToChannel;
import nl.andrewl.concord_core.msg.types.UserData;
import java.io.IOException;
import java.util.List;
public class UserList extends Panel {
@ -22,7 +23,14 @@ public class UserList extends Panel {
this.removeAllComponents();
for (var user : usersResponse) {
Button b = new Button(user.getName(), () -> {
System.out.println("Opening DM channel with user " + user.getName() + ", id: " + user.getId());
if (!client.getModel().getId().equals(user.getId())) {
System.out.println("Opening DM channel with user " + user.getName() + ", id: " + user.getId());
try {
client.sendMessage(new MoveToChannel(user.getId()));
} catch (IOException e) {
e.printStackTrace();
}
}
});
this.addComponent(b);
}

View File

@ -17,25 +17,29 @@ public class ClientModel {
private ServerMetaData serverMetaData;
private UUID currentChannelId;
private String currentChannelName;
private List<UserData> knownUsers;
private final ChatHistory chatHistory;
private final List<ClientModelListener> modelListeners;
public ClientModel(UUID id, String nickname, UUID currentChannelId, ServerMetaData serverMetaData) {
public ClientModel(UUID id, String nickname, UUID currentChannelId, String currentChannelName, ServerMetaData serverMetaData) {
this.modelListeners = new CopyOnWriteArrayList<>();
this.id = id;
this.nickname = nickname;
this.currentChannelId = currentChannelId;
this.currentChannelName = currentChannelName;
this.serverMetaData = serverMetaData;
this.knownUsers = new ArrayList<>();
this.chatHistory = new ChatHistory();
}
public void setCurrentChannelId(UUID newChannelId) {
public void setCurrentChannel(UUID channelId, String channelName) {
UUID oldId = this.currentChannelId;
this.currentChannelId = newChannelId;
this.modelListeners.forEach(listener -> listener.channelMoved(oldId, newChannelId));
String oldName = this.currentChannelName;
this.currentChannelId = channelId;
this.currentChannelName = channelName;
this.modelListeners.forEach(listener -> listener.channelMoved(oldId, oldName, channelId, channelName));
}
public void setKnownUsers(List<UserData> users) {

View File

@ -18,11 +18,11 @@ public class MessageUtils {
/**
* Gets the number of bytes that the given string will occupy when it is
* serialized.
* @param s The string.
* @param s The string. This may be null.
* @return The number of bytes used to serialize the string.
*/
public static int getByteSize(String s) {
return Integer.BYTES + s.getBytes(StandardCharsets.UTF_8).length;
return Integer.BYTES + (s == null ? 0 : s.getBytes(StandardCharsets.UTF_8).length);
}
/**
@ -61,22 +61,35 @@ public class MessageUtils {
}
public static void writeEnum(Enum<?> value, DataOutputStream o) throws IOException {
o.writeInt(value.ordinal());
if (value == null) {
o.writeInt(-1);
} else {
o.writeInt(value.ordinal());
}
}
public static <T extends Enum<?>> T readEnum(Class<T> e, DataInputStream i) throws IOException {
int ordinal = i.readInt();
if (ordinal == -1) return null;
return e.getEnumConstants()[ordinal];
}
public static void writeUUID(UUID value, DataOutputStream o) throws IOException {
o.writeLong(value.getMostSignificantBits());
o.writeLong(value.getLeastSignificantBits());
if (value == null) {
o.writeLong(-1);
o.writeLong(-1);
} else {
o.writeLong(value.getMostSignificantBits());
o.writeLong(value.getLeastSignificantBits());
}
}
public static UUID readUUID(DataInputStream i) throws IOException {
long a = i.readLong();
long b = i.readLong();
if (a == -1 && b == -1) {
return null;
}
return new UUID(a, b);
}
@ -95,15 +108,19 @@ public class MessageUtils {
}
}
public static <T extends Message> List<T> readList(Class<T> type, DataInputStream i) throws IOException, ReflectiveOperationException {
public static <T extends Message> List<T> readList(Class<T> type, DataInputStream i) throws IOException {
int size = i.readInt();
var constructor = type.getConstructor();
List<T> items = new ArrayList<>(size);
for (int k = 0; k < size; k++) {
var item = constructor.newInstance();
item.read(i);
items.add(item);
try {
var constructor = type.getConstructor();
List<T> items = new ArrayList<>(size);
for (int k = 0; k < size; k++) {
var item = constructor.newInstance();
item.read(i);
items.add(item);
}
return items;
} catch (ReflectiveOperationException e) {
throw new IOException(e);
}
return items;
}
}

View File

@ -35,9 +35,10 @@ public class Serializer {
registerType(4, ChatHistoryRequest.class);
registerType(5, ChatHistoryResponse.class);
registerType(6, ChannelUsersRequest.class);
registerType(7, ChannelUsersResponse.class);
registerType(7, ServerUsers.class);
registerType(8, ServerMetaData.class);
registerType(9, Error.class);
registerType(10, CreateThread.class);
}
/**

View File

@ -15,6 +15,7 @@ import static nl.andrewl.concord_core.msg.MessageUtils.*;
@Data
@NoArgsConstructor
@AllArgsConstructor
@Deprecated
public class ChannelUsersRequest implements Message {
private UUID channelId;

View File

@ -17,10 +17,12 @@ import static nl.andrewl.concord_core.msg.MessageUtils.*;
* the users in the channel that a client is in has changed. For example, when
* a user leaves a channel, all others in that channel will be sent this message
* to indicate that update.
* @deprecated Clients will be updated via a {@link ServerUsers} message.
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Deprecated
public class ChannelUsersResponse implements Message {
private List<UserData> users;
@ -36,10 +38,6 @@ public class ChannelUsersResponse implements Message {
@Override
public void read(DataInputStream i) throws IOException {
try {
this.users = readList(UserData.class, i);
} catch (ReflectiveOperationException e) {
throw new IOException(e);
}
this.users = readList(UserData.class, i);
}
}

View File

@ -1,5 +1,6 @@
package nl.andrewl.concord_core.msg.types;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import nl.andrewl.concord_core.msg.Message;
@ -7,6 +8,7 @@ import nl.andrewl.concord_core.msg.Message;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Objects;
import java.util.UUID;
import static nl.andrewl.concord_core.msg.MessageUtils.*;
@ -16,13 +18,18 @@ import static nl.andrewl.concord_core.msg.MessageUtils.*;
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Chat implements Message {
private static final long ID_NONE = 0;
private UUID id;
private UUID senderId;
private String senderNickname;
private long timestamp;
private String message;
public Chat(UUID senderId, String senderNickname, long timestamp, String message) {
this.id = null;
this.senderId = senderId;
this.senderNickname = senderNickname;
this.timestamp = timestamp;
@ -40,6 +47,7 @@ public class Chat implements Message {
@Override
public void write(DataOutputStream o) throws IOException {
writeUUID(this.id, o);
writeUUID(this.senderId, o);
writeString(this.senderNickname, o);
o.writeLong(this.timestamp);
@ -48,6 +56,7 @@ public class Chat implements Message {
@Override
public void read(DataInputStream i) throws IOException {
this.id = readUUID(i);
this.senderId = readUUID(i);
this.senderNickname = readString(i);
this.timestamp = i.readLong();
@ -63,6 +72,7 @@ public class Chat implements Message {
public boolean equals(Object o) {
if (o.getClass().equals(this.getClass())) {
Chat other = (Chat) o;
if (Objects.equals(this.getId(), other.getId())) return true;
return this.getSenderId().equals(other.getSenderId()) &&
this.getTimestamp() == other.getTimestamp() &&
this.getSenderNickname().equals(other.getSenderNickname()) &&

View File

@ -41,6 +41,9 @@ import static nl.andrewl.concord_core.msg.MessageUtils.*;
* <li><code>to</code> - ISO-8601 timestamp indicating the timestamp
* before which messages should be fetched. Only messages before this
* point in time are returned.</li>
* <li><code>id</code> - A single message id to fetch. If this parameter
* is present, all others are ignored, and a list containing the single
* message is returned, if it could be found, otherwise an empty list.</li>
* </ul>
* </p>
* <p>

View File

@ -0,0 +1,56 @@
package nl.andrewl.concord_core.msg.types;
import lombok.Data;
import lombok.NoArgsConstructor;
import nl.andrewl.concord_core.msg.Message;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.UUID;
import static nl.andrewl.concord_core.msg.MessageUtils.*;
/**
* This message is sent by clients when they indicate that they would like to
* create a new thread in their current channel.
* <p>
* Conversely, this message is also sent by the server when a thread has
* been created by someone, and all clients need to be notified so that they
* can properly display to the user that a message has been turned into a
* thread.
* </p>
*/
@Data
@NoArgsConstructor
public class CreateThread implements Message {
/**
* The id of the message from which the thread will be created. This will
* serve as the entry point of the thread, and the unique identifier for the
* thread.
*/
private UUID messageId;
/**
* The title for the thread. This may be null, in which case the thread does
* not have any title.
*/
private String title;
@Override
public int getByteCount() {
return UUID_BYTES + getByteSize(title);
}
@Override
public void write(DataOutputStream o) throws IOException {
writeUUID(this.messageId, o);
writeString(this.title, o);
}
@Override
public void read(DataInputStream i) throws IOException {
this.messageId = readUUID(i);
this.title = readString(i);
}
}

View File

@ -1,5 +1,6 @@
package nl.andrewl.concord_core.msg.types;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import nl.andrewl.concord_core.msg.Message;
@ -19,28 +20,48 @@ import static nl.andrewl.concord_core.msg.MessageUtils.*;
* Conversely, a client can send this request to the server to indicate that
* they would like to switch to the specified channel.
* </p>
* <p>
* Clients can also send this message and provide the id of another client
* to request that they enter a private message channel with the referenced
* client.
* </p>
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MoveToChannel implements Message {
private UUID channelId;
/**
* The id of the channel that the client is requesting or being moved to, or
* the id of another client that the user wishes to begin private messaging
* with.
*/
private UUID id;
/**
* The name of the channel that the client is moved to. This is null in
* cases where the client is requesting to move to a channel, and is only
* provided by the server when it moves a client.
*/
private String channelName;
public MoveToChannel(UUID channelId) {
this.channelId = channelId;
this.id = channelId;
}
@Override
public int getByteCount() {
return UUID_BYTES;
return UUID_BYTES + getByteSize(this.channelName);
}
@Override
public void write(DataOutputStream o) throws IOException {
writeUUID(this.channelId, o);
writeUUID(this.id, o);
writeString(this.channelName, o);
}
@Override
public void read(DataInputStream i) throws IOException {
this.channelId = readUUID(i);
this.id = readUUID(i);
this.channelName = readString(i);
}
}

View File

@ -13,6 +13,11 @@ import java.util.UUID;
import static nl.andrewl.concord_core.msg.MessageUtils.*;
/**
* Metadata is sent by the server to clients to inform them of the structure of
* the server. This includes basic information about the server's own properties
* as well as information about all top-level channels.
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@ -34,13 +39,13 @@ public class ServerMetaData implements Message {
@Override
public void read(DataInputStream i) throws IOException {
this.name = readString(i);
try {
this.channels = readList(ChannelData.class, i);
} catch (ReflectiveOperationException e) {
throw new IOException("Reflection exception", e);
}
this.channels = readList(ChannelData.class, i);
}
/**
* Metadata about a top-level channel in the server which is visible and
* joinable for a user.
*/
@Data
@NoArgsConstructor
@AllArgsConstructor

View File

@ -0,0 +1,40 @@
package nl.andrewl.concord_core.msg.types;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import nl.andrewl.concord_core.msg.Message;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.List;
import static nl.andrewl.concord_core.msg.MessageUtils.*;
/**
* This message is sent from the server to the client whenever a change happens
* which requires the server to notify clients about a change of the list of
* global users.
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ServerUsers implements Message {
private List<UserData> users;
@Override
public int getByteCount() {
return getByteSize(this.users);
}
@Override
public void write(DataOutputStream o) throws IOException {
writeList(this.users, o);
}
@Override
public void read(DataInputStream i) throws IOException {
this.users = readList(UserData.class, i);
}
}

View File

@ -22,17 +22,19 @@ import static nl.andrewl.concord_core.msg.MessageUtils.*;
public class ServerWelcome implements Message {
private UUID clientId;
private UUID currentChannelId;
private String currentChannelName;
private ServerMetaData metaData;
@Override
public int getByteCount() {
return 2 * UUID_BYTES + this.metaData.getByteCount();
return 2 * UUID_BYTES + getByteSize(this.currentChannelName) + this.metaData.getByteCount();
}
@Override
public void write(DataOutputStream o) throws IOException {
writeUUID(this.clientId, o);
writeUUID(this.currentChannelId, o);
writeString(this.currentChannelName, o);
this.metaData.write(o);
}
@ -41,6 +43,7 @@ public class ServerWelcome implements Message {
this.clientId = readUUID(i);
this.currentChannelId = readUUID(i);
this.metaData = new ServerMetaData();
this.currentChannelName = readString(i);
this.metaData.read(i);
}
}

View File

@ -1,74 +0,0 @@
package nl.andrewl.concord_server;
import nl.andrewl.concord_core.msg.types.MoveToChannel;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
public class ChannelManager {
private final ConcordServer server;
private final Map<String, Channel> channelNameMap;
private final Map<UUID, Channel> channelIdMap;
public ChannelManager(ConcordServer server) {
this.server = server;
this.channelNameMap = new ConcurrentHashMap<>();
this.channelIdMap = new ConcurrentHashMap<>();
// Initialize the channels according to what's defined in the server's config.
for (var channelConfig : server.getConfig().getChannels()) {
this.addChannel(new Channel(
server,
UUID.fromString(channelConfig.getId()),
channelConfig.getName(),
server.getDb().getCollection("channel-" + channelConfig.getId())
));
}
}
public Set<Channel> getChannels() {
return Set.copyOf(this.channelIdMap.values());
}
public Optional<Channel> getDefaultChannel() {
var optionalGeneral = this.getChannelByName("general");
if (optionalGeneral.isPresent()) {
return optionalGeneral;
}
for (var channel : this.getChannels()) {
return Optional.of(channel);
}
return Optional.empty();
}
public void addChannel(Channel channel) {
this.channelNameMap.put(channel.getName(), channel);
this.channelIdMap.put(channel.getId(), channel);
}
public void removeChannel(Channel channel) {
this.channelNameMap.remove(channel.getName());
this.channelIdMap.remove(channel.getId());
}
public Optional<Channel> getChannelByName(String name) {
return Optional.ofNullable(this.channelNameMap.get(name));
}
public Optional<Channel> getChannelById(UUID id) {
return Optional.ofNullable(this.channelIdMap.get(id));
}
public void moveToChannel(ClientThread client, Channel channel) {
if (client.getCurrentChannel() != null) {
var previousChannel = client.getCurrentChannel();
previousChannel.removeClient(client);
}
channel.addClient(client);
client.setCurrentChannel(channel);
client.sendToClient(new MoveToChannel(channel.getId()));
System.out.println("Moved client " + client + " to channel " + channel);
}
}

View File

@ -1,4 +0,0 @@
package nl.andrewl.concord_server;
public class ChatThread {
}

View File

@ -1,26 +1,27 @@
package nl.andrewl.concord_server;
import lombok.Getter;
import nl.andrewl.concord_core.msg.Message;
import nl.andrewl.concord_core.msg.Serializer;
import nl.andrewl.concord_core.msg.types.Identification;
import nl.andrewl.concord_core.msg.types.ServerMetaData;
import nl.andrewl.concord_core.msg.types.ServerWelcome;
import nl.andrewl.concord_core.msg.types.UserData;
import nl.andrewl.concord_server.channel.ChannelManager;
import nl.andrewl.concord_server.cli.ServerCli;
import nl.andrewl.concord_server.client.ClientManager;
import nl.andrewl.concord_server.client.ClientThread;
import nl.andrewl.concord_server.config.ServerConfig;
import nl.andrewl.concord_server.event.EventManager;
import nl.andrewl.concord_server.util.IdProvider;
import nl.andrewl.concord_server.util.UUIDProvider;
import org.dizitart.no2.Nitrite;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
@ -30,7 +31,6 @@ public class ConcordServer implements Runnable {
private static final Path CONFIG_FILE = Path.of("server-config.json");
private static final Path DATABASE_FILE = Path.of("concord-server.db");
private final Map<UUID, ClientThread> clients;
private volatile boolean running;
private final ServerSocket serverSocket;
@ -79,6 +79,12 @@ public class ConcordServer implements Runnable {
@Getter
private final ChannelManager channelManager;
/**
* Manager that handles the collection of clients connected to this server.
*/
@Getter
private final ClientManager clientManager;
private final DiscoveryServerPublisher discoveryServerPublisher;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
@ -87,51 +93,13 @@ public class ConcordServer implements Runnable {
this.config = ServerConfig.loadOrCreate(CONFIG_FILE, idProvider);
this.discoveryServerPublisher = new DiscoveryServerPublisher(this.config);
this.db = Nitrite.builder().filePath(DATABASE_FILE.toFile()).openOrCreate();
this.clients = new ConcurrentHashMap<>(32);
this.eventManager = new EventManager(this);
this.channelManager = new ChannelManager(this);
this.clientManager = new ClientManager(this);
this.serverSocket = new ServerSocket(this.config.getPort());
this.serializer = new Serializer();
}
/**
* Registers a new client as connected to the server. This is done once the
* client thread has received the correct identification information from
* the client. The server will register the client in its global set of
* connected clients, and it will immediately move the client to the default
* channel.
* @param identification The client's identification data.
* @param clientThread The client manager thread.
*/
public void registerClient(Identification identification, ClientThread clientThread) {
var id = this.idProvider.newId();
System.out.printf("Client \"%s\" joined with id %s.\n", identification.getNickname(), id);
this.clients.put(id, clientThread);
clientThread.setClientId(id);
clientThread.setClientNickname(identification.getNickname());
// Immediately add the client to the default channel and send the initial welcome message.
var defaultChannel = this.channelManager.getDefaultChannel().orElseThrow();
clientThread.sendToClient(new ServerWelcome(id, defaultChannel.getId(), this.getMetaData()));
// It is important that we send the welcome message first. The client expects this as the initial response to their identification message.
defaultChannel.addClient(clientThread);
clientThread.setCurrentChannel(defaultChannel);
System.out.println("Moved client " + clientThread + " to " + defaultChannel);
}
/**
* De-registers a client from the server, removing them from any channel
* they're currently in.
* @param clientId The id of the client to remove.
*/
public void deregisterClient(UUID clientId) {
var client = this.clients.remove(clientId);
if (client != null) {
client.getCurrentChannel().removeClient(client);
client.shutdown();
System.out.println("Client " + client + " has disconnected.");
}
}
/**
* @return True if the server is currently running, meaning it is accepting
* connections, or false otherwise.
@ -155,13 +123,6 @@ public class ConcordServer implements Runnable {
}
}
public List<UserData> getClients() {
return this.clients.values().stream()
.sorted(Comparator.comparing(ClientThread::getClientNickname))
.map(ClientThread::toData)
.collect(Collectors.toList());
}
public ServerMetaData getMetaData() {
return new ServerMetaData(
this.config.getName(),
@ -172,24 +133,6 @@ public class ConcordServer implements Runnable {
);
}
/**
* Sends a message to every connected client, ignoring any channels. All
* clients connected to this server will receive this message.
* @param message The message to send.
*/
public void broadcast(Message message) {
ByteArrayOutputStream baos = new ByteArrayOutputStream(message.getByteCount());
try {
this.serializer.writeMessage(message, baos);
byte[] data = baos.toByteArray();
for (var client : this.clients.values()) {
client.sendToClient(data);
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* Shuts down the server cleanly by doing the following things:
* <ol>
@ -201,8 +144,8 @@ public class ConcordServer implements Runnable {
*/
private void shutdown() {
System.out.println("Shutting down the server.");
for (var clientId : this.clients.keySet()) {
this.deregisterClient(clientId);
for (var clientId : this.clientManager.getConnectedIds()) {
this.clientManager.deregisterClient(clientId);
}
this.scheduledExecutorService.shutdown();
this.executorService.shutdown();
@ -231,12 +174,16 @@ public class ConcordServer implements Runnable {
ClientThread clientThread = new ClientThread(socket, this);
clientThread.start();
} catch (IOException e) {
System.err.println("Could not accept new client connection: " + e.getMessage());
if (!e.getMessage().equalsIgnoreCase("socket closed")) {
System.err.println("Could not accept new client connection: " + e.getMessage());
}
}
}
this.shutdown();
}
public static void main(String[] args) throws IOException {
var server = new ConcordServer();
new Thread(server).start();

View File

@ -1,11 +1,11 @@
package nl.andrewl.concord_server;
package nl.andrewl.concord_server.channel;
import lombok.Getter;
import nl.andrewl.concord_core.msg.Message;
import nl.andrewl.concord_core.msg.Serializer;
import nl.andrewl.concord_core.msg.types.ChannelUsersResponse;
import nl.andrewl.concord_core.msg.types.UserData;
import org.dizitart.no2.IndexOptions;
import nl.andrewl.concord_server.ConcordServer;
import nl.andrewl.concord_server.client.ClientThread;
import nl.andrewl.concord_server.util.CollectionUtils;
import org.dizitart.no2.IndexType;
import org.dizitart.no2.NitriteCollection;
@ -16,51 +16,36 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* Represents a single communication area in which messages are sent by clients
* and received by all connected clients.
* and received by all connected clients. A channel is a top-level communication
* medium, and usually this is a server channel or private message between two
* clients in a server.
*/
@Getter
public class Channel {
private final ConcordServer server;
private UUID id;
private final UUID id;
private String name;
private final Set<ClientThread> connectedClients;
/**
* A document collection which holds all messages created in this channel,
* indexed on id, timestamp, message, and sender's nickname.
*/
private final NitriteCollection messageCollection;
public Channel(ConcordServer server, UUID id, String name, NitriteCollection messageCollection) {
public Channel(ConcordServer server, UUID id, String name) {
this.server = server;
this.id = id;
this.name = name;
this.connectedClients = ConcurrentHashMap.newKeySet();
this.messageCollection = messageCollection;
this.initCollection();
}
/**
* Initializes this channel's nitrite database collection, which involves
* creating any indexes that don't yet exist.
*/
private void initCollection() {
if (!this.messageCollection.hasIndex("timestamp")) {
System.out.println("Adding index on \"timestamp\" field to collection " + this.messageCollection.getName());
this.messageCollection.createIndex("timestamp", IndexOptions.indexOptions(IndexType.NonUnique));
}
if (!this.messageCollection.hasIndex("senderNickname")) {
System.out.println("Adding index on \"senderNickname\" field to collection " + this.messageCollection.getName());
this.messageCollection.createIndex("senderNickname", IndexOptions.indexOptions(IndexType.Fulltext));
}
if (!this.messageCollection.hasIndex("message")) {
System.out.println("Adding index on \"message\" field to collection " + this.messageCollection.getName());
this.messageCollection.createIndex("message", IndexOptions.indexOptions(IndexType.Fulltext));
}
var fields = List.of("timestamp", "senderNickname", "message");
for (var index : this.messageCollection.listIndices()) {
if (!fields.contains(index.getField())) {
System.out.println("Dropping unknown index " + index.getField() + " from collection " + index.getCollectionName());
this.messageCollection.dropIndex(index.getField());
}
}
this.messageCollection = server.getDb().getCollection("channel-" + id);
CollectionUtils.ensureIndexes(this.messageCollection, Map.of(
"timestamp", IndexType.NonUnique,
"senderNickname", IndexType.Fulltext,
"message", IndexType.Fulltext,
"id", IndexType.Unique
));
}
/**
@ -70,11 +55,11 @@ public class Channel {
*/
public void addClient(ClientThread clientThread) {
this.connectedClients.add(clientThread);
try {
this.sendMessage(new ChannelUsersResponse(this.getUserData()));
} catch (IOException e) {
e.printStackTrace();
}
// try {
// this.sendMessage(new ChannelUsersResponse(this.getUserData()));
// } catch (IOException e) {
// e.printStackTrace();
// }
}
/**
@ -84,11 +69,11 @@ public class Channel {
*/
public void removeClient(ClientThread clientThread) {
this.connectedClients.remove(clientThread);
try {
this.sendMessage(new ChannelUsersResponse(this.getUserData()));
} catch (IOException e) {
e.printStackTrace();
}
// try {
// this.sendMessage(new ChannelUsersResponse(this.getUserData()));
// } catch (IOException e) {
// e.printStackTrace();
// }
}
/**
@ -125,12 +110,13 @@ public class Channel {
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof Channel channel)) return false;
return name.equals(channel.name);
if (Objects.equals(this.id, channel.getId())) return true;
return Objects.equals(this.name, channel.getName());
}
@Override
public int hashCode() {
return Objects.hash(name);
return Objects.hash(id, name);
}
@Override

View File

@ -0,0 +1,174 @@
package nl.andrewl.concord_server.channel;
import nl.andrewl.concord_core.msg.types.MoveToChannel;
import nl.andrewl.concord_server.ConcordServer;
import nl.andrewl.concord_server.client.ClientThread;
import nl.andrewl.concord_server.util.CollectionUtils;
import org.dizitart.no2.Document;
import org.dizitart.no2.IndexType;
import org.dizitart.no2.NitriteCollection;
import org.dizitart.no2.filters.Filters;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* This manager is responsible for keeping track of all the channels in the
* server, and controlling modifications to them.
*/
public class ChannelManager {
private final ConcordServer server;
private final Map<String, Channel> channelNameMap;
private final Map<UUID, Channel> channelIdMap;
private final Map<Set<UUID>, Channel> privateChannels;
private final NitriteCollection privateChannelCollection;
public ChannelManager(ConcordServer server) {
this.server = server;
this.channelNameMap = new ConcurrentHashMap<>();
this.channelIdMap = new ConcurrentHashMap<>();
this.privateChannels = new ConcurrentHashMap<>();
this.privateChannelCollection = this.server.getDb().getCollection("private-channels");
CollectionUtils.ensureIndexes(this.privateChannelCollection, Map.of(
"idHash", IndexType.Unique,
"id", IndexType.Unique
));
// Initialize the channels according to what's defined in the server's config.
for (var channelConfig : server.getConfig().getChannels()) {
this.addChannel(new Channel(server, UUID.fromString(channelConfig.getId()), channelConfig.getName()));
}
}
public Set<Channel> getChannels() {
return Set.copyOf(this.channelIdMap.values());
}
public Optional<Channel> getDefaultChannel() {
var optionalDefault = this.getChannelByName(this.server.getConfig().getDefaultChannel());
if (optionalDefault.isPresent()) {
return optionalDefault;
}
System.err.println("Could not find a channel with the name \"" + this.server.getConfig().getDefaultChannel() + "\".");
for (var channel : this.getChannels()) {
return Optional.of(channel);
}
System.err.println("Could not find any channel to use as a default channel.");
return Optional.empty();
}
public void addChannel(Channel channel) {
this.channelNameMap.put(channel.getName(), channel);
this.channelIdMap.put(channel.getId(), channel);
}
public void removeChannel(Channel channel) {
this.channelNameMap.remove(channel.getName());
this.channelIdMap.remove(channel.getId());
}
public Optional<Channel> getChannelByName(String name) {
return Optional.ofNullable(this.channelNameMap.get(name));
}
public Optional<Channel> getChannelById(UUID id) {
return Optional.ofNullable(this.channelIdMap.get(id));
}
/**
* Moves a client to the given channel. This involves removing the client
* from whatever channel they're currently in, if any, moving them to the
* new channel, and sending them a message to indicate that it has been done.
* @param client The client to move.
* @param channel The channel to move the client to.
*/
public void moveToChannel(ClientThread client, Channel channel) {
if (client.getCurrentChannel() != null) {
var previousChannel = client.getCurrentChannel();
previousChannel.removeClient(client);
}
channel.addClient(client);
client.setCurrentChannel(channel);
client.sendToClient(new MoveToChannel(channel.getId(), channel.getName()));
System.out.println("Moved client " + client + " to channel " + channel);
}
/**
* Gets or creates a private channel for the given client ids to be able to
* communicate together. No other clients are allowed to access the channel.
* @param clientIds The id of each client which should have access to the
* channel.
* @return The private channel.
*/
public Channel getPrivateChannel(Set<UUID> clientIds) {
if (clientIds.size() < 2) {
throw new IllegalArgumentException("At least 2 client ids are required for a private channel.");
}
return this.privateChannels.computeIfAbsent(clientIds, this::getPrivateChannelFromDatabase);
}
/**
* Gets a private channel, given the id of a client who is part of the
* channel, and the id of the channel.
* @param clientId The id of the client that's requesting the channel.
* @param channelId The id of the private channel.
* @return The private channel.
*/
public Optional<Channel> getPrivateChannel(UUID clientId, UUID channelId) {
Channel privateChannel = this.privateChannels.entrySet().stream()
.filter(entry -> entry.getKey().contains(clientId) && entry.getValue().getId().equals(channelId))
.findAny().map(Map.Entry::getValue).orElse(null);
if (privateChannel == null) {
var cursor = this.privateChannelCollection.find(Filters.and(Filters.eq("id", channelId), Filters.in("clientIds", clientId)));
Document channelInfo = cursor.firstOrDefault();
if (channelInfo != null) {
privateChannel = new Channel(
this.server,
channelInfo.get("id", UUID.class),
channelInfo.get("name", String.class)
);
Set<UUID> clientIds = Set.of(channelInfo.get("clientIds", UUID[].class));
this.privateChannels.put(clientIds, privateChannel);
}
}
return Optional.ofNullable(privateChannel);
}
/**
* Gets and instantiates a private channel from information stored in the
* "private-channels" collection of the database, or creates it if it does
* not exist yet.
* @param clientIds The set of client ids that the channel is for.
* @return The private channel.
*/
private Channel getPrivateChannelFromDatabase(Set<UUID> clientIds) {
// First check if a private channel for these clients exists in the database.
String idHash = clientIds.stream().sorted().map(UUID::toString).collect(Collectors.joining());
var cursor = this.privateChannelCollection.find(Filters.eq("idHash", idHash));
Document channelInfo = cursor.firstOrDefault();
if (channelInfo != null) {
// If it does exist, instantiate a channel with its info.
return new Channel(
this.server,
channelInfo.get("id", UUID.class),
channelInfo.get("name", String.class)
);
} else {
// Otherwise, create the channel anew and save it in the collection.
var channel = new Channel(this.server, this.server.getIdProvider().newId(), "Private Channel");
channelInfo = new Document(Map.of(
"idHash", idHash,
"id", channel.getId(),
"name", channel.getName(),
"clientIds", clientIds.toArray(new UUID[0])
));
this.privateChannelCollection.insert(channelInfo);
System.out.println("Created new private channel for clients: " + clientIds);
return channel;
}
}
}

View File

@ -1,12 +1,15 @@
package nl.andrewl.concord_server.cli.command;
import nl.andrewl.concord_server.Channel;
import nl.andrewl.concord_server.ConcordServer;
import nl.andrewl.concord_server.channel.Channel;
import nl.andrewl.concord_server.cli.ServerCliCommand;
import nl.andrewl.concord_server.config.ServerConfig;
import java.util.UUID;
/**
* This command adds a new channel to the server.
*/
public class AddChannelCommand implements ServerCliCommand {
@Override
public void handle(ConcordServer server, String[] args) throws Exception {
@ -32,8 +35,7 @@ public class AddChannelCommand implements ServerCliCommand {
server.getConfig().getChannels().add(channelConfig);
server.getConfig().save();
var col = server.getDb().getCollection("channel-" + id);
server.getChannelManager().addChannel(new Channel(server, id, name, col));
server.broadcast(server.getMetaData());
server.getChannelManager().addChannel(new Channel(server, id, name));
server.getClientManager().broadcast(server.getMetaData());
}
}

View File

@ -3,10 +3,13 @@ package nl.andrewl.concord_server.cli.command;
import nl.andrewl.concord_server.ConcordServer;
import nl.andrewl.concord_server.cli.ServerCliCommand;
/**
* This command shows a list of all clients that are currently connected to the server.
*/
public class ListClientsCommand implements ServerCliCommand {
@Override
public void handle(ConcordServer server, String[] args) throws Exception {
var users = server.getClients();
var users = server.getClientManager().getClients();
if (users.isEmpty()) {
System.out.println("There are no connected clients.");
} else {

View File

@ -1,6 +1,6 @@
package nl.andrewl.concord_server.cli.command;
import nl.andrewl.concord_server.Channel;
import nl.andrewl.concord_server.channel.Channel;
import nl.andrewl.concord_server.ConcordServer;
import nl.andrewl.concord_server.cli.ServerCliCommand;
@ -38,7 +38,7 @@ public class RemoveChannelCommand implements ServerCliCommand {
server.getDb().getContext().dropCollection(channelToRemove.getMessageCollection().getName());
server.getConfig().getChannels().removeIf(channelConfig -> channelConfig.getName().equals(channelToRemove.getName()));
server.getConfig().save();
server.broadcast(server.getMetaData());
server.getClientManager().broadcast(server.getMetaData());
System.out.println("Removed the channel " + channelToRemove);
}
}

View File

@ -0,0 +1,101 @@
package nl.andrewl.concord_server.client;
import nl.andrewl.concord_core.msg.Message;
import nl.andrewl.concord_core.msg.types.Identification;
import nl.andrewl.concord_core.msg.types.ServerUsers;
import nl.andrewl.concord_core.msg.types.ServerWelcome;
import nl.andrewl.concord_core.msg.types.UserData;
import nl.andrewl.concord_server.ConcordServer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* The client manager is responsible for managing the set of clients connected
* to a server.
*/
public class ClientManager {
private final ConcordServer server;
private final Map<UUID, ClientThread> clients;
public ClientManager(ConcordServer server) {
this.server = server;
this.clients = new ConcurrentHashMap<>();
}
/**
* Registers a new client as connected to the server. This is done once the
* client thread has received the correct identification information from
* the client. The server will register the client in its global set of
* connected clients, and it will immediately move the client to the default
* channel.
* @param identification The client's identification data.
* @param clientThread The client manager thread.
*/
public void registerClient(Identification identification, ClientThread clientThread) {
var id = this.server.getIdProvider().newId();
System.out.printf("Client \"%s\" joined with id %s.\n", identification.getNickname(), id);
this.clients.put(id, clientThread);
clientThread.setClientId(id);
clientThread.setClientNickname(identification.getNickname());
// Immediately add the client to the default channel and send the initial welcome message.
var defaultChannel = this.server.getChannelManager().getDefaultChannel().orElseThrow();
clientThread.sendToClient(new ServerWelcome(id, defaultChannel.getId(), defaultChannel.getName(), this.server.getMetaData()));
// It is important that we send the welcome message first. The client expects this as the initial response to their identification message.
defaultChannel.addClient(clientThread);
clientThread.setCurrentChannel(defaultChannel);
System.out.println("Moved client " + clientThread + " to " + defaultChannel);
this.broadcast(new ServerUsers(this.getClients()));
}
/**
* De-registers a client from the server, removing them from any channel
* they're currently in.
* @param clientId The id of the client to remove.
*/
public void deregisterClient(UUID clientId) {
var client = this.clients.remove(clientId);
if (client != null) {
client.getCurrentChannel().removeClient(client);
client.shutdown();
System.out.println("Client " + client + " has disconnected.");
this.broadcast(new ServerUsers(this.getClients()));
}
}
/**
* Sends a message to every connected client, ignoring any channels. All
* clients connected to this server will receive this message.
* @param message The message to send.
*/
public void broadcast(Message message) {
ByteArrayOutputStream baos = new ByteArrayOutputStream(message.getByteCount());
try {
this.server.getSerializer().writeMessage(message, baos);
byte[] data = baos.toByteArray();
for (var client : this.clients.values()) {
client.sendToClient(data);
}
} catch (IOException e) {
e.printStackTrace();
}
}
public List<UserData> getClients() {
return this.clients.values().stream()
.sorted(Comparator.comparing(ClientThread::getClientNickname))
.map(ClientThread::toData)
.collect(Collectors.toList());
}
public Set<UUID> getConnectedIds() {
return this.clients.keySet();
}
public Optional<ClientThread> getClientById(UUID id) {
return Optional.ofNullable(this.clients.get(id));
}
}

View File

@ -1,10 +1,12 @@
package nl.andrewl.concord_server;
package nl.andrewl.concord_server.client;
import lombok.Getter;
import lombok.Setter;
import nl.andrewl.concord_core.msg.Message;
import nl.andrewl.concord_core.msg.types.Identification;
import nl.andrewl.concord_core.msg.types.UserData;
import nl.andrewl.concord_server.channel.Channel;
import nl.andrewl.concord_server.ConcordServer;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@ -109,7 +111,7 @@ public class ClientThread extends Thread {
}
if (this.clientId != null) {
this.server.deregisterClient(this.clientId);
this.server.getClientManager().deregisterClient(this.clientId);
}
try {
if (!this.socket.isClosed()) {
@ -133,7 +135,7 @@ public class ClientThread extends Thread {
try {
var msg = this.server.getSerializer().readMessage(this.in);
if (msg instanceof Identification id) {
this.server.registerClient(id, this);
this.server.getClientManager().registerClient(id, this);
return true;
}
} catch (IOException e) {

View File

@ -5,7 +5,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import nl.andrewl.concord_server.IdProvider;
import nl.andrewl.concord_server.util.IdProvider;
import java.io.IOException;
import java.io.UncheckedIOException;
@ -23,6 +23,7 @@ public final class ServerConfig {
private int chatHistoryMaxCount;
private int chatHistoryDefaultCount;
private int maxMessageLength;
private String defaultChannel;
private List<ChannelConfig> channels;
private List<String> discoveryServers;
@ -53,6 +54,7 @@ public final class ServerConfig {
100,
50,
8192,
"general",
List.of(new ChannelConfig(idProvider.newId().toString(), "general", "Default channel for general discussion.")),
List.of(),
filePath

View File

@ -1,16 +1,36 @@
package nl.andrewl.concord_server.event;
import nl.andrewl.concord_core.msg.types.Error;
import nl.andrewl.concord_core.msg.types.MoveToChannel;
import nl.andrewl.concord_server.ClientThread;
import nl.andrewl.concord_server.ConcordServer;
import nl.andrewl.concord_server.client.ClientThread;
import java.util.List;
import java.util.Set;
/**
* Handles client requests to move to another channel.
* Handles client requests to move to another channel. We first check if the id
* which the client sent refers to a channel, in which case we move them to that
* channel. Otherwise, we look for a client with that id, and try to move the
* requester into a private channel with them.
*/
public class ChannelMoveHandler implements MessageHandler<MoveToChannel> {
@Override
public void handle(MoveToChannel msg, ClientThread client, ConcordServer server) {
var optionalChannel = server.getChannelManager().getChannelById(msg.getChannelId());
optionalChannel.ifPresent(channel -> server.getChannelManager().moveToChannel(client, channel));
var optionalChannel = server.getChannelManager().getChannelById(msg.getId());
if (optionalChannel.isPresent()) {
server.getChannelManager().moveToChannel(client, optionalChannel.get());
} else {
var optionalClient = server.getClientManager().getClientById(msg.getId());
if (optionalClient.isPresent()) {
var privateChannel = server.getChannelManager().getPrivateChannel(Set.of(
client.getClientId(),
optionalClient.get().getClientId()
));
server.getChannelManager().moveToChannel(client, privateChannel);
} else {
client.sendToClient(Error.warning("Unknown channel or client id."));
}
}
}
}

View File

@ -2,7 +2,7 @@ package nl.andrewl.concord_server.event;
import nl.andrewl.concord_core.msg.types.ChannelUsersRequest;
import nl.andrewl.concord_core.msg.types.ChannelUsersResponse;
import nl.andrewl.concord_server.ClientThread;
import nl.andrewl.concord_server.client.ClientThread;
import nl.andrewl.concord_server.ConcordServer;
public class ChannelUsersRequestHandler implements MessageHandler<ChannelUsersRequest> {

View File

@ -2,13 +2,18 @@ package nl.andrewl.concord_server.event;
import nl.andrewl.concord_core.msg.types.Chat;
import nl.andrewl.concord_core.msg.types.Error;
import nl.andrewl.concord_server.ClientThread;
import nl.andrewl.concord_server.ConcordServer;
import nl.andrewl.concord_server.client.ClientThread;
import org.dizitart.no2.Document;
import java.io.IOException;
import java.util.Map;
/**
* This handler is responsible for taking incoming chat messages and saving them
* to the channel's message collection, and then relaying the new message to all
* clients in the channel.
*/
public class ChatHandler implements MessageHandler<Chat> {
@Override
public void handle(Chat msg, ClientThread client, ConcordServer server) throws IOException {
@ -16,16 +21,22 @@ public class ChatHandler implements MessageHandler<Chat> {
client.getCurrentChannel().sendMessage(Error.warning("Message is too long."));
return;
}
server.getExecutorService().submit(() -> {
var collection = client.getCurrentChannel().getMessageCollection();
Document doc = new Document(Map.of(
"senderId", msg.getSenderId(),
"senderNickname", msg.getSenderNickname(),
"timestamp", msg.getTimestamp(),
"message", msg.getMessage()
));
collection.insert(doc);
});
/*
When we receive a message from the client, it will have a random UUID.
A compromised client could try and send a duplicate or otherwise
malicious UUID, so we overwrite it with a server-generated id which we
know is safe.
*/
msg.setId(server.getIdProvider().newId());
var collection = client.getCurrentChannel().getMessageCollection();
Document doc = new Document(Map.of(
"id", msg.getId(),
"senderId", msg.getSenderId(),
"senderNickname", msg.getSenderNickname(),
"timestamp", msg.getTimestamp(),
"message", msg.getMessage()
));
collection.insert(doc);
System.out.printf("#%s | %s: %s\n", client.getCurrentChannel(), client.getClientNickname(), msg.getMessage());
client.getCurrentChannel().sendMessage(msg);
}

View File

@ -3,9 +3,10 @@ package nl.andrewl.concord_server.event;
import nl.andrewl.concord_core.msg.types.Chat;
import nl.andrewl.concord_core.msg.types.ChatHistoryRequest;
import nl.andrewl.concord_core.msg.types.ChatHistoryResponse;
import nl.andrewl.concord_server.Channel;
import nl.andrewl.concord_server.ClientThread;
import nl.andrewl.concord_core.msg.types.Error;
import nl.andrewl.concord_server.ConcordServer;
import nl.andrewl.concord_server.channel.Channel;
import nl.andrewl.concord_server.client.ClientThread;
import org.dizitart.no2.*;
import org.dizitart.no2.filters.Filters;
@ -17,10 +18,21 @@ import java.util.*;
public class ChatHistoryRequestHandler implements MessageHandler<ChatHistoryRequest> {
@Override
public void handle(ChatHistoryRequest msg, ClientThread client, ConcordServer server) {
var optionalChannel = server.getChannelManager().getChannelById(msg.getChannelId());
if (optionalChannel.isPresent()) {
var channel = optionalChannel.get();
var params = msg.getQueryAsMap();
// First try and find a public channel with the given id.
var channel = server.getChannelManager().getChannelById(msg.getChannelId()).orElse(null);
if (channel == null) {
// Couldn't find a public channel, so look for a private channel this client is involved in.
channel = server.getChannelManager().getPrivateChannel(client.getClientId(), msg.getChannelId()).orElse(null);
}
// If we couldn't find a public or private channel, give up.
if (channel == null) {
client.sendToClient(Error.warning("Unknown channel id."));
return;
}
var params = msg.getQueryAsMap();
if (params.containsKey("id")) {
this.handleIdRequest(client, channel, params.get("id"));
} else {
Long count = this.getOrDefault(params, "count", (long) server.getConfig().getChatHistoryDefaultCount());
if (count > server.getConfig().getChatHistoryMaxCount()) {
return;
@ -31,14 +43,19 @@ public class ChatHistoryRequestHandler implements MessageHandler<ChatHistoryRequ
}
}
private Long getOrDefault(Map<String, String> params, String key, Long defaultValue) {
String value = params.get(key);
if (value == null) return defaultValue;
try {
return Long.parseLong(value);
} catch (NumberFormatException e) {
return defaultValue;
/**
* Handles a request for a single message from a channel.
* @param client The client who's requesting the data.
* @param channel The channel in which to search for the message.
* @param id The id of the message.
*/
private void handleIdRequest(ClientThread client, Channel channel, String id) {
var cursor = channel.getMessageCollection().find(Filters.eq("id", id));
List<Chat> chats = new ArrayList<>(1);
for (var doc : cursor) {
chats.add(this.read(doc));
}
client.sendToClient(new ChatHistoryResponse(channel.getId(), chats));
}
private ChatHistoryResponse getResponse(Channel channel, long count, Long from, Long to) {
@ -60,15 +77,43 @@ public class ChatHistoryRequestHandler implements MessageHandler<ChatHistoryRequ
List<Chat> chats = new ArrayList<>((int) count);
for (Document doc : cursor) {
chats.add(new Chat(
doc.get("senderId", UUID.class),
doc.get("senderNickname", String.class),
doc.get("timestamp", Long.class),
doc.get("message", String.class)
));
chats.add(this.read(doc));
}
col.close();
chats.sort(Comparator.comparingLong(Chat::getTimestamp));
return new ChatHistoryResponse(channel.getId(), chats);
}
/**
* Helper method to read a {@link Chat} from a document retrieved from a
* collection.
* @param doc The document to read.
* @return The chat that was read.
*/
private Chat read(Document doc) {
return new Chat(
doc.get("id", UUID.class),
doc.get("senderId", UUID.class),
doc.get("senderNickname", String.class),
doc.get("timestamp", Long.class),
doc.get("message", String.class)
);
}
/**
* Helper method to get a long value or fall back to a default.
* @param params The parameters to check.
* @param key The key to get the value for.
* @param defaultValue The default value to return if no value exists.
* @return The value that was found, or the default value.
*/
private Long getOrDefault(Map<String, String> params, String key, Long defaultValue) {
String value = params.get(key);
if (value == null) return defaultValue;
try {
return Long.parseLong(value);
} catch (NumberFormatException e) {
return defaultValue;
}
}
}

View File

@ -1,7 +1,7 @@
package nl.andrewl.concord_server.event;
import nl.andrewl.concord_core.msg.types.Chat;
import nl.andrewl.concord_server.ClientThread;
import nl.andrewl.concord_server.client.ClientThread;
import nl.andrewl.concord_server.ConcordServer;
public interface EventListener {

View File

@ -1,12 +1,12 @@
package nl.andrewl.concord_server;
package nl.andrewl.concord_server.event;
import lombok.extern.java.Log;
import nl.andrewl.concord_core.msg.Message;
import nl.andrewl.concord_core.msg.types.ChannelUsersRequest;
import nl.andrewl.concord_core.msg.types.Chat;
import nl.andrewl.concord_core.msg.types.ChatHistoryRequest;
import nl.andrewl.concord_core.msg.types.MoveToChannel;
import nl.andrewl.concord_server.event.*;
import nl.andrewl.concord_server.ConcordServer;
import nl.andrewl.concord_server.client.ClientThread;
import java.util.HashMap;
import java.util.Map;
@ -25,7 +25,6 @@ public class EventManager {
this.messageHandlers = new HashMap<>();
this.messageHandlers.put(Chat.class, new ChatHandler());
this.messageHandlers.put(MoveToChannel.class, new ChannelMoveHandler());
this.messageHandlers.put(ChannelUsersRequest.class, new ChannelUsersRequestHandler());
this.messageHandlers.put(ChatHistoryRequest.class, new ChatHistoryRequestHandler());
}

View File

@ -1,7 +1,7 @@
package nl.andrewl.concord_server.event;
import nl.andrewl.concord_core.msg.Message;
import nl.andrewl.concord_server.ClientThread;
import nl.andrewl.concord_server.client.ClientThread;
import nl.andrewl.concord_server.ConcordServer;
public interface MessageHandler<T extends Message> {

View File

@ -0,0 +1,32 @@
package nl.andrewl.concord_server.util;
import org.dizitart.no2.IndexOptions;
import org.dizitart.no2.IndexType;
import org.dizitart.no2.NitriteCollection;
import java.util.Map;
public class CollectionUtils {
/**
* Ensures that the given nitrite collection has exactly the given set of
* indexes. It will remove any non-conforming indexes, and create new ones
* as necessary.
* @param collection The collection to operate on.
* @param indexMap A mapping containing keys referring to fields, and values
* that represent the type of index that should be on that
* field.
*/
public static void ensureIndexes(NitriteCollection collection, Map<String, IndexType> indexMap) {
for (var index : collection.listIndices()) {
var entry = indexMap.get(index.getField());
if (entry == null || !index.getIndexType().equals(entry)) {
collection.dropIndex(index.getField());
}
}
for (var entry : indexMap.entrySet()) {
if (!collection.hasIndex(entry.getKey())) {
collection.createIndex(entry.getKey(), IndexOptions.indexOptions(entry.getValue()));
}
}
}
}

View File

@ -1,4 +1,4 @@
package nl.andrewl.concord_server;
package nl.andrewl.concord_server.util;
import java.util.UUID;

View File

@ -1,4 +1,4 @@
package nl.andrewl.concord_server;
package nl.andrewl.concord_server.util;
import java.util.UUID;