userOnlineService = app(UserOnlineService::class); }

    /**
     * Handle the WebSocket connection-open event.
     *
     * When both `user_id` and `token` query parameters are present, the
     * connection is registered in `$server->wsTable` under `uid:<userId>` and
     * `fd:<fd>`, the user is marked online and a `welcome` frame is pushed.
     * Otherwise a 401 `error` frame is pushed; the connection is left open.
     *
     * NOTE(review): the token is only checked for presence here; it is never
     * verified against the user. Confirm verification happens elsewhere.
     *
     * @param Server $server
     * @param Request $request
     * @return void
     */
    public function onOpen(Server $server, Request $request): void
    {
        try {
            $fd = $request->fd;
            $path = $request->server['path_info'] ?? $request->server['request_uri'] ?? '/';

            Log::info('WebSocket connection opened', [
                'fd' => $fd,
                'path' => $path,
                'ip' => $request->server['remote_addr'] ?? 'unknown',
            ]);

            // Extract user ID from query parameters if provided
            $userId = $request->get['user_id'] ?? null;
            $token = $request->get['token'] ?? null;

            // Query parameters may arrive as arrays (`user_id[]=1`); those must
            // never be concatenated into a table key, so treat them as missing.
            if (!is_scalar($userId) || !is_scalar($token) || !$userId || !$token) {
                Log::warning('WebSocket connection without authentication', [
                    'fd' => $fd,
                ]);

                $this->pushError(
                    $server,
                    $fd,
                    'Authentication required. Please provide user_id and token.',
                    401
                );

                return;
            }

            // Store the user <-> connection mapping in both directions.
            $expiresAt = time() + 3600; // 1 hour expiry
            $server->wsTable->set('uid:' . $userId, [
                'value' => $fd,
                'expiry' => $expiresAt,
            ]);
            $server->wsTable->set('fd:' . $fd, [
                'value' => $userId,
                'expiry' => $expiresAt,
            ]);

            // Update user online status
            $this->userOnlineService->updateUserOnlineStatus($userId, $fd, true);

            Log::info('User connected to WebSocket', [
                'user_id' => $userId,
                'fd' => $fd,
            ]);

            // Send welcome message to client
            $this->pushJson($server, $fd, 'welcome', [
                'message' => 'WebSocket connection established',
                'user_id' => $userId,
                'timestamp' => time(),
            ]);
        } catch (\Throwable $e) {
            // \Throwable rather than \Exception: a TypeError raised inside the
            // handler must be logged too, not escape the event callback.
            Log::error('WebSocket onOpen error', [
                'error' => $e->getMessage(),
                'trace' => $e->getTraceAsString(),
            ]);
        }
    }

    /**
     * Handle an incoming WebSocket frame.
     *
     * The payload must be a non-empty JSON object/array; anything else is
     * answered with a 400 `error` frame. Valid messages go to handleMessage().
     *
     * @param Server $server
     * @param Frame $frame
     * @return void
     */
    public function onMessage(Server $server, Frame $frame): void
    {
        try {
            $fd = $frame->fd;
            $data = $frame->data;

            Log::info('WebSocket message received', [
                'fd' => $fd,
                'data' => $data,
                'opcode' => $frame->opcode,
            ]);

            // Parse incoming message
            $message = json_decode($data, true);

            // Valid JSON that is not an object/array (e.g. `"hi"`, `5`, `true`)
            // must be rejected here: handleMessage() requires an array and
            // would otherwise raise a TypeError.
            if (!is_array($message) || !$message) {
                $this->pushError($server, $fd, 'Invalid JSON format', 400);

                return;
            }

            // Handle different message types
            $this->handleMessage($server, $fd, $message);
        } catch (\Throwable $e) {
            Log::error('WebSocket onMessage error', [
                'error' => $e->getMessage(),
                'trace' => $e->getTraceAsString(),
            ]);
        }
    }

    /**
     * Dispatch a decoded message to the handler for its `type`.
     *
     * `ping` and `heartbeat` are answered inline; `chat`, `broadcast`,
     * `subscribe` and `unsubscribe` go to their handlers; any other type is
     * answered with a 400 `error` frame.
     *
     * @param Server $server
     * @param int $fd
     * @param array $message Decoded payload; `type` and `data` keys are optional.
     * @return void
     */
    protected function handleMessage(Server $server, int $fd, array $message): void
    {
        // Normalise `type` to a string so the switch compares string to string
        // (a boolean `true` would otherwise loosely match the first case).
        $rawType = $message['type'] ?? 'unknown';
        $type = is_scalar($rawType) ? (string) $rawType : 'unknown';

        // The sub-handlers require an array; a scalar `data` counts as empty.
        $rawData = $message['data'] ?? [];
        $data = is_array($rawData) ? $rawData : [];

        switch ($type) {
            case 'ping':
                // Respond to ping with pong
                $this->pushJson($server, $fd, 'pong', ['timestamp' => time()]);
                break;

            case 'heartbeat':
                // Handle heartbeat
                $this->pushJson($server, $fd, 'heartbeat_ack', ['timestamp' => time()]);
                break;

            case 'chat':
                // Handle chat message
                $this->handleChatMessage($server, $fd, $data);
                break;

            case 'broadcast':
                // Handle broadcast message (admin only)
                $this->handleBroadcast($server, $fd, $data);
                break;

            case 'subscribe':
                // Handle channel subscription
                $this->handleSubscribe($server, $fd, $data);
                break;

            case 'unsubscribe':
                // Handle channel unsubscription
                $this->handleUnsubscribe($server, $fd, $data);
                break;

            default:
                $this->pushError($server, $fd, 'Unknown message type: ' . $type, 400);
                break;
        }
    }

    /**
     * Relay a direct chat message to another user.
     *
     * Looks up the recipient's fd under `uid:<to_user_id>`, pushes a `chat`
     * frame to it and confirms with `message_delivered`. The sender gets a 400
     * error when `to_user_id` or `content` is missing, and a 404 error when the
     * recipient has no live connection or the push fails.
     *
     * NOTE(review): the sender is not required to be registered, so
     * `from_user_id` may be null. Confirm whether that is intended.
     *
     * @param Server $server
     * @param int $fd Sender's connection.
     * @param array $data Expects `to_user_id` and `content`.
     * @return void
     */
    protected function handleChatMessage(Server $server, int $fd, array $data): void
    {
        $toUserId = $data['to_user_id'] ?? null;
        $content = $data['content'] ?? '';

        // A non-scalar to_user_id cannot be used to build a table key.
        if (!is_scalar($toUserId) || !$toUserId || !$content) {
            $this->pushError($server, $fd, 'Missing required fields: to_user_id and content', 400);

            return;
        }

        // Get target user's connection
        $target = $server->wsTable->get('uid:' . $toUserId);
        $targetFd = ($target && $target['value']) ? (int) $target['value'] : null;

        // Only report delivery when the target connection is still established
        // and the push succeeded; a stale mapping counts as "not online".
        $delivered = $targetFd !== null
            && $server->isEstablished($targetFd)
            && $this->pushJson($server, $targetFd, 'chat', [
                'from_user_id' => $this->userIdForFd($server, $fd),
                'content' => $content,
                'timestamp' => time(),
            ]);

        if (!$delivered) {
            $this->pushError($server, $fd, 'Target user is not online', 404);

            return;
        }

        // Send delivery receipt to sender
        $this->pushJson($server, $fd, 'message_delivered', [
            'to_user_id' => $toUserId,
            'content' => $content,
            'timestamp' => time(),
        ]);
    }

    /**
     * Broadcast a message to every established connection except the sender.
     *
     * The sender must be registered (an `fd:<fd>` entry must exist), else a
     * 401 error is returned. An empty `message` yields a 400 error. On success
     * the sender receives a `broadcast_sent` confirmation.
     *
     * @param Server $server
     * @param int $fd Sender's connection.
     * @param array $data Expects `message`.
     * @return void
     */
    protected function handleBroadcast(Server $server, int $fd, array $data): void
    {
        $message = $data['message'] ?? '';
        $userId = $this->userIdForFd($server, $fd);

        // TODO: Check if user has admin permission to broadcast
        // For now, allow any authenticated user. Connections that never
        // registered a user id are rejected.
        if ($userId === null) {
            $this->pushError($server, $fd, 'Authentication required', 401);

            return;
        }

        if (!$message) {
            $this->pushError($server, $fd, 'Message content is required', 400);

            return;
        }

        // Encode once and reuse the payload for every recipient.
        $broadcastData = json_encode([
            'type' => 'broadcast',
            'data' => [
                'from_user_id' => $userId,
                'message' => $message,
                'timestamp' => time(),
            ],
        ]);

        // Broadcast to all connected clients except sender
        foreach ($server->connections as $connectionFd) {
            if ($connectionFd !== $fd && $server->isEstablished($connectionFd)) {
                $server->push($connectionFd, $broadcastData);
            }
        }

        // Send confirmation to sender
        $this->pushJson($server, $fd, 'broadcast_sent', [
            'message' => $message,
            'timestamp' => time(),
        ]);
    }

    /**
     * Subscribe a connection to a channel.
     *
     * The subscription is stored in `$server->wsTable` under
     * `channel:<channel>:fd:<fd>` with a two-hour expiry and acknowledged with
     * a `subscribed` frame. A missing channel name yields a 400 error.
     *
     * NOTE(review): if wsTable is a Swoole\Table its key length is bounded;
     * long channel names may need a length check. Confirm.
     *
     * @param Server $server
     * @param int $fd
     * @param array $data Expects `channel`.
     * @return void
     */
    protected function handleSubscribe(Server $server, int $fd, array $data): void
    {
        $channel = $data['channel'] ?? '';

        // A non-scalar channel cannot be used to build a table key.
        if (!is_scalar($channel) || !$channel) {
            $this->pushError($server, $fd, 'Channel name is required', 400);

            return;
        }

        // Store subscription in wsTable
        $server->wsTable->set('channel:' . $channel . ':fd:' . $fd, [
            'value' => 1,
            'expiry' => time() + 7200, // 2 hours
        ]);

        $this->pushJson($server, $fd, 'subscribed', [
            'channel' => $channel,
            'timestamp' => time(),
        ]);

        Log::info('User subscribed to channel', [
            'fd' => $fd,
            'channel' => $channel,
        ]);
    }

    /**
     * Unsubscribe a connection from a channel.
     *
     * Deletes the `channel:<channel>:fd:<fd>` entry and acknowledges with an
     * `unsubscribed` frame. A missing channel name yields a 400 error.
     *
     * @param Server $server
     * @param int $fd
     * @param array $data Expects `channel`.
     * @return void
     */
    protected function handleUnsubscribe(Server $server, int $fd, array $data): void
    {
        $channel = $data['channel'] ?? '';

        // A non-scalar channel cannot be used to build a table key.
        if (!is_scalar($channel) || !$channel) {
            $this->pushError($server, $fd, 'Channel name is required', 400);

            return;
        }

        // Remove subscription from wsTable
        $server->wsTable->del('channel:' . $channel . ':fd:' . $fd);

        $this->pushJson($server, $fd, 'unsubscribed', [
            'channel' => $channel,
            'timestamp' => time(),
        ]);

        Log::info('User unsubscribed from channel', [
            'fd' => $fd,
            'channel' => $channel,
        ]);
    }

    /**
     * Handle the WebSocket connection-close event.
     *
     * Removes the `fd:<fd>` entry, removes `uid:<userId>` only if it still
     * points at the closing fd, and marks the user offline.
     *
     * @param Server $server
     * @param $fd
     * @param $reactorId
     * @return void
     */
    public function onClose(Server $server, $fd, $reactorId): void
    {
        try {
            Log::info('WebSocket connection closed', [
                'fd' => $fd,
                'reactor_id' => $reactorId,
            ]);

            // Get user ID from wsTable
            $userId = $this->userIdForFd($server, $fd);

            if ($userId) {
                // Drop the uid mapping only while it still points at this fd.
                // If the user already reconnected on a new fd, the mapping
                // belongs to that connection and deleting it would make the
                // user unreachable.
                $current = $server->wsTable->get('uid:' . $userId);
                if ($current && (int) $current['value'] === (int) $fd) {
                    $server->wsTable->del('uid:' . $userId);
                }
                $server->wsTable->del('fd:' . $fd);

                // Update user online status
                $this->userOnlineService->updateUserOnlineStatus($userId, $fd, false);

                Log::info('User disconnected from WebSocket', [
                    'user_id' => $userId,
                    'fd' => $fd,
                ]);
            }

            // Clean up channel subscriptions
            // Note: In production, you might want to iterate through all channel keys
            // and remove the ones associated with this fd
        } catch (\Throwable $e) {
            Log::error('WebSocket onClose error', [
                'error' => $e->getMessage(),
                'trace' => $e->getTraceAsString(),
            ]);
        }
    }

    /**
     * Encode a `{type, data}` envelope as JSON and push it to one connection.
     *
     * @param Server $server
     * @param int $fd
     * @param string $type Value of the envelope's `type` field.
     * @param array $data Value of the envelope's `data` field.
     * @return bool The result of `$server->push()`, cast to bool.
     */
    private function pushJson(Server $server, int $fd, string $type, array $data): bool
    {
        return (bool) $server->push($fd, json_encode([
            'type' => $type,
            'data' => $data,
        ]));
    }

    /**
     * Push an `error` envelope carrying a message and a numeric code.
     *
     * @param Server $server
     * @param int $fd
     * @param string $message
     * @param int $code
     * @return bool The result of the underlying push.
     */
    private function pushError(Server $server, int $fd, string $message, int $code): bool
    {
        return $this->pushJson($server, $fd, 'error', [
            'message' => $message,
            'code' => $code,
        ]);
    }

    /**
     * Look up the user id stored under `fd:<fd>` in `$server->wsTable`.
     *
     * @param Server $server
     * @param mixed $fd
     * @return mixed The stored `value`, or null when there is no entry.
     */
    private function userIdForFd(Server $server, $fd)
    {
        return $server->wsTable->get('fd:' . $fd)['value'] ?? null;
    }
}