# GhostCast Server - May 2026

#!/usr/bin/env python3
"""
GhostCast Server - Advanced Disk Imaging Solution

Features: Multicast imaging, session management, client tracking, and recovery
"""

import socket
import struct
import threading
import time
import json
import hashlib
import os
import sys
from dataclasses import dataclass
from typing import Dict, List, Optional
from enum import Enum
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger; __name__ keeps records attributable to this module.
logger = logging.getLogger(__name__)

# Protocol constants
GHOST_PORT = 1236          # UDP port used as the default for server and sessions
MAGIC_COOKIE = b'GHOST'    # Prefix that every GhostCast packet starts with
VERSION = 0x0001

# Magic bytes that prefix every GhostCast packet; incoming packets whose first
# five bytes differ are dropped by the packet handler.
# NOTE(review): the same value is also assigned among the protocol constants
# earlier in this file - this looks like a duplicate; confirm and keep one.
MAGIC_COOKIE = b'GHOST'

class SessionStatus(Enum):
    """Lifecycle states of an imaging session."""

    WAITING = "waiting"        # created, not yet transmitting
    ACTIVE = "active"          # image data is being multicast
    COMPLETED = "completed"    # all chunks and the completion packet were sent
    FAILED = "failed"          # cancelled (no clients) or aborted by an error

# Script entry point for the server.
# NOTE(review): main() is defined further down in this file; this guard can
# only succeed once that definition has executed - confirm the intended order.
if __name__ == "__main__":
    main()


# Client Implementation (Basic)

#!/usr/bin/env python3
"""
GhostCast Client - Receives multicast disk images
"""

import socket
import struct
import hashlib
import os
import threading
import time
import logging

class GhostCastServer:
    """Multicast disk-imaging server.

    Listens on a UDP multicast group for client control packets (join,
    heartbeat, recovery request), keeps per-session client bookkeeping and
    streams image files to a multicast group in fixed-size chunks.

    NOTE(review): the ``class`` line was missing from the captured source; the
    name is taken from ``main()``, which instantiates ``GhostCastServer()``.
    """

    # Largest payload one IPv4 UDP datagram can carry (65535 - 20 IP - 8 UDP).
    _MAX_UDP_PAYLOAD = 65507
    # Framing added around each chunk by multicast_image(): magic cookie,
    # 1-byte packet type, two 4-byte big-endian integers and a 16-byte MD5.
    _DATA_PACKET_OVERHEAD = len(MAGIC_COOKIE) + 1 + 4 + 4 + 16
    # Largest chunk that still fits into a single datagram with its framing.
    MAX_CHUNK_SIZE = _MAX_UDP_PAYLOAD - _DATA_PACKET_OVERHEAD

    def __init__(self, interface='0.0.0.0', port=GHOST_PORT):
        """Store the listen address and create empty session tables.

        Args:
            interface: Address reported in the startup log line.
            port: UDP port used for listening and for new sessions.
        """
        self.interface = interface
        self.port = port
        self.sessions: Dict[str, ImagingSession] = {}
        self.active_sessions: Dict[str, threading.Thread] = {}
        self.lock = threading.Lock()  # guards self.sessions and client records
        self.running = False

    def start(self):
        """Start the GhostCast server.

        Runs the interactive console on a daemon thread and then blocks in
        the multicast listener until ``self.running`` becomes false.
        """
        self.running = True
        logger.info(f"GhostCast Server starting on {self.interface}:{self.port}")

        # Start command interface
        cmd_thread = threading.Thread(target=self.command_interface, daemon=True)
        cmd_thread.start()

        # Start multicast listener
        self.multicast_listener()

    def multicast_listener(self):
        """Listen for multicast announcements and client connections.

        Blocks until ``self.running`` is cleared. The socket uses a short
        receive timeout so the flag is re-checked even when no traffic
        arrives, and it is always closed on exit.
        """
        multicast_group = '224.0.0.1'
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

            # Bind to multicast group
            sock.bind(('', self.port))

            # Join multicast group. "=" selects standard sizes without
            # alignment padding, giving the 8-byte ip_mreq layout.
            mreq = struct.pack("=4sl", socket.inet_aton(multicast_group), socket.INADDR_ANY)
            sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

            # Wake up periodically so a cleared self.running is noticed.
            sock.settimeout(1.0)

            logger.info(f"Listening on multicast group {multicast_group}:{self.port}")

            while self.running:
                try:
                    data, addr = sock.recvfrom(65535)
                    self.handle_packet(data, addr, sock)
                except socket.timeout:
                    continue
                except Exception as e:
                    if self.running:
                        logger.error(f"Error receiving packet: {e}")
        finally:
            sock.close()

    def handle_packet(self, data: bytes, addr: tuple, sock: socket.socket):
        """Process incoming packets.

        Packets that are too short, do not start with the magic cookie, or
        carry an unknown type byte are ignored. Only CLIENT_JOIN, HEARTBEAT
        and RECOVERY_REQUEST are dispatched; other known types are dropped.

        Args:
            data: Raw datagram.
            addr: Sender address as returned by ``recvfrom``.
            sock: Socket used to send any reply.
        """
        if len(data) < 8:
            return

        magic = data[:5]
        if magic != MAGIC_COOKIE:
            return

        try:
            packet_type = PacketType(data[5])
        except ValueError:
            # Unknown type byte: malformed or foreign traffic, not a failure.
            logger.warning(f"Ignoring packet with unknown type {data[5]} from {addr[0]}")
            return

        payload = data[6:]
        if packet_type == PacketType.CLIENT_JOIN:
            self.handle_client_join(payload, addr, sock)
        elif packet_type == PacketType.HEARTBEAT:
            self.handle_heartbeat(payload, addr)
        elif packet_type == PacketType.RECOVERY_REQUEST:
            self.handle_recovery_request(payload, addr, sock)

    @staticmethod
    def _parse_ids(payload: bytes):
        """Decode the two length-prefixed UTF-8 ids at the start of a payload.

        Layout: 1-byte session-id length, session id, 1-byte client-id length,
        client id.

        Returns:
            ``(session_id, client_id, offset)`` where ``offset`` is the index
            of the first byte following the client id.

        Raises:
            IndexError: If a length byte is missing.
            UnicodeDecodeError: If an id is not valid UTF-8.
        """
        session_end = 1 + payload[0]
        session_id = payload[1:session_end].decode('utf-8')

        client_start = session_end + 1
        client_end = client_start + payload[session_end]
        client_id = payload[client_start:client_end].decode('utf-8')
        return session_id, client_id, client_end

    def handle_client_join(self, payload: bytes, addr: tuple, sock: socket.socket):
        """Handle client join requests.

        Registers the client in the named session and replies with a join
        acknowledgment. Requests for unknown sessions are logged and ignored;
        any parsing or send error is logged rather than raised.
        """
        try:
            # Parse join request
            session_id, client_id, _ = self._parse_ids(payload)

            with self.lock:
                if session_id not in self.sessions:
                    logger.warning(f"Client {client_id} tried to join unknown session {session_id}")
                    return

                session = self.sessions[session_id]

                # Add client to session
                client = Client(
                    client_id=client_id,
                    address=addr,
                    last_heartbeat=time.time(),
                    received_chunks=0,
                    status="joined",
                    ip_address=addr[0]
                )
                session.clients[client_id] = client

            # Log and reply outside the lock so network I/O never blocks
            # other handlers waiting on it.
            logger.info(f"Client {client_id} ({addr[0]}) joined session {session_id}")

            # Send acknowledgment
            self.send_join_ack(sock, addr, session_id, client_id)

        except Exception as e:
            logger.error(f"Error handling client join: {e}")

    def send_join_ack(self, sock: socket.socket, addr: tuple, session_id: str, client_id: str):
        """Send join acknowledgment to client.

        Packet layout: magic cookie, CLIENT_READY type byte, then for each of
        session id and client id a 4-byte big-endian length followed by the
        encoded id.
        """
        # Lengths must count encoded bytes, not characters, or ids containing
        # non-ASCII characters would be framed incorrectly.
        session_bytes = session_id.encode()
        client_bytes = client_id.encode()

        packet = b"".join([
            MAGIC_COOKIE,
            bytes([PacketType.CLIENT_READY.value]),
            struct.pack('!I', len(session_bytes)),
            session_bytes,
            struct.pack('!I', len(client_bytes)),
            client_bytes,
        ])
        sock.sendto(packet, addr)

    def handle_heartbeat(self, payload: bytes, addr: tuple):
        """Update client heartbeat timestamp.

        The payload is the UTF-8 client id. The first session containing that
        client gets its record refreshed and marked "active".
        """
        try:
            client_id = payload.decode('utf-8')

            with self.lock:
                for session in self.sessions.values():
                    if client_id in session.clients:
                        session.clients[client_id].last_heartbeat = time.time()
                        session.clients[client_id].status = "active"
                        break
        except Exception as e:
            logger.error(f"Error handling heartbeat: {e}")

    def handle_recovery_request(self, payload: bytes, addr: tuple, sock: socket.socket):
        """Handle recovery requests from failed clients.

        The payload carries the session id and client id (length-prefixed)
        followed by a 4-byte big-endian index of the last chunk the client
        received. Unknown sessions or clients are ignored.
        """
        try:
            session_id, client_id, offset = self._parse_ids(payload)
            last_chunk = struct.unpack_from('!I', payload, offset)[0]

            with self.lock:
                if session_id not in self.sessions:
                    return

                session = self.sessions[session_id]
                if client_id in session.clients:
                    # Resume from last successful chunk
                    session.clients[client_id].received_chunks = last_chunk
                    session.clients[client_id].status = "recovering"

                    logger.info(f"Recovery for {client_id} from chunk {last_chunk}")

                    # Send missing chunks
                    self.send_missing_chunks(sock, addr, session, client_id, last_chunk)

        except Exception as e:
            logger.error(f"Error handling recovery: {e}")

    def send_missing_chunks(self, sock: socket.socket, addr: tuple, session: "ImagingSession", client_id: str, start_chunk: int):
        """Send missing data chunks to recovering client.

        Not implemented yet: currently a no-op placeholder.
        """
        # This would send only the chunks the client missed
        pass

    def create_session(self, image_path: str, session_name: Optional[str] = None, chunk_size: int = 65536, multicast_group: str = '224.0.0.1') -> str:
        """Create a new imaging session and start its worker thread.

        Args:
            image_path: Path of the image file to transmit.
            session_name: Session id to use; a short hash is generated when
                omitted or empty.
            chunk_size: Requested bytes per data chunk. Values larger than
                ``MAX_CHUNK_SIZE`` are reduced so that every data packet
                fits into a single UDP datagram.
            multicast_group: Group address the image is sent to.

        Returns:
            The id of the new session.

        Raises:
            FileNotFoundError: If ``image_path`` does not exist.
            ValueError: If ``chunk_size`` is not positive or the session id
                is already in use.
        """
        if not os.path.exists(image_path):
            raise FileNotFoundError(f"Image file not found: {image_path}")

        if chunk_size <= 0:
            raise ValueError(f"chunk_size must be positive, got {chunk_size}")

        if chunk_size > self.MAX_CHUNK_SIZE:
            # A chunk plus framing larger than one datagram makes sendto()
            # fail, so cap the size instead of failing later.
            logger.info(f"Chunk size {chunk_size} exceeds UDP limit, using {self.MAX_CHUNK_SIZE}")
            chunk_size = self.MAX_CHUNK_SIZE

        image_size = os.path.getsize(image_path)
        total_chunks = (image_size + chunk_size - 1) // chunk_size  # ceiling division

        session_id = session_name or hashlib.md5(f"{time.time()}{image_path}".encode()).hexdigest()[:8]

        session = ImagingSession(
            session_id=session_id,
            image_name=image_path,
            image_size=image_size,
            chunk_size=chunk_size,
            status=SessionStatus.WAITING,
            clients={},
            start_time=time.time(),
            multicast_group=multicast_group,
            port=self.port,
            total_chunks=total_chunks
        )

        with self.lock:
            # Replacing a live session would redirect its running worker
            # thread to the new image, so refuse duplicate ids.
            if session_id in self.sessions:
                raise ValueError(f"Session {session_id} already exists")
            self.sessions[session_id] = session

        logger.info(f"Created session {session_id} for image {image_path} ({total_chunks} chunks)")

        # Start session thread
        session_thread = threading.Thread(target=self.run_session, args=(session_id,))
        session_thread.start()
        with self.lock:
            self.active_sessions[session_id] = session_thread

        return session_id

    def run_session(self, session_id: str):
        """Execute an imaging session.

        Waits a fixed ten seconds for clients to join, marks the session
        FAILED if none did, otherwise marks it ACTIVE and transmits the image.
        """
        session = self.sessions[session_id]

        # Wait for clients to join
        logger.info(f"Session {session_id}: Waiting for clients...")
        time.sleep(10)  # Give clients time to join

        if not session.clients:
            logger.warning(f"Session {session_id}: No clients joined, cancelling")
            session.status = SessionStatus.FAILED
            return

        logger.info(f"Session {session_id}: Starting multicast with {len(session.clients)} clients")
        session.status = SessionStatus.ACTIVE

        # Start multicast transmission
        self.multicast_image(session_id)

    def multicast_image(self, session_id: str):
        """Multicast image data to all clients.

        Each data packet is: magic cookie, DATA_CHUNK type byte, 4-byte
        big-endian chunk number, 4-byte big-endian chunk length, chunk bytes,
        16-byte MD5 digest of the chunk. A COMPLETE packet follows the last
        chunk. Any error marks the session FAILED; the socket is always
        closed.
        """
        session = self.sessions[session_id]
        destination = (session.multicast_group, session.port)
        data_header = MAGIC_COOKIE + bytes([PacketType.DATA_CHUNK.value])

        # Setup multicast socket
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
        try:
            sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)

            with open(session.image_name, 'rb') as f:
                chunk_number = 0

                while True:
                    chunk_data = f.read(session.chunk_size)
                    if not chunk_data:
                        break

                    # Create data packet
                    packet = b"".join([
                        data_header,
                        struct.pack('!I', chunk_number),
                        struct.pack('!I', len(chunk_data)),
                        chunk_data,
                        hashlib.md5(chunk_data).digest(),  # Checksum for integrity
                    ])

                    # Send to multicast group
                    sock.sendto(packet, destination)

                    chunk_number += 1
                    # Track the number of chunks sent so progress displays
                    # reach total_chunks when the transfer finishes.
                    session.current_chunk = chunk_number

                    # Small delay to prevent network flooding
                    time.sleep(0.001)

                    # Progress report
                    if chunk_number % 100 == 0:
                        progress = (chunk_number / session.total_chunks) * 100
                        logger.info(f"Session {session_id}: Progress {progress:.1f}% ({chunk_number}/{session.total_chunks})")

            # Send completion signal
            completion_packet = MAGIC_COOKIE + bytes([PacketType.COMPLETE.value])
            sock.sendto(completion_packet, destination)

            session.status = SessionStatus.COMPLETED
            logger.info(f"Session {session_id}: Completed successfully")

        except Exception as e:
            logger.error(f"Session {session_id}: Failed - {e}")
            session.status = SessionStatus.FAILED
        finally:
            sock.close()

    def command_interface(self):
        """Interactive command interface for server management.

        Reads commands from standard input until ``quit`` is entered, input
        is interrupted, or standard input reaches end-of-file. Command errors
        are printed and the loop continues.
        """
        print("\n" + "="*60)
        print("GhostCast Server v1.0 - Interactive Console")
        print("="*60)
        print("Commands:")
        print(" list - List all sessions")
        print(" create <image> - Create new imaging session")
        print(" status <session> - Show session status")
        print(" clients <session> - List clients in session")
        print(" quit - Exit server")
        print("="*60)

        while self.running:
            try:
                cmd = input("\n> ").strip().split()
                if not cmd:
                    continue

                if cmd[0] == 'quit':
                    self.running = False
                    break
                elif cmd[0] == 'list':
                    self.list_sessions()
                elif cmd[0] == 'create' and len(cmd) > 1:
                    image_path = cmd[1]
                    session_name = cmd[2] if len(cmd) > 2 else None
                    session_id = self.create_session(image_path, session_name)
                    print(f"Created session: {session_id}")
                elif cmd[0] == 'status' and len(cmd) > 1:
                    self.show_status(cmd[1])
                elif cmd[0] == 'clients' and len(cmd) > 1:
                    self.list_clients(cmd[1])
                else:
                    print("Unknown command")

            except (KeyboardInterrupt, EOFError):
                # A closed stdin raises EOFError on every input() call;
                # treating it like an interrupt avoids an endless error loop.
                self.running = False
                break
            except Exception as e:
                print(f"Error: {e}")

    def list_sessions(self):
        """List all sessions"""
        with self.lock:
            if not self.sessions:
                print("No active sessions")
                return

            print("\nActive Sessions:")
            print("-" * 60)
            for session_id, session in self.sessions.items():
                print(f"ID: {session_id}")
                print(f" Image: {session.image_name}")
                print(f" Status: {session.status.value}")
                print(f" Clients: {len(session.clients)}")
                print(f" Progress: {session.current_chunk}/{session.total_chunks} chunks")
                print()

    def show_status(self, session_id: str):
        """Show detailed session status"""
        with self.lock:
            if session_id not in self.sessions:
                print(f"Session {session_id} not found")
                return

            session = self.sessions[session_id]
            print(f"\nSession {session_id} Details:")
            print("-" * 40)
            print(f"Image: {session.image_name}")
            print(f"Size: {session.image_size / (1024**2):.2f} MB")
            print(f"Status: {session.status.value}")
            print(f"Chunk Size: {session.chunk_size} bytes")
            print(f"Total Chunks: {session.total_chunks}")
            print(f"Current Chunk: {session.current_chunk}")
            print(f"Multicast Group: {session.multicast_group}:{session.port}")
            print(f"Active Clients: {len(session.clients)}")

            # current_chunk > 0 implies total_chunks > 0, so no zero division.
            if session.current_chunk > 0:
                progress = (session.current_chunk / session.total_chunks) * 100
                print(f"Progress: {progress:.1f}%")

    def list_clients(self, session_id: str):
        """List all clients in a session"""
        with self.lock:
            if session_id not in self.sessions:
                print(f"Session {session_id} not found")
                return

            session = self.sessions[session_id]
            if not session.clients:
                print("No clients in this session")
                return

            print(f"\nClients in Session {session_id}:")
            print("-" * 60)
            for client_id, client in session.clients.items():
                print(f"ID: {client_id}")
                print(f" IP: {client.ip_address}")
                print(f" Status: {client.status}")
                print(f" Received: {client.received_chunks} chunks")
                print(f" Last Heartbeat: {time.time() - client.last_heartbeat:.1f}s ago")
                print()

    def stop(self):
        """Stop the server"""
        self.running = False
        logger.info("GhostCast server shutting down")


def main():
    """Main entry point.

    Runs a server with default settings; Ctrl-C stops it cleanly, any other
    error is logged and the process exits with status 1.
    """
    server = GhostCastServer()

    try:
        server.start()
    except KeyboardInterrupt:
        print("\nShutting down...")
        server.stop()
    except Exception as e:
        logger.error(f"Server error: {e}")
        sys.exit(1)

class GhostCastClient:
    """Client-side state for a GhostCast multicast receiver.

    NOTE(review): only the constructor is present in this file; confirm
    whether further methods were lost.
    """

    def __init__(self, multicast_group='224.0.0.1', port=1236):
        """Record connection settings and initialise idle state.

        Args:
            multicast_group: Multicast group address to use.
            port: UDP port to use.
        """
        self.multicast_group = multicast_group
        self.port = port
        self.client_id = socket.gethostname()  # the host name doubles as the client id
        self.session_id = None                 # no session selected yet
        self.running = False

@dataclass
class ImagingSession:
    """Imaging session configuration"""

    session_id: str
    image_name: str                # path of the image file that is transmitted
    image_size: int                # size of the image file in bytes
    chunk_size: int                # bytes read from the image per data packet
    # Quoted so the class does not depend on where SessionStatus and Client
    # are defined relative to it.
    status: "SessionStatus"
    clients: Dict[str, "Client"]   # joined clients keyed by client id
    start_time: float              # time.time() value taken at creation
    multicast_group: str
    port: int
    total_chunks: int
    current_chunk: int = 0         # transmission progress, updated while sending

class PacketType(Enum):
    """GhostCast packet types.

    The value is the single type byte that follows the magic cookie in every
    packet.
    """

    SESSION_ANNOUNCE = 0x01
    CLIENT_JOIN = 0x02        # client asks to join a session
    CLIENT_READY = 0x03       # sent by the server to acknowledge a join
    DATA_CHUNK = 0x04         # one chunk of image data
    HEARTBEAT = 0x05          # client liveness signal
    COMPLETE = 0x06           # sent after the final data chunk
    ERROR = 0x07
    RECOVERY_REQUEST = 0x08   # client asks to resume after a given chunk

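# NOTE: the text below is cookie-banner residue from the hosting web page, not part of the program.
# We use cookies to optimize site functionality and give you the best possible experience.
# Ok
# x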