246 lines
9.7 KiB
Python
246 lines
9.7 KiB
Python
"""Simple text-based chat client for SSH connections."""
|
|
|
|
import asyncio
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
from constants import COLORS
|
|
from chat_server import ChatServer
|
|
from models import UserConnection
|
|
|
|
|
|
class SimpleTextChatClient:
|
|
"""Simple text-based chat client that works over SSH."""
|
|
|
|
def __init__(self, username: str, message_queue: asyncio.Queue, chat_server: ChatServer, process):
|
|
self.username = username
|
|
self.message_queue = message_queue
|
|
self.chat_server = chat_server
|
|
self.process = process
|
|
self.current_view = 'global'
|
|
self.running = True
|
|
|
|
async def run(self):
|
|
"""Run the chat client."""
|
|
# Send welcome message
|
|
self.write_line("=" * 60)
|
|
self.write_line(f"Welcome to Terminal Chat, {self.username}!")
|
|
self.write_line("=" * 60)
|
|
self.write_line("Type /help for commands or start chatting!")
|
|
self.write_line(f"Current channel: #{self.current_view}")
|
|
self.write_line("")
|
|
|
|
# Start message processor
|
|
message_task = asyncio.create_task(self.process_messages())
|
|
|
|
# Start input handler
|
|
try:
|
|
while self.running:
|
|
self.process.stdout.write(f"[{self.current_view}] > ")
|
|
line = await self.process.stdin.readline()
|
|
|
|
if not line:
|
|
break
|
|
|
|
message = line.strip()
|
|
if message:
|
|
await self.handle_input(message)
|
|
except Exception as e:
|
|
self.write_line(f"Error: {e}")
|
|
finally:
|
|
self.running = False
|
|
message_task.cancel()
|
|
|
|
def write_line(self, text: str):
|
|
"""Write a line to the client."""
|
|
self.process.stdout.write(f"{text}\n")
|
|
|
|
async def process_messages(self):
|
|
"""Process incoming messages from the queue."""
|
|
while self.running:
|
|
try:
|
|
msg_data = await asyncio.wait_for(self.message_queue.get(), timeout=0.1)
|
|
|
|
if msg_data['type'] == 'message':
|
|
if msg_data.get('channel') == self.current_view:
|
|
self.write_line(
|
|
f"[{msg_data['timestamp']}] {msg_data['username']}: {msg_data['message']}"
|
|
)
|
|
|
|
elif msg_data['type'] == 'dm':
|
|
other_user = msg_data['from'] if msg_data['from'] != self.username else msg_data['to']
|
|
if self.current_view == f"dm:{other_user}":
|
|
self.write_line(
|
|
f"[{msg_data['timestamp']}] {msg_data['from']}: {msg_data['message']}"
|
|
)
|
|
|
|
elif msg_data['type'] == 'system':
|
|
if self.current_view == 'global':
|
|
self.write_line(f"[{msg_data['timestamp']}] [SYSTEM] {msg_data['message']}")
|
|
|
|
elif msg_data['type'] == 'error':
|
|
self.write_line(f"[ERROR] {msg_data['message']}")
|
|
|
|
elif msg_data['type'] == 'info':
|
|
self.write_line(f"[INFO] {msg_data['message']}")
|
|
|
|
elif msg_data['type'] == 'user_list':
|
|
pass # Don't auto-display user list updates
|
|
|
|
except asyncio.TimeoutError:
|
|
continue
|
|
except Exception as e:
|
|
continue
|
|
|
|
async def handle_input(self, message: str):
|
|
"""Handle user input."""
|
|
if message.startswith('/'):
|
|
await self.handle_command(message)
|
|
else:
|
|
# Send regular message
|
|
if self.current_view.startswith('dm:'):
|
|
target_user = self.current_view[3:]
|
|
self.chat_server.send_dm(self.username, target_user, message)
|
|
else:
|
|
self.chat_server.send_channel_message(self.current_view, self.username, message)
|
|
|
|
async def handle_command(self, message: str):
|
|
"""Handle chat commands."""
|
|
parts = message.split(' ', 2)
|
|
command = parts[0].lower()
|
|
|
|
if command in ['/help', '/h']:
|
|
self.show_help()
|
|
|
|
elif command in ['/dm', '/d']:
|
|
if len(parts) < 2:
|
|
self.write_line("[ERROR] Usage: /dm <username> [message]")
|
|
return
|
|
|
|
target_user = parts[1]
|
|
if target_user == self.username:
|
|
self.write_line("[ERROR] You cannot DM yourself")
|
|
return
|
|
|
|
if target_user not in self.chat_server.users:
|
|
self.write_line(f"[ERROR] User '{target_user}' is not online")
|
|
return
|
|
|
|
if len(parts) == 3:
|
|
self.chat_server.send_dm(self.username, target_user, parts[2])
|
|
else:
|
|
self.switch_view(f"dm:{target_user}")
|
|
|
|
elif command in ['/create', '/new']:
|
|
if len(parts) < 2:
|
|
self.write_line("[ERROR] Usage: /create <channel_name> [private]")
|
|
return
|
|
|
|
channel_name = parts[1].lower()
|
|
is_private = len(parts) > 2 and parts[2].lower() == 'private'
|
|
|
|
if self.chat_server.create_channel(channel_name, self.username, is_private):
|
|
self.write_line(f"[SUCCESS] Created channel #{channel_name}")
|
|
self.switch_view(channel_name)
|
|
else:
|
|
self.write_line(f"[ERROR] Channel #{channel_name} already exists")
|
|
|
|
elif command in ['/join', '/j']:
|
|
if len(parts) < 2:
|
|
self.write_line("[ERROR] Usage: /join <channel_name>")
|
|
return
|
|
|
|
channel_name = parts[1].lower()
|
|
success, msg = self.chat_server.join_channel(channel_name, self.username)
|
|
|
|
if success:
|
|
self.write_line(f"[SUCCESS] {msg}")
|
|
self.switch_view(channel_name)
|
|
else:
|
|
self.write_line(f"[ERROR] {msg}")
|
|
|
|
elif command in ['/leave', '/l']:
|
|
if len(parts) < 2:
|
|
self.write_line("[ERROR] Usage: /leave <channel_name>")
|
|
return
|
|
|
|
channel_name = parts[1].lower()
|
|
success, msg = self.chat_server.leave_channel(channel_name, self.username)
|
|
|
|
if success:
|
|
self.write_line(f"[SUCCESS] {msg}")
|
|
self.switch_view('global')
|
|
else:
|
|
self.write_line(f"[ERROR] {msg}")
|
|
|
|
elif command in ['/invite', '/inv']:
|
|
if len(parts) < 3:
|
|
self.write_line("[ERROR] Usage: /invite <channel_name> <username>")
|
|
return
|
|
|
|
channel_name = parts[1].lower()
|
|
invitee = parts[2]
|
|
|
|
success, msg = self.chat_server.invite_to_channel(channel_name, self.username, invitee)
|
|
|
|
if success:
|
|
self.write_line(f"[SUCCESS] {msg}")
|
|
else:
|
|
self.write_line(f"[ERROR] {msg}")
|
|
|
|
elif command in ['/switch', '/s']:
|
|
if len(parts) < 2:
|
|
self.write_line("[ERROR] Usage: /switch <channel_name>")
|
|
return
|
|
|
|
channel_name = parts[1].lower()
|
|
|
|
if channel_name in self.chat_server.channels and self.chat_server.channels[channel_name].has_member(self.username):
|
|
self.switch_view(channel_name)
|
|
else:
|
|
self.write_line(f"[ERROR] You are not a member of #{channel_name}")
|
|
|
|
elif command in ['/channels', '/ch']:
|
|
channels = self.chat_server.get_user_channels(self.username)
|
|
self.write_line(f"Your channels: {', '.join(['#' + c for c in channels])}")
|
|
|
|
elif command in ['/users', '/u']:
|
|
users = list(self.chat_server.users.keys())
|
|
self.write_line(f"Online users ({len(users)}): {', '.join(sorted(users))}")
|
|
|
|
elif command in ['/quit', '/q', '/exit']:
|
|
self.write_line("Goodbye!")
|
|
self.running = False
|
|
self.process.exit(0)
|
|
|
|
else:
|
|
self.write_line(f"[ERROR] Unknown command: {command}. Type /help for available commands")
|
|
|
|
def switch_view(self, view: str):
|
|
"""Switch to a different channel or DM."""
|
|
self.current_view = view
|
|
self.write_line("")
|
|
self.write_line("=" * 60)
|
|
if view.startswith('dm:'):
|
|
self.write_line(f"Switched to DM with {view[3:]}")
|
|
else:
|
|
self.write_line(f"Switched to channel #{view}")
|
|
self.write_line("=" * 60)
|
|
self.write_line("")
|
|
|
|
def show_help(self):
|
|
"""Show help message."""
|
|
self.write_line("")
|
|
self.write_line("Available commands:")
|
|
self.write_line(" /dm <user> [msg] or /d - DM a user")
|
|
self.write_line(" /create <name> [private] /new - Create a channel")
|
|
self.write_line(" /join <name> or /j - Join a channel")
|
|
self.write_line(" /leave <name> or /l - Leave a channel")
|
|
self.write_line(" /invite <ch> <user> or /inv - Invite user to channel")
|
|
self.write_line(" /switch <name> or /s - Switch to a channel")
|
|
self.write_line(" /channels or /ch - List your channels")
|
|
self.write_line(" /users or /u - List online users")
|
|
self.write_line(" /quit or /q - Exit")
|
|
self.write_line(" /help or /h - Show this help")
|
|
self.write_line("")
|