51 KiB
WebSocket 与通知系统完整文档
目录
概述
本项目基于 Laravel-S 和 Swoole 实现了完整的 WebSocket 与通知系统,支持实时双向通信、消息推送、通知管理等功能。系统采用 WebSocket 实现实时推送,同时将通知持久化到数据库,确保离线用户也能收到通知。
核心特性
- ✅ 实时双向通信(WebSocket)
- ✅ 用户连接管理
- ✅ 点对点消息发送
- ✅ 群发消息/广播
- ✅ 频道订阅/取消订阅
- ✅ 心跳机制与自动重连
- ✅ 在线状态管理
- ✅ 系统通知推送
- ✅ 数据更新推送
- ✅ 通知持久化存储
- ✅ 已读/未读状态管理
- ✅ 批量操作支持
功能特性
WebSocket 功能
| 功能 | 说明 | 状态 |
|---|---|---|
| 自动连接管理 | 登录后自动建立连接,退出时自动关闭 | ✅ |
| 断线重连 | 连接断开自动重连(最多5次) | ✅ |
| 心跳机制 | 客户端每30秒发送心跳保持连接 | ✅ |
| 用户认证 | 通过 token 验证用户身份 | ✅ |
| 点对点消息 | 发送消息给指定用户 | ✅ |
| 广播消息 | 向所有在线用户发送消息 | ✅ |
| 频道订阅 | 支持频道订阅和取消订阅 | ✅ |
| 在线状态 | 实时获取用户在线状态 | ✅ |
通知功能
| 功能 | 说明 | 状态 |
|---|---|---|
| 通知发送 | 支持单个/批量/广播发送 | ✅ |
| 通知类型 | info/success/warning/error/task/system | ✅ |
| 通知分类 | system/task/message/reminder/announcement | ✅ |
| 实时推送 | 在线用户通过 WebSocket 实时推送 | ✅ |
| 持久化存储 | 所有通知保存到数据库 | ✅ |
| 已读管理 | 支持标记已读(单个/批量/全部) | ✅ |
| 通知删除 | 支持删除(单个/批量/清空) | ✅ |
| 重试机制 | 发送失败自动重试(最多3次) | ✅ |
| 统计分析 | 提供通知统计数据 | ✅ |
技术架构
后端架构
┌─────────────────────────────────────────┐
│ Laravel + Laravel-S │
├─────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────┐ │
│ │ WebSocketHandler │ │
│ │ (WebSocket 事件处理) │ │
│ └──────────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ WebSocketService │ │
│ │ (WebSocket 管理) │ │
│ └──────────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ NotificationService │ │
│ │ (通知服务) │ │
│ └──────────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ Notification Model │ │
│ │ (通知模型) │ │
│ └──────────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ Swoole Server │ │
│ │ (WebSocket 服务器) │ │
│ └──────────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ MySQL Database │ │
│ │ (数据持久化) │ │
│ └──────────────────────────────┘ │
│ │
└─────────────────────────────────────────┘
前端架构
┌─────────────────────────────────────────┐
│ Vue 3 + Vite │
├─────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────┐ │
│ │ Userbar Component │ │
│ │ (通知入口 + 徽章显示) │ │
│ └──────────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ Notification Store │ │
│ │ (Pinia 状态管理) │ │
│ └──────────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ System API │ │
│ │ (接口封装) │ │
│ └──────────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ useWebSocket Hook │ │
│ │ (WebSocket 管理) │ │
│ └──────────┬───────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ WebSocket Client │ │
│ └──────────────────────────────┘ │
│ │
└─────────────────────────────────────────┘
后端实现
1. 数据模型
system_notifications 表
| 字段 | 类型 | 说明 |
|---|---|---|
| id | bigint | 主键ID |
| user_id | bigint | 接收通知的用户ID |
| title | varchar(255) | 通知标题 |
| content | text | 通知内容 |
| type | varchar(50) | 通知类型 |
| category | varchar(50) | 通知分类 |
| data | json | 附加数据 |
| action_type | varchar(50) | 操作类型 |
| action_data | text | 操作数据 |
| is_read | boolean | 是否已读 |
| read_at | timestamp | 阅读时间 |
| sent_via_websocket | boolean | 是否已通过WebSocket发送 |
| sent_at | timestamp | 发送时间 |
| retry_count | int | 重试次数 |
| created_at | timestamp | 创建时间 |
| updated_at | timestamp | 更新时间 |
| deleted_at | timestamp | 删除时间(软删除) |
2. 核心服务类
WebSocketHandler (app/Services/WebSocket/WebSocketHandler.php)
WebSocket 处理器,实现 Swoole 的 WebSocketHandlerInterface 接口。
主要方法:
onOpen(): 处理连接建立事件onMessage(): 处理消息接收事件onClose(): 处理连接关闭事件
支持的消息类型:
ping/pong: 心跳检测heartbeat: 心跳确认chat: 私聊消息broadcast: 广播消息subscribe/unsubscribe: 频道订阅/取消订阅
WebSocketService (app/Services/WebSocket/WebSocketService.php)
WebSocket 服务类,提供便捷的 WebSocket 操作方法。
主要方法:
// 发送消息给指定用户
sendToUser(int $userId, array $data): bool
// 发送消息给多个用户
sendToUsers(array $userIds, array $data): array
// 广播消息给所有用户
broadcast(array $data, ?int $excludeUserId = null): int
// 发送消息到频道
sendToChannel(string $channel, array $data): int
// 获取在线用户数
getOnlineUserCount(): int
// 检查用户是否在线
isUserOnline(int $userId): bool
// 获取在线用户ID列表
getOnlineUserIds(): array
// 断开用户连接
disconnectUser(int $userId): bool
// 发送系统通知
sendSystemNotification(string $title, string $content, string $type): int
// 发送通知给指定用户
sendNotificationToUsers(array $userIds, string $title, string $content, string $type): int
// 推送数据更新
pushDataUpdate(array $userIds, string $resourceType, string $action, array $data): array
// 推送数据更新到频道
pushDataUpdateToChannel(string $channel, string $resourceType, string $action, array $data): int
NotificationService (app/Services/System/NotificationService.php)
通知服务类,负责通知的创建、发送和管理。
主要方法:
// 发送通知给单个用户
sendToUser(int $userId, string $title, string $content, string $type, string $category, ?array $extraData = null): array
// 发送通知给多个用户
sendToUsers(array $userIds, string $title, string $content, string $type, string $category, ?array $extraData = null): array
// 广播通知给所有用户
broadcast(string $title, string $content, string $type, string $category, ?array $extraData = null): array
// 发送任务通知
sendTaskNotification(int $userId, string $title, string $content, array $taskData): array
// 发送消息通知
sendNewMessageNotification(int $userId, string $title, string $content, array $messageData): array
// 发送提醒通知
sendReminderNotification(int $userId, string $title, string $content, array $reminderData): array
// 标记通知为已读
markAsRead(int $notificationId): bool
// 批量标记为已读
markMultipleAsRead(array $notificationIds): int
// 标记所有通知为已读
markAllAsRead(int $userId): int
// 删除通知
deleteNotification(int $notificationId): bool
// 批量删除通知
deleteMultipleNotifications(array $notificationIds): int
// 清空已读通知
clearReadNotifications(int $userId): int
// 重试未发送的通知
retryUnsentNotifications(int $limit = 100): int
// 获取通知统计
getStatistics(int $userId): array
3. 控制器
NotificationController (app/Http/Controllers/System/Admin/Notification.php)
后台管理通知控制器,提供完整的 CRUD 操作。
主要方法:
index(): 获取通知列表show(): 获取通知详情read(): 标记通知为已读batchRead(): 批量标记为已读readAll(): 标记所有通知为已读destroy(): 删除通知batchDelete(): 批量删除通知clearRead(): 清空已读通知unread(): 获取未读通知列表unreadCount(): 获取未读通知数量statistics(): 获取通知统计send(): 发送通知(管理员)retryUnsent(): 重试未发送的通知(管理员)
WebSocketController (app/Http/Controllers/System/WebSocket.php)
WebSocket API 控制器,提供 HTTP 接口用于管理 WebSocket 连接。
主要方法:
getOnlineCount(): 获取在线用户数getOnlineUsers(): 获取在线用户列表checkOnline(): 检查用户在线状态sendToUser(): 发送消息给指定用户sendToUsers(): 发送消息给多个用户broadcast(): 广播消息sendToChannel(): 发送消息到频道sendNotification(): 发送系统通知sendNotificationToUsers(): 发送通知给指定用户pushDataUpdate(): 推送数据更新pushDataUpdateChannel(): 推送数据更新到频道disconnectUser(): 断开用户连接
4. 定时任务
RetryUnsentNotifications (app/Console/Commands/RetryUnsentNotifications.php)
自动重试发送失败的通知。
使用方法:
# 重试最多100条未发送的通知
php artisan notifications:retry-unsent
# 重试最多50条未发送的通知
php artisan notifications:retry-unsent --limit=50
定时任务配置:
// config/crontab.php
// 每5分钟重试一次未发送的通知
*/5 * * * * php /path/to/artisan notifications:retry-unsent --limit=50
前端实现
1. WebSocket 客户端
WebSocketClient (resources/admin/src/utils/websocket.js)
WebSocket 客户端封装类,提供自动连接、重连、消息处理等功能。
主要功能:
- 自动连接和重连
- 心跳机制
- 消息类型路由
- 事件监听
- 连接状态管理
使用示例:
import { getWebSocket, closeWebSocket } from '@/utils/websocket'
import { useUserStore } from '@/stores/modules/user'
const userStore = useUserStore()
// 连接 WebSocket
const ws = getWebSocket(userStore.userInfo.id, userStore.token, {
onOpen: (event) => {
console.log('WebSocket 已连接')
},
onMessage: (message) => {
console.log('收到消息:', message)
},
onError: (error) => {
console.error('WebSocket 错误:', error)
},
onClose: (event) => {
console.log('WebSocket 已关闭')
}
})
// 连接
ws.connect()
// 发送消息
ws.send('heartbeat', { timestamp: Date.now() })
// 监听特定消息类型
ws.on('notification', (data) => {
message.success(data.title, data.message)
})
// 断开连接
ws.disconnect()
2. 通知 Store
Notification Store (resources/admin/src/stores/modules/notification.js)
通知状态管理,基于 Pinia 实现。
主要方法:
// 获取未读数量
await notificationStore.fetchUnreadCount()
// 获取未读通知列表
await notificationStore.fetchUnreadNotifications({
page: 1,
page_size: 10
})
// 获取通知列表
await notificationStore.fetchNotifications({
page: 1,
page_size: 20,
type: 'notification',
category: 'system'
})
// 标记为已读
await notificationStore.markAsRead(notificationId)
// 批量标记为已读
await notificationStore.markMultipleAsRead([1, 2, 3])
// 标记所有为已读
await notificationStore.markAllAsRead()
// 删除通知
await notificationStore.deleteNotification(notificationId)
// 批量删除
await notificationStore.deleteMultipleNotifications([1, 2, 3])
// 清空已读通知
await notificationStore.clearReadNotifications()
// 发送通知(管理员)
await notificationStore.sendNotification({
recipient_id: userId,
title: '通知标题',
content: '通知内容',
type: 'info',
category: 'system'
})
// 重试未发送的通知(管理员)
await notificationStore.retryUnsentNotifications(100)
// 获取统计信息
await notificationStore.fetchStatistics()
3. 通知 API
System API (resources/admin/src/api/system.js)
通知相关的 API 接口封装。
// API 方法
export default {
// 通知列表
notifications: {
get: async function(params) {
return await request.get('admin/system/notifications', { params })
},
// 未读列表
unread: {
get: async function(params) {
return await request.get('admin/system/notifications/unread', { params })
}
},
// 未读数量
unreadCount: {
get: async function() {
return await request.get('admin/system/notifications/unread-count')
}
},
// 详情
show: async function(id) {
return await request.get(`admin/system/notifications/${id}`)
},
// 标记已读
read: async function(id) {
return await request.post(`admin/system/notifications/${id}/read`)
},
// 批量标记已读
batchRead: async function(data) {
return await request.post('admin/system/notifications/batch-read', data)
},
// 全部标记已读
readAll: async function() {
return await request.post('admin/system/notifications/read-all')
},
// 删除
delete: async function(id) {
return await request.delete(`admin/system/notifications/${id}`)
},
// 批量删除
batchDelete: async function(data) {
return await request.post('admin/system/notifications/batch-delete', data)
},
// 清空已读
clearRead: async function() {
return await request.post('admin/system/notifications/clear-read')
},
// 统计
statistics: {
get: async function() {
return await request.get('admin/system/notifications/statistics')
}
},
// 发送通知
send: async function(data) {
return await request.post('admin/system/notifications/send', data)
},
// 重试未发送
retryUnsent: async function(params) {
return await request.post('admin/system/notifications/retry-unsent', null, { params })
}
}
}
4. 用户栏通知组件
Userbar Component (resources/admin/src/layouts/components/userbar.vue)
顶部用户栏中的通知组件,提供:
- 通知铃铛图标
- 未读数量徽章
- 通知下拉列表
- 快捷操作(全部已读、清空)
- 通知分类筛选
- 点击标记已读
功能特性:
- 未读消息数量实时更新
- 支持按类型筛选(全部/通知/任务/警告)
- 悬浮显示删除按钮
- 分页浏览
- 本地缓存(localStorage)
5. 通知列表页面
Notification List Page (resources/admin/src/pages/system/notifications/index.vue)
通知管理页面,提供完整的通知管理功能。
功能特性:
- 通知列表展示
- 搜索和筛选
- 批量操作
- 通知详情查看
- 已读/未读状态切换
- 删除和清空
API接口
WebSocket 接口
1. 获取在线用户数
GET /admin/websocket/online-count
响应:
{
"code": 200,
"message": "success",
"data": {
"online_count": 10
}
}
2. 获取在线用户列表
GET /admin/websocket/online-users
响应:
{
"code": 200,
"message": "success",
"data": {
"user_ids": [1, 2, 3, 4, 5],
"count": 5
}
}
3. 检查用户在线状态
POST /admin/websocket/check-online
请求参数:
{
"user_id": 1
}
响应:
{
"code": 200,
"message": "success",
"data": {
"user_id": 1,
"is_online": true
}
}
4. 发送消息给指定用户
POST /admin/websocket/send-to-user
请求参数:
{
"user_id": 1,
"type": "notification",
"data": {
"title": "新消息",
"message": "您有一条新消息"
}
}
5. 发送消息给多个用户
POST /admin/websocket/send-to-users
请求参数:
{
"user_ids": [1, 2, 3],
"type": "notification",
"data": {
"title": "系统通知",
"message": "系统将在今晚进行维护"
}
}
6. 广播消息
POST /admin/websocket/broadcast
请求参数:
{
"type": "notification",
"data": {
"title": "公告",
"message": "欢迎使用新版本"
},
"exclude_user_id": 1 // 可选:排除某个用户
}
7. 发送消息到频道
POST /admin/websocket/send-to-channel
请求参数:
{
"channel": "orders",
"type": "data_update",
"data": {
"order_id": 123,
"status": "paid"
}
}
8. 发送系统通知
POST /admin/websocket/send-notification
请求参数:
{
"title": "系统维护",
"message": "系统将于今晚 23:00-24:00 进行维护",
"type": "warning",
"extra_data": {
"start_time": "23:00",
"end_time": "24:00"
}
}
9. 发送通知给指定用户
POST /admin/websocket/send-notification-to-users
请求参数:
{
"user_ids": [1, 2, 3],
"title": "订单更新",
"message": "您的订单已发货",
"type": "success"
}
10. 推送数据更新
POST /admin/websocket/push-data-update
请求参数:
{
"user_ids": [1, 2, 3],
"resource_type": "order",
"action": "update",
"data": {
"id": 123,
"status": "shipped"
}
}
11. 推送数据更新到频道
POST /admin/websocket/push-data-update-channel
请求参数:
{
"channel": "orders",
"resource_type": "order",
"action": "create",
"data": {
"id": 124,
"customer": "张三",
"amount": 100.00
}
}
12. 断开用户连接
POST /admin/websocket/disconnect-user
请求参数:
{
"user_id": 1
}
通知接口
1. 获取通知列表
GET /admin/system/notifications
请求参数:
{
"user_id": 1, // 用户ID(可选,默认为当前用户)
"keyword": "通知", // 关键字搜索
"is_read": false, // 阅读状态(true/false)
"type": "info", // 通知类型
"category": "system", // 通知分类
"start_date": "2024-01-01", // 开始日期
"end_date": "2024-12-31", // 结束日期
"page": 1, // 页码
"page_size": 20 // 每页数量
}
响应:
{
"code": 200,
"message": "success",
"data": {
"list": [
{
"id": 1,
"user_id": 1,
"title": "系统通知",
"content": "这是一个测试通知",
"type": "info",
"category": "system",
"data": {},
"action_type": null,
"action_data": null,
"is_read": false,
"read_at": null,
"sent_via_websocket": true,
"sent_at": "2024-02-18 10:00:00",
"retry_count": 0,
"created_at": "2024-02-18 10:00:00"
}
],
"total": 100,
"page": 1,
"page_size": 20
}
}
2. 获取未读通知
GET /admin/system/notifications/unread?limit=10
3. 获取未读通知数量
GET /admin/system/notifications/unread-count
响应:
{
"code": 200,
"message": "success",
"data": {
"count": 5
}
}
4. 获取通知详情
GET /admin/system/notifications/{id}
5. 标记通知为已读
POST /admin/system/notifications/{id}/read
6. 批量标记为已读
POST /admin/system/notifications/batch-read
请求参数:
{
"ids": [1, 2, 3, 4, 5]
}
7. 标记所有通知为已读
POST /admin/system/notifications/read-all
8. 删除通知
DELETE /admin/system/notifications/{id}
9. 批量删除通知
POST /admin/system/notifications/batch-delete
请求参数:
{
"ids": [1, 2, 3]
}
10. 清空已读通知
POST /admin/system/notifications/clear-read
11. 获取通知统计
GET /admin/system/notifications/statistics
响应:
{
"code": 200,
"message": "success",
"data": {
"total": 100,
"unread": 5,
"read": 95,
"by_type": {
"info": 50,
"success": 20,
"warning": 15,
"error": 10,
"task": 5
},
"by_category": {
"system": 60,
"task": 20,
"message": 10,
"reminder": 8,
"announcement": 2
}
}
}
12. 发送通知(管理员功能)
POST /admin/system/notifications/send
请求参数:
{
"user_ids": [1, 2, 3], // 用户ID数组,为空则发送给所有用户
"title": "系统维护通知",
"content": "系统将于今晚22:00进行维护,预计维护时间2小时。",
"type": "warning",
"category": "announcement",
"data": {
"maintenance_start": "2024-02-18 22:00:00",
"maintenance_end": "2024-02-19 00:00:00"
},
"action_type": "link",
"action_data": {
"url": "/system/maintenance"
}
}
13. 重试发送未发送的通知(管理员功能)
POST /admin/system/notifications/retry-unsent?limit=100
配置说明
后端配置
Laravel-S 配置 (config/laravels.php)
'websocket' => [
'enable' => env('LARAVELS_WEBSOCKET', true),
'handler' => \App\Services\WebSocket\WebSocketHandler::class,
],
'swoole' => [
'enable_coroutine' => true,
'worker_num' => 4,
'max_request' => 5000,
'max_request_grace' => 500,
'dispatch_mode' => 2, // 重要:使用抢占模式确保连接状态一致性
],
'swoole_tables' => [
'wsTable' => [
'size' => 102400,
'column' => [
['name' => 'value', 'type' => \Swoole\Table::TYPE_STRING, 'size' => 1024],
['name' => 'expiry', 'type' => \Swoole\Table::TYPE_INT, 'size' => 4],
],
],
],
环境变量
在 .env 文件中添加:
LARAVELS_WEBSOCKET=true
重要配置说明:
dispatch_mode 模式详解
| 模式 | 值 | 说明 | 适用场景 |
|---|---|---|---|
| 轮询模式 | 1 | 按顺序依次分配请求到 Worker | 需要平均分配负载 |
| 抢占模式 | 2 | 固定 Worker 处理特定连接 | 推荐:保持连接状态一致性 |
| 抢占模式 | 3 | 随机分配请求到 Worker | ❌ 会导致状态不一致 |
dispatch_mode = 3 会导致的问题:
- 请求被随机分配到不同的 Worker 进程
- 同一用户的连接和消息发送可能被分配到不同 Worker
- wsTable 中的用户连接数据和消息发送操作无法正确匹配
- 导致消息发送失败、通知无法接收
dispatch_mode = 2 的优势:
- 确保同一用户的请求始终由同一个 Worker 处理
- 连接状态保持一致
- 消息发送可靠
前端配置
WebSocket 配置
在 resources/admin/src/utils/websocket.js 中:
const WS_URL = 'ws://localhost:5200' // WebSocket 服务器地址
const RECONNECT_INTERVAL = 5000 // 重连间隔(毫秒)
const MAX_RECONNECT_ATTEMPTS = 5 // 最大重连次数
const HEARTBEAT_INTERVAL = 30000 // 心跳间隔(毫秒)
通知配置
在 resources/admin/src/stores/modules/notification.js 中:
const pageSize = 20 // 每页数量
const maxLocalNotifications = 100 // 本地最大存储数量
Nginx 配置示例
server {
listen 80;
server_name yourdomain.com;
root /path/to/your/project/public;
location / {
try_files $uri $uri/ /index.php?$query_string;
}
# WebSocket 代理配置
location /ws {
proxy_pass http://127.0.0.1:5200;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Host $host;
proxy_read_timeout 86400;
}
location ~ \.php$ {
fastcgi_pass 127.0.0.1:9000;
fastcgi_index index.php;
fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name;
include fastcgi_params;
}
}
定时任务配置
建议配置以下定时任务:
// config/crontab.php
// 每5分钟重试一次未发送的通知
*/5 * * * * php /path/to/artisan notifications:retry-unsent --limit=50
// 每天凌晨清理30天前的已读通知
0 0 * * * php /path/to/artisan notifications:cleanup
使用示例
后端使用示例
1. 发送通知给单个用户
use App\Services\System\NotificationService;
use App\Models\System\Notification;
class YourService
{
protected $notificationService;
public function __construct(NotificationService $notificationService)
{
$this->notificationService = $notificationService;
}
public function someMethod()
{
// 发送通知给单个用户
$result = $this->notificationService->sendToUser(
$userId = 1,
$title = '欢迎加入',
$content = '欢迎加入我们的系统!',
$type = Notification::TYPE_SUCCESS,
$category = Notification::CATEGORY_MESSAGE,
$extraData = ['welcome' => true]
);
}
}
2. 发送通知给多个用户
// 发送通知给多个用户
$result = $this->notificationService->sendToUsers(
$userIds = [1, 2, 3],
$title = '系统维护通知',
$content = '系统将于今晚进行维护',
$type = Notification::TYPE_WARNING,
$category = Notification::CATEGORY_ANNOUNCEMENT
);
3. 广播通知
// 广播通知(所有用户)
$result = $this->notificationService->broadcast(
$title = '新功能上线',
$content = '我们推出了新的功能,快来体验吧!',
$type = Notification::TYPE_INFO,
$category = Notification::CATEGORY_ANNOUNCEMENT
);
4. 发送任务通知
// 发送任务通知
$result = $this->notificationService->sendTaskNotification(
$userId = 1,
$title = '任务提醒',
$content = '您有一个任务即将到期',
$taskData = [
'task_id' => 123,
'task_name' => '完成报告',
'due_date' => '2024-02-20'
]
);
5. 使用 WebSocket Service
use App\Services\WebSocket\WebSocketService;
$wsService = app(WebSocketService::class);
// 发送给单个用户
$wsService->sendToUser($userId, [
'type' => 'notification',
'data' => [
'title' => '系统通知',
'content' => '这是一条通知',
'type' => 'info',
'timestamp' => time()
]
]);
// 发送给多个用户
$userIds = [1, 2, 3];
$sentTo = $wsService->sendToUsers($userIds, $data);
// 广播给所有用户
$count = $wsService->broadcast($data);
// 广播给所有用户(排除指定用户)
$count = $wsService->broadcast($data, $excludeUserId);
// 发送到频道
$count = $wsService->sendToChannel('orders', [
'type' => 'data_update',
'data' => [
'order_id' => 123,
'status' => 'paid'
]
]);
// 推送数据更新
$sentTo = $wsService->pushDataUpdate(
$userIds,
'dictionary',
'update',
['id' => 1, 'name' => 'test']
);
// 检查用户是否在线
$isOnline = $wsService->isUserOnline($userId);
// 获取在线用户数量
$count = $wsService->getOnlineUserCount();
// 获取在线用户ID列表
$userIds = $wsService->getOnlineUserIds();
// 断开用户连接
$wsService->disconnectUser($userId);
前端使用示例
1. 基本连接
import { getWebSocket, closeWebSocket } from '@/utils/websocket'
import { useUserStore } from '@/stores/modules/user'
const userStore = useUserStore()
// 连接 WebSocket
const ws = getWebSocket(userStore.userInfo.id, userStore.token, {
onOpen: (event) => {
console.log('WebSocket 已连接')
},
onMessage: (message) => {
console.log('收到消息:', message)
},
onError: (error) => {
console.error('WebSocket 错误:', error)
},
onClose: (event) => {
console.log('WebSocket 已关闭')
}
})
// 连接
ws.connect()
2. 监听特定消息类型
// 监听通知消息
ws.on('notification', (data) => {
message.success(data.title, data.message)
})
// 监听数据更新
ws.on('data_update', (data) => {
console.log('数据更新:', data.resource_type, data.action)
// 刷新数据
loadData()
})
// 监听聊天消息
ws.on('chat', (data) => {
console.log('收到聊天消息:', data)
})
3. 发送消息
// 发送心跳
ws.send('heartbeat', { timestamp: Date.now() })
// 发送私聊消息
ws.send('chat', {
to_user_id: 2,
content: '你好,这是一条私聊消息'
})
// 订阅频道
ws.send('subscribe', { channel: 'orders' })
// 取消订阅
ws.send('unsubscribe', { channel: 'orders' })
// 发送广播消息
ws.send('broadcast', {
message: '这是一条广播消息'
})
4. 使用通知 Store
import { useNotificationStore } from '@/stores/modules/notification'
const notificationStore = useNotificationStore()
// 获取未读数量
await notificationStore.fetchUnreadCount()
// 获取未读通知列表
const unreadList = await notificationStore.fetchUnreadNotifications({
page: 1,
page_size: 10
})
// 获取通知列表
const list = await notificationStore.fetchNotifications({
page: 1,
page_size: 20,
type: 'notification',
category: 'system'
})
// 标记为已读
await notificationStore.markAsRead(notificationId)
// 批量标记为已读
await notificationStore.markMultipleAsRead([1, 2, 3])
// 标记所有为已读
await notificationStore.markAllAsRead()
// 删除通知
await notificationStore.deleteNotification(notificationId)
// 批量删除
await notificationStore.deleteMultipleNotifications([1, 2, 3])
// 清空已读通知
await notificationStore.clearReadNotifications()
// 获取统计信息
const stats = await notificationStore.fetchStatistics()
console.log('统计信息:', stats)
5. 在 Vue 组件中使用
<template>
<div>
<a-button @click="connectWebSocket">连接 WebSocket</a-button>
<a-button @click="disconnectWebSocket">断开连接</a-button>
<a-button @click="sendMessage">发送消息</a-button>
<div>连接状态: {{ connectionStatus }}</div>
</div>
</template>
<script setup>
import { ref, onMounted, onUnmounted } from 'vue'
import { getWebSocket } from '@/utils/websocket'
import { useUserStore } from '@/stores/modules/user'
const userStore = useUserStore()
const ws = ref(null)
const connectionStatus = ref('未连接')
const connectWebSocket = () => {
ws.value = getWebSocket(userStore.userInfo.id, userStore.token, {
onOpen: () => {
connectionStatus.value = '已连接'
},
onMessage: (message) => {
handleMessage(message)
},
onClose: () => {
connectionStatus.value = '已断开'
}
})
ws.value.connect()
}
const disconnectWebSocket = () => {
if (ws.value) {
ws.value.disconnect()
connectionStatus.value = '已断开'
}
}
const sendMessage = () => {
if (ws.value && ws.value.isConnected) {
ws.value.send('chat', {
to_user_id: 2,
content: '测试消息'
})
}
}
const handleMessage = (message) => {
switch (message.type) {
case 'notification':
message.success(message.data.title, message.data.message)
break
case 'data_update':
// 处理数据更新
break
case 'chat':
// 处理聊天消息
break
}
}
onMounted(() => {
connectWebSocket()
})
onUnmounted(() => {
disconnectWebSocket()
})
</script>
故障排查与修复
常见问题与解决方案
问题 1:WebSocket 连接失败
可能原因:
- WebSocket 服务未启动
- 端口被占用
- 防火墙阻止连接
- URL 配置错误
排查步骤:
# 1. 检查 Laravel-S 是否运行
php bin/laravels status
# 2. 启动 Laravel-S
php bin/laravels start
# 3. 检查端口是否被占用
netstat -ano | findstr :5200
# 4. 检查防火墙设置
# Windows: 控制面板 -> 系统和安全 -> Windows 防火墙 -> 允许应用通过防火墙
# Linux: sudo ufw allow 5200
问题 2:登录后 WebSocket 未连接
可能原因:
- 用户信息未加载完成
- token 无效
解决方法:
// 检查控制台日志
// 确认 userStore.isUserInfoComplete() 返回 true
// 查看 getWebSocket 调用参数
// 手动测试连接
import { useUserStore } from '@/stores/modules/user'
const userStore = useUserStore()
console.log('User Info:', userStore.userInfo)
console.log('Token:', userStore.token)
console.log('Is Complete:', userStore.isUserInfoComplete())
问题 3:消息未收到
可能原因:
- 消息处理器未注册
- 消息类型不匹配
- 网络问题
- dispatch_mode 配置错误
排查步骤:
# 1. 检查 dispatch_mode 配置
cat config/laravels.php | grep dispatch_mode
# 应该是 dispatch_mode = 2
# 2. 完全重启 Laravel-S
php bin/laravels stop && php bin/laravels start
# 3. 检查消息处理器
# 查看 useWebSocket.js 中的消息处理器注册
# 4. 查看网络面板 WebSocket 帧
# 浏览器开发者工具 -> Network -> WS
# 5. 查看日志
tail -f storage/logs/swoole.log
tail -f storage/logs/laravel.log
问题 4:消息发送失败
可能原因:
- wsTable 访问方式错误
- 用户不在线
- dispatch_mode 配置错误
错误信息:
TypeError: Access to undefined property Swoole\WebSocket\Server::$wsTable
正确修复方法:
确保通过 app('swoole')->wsTable 而不是 $server->wsTable 访问。
// ❌ 错误:$server 对象没有 wsTable 属性
$server->wsTable->set('uid:' . $userId, [...]);
$fdInfo = $server->wsTable->get('fd:' . $fd);
// ✅ 正确:通过服务容器获取 wsTable
$wsTable = app('swoole')->wsTable;
$wsTable->set('uid:' . $userId, [...]);
$fdInfo = $wsTable->get('fd:' . $fd);
问题 5:dispatch_mode 配置不生效
检查方法:
# 1. 确认配置文件
cat config/laravels.php | grep dispatch_mode
# 2. 完全重启 Laravel-S
php bin/laravels stop && php bin/laravels start
# 3. 检查运行时配置
php bin/laravels config
正确配置:
'swoole' => [
'dispatch_mode' => 2, // ✅ 正确:抢占模式
'worker_num' => 4,
],
问题 6:通知未收到
检查项:
- 用户是否在线
- WebSocket 连接是否正常
- 数据库中是否有通知记录
sent_via_websocket字段是否为 true
排查步骤:
# 1. 检查用户在线状态
php bin/laravels
# 在控制台中执行:
$wsService = app(App\Services\WebSocket\WebSocketService::class);
$wsService->isUserOnline(1);
# 2. 检查通知记录
php artisan tinker
>>> $notifications = App\Models\System\Notification::where('user_id', 1)->get();
>>> $notifications->each(fn($n) => echo "ID: {$n->id}, Sent via WS: {$n->sent_via_websocket}\n");
# 3. 检查日志
tail -f storage/logs/swoole.log | grep "notification"
问题 7:通知重复发送
检查项:
- 是否有多个任务在重试
retry_count是否超过限制- 是否有重复的创建逻辑
解决方法:
// 检查重试逻辑
// 通知最多重试3次,超过后不再重试
// 检查是否有重复的发送逻辑
// 确保不会多次调用 sendNotification
Laravel-S wsTable 使用规范
正确的 wsTable 访问方式
根据 Laravel-S 文档和源码,正确访问 wsTable 的方式有两种:
方式 1:在 WebSocketHandler 中通过构造函数获取(推荐)
class WebSocketHandler implements WebSocketHandlerInterface
{
protected $wsTable;
public function __construct()
{
$this->wsTable = app('swoole')->wsTable;
}
public function onOpen(Server $server, Request $request): void
{
// 直接使用 $this->wsTable
$this->wsTable->set('uid:' . $userId, [...]);
}
}
方式 2:在普通 Service 中通过服务容器获取
class WebSocketService
{
public function sendToUser(int $userId, array $data): bool
{
$server = $this->getServer();
// 每次都通过服务容器获取最新的 wsTable
$wsTable = app('swoole')->wsTable;
$fdInfo = $wsTable->get('uid:' . $userId);
// ...
}
}
错误的访问方式(禁止使用)
// ❌ 错误:$server 对象没有 wsTable 属性
$server->wsTable->set('uid:' . $userId, [...]);
$fdInfo = $server->wsTable->get('fd:' . $fd);
原因:
$server是 Swoole\WebSocket\Server 对象- 该对象没有
wsTable属性 - wsTable 是通过 Laravel-S 扩展动态添加到 Swoole Server 的
- Laravel-S 通过服务容器管理 wsTable,应该通过
app('swoole')->wsTable访问
Swoole Table 数据结构
wsTable 用于存储 WebSocket 连接映射关系:
'ws' => [
'size' => 102400, // 表最大行数
'columns' => [
'value' => ['type' => \Swoole\Table::TYPE_STRING, 'size' => 256], // 值
]
]
存储的键值对:
uid:{userId}→['value' => {fd}, 'expiry' => timestamp]- 用户 ID 到文件描述符的映射fd:{fd}→['value' => {userId}, 'expiry' => timestamp]- 文件描述符到用户 ID 的映射channel:{channel}:fd:{fd}→['value' => 1, 'expiry' => timestamp]- 频道订阅关系
多 Worker 进程注意事项
当使用多个 Worker 进程时(worker_num > 1):
- 进程隔离: 每个 Worker 有独立的内存空间
- 状态同步: 使用 Swoole Table 实现跨进程数据共享
- 连接一致性: 同一用户的连接必须由同一 Worker 处理
- 消息路由: dispatch_mode = 2 确保连接和消息在同一 Worker
性能优化
后端优化
1. 数据库优化
// 为常用查询字段添加索引
Schema::table('system_notifications', function (Blueprint $table) {
$table->index('user_id');
$table->index('is_read');
$table->index('type');
$table->index('category');
$table->index(['user_id', 'is_read']);
$table->index(['user_id', 'created_at']);
});
2. 批量操作
// 使用批量插入减少查询次数
$notifications = [];
foreach ($userIds as $userId) {
$notifications[] = [
'user_id' => $userId,
'title' => $title,
'content' => $content,
'type' => $type,
'category' => $category,
'created_at' => now(),
'updated_at' => now(),
];
}
Notification::insert($notifications);
3. 连接池管理
// 定期清理过期连接
public function cleanExpiredConnections(): void
{
$server = $this->getServer();
$wsTable = app('swoole')->wsTable;
$currentTime = time();
foreach ($wsTable as $key => $row) {
if (strpos($key, 'uid:') === 0) {
$fd = $row['value'];
if (!$server->isEstablished($fd)) {
// 清理无效的连接
$wsTable->del($key);
$wsTable->del('fd:' . $fd);
}
}
}
}
4. 消息队列
对于大量消息发送场景,建议使用队列异步处理:
use Illuminate\Support\Facades\Queue;
// 异步发送通知
dispatch(function () use ($userIds, $message) {
$webSocketService = new WebSocketService();
$webSocketService->sendNotificationToUsers($userIds, $title, $message);
})->onQueue('websocket');
5. 缓存优化
// 使用 Redis 缓存未读数量
use Illuminate\Support\Facades\Cache;
public function getUnreadCount(int $userId): int
{
$cacheKey = "unread_count:{$userId}";
return Cache::remember($cacheKey, 300, function() use ($userId) {
return Notification::where('user_id', $userId)
->where('is_read', false)
->count();
});
}
// 通知标记为已读时清除缓存
public function markAsRead(int $notificationId): bool
{
$notification = Notification::find($notificationId);
$notification->markAsRead();
// 清除缓存
Cache::forget("unread_count:{$notification->user_id}");
return true;
}
前端优化
1. 消息限制
// 限制本地存储数量
const maxLocalNotifications = 100
// 添加消息时检查数量
function addMessage(message) {
if (messages.value.length >= maxLocalNotifications) {
messages.value.pop() // 删除最旧的消息
}
messages.value.unshift(message)
}
2. 分页加载
// 消息列表使用分页,避免一次性加载过多数据
const currentPage = ref(1)
const pageSize = ref(20)
async function loadNotifications() {
const response = await api.notifications.get({
page: currentPage.value,
page_size: pageSize.value
})
// ...
}
3. 虚拟滚动
对于大量消息列表,使用虚拟滚动提升性能:
<template>
<a-virtual-list
:data-sources="messages"
:data-key="'id'"
:keeps="30"
:item-size="60"
>
<template #default="{ data }">
<NotificationItem :notification="data" />
</template>
</a-virtual-list>
</template>
4. 防抖和节流
// 搜索输入防抖
import { debounce } from 'lodash-es'
const handleSearch = debounce((keyword) => {
fetchNotifications({ keyword })
}, 300)
// 滚动加载节流
import { throttle } from 'lodash-es'
const handleScroll = throttle(() => {
if (isNearBottom()) {
loadMore()
}
}, 500)
安全考虑
后端安全
1. 连接认证
public function onOpen(Server $server, Request $request): void
{
$userId = $request->get['user_id'] ?? null;
$token = $request->get['token'] ?? null;
// ✅ 验证 token
if (!$token || !$this->validateToken($userId, $token)) {
$server->push($request->fd, json_encode([
'type' => 'error',
'data' => ['message' => 'Authentication failed']
]));
$server->disconnect($request->fd);
return;
}
// 认证成功,存储连接
$this->wsTable->set('uid:' . $userId, [
'value' => $request->fd,
'expiry' => time() + 3600
]);
}
2. 消息验证
public function onMessage(Server $server, Frame $frame): void
{
$message = json_decode($frame->data, true);
// ✅ 验证消息格式
if (!$message || !isset($message['type'])) {
$server->push($frame->fd, json_encode([
'type' => 'error',
'data' => ['message' => 'Invalid message format']
]));
return;
}
// 处理消息
// ...
}
3. 速率限制
// 防止消息滥用
private $messageRateLimits = [];
public function checkRateLimit(int $fd): bool
{
$key = 'fd:' . $fd;
$now = time();
if (!isset($this->messageRateLimits[$key])) {
$this->messageRateLimits[$key] = [];
}
// 清理旧记录
$this->messageRateLimits[$key] = array_filter(
$this->messageRateLimits[$key],
fn($time) => $time > $now - 60
);
// 检查频率(每分钟最多 60 条)
if (count($this->messageRateLimits[$key]) >= 60) {
return false;
}
$this->messageRateLimits[$key][] = $now;
return true;
}
4. 权限控制
// 确保用户只能查看和操作自己的通知
public function index(Request $request)
{
$userId = $request->user()->id;
$notifications = Notification::where('user_id', $userId)
->where(function($query) use ($request) {
// 应用搜索和筛选条件
if ($request->has('keyword')) {
$query->where('title', 'like', '%' . $request->keyword . '%');
}
// ...
})
->paginate($request->page_size ?? 20);
return response()->json($notifications);
}
5. 数据安全
- 敏感信息不要放在通知内容中
- 使用参数验证和过滤
- SQL 注入防护(使用 Eloquent ORM)
- XSS 防护(使用
htmlspecialchars或类似函数)
前端安全
1. Token 管理
// 不要在前端硬编码 token
// 从 store 中动态获取
import { useUserStore } from '@/stores/modules/user'
const userStore = useUserStore()
const ws = getWebSocket(userStore.userInfo.id, userStore.token)
2. 消息过滤
// 处理接收到的消息时,进行过滤和验证
function handleMessage(message) {
// 验证消息格式
if (!message || !message.type || !message.data) {
console.warn('Invalid message format:', message)
return
}
// 过滤敏感内容
if (containsSensitiveContent(message.data)) {
console.warn('Message contains sensitive content')
return
}
// 处理消息
// ...
}
3. 连接限制
// 限制每个用户的连接数量
const MAX_CONNECTIONS_PER_USER = 3
function canConnect(userId) {
const existingConnections = getConnectionsByUser(userId)
return existingConnections.length < MAX_CONNECTIONS_PER_USER
}
最佳实践
1. 通知类型选择
- info: 一般信息,如欢迎消息、功能更新
- success: 成功操作,如创建成功、导入成功
- warning: 警告信息,如即将过期、维护通知
- error: 错误信息,如执行失败、验证错误
- task: 任务相关,如任务提醒、执行结果
- system: 系统级,如系统维护、重要公告
2. 通知分类选择
- system: 系统管理、配置变更
- task: 定时任务、后台任务
- message: 用户消息、聊天消息
- reminder: 日程提醒、待办事项
- announcement: 公告、通知
3. 避免通知轰炸
- 合理设置通知频率
- 对相似通知进行合并
- 提供通知偏好设置
- 允许用户关闭特定类型的通知
4. 异步处理
// 大量通知建议使用队列异步处理
use App\Jobs\SendNotificationJob;
// 发送大量通知
dispatch(new SendNotificationJob(
$userIds,
$title,
$content,
$type,
$category
))->onQueue('notifications');
5. 错误处理
// 前端错误处理
async function markAsRead(notificationId) {
try {
await api.notifications.read(notificationId)
// 更新本地状态
} catch (error) {
console.error('标记已读失败:', error)
// 显示错误提示
message.error('标记已读失败,请重试')
}
}
6. 日志记录
// 后端日志记录
use Illuminate\Support\Facades\Log;
public function sendToUser(int $userId, array $data): bool
{
Log::info('Sending notification to user', [
'user_id' => $userId,
'type' => $data['type']
]);
// ... 发送逻辑
Log::info('Notification sent successfully', [
'user_id' => $userId,
'notification_id' => $notification->id
]);
return true;
}
7. 测试
// 单元测试示例
public function test_send_notification()
{
$result = $this->service->sendNotification([
'user_id' => 1,
'title' => 'Test',
'content' => 'Test content'
]);
$this->assertTrue($result['success']);
$this->assertDatabaseHas('system_notifications', [
'title' => 'Test'
]);
}
更新日志
2024-02-18
- ✅ 初始版本发布
- ✅ 实现基础 WebSocket 功能
- ✅ 实现通知系统功能
- ✅ 实现消息推送功能
- ✅ 实现频道订阅功能
- ✅ 实现前端客户端封装
- ✅ 实现管理 API 接口
- ✅ 修复 dispatch_mode 配置问题
- ✅ 修复 wsTable 访问问题
- ✅ 添加定时任务重试功能
参考资料
文档版本: v2.0
更新日期: 2024-02-18
维护者: Development Team