64 KiB
WebSocket 与通知系统完整文档
目录
概述
本项目基于 Laravel-S 和 Swoole 实现了完整的 WebSocket 与通知系统,支持实时双向通信、消息推送、通知管理等功能。系统采用 WebSocket 实现实时推送,同时将通知持久化到数据库,确保离线用户也能收到通知。
核心特性
- ✅ 实时双向通信(WebSocket)
- ✅ 用户连接管理(单例模式,避免重复连接)
- ✅ 点对点消息发送
- ✅ 群发消息/广播
- ✅ 频道订阅/取消订阅
- ✅ 心跳机制与自动重连
- ✅ 在线状态管理
- ✅ 系统通知推送
- ✅ 数据更新推送
- ✅ 通知持久化存储
- ✅ 已读/未读状态管理
- ✅ 批量操作支持
- ✅ 连接时自动授权(无需单独发送 auth 事件)
功能特性
WebSocket 功能
| 功能 | 说明 | 状态 |
|---|---|---|
| 自动连接管理 | 登录后自动建立连接,退出时自动关闭 | ✅ |
| 单例连接模式 | 全局只有一个 WebSocket 实例,避免重复连接 | ✅ |
| 连接时授权 | 通过 URL 参数传递 token,握手时即完成认证 | ✅ |
| 断线重连 | 连接断开自动重连(最多5次) | ✅ |
| 心跳机制 | 客户端每30秒发送心跳保持连接 | ✅ |
| 用户认证 | JWT token 验证,验证用户 ID 匹配和过期时间 | ✅ |
| 点对点消息 | 发送消息给指定用户 | ✅ |
| 广播消息 | 向所有在线用户发送消息 | ✅ |
| 频道订阅 | 支持频道订阅和取消订阅 | ✅ |
| 在线状态 | 实时获取用户在线状态 | ✅ |
通知功能
| 功能 | 说明 | 状态 |
|---|---|---|
| 通知发送 | 支持单个/批量/广播发送 | ✅ |
| 通知类型 | info/success/warning/error/task/system | ✅ |
| 通知分类 | system/task/message/reminder/announcement | ✅ |
| 实时推送 | 在线用户通过 WebSocket 实时推送 | ✅ |
| 持久化存储 | 所有通知保存到数据库 | ✅ |
| 已读管理 | 支持标记已读(单个/批量/全部) | ✅ |
| 通知删除 | 支持删除(单个/批量/清空) | ✅ |
| 重试机制 | 发送失败自动重试(最多3次) | ✅ |
| 统计分析 | 提供通知统计数据 | ✅ |
技术架构
后端架构
┌─────────────────────────────────────────┐
│ Laravel + Laravel-S │
├─────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────┐ │
│ │ WebSocketHandler │ │
│ │ (WebSocket 事件处理) │ │
│ │ - onOpen: 连接时授权 │ │
│ │ - onMessage: 消息路由 │ │
│ │ - onClose: 清理连接 │ │
│ └──────────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ WebSocketService │ │
│ │ (WebSocket 管理) │ │
│ └──────────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ NotificationService │ │
│ │ (通知服务) │ │
│ └──────────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ Notification Model │ │
│ │ (通知模型) │ │
│ └──────────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ Swoole Server │ │
│ │ (WebSocket 服务器) │ │
│ └──────────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ MySQL Database │ │
│ │ (数据持久化) │ │
│ └──────────────────────────────┘ │
│ │
└─────────────────────────────────────────┘
前端架构
┌─────────────────────────────────────────┐
│ Vue 3 + Vite │
├─────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────┐ │
│ │ App.vue (统一连接点) │ │
│ │ - 初始化 WebSocket │ │
│ │ - 监听用户状态 │ │
│ └──────────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ useWebSocket Hook │ │
│ │ (单例管理 + 消息路由) │ │
│ └──────────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ WebSocket Client │ │
│ │ (单例 + 重连 + 心跳) │ │
│ └──────────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ Notification Store │ │
│ │ (Pinia 状态管理) │ │
│ └──────────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ Userbar/Notification Page │ │
│ │ (展示 + 交互) │ │
│ └──────────────────────────────┘ │
│ │
└─────────────────────────────────────────┘
后端实现
1. 数据模型
system_notifications 表
| 字段 | 类型 | 说明 |
|---|---|---|
| id | bigint | 主键ID |
| user_id | bigint | 接收通知的用户ID |
| title | varchar(255) | 通知标题 |
| content | text | 通知内容 |
| type | varchar(50) | 通知类型 |
| category | varchar(50) | 通知分类 |
| data | json | 附加数据 |
| action_type | varchar(50) | 操作类型 |
| action_data | text | 操作数据 |
| is_read | boolean | 是否已读 |
| read_at | timestamp | 阅读时间 |
| sent_via_websocket | boolean | 是否已通过WebSocket发送 |
| sent_at | timestamp | 发送时间 |
| retry_count | int | 重试次数 |
| created_at | timestamp | 创建时间 |
| updated_at | timestamp | 更新时间 |
| deleted_at | timestamp | 删除时间(软删除) |
2. 核心服务类
WebSocketHandler (app/Services/WebSocket/WebSocketHandler.php)
WebSocket 处理器,实现 Swoole 的 WebSocketHandlerInterface 接口。
主要方法:
onOpen(): 处理连接建立事件,连接时通过 URL 参数授权onMessage(): 处理消息接收事件onClose(): 处理连接关闭事件
支持的消息类型:
ping/pong: 心跳检测heartbeat: 心跳确认chat: 私聊消息broadcast: 广播消息subscribe/unsubscribe: 频道订阅/取消订阅
连接时授权流程:
public function onOpen(Server $server, Request $request): void
{
// 从 URL 查询参数获取认证信息
$userId = (int)($request->get['user_id'] ?? 0);
$token = $request->get['token'] ?? '';
// 用户认证
if (!$userId || !$token) {
$server->push($request->fd, json_encode([
'type' => 'error',
'data' => [
'message' => '认证失败:缺少 user_id 或 token',
'code' => 401
]
]));
$server->disconnect($request->fd);
return;
}
// 验证 JWT token
try {
$payload = JWTAuth::setToken($token)->getPayload();
// 验证 token 中的用户 ID 是否匹配
$tokenUserId = $payload['sub'] ?? null;
if ($tokenUserId != $userId) {
Log::warning('WebSocket 认证失败:用户 ID 不匹配', [
'fd' => $request->fd,
'token_user_id' => $tokenUserId,
'query_user_id' => $userId
]);
$server->push($request->fd, json_encode([
'type' => 'error',
'data' => [
'message' => '认证失败:用户 ID 不匹配',
'code' => 401
]
]));
$server->disconnect($request->fd);
return;
}
// 验证 token 是否过期
if (isset($payload['exp']) && $payload['exp'] < time()) {
Log::warning('WebSocket 认证失败:token 已过期', [
'fd' => $request->fd,
'user_id' => $userId,
'exp' => $payload['exp'],
'current_time' => time()
]);
$server->push($request->fd, json_encode([
'type' => 'error',
'data' => [
'message' => '认证失败:token 已过期',
'code' => 401
]
]));
$server->disconnect($request->fd);
return;
}
Log::info('WebSocket 认证成功', [
'fd' => $request->fd,
'user_id' => $userId
]);
} catch (\Exception $e) {
Log::warning('WebSocket 认证失败:无效的 token', [
'fd' => $request->fd,
'user_id' => $userId,
'error' => $e->getMessage()
]);
$server->push($request->fd, json_encode([
'type' => 'error',
'data' => [
'message' => '认证失败:无效的 token',
'code' => 401
]
]));
$server->disconnect($request->fd);
return;
}
// 认证成功,存储连接映射
$wsTable->set('uid:' . $userId, [
'value' => $request->fd,
'expiry' => time() + 3600
]);
$wsTable->set('fd:' . $request->fd, [
'value' => $userId,
'expiry' => time() + 3600
]);
// 发送欢迎消息
$server->push($request->fd, json_encode([
'type' => 'connected',
'data' => [
'message' => '欢迎连接到 LaravelS WebSocket',
'user_id' => $userId,
'fd' => $request->fd,
'timestamp' => time()
]
]));
}
WebSocketService (app/Services/WebSocket/WebSocketService.php)
WebSocket 服务类,提供便捷的 WebSocket 操作方法。
主要方法:
// 发送消息给指定用户
sendToUser(int $userId, array $data): bool
// 发送消息给多个用户
sendToUsers(array $userIds, array $data): array
// 广播消息给所有用户
broadcast(array $data, ?int $excludeUserId = null): int
// 发送消息到频道
sendToChannel(string $channel, array $data): int
// 获取在线用户数
getOnlineUserCount(): int
// 检查用户是否在线
isUserOnline(int $userId): bool
// 获取在线用户ID列表
getOnlineUserIds(): array
// 断开用户连接
disconnectUser(int $userId): bool
// 发送系统通知
sendSystemNotification(string $title, string $content, string $type): int
// 发送通知给指定用户
sendNotificationToUsers(array $userIds, string $title, string $content, string $type): int
// 推送数据更新
pushDataUpdate(array $userIds, string $resourceType, string $action, array $data): array
// 推送数据更新到频道
pushDataUpdateToChannel(string $channel, string $resourceType, string $action, array $data): int
NotificationService (app/Services/System/NotificationService.php)
通知服务类,负责通知的创建、发送和管理。
主要方法:
// 发送通知给单个用户
sendToUser(int $userId, string $title, string $content, string $type, string $category, ?array $extraData = null): array
// 发送通知给多个用户
sendToUsers(array $userIds, string $title, string $content, string $type, string $category, ?array $extraData = null): array
// 广播通知给所有用户
broadcast(string $title, string $content, string $type, string $category, ?array $extraData = null): array
// 发送任务通知
sendTaskNotification(int $userId, string $title, string $content, array $taskData): array
// 发送消息通知
sendNewMessageNotification(int $userId, string $title, string $content, array $messageData): array
// 发送提醒通知
sendReminderNotification(int $userId, string $title, string $content, array $reminderData): array
// 标记通知为已读
markAsRead(int $notificationId): bool
// 批量标记为已读
markMultipleAsRead(array $notificationIds): int
// 标记所有通知为已读
markAllAsRead(int $userId): int
// 删除通知
deleteNotification(int $notificationId): bool
// 批量删除通知
deleteMultipleNotifications(array $notificationIds): int
// 清空已读通知
clearReadNotifications(int $userId): int
// 重试未发送的通知
retryUnsentNotifications(int $limit = 100): int
// 获取通知统计
getStatistics(int $userId): array
3. 控制器
NotificationController (app/Http/Controllers/System/Admin/Notification.php)
后台管理通知控制器,提供完整的 CRUD 操作。
主要方法:
index(): 获取通知列表show(): 获取通知详情read(): 标记通知为已读batchRead(): 批量标记为已读readAll(): 标记所有通知为已读destroy(): 删除通知batchDelete(): 批量删除通知clearRead(): 清空已读通知unread(): 获取未读通知列表unreadCount(): 获取未读通知数量statistics(): 获取通知统计send(): 发送通知(管理员)retryUnsent(): 重试未发送的通知(管理员)
WebSocketController (app/Http/Controllers/System/WebSocket.php)
WebSocket API 控制器,提供 HTTP 接口用于管理 WebSocket 连接。
主要方法:
getOnlineCount(): 获取在线用户数getOnlineUsers(): 获取在线用户列表checkOnline(): 检查用户在线状态sendToUser(): 发送消息给指定用户sendToUsers(): 发送消息给多个用户broadcast(): 广播消息sendToChannel(): 发送消息到频道sendNotification(): 发送系统通知sendNotificationToUsers(): 发送通知给指定用户pushDataUpdate(): 推送数据更新pushDataUpdateChannel(): 推送数据更新到频道disconnectUser(): 断开用户连接
4. 定时任务
RetryUnsentNotifications (app/Console/Commands/RetryUnsentNotifications.php)
自动重试发送失败的通知。
使用方法:
# 重试最多100条未发送的通知
php artisan notifications:retry-unsent
# 重试最多50条未发送的通知
php artisan notifications:retry-unsent --limit=50
定时任务配置:
// config/crontab.php
// 每5分钟重试一次未发送的通知
*/5 * * * * php /path/to/artisan notifications:retry-unsent --limit=50
前端实现
1. WebSocket 客户端
WebSocketClient (resources/admin/src/utils/websocket.js)
WebSocket 客户端封装类,提供自动连接、重连、消息处理等功能。
主要功能:
- 单例模式,确保全局只有一个实例
- 自动连接和重连
- 心跳机制
- 消息类型路由
- 事件监听
- 连接状态管理
- 连接时通过 URL 参数授权
连接 URL 构建:
export function createWebSocket(userId, token, options = {}) {
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'
const host = options.wsUrl || window.location.host
// ✅ 认证信息通过 URL 参数传递
const url = `${protocol}//${host}/ws?user_id=${userId}&token=${token}`
return new WebSocketClient(url, options)
}
单例管理:
let wsClient = null
let currentUserId = null
let currentToken = null
let pendingConnection = null
export function getWebSocket(userId, token, options = {}) {
// 检查是否已有相同用户的连接
if (wsClient) {
if (currentUserId === userId && currentToken === token) {
const state = wsClient.getConnectionState()
// 如果已连接或正在连接,返回现有实例
if (state === 'OPEN' || state === 'CONNECTING') {
return wsClient
}
// 如果连接已关闭,清理
if (state === 'CLOSED' || state === 'CLOSING') {
wsClient = null
}
}
// 不同用户/Token,断开旧连接
if (currentUserId !== userId || currentToken !== token) {
if (wsClient) {
wsClient.disconnect()
}
wsClient = null
}
}
// 创建新连接
currentUserId = userId
currentToken = token
wsClient = createWebSocket(userId, token, options)
return wsClient
}
使用示例:
import { getWebSocket, closeWebSocket } from '@/utils/websocket'
import { useUserStore } from '@/stores/modules/user'
const userStore = useUserStore()
// 连接 WebSocket(连接时自动授权)
const ws = getWebSocket(userStore.userInfo.id, userStore.token, {
onOpen: (event) => {
console.log('WebSocket 已连接(已通过 URL 参数完成认证)')
},
onMessage: (message) => {
console.log('收到消息:', message)
},
onError: (error) => {
console.error('WebSocket 错误:', error)
},
onClose: (event) => {
console.log('WebSocket 已关闭')
}
})
// 连接
ws.connect()
// 发送消息
ws.send('heartbeat', { timestamp: Date.now() })
// 监听特定消息类型
ws.on('notification', (data) => {
message.success(data.title, data.message)
})
// 断开连接
ws.disconnect()
2. useWebSocket Hook
useWebSocket (resources/admin/src/composables/useWebSocket.js)
WebSocket 组合式函数,统一管理 WebSocket 连接和消息处理。
主要功能:
- 统一初始化 WebSocket(只在 App.vue 中调用)
- 防止重复初始化
- 自动注册消息处理器
- 监听用户状态变化
- 连接时自动授权(无需发送 auth 事件)
关键实现:
let wsInstance = null
let isInitialized = false
export function useWebSocket() {
const userStore = useUserStore()
const initWebSocket = () => {
// 防止重复初始化
if (isInitialized && wsInstance && wsInstance.isConnected) {
console.log('WebSocket 已连接,跳过重复初始化')
return
}
isInitialized = true
// 获取 WebSocket 实例(单例)
wsInstance = getWebSocket(userStore.userInfo.id, userStore.token, {
onOpen: handleOpen,
onMessage: handleMessage,
onError: handleError,
onClose: handleClose
})
// 注册消息处理器
registerMessageHandlers()
// 连接
wsInstance.connect()
}
// 连接打开(已通过 URL 参数完成认证)
function handleOpen(event) {
console.log('WebSocket 连接已建立(已通过 URL 参数完成认证)', event)
}
return {
ws: wsInstance,
initWebSocket,
closeWebSocket,
reconnect,
isConnected,
send
}
}
3. 通知 Store
Notification Store (resources/admin/src/stores/modules/notification.js)
通知状态管理,基于 Pinia 实现。
主要方法:
// 获取未读数量
await notificationStore.fetchUnreadCount()
// 获取未读通知列表
await notificationStore.fetchUnreadNotifications({
page: 1,
page_size: 10
})
// 获取通知列表
await notificationStore.fetchNotifications({
page: 1,
page_size: 20,
type: 'notification',
category: 'system'
})
// 标记为已读
await notificationStore.markAsRead(notificationId)
// 批量标记为已读
await notificationStore.markMultipleAsRead([1, 2, 3])
// 标记所有为已读
await notificationStore.markAllAsRead()
// 删除通知
await notificationStore.deleteNotification(notificationId)
// 批量删除
await notificationStore.deleteMultipleNotifications([1, 2, 3])
// 清空已读通知
await notificationStore.clearReadNotifications()
// 发送通知(管理员)
await notificationStore.sendNotification({
recipient_id: userId,
title: '通知标题',
content: '通知内容',
type: 'info',
category: 'system'
})
// 重试未发送的通知(管理员)
await notificationStore.retryUnsentNotifications(100)
// 获取统计信息
await notificationStore.fetchStatistics()
4. 用户栏通知组件
Userbar Component (resources/admin/src/layouts/components/userbar.vue)
顶部用户栏中的通知组件,提供:
- 通知铃铛图标
- 未读数量徽章
- 通知下拉列表
- 快捷操作(全部已读、清空)
- 通知分类筛选
- 点击标记已读
功能特性:
- 未读消息数量实时更新
- 支持按类型筛选(全部/通知/任务/警告)
- 悬浮显示删除按钮
- 分页浏览
- 本地缓存(localStorage)
5. 通知列表页面
Notification List Page (resources/admin/src/pages/system/notifications/index.vue)
通知管理页面,提供完整的通知管理功能。
功能特性:
- 通知列表展示
- 搜索和筛选
- 批量操作
- 通知详情查看
- 已读/未读状态切换
- 删除和清空
API接口
WebSocket 接口
1. 获取在线用户数
GET /admin/websocket/online-count
响应:
{
"code": 200,
"message": "success",
"data": {
"online_count": 10
}
}
2. 获取在线用户列表
GET /admin/websocket/online-users
响应:
{
"code": 200,
"message": "success",
"data": {
"user_ids": [1, 2, 3, 4, 5],
"count": 5
}
}
3. 检查用户在线状态
POST /admin/websocket/check-online
请求参数:
{
"user_id": 1
}
响应:
{
"code": 200,
"message": "success",
"data": {
"user_id": 1,
"is_online": true
}
}
4. 发送消息给指定用户
POST /admin/websocket/send-to-user
请求参数:
{
"user_id": 1,
"type": "notification",
"data": {
"title": "新消息",
"message": "您有一条新消息"
}
}
5. 发送消息给多个用户
POST /admin/websocket/send-to-users
请求参数:
{
"user_ids": [1, 2, 3],
"type": "notification",
"data": {
"title": "系统通知",
"message": "系统将在今晚进行维护"
}
}
6. 广播消息
POST /admin/websocket/broadcast
请求参数:
{
"type": "notification",
"data": {
"title": "公告",
"message": "欢迎使用新版本"
},
"exclude_user_id": 1 // 可选:排除某个用户
}
7. 发送消息到频道
POST /admin/websocket/send-to-channel
请求参数:
{
"channel": "orders",
"type": "data_update",
"data": {
"order_id": 123,
"status": "paid"
}
}
8. 发送系统通知
POST /admin/websocket/send-notification
请求参数:
{
"title": "系统维护",
"message": "系统将于今晚 23:00-24:00 进行维护",
"type": "warning",
"extra_data": {
"start_time": "23:00",
"end_time": "24:00"
}
}
9. 发送通知给指定用户
POST /admin/websocket/send-notification-to-users
请求参数:
{
"user_ids": [1, 2, 3],
"title": "订单更新",
"message": "您的订单已发货",
"type": "success"
}
10. 推送数据更新
POST /admin/websocket/push-data-update
请求参数:
{
"user_ids": [1, 2, 3],
"resource_type": "order",
"action": "update",
"data": {
"id": 123,
"status": "shipped"
}
}
11. 推送数据更新到频道
POST /admin/websocket/push-data-update-channel
请求参数:
{
"channel": "orders",
"resource_type": "order",
"action": "create",
"data": {
"id": 124,
"customer": "张三",
"amount": 100.00
}
}
12. 断开用户连接
POST /admin/websocket/disconnect-user
请求参数:
{
"user_id": 1
}
通知接口
1. 获取通知列表
GET /admin/system/notifications
请求参数:
{
"user_id": 1, // 用户ID(可选,默认为当前用户)
"keyword": "通知", // 关键字搜索
"is_read": false, // 阅读状态(true/false)
"type": "info", // 通知类型
"category": "system", // 通知分类
"start_date": "2024-01-01", // 开始日期
"end_date": "2024-12-31", // 结束日期
"page": 1, // 页码
"page_size": 20 // 每页数量
}
响应:
{
"code": 200,
"message": "success",
"data": {
"list": [
{
"id": 1,
"user_id": 1,
"title": "系统通知",
"content": "这是一个测试通知",
"type": "info",
"category": "system",
"data": {},
"action_type": null,
"action_data": null,
"is_read": false,
"read_at": null,
"sent_via_websocket": true,
"sent_at": "2024-02-18 10:00:00",
"retry_count": 0,
"created_at": "2024-02-18 10:00:00"
}
],
"total": 100,
"page": 1,
"page_size": 20
}
}
2. 获取未读通知
GET /admin/system/notifications/unread?limit=10
3. 获取未读通知数量
GET /admin/system/notifications/unread-count
响应:
{
"code": 200,
"message": "success",
"data": {
"count": 5
}
}
4. 获取通知详情
GET /admin/system/notifications/{id}
5. 标记通知为已读
POST /admin/system/notifications/{id}/read
6. 批量标记为已读
POST /admin/system/notifications/batch-read
请求参数:
{
"ids": [1, 2, 3, 4, 5]
}
7. 标记所有通知为已读
POST /admin/system/notifications/read-all
8. 删除通知
DELETE /admin/system/notifications/{id}
9. 批量删除通知
POST /admin/system/notifications/batch-delete
请求参数:
{
"ids": [1, 2, 3]
}
10. 清空已读通知
POST /admin/system/notifications/clear-read
11. 获取通知统计
GET /admin/system/notifications/statistics
响应:
{
"code": 200,
"message": "success",
"data": {
"total": 100,
"unread": 5,
"read": 95,
"by_type": {
"info": 50,
"success": 20,
"warning": 15,
"error": 10,
"task": 5
},
"by_category": {
"system": 60,
"task": 20,
"message": 10,
"reminder": 8,
"announcement": 2
}
}
}
12. 发送通知(管理员功能)
POST /admin/system/notifications/send
请求参数:
{
"user_ids": [1, 2, 3], // 用户ID数组,为空则发送给所有用户
"title": "系统维护通知",
"content": "系统将于今晚22:00进行维护,预计维护时间2小时。",
"type": "warning",
"category": "announcement",
"data": {
"maintenance_start": "2024-02-18 22:00:00",
"maintenance_end": "2024-02-19 00:00:00"
},
"action_type": "link",
"action_data": {
"url": "/system/maintenance"
}
}
13. 重试发送未发送的通知(管理员功能)
POST /admin/system/notifications/retry-unsent?limit=100
配置说明
后端配置
Laravel-S 配置 (config/laravels.php)
'websocket' => [
'enable' => env('LARAVELS_WEBSOCKET', true),
'handler' => \App\Services\WebSocket\WebSocketHandler::class,
],
'swoole' => [
'enable_coroutine' => true,
'worker_num' => 4,
'max_request' => 5000,
'max_request_grace' => 500,
'dispatch_mode' => 2, // 重要:使用抢占模式确保连接状态一致性
],
'swoole_tables' => [
'wsTable' => [
'size' => 102400,
'column' => [
['name' => 'value', 'type' => \Swoole\Table::TYPE_STRING, 'size' => 1024],
['name' => 'expiry', 'type' => \Swoole\Table::TYPE_INT, 'size' => 4],
],
],
],
环境变量
在 .env 文件中添加:
LARAVELS_WEBSOCKET=true
重要配置说明:
dispatch_mode 模式详解
| 模式 | 值 | 说明 | 适用场景 |
|---|---|---|---|
| 轮询模式 | 1 | 按顺序依次分配请求到 Worker | 需要平均分配负载 |
| 抢占模式 | 2 | 固定 Worker 处理特定连接 | 推荐:保持连接状态一致性 |
| 抢占模式 | 3 | 随机分配请求到 Worker | ❌ 会导致状态不一致 |
dispatch_mode = 3 会导致的问题:
- 请求被随机分配到不同的 Worker 进程
- 同一用户的连接和消息发送可能被分配到不同 Worker
- wsTable 中的用户连接数据和消息发送操作无法正确匹配
- 导致消息发送失败、通知无法接收
dispatch_mode = 2 的优势:
- 确保同一用户的请求始终由同一个 Worker 处理
- 连接状态保持一致
- 消息发送可靠
前端配置
WebSocket 配置
在 resources/admin/src/utils/websocket.js 中:
const WS_URL = 'ws://localhost:5200' // WebSocket 服务器地址
const RECONNECT_INTERVAL = 5000 // 重连间隔(毫秒)
const MAX_RECONNECT_ATTEMPTS = 5 // 最大重连次数
const HEARTBEAT_INTERVAL = 30000 // 心跳间隔(毫秒)
通知配置
在 resources/admin/src/stores/modules/notification.js 中:
const pageSize = 20 // 每页数量
const maxLocalNotifications = 100 // 本地最大存储数量
Nginx 配置示例
server {
listen 80;
server_name yourdomain.com;
root /path/to/your/project/public;
location / {
try_files $uri $uri/ /index.php?$query_string;
}
# WebSocket 代理配置
location /ws {
proxy_pass http://127.0.0.1:5200;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Host $host;
proxy_read_timeout 86400;
}
location ~ \.php$ {
fastcgi_pass 127.0.0.1:9000;
fastcgi_index index.php;
fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name;
include fastcgi_params;
}
}
定时任务配置
建议配置以下定时任务:
// config/crontab.php
// 每5分钟重试一次未发送的通知
*/5 * * * * php /path/to/artisan notifications:retry-unsent --limit=50
// 每天凌晨清理30天前的已读通知
0 0 * * * php /path/to/artisan notifications:cleanup
使用示例
后端使用示例
1. 发送通知给单个用户
use App\Services\System\NotificationService;
use App\Models\System\Notification;
class YourService
{
protected $notificationService;
public function __construct(NotificationService $notificationService)
{
$this->notificationService = $notificationService;
}
public function someMethod()
{
// 发送通知给单个用户
$result = $this->notificationService->sendToUser(
$userId = 1,
$title = '欢迎加入',
$content = '欢迎加入我们的系统!',
$type = Notification::TYPE_SUCCESS,
$category = Notification::CATEGORY_MESSAGE,
$extraData = ['welcome' => true]
);
}
}
2. 发送通知给多个用户
// 发送通知给多个用户
$result = $this->notificationService->sendToUsers(
$userIds = [1, 2, 3],
$title = '系统维护通知',
$content = '系统将于今晚进行维护',
$type = Notification::TYPE_WARNING,
$category = Notification::CATEGORY_ANNOUNCEMENT
);
3. 广播通知
// 广播通知(所有用户)
$result = $this->notificationService->broadcast(
$title = '新功能上线',
$content = '我们推出了新的功能,快来体验吧!',
$type = Notification::TYPE_INFO,
$category = Notification::CATEGORY_ANNOUNCEMENT
);
4. 发送任务通知
// 发送任务通知
$result = $this->notificationService->sendTaskNotification(
$userId = 1,
$title = '任务提醒',
$content = '您有一个任务即将到期',
$taskData = [
'task_id' => 123,
'task_name' => '完成报告',
'due_date' => '2024-02-20'
]
);
5. 使用 WebSocket Service
use App\Services\WebSocket\WebSocketService;
$wsService = app(WebSocketService::class);
// 发送给单个用户
$wsService->sendToUser($userId, [
'type' => 'notification',
'data' => [
'title' => '系统通知',
'content' => '这是一条通知',
'type' => 'info',
'timestamp' => time()
]
]);
// 发送给多个用户
$userIds = [1, 2, 3];
$sentTo = $wsService->sendToUsers($userIds, $data);
// 广播给所有用户
$count = $wsService->broadcast($data);
// 广播给所有用户(排除指定用户)
$count = $wsService->broadcast($data, $excludeUserId);
// 发送到频道
$count = $wsService->sendToChannel('orders', [
'type' => 'data_update',
'data' => [
'order_id' => 123,
'status' => 'paid'
]
]);
// 推送数据更新
$sentTo = $wsService->pushDataUpdate(
$userIds,
'dictionary',
'update',
['id' => 1, 'name' => 'test']
);
// 检查用户是否在线
$isOnline = $wsService->isUserOnline($userId);
// 获取在线用户数量
$count = $wsService->getOnlineUserCount();
// 获取在线用户ID列表
$userIds = $wsService->getOnlineUserIds();
// 断开用户连接
$wsService->disconnectUser($userId);
前端使用示例
1. 基本连接(App.vue 中)
<script setup>
import { useWebSocket } from '@/composables/useWebSocket'
import { useUserStore } from '@/stores/modules/user'
const userStore = useUserStore()
const { initWebSocket, closeWebSocket } = useWebSocket()
// 只在 App.vue 中初始化 WebSocket
onMounted(() => {
if (userStore.isUserInfoComplete()) {
initWebSocket()
}
})
// 监听用户状态变化
watch(
() => [userStore.token, userStore.userInfo],
() => {
if (userStore.isUserInfoComplete()) {
initWebSocket()
} else if (!userStore.isLoggedIn()) {
closeWebSocket()
}
},
{ deep: true }
)
</script>
2. 监听特定消息类型
// 在 useWebSocket 中注册消息处理器
ws.on('notification', (data) => {
message.success(data.title, data.message)
})
ws.on('data_update', (data) => {
console.log('数据更新:', data.resource_type, data.action)
// 刷新数据
loadData()
})
ws.on('chat', (data) => {
console.log('收到聊天消息:', data)
})
3. 发送消息
import { useWebSocket } from '@/composables/useWebSocket'
const { send } = useWebSocket()
// 发送心跳
send('heartbeat', { timestamp: Date.now() })
// 发送私聊消息
send('chat', {
to_user_id: 2,
content: '你好,这是一条私聊消息'
})
// 订阅频道
send('subscribe', { channel: 'orders' })
// 取消订阅
send('unsubscribe', { channel: 'orders' })
// 发送广播消息
send('broadcast', {
message: '这是一条广播消息'
})
4. 使用通知 Store
import { useNotificationStore } from '@/stores/modules/notification'
const notificationStore = useNotificationStore()
// 获取未读数量
await notificationStore.fetchUnreadCount()
// 获取未读通知列表
const unreadList = await notificationStore.fetchUnreadNotifications({
page: 1,
page_size: 10
})
// 获取通知列表
const list = await notificationStore.fetchNotifications({
page: 1,
page_size: 20,
type: 'notification',
category: 'system'
})
// 标记为已读
await notificationStore.markAsRead(notificationId)
// 批量标记为已读
await notificationStore.markMultipleAsRead([1, 2, 3])
// 标记所有为已读
await notificationStore.markAllAsRead()
// 删除通知
await notificationStore.deleteNotification(notificationId)
// 批量删除
await notificationStore.deleteMultipleNotifications([1, 2, 3])
// 清空已读通知
await notificationStore.clearReadNotifications()
// 获取统计信息
const stats = await notificationStore.fetchStatistics()
console.log('统计信息:', stats)
5. 在 Vue 组件中使用
<template>
<div>
<a-button @click="sendMessage">发送消息</a-button>
<div>连接状态: {{ connectionStatus }}</div>
</div>
</template>
<script setup>
import { ref, onMounted, onUnmounted } from 'vue'
import { useWebSocket } from '@/composables/useWebSocket'
import { useNotificationStore } from '@/stores/modules/notification'
const notificationStore = useNotificationStore()
const { send, isConnected } = useWebSocket()
const connectionStatus = ref('未连接')
// 监听未读数量
watch(() => notificationStore.unreadCount, (newCount) => {
console.log('未读数量更新:', newCount)
})
const sendMessage = () => {
if (isConnected.value) {
send('chat', {
to_user_id: 2,
content: '测试消息'
})
} else {
message.warning('WebSocket 未连接')
}
}
onMounted(() => {
// ✅ 注意:不要在这里调用 initWebSocket()
// WebSocket 已在 App.vue 中统一初始化
})
onUnmounted(() => {
// 清理逻辑
})
</script>
连接问题修复
问题描述
在开发过程中发现 WebSocket 连接存在重复创建的问题,导致:
- 每个组件页面都创建新的 WebSocket 连接
- 用户登录后可能同时存在多个 WebSocket 连接
- 消息接收混乱,状态不一致
根本原因
经过代码分析,发现以下三个地方都创建了 WebSocket 连接:
- App.vue(主要连接点):正确,应该在此处统一初始化
- userbar.vue(重复连接):错误,不应该重复初始化
- notifications/index.vue(重复连接):错误,不应该重复初始化
解决方案
原则
- 单一连接点:只在
App.vue中统一初始化和关闭 WebSocket 连接 - 全局状态管理:通过
useWebSocketcomposable 管理单例 WebSocket 实例 - 事件驱动:组件通过监听 store 中的事件来处理 WebSocket 消息
- 连接时授权:通过 URL 参数传递认证信息,无需单独发送 auth 事件
修复内容
1. websocket.js(单例模式)
使用单例模式确保全局只有一个 WebSocket 实例:
let wsClient = null
let currentUserId = null
let currentToken = null
let pendingConnection = null
export function getWebSocket(userId, token, options = {}) {
// 检查是否已有相同用户的连接
if (wsClient) {
if (currentUserId === userId && currentToken === token) {
const state = wsClient.getConnectionState()
// 如果已连接或正在连接,返回现有实例
if (state === 'OPEN' || state === 'CONNECTING') {
return wsClient
}
// 如果连接已关闭,清理
if (state === 'CLOSED' || state === 'CLOSING') {
wsClient = null
}
}
// 不同用户/Token,断开旧连接
if (currentUserId !== userId || currentToken !== token) {
if (wsClient) {
wsClient.disconnect()
}
wsClient = null
}
}
// 创建新连接
currentUserId = userId
currentToken = token
wsClient = createWebSocket(userId, token, options)
return wsClient
}
2. useWebSocket.js(统一初始化)
添加防止重复初始化的机制:
let wsInstance = null
let isInitialized = false
export function useWebSocket() {
const initWebSocket = () => {
// 防止重复初始化
if (isInitialized && wsInstance && wsInstance.isConnected) {
console.log('WebSocket 已连接,跳过重复初始化')
return
}
isInitialized = true
wsInstance = getWebSocket(userStore.userInfo.id, userStore.token, {
onOpen: handleOpen,
onMessage: handleMessage,
onError: handleError,
onClose: handleClose
})
// 注册消息处理器
registerMessageHandlers()
wsInstance.connect()
}
return {
ws: wsInstance,
initWebSocket,
closeWebSocket,
reconnect,
isConnected,
send
}
}
3. 移除重复连接
userbar.vue:
<script setup>
// ❌ 移除:import { useWebSocket } from '@/composables/useWebSocket'
onMounted(() => {
loadNotifications()
// ✅ 注意:WebSocket 已在 App.vue 中统一初始化
// 只需要处理消息即可,消息处理已经在 useWebSocket 中注册
})
</script>
notifications/index.vue:
<script setup>
// ❌ 移除:import { useWebSocket } from '@/composables/useWebSocket'
onMounted(() => {
loadUnreadCount()
// ✅ 注意:WebSocket 已在 App.vue 中统一初始化
// 通知会通过 notification store 自动更新
})
</script>
WebSocket 授权机制优化
优化前
// 旧方式:前端在 handleOpen 中发送 auth 事件
function handleOpen(event) {
console.log('WebSocket 连接已建立', event)
// ❌ 需要单独发送 auth 事件
if (ws.value) {
ws.value.send('auth', {
token: userStore.token,
user_id: userStore.userInfo.id
})
}
}
优化后
// 新方式:连接时通过 URL 参数认证
// URL: ws://host/ws?user_id=1&token=xxxxx
function handleOpen(event) {
console.log('WebSocket 连接已建立(已通过 URL 参数完成认证)', event)
// ✅ 不需要发送 auth 事件
}
优势
✅ 性能优化:减少 1 次消息往返,连接建立后立即可用 ✅ 安全性增强:握手阶段即进行认证,未认证的连接直接拒绝 ✅ 代码简化:前端无需处理 auth 事件,后端逻辑更清晰
认证流程
前端发起连接
│
├─ ws://host/ws?user_id=1&token=xxxxx
│
▼
后端 onOpen
│
├─ 1. 提取 URL 参数(user_id, token)
│
├─ 2. 验证参数完整性
│ └─ 缺失 → 返回错误,断开连接
│
├─ 3. 验证 JWT token
│ ├─ 3.1 解析 token
│ ├─ 3.2 验证用户 ID 匹配
│ │ └─ 不匹配 → 返回错误,断开连接
│ └─ 3.3 验证 token 是否过期
│ └─ 已过期 → 返回错误,断开连接
│
├─ 4. 认证成功
│ │
│ ├─ 存储用户到 fd 映射
│ ├─ 存储 fd 到用户映射
│ └─ 发送 connected 消息
│
▼
连接建立完成,可以正常通信
架构说明
WebSocket 连接流程
┌─────────────────────────────────────────────────────────────┐
│ App.vue │
│ ┌───────────────────────────────────────────────────┐ │
│ │ useWebSocket() 初始化 │ │
│ │ - 获取用户信息 │ │
│ │ - 创建 WebSocket 连接(单例) │ │
│ │ - 连接时通过 URL 参数授权 │ │
│ │ - 注册消息处理器 │ │
│ └──────────────────┬────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────┐ │
│ │ WebSocket Client (单例) │ │
│ │ - 管理连接状态 │ │
│ │ - 处理消息路由 │ │
│ │ - 心跳检测和重连 │ │
│ └──────────────────┬────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────┐ │
│ │ Notification Store │ │
│ │ - 接收 WebSocket 消息 │ │
│ │ - 更新通知状态 │ │
│ │ - 触发 Vue 响应式更新 │ │
│ └──────────────────┬────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────┐ │
│ │ 组件 (userbar, notifications等) │ │
│ │ - 监听 store 状态变化 │ │
│ │ - 响应消息更新 │ │
│ └───────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
消息处理流程
后端推送消息
│
▼
WebSocketClient.onMessage()
│
▼
消息类型路由 (switch/case)
│
├─► notification
│ │
│ ▼
│ notificationStore.handleWebSocketMessage()
│ │
│ ├─► 更新未读数量
│ ├─► 添加到通知列表
│ └─► 触发通知提示
│
├─► data_update
│ │
│ ▼
│ 自定义处理函数
│
└─► heartbeat
│
▼
响应心跳确认
最佳实践
1. WebSocket 连接管理
✅ 推荐做法:
- 只在应用根组件(App.vue)中初始化 WebSocket
- 使用单例模式确保全局只有一个实例
- 通过状态管理(Pinia Store)分发消息
❌ 避免做法:
- 在多个组件中创建 WebSocket 连接
- 直接在子组件中调用
initWebSocket() - 绕过 Store 直接处理 WebSocket 消息
2. 组件中使用 WebSocket
✅ 推荐做法:
// 只需要监听 store 中的状态变化
import { useNotificationStore } from '@/stores/modules/notification'
const notificationStore = useNotificationStore()
// 监听未读数量变化
watch(() => notificationStore.unreadCount, (newCount) => {
console.log('未读数量更新:', newCount)
})
// 监听通知列表变化
watch(() => notificationStore.notifications, (list) => {
console.log('通知列表更新:', list)
})
❌ 避免做法:
// 不要在组件中直接使用 WebSocket
import { useWebSocket } from '@/composables/useWebSocket'
const { initWebSocket } = useWebSocket()
initWebSocket() // ❌ 重复连接!
3. 消息处理器注册
✅ 推荐做法:
// 在 useWebSocket 中统一注册处理器
const initWebSocket = () => {
wsInstance.on('notification', (data) => {
notificationStore.handleWebSocketMessage({ type: 'notification', data })
})
wsInstance.on('data_update', (data) => {
console.log('收到数据更新:', data)
})
}
❌ 避免做法:
// 不要在每个组件中单独注册
ws.onMessage((msg) => {
if (msg.type === 'notification') {
// 处理通知
}
})
故障排查与修复
常见问题与解决方案
问题 1:WebSocket 连接失败
可能原因:
- WebSocket 服务未启动
- 端口被占用
- 防火墙阻止连接
- URL 配置错误
排查步骤:
# 1. 检查 Laravel-S 是否运行
php bin/laravels status
# 2. 启动 Laravel-S
php bin/laravels start
# 3. 检查端口是否被占用
netstat -ano | findstr :5200
# 4. 检查防火墙设置
# Windows: 控制面板 -> 系统和安全 -> Windows 防火墙 -> 允许应用通过防火墙
# Linux: sudo ufw allow 5200
问题 2:登录后 WebSocket 未连接
可能原因:
- 用户信息未加载完成
- token 无效
解决方法:
// 检查控制台日志
// 确认 userStore.isUserInfoComplete() 返回 true
// 查看 getWebSocket 调用参数
// 手动测试连接
import { useUserStore } from '@/stores/modules/user'
const userStore = useUserStore()
console.log('User Info:', userStore.userInfo)
console.log('Token:', userStore.token)
console.log('Is Complete:', userStore.isUserInfoComplete())
问题 3:消息未收到
可能原因:
- 消息处理器未注册
- 消息类型不匹配
- 网络问题
- dispatch_mode 配置错误
排查步骤:
# 1. 检查 dispatch_mode 配置
cat config/laravels.php | grep dispatch_mode
# 应该是 dispatch_mode = 2
# 2. 完全重启 Laravel-S
php bin/laravels stop && php bin/laravels start
# 3. 检查消息处理器
# 查看 useWebSocket.js 中的消息处理器注册
# 4. 查看网络面板 WebSocket 帧
# 浏览器开发者工具 -> Network -> WS
# 5. 查看日志
tail -f storage/logs/swoole.log
tail -f storage/logs/laravel.log
问题 4:消息发送失败
可能原因:
- wsTable 访问方式错误
- 用户不在线
- dispatch_mode 配置错误
错误信息:
TypeError: Access to undefined property Swoole\WebSocket\Server::$wsTable
正确修复方法:
确保通过 app('swoole')->wsTable 而不是 $server->wsTable 访问。
// ❌ 错误:$server 对象没有 wsTable 属性
$server->wsTable->set('uid:' . $userId, [...]);
$fdInfo = $server->wsTable->get('fd:' . $fd);
// ✅ 正确:通过服务容器获取 wsTable
$wsTable = app('swoole')->wsTable;
$wsTable->set('uid:' . $userId, [...]);
$fdInfo = $wsTable->get('fd:' . $fd);
问题 5:dispatch_mode 配置不生效
检查方法:
# 1. 确认配置文件
cat config/laravels.php | grep dispatch_mode
# 2. 完全重启 Laravel-S
php bin/laravels stop && php bin/laravels start
# 3. 检查运行时配置
php bin/laravels config
正确配置:
'swoole' => [
'dispatch_mode' => 2, // ✅ 正确:抢占模式
'worker_num' => 4,
],
问题 6:通知未收到
检查项:
- 用户是否在线
- WebSocket 连接是否正常
- 数据库中是否有通知记录
sent_via_websocket字段是否为 true
排查步骤:
# 1. 检查用户在线状态
php bin/laravels
# 在控制台中执行:
$wsService = app(App\Services\WebSocket\WebSocketService::class);
$wsService->isUserOnline(1);
# 2. 检查通知记录
php artisan tinker
>>> $notifications = App\Models\System\Notification::where('user_id', 1)->get();
>>> $notifications->each(fn($n) => echo "ID: {$n->id}, Sent via WS: {$n->sent_via_websocket}\n");
# 3. 检查日志
tail -f storage/logs/swoole.log | grep "notification"
问题 7:通知重复发送
检查项:
- 是否有多个任务在重试
retry_count是否超过限制- 是否有重复的创建逻辑
解决方法:
// 检查重试逻辑
// 通知最多重试3次,超过后不再重试
// 检查是否有重复的发送逻辑
// 确保不会多次调用 sendNotification
问题 8:WebSocket 认证失败
可能原因:
- Token 无效或已过期
- 用户 ID 不匹配
- 缺少认证参数
排查步骤:
检查前端控制台日志:
// 查看连接 URL 是否正确
console.log('WebSocket URL:', ws.url)
// 查看收到的错误消息
// 检查错误类型和错误信息
检查后端日志:
# 查看认证失败日志
tail -f storage/logs/laravel.log | grep "WebSocket 认证失败"
解决方法:
- 确保 token 有效且未过期
- 确保 URL 参数中的 user_id 与 token 中的 sub 匹配
- 重新登录获取新的 token
Laravel-S wsTable 使用规范
正确的 wsTable 访问方式
根据 Laravel-S 文档和源码,正确访问 wsTable 的方式有两种:
方式 1:在 WebSocketHandler 中通过构造函数获取(推荐)
class WebSocketHandler implements WebSocketHandlerInterface
{
protected $wsTable;
public function __construct()
{
$this->wsTable = app('swoole')->wsTable;
}
public function onOpen(Server $server, Request $request): void
{
// 直接使用 $this->wsTable
$this->wsTable->set('uid:' . $userId, [...]);
}
}
方式 2:在普通 Service 中通过服务容器获取
class WebSocketService
{
public function sendToUser(int $userId, array $data): bool
{
$server = $this->getServer();
// 每次都通过服务容器获取最新的 wsTable
$wsTable = app('swoole')->wsTable;
$fdInfo = $wsTable->get('uid:' . $userId);
// ...
}
}
错误的访问方式(禁止使用)
// ❌ 错误:$server 对象没有 wsTable 属性
$server->wsTable->set('uid:' . $userId, [...]);
$fdInfo = $server->wsTable->get('fd:' . $fd);
原因:
$server是 Swoole\WebSocket\Server 对象- 该对象没有
wsTable属性 - wsTable 是通过 Laravel-S 扩展动态添加到 Swoole Server 的
- Laravel-S 通过服务容器管理 wsTable,应该通过
app('swoole')->wsTable访问
Swoole Table 数据结构
wsTable 用于存储 WebSocket 连接映射关系:
'ws' => [
'size' => 102400, // 表最大行数
'columns' => [
'value' => ['type' => \Swoole\Table::TYPE_STRING, 'size' => 256], // 值
'expiry' => ['type' => \Swoole\Table::TYPE_INT, 'size' => 4], // 过期时间
]
]
存储的键值对:
uid:{userId}→['value' => {fd}, 'expiry' => timestamp]- 用户 ID 到文件描述符的映射fd:{fd}→['value' => {userId}, 'expiry' => timestamp]- 文件描述符到用户 ID 的映射channel:{channel}:fd:{fd}→['value' => 1, 'expiry' => timestamp]- 频道订阅关系
多 Worker 进程注意事项
当使用多个 Worker 进程时(worker_num > 1):
- 进程隔离:每个 Worker 有独立的内存空间
- 状态同步:使用 Swoole Table 实现跨进程数据共享
- 连接一致性:同一用户的连接必须由同一 Worker 处理
- 消息路由:dispatch_mode = 2 确保连接和消息在同一 Worker
性能优化
后端优化
1. 数据库优化
// 为常用查询字段添加索引
Schema::table('system_notifications', function (Blueprint $table) {
$table->index('user_id');
$table->index('is_read');
$table->index('type');
$table->index('category');
$table->index(['user_id', 'is_read']);
$table->index(['user_id', 'created_at']);
});
2. 批量操作
// 使用批量插入减少查询次数
$notifications = [];
foreach ($userIds as $userId) {
$notifications[] = [
'user_id' => $userId,
'title' => $title,
'content' => $content,
'type' => $type,
'category' => $category,
'created_at' => now(),
'updated_at' => now(),
];
}
Notification::insert($notifications);
3. 连接池管理
// 定期清理过期连接
public function cleanExpiredConnections(): void
{
$server = $this->getServer();
$wsTable = app('swoole')->wsTable;
$currentTime = time();
foreach ($wsTable as $key => $row) {
if (strpos($key, 'uid:') === 0) {
$fd = $row['value'];
if (!$server->isEstablished($fd)) {
// 清理无效的连接
$wsTable->del($key);
$wsTable->del('fd:' . $fd);
}
}
}
}
4. 消息队列
对于大量消息发送场景,建议使用队列异步处理:
use Illuminate\Support\Facades\Queue;
// 异步发送通知
dispatch(function () use ($userIds, $message) {
$webSocketService = new WebSocketService();
$webSocketService->sendNotificationToUsers($userIds, $title, $message);
})->onQueue('websocket');
5. 缓存优化
// 使用 Redis 缓存未读数量
use Illuminate\Support\Facades\Cache;
public function getUnreadCount(int $userId): int
{
$cacheKey = "unread_count:{$userId}";
return Cache::remember($cacheKey, 300, function() use ($userId) {
return Notification::where('user_id', $userId)
->where('is_read', false)
->count();
});
}
// 通知标记为已读时清除缓存
public function markAsRead(int $notificationId): bool
{
$notification = Notification::find($notificationId);
$notification->markAsRead();
// 清除缓存
Cache::forget("unread_count:{$notification->user_id}");
return true;
}
前端优化
1. 消息限制
// 限制本地存储数量
const maxLocalNotifications = 100
// 添加消息时检查数量
function addMessage(message) {
if (messages.value.length >= maxLocalNotifications) {
messages.value.pop() // 删除最旧的消息
}
messages.value.unshift(message)
}
2. 分页加载
// 消息列表使用分页,避免一次性加载过多数据
const currentPage = ref(1)
const pageSize = ref(20)
async function loadNotifications() {
const response = await api.notifications.get({
page: currentPage.value,
page_size: pageSize.value
})
// ...
}
3. 虚拟滚动
对于大量消息列表,使用虚拟滚动提升性能:
<template>
<a-virtual-list
:data-sources="messages"
:data-key="'id'"
:keeps="30"
:item-size="60"
>
<template #default="{ data }">
<NotificationItem :notification="data" />
</template>
</a-virtual-list>
</template>
4. 防抖和节流
// 搜索输入防抖
import { debounce } from 'lodash-es'
const handleSearch = debounce((keyword) => {
fetchNotifications({ keyword })
}, 300)
// 滚动加载节流
import { throttle } from 'lodash-es'
const handleScroll = throttle(() => {
if (isNearBottom()) {
loadMore()
}
}, 500)
安全考虑
后端安全
1. 连接认证
public function onOpen(Server $server, Request $request): void
{
$userId = $request->get['user_id'] ?? null;
$token = $request->get['token'] ?? null;
// ✅ 验证 token
if (!$token || !$this->validateToken($userId, $token)) {
$server->push($request->fd, json_encode([
'type' => 'error',
'data' => ['message' => 'Authentication failed']
]));
$server->disconnect($request->fd);
return;
}
// 认证成功,存储连接
$this->wsTable->set('uid:' . $userId, [
'value' => $request->fd,
'expiry' => time() + 3600
]);
}
2. 消息验证
public function onMessage(Server $server, Frame $frame): void
{
$message = json_decode($frame->data, true);
// ✅ 验证消息格式
if (!$message || !isset($message['type'])) {
$server->push($frame->fd, json_encode([
'type' => 'error',
'data' => ['message' => 'Invalid message format']
]));
return;
}
// 处理消息
// ...
}
3. 速率限制
// 防止消息滥用
private $messageRateLimits = [];
public function checkRateLimit(int $fd): bool
{
$key = 'fd:' . $fd;
$now = time();
if (!isset($this->messageRateLimits[$key])) {
$this->messageRateLimits[$key] = [];
}
// 清理旧记录
$this->messageRateLimits[$key] = array_filter(
$this->messageRateLimits[$key],
fn($time) => $time > $now - 60
);
// 检查频率(每分钟最多 60 条)
if (count($this->messageRateLimits[$key]) >= 60) {
return false;
}
$this->messageRateLimits[$key][] = $now;
return true;
}
4. 权限控制
// 确保用户只能查看和操作自己的通知
public function index(Request $request)
{
$userId = $request->user()->id;
$notifications = Notification::where('user_id', $userId)
->where(function($query) use ($request) {
// 应用搜索和筛选条件
if ($request->has('keyword')) {
$query->where('title', 'like', '%' . $request->keyword . '%');
}
// ...
})
->paginate($request->page_size ?? 20);
return response()->json($notifications);
}
5. 数据安全
- 敏感信息不要放在通知内容中
- 使用参数验证和过滤
- SQL 注入防护(使用 Eloquent ORM)
- XSS 防护(使用
htmlspecialchars或类似函数)
前端安全
1. Token 管理
// 不要在前端硬编码 token
// 从 store 中动态获取
import { useUserStore } from '@/stores/modules/user'
const userStore = useUserStore()
const ws = getWebSocket(userStore.userInfo.id, userStore.token)
2. 消息过滤
// 处理接收到的消息时,进行过滤和验证
function handleMessage(message) {
// 验证消息格式
if (!message || !message.type || !message.data) {
console.warn('Invalid message format:', message)
return
}
// 过滤敏感内容
if (containsSensitiveContent(message.data)) {
console.warn('Message contains sensitive content')
return
}
// 处理消息
// ...
}
3. 连接限制
// 限制每个用户的连接数量
const MAX_CONNECTIONS_PER_USER = 3
function canConnect(userId) {
const existingConnections = getConnectionsByUser(userId)
return existingConnections.length < MAX_CONNECTIONS_PER_USER
}
最佳实践
1. 通知类型选择
- info: 一般信息,如欢迎消息、功能更新
- success: 成功操作,如创建成功、导入成功
- warning: 警告信息,如即将过期、维护通知
- error: 错误信息,如执行失败、验证错误
- task: 任务相关,如任务提醒、执行结果
- system: 系统级,如系统维护、重要公告
2. 通知分类选择
- system: 系统管理、配置变更
- task: 定时任务、后台任务
- message: 用户消息、聊天消息
- reminder: 日程提醒、待办事项
- announcement: 公告、通知
3. 避免通知轰炸
- 合理设置通知频率
- 对相似通知进行合并
- 提供通知偏好设置
- 允许用户关闭特定类型的通知
4. 异步处理
// 大量通知建议使用队列异步处理
use App\Jobs\SendNotificationJob;
// 发送大量通知
dispatch(new SendNotificationJob(
$userIds,
$title,
$content,
$type,
$category
))->onQueue('notifications');
5. 错误处理
// 前端错误处理
async function markAsRead(notificationId) {
try {
await api.notifications.read(notificationId)
// 更新本地状态
} catch (error) {
console.error('标记已读失败:', error)
// 显示错误提示
message.error('标记已读失败,请重试')
}
}
6. 日志记录
// 后端日志记录
use Illuminate\Support\Facades\Log;
public function sendToUser(int $userId, array $data): bool
{
Log::info('Sending notification to user', [
'user_id' => $userId,
'type' => $data['type']
]);
// ... 发送逻辑
Log::info('Notification sent successfully', [
'user_id' => $userId,
'notification_id' => $notification->id
]);
return true;
}
7. 测试
// 单元测试示例
public function test_send_notification()
{
$result = $this->service->sendNotification([
'user_id' => 1,
'title' => 'Test',
'content' => 'Test content'
]);
$this->assertTrue($result['success']);
$this->assertDatabaseHas('system_notifications', [
'title' => 'Test'
]);
}
更新日志
2024-02-18
- ✅ 初始版本发布
- ✅ 实现基础 WebSocket 功能
- ✅ 实现通知系统功能
- ✅ 实现消息推送功能
- ✅ 实现频道订阅功能
- ✅ 实现前端客户端封装
- ✅ 实现管理 API 接口
- ✅ 修复 dispatch_mode 配置问题
- ✅ 修复 wsTable 访问问题
- ✅ 添加定时任务重试功能
- ✅ 修复 WebSocket 重复连接问题
- ✅ 实现单例模式管理连接
- ✅ 优化授权机制(连接时自动认证)
- ✅ 合并文档,提供完整的使用指南
参考资料
文档版本: v3.0
更新日期: 2024-02-18
维护者: Development Team