Files
laravel_swoole/docs/README_WEBSOCKET_NOTIFICATION.md

64 KiB
Raw Permalink Blame History

WebSocket 与通知系统完整文档

目录


概述

本项目基于 Laravel-S 和 Swoole 实现了完整的 WebSocket 与通知系统,支持实时双向通信、消息推送、通知管理等功能。系统采用 WebSocket 实现实时推送,同时将通知持久化到数据库,确保离线用户也能收到通知。

核心特性

  • 实时双向通信WebSocket
  • 用户连接管理(单例模式,避免重复连接)
  • 点对点消息发送
  • 群发消息/广播
  • 频道订阅/取消订阅
  • 心跳机制与自动重连
  • 在线状态管理
  • 系统通知推送
  • 数据更新推送
  • 通知持久化存储
  • 已读/未读状态管理
  • 批量操作支持
  • 连接时自动授权(无需单独发送 auth 事件)

功能特性

WebSocket 功能

功能 说明 状态
自动连接管理 登录后自动建立连接,退出时自动关闭
单例连接模式 全局只有一个 WebSocket 实例,避免重复连接
连接时授权 通过 URL 参数传递 token握手时即完成认证
断线重连 连接断开自动重连最多5次
心跳机制 客户端每30秒发送心跳保持连接
用户认证 JWT token 验证,验证用户 ID 匹配和过期时间
点对点消息 发送消息给指定用户
广播消息 向所有在线用户发送消息
频道订阅 支持频道订阅和取消订阅
在线状态 实时获取用户在线状态

通知功能

功能 说明 状态
通知发送 支持单个/批量/广播发送
通知类型 info/success/warning/error/task/system
通知分类 system/task/message/reminder/announcement
实时推送 在线用户通过 WebSocket 实时推送
持久化存储 所有通知保存到数据库
已读管理 支持标记已读(单个/批量/全部)
通知删除 支持删除(单个/批量/清空)
重试机制 发送失败自动重试最多3次
统计分析 提供通知统计数据

技术架构

后端架构

┌─────────────────────────────────────────┐
│         Laravel + Laravel-S          │
├─────────────────────────────────────────┤
│                                    │
│  ┌──────────────────────────────┐   │
│  │   WebSocketHandler        │   │
│  │   (WebSocket 事件处理)        │   │
│  │   - onOpen: 连接时授权         │   │
│  │   - onMessage: 消息路由       │   │
│  │   - onClose: 清理连接         │   │
│  └──────────┬───────────────┘   │
│             │                     │
│             ▼                     │
│  ┌──────────────────────────────┐   │
│  │   WebSocketService        │   │
│  │   (WebSocket 管理)          │   │
│  └──────────┬───────────────┘   │
│             │                     │
│             ▼                     │
│  ┌──────────────────────────────┐   │
│  │   NotificationService     │   │
│  │   (通知服务)               │   │
│  └──────────┬───────────────┘   │
│             │                     │
│             ▼                     │
│  ┌──────────────────────────────┐   │
│  │     Notification Model     │   │
│  │   (通知模型)               │   │
│  └──────────┬───────────────┘   │
│             │                     │
│             ▼                     │
│  ┌──────────────────────────────┐   │
│  │      Swoole Server       │   │
│  │   (WebSocket 服务器)         │   │
│  └──────────┬───────────────┘   │
│             │                     │
│             ▼                     │
│  ┌──────────────────────────────┐   │
│  │      MySQL Database       │   │
│  │   (数据持久化)             │   │
│  └──────────────────────────────┘   │
│                                    │
└─────────────────────────────────────────┘

前端架构

┌─────────────────────────────────────────┐
│          Vue 3 + Vite             │
├─────────────────────────────────────────┤
│                                    │
│  ┌──────────────────────────────┐   │
│  │      App.vue (统一连接点)  │   │
│  │   - 初始化 WebSocket         │   │
│  │   - 监听用户状态            │   │
│  └──────────┬───────────────┘   │
│             │                     │
│             ▼                     │
│  ┌──────────────────────────────┐   │
│  │   useWebSocket Hook     │   │
│  │   (单例管理 + 消息路由)     │   │
│  └──────────┬───────────────┘   │
│             │                     │
│             ▼                     │
│  ┌──────────────────────────────┐   │
│  │    WebSocket Client      │   │
│  │   (单例 + 重连 + 心跳)      │   │
│  └──────────┬───────────────┘   │
│             │                     │
│             ▼                     │
│  ┌──────────────────────────────┐   │
│  │   Notification Store      │   │
│  │   (Pinia 状态管理)          │   │
│  └──────────┬───────────────┘   │
│             │                     │
│             ▼                     │
│  ┌──────────────────────────────┐   │
│  │  Userbar/Notification Page │   │
│  │   (展示 + 交互)            │   │
│  └──────────────────────────────┘   │
│                                    │
└─────────────────────────────────────────┘

后端实现

1. 数据模型

system_notifications 表

字段 类型 说明
id bigint 主键ID
user_id bigint 接收通知的用户ID
title varchar(255) 通知标题
content text 通知内容
type varchar(50) 通知类型
category varchar(50) 通知分类
data json 附加数据
action_type varchar(50) 操作类型
action_data text 操作数据
is_read boolean 是否已读
read_at timestamp 阅读时间
sent_via_websocket boolean 是否已通过WebSocket发送
sent_at timestamp 发送时间
retry_count int 重试次数
created_at timestamp 创建时间
updated_at timestamp 更新时间
deleted_at timestamp 删除时间(软删除)

2. 核心服务类

WebSocketHandler (app/Services/WebSocket/WebSocketHandler.php)

WebSocket 处理器,实现 Swoole 的 WebSocketHandlerInterface 接口。

主要方法:

  • onOpen(): 处理连接建立事件,连接时通过 URL 参数授权
  • onMessage(): 处理消息接收事件
  • onClose(): 处理连接关闭事件

支持的消息类型:

  • ping/pong: 心跳检测
  • heartbeat: 心跳确认
  • chat: 私聊消息
  • broadcast: 广播消息
  • subscribe/unsubscribe: 频道订阅/取消订阅

连接时授权流程:

public function onOpen(Server $server, Request $request): void
{
	// 从 URL 查询参数获取认证信息
	$userId = (int)($request->get['user_id'] ?? 0);
	$token = $request->get['token'] ?? '';

	// 用户认证
	if (!$userId || !$token) {
		$server->push($request->fd, json_encode([
			'type' => 'error',
			'data' => [
				'message' => '认证失败:缺少 user_id 或 token',
				'code' => 401
			]
		]));
		$server->disconnect($request->fd);
		return;
	}

	// 验证 JWT token
	try {
		$payload = JWTAuth::setToken($token)->getPayload();

		// 验证 token 中的用户 ID 是否匹配
		$tokenUserId = $payload['sub'] ?? null;
		if ($tokenUserId != $userId) {
			Log::warning('WebSocket 认证失败:用户 ID 不匹配', [
				'fd' => $request->fd,
				'token_user_id' => $tokenUserId,
				'query_user_id' => $userId
			]);
			
			$server->push($request->fd, json_encode([
				'type' => 'error',
				'data' => [
					'message' => '认证失败:用户 ID 不匹配',
					'code' => 401
				]
			]));
			$server->disconnect($request->fd);
			return;
		}

		// 验证 token 是否过期
		if (isset($payload['exp']) && $payload['exp'] < time()) {
			Log::warning('WebSocket 认证失败token 已过期', [
				'fd' => $request->fd,
				'user_id' => $userId,
				'exp' => $payload['exp'],
				'current_time' => time()
			]);
			
			$server->push($request->fd, json_encode([
				'type' => 'error',
				'data' => [
					'message' => '认证失败token 已过期',
					'code' => 401
				]
			]));
			$server->disconnect($request->fd);
			return;
		}

		Log::info('WebSocket 认证成功', [
			'fd' => $request->fd,
			'user_id' => $userId
		]);
	} catch (\Exception $e) {
		Log::warning('WebSocket 认证失败:无效的 token', [
			'fd' => $request->fd,
			'user_id' => $userId,
			'error' => $e->getMessage()
		]);
		
		$server->push($request->fd, json_encode([
			'type' => 'error',
			'data' => [
				'message' => '认证失败:无效的 token',
				'code' => 401
			]
		]));
		$server->disconnect($request->fd);
		return;
	}

	// 认证成功,存储连接映射
	$wsTable->set('uid:' . $userId, [
		'value' => $request->fd,
		'expiry' => time() + 3600
	]);

	$wsTable->set('fd:' . $request->fd, [
		'value' => $userId,
		'expiry' => time() + 3600
	]);

	// 发送欢迎消息
	$server->push($request->fd, json_encode([
		'type' => 'connected',
		'data' => [
			'message' => '欢迎连接到 LaravelS WebSocket',
			'user_id' => $userId,
			'fd' => $request->fd,
			'timestamp' => time()
		]
	]));
}

WebSocketService (app/Services/WebSocket/WebSocketService.php)

WebSocket 服务类,提供便捷的 WebSocket 操作方法。

主要方法:

// 发送消息给指定用户
sendToUser(int $userId, array $data): bool

// 发送消息给多个用户
sendToUsers(array $userIds, array $data): array

// 广播消息给所有用户
broadcast(array $data, ?int $excludeUserId = null): int

// 发送消息到频道
sendToChannel(string $channel, array $data): int

// 获取在线用户数
getOnlineUserCount(): int

// 检查用户是否在线
isUserOnline(int $userId): bool

// 获取在线用户ID列表
getOnlineUserIds(): array

// 断开用户连接
disconnectUser(int $userId): bool

// 发送系统通知
sendSystemNotification(string $title, string $content, string $type): int

// 发送通知给指定用户
sendNotificationToUsers(array $userIds, string $title, string $content, string $type): int

// 推送数据更新
pushDataUpdate(array $userIds, string $resourceType, string $action, array $data): array

// 推送数据更新到频道
pushDataUpdateToChannel(string $channel, string $resourceType, string $action, array $data): int

NotificationService (app/Services/System/NotificationService.php)

通知服务类,负责通知的创建、发送和管理。

主要方法:

// 发送通知给单个用户
sendToUser(int $userId, string $title, string $content, string $type, string $category, ?array $extraData = null): array

// 发送通知给多个用户
sendToUsers(array $userIds, string $title, string $content, string $type, string $category, ?array $extraData = null): array

// 广播通知给所有用户
broadcast(string $title, string $content, string $type, string $category, ?array $extraData = null): array

// 发送任务通知
sendTaskNotification(int $userId, string $title, string $content, array $taskData): array

// 发送消息通知
sendNewMessageNotification(int $userId, string $title, string $content, array $messageData): array

// 发送提醒通知
sendReminderNotification(int $userId, string $title, string $content, array $reminderData): array

// 标记通知为已读
markAsRead(int $notificationId): bool

// 批量标记为已读
markMultipleAsRead(array $notificationIds): int

// 标记所有通知为已读
markAllAsRead(int $userId): int

// 删除通知
deleteNotification(int $notificationId): bool

// 批量删除通知
deleteMultipleNotifications(array $notificationIds): int

// 清空已读通知
clearReadNotifications(int $userId): int

// 重试未发送的通知
retryUnsentNotifications(int $limit = 100): int

// 获取通知统计
getStatistics(int $userId): array

3. 控制器

NotificationController (app/Http/Controllers/System/Admin/Notification.php)

后台管理通知控制器,提供完整的 CRUD 操作。

主要方法:

  • index(): 获取通知列表
  • show(): 获取通知详情
  • read(): 标记通知为已读
  • batchRead(): 批量标记为已读
  • readAll(): 标记所有通知为已读
  • destroy(): 删除通知
  • batchDelete(): 批量删除通知
  • clearRead(): 清空已读通知
  • unread(): 获取未读通知列表
  • unreadCount(): 获取未读通知数量
  • statistics(): 获取通知统计
  • send(): 发送通知(管理员)
  • retryUnsent(): 重试未发送的通知(管理员)

WebSocketController (app/Http/Controllers/System/WebSocket.php)

WebSocket API 控制器,提供 HTTP 接口用于管理 WebSocket 连接。

主要方法:

  • getOnlineCount(): 获取在线用户数
  • getOnlineUsers(): 获取在线用户列表
  • checkOnline(): 检查用户在线状态
  • sendToUser(): 发送消息给指定用户
  • sendToUsers(): 发送消息给多个用户
  • broadcast(): 广播消息
  • sendToChannel(): 发送消息到频道
  • sendNotification(): 发送系统通知
  • sendNotificationToUsers(): 发送通知给指定用户
  • pushDataUpdate(): 推送数据更新
  • pushDataUpdateChannel(): 推送数据更新到频道
  • disconnectUser(): 断开用户连接

4. 定时任务

RetryUnsentNotifications (app/Console/Commands/RetryUnsentNotifications.php)

自动重试发送失败的通知。

使用方法:

# 重试最多100条未发送的通知
php artisan notifications:retry-unsent

# 重试最多50条未发送的通知
php artisan notifications:retry-unsent --limit=50

定时任务配置:

// config/crontab.php
// 每5分钟重试一次未发送的通知
*/5 * * * * php /path/to/artisan notifications:retry-unsent --limit=50

前端实现

1. WebSocket 客户端

WebSocketClient (resources/admin/src/utils/websocket.js)

WebSocket 客户端封装类,提供自动连接、重连、消息处理等功能。

主要功能:

  • 单例模式,确保全局只有一个实例
  • 自动连接和重连
  • 心跳机制
  • 消息类型路由
  • 事件监听
  • 连接状态管理
  • 连接时通过 URL 参数授权

连接 URL 构建:

export function createWebSocket(userId, token, options = {}) {
  const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'
  const host = options.wsUrl || window.location.host
  
  // ✅ 认证信息通过 URL 参数传递
  const url = `${protocol}//${host}/ws?user_id=${userId}&token=${token}`
  
  return new WebSocketClient(url, options)
}

单例管理:

let wsClient = null
let currentUserId = null
let currentToken = null
let pendingConnection = null

export function getWebSocket(userId, token, options = {}) {
  // 检查是否已有相同用户的连接
  if (wsClient) {
	if (currentUserId === userId && currentToken === token) {
	  const state = wsClient.getConnectionState()
	  
	  // 如果已连接或正在连接,返回现有实例
	  if (state === 'OPEN' || state === 'CONNECTING') {
		return wsClient
	  }
	  
	  // 如果连接已关闭,清理
	  if (state === 'CLOSED' || state === 'CLOSING') {
		wsClient = null
	  }
	}
	
	// 不同用户/Token断开旧连接
	if (currentUserId !== userId || currentToken !== token) {
	  if (wsClient) {
		wsClient.disconnect()
	  }
	  wsClient = null
	}
  }
  
  // 创建新连接
  currentUserId = userId
  currentToken = token
  wsClient = createWebSocket(userId, token, options)
  
  return wsClient
}

使用示例:

import { getWebSocket, closeWebSocket } from '@/utils/websocket'
import { useUserStore } from '@/stores/modules/user'

const userStore = useUserStore()

// 连接 WebSocket连接时自动授权
const ws = getWebSocket(userStore.userInfo.id, userStore.token, {
  onOpen: (event) => {
	console.log('WebSocket 已连接(已通过 URL 参数完成认证)')
  },
  onMessage: (message) => {
	console.log('收到消息:', message)
  },
  onError: (error) => {
	console.error('WebSocket 错误:', error)
  },
  onClose: (event) => {
	console.log('WebSocket 已关闭')
  }
})

// 连接
ws.connect()

// 发送消息
ws.send('heartbeat', { timestamp: Date.now() })

// 监听特定消息类型
ws.on('notification', (data) => {
  message.success(data.title, data.message)
})

// 断开连接
ws.disconnect()

2. useWebSocket Hook

useWebSocket (resources/admin/src/composables/useWebSocket.js)

WebSocket 组合式函数,统一管理 WebSocket 连接和消息处理。

主要功能:

  • 统一初始化 WebSocket只在 App.vue 中调用)
  • 防止重复初始化
  • 自动注册消息处理器
  • 监听用户状态变化
  • 连接时自动授权(无需发送 auth 事件)

关键实现:

let wsInstance = null
let isInitialized = false

export function useWebSocket() {
  const userStore = useUserStore()
  
  const initWebSocket = () => {
	// 防止重复初始化
	if (isInitialized && wsInstance && wsInstance.isConnected) {
	  console.log('WebSocket 已连接,跳过重复初始化')
	  return
	}
	
	isInitialized = true
	
	// 获取 WebSocket 实例(单例)
	wsInstance = getWebSocket(userStore.userInfo.id, userStore.token, {
	  onOpen: handleOpen,
	  onMessage: handleMessage,
	  onError: handleError,
	  onClose: handleClose
	})
	
	// 注册消息处理器
	registerMessageHandlers()
	
	// 连接
	wsInstance.connect()
  }
  
  // 连接打开(已通过 URL 参数完成认证)
  function handleOpen(event) {
	console.log('WebSocket 连接已建立(已通过 URL 参数完成认证)', event)
  }
  
  return {
	ws: wsInstance,
	initWebSocket,
	closeWebSocket,
	reconnect,
	isConnected,
	send
  }
}

3. 通知 Store

Notification Store (resources/admin/src/stores/modules/notification.js)

通知状态管理,基于 Pinia 实现。

主要方法:

// 获取未读数量
await notificationStore.fetchUnreadCount()

// 获取未读通知列表
await notificationStore.fetchUnreadNotifications({
  page: 1,
  page_size: 10
})

// 获取通知列表
await notificationStore.fetchNotifications({
  page: 1,
  page_size: 20,
  type: 'notification',
  category: 'system'
})

// 标记为已读
await notificationStore.markAsRead(notificationId)

// 批量标记为已读
await notificationStore.markMultipleAsRead([1, 2, 3])

// 标记所有为已读
await notificationStore.markAllAsRead()

// 删除通知
await notificationStore.deleteNotification(notificationId)

// 批量删除
await notificationStore.deleteMultipleNotifications([1, 2, 3])

// 清空已读通知
await notificationStore.clearReadNotifications()

// 发送通知(管理员)
await notificationStore.sendNotification({
  recipient_id: userId,
  title: '通知标题',
  content: '通知内容',
  type: 'info',
  category: 'system'
})

// 重试未发送的通知(管理员)
await notificationStore.retryUnsentNotifications(100)

// 获取统计信息
await notificationStore.fetchStatistics()

4. 用户栏通知组件

Userbar Component (resources/admin/src/layouts/components/userbar.vue)

顶部用户栏中的通知组件,提供:

  • 通知铃铛图标
  • 未读数量徽章
  • 通知下拉列表
  • 快捷操作(全部已读、清空)
  • 通知分类筛选
  • 点击标记已读

功能特性:

  • 未读消息数量实时更新
  • 支持按类型筛选(全部/通知/任务/警告)
  • 悬浮显示删除按钮
  • 分页浏览
  • 本地缓存localStorage

5. 通知列表页面

Notification List Page (resources/admin/src/pages/system/notifications/index.vue)

通知管理页面,提供完整的通知管理功能。

功能特性:

  • 通知列表展示
  • 搜索和筛选
  • 批量操作
  • 通知详情查看
  • 已读/未读状态切换
  • 删除和清空

API接口

WebSocket 接口

1. 获取在线用户数

GET /admin/websocket/online-count

响应:

{
	"code": 200,
	"message": "success",
	"data": {
		"online_count": 10
	}
}

2. 获取在线用户列表

GET /admin/websocket/online-users

响应:

{
	"code": 200,
	"message": "success",
	"data": {
		"user_ids": [1, 2, 3, 4, 5],
		"count": 5
	}
}

3. 检查用户在线状态

POST /admin/websocket/check-online

请求参数:

{
	"user_id": 1
}

响应:

{
	"code": 200,
	"message": "success",
	"data": {
		"user_id": 1,
		"is_online": true
	}
}

4. 发送消息给指定用户

POST /admin/websocket/send-to-user

请求参数:

{
	"user_id": 1,
	"type": "notification",
	"data": {
		"title": "新消息",
		"message": "您有一条新消息"
	}
}

5. 发送消息给多个用户

POST /admin/websocket/send-to-users

请求参数:

{
	"user_ids": [1, 2, 3],
	"type": "notification",
	"data": {
		"title": "系统通知",
		"message": "系统将在今晚进行维护"
	}
}

6. 广播消息

POST /admin/websocket/broadcast

请求参数:

{
	"type": "notification",
	"data": {
		"title": "公告",
		"message": "欢迎使用新版本"
	},
	"exclude_user_id": 1  // 可选:排除某个用户
}

7. 发送消息到频道

POST /admin/websocket/send-to-channel

请求参数:

{
	"channel": "orders",
	"type": "data_update",
	"data": {
		"order_id": 123,
		"status": "paid"
	}
}

8. 发送系统通知

POST /admin/websocket/send-notification

请求参数:

{
	"title": "系统维护",
	"message": "系统将于今晚 23:00-24:00 进行维护",
	"type": "warning",
	"extra_data": {
		"start_time": "23:00",
		"end_time": "24:00"
	}
}

9. 发送通知给指定用户

POST /admin/websocket/send-notification-to-users

请求参数:

{
	"user_ids": [1, 2, 3],
	"title": "订单更新",
	"message": "您的订单已发货",
	"type": "success"
}

10. 推送数据更新

POST /admin/websocket/push-data-update

请求参数:

{
	"user_ids": [1, 2, 3],
	"resource_type": "order",
	"action": "update",
	"data": {
		"id": 123,
		"status": "shipped"
	}
}

11. 推送数据更新到频道

POST /admin/websocket/push-data-update-channel

请求参数:

{
	"channel": "orders",
	"resource_type": "order",
	"action": "create",
	"data": {
		"id": 124,
		"customer": "张三",
		"amount": 100.00
	}
}

12. 断开用户连接

POST /admin/websocket/disconnect-user

请求参数:

{
	"user_id": 1
}

通知接口

1. 获取通知列表

GET /admin/system/notifications

请求参数:

{
  "user_id": 1,           // 用户ID可选默认为当前用户
  "keyword": "通知",       // 关键字搜索
  "is_read": false,         // 阅读状态true/false
  "type": "info",          // 通知类型
  "category": "system",     // 通知分类
  "start_date": "2024-01-01", // 开始日期
  "end_date": "2024-12-31",   // 结束日期
  "page": 1,              // 页码
  "page_size": 20          // 每页数量
}

响应:

{
  "code": 200,
  "message": "success",
  "data": {
	"list": [
	  {
		"id": 1,
		"user_id": 1,
		"title": "系统通知",
		"content": "这是一个测试通知",
		"type": "info",
		"category": "system",
		"data": {},
		"action_type": null,
		"action_data": null,
		"is_read": false,
		"read_at": null,
		"sent_via_websocket": true,
		"sent_at": "2024-02-18 10:00:00",
		"retry_count": 0,
		"created_at": "2024-02-18 10:00:00"
	  }
	],
	"total": 100,
	"page": 1,
	"page_size": 20
  }
}

2. 获取未读通知

GET /admin/system/notifications/unread?limit=10

3. 获取未读通知数量

GET /admin/system/notifications/unread-count

响应:

{
  "code": 200,
  "message": "success",
  "data": {
	"count": 5
  }
}

4. 获取通知详情

GET /admin/system/notifications/{id}

5. 标记通知为已读

POST /admin/system/notifications/{id}/read

6. 批量标记为已读

POST /admin/system/notifications/batch-read

请求参数:

{
  "ids": [1, 2, 3, 4, 5]
}

7. 标记所有通知为已读

POST /admin/system/notifications/read-all

8. 删除通知

DELETE /admin/system/notifications/{id}

9. 批量删除通知

POST /admin/system/notifications/batch-delete

请求参数:

{
  "ids": [1, 2, 3]
}

10. 清空已读通知

POST /admin/system/notifications/clear-read

11. 获取通知统计

GET /admin/system/notifications/statistics

响应:

{
  "code": 200,
  "message": "success",
  "data": {
	"total": 100,
	"unread": 5,
	"read": 95,
	"by_type": {
	  "info": 50,
	  "success": 20,
	  "warning": 15,
	  "error": 10,
	  "task": 5
	},
	"by_category": {
	  "system": 60,
	  "task": 20,
	  "message": 10,
	  "reminder": 8,
	  "announcement": 2
	}
  }
}

12. 发送通知(管理员功能)

POST /admin/system/notifications/send

请求参数:

{
  "user_ids": [1, 2, 3],  // 用户ID数组为空则发送给所有用户
  "title": "系统维护通知",
  "content": "系统将于今晚22:00进行维护预计维护时间2小时。",
  "type": "warning",
  "category": "announcement",
  "data": {
	"maintenance_start": "2024-02-18 22:00:00",
	"maintenance_end": "2024-02-19 00:00:00"
  },
  "action_type": "link",
  "action_data": {
	"url": "/system/maintenance"
  }
}

13. 重试发送未发送的通知(管理员功能)

POST /admin/system/notifications/retry-unsent?limit=100

配置说明

后端配置

Laravel-S 配置 (config/laravels.php)

'websocket' => [
	'enable' => env('LARAVELS_WEBSOCKET', true),
	'handler' => \App\Services\WebSocket\WebSocketHandler::class,
],

'swoole' => [
	'enable_coroutine' => true,
	'worker_num' => 4,
	'max_request' => 5000,
	'max_request_grace' => 500,
	'dispatch_mode' => 2,  // 重要:使用抢占模式确保连接状态一致性
],

'swoole_tables' => [
	'wsTable' => [
		'size'   => 102400,
		'column' => [
			['name' => 'value', 'type' => \Swoole\Table::TYPE_STRING, 'size' => 1024],
			['name' => 'expiry', 'type' => \Swoole\Table::TYPE_INT, 'size' => 4],
		],
	],
],

环境变量

.env 文件中添加:

LARAVELS_WEBSOCKET=true

重要配置说明:

dispatch_mode 模式详解

模式 说明 适用场景
轮询模式 1 按顺序依次分配请求到 Worker 需要平均分配负载
抢占模式 2 固定 Worker 处理特定连接 推荐:保持连接状态一致性
抢占模式 3 随机分配请求到 Worker 会导致状态不一致

dispatch_mode = 3 会导致的问题:

  • 请求被随机分配到不同的 Worker 进程
  • 同一用户的连接和消息发送可能被分配到不同 Worker
  • wsTable 中的用户连接数据和消息发送操作无法正确匹配
  • 导致消息发送失败、通知无法接收

dispatch_mode = 2 的优势:

  • 确保同一用户的请求始终由同一个 Worker 处理
  • 连接状态保持一致
  • 消息发送可靠

前端配置

WebSocket 配置

resources/admin/src/utils/websocket.js 中:

const WS_URL = 'ws://localhost:5200'  // WebSocket 服务器地址
const RECONNECT_INTERVAL = 5000      // 重连间隔(毫秒)
const MAX_RECONNECT_ATTEMPTS = 5      // 最大重连次数
const HEARTBEAT_INTERVAL = 30000      // 心跳间隔(毫秒)

通知配置

resources/admin/src/stores/modules/notification.js 中:

const pageSize = 20                    // 每页数量
const maxLocalNotifications = 100      // 本地最大存储数量

Nginx 配置示例

server {
	listen 80;
	server_name yourdomain.com;
	root /path/to/your/project/public;

	location / {
		try_files $uri $uri/ /index.php?$query_string;
	}

	# WebSocket 代理配置
	location /ws {
		proxy_pass http://127.0.0.1:5200;
		proxy_http_version 1.1;
		proxy_set_header Upgrade $http_upgrade;
		proxy_set_header Connection "upgrade";
		proxy_set_header X-Real-IP $remote_addr;
		proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
		proxy_set_header Host $host;
		proxy_read_timeout 86400;
	}

	location ~ \.php$ {
		fastcgi_pass 127.0.0.1:9000;
		fastcgi_index index.php;
		fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name;
		include fastcgi_params;
	}
}

定时任务配置

建议配置以下定时任务:

// config/crontab.php

// 每5分钟重试一次未发送的通知
*/5 * * * * php /path/to/artisan notifications:retry-unsent --limit=50

// 每天凌晨清理30天前的已读通知
0 0 * * * php /path/to/artisan notifications:cleanup

使用示例

后端使用示例

1. 发送通知给单个用户

use App\Services\System\NotificationService;
use App\Models\System\Notification;

class YourService
{
	protected $notificationService;

	public function __construct(NotificationService $notificationService)
	{
		$this->notificationService = $notificationService;
	}

	public function someMethod()
	{
		// 发送通知给单个用户
		$result = $this->notificationService->sendToUser(
			$userId = 1,
			$title = '欢迎加入',
			$content = '欢迎加入我们的系统!',
			$type = Notification::TYPE_SUCCESS,
			$category = Notification::CATEGORY_MESSAGE,
			$extraData = ['welcome' => true]
		);
	}
}

2. 发送通知给多个用户

// 发送通知给多个用户
$result = $this->notificationService->sendToUsers(
	$userIds = [1, 2, 3],
	$title = '系统维护通知',
	$content = '系统将于今晚进行维护',
	$type = Notification::TYPE_WARNING,
	$category = Notification::CATEGORY_ANNOUNCEMENT
);

3. 广播通知

// 广播通知(所有用户)
$result = $this->notificationService->broadcast(
	$title = '新功能上线',
	$content = '我们推出了新的功能,快来体验吧!',
	$type = Notification::TYPE_INFO,
	$category = Notification::CATEGORY_ANNOUNCEMENT
);

4. 发送任务通知

// 发送任务通知
$result = $this->notificationService->sendTaskNotification(
	$userId = 1,
	$title = '任务提醒',
	$content = '您有一个任务即将到期',
	$taskData = [
		'task_id' => 123,
		'task_name' => '完成报告',
		'due_date' => '2024-02-20'
	]
);

5. 使用 WebSocket Service

use App\Services\WebSocket\WebSocketService;

$wsService = app(WebSocketService::class);

// 发送给单个用户
$wsService->sendToUser($userId, [
	'type' => 'notification',
	'data' => [
		'title' => '系统通知',
		'content' => '这是一条通知',
		'type' => 'info',
		'timestamp' => time()
	]
]);

// 发送给多个用户
$userIds = [1, 2, 3];
$sentTo = $wsService->sendToUsers($userIds, $data);

// 广播给所有用户
$count = $wsService->broadcast($data);

// 广播给所有用户(排除指定用户)
$count = $wsService->broadcast($data, $excludeUserId);

// 发送到频道
$count = $wsService->sendToChannel('orders', [
	'type' => 'data_update',
	'data' => [
		'order_id' => 123,
		'status' => 'paid'
	]
]);

// 推送数据更新
$sentTo = $wsService->pushDataUpdate(
	$userIds,
	'dictionary',
	'update',
	['id' => 1, 'name' => 'test']
);

// 检查用户是否在线
$isOnline = $wsService->isUserOnline($userId);

// 获取在线用户数量
$count = $wsService->getOnlineUserCount();

// 获取在线用户ID列表
$userIds = $wsService->getOnlineUserIds();

// 断开用户连接
$wsService->disconnectUser($userId);

前端使用示例

1. 基本连接App.vue 中)

<script setup>
import { useWebSocket } from '@/composables/useWebSocket'
import { useUserStore } from '@/stores/modules/user'

const userStore = useUserStore()
const { initWebSocket, closeWebSocket } = useWebSocket()

// 只在 App.vue 中初始化 WebSocket
onMounted(() => {
  if (userStore.isUserInfoComplete()) {
	initWebSocket()
  }
})

// 监听用户状态变化
watch(
  () => [userStore.token, userStore.userInfo],
  () => {
	if (userStore.isUserInfoComplete()) {
	  initWebSocket()
	} else if (!userStore.isLoggedIn()) {
	  closeWebSocket()
	}
  },
  { deep: true }
)
</script>

2. 监听特定消息类型

// 在 useWebSocket 中注册消息处理器
ws.on('notification', (data) => {
  message.success(data.title, data.message)
})

ws.on('data_update', (data) => {
  console.log('数据更新:', data.resource_type, data.action)
  // 刷新数据
  loadData()
})

ws.on('chat', (data) => {
  console.log('收到聊天消息:', data)
})

3. 发送消息

import { useWebSocket } from '@/composables/useWebSocket'

const { send } = useWebSocket()

// 发送心跳
send('heartbeat', { timestamp: Date.now() })

// 发送私聊消息
send('chat', {
  to_user_id: 2,
  content: '你好,这是一条私聊消息'
})

// 订阅频道
send('subscribe', { channel: 'orders' })

// 取消订阅
send('unsubscribe', { channel: 'orders' })

// 发送广播消息
send('broadcast', {
  message: '这是一条广播消息'
})

4. 使用通知 Store

import { useNotificationStore } from '@/stores/modules/notification'

const notificationStore = useNotificationStore()

// 获取未读数量
await notificationStore.fetchUnreadCount()

// 获取未读通知列表
const unreadList = await notificationStore.fetchUnreadNotifications({
  page: 1,
  page_size: 10
})

// 获取通知列表
const list = await notificationStore.fetchNotifications({
  page: 1,
  page_size: 20,
  type: 'notification',
  category: 'system'
})

// 标记为已读
await notificationStore.markAsRead(notificationId)

// 批量标记为已读
await notificationStore.markMultipleAsRead([1, 2, 3])

// 标记所有为已读
await notificationStore.markAllAsRead()

// 删除通知
await notificationStore.deleteNotification(notificationId)

// 批量删除
await notificationStore.deleteMultipleNotifications([1, 2, 3])

// 清空已读通知
await notificationStore.clearReadNotifications()

// 获取统计信息
const stats = await notificationStore.fetchStatistics()
console.log('统计信息:', stats)

5. 在 Vue 组件中使用

<template>
  <div>
	<a-button @click="sendMessage">发送消息</a-button>
	<div>连接状态: {{ connectionStatus }}</div>
  </div>
</template>

<script setup>
import { ref, onMounted, onUnmounted } from 'vue'
import { useWebSocket } from '@/composables/useWebSocket'
import { useNotificationStore } from '@/stores/modules/notification'

const notificationStore = useNotificationStore()
const { send, isConnected } = useWebSocket()
const connectionStatus = ref('未连接')

// 监听未读数量
watch(() => notificationStore.unreadCount, (newCount) => {
  console.log('未读数量更新:', newCount)
})

const sendMessage = () => {
  if (isConnected.value) {
	send('chat', {
	  to_user_id: 2,
	  content: '测试消息'
	})
  } else {
	message.warning('WebSocket 未连接')
  }
}

onMounted(() => {
  // ✅ 注意:不要在这里调用 initWebSocket()
  // WebSocket 已在 App.vue 中统一初始化
})

onUnmounted(() => {
  // 清理逻辑
})
</script>

连接问题修复

问题描述

在开发过程中发现 WebSocket 连接存在重复创建的问题,导致:

  1. 每个组件页面都创建新的 WebSocket 连接
  2. 用户登录后可能同时存在多个 WebSocket 连接
  3. 消息接收混乱,状态不一致

根本原因

经过代码分析,发现以下三个地方都创建了 WebSocket 连接:

  1. App.vue主要连接点:正确,应该在此处统一初始化
  2. userbar.vue重复连接:错误,不应该重复初始化
  3. notifications/index.vue重复连接:错误,不应该重复初始化

解决方案

原则

  1. 单一连接点:只在 App.vue 中统一初始化和关闭 WebSocket 连接
  2. 全局状态管理:通过 useWebSocket composable 管理单例 WebSocket 实例
  3. 事件驱动:组件通过监听 store 中的事件来处理 WebSocket 消息
  4. 连接时授权:通过 URL 参数传递认证信息,无需单独发送 auth 事件

修复内容

1. websocket.js单例模式

使用单例模式确保全局只有一个 WebSocket 实例:

let wsClient = null
let currentUserId = null
let currentToken = null
let pendingConnection = null

export function getWebSocket(userId, token, options = {}) {
  // 检查是否已有相同用户的连接
  if (wsClient) {
	if (currentUserId === userId && currentToken === token) {
	  const state = wsClient.getConnectionState()
	  
	  // 如果已连接或正在连接,返回现有实例
	  if (state === 'OPEN' || state === 'CONNECTING') {
		return wsClient
	  }
	  
	  // 如果连接已关闭,清理
	  if (state === 'CLOSED' || state === 'CLOSING') {
		wsClient = null
	  }
	}
	
	// 不同用户/Token断开旧连接
	if (currentUserId !== userId || currentToken !== token) {
	  if (wsClient) {
		wsClient.disconnect()
	  }
	  wsClient = null
	}
  }
  
  // 创建新连接
  currentUserId = userId
  currentToken = token
  wsClient = createWebSocket(userId, token, options)
  
  return wsClient
}
2. useWebSocket.js统一初始化

添加防止重复初始化的机制:

let wsInstance = null
let isInitialized = false

export function useWebSocket() {
  const initWebSocket = () => {
	// 防止重复初始化
	if (isInitialized && wsInstance && wsInstance.isConnected) {
	  console.log('WebSocket 已连接,跳过重复初始化')
	  return
	}
	
	isInitialized = true
	wsInstance = getWebSocket(userStore.userInfo.id, userStore.token, {
	  onOpen: handleOpen,
	  onMessage: handleMessage,
	  onError: handleError,
	  onClose: handleClose
	})
	
	// 注册消息处理器
	registerMessageHandlers()
	
	wsInstance.connect()
  }
  
  return {
	ws: wsInstance,
	initWebSocket,
	closeWebSocket,
	reconnect,
	isConnected,
	send
  }
}
3. 移除重复连接

userbar.vue

<script setup>
// ❌ 移除import { useWebSocket } from '@/composables/useWebSocket'

onMounted(() => {
  loadNotifications()
  
  // ✅ 注意WebSocket 已在 App.vue 中统一初始化
  // 只需要处理消息即可,消息处理已经在 useWebSocket 中注册
})
</script>

notifications/index.vue

<script setup>
// ❌ 移除import { useWebSocket } from '@/composables/useWebSocket'

onMounted(() => {
  loadUnreadCount()
  
  // ✅ 注意WebSocket 已在 App.vue 中统一初始化
  // 通知会通过 notification store 自动更新
})
</script>

WebSocket 授权机制优化

优化前

// 旧方式:前端在 handleOpen 中发送 auth 事件
function handleOpen(event) {
  console.log('WebSocket 连接已建立', event)
  
  // ❌ 需要单独发送 auth 事件
  if (ws.value) {
	ws.value.send('auth', {
	  token: userStore.token,
	  user_id: userStore.userInfo.id
	})
  }
}

优化后

// 新方式:连接时通过 URL 参数认证
// URL: ws://host/ws?user_id=1&token=xxxxx
function handleOpen(event) {
  console.log('WebSocket 连接已建立(已通过 URL 参数完成认证)', event)
  // ✅ 不需要发送 auth 事件
}

优势

性能优化:减少 1 次消息往返,连接建立后立即可用 安全性增强:握手阶段即进行认证,未认证的连接直接拒绝 代码简化:前端无需处理 auth 事件,后端逻辑更清晰

认证流程

前端发起连接
	│
	├─ ws://host/ws?user_id=1&token=xxxxx
	│
	▼
后端 onOpen
	│
	├─ 1. 提取 URL 参数user_id, token
	│
	├─ 2. 验证参数完整性
	│   └─ 缺失 → 返回错误,断开连接
	│
	├─ 3. 验证 JWT token
	│   ├─ 3.1 解析 token
	│   ├─ 3.2 验证用户 ID 匹配
	│   │   └─ 不匹配 → 返回错误,断开连接
	│   └─ 3.3 验证 token 是否过期
	│       └─ 已过期 → 返回错误,断开连接
	│
	├─ 4. 认证成功
	│   │
	│   ├─ 存储用户到 fd 映射
	│   ├─ 存储 fd 到用户映射
	│   └─ 发送 connected 消息
	│
	▼
连接建立完成,可以正常通信

架构说明

WebSocket 连接流程

┌─────────────────────────────────────────────────────────────┐
│                      App.vue                             │
│  ┌───────────────────────────────────────────────────┐   │
│  │  useWebSocket() 初始化                         │   │
│  │  - 获取用户信息                                │   │
│  │  - 创建 WebSocket 连接(单例)                  │   │
│  │  - 连接时通过 URL 参数授权                       │   │
│  │  - 注册消息处理器                              │   │
│  └──────────────────┬────────────────────────────┘   │
│                     │                                   │
│                     ▼                                   │
│  ┌───────────────────────────────────────────────────┐   │
│  │    WebSocket Client (单例)                    │   │
│  │  - 管理连接状态                                │   │
│  │  - 处理消息路由                                │   │
│  │  - 心跳检测和重连                              │   │
│  └──────────────────┬────────────────────────────┘   │
│                     │                                   │
│                     ▼                                   │
│  ┌───────────────────────────────────────────────────┐   │
│  │   Notification Store                            │   │
│  │  - 接收 WebSocket 消息                         │   │
│  │  - 更新通知状态                                │   │
│  │  - 触发 Vue 响应式更新                         │   │
│  └──────────────────┬────────────────────────────┘   │
│                     │                                   │
│                     ▼                                   │
│  ┌───────────────────────────────────────────────────┐   │
│  │  组件 (userbar, notifications等)                │   │
│  │  - 监听 store 状态变化                         │   │
│  │  - 响应消息更新                                │   │
│  └───────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘

消息处理流程

后端推送消息
	│
	▼
WebSocketClient.onMessage()
	│
	▼
消息类型路由 (switch/case)
	│
	├─► notification
	│      │
	│      ▼
	│  notificationStore.handleWebSocketMessage()
	│      │
	│      ├─► 更新未读数量
	│      ├─► 添加到通知列表
	│      └─► 触发通知提示
	│
	├─► data_update
	│      │
	│      ▼
	│  自定义处理函数
	│
	└─► heartbeat
		   │
		   ▼
	   响应心跳确认

最佳实践

1. WebSocket 连接管理

推荐做法:

  • 只在应用根组件App.vue中初始化 WebSocket
  • 使用单例模式确保全局只有一个实例
  • 通过状态管理Pinia Store分发消息

避免做法:

  • 在多个组件中创建 WebSocket 连接
  • 直接在子组件中调用 initWebSocket()
  • 绕过 Store 直接处理 WebSocket 消息

2. 组件中使用 WebSocket

推荐做法:

// 只需要监听 store 中的状态变化
import { useNotificationStore } from '@/stores/modules/notification'

const notificationStore = useNotificationStore()

// 监听未读数量变化
watch(() => notificationStore.unreadCount, (newCount) => {
  console.log('未读数量更新:', newCount)
})

// 监听通知列表变化
watch(() => notificationStore.notifications, (list) => {
  console.log('通知列表更新:', list)
})

避免做法:

// 不要在组件中直接使用 WebSocket
import { useWebSocket } from '@/composables/useWebSocket'

const { initWebSocket } = useWebSocket()
initWebSocket()  // ❌ 重复连接!

3. 消息处理器注册

推荐做法:

// 在 useWebSocket 中统一注册处理器
const initWebSocket = () => {
  wsInstance.on('notification', (data) => {
	notificationStore.handleWebSocketMessage({ type: 'notification', data })
  })
  
  wsInstance.on('data_update', (data) => {
	console.log('收到数据更新:', data)
  })
}

避免做法:

// 不要在每个组件中单独注册
ws.onMessage((msg) => {
  if (msg.type === 'notification') {
	// 处理通知
  }
})

故障排查与修复

常见问题与解决方案

问题 1WebSocket 连接失败

可能原因:

  • WebSocket 服务未启动
  • 端口被占用
  • 防火墙阻止连接
  • URL 配置错误

排查步骤:

# 1. 检查 Laravel-S 是否运行
php bin/laravels status

# 2. 启动 Laravel-S
php bin/laravels start

# 3. 检查端口是否被占用
netstat -ano | findstr :5200

# 4. 检查防火墙设置
# Windows: 控制面板 -> 系统和安全 -> Windows 防火墙 -> 允许应用通过防火墙
# Linux: sudo ufw allow 5200

问题 2登录后 WebSocket 未连接

可能原因:

  • 用户信息未加载完成
  • token 无效

解决方法:

// 检查控制台日志
// 确认 userStore.isUserInfoComplete() 返回 true
// 查看 getWebSocket 调用参数

// 手动测试连接
import { useUserStore } from '@/stores/modules/user'

const userStore = useUserStore()

console.log('User Info:', userStore.userInfo)
console.log('Token:', userStore.token)
console.log('Is Complete:', userStore.isUserInfoComplete())

问题 3消息未收到

可能原因:

  • 消息处理器未注册
  • 消息类型不匹配
  • 网络问题
  • dispatch_mode 配置错误

排查步骤:

# 1. 检查 dispatch_mode 配置
cat config/laravels.php | grep dispatch_mode
# 应该是 dispatch_mode = 2

# 2. 完全重启 Laravel-S
php bin/laravels stop && php bin/laravels start

# 3. 检查消息处理器
# 查看 useWebSocket.js 中的消息处理器注册

# 4. 查看网络面板 WebSocket 帧
# 浏览器开发者工具 -> Network -> WS

# 5. 查看日志
tail -f storage/logs/swoole.log
tail -f storage/logs/laravel.log

问题 4消息发送失败

可能原因:

  • wsTable 访问方式错误
  • 用户不在线
  • dispatch_mode 配置错误

错误信息:

TypeError: Access to undefined property Swoole\WebSocket\Server::$wsTable

正确修复方法:

确保通过 app('swoole')->wsTable 而不是 $server->wsTable 访问。

// ❌ 错误:$server 对象没有 wsTable 属性
$server->wsTable->set('uid:' . $userId, [...]);
$fdInfo = $server->wsTable->get('fd:' . $fd);

// ✅ 正确:通过服务容器获取 wsTable
$wsTable = app('swoole')->wsTable;
$wsTable->set('uid:' . $userId, [...]);
$fdInfo = $wsTable->get('fd:' . $fd);

问题 5dispatch_mode 配置不生效

检查方法:

# 1. 确认配置文件
cat config/laravels.php | grep dispatch_mode

# 2. 完全重启 Laravel-S
php bin/laravels stop && php bin/laravels start

# 3. 检查运行时配置
php bin/laravels config

正确配置:

'swoole' => [
	'dispatch_mode' => 2,  // ✅ 正确:抢占模式
	'worker_num' => 4,
],

问题 6通知未收到

检查项:

  • 用户是否在线
  • WebSocket 连接是否正常
  • 数据库中是否有通知记录
  • sent_via_websocket 字段是否为 true

排查步骤:

# 1. 检查用户在线状态
php bin/laravels
# 在控制台中执行:
$wsService = app(App\Services\WebSocket\WebSocketService::class);
$wsService->isUserOnline(1);

# 2. 检查通知记录
php artisan tinker
>>> $notifications = App\Models\System\Notification::where('user_id', 1)->get();
>>> $notifications->each(fn($n) => echo "ID: {$n->id}, Sent via WS: {$n->sent_via_websocket}\n");

# 3. 检查日志
tail -f storage/logs/swoole.log | grep "notification"

问题 7通知重复发送

检查项:

  • 是否有多个任务在重试
  • retry_count 是否超过限制
  • 是否有重复的创建逻辑

解决方法:

// 检查重试逻辑
// 通知最多重试3次超过后不再重试

// 检查是否有重复的发送逻辑
// 确保不会多次调用 sendNotification

问题 8WebSocket 认证失败

可能原因:

  • Token 无效或已过期
  • 用户 ID 不匹配
  • 缺少认证参数

排查步骤:

检查前端控制台日志:

// 查看连接 URL 是否正确
console.log('WebSocket URL:', ws.url)

// 查看收到的错误消息
// 检查错误类型和错误信息

检查后端日志:

# 查看认证失败日志
tail -f storage/logs/laravel.log | grep "WebSocket 认证失败"

解决方法:

  • 确保 token 有效且未过期
  • 确保 URL 参数中的 user_id 与 token 中的 sub 匹配
  • 重新登录获取新的 token

Laravel-S wsTable 使用规范

正确的 wsTable 访问方式

根据 Laravel-S 文档和源码,正确访问 wsTable 的方式有两种:

方式 1在 WebSocketHandler 中通过构造函数获取(推荐)

class WebSocketHandler implements WebSocketHandlerInterface
{
	protected $wsTable;

	public function __construct()
	{
		$this->wsTable = app('swoole')->wsTable;
	}

	public function onOpen(Server $server, Request $request): void
	{
		// 直接使用 $this->wsTable
		$this->wsTable->set('uid:' . $userId, [...]);
	}
}

方式 2在普通 Service 中通过服务容器获取

class WebSocketService
{
	public function sendToUser(int $userId, array $data): bool
	{
		$server = $this->getServer();
		
		// 每次都通过服务容器获取最新的 wsTable
		$wsTable = app('swoole')->wsTable;
		
		$fdInfo = $wsTable->get('uid:' . $userId);
		// ...
	}
}

错误的访问方式(禁止使用)

// ❌ 错误:$server 对象没有 wsTable 属性
$server->wsTable->set('uid:' . $userId, [...]);
$fdInfo = $server->wsTable->get('fd:' . $fd);

原因:

  • $server 是 Swoole\WebSocket\Server 对象
  • 该对象没有 wsTable 属性
  • wsTable 是通过 Laravel-S 扩展动态添加到 Swoole Server 的
  • Laravel-S 通过服务容器管理 wsTable应该通过 app('swoole')->wsTable 访问

Swoole Table 数据结构

wsTable 用于存储 WebSocket 连接映射关系:

'ws' => [
	'size' => 102400,  // 表最大行数
	'columns' => [
		'value' => ['type' => \Swoole\Table::TYPE_STRING, 'size' => 256],  // 值
		'expiry' => ['type' => \Swoole\Table::TYPE_INT, 'size' => 4],  // 过期时间
	]
]

存储的键值对:

  • uid:{userId}['value' => {fd}, 'expiry' => timestamp] - 用户 ID 到文件描述符的映射
  • fd:{fd}['value' => {userId}, 'expiry' => timestamp] - 文件描述符到用户 ID 的映射
  • channel:{channel}:fd:{fd}['value' => 1, 'expiry' => timestamp] - 频道订阅关系

多 Worker 进程注意事项

当使用多个 Worker 进程时worker_num > 1

  1. 进程隔离:每个 Worker 有独立的内存空间
  2. 状态同步:使用 Swoole Table 实现跨进程数据共享
  3. 连接一致性:同一用户的连接必须由同一 Worker 处理
  4. 消息路由dispatch_mode = 2 确保连接和消息在同一 Worker

性能优化

后端优化

1. 数据库优化

// 为常用查询字段添加索引
Schema::table('system_notifications', function (Blueprint $table) {
	$table->index('user_id');
	$table->index('is_read');
	$table->index('type');
	$table->index('category');
	$table->index(['user_id', 'is_read']);
	$table->index(['user_id', 'created_at']);
});

2. 批量操作

// 使用批量插入减少查询次数
$notifications = [];
foreach ($userIds as $userId) {
	$notifications[] = [
		'user_id' => $userId,
		'title' => $title,
		'content' => $content,
		'type' => $type,
		'category' => $category,
		'created_at' => now(),
		'updated_at' => now(),
	];
}

Notification::insert($notifications);

3. 连接池管理

// 定期清理过期连接
public function cleanExpiredConnections(): void
{
	$server = $this->getServer();
	$wsTable = app('swoole')->wsTable;
	$currentTime = time();
	
	foreach ($wsTable as $key => $row) {
		if (strpos($key, 'uid:') === 0) {
			$fd = $row['value'];
			
			if (!$server->isEstablished($fd)) {
				// 清理无效的连接
				$wsTable->del($key);
				$wsTable->del('fd:' . $fd);
			}
		}
	}
}

4. 消息队列

对于大量消息发送场景,建议使用队列异步处理:

use Illuminate\Support\Facades\Queue;

// 异步发送通知
dispatch(function () use ($userIds, $message) {
	$webSocketService = new WebSocketService();
	$webSocketService->sendNotificationToUsers($userIds, $title, $message);
})->onQueue('websocket');

5. 缓存优化

// 使用 Redis 缓存未读数量
use Illuminate\Support\Facades\Cache;

public function getUnreadCount(int $userId): int
{
	$cacheKey = "unread_count:{$userId}";
	
	return Cache::remember($cacheKey, 300, function() use ($userId) {
		return Notification::where('user_id', $userId)
			->where('is_read', false)
			->count();
	});
}

// 通知标记为已读时清除缓存
public function markAsRead(int $notificationId): bool
{
	$notification = Notification::find($notificationId);
	$notification->markAsRead();
	
	// 清除缓存
	Cache::forget("unread_count:{$notification->user_id}");
	
	return true;
}

前端优化

1. 消息限制

// 限制本地存储数量
const maxLocalNotifications = 100

// 添加消息时检查数量
function addMessage(message) {
  if (messages.value.length >= maxLocalNotifications) {
	messages.value.pop() // 删除最旧的消息
  }
  messages.value.unshift(message)
}

2. 分页加载

// 消息列表使用分页,避免一次性加载过多数据
const currentPage = ref(1)
const pageSize = ref(20)

async function loadNotifications() {
  const response = await api.notifications.get({
	page: currentPage.value,
	page_size: pageSize.value
  })
  // ...
}

3. 虚拟滚动

对于大量消息列表,使用虚拟滚动提升性能:

<template>
  <a-virtual-list
	:data-sources="messages"
	:data-key="'id'"
	:keeps="30"
	:item-size="60"
  >
	<template #default="{ data }">
	  <NotificationItem :notification="data" />
	</template>
  </a-virtual-list>
</template>

4. 防抖和节流

// 搜索输入防抖
import { debounce } from 'lodash-es'

const handleSearch = debounce((keyword) => {
  fetchNotifications({ keyword })
}, 300)

// 滚动加载节流
import { throttle } from 'lodash-es'

const handleScroll = throttle(() => {
  if (isNearBottom()) {
	loadMore()
  }
}, 500)

安全考虑

后端安全

1. 连接认证

public function onOpen(Server $server, Request $request): void
{
	$userId = $request->get['user_id'] ?? null;
	$token = $request->get['token'] ?? null;
	
	// ✅ 验证 token
	if (!$token || !$this->validateToken($userId, $token)) {
		$server->push($request->fd, json_encode([
			'type' => 'error',
			'data' => ['message' => 'Authentication failed']
		]));
		$server->disconnect($request->fd);
		return;
	}
	
	// 认证成功,存储连接
	$this->wsTable->set('uid:' . $userId, [
		'value' => $request->fd,
		'expiry' => time() + 3600
	]);
}

2. 消息验证

public function onMessage(Server $server, Frame $frame): void
{
	$message = json_decode($frame->data, true);
	
	// ✅ 验证消息格式
	if (!$message || !isset($message['type'])) {
		$server->push($frame->fd, json_encode([
			'type' => 'error',
			'data' => ['message' => 'Invalid message format']
		]));
		return;
	}
	
	// 处理消息
	// ...
}

3. 速率限制

// 防止消息滥用
private $messageRateLimits = [];

public function checkRateLimit(int $fd): bool
{
	$key = 'fd:' . $fd;
	$now = time();
	
	if (!isset($this->messageRateLimits[$key])) {
		$this->messageRateLimits[$key] = [];
	}
	
	// 清理旧记录
	$this->messageRateLimits[$key] = array_filter(
		$this->messageRateLimits[$key],
		fn($time) => $time > $now - 60
	);
	
	// 检查频率(每分钟最多 60 条)
	if (count($this->messageRateLimits[$key]) >= 60) {
		return false;
	}
	
	$this->messageRateLimits[$key][] = $now;
	return true;
}

4. 权限控制

// 确保用户只能查看和操作自己的通知
public function index(Request $request)
{
	$userId = $request->user()->id;
	
	$notifications = Notification::where('user_id', $userId)
		->where(function($query) use ($request) {
			// 应用搜索和筛选条件
			if ($request->has('keyword')) {
				$query->where('title', 'like', '%' . $request->keyword . '%');
			}
			// ...
		})
		->paginate($request->page_size ?? 20);
	
	return response()->json($notifications);
}

5. 数据安全

  • 敏感信息不要放在通知内容中
  • 使用参数验证和过滤
  • SQL 注入防护(使用 Eloquent ORM
  • XSS 防护(使用 htmlspecialchars 或类似函数)

前端安全

1. Token 管理

// 不要在前端硬编码 token
// 从 store 中动态获取
import { useUserStore } from '@/stores/modules/user'

const userStore = useUserStore()

const ws = getWebSocket(userStore.userInfo.id, userStore.token)

2. 消息过滤

// 处理接收到的消息时,进行过滤和验证
function handleMessage(message) {
  // 验证消息格式
  if (!message || !message.type || !message.data) {
	console.warn('Invalid message format:', message)
	return
  }
  
  // 过滤敏感内容
  if (containsSensitiveContent(message.data)) {
	console.warn('Message contains sensitive content')
	return
  }
  
  // 处理消息
  // ...
}

3. 连接限制

// 限制每个用户的连接数量
const MAX_CONNECTIONS_PER_USER = 3

function canConnect(userId) {
  const existingConnections = getConnectionsByUser(userId)
  return existingConnections.length < MAX_CONNECTIONS_PER_USER
}

最佳实践

1. 通知类型选择

  • info: 一般信息,如欢迎消息、功能更新
  • success: 成功操作,如创建成功、导入成功
  • warning: 警告信息,如即将过期、维护通知
  • error: 错误信息,如执行失败、验证错误
  • task: 任务相关,如任务提醒、执行结果
  • system: 系统级,如系统维护、重要公告

2. 通知分类选择

  • system: 系统管理、配置变更
  • task: 定时任务、后台任务
  • message: 用户消息、聊天消息
  • reminder: 日程提醒、待办事项
  • announcement: 公告、通知

3. 避免通知轰炸

  • 合理设置通知频率
  • 对相似通知进行合并
  • 提供通知偏好设置
  • 允许用户关闭特定类型的通知

4. 异步处理

// 大量通知建议使用队列异步处理
use App\Jobs\SendNotificationJob;

// 发送大量通知
dispatch(new SendNotificationJob(
	$userIds,
	$title,
	$content,
	$type,
	$category
))->onQueue('notifications');

5. 错误处理

// 前端错误处理
async function markAsRead(notificationId) {
  try {
	await api.notifications.read(notificationId)
	// 更新本地状态
  } catch (error) {
	console.error('标记已读失败:', error)
	// 显示错误提示
	message.error('标记已读失败,请重试')
  }
}

6. 日志记录

// 后端日志记录
use Illuminate\Support\Facades\Log;

public function sendToUser(int $userId, array $data): bool
{
	Log::info('Sending notification to user', [
		'user_id' => $userId,
		'type' => $data['type']
	]);
	
	// ... 发送逻辑
	
	Log::info('Notification sent successfully', [
		'user_id' => $userId,
		'notification_id' => $notification->id
	]);
	
	return true;
}

7. 测试

// 单元测试示例
public function test_send_notification()
{
	$result = $this->service->sendNotification([
		'user_id' => 1,
		'title' => 'Test',
		'content' => 'Test content'
	]);
	
	$this->assertTrue($result['success']);
	$this->assertDatabaseHas('system_notifications', [
		'title' => 'Test'
	]);
}

更新日志

2024-02-18

  • 初始版本发布
  • 实现基础 WebSocket 功能
  • 实现通知系统功能
  • 实现消息推送功能
  • 实现频道订阅功能
  • 实现前端客户端封装
  • 实现管理 API 接口
  • 修复 dispatch_mode 配置问题
  • 修复 wsTable 访问问题
  • 添加定时任务重试功能
  • 修复 WebSocket 重复连接问题
  • 实现单例模式管理连接
  • 优化授权机制(连接时自动认证)
  • 合并文档,提供完整的使用指南

参考资料


文档版本: v3.0
更新日期: 2024-02-18
维护者: Development Team