WebSockets 实时通信
一、适用场景
| 场景 | 协议选择 |
|---|---|
| 聊天室、私信 | WebSocket |
| 实时通知(订单状态、消息提醒) | WebSocket 或 SSE |
| 在线协作(文档共编) | WebSocket |
| 实时数据大屏(单向推送) | SSE(更简单) |
| 游戏、光标同步(高频双向) | WebSocket |
二、安装配置
bash
npm install @nestjs/websockets @nestjs/platform-socket.io socket.ioNestJS 支持两种底层实现:socket.io(功能丰富,有房间/命名空间)和 ws(轻量原生)。生产推荐 socket.io。
三、基础 Gateway
@WebSocketGateway 等同于 HTTP 的 @Controller,但处理 WebSocket 事件:
typescript
// chat/chat.gateway.ts
import {
WebSocketGateway, WebSocketServer,
SubscribeMessage, MessageBody,
ConnectedSocket, OnGatewayConnection, OnGatewayDisconnect,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';
@WebSocketGateway({
cors: {
origin: ['http://localhost:4200', 'https://yourapp.com'],
credentials: true,
},
namespace: '/chat', // 命名空间(可选)
})
export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer()
server: Server; // socket.io Server 实例
private readonly logger = new Logger(ChatGateway.name);
// 客户端连接时触发
handleConnection(client: Socket) {
this.logger.log(`客户端连接: ${client.id}`);
}
// 客户端断开时触发
handleDisconnect(client: Socket) {
this.logger.log(`客户端断开: ${client.id}`);
}
// 监听 'send-message' 事件
@SubscribeMessage('send-message')
handleMessage(
@ConnectedSocket() client: Socket,
@MessageBody() data: { roomId: string; content: string },
) {
const message = {
id: randomUUID(),
content: data.content,
senderId: client.data.userId, // 从认证中间件注入
createdAt: new Date(),
};
// 广播到房间内所有人(包括发送者)
this.server.to(data.roomId).emit('new-message', message);
return message; // 返回值作为 ACK 发回给发送者
}
// 加入房间
@SubscribeMessage('join-room')
async joinRoom(
@ConnectedSocket() client: Socket,
@MessageBody() data: { roomId: string },
) {
await client.join(data.roomId);
// 通知房间内其他人
client.to(data.roomId).emit('user-joined', {
userId: client.data.userId,
roomId: data.roomId,
});
return { success: true };
}
// 离开房间
@SubscribeMessage('leave-room')
async leaveRoom(
@ConnectedSocket() client: Socket,
@MessageBody() data: { roomId: string },
) {
await client.leave(data.roomId);
client.to(data.roomId).emit('user-left', { userId: client.data.userId });
}
}四、WebSocket 认证
Socket.io 连接时通过握手携带 Token:
typescript
// chat/chat.gateway.ts
import { JwtService } from '@nestjs/jwt';
@WebSocketGateway({ cors: { origin: '*' } })
export class ChatGateway implements OnGatewayConnection {
constructor(private jwtService: JwtService) {}
async handleConnection(client: Socket) {
try {
// Token 从握手 auth 对象或 Authorization header 获取
const token =
client.handshake.auth?.token ||
client.handshake.headers.authorization?.replace('Bearer ', '');
if (!token) throw new UnauthorizedException('缺少 Token');
const payload = this.jwtService.verify(token);
// 将用户信息附加到 socket 实例上
client.data.userId = payload.sub;
client.data.role = payload.role;
this.logger.log(`用户 ${payload.sub} 已连接: ${client.id}`);
} catch (err) {
// 认证失败,断开连接
client.emit('error', { message: '认证失败' });
client.disconnect(true);
}
}
}typescript
// 前端 socket.io 客户端携带 Token
import { io } from 'socket.io-client';
const socket = io('http://localhost:3000/chat', {
auth: { token: localStorage.getItem('access_token') },
});五、守卫与拦截器在 WebSocket 中的应用
typescript
// common/guards/ws-jwt.guard.ts
import { CanActivate, ExecutionContext, Injectable } from '@nestjs/common';
import { WsException } from '@nestjs/websockets';
@Injectable()
export class WsJwtGuard implements CanActivate {
constructor(private jwtService: JwtService) {}
canActivate(context: ExecutionContext): boolean {
const client: Socket = context.switchToWs().getClient();
// 在 handleConnection 中已验证,这里直接检查是否有用户数据
if (!client.data.userId) {
throw new WsException('未认证');
}
return true;
}
}
// 使用
@UseGuards(WsJwtGuard)
@SubscribeMessage('send-message')
handleMessage(@ConnectedSocket() client: Socket, @MessageBody() data: any) {
// 到这里已确认用户已认证
}六、从 HTTP 服务推送 WebSocket 消息
在 HTTP Controller 中触发 WebSocket 推送(常见模式):
typescript
// notifications/notifications.service.ts
@Injectable()
export class NotificationsService {
constructor(
// 注入 Gateway
private chatGateway: ChatGateway,
) {}
// 当 HTTP 接口创建订单时,推送通知给相关用户
async sendOrderNotification(userId: number, orderId: number) {
// 找到该用户的所有 socket 连接
const sockets = await this.chatGateway.server.fetchSockets();
const userSockets = sockets.filter(s => s.data.userId === userId);
userSockets.forEach(socket => {
socket.emit('order-update', {
orderId,
status: 'confirmed',
message: '你的订单已确认',
});
});
}
}
// orders/orders.controller.ts
@Post()
async createOrder(@Body() dto: CreateOrderDto, @CurrentUser() user: User) {
const order = await this.ordersService.create(dto, user.id);
// HTTP 接口处理完后推送 WebSocket 通知
await this.notificationsService.sendOrderNotification(user.id, order.id);
return order;
}七、房间管理与在线状态
typescript
@WebSocketGateway({ namespace: '/presence' })
export class PresenceGateway implements OnGatewayConnection, OnGatewayDisconnect {
@WebSocketServer() server: Server;
// 用户 ID → Set<socketId>(一个用户可多设备在线)
private onlineUsers = new Map<number, Set<string>>();
async handleConnection(client: Socket) {
const userId = client.data.userId;
if (!this.onlineUsers.has(userId)) {
this.onlineUsers.set(userId, new Set());
}
this.onlineUsers.get(userId).add(client.id);
// 广播上线通知
this.server.emit('user-online', { userId });
}
async handleDisconnect(client: Socket) {
const userId = client.data.userId;
const sockets = this.onlineUsers.get(userId);
if (sockets) {
sockets.delete(client.id);
if (sockets.size === 0) {
// 所有设备都离线了
this.onlineUsers.delete(userId);
this.server.emit('user-offline', { userId });
}
}
}
@SubscribeMessage('get-online-users')
getOnlineUsers() {
return { onlineUserIds: [...this.onlineUsers.keys()] };
}
}八、多实例部署(Redis Adapter)
多个实例时,socket.io 默认只在单实例内广播,需要 Redis Adapter 跨实例同步:
bash
npm install @socket.io/redis-adapter ioredistypescript
// main.ts
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
const pubClient = createClient({ url: process.env.REDIS_URL });
const subClient = pubClient.duplicate();
await Promise.all([pubClient.connect(), subClient.connect()]);
const io = app.get(IoAdapter);
io.createIOServer = (port, options) => {
const server = super.createIOServer(port, options);
server.adapter(createAdapter(pubClient, subClient));
return server;
};
await app.listen(3000);
}九、异常处理
typescript
// WebSocket 使用 WsException,而不是 HttpException
import { WsException } from '@nestjs/websockets';
@SubscribeMessage('send-message')
handleMessage(@MessageBody() data: any) {
if (!data.content?.trim()) {
throw new WsException('消息内容不能为空');
}
// ...
}
// 全局 WebSocket 异常过滤器
@Catch(WsException)
export class WsExceptionFilter implements ExceptionFilter {
catch(exception: WsException, host: ArgumentsHost) {
const client = host.switchToWs().getClient<Socket>();
const error = exception.getError();
client.emit('error', {
message: typeof error === 'string' ? error : error['message'],
timestamp: new Date().toISOString(),
});
}
}十、前端使用示例
typescript
// 前端 socket.io 客户端(TypeScript)
import { io, Socket } from 'socket.io-client';
const socket: Socket = io('http://localhost:3000/chat', {
auth: { token: 'Bearer eyJhbGci...' },
transports: ['websocket'], // 跳过 HTTP 轮询,直接 WebSocket
reconnection: true,
reconnectionAttempts: 5,
reconnectionDelay: 1000,
});
socket.on('connect', () => console.log('已连接:', socket.id));
socket.on('connect_error', (err) => console.error('连接失败:', err.message));
socket.on('disconnect', (reason) => console.log('断开:', reason));
// 发送消息(带 ACK 回调)
socket.emit('send-message', { roomId: 'room-1', content: 'Hello' }, (ack) => {
console.log('消息已发送,服务端确认:', ack);
});
// 监听新消息
socket.on('new-message', (message) => {
console.log('收到新消息:', message);
});
// 加入房间
socket.emit('join-room', { roomId: 'room-1' });
// 断开连接
socket.disconnect();可运行 Demo:
practice/07-extensions— Chat Gateway Demo,用test-ws-client.html在浏览器测试
常见错误
| 错误 | 原因 | 解决 |
|---|---|---|
| 客户端连接立即断开 | handleConnection 中认证失败调用了 client.disconnect() | 检查 Token 格式;确认 client.handshake.auth.token 字段名与客户端一致 |
| 消息广播只在当前实例生效 | 多实例部署未配置 Redis Adapter | 安装 @socket.io/redis-adapter,在 main.ts 中配置 |
@SubscribeMessage 不触发 | 客户端 emit 的事件名与装饰器参数不一致(大小写) | 统一用小写+连字符命名,如 'send-message' |
| Gateway 中无法注入其他 Service | Gateway 未加入模块的 providers | 在 @Module({ providers: [ChatGateway, OtherService] }) 中同时注册 |
server.fetchSockets() 只返回当前实例 socket | 多实例下各自维护内存 | 配置 Redis Adapter 后 fetchSockets() 会跨实例查询 |
| CORS 报错 | @WebSocketGateway 默认不开 CORS | 配置 cors: { origin: 'http://yourapp.com', credentials: true } |