Files
ChatSSH/chatapp/chat_server.py
2025-11-24 15:39:05 +00:00

252 lines
8.9 KiB
Python

"""Chat server logic and state management."""
import random
from datetime import datetime
from typing import Dict, Set, List
from .constants import USER_COLORS
from .models import Channel, DirectMessage, UserConnection
class ChatServer:
    """Central chat server managing users, channels, and messages.

    Instance state:

    * ``users`` -- connected users, keyed by username.
    * ``user_colors`` -- colour currently assigned to each username.
    * ``available_colors`` -- colours not currently handed out.
    * ``channels`` -- channels keyed by name; ``'global'`` is created up
      front and can be neither left nor deleted through this class.
    * ``dms`` -- direct-message conversations keyed by the unordered
      pair (``frozenset``) of the two usernames.
    """

    def __init__(self):
        """Create an empty server containing only the ``'global'`` channel."""
        self.users: Dict[str, UserConnection] = {}
        self.user_colors: Dict[str, str] = {}
        self.available_colors = USER_COLORS.copy()
        self.channels: Dict[str, Channel] = {
            'global': Channel('global', 'system', is_private=False)
        }
        self.dms: Dict[frozenset, DirectMessage] = {}

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _timestamp() -> str:
        """Return the current local time formatted as ``HH:MM:SS``."""
        return datetime.now().strftime("%H:%M:%S")

    def _system_message(self, text: str) -> dict:
        """Build a ``'system'`` message payload stamped with the current time."""
        return {
            'type': 'system',
            'message': text,
            'timestamp': self._timestamp(),
        }

    # ------------------------------------------------------------------
    # Colours
    # ------------------------------------------------------------------

    def get_user_color(self, username: str) -> str:
        """Return the colour assigned to ``username``, assigning one if needed.

        A new colour is drawn at random from ``available_colors``. When that
        pool is exhausted it is refilled from ``USER_COLORS``, so colours may
        then be shared between users.
        """
        if username not in self.user_colors:
            if not self.available_colors:
                self.available_colors = USER_COLORS.copy()
            color = random.choice(self.available_colors)
            self.available_colors.remove(color)
            self.user_colors[username] = color
        return self.user_colors[username]

    def release_user_color(self, username: str):
        """Forget ``username``'s colour and return it to the pool.

        Does nothing if the user has no colour. The colour is only appended
        to ``available_colors`` when it is not already there, which can
        happen after the pool has been refilled.
        """
        if username not in self.user_colors:
            return
        color = self.user_colors.pop(username)
        if color not in self.available_colors:
            self.available_colors.append(color)

    # ------------------------------------------------------------------
    # Users
    # ------------------------------------------------------------------

    def add_user(self, username: str, connection: "UserConnection"):
        """Register a connection, join it to ``'global'`` and announce it.

        Args:
            username: Name to register the connection under. An existing
                entry with the same name is replaced.
            connection: Object used to deliver messages to the user via
                its ``send_message`` method.
        """
        self.users[username] = connection
        self.get_user_color(username)
        self.channels['global'].add_member(username)
        self.broadcast_to_channel(
            'global', self._system_message(f"{username} has joined the chat")
        )
        self.broadcast_user_list()

    def remove_user(self, username: str):
        """Unregister ``username`` and announce the departure.

        The user is removed from every channel they belong to. Any channel
        other than ``'global'`` that is left without members is deleted.
        The user's colour is released, and ``'global'`` plus the user list
        are updated for the remaining users. Unknown usernames are ignored.
        """
        if username not in self.users:
            return

        # Collect first and delete afterwards: the dict must not change size
        # while it is being iterated.
        emptied = []
        for name, channel in self.channels.items():
            if not channel.has_member(username):
                continue
            channel.remove_member(username)
            if channel.name != 'global' and not channel.members:
                emptied.append(name)
        for name in emptied:
            del self.channels[name]

        del self.users[username]
        self.release_user_color(username)
        self.broadcast_to_channel(
            'global', self._system_message(f"{username} has left the chat")
        )
        self.broadcast_user_list()

    # ------------------------------------------------------------------
    # Channels
    # ------------------------------------------------------------------

    def create_channel(self, name: str, creator: str, is_private: bool = False) -> bool:
        """Create channel ``name``; return ``False`` if the name is taken."""
        if name in self.channels:
            return False
        self.channels[name] = Channel(name, creator, is_private)
        return True

    def delete_channel(self, name: str, username: str) -> bool:
        """Delete channel ``name`` on behalf of ``username``.

        Only the channel's creator may delete it, and ``'global'`` can never
        be deleted. Members are notified before the channel is removed.

        Returns:
            ``True`` if the channel was deleted, ``False`` otherwise.
        """
        if name not in self.channels or name == 'global':
            return False
        if self.channels[name].creator != username:
            return False
        # Notify while the channel still exists so the members receive it.
        self.broadcast_to_channel(
            name,
            self._system_message(f"Channel #{name} has been deleted by {username}"),
        )
        del self.channels[name]
        return True

    def invite_to_channel(self, channel_name: str, inviter: str, invitee: str) -> tuple[bool, str]:
        """Invite ``invitee`` to ``channel_name`` on behalf of ``inviter``.

        The inviter must be a member of the channel and the invitee must be
        online and not already a member. On success the invitation is
        recorded on the channel and the invitee receives an ``'info'``
        message.

        Returns:
            ``(ok, text)`` where ``text`` is a human-readable result for the
            inviter.
        """
        if channel_name not in self.channels:
            return False, f"Channel #{channel_name} does not exist"
        channel = self.channels[channel_name]
        if not channel.has_member(inviter):
            return False, f"You are not a member of #{channel_name}"
        if invitee not in self.users:
            return False, f"User '{invitee}' is not online"
        if channel.has_member(invitee):
            return False, f"{invitee} is already in #{channel_name}"

        channel.invite(invitee)
        # The invitee is known to be online at this point (checked above).
        self.users[invitee].send_message({
            'type': 'info',
            'message': f"{inviter} invited you to #{channel_name}. Use /join {channel_name} to accept",
            'timestamp': self._timestamp(),
        })
        return True, f"Invited {invitee} to #{channel_name}"

    def join_channel(self, channel_name: str, username: str) -> tuple[bool, str]:
        """Add ``username`` to ``channel_name`` and announce it in the channel.

        Private channels can only be joined by users the channel reports as
        invited.

        Returns:
            ``(ok, text)`` where ``text`` is a human-readable result.
        """
        if channel_name not in self.channels:
            return False, f"Channel #{channel_name} does not exist"
        channel = self.channels[channel_name]
        if channel.has_member(username):
            return False, f"You are already in #{channel_name}"
        if channel.is_private and not channel.is_invited(username):
            return False, f"#{channel_name} is private. You need an invitation to join"

        channel.add_member(username)
        self.broadcast_to_channel(
            channel_name, self._system_message(f"{username} joined #{channel_name}")
        )
        return True, f"Joined #{channel_name}"

    def leave_channel(self, channel_name: str, username: str) -> tuple[bool, str]:
        """Remove ``username`` from ``channel_name`` and tell the remaining members.

        ``'global'`` cannot be left. The channel is kept even if this leaves
        it without members.

        Returns:
            ``(ok, text)`` where ``text`` is a human-readable result.
        """
        if channel_name == 'global':
            return False, "Cannot leave #global"
        if channel_name not in self.channels:
            return False, f"Channel #{channel_name} does not exist"
        channel = self.channels[channel_name]
        if not channel.has_member(username):
            return False, f"You are not in #{channel_name}"

        # Remove first so the leaver does not receive their own notice.
        channel.remove_member(username)
        self.broadcast_to_channel(
            channel_name, self._system_message(f"{username} left #{channel_name}")
        )
        return True, f"Left #{channel_name}"

    # ------------------------------------------------------------------
    # Direct messages
    # ------------------------------------------------------------------

    def get_or_create_dm(self, user1: str, user2: str) -> "DirectMessage":
        """Return the conversation between two users, creating it if needed.

        The lookup key is a ``frozenset`` of both names, so argument order
        does not matter.
        """
        dm_key = frozenset([user1, user2])
        if dm_key not in self.dms:
            self.dms[dm_key] = DirectMessage(user1, user2)
        return self.dms[dm_key]

    def send_dm(self, from_user: str, to_user: str, message: str):
        """Deliver a direct message from ``from_user`` to ``to_user``.

        The message is stored on the conversation and sent to both parties.
        If the recipient is offline the sender gets an ``'error'`` message
        instead. If the sender is not connected the call is a no-op: there
        is nobody to report to, and no colour is allocated for them.
        """
        sender = self.users.get(from_user)
        if sender is None:
            return

        timestamp = self._timestamp()
        recipient = self.users.get(to_user)
        if recipient is None:
            sender.send_message({
                'type': 'error',
                'message': f"User '{to_user}' is not online",
                'timestamp': timestamp,
            })
            return

        msg_data = {
            'type': 'dm',
            'from': from_user,
            'to': to_user,
            'message': message,
            'timestamp': timestamp,
            'color': self.get_user_color(from_user),
        }
        self.get_or_create_dm(from_user, to_user).add_message(msg_data)
        # Echo to the sender as well so both sides see the message.
        sender.send_message(msg_data)
        recipient.send_message(msg_data)

    # ------------------------------------------------------------------
    # Broadcasting
    # ------------------------------------------------------------------

    def broadcast_to_channel(self, channel_name: str, msg_data: dict):
        """Record ``msg_data`` on a channel and send it to its connected members.

        Unknown channel names are ignored.
        """
        channel = self.channels.get(channel_name)
        if channel is None:
            return
        channel.add_message(msg_data)
        # Iterate over a snapshot so membership changes made while sending
        # cannot invalidate the iteration.
        for member in list(channel.members):
            connection = self.users.get(member)
            if connection is not None:
                connection.send_message(msg_data)

    def send_channel_message(self, channel_name: str, username: str, message: str):
        """Post a chat message from ``username`` to ``channel_name``.

        Unknown channels are ignored. A sender who is not a member of the
        channel receives an ``'error'`` message instead, provided they are
        connected.
        """
        if channel_name not in self.channels:
            return
        if not self.channels[channel_name].has_member(username):
            sender = self.users.get(username)
            if sender is not None:
                sender.send_message({
                    'type': 'error',
                    'message': f"You are not a member of #{channel_name}",
                    'timestamp': self._timestamp(),
                })
            return

        self.broadcast_to_channel(channel_name, {
            'type': 'message',
            'channel': channel_name,
            'username': username,
            'message': message,
            'timestamp': self._timestamp(),
            'color': self.get_user_color(username),
        })

    def broadcast_user_list(self):
        """Send every connected user the current list of users and colours."""
        msg_data = {
            'type': 'user_list',
            'users': [
                {'username': name, 'color': self.get_user_color(name)}
                for name in self.users
            ],
        }
        # Snapshot the connections so changes made while sending are safe.
        for connection in list(self.users.values()):
            connection.send_message(msg_data)

    def get_user_channels(self, username: str) -> List[str]:
        """Return the names of all channels ``username`` is a member of."""
        return [
            name
            for name, channel in self.channels.items()
            if channel.has_member(username)
        ]