
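Transport Layers and Message Patterns

1. Two Message Patterns

NestJS microservices support two fundamentally different communication patterns: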

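| Pattern | Decorator | Client method | Characteristics |
| --- | --- | --- | --- |
| Request-response | @MessagePattern | client.send() | Returns an Observable; the caller waits for the handler's return value |
| Event-based | @EventPattern | client.emit() | Fire-and-forget; the caller does not wait for a result |

Request-response (suited to data queries and operations that need a result):
Gateway ──send('find_user', {id})──→ user-service
Gateway ←──{ id: 1, email: ... }──── user-service

Event-based (suited to notifications and side effects):
Gateway ──emit('user_deleted', {id})──→ user-service (writes logs)
                                      ──→ content-service (deletes posts)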
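
The difference between the two patterns can be sketched without any transport at all. The toy broker below is not the NestJS API; `ToyBroker`, `onMessage`, and `onEvent` are hypothetical names used only to model the behavior: `send` targets exactly one handler and yields its return value, while `emit` runs every subscriber and reports nothing back.

```typescript
// A toy in-memory broker (hypothetical; it only models the two patterns).
type Handler = (data: unknown) => unknown;

class ToyBroker {
  private messageHandlers = new Map<string, Handler>();
  private eventHandlers = new Map<string, Handler[]>();

  // Request-response: one handler per pattern.
  onMessage(pattern: string, handler: Handler): void {
    this.messageHandlers.set(pattern, handler);
  }

  // Event-based: any number of subscribers per pattern.
  onEvent(pattern: string, handler: Handler): void {
    const handlers = this.eventHandlers.get(pattern) ?? [];
    handlers.push(handler);
    this.eventHandlers.set(pattern, handlers);
  }

  // Models client.send(): resolves with the handler's return value.
  async send(pattern: string, data: unknown): Promise<unknown> {
    const handler = this.messageHandlers.get(pattern);
    if (!handler) {
      throw new Error(`no handler registered for "${pattern}"`);
    }
    return handler(data);
  }

  // Models client.emit(): runs every subscriber and returns nothing.
  emit(pattern: string, data: unknown): void {
    for (const handler of this.eventHandlers.get(pattern) ?? []) {
      try {
        handler(data);
      } catch {
        // Fire-and-forget: a failing subscriber is not reported to the emitter.
      }
    }
  }
}
```

With this model, `await broker.send('find_user', { id: 1 })` yields the single handler's return value, whereas `broker.emit('user_deleted', { id: 1 })` runs every registered subscriber and gives the caller no result.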

2. Request-Response Pattern (MessagePattern)

Server side

typescript
// user-service/src/users/users.controller.ts
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload, RpcException } from '@nestjs/microservices';
import { UsersService } from './users.service';          // assumed path
import { CreateUserDto } from './dto/create-user.dto';   // assumed path

@Controller()
export class UsersController {
  constructor(private usersService: UsersService) {}

  // Create a user
  @MessagePattern('create_user')
  async createUser(@Payload() dto: CreateUserDto) {
    return this.usersService.create(dto);
  }

  // Look up a single user
  @MessagePattern('find_user')
  async findUser(@Payload() data: { id: number }) {
    const user = await this.usersService.findOne(data.id);
    if (!user) {
      throw new RpcException({
        statusCode: 404,
        message: `User #${data.id} not found`,
      });
    }
    return user;
  }

  // Validate credentials (called by auth-service)
  @MessagePattern('validate_user')
  async validateUser(@Payload() data: { email: string; password: string }) {
    return this.usersService.validateCredentials(data.email, data.password);
  }

  // Paginated query
  @MessagePattern('find_users')
  findMany(@Payload() query: { page: number; limit: number; role?: string }) {
    return this.usersService.findPaginated(query);
  }
}

Client side (Gateway)

typescript
// gateway/src/users/users.service.ts
import { Injectable, Inject, NotFoundException } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { catchError, firstValueFrom, throwError, timeout } from 'rxjs';

@Injectable()
export class UsersProxyService {
  constructor(
    @Inject('USER_SERVICE') private userClient: ClientProxy,
  ) {}

  async findOne(id: number) {
    return firstValueFrom(
      this.userClient.send('find_user', { id }).pipe(
        timeout(5000),  // 5-second timeout
        catchError(err => {
          if (err.statusCode === 404) {
            return throwError(() => new NotFoundException(err.message));
          }
          return throwError(() => new Error('user-service unavailable'));
        }),
      ),
    );
  }

  async findMany(query: { page: number; limit: number }) {
    return firstValueFrom(
      this.userClient.send('find_users', query).pipe(
        timeout(3000),
      ),
    );
  }
}

3. Event-Based Pattern (EventPattern)

Publishing events

typescript
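// user-service: publish an event after a user is deleted.
// `eventBus` is assumed to be an injected ClientProxy (the original does not show its declaration).
@MessagePattern('delete_user')
async deleteUser(@Payload() data: { id: number }) {
  await this.usersService.remove(data.id);

  // Notify other services without waiting for them.
  // emit() returns a hot Observable, so the event is dispatched even if nothing subscribes.
  this.eventBus.emit('user_deleted', {
    userId: data.id,
    deletedAt: new Date().toISOString(),
  });

  return { success: true };
}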
typescript
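// The gateway can also publish the event itself.
// Pick one publisher per event: if user-service already emits user_deleted,
// emitting it here as well delivers the event twice.
@Delete(':id')
@Roles(Role.ADMIN)
async deleteUser(@Param('id', ParseIntPipe) id: number) {
  await firstValueFrom(this.userClient.send('delete_user', { id }));

  // Emit the event. Who receives it depends on the transport: over TCP it reaches
  // only the service this client is connected to; fanning out to several services
  // needs a broker transport such as Redis or Kafka.
  this.userClient.emit('user_deleted', { userId: id });

  return { message: 'Deleted successfully' };
}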

Subscribing to events

typescript
// content-service: clean up the deleted user's posts
@Controller()
export class ContentEventController {
  constructor(private postsService: PostsService) {}

  @EventPattern('user_deleted')
  async handleUserDeleted(@Payload() data: { userId: number; deletedAt: string }) {
    console.log(`Received user-deleted event: userId=${data.userId}`);
    // Soft-delete or hard-delete all posts by this user
    await this.postsService.deleteByAuthor(data.userId);
  }
}

// notification-service: send the account-deletion notification email
@Controller()
export class NotificationEventController {
  // EmailService is an assumed provider name; the original uses this.emailService without declaring it
  constructor(private emailService: EmailService) {}

  @EventPattern('user_deleted')
  async handleUserDeleted(@Payload() data: { userId: number }) {
    await this.emailService.sendAccountDeletionEmail(data.userId);
  }
}

4. Error Handling

Using RpcException

typescript
// Server side: throw an exception that can be serialized and sent over the transport
import { RpcException } from '@nestjs/microservices';

@MessagePattern('find_user')
async findUser(@Payload() data: { id: number }) {
  const user = await this.usersService.findOne(data.id);
  if (!user) {
    throw new RpcException({
      statusCode: 404,
      message: 'User not found',
      error: 'Not Found',
    });
  }
  return user;
}
typescript
// Client side: convert RPC errors into HTTP exceptions.
// The exception classes come from '@nestjs/common'; catchError comes from 'rxjs'.
findOne(id: number) {
  return this.userClient.send('find_user', { id }).pipe(
    catchError(err => {
      // err is the plain object that the server passed to RpcException
      switch (err.statusCode) {
        case 400: throw new BadRequestException(err.message);
        case 404: throw new NotFoundException(err.message);
        case 403: throw new ForbiddenException(err.message);
        default:  throw new InternalServerErrorException('Downstream service error');
      }
    }),
  );
}
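
The value caught on the client is not guaranteed to be an RpcException payload: a timeout or a connection failure produces an error without a `statusCode`. A minimal, framework-free sketch of a defensive mapping is shown below; `RpcErrorPayload`, `isRpcErrorPayload`, and `toHttpStatus` are hypothetical names, and the payload shape is assumed to match the object thrown in the server example.

```typescript
// Assumed shape of the object passed to RpcException on the server.
interface RpcErrorPayload {
  statusCode: number;
  message: string;
}

// Type guard: true only for objects carrying a numeric statusCode and a string message.
function isRpcErrorPayload(err: unknown): err is RpcErrorPayload {
  if (typeof err !== 'object' || err === null) {
    return false;
  }
  const candidate = err as Record<string, unknown>;
  return (
    typeof candidate.statusCode === 'number' &&
    typeof candidate.message === 'string'
  );
}

// Map a caught value to an HTTP status; anything unrecognized becomes 500.
function toHttpStatus(err: unknown): number {
  if (!isRpcErrorPayload(err)) {
    return 500;
  }
  switch (err.statusCode) {
    case 400:
    case 403:
    case 404:
      return err.statusCode;
    default:
      return 500;
  }
}
```

Inside `catchError`, the returned status can then select which HTTP exception to throw, so an unexpected error shape falls through to the 500 branch.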

Timeout handling

typescript
import { ServiceUnavailableException } from '@nestjs/common';
import { timeout, catchError, throwError, TimeoutError } from 'rxjs';

findUser(id: number) {
  return this.userClient.send('find_user', { id }).pipe(
    timeout(3000),
    catchError(err => {
      if (err instanceof TimeoutError) {
        return throwError(() => new ServiceUnavailableException('user-service timed out'));
      }
      return throwError(() => err);
    }),
  );
}

5. Transport Configuration in Detail

TCP (simplest)

typescript
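// Server side. createMicroservice takes the root module as its first argument.
// NestFactory comes from '@nestjs/core'; MicroserviceOptions and Transport from '@nestjs/microservices'.
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.TCP,
  options: { host: '0.0.0.0', port: 3001 },
});
await app.listen();

// Client side
ClientsModule.register([{
  name: 'USER_SERVICE',
  transport: Transport.TCP,
  options: { host: 'user-service', port: 3001 },
}])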

Redis

typescript
// Server side (AppModule is the service's root module)
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.REDIS,
  options: {
    host: 'redis',
    port: 6379,
    password: process.env.REDIS_PASSWORD,
    retryAttempts: 5,
    retryDelay: 1000,
  },
});

RabbitMQ

bash
npm install amqplib amqp-connection-manager
typescript
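// Server side (AppModule is the service's root module)
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.RMQ,
  options: {
    urls: ['amqp://user:password@rabbitmq:5672'],
    queue: 'user_service_queue',
    queueOptions: {
      durable: true,      // the queue survives a broker restart
    },
    prefetchCount: 1,     // at most one unacknowledged message per consumer (fair dispatch)
    noAck: false,         // manual acknowledgement: the handler must call channel.ack()
  },
});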
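
The section above shows only the consuming side. A matching producer registration might look like the sketch below; the module name `UsersProxyModule` is hypothetical and the `USER_SERVICE` token is reused from the TCP example. The queue name and `queueOptions` are kept identical to the server's, because RabbitMQ rejects a second declaration of an existing queue with different properties.

```typescript
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'USER_SERVICE', // injection token, as in the TCP example
        transport: Transport.RMQ,
        options: {
          urls: ['amqp://user:password@rabbitmq:5672'],
          queue: 'user_service_queue',    // must match the server's queue
          queueOptions: { durable: true }, // must match the server's declaration
        },
      },
    ]),
  ],
  exports: [ClientsModule],
})
export class UsersProxyModule {} // hypothetical module name
```

Any provider in a module that imports this one can then inject the client with `@Inject('USER_SERVICE')` and call `send()` or `emit()` as in the earlier sections.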

Kafka

bash
npm install kafkajs
typescript
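// Server side (AppModule is the service's root module)
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['kafka:9092'],
      clientId: 'user-service',
    },
    consumer: {
      groupId: 'user-service-consumer',  // consumer group
    },
  },
});

// Kafka event handler: the pattern is the topic name
@EventPattern('user-events')
async handleUserEvent(
  @Payload() value: unknown,        // deserialized message value
  @Ctx() context: KafkaContext,     // Ctx and KafkaContext come from '@nestjs/microservices'
) {
  // The full Kafka message (key, headers, offset, ...) is available from the context
  const { key, headers } = context.getMessage();
}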

gRPC

bash
npm install @grpc/grpc-js @grpc/proto-loader
protobuf
// user.proto
syntax = "proto3";
package user;

service UserService {
  rpc FindOne (FindOneRequest) returns (User);
  rpc CreateUser (CreateUserRequest) returns (User);
  rpc FindAll (FindAllRequest) returns (UserList);
}

message User {
  int32 id = 1;
  string email = 2;
  string name = 3;
}

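message FindOneRequest { int32 id = 1; }

// The messages below are referenced by the service but not defined in the original.
// Their fields are assumptions, added only so that the file compiles.
message CreateUserRequest { string email = 1; string name = 2; }
message FindAllRequest { int32 page = 1; int32 limit = 2; }
message UserList { repeated User users = 1; }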
typescript
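// gRPC server (join comes from 'path'; AppModule is the service's root module)
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.GRPC,
  options: {
    package: 'user',
    protoPath: join(__dirname, '../proto/user.proto'),
    url: '0.0.0.0:50051',
  },
});

// gRPC server controller (uses @GrpcMethod instead of @MessagePattern)
import { Controller } from '@nestjs/common';
import { GrpcMethod } from '@nestjs/microservices';

@Controller()
export class UsersGrpcController {
  constructor(private usersService: UsersService) {}

  @GrpcMethod('UserService', 'FindOne')
  findOne(data: { id: number }) {
    return this.usersService.findOne(data.id);
  }
}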
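
On the calling side, gRPC differs from the other transports: instead of `send()`, the client obtains a typed service stub from `ClientGrpc.getService()`. The sketch below assumes a client registered through `ClientsModule.register` with `transport: Transport.GRPC` and the same `package` and `protoPath` as the server; the `USER_PACKAGE` token, the `UserServiceClient` interface, and the `UsersGrpcClient` class are hypothetical names.

```typescript
import { Inject, Injectable, OnModuleInit } from '@nestjs/common';
import { ClientGrpc } from '@nestjs/microservices';
import { Observable } from 'rxjs';

// Mirrors the User message in user.proto.
interface User {
  id: number;
  email: string;
  name: string;
}

// Hand-written stub type: the proto method FindOne is exposed as findOne on the client.
interface UserServiceClient {
  findOne(data: { id: number }): Observable<User>;
}

@Injectable()
export class UsersGrpcClient implements OnModuleInit {
  private userService!: UserServiceClient;

  // 'USER_PACKAGE' is an assumed injection token for the registered gRPC client.
  constructor(@Inject('USER_PACKAGE') private readonly client: ClientGrpc) {}

  onModuleInit(): void {
    // 'UserService' is the service name declared in user.proto.
    this.userService = this.client.getService<UserServiceClient>('UserService');
  }

  findOne(id: number): Observable<User> {
    return this.userService.findOne({ id });
  }
}
```

The stub is resolved in `onModuleInit` rather than the constructor so that the underlying client has been created by the time `getService()` is called.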

6. Message Acknowledgement and Idempotency

RabbitMQ manual acknowledgement

typescript
@EventPattern('order_created')
async handleOrderCreated(
  @Payload() data: any,
  @Ctx() context: RmqContext,
) {
  const channel = context.getChannelRef();
  const message = context.getMessage();

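  try {
    await this.processOrder(data);
    channel.ack(message);         // processed successfully: acknowledge the message
  } catch (err) {
    // Processing failed: requeue. A message that always fails is redelivered
    // indefinitely, so cap retries or route it to a dead-letter queue.
    channel.nack(message, false, true);
  }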
}

Idempotency (avoiding duplicate processing)

typescript
@EventPattern('user_created')
async handleUserCreated(@Payload() data: { id: number; eventId: string }) {
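  // Check whether the event was already processed (Redis or a database).
  // Note: this get-then-set check is not atomic, so two concurrent deliveries can both pass it.
  const processed = await this.redis.get(`event:${data.eventId}`);
  if (processed) {
    console.log(`Event ${data.eventId} already processed, skipping`);
    return;
  }

  // Run the business logic
  await this.createWelcomeContent(data.id);

  // Mark as processed. The TTL (86400 s = 24 h here) should be about twice the maximum message delay.
  await this.redis.set(`event:${data.eventId}`, '1', 'EX', 86400);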
}
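
The handler above reads the marker and writes it in two separate steps, so two concurrent deliveries of the same event can both pass the check. One way to close that gap is to claim the event atomically before doing the work. The sketch below is transport-free: `DedupStore`, `InMemoryDedupStore`, and `handleOnce` are hypothetical names, and `setIfAbsent` stands in for an atomic set-if-not-exists operation (with Redis, a `SET` using the `NX` and `EX` options).

```typescript
// Minimal store contract; setIfAbsent must be atomic in a real backend.
interface DedupStore {
  setIfAbsent(key: string, ttlSeconds: number): Promise<boolean>;
  remove(key: string): Promise<void>;
}

// In-memory stand-in, suitable only for a single process and for tests.
class InMemoryDedupStore implements DedupStore {
  private expiries = new Map<string, number>();

  async setIfAbsent(key: string, ttlSeconds: number): Promise<boolean> {
    const now = Date.now();
    const expiresAt = this.expiries.get(key);
    if (expiresAt !== undefined && expiresAt > now) {
      return false; // someone already claimed this key and it has not expired
    }
    this.expiries.set(key, now + ttlSeconds * 1000);
    return true;
  }

  async remove(key: string): Promise<void> {
    this.expiries.delete(key);
  }
}

// Claim the event first, then run the work; release the claim if the work fails.
async function handleOnce(
  store: DedupStore,
  eventId: string,
  work: () => Promise<void>,
): Promise<'processed' | 'skipped'> {
  const key = `event:${eventId}`;
  const claimed = await store.setIfAbsent(key, 86400);
  if (!claimed) {
    return 'skipped';
  }
  try {
    await work();
    return 'processed';
  } catch (err) {
    await store.remove(key); // let a redelivery retry the event
    throw err;
  }
}
```

Claiming before processing trades one risk for another: if the process crashes between the claim and the end of the work, the event stays marked until the TTL expires, so the TTL and the broker's redelivery policy need to be chosen together.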

Runnable demo: practice/05-microservices, a TCP transport example of MessagePattern and EventPattern


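Common Errors

| Error | Cause | Fix |
| --- | --- | --- |
| ClientProxy.send() never sends or resolves | The Observable was never subscribed (send() returns a cold Observable) | Use await firstValueFrom(client.send(...)), or call .subscribe() on the Observable |
| @EventPattern handler is not triggered | send() (request-response) was used instead of emit() (event) | Use client.emit() for one-way events; it pairs with @EventPattern |
| RabbitMQ messages are consumed repeatedly | The consumer never acknowledges messages manually | Set noAck: false and call channel.ack(message) once processing completes |
| Redis transport loses messages | Redis Pub/Sub does not persist messages, so anything published while the connection is down (for example during a Redis restart) is dropped | In production, use a transport with durable delivery such as RabbitMQ or Kafka |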
