Skip to content

WebSockets 实时通信

一、适用场景

场景协议选择
聊天室、私信WebSocket
实时通知(订单状态、消息提醒)WebSocket 或 SSE
在线协作(文档共编)WebSocket
实时数据大屏(单向推送)SSE(更简单)
游戏、光标同步(高频双向)WebSocket

二、安装配置

bash
npm install @nestjs/websockets @nestjs/platform-socket.io socket.io

NestJS 支持两种底层实现:socket.io(功能丰富,有房间/命名空间)和 ws(轻量原生)。生产推荐 socket.io


三、基础 Gateway

@WebSocketGateway 等同于 HTTP 的 @Controller,但处理 WebSocket 事件:

typescript
// chat/chat.gateway.ts
import {
  WebSocketGateway, WebSocketServer,
  SubscribeMessage, MessageBody,
  ConnectedSocket, OnGatewayConnection, OnGatewayDisconnect,
} from '@nestjs/websockets';
import { Server, Socket } from 'socket.io';

@WebSocketGateway({
  cors: {
    origin: ['http://localhost:4200', 'https://yourapp.com'],
    credentials: true,
  },
  namespace: '/chat',   // 命名空间(可选)
})
export class ChatGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server: Server;  // socket.io Server 实例

  private readonly logger = new Logger(ChatGateway.name);

  // 客户端连接时触发
  handleConnection(client: Socket) {
    this.logger.log(`客户端连接: ${client.id}`);
  }

  // 客户端断开时触发
  handleDisconnect(client: Socket) {
    this.logger.log(`客户端断开: ${client.id}`);
  }

  // 监听 'send-message' 事件
  @SubscribeMessage('send-message')
  handleMessage(
    @ConnectedSocket() client: Socket,
    @MessageBody() data: { roomId: string; content: string },
  ) {
    const message = {
      id: randomUUID(),
      content: data.content,
      senderId: client.data.userId,  // 从认证中间件注入
      createdAt: new Date(),
    };

    // 广播到房间内所有人(包括发送者)
    this.server.to(data.roomId).emit('new-message', message);

    return message;  // 返回值作为 ACK 发回给发送者
  }

  // 加入房间
  @SubscribeMessage('join-room')
  async joinRoom(
    @ConnectedSocket() client: Socket,
    @MessageBody() data: { roomId: string },
  ) {
    await client.join(data.roomId);
    // 通知房间内其他人
    client.to(data.roomId).emit('user-joined', {
      userId: client.data.userId,
      roomId: data.roomId,
    });
    return { success: true };
  }

  // 离开房间
  @SubscribeMessage('leave-room')
  async leaveRoom(
    @ConnectedSocket() client: Socket,
    @MessageBody() data: { roomId: string },
  ) {
    await client.leave(data.roomId);
    client.to(data.roomId).emit('user-left', { userId: client.data.userId });
  }
}

四、WebSocket 认证

Socket.io 连接时通过握手携带 Token:

typescript
// chat/chat.gateway.ts
import { JwtService } from '@nestjs/jwt';

@WebSocketGateway({ cors: { origin: '*' } })
export class ChatGateway implements OnGatewayConnection {
  constructor(private jwtService: JwtService) {}

  async handleConnection(client: Socket) {
    try {
      // Token 从握手 auth 对象或 Authorization header 获取
      const token =
        client.handshake.auth?.token ||
        client.handshake.headers.authorization?.replace('Bearer ', '');

      if (!token) throw new UnauthorizedException('缺少 Token');

      const payload = this.jwtService.verify(token);
      // 将用户信息附加到 socket 实例上
      client.data.userId = payload.sub;
      client.data.role = payload.role;

      this.logger.log(`用户 ${payload.sub} 已连接: ${client.id}`);
    } catch (err) {
      // 认证失败,断开连接
      client.emit('error', { message: '认证失败' });
      client.disconnect(true);
    }
  }
}
typescript
// 前端 socket.io 客户端携带 Token
import { io } from 'socket.io-client';

const socket = io('http://localhost:3000/chat', {
  auth: { token: localStorage.getItem('access_token') },
});

五、守卫与拦截器在 WebSocket 中的应用

typescript
// common/guards/ws-jwt.guard.ts
import { CanActivate, ExecutionContext, Injectable } from '@nestjs/common';
import { WsException } from '@nestjs/websockets';

@Injectable()
export class WsJwtGuard implements CanActivate {
  constructor(private jwtService: JwtService) {}

  canActivate(context: ExecutionContext): boolean {
    const client: Socket = context.switchToWs().getClient();

    // 在 handleConnection 中已验证,这里直接检查是否有用户数据
    if (!client.data.userId) {
      throw new WsException('未认证');
    }
    return true;
  }
}

// 使用
@UseGuards(WsJwtGuard)
@SubscribeMessage('send-message')
handleMessage(@ConnectedSocket() client: Socket, @MessageBody() data: any) {
  // 到这里已确认用户已认证
}

六、从 HTTP 服务推送 WebSocket 消息

在 HTTP Controller 中触发 WebSocket 推送(常见模式):

typescript
// notifications/notifications.service.ts
@Injectable()
export class NotificationsService {
  constructor(
    // 注入 Gateway
    private chatGateway: ChatGateway,
  ) {}

  // 当 HTTP 接口创建订单时,推送通知给相关用户
  async sendOrderNotification(userId: number, orderId: number) {
    // 找到该用户的所有 socket 连接
    const sockets = await this.chatGateway.server.fetchSockets();
    const userSockets = sockets.filter(s => s.data.userId === userId);

    userSockets.forEach(socket => {
      socket.emit('order-update', {
        orderId,
        status: 'confirmed',
        message: '你的订单已确认',
      });
    });
  }
}

// orders/orders.controller.ts
@Post()
async createOrder(@Body() dto: CreateOrderDto, @CurrentUser() user: User) {
  const order = await this.ordersService.create(dto, user.id);

  // HTTP 接口处理完后推送 WebSocket 通知
  await this.notificationsService.sendOrderNotification(user.id, order.id);

  return order;
}

七、房间管理与在线状态

typescript
@WebSocketGateway({ namespace: '/presence' })
export class PresenceGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer() server: Server;

  // 用户 ID → Set<socketId>(一个用户可多设备在线)
  private onlineUsers = new Map<number, Set<string>>();

  async handleConnection(client: Socket) {
    const userId = client.data.userId;
    if (!this.onlineUsers.has(userId)) {
      this.onlineUsers.set(userId, new Set());
    }
    this.onlineUsers.get(userId).add(client.id);

    // 广播上线通知
    this.server.emit('user-online', { userId });
  }

  async handleDisconnect(client: Socket) {
    const userId = client.data.userId;
    const sockets = this.onlineUsers.get(userId);
    if (sockets) {
      sockets.delete(client.id);
      if (sockets.size === 0) {
        // 所有设备都离线了
        this.onlineUsers.delete(userId);
        this.server.emit('user-offline', { userId });
      }
    }
  }

  @SubscribeMessage('get-online-users')
  getOnlineUsers() {
    return { onlineUserIds: [...this.onlineUsers.keys()] };
  }
}

八、多实例部署(Redis Adapter)

多个实例时,socket.io 默认只在单实例内广播,需要 Redis Adapter 跨实例同步:

bash
npm install @socket.io/redis-adapter ioredis
typescript
// main.ts
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);

  const pubClient = createClient({ url: process.env.REDIS_URL });
  const subClient = pubClient.duplicate();
  await Promise.all([pubClient.connect(), subClient.connect()]);

  const io = app.get(IoAdapter);
  io.createIOServer = (port, options) => {
    const server = super.createIOServer(port, options);
    server.adapter(createAdapter(pubClient, subClient));
    return server;
  };

  await app.listen(3000);
}

九、异常处理

typescript
// WebSocket 使用 WsException,而不是 HttpException
import { WsException } from '@nestjs/websockets';

@SubscribeMessage('send-message')
handleMessage(@MessageBody() data: any) {
  if (!data.content?.trim()) {
    throw new WsException('消息内容不能为空');
  }
  // ...
}

// 全局 WebSocket 异常过滤器
@Catch(WsException)
export class WsExceptionFilter implements ExceptionFilter {
  catch(exception: WsException, host: ArgumentsHost) {
    const client = host.switchToWs().getClient<Socket>();
    const error = exception.getError();
    client.emit('error', {
      message: typeof error === 'string' ? error : error['message'],
      timestamp: new Date().toISOString(),
    });
  }
}

十、前端使用示例

typescript
// 前端 socket.io 客户端(TypeScript)
import { io, Socket } from 'socket.io-client';

const socket: Socket = io('http://localhost:3000/chat', {
  auth: { token: 'Bearer eyJhbGci...' },
  transports: ['websocket'],  // 跳过 HTTP 轮询,直接 WebSocket
  reconnection: true,
  reconnectionAttempts: 5,
  reconnectionDelay: 1000,
});

socket.on('connect', () => console.log('已连接:', socket.id));
socket.on('connect_error', (err) => console.error('连接失败:', err.message));
socket.on('disconnect', (reason) => console.log('断开:', reason));

// 发送消息(带 ACK 回调)
socket.emit('send-message', { roomId: 'room-1', content: 'Hello' }, (ack) => {
  console.log('消息已发送,服务端确认:', ack);
});

// 监听新消息
socket.on('new-message', (message) => {
  console.log('收到新消息:', message);
});

// 加入房间
socket.emit('join-room', { roomId: 'room-1' });

// 断开连接
socket.disconnect();

可运行 Demo: practice/07-extensions — Chat Gateway Demo,用 test-ws-client.html 在浏览器测试


常见错误

错误原因解决
客户端连接立即断开handleConnection 中认证失败调用了 client.disconnect()检查 Token 格式;确认 client.handshake.auth.token 字段名与客户端一致
消息广播只在当前实例生效多实例部署未配置 Redis Adapter安装 @socket.io/redis-adapter,在 main.ts 中配置
@SubscribeMessage 不触发客户端 emit 的事件名与装饰器参数不一致(大小写)统一用小写+连字符命名,如 'send-message'
Gateway 中无法注入其他 ServiceGateway 未加入模块的 providers@Module({ providers: [ChatGateway, OtherService] }) 中同时注册
server.fetchSockets() 只返回当前实例 socket多实例下各自维护内存配置 Redis Adapter 后 fetchSockets() 会跨实例查询
CORS 报错@WebSocketGateway 默认不开 CORS配置 cors: { origin: 'http://yourapp.com', credentials: true }

NestJS 深度学习体系