"""Chat server logic and state management."""

import random
from datetime import datetime
from typing import Dict, Set, List

from .constants import USER_COLORS
from .models import Channel, DirectMessage, UserConnection


class ChatServer:
    """Central chat server managing users, channels, and messages.

    State held by the server:

    * ``users``            -- username -> live ``UserConnection``.
    * ``user_colors``      -- username -> colour currently assigned to it.
    * ``available_colors`` -- colours not yet handed out in the current round.
    * ``channels``         -- channel name -> ``Channel``; always contains
      the public ``'global'`` channel.
    * ``dms``              -- ``frozenset`` of the two participants ->
      ``DirectMessage`` conversation.
    """

    def __init__(self):
        """Create an empty server that only has the ``'global'`` channel."""
        self.users: Dict[str, UserConnection] = {}
        self.user_colors: Dict[str, str] = {}
        self.available_colors = USER_COLORS.copy()
        self.channels: Dict[str, Channel] = {
            'global': Channel('global', 'system', is_private=False)
        }
        self.dms: Dict[frozenset, DirectMessage] = {}

    @staticmethod
    def _timestamp() -> str:
        """Return the current local time formatted as ``HH:MM:SS``."""
        return datetime.now().strftime("%H:%M:%S")

    def get_user_color(self, username: str) -> str:
        """Return the colour assigned to *username*, assigning one if needed.

        A colour is picked at random from ``available_colors`` and removed
        from that pool. When the pool is exhausted it is refilled from
        ``USER_COLORS``, so colours may then be shared between users.
        """
        if username not in self.user_colors:
            if not self.available_colors:
                # Every colour is in use: start a new round.
                self.available_colors = USER_COLORS.copy()
            color = random.choice(self.available_colors)
            self.available_colors.remove(color)
            self.user_colors[username] = color
        return self.user_colors[username]

    def release_user_color(self, username: str):
        """Forget *username*'s colour and return it to the available pool."""
        if username not in self.user_colors:
            return
        color = self.user_colors.pop(username)
        # After a pool refill the colour may already be available again.
        if color not in self.available_colors:
            self.available_colors.append(color)

    def add_user(self, username: str, connection: UserConnection):
        """Register a connection, join it to ``'global'`` and announce it."""
        self.users[username] = connection
        self.get_user_color(username)
        self.channels['global'].add_member(username)
        self.broadcast_to_channel('global', {
            'type': 'system',
            'message': f"{username} has joined the chat",
            'timestamp': self._timestamp()
        })
        self.broadcast_user_list()

    def remove_user(self, username: str):
        """Disconnect *username* and announce the departure.

        The user is removed from every channel they belong to. Non-global
        channels left without members are deleted. Does nothing when the
        user is not connected.
        """
        if username not in self.users:
            return

        emptied = []
        for name, channel in self.channels.items():
            if not channel.has_member(username):
                continue
            channel.remove_member(username)
            if name != 'global' and not channel.members:
                emptied.append(name)
        # Deleted after the loop: a dict must not change size while it is
        # being iterated.
        for name in emptied:
            del self.channels[name]

        del self.users[username]
        self.release_user_color(username)
        self.broadcast_to_channel('global', {
            'type': 'system',
            'message': f"{username} has left the chat",
            'timestamp': self._timestamp()
        })
        self.broadcast_user_list()

    def create_channel(self, name: str, creator: str,
                       is_private: bool = False) -> bool:
        """Create a channel; return ``False`` if the name is already taken."""
        if name in self.channels:
            return False
        self.channels[name] = Channel(name, creator, is_private)
        return True

    def delete_channel(self, name: str, username: str) -> bool:
        """Delete channel *name* on behalf of *username*.

        Only the channel's creator may delete it, and ``'global'`` can
        never be deleted. Members are notified before the channel is
        removed. Returns ``True`` when the channel was deleted.
        """
        if name not in self.channels or name == 'global':
            return False
        if self.channels[name].creator != username:
            return False

        # Notify all members while the channel still exists.
        self.broadcast_to_channel(name, {
            'type': 'system',
            'message': f"Channel #{name} has been deleted by {username}",
            'timestamp': self._timestamp()
        })
        del self.channels[name]
        return True

    def invite_to_channel(self, channel_name: str, inviter: str,
                          invitee: str) -> tuple[bool, str]:
        """Invite *invitee* to a channel and notify them.

        Returns ``(success, message)`` where *message* is meant for the
        inviter. Fails when the channel does not exist, the inviter is not
        a member, the invitee is offline, or the invitee is already a
        member.
        """
        if channel_name not in self.channels:
            return False, f"Channel #{channel_name} does not exist"

        channel = self.channels[channel_name]
        if not channel.has_member(inviter):
            return False, f"You are not a member of #{channel_name}"
        if invitee not in self.users:
            return False, f"User '{invitee}' is not online"
        if channel.has_member(invitee):
            return False, f"{invitee} is already in #{channel_name}"

        channel.invite(invitee)
        # The invitee is known to be online here (checked above).
        self.users[invitee].send_message({
            'type': 'info',
            'message': f"{inviter} invited you to #{channel_name}. Use /join {channel_name} to accept",
            'timestamp': self._timestamp()
        })
        return True, f"Invited {invitee} to #{channel_name}"

    def join_channel(self, channel_name: str,
                     username: str) -> tuple[bool, str]:
        """Add *username* to a channel and announce it to the channel.

        Returns ``(success, message)``. Private channels can only be joined
        by users who have been invited.
        """
        if channel_name not in self.channels:
            return False, f"Channel #{channel_name} does not exist"

        channel = self.channels[channel_name]
        if channel.has_member(username):
            return False, f"You are already in #{channel_name}"
        if channel.is_private and not channel.is_invited(username):
            return False, f"#{channel_name} is private. You need an invitation to join"

        channel.add_member(username)
        self.broadcast_to_channel(channel_name, {
            'type': 'system',
            'message': f"{username} joined #{channel_name}",
            'timestamp': self._timestamp()
        })
        return True, f"Joined #{channel_name}"

    def leave_channel(self, channel_name: str,
                      username: str) -> tuple[bool, str]:
        """Remove *username* from a channel and announce it to the channel.

        Returns ``(success, message)``. ``'global'`` cannot be left.
        """
        if channel_name == 'global':
            return False, "Cannot leave #global"
        if channel_name not in self.channels:
            return False, f"Channel #{channel_name} does not exist"

        channel = self.channels[channel_name]
        if not channel.has_member(username):
            return False, f"You are not in #{channel_name}"

        channel.remove_member(username)
        self.broadcast_to_channel(channel_name, {
            'type': 'system',
            'message': f"{username} left #{channel_name}",
            'timestamp': self._timestamp()
        })
        return True, f"Left #{channel_name}"

    def get_or_create_dm(self, user1: str, user2: str) -> DirectMessage:
        """Return the conversation between two users, creating it if needed.

        The conversation is keyed by the unordered pair of usernames, so
        the argument order does not matter.
        """
        dm_key = frozenset([user1, user2])
        if dm_key not in self.dms:
            self.dms[dm_key] = DirectMessage(user1, user2)
        return self.dms[dm_key]

    def send_dm(self, from_user: str, to_user: str, message: str):
        """Deliver a direct message from *from_user* to *to_user*.

        The message is recorded in the conversation and sent to both
        participants. If the recipient is offline the sender receives an
        error message instead. Nothing happens when the sender itself is
        no longer connected.
        """
        sender = self.users.get(from_user)
        if sender is None:
            # The sender disconnected in the meantime; there is nobody to
            # report to and no colour should be reserved for them.
            return

        timestamp = self._timestamp()
        recipient = self.users.get(to_user)
        if recipient is None:
            sender.send_message({
                'type': 'error',
                'message': f"User '{to_user}' is not online",
                'timestamp': timestamp
            })
            return

        msg_data = {
            'type': 'dm',
            'from': from_user,
            'to': to_user,
            'message': message,
            'timestamp': timestamp,
            'color': self.get_user_color(from_user)
        }
        self.get_or_create_dm(from_user, to_user).add_message(msg_data)

        sender.send_message(msg_data)
        # A message to oneself must not be delivered twice.
        if to_user != from_user:
            recipient.send_message(msg_data)

    def broadcast_to_channel(self, channel_name: str, msg_data: dict):
        """Record *msg_data* in a channel and send it to its online members.

        Does nothing when the channel does not exist.
        """
        channel = self.channels.get(channel_name)
        if channel is None:
            return

        channel.add_message(msg_data)
        for member in channel.members:
            connection = self.users.get(member)
            if connection is not None:
                connection.send_message(msg_data)

    def send_channel_message(self, channel_name: str, username: str,
                             message: str):
        """Post a chat message from *username* to a channel.

        Non-members receive an error message instead. Nothing happens when
        the channel does not exist.
        """
        channel = self.channels.get(channel_name)
        if channel is None:
            return

        if not channel.has_member(username):
            connection = self.users.get(username)
            # The user may already have disconnected.
            if connection is not None:
                connection.send_message({
                    'type': 'error',
                    'message': f"You are not a member of #{channel_name}",
                    'timestamp': self._timestamp()
                })
            return

        self.broadcast_to_channel(channel_name, {
            'type': 'message',
            'channel': channel_name,
            'username': username,
            'message': message,
            'timestamp': self._timestamp(),
            'color': self.get_user_color(username)
        })

    def broadcast_user_list(self):
        """Send the list of connected users and their colours to everyone."""
        msg_data = {
            'type': 'user_list',
            'users': [
                {'username': name, 'color': self.get_user_color(name)}
                for name in self.users
            ]
        }
        for connection in self.users.values():
            connection.send_message(msg_data)

    def get_user_channels(self, username: str) -> List[str]:
        """Return the names of all channels *username* is a member of."""
        return [
            name
            for name, channel in self.channels.items()
            if channel.has_member(username)
        ]