传输层与消息模式
一、两种消息模式
NestJS 微服务支持两种根本不同的通信模式:
| 模式 | 装饰器 | 发送方法 | 特点 |
|---|---|---|---|
| 请求-响应 | @MessagePattern | client.send() | 同步,等待返回值 |
| 事件驱动 | @EventPattern | client.emit() | 异步,不等待结果 |
请求-响应(适合:查询数据、需要结果的操作):
Gateway ──send('find_user', {id})──→ user-service
Gateway ←──{ id: 1, email: ... }──── user-service
事件驱动(适合:通知、副作用操作):
Gateway ──emit('user_deleted', {id})──→ user-service(处理日志)
──→ content-service(删除文章)二、请求-响应模式(MessagePattern)
服务端
typescript
// user-service/src/users/users.controller.ts
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload, RpcException } from '@nestjs/microservices';
@Controller()
export class UsersController {
constructor(private usersService: UsersService) {}
// 创建用户
@MessagePattern('create_user')
async createUser(@Payload() dto: CreateUserDto) {
return this.usersService.create(dto);
}
// 查询单个用户
@MessagePattern('find_user')
async findUser(@Payload() data: { id: number }) {
const user = await this.usersService.findOne(data.id);
if (!user) {
throw new RpcException({
statusCode: 404,
message: `用户 #${data.id} 不存在`,
});
}
return user;
}
// 验证用户(供 auth-service 调用)
@MessagePattern('validate_user')
async validateUser(@Payload() data: { email: string; password: string }) {
return this.usersService.validateCredentials(data.email, data.password);
}
// 分页查询
@MessagePattern('find_users')
findMany(@Payload() query: { page: number; limit: number; role?: string }) {
return this.usersService.findPaginated(query);
}
}客户端(Gateway)
typescript
// gateway/src/users/users.service.ts
import { Injectable, Inject, NotFoundException } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { catchError, firstValueFrom, throwError, timeout } from 'rxjs';
@Injectable()
export class UsersProxyService {
constructor(
@Inject('USER_SERVICE') private userClient: ClientProxy,
) {}
async findOne(id: number) {
return firstValueFrom(
this.userClient.send('find_user', { id }).pipe(
timeout(5000), // 5秒超时
catchError(err => {
if (err.statusCode === 404) {
return throwError(() => new NotFoundException(err.message));
}
return throwError(() => new Error('user-service 不可用'));
}),
),
);
}
async findMany(query: { page: number; limit: number }) {
return firstValueFrom(
this.userClient.send('find_users', query).pipe(
timeout(3000),
),
);
}
}三、事件驱动模式(EventPattern)
发布事件
typescript
// user-service:用户被删除后发布事件
@MessagePattern('delete_user')
async deleteUser(@Payload() data: { id: number }) {
await this.usersService.remove(data.id);
// 通知其他服务(不等待)
this.eventBus.emit('user_deleted', {
userId: data.id,
deletedAt: new Date().toISOString(),
});
return { success: true };
}typescript
// gateway 也可以直接发布事件
@Delete(':id')
@Roles(Role.ADMIN)
async deleteUser(@Param('id', ParseIntPipe) id: number) {
await firstValueFrom(this.userClient.send('delete_user', { id }));
// 广播事件到所有订阅者
this.userClient.emit('user_deleted', { userId: id });
return { message: '删除成功' };
}订阅事件
typescript
// content-service:清理该用户的文章
@Controller()
export class ContentEventController {
constructor(private postsService: PostsService) {}
@EventPattern('user_deleted')
async handleUserDeleted(@Payload() data: { userId: number; deletedAt: string }) {
console.log(`接收到用户删除事件: userId=${data.userId}`);
// 软删除或硬删除该用户的所有文章
await this.postsService.deleteByAuthor(data.userId);
}
}
// notification-service:发送账户注销通知邮件
@Controller()
export class NotificationEventController {
@EventPattern('user_deleted')
async handleUserDeleted(@Payload() data: { userId: number }) {
await this.emailService.sendAccountDeletionEmail(data.userId);
}
}四、错误处理
RpcException 的使用
typescript
// 服务端:抛出可序列化传输的异常
import { RpcException } from '@nestjs/microservices';
@MessagePattern('find_user')
async findUser(@Payload() data: { id: number }) {
const user = await this.usersService.findOne(data.id);
if (!user) {
throw new RpcException({
statusCode: 404,
message: '用户不存在',
error: 'Not Found',
});
}
return user;
}typescript
// 客户端:将 RPC 异常转换为 HTTP 异常
findOne(id: number) {
return this.userClient.send('find_user', { id }).pipe(
catchError(err => {
// err 是从服务端序列化过来的对象
switch (err.statusCode) {
case 400: throw new BadRequestException(err.message);
case 404: throw new NotFoundException(err.message);
case 403: throw new ForbiddenException(err.message);
default: throw new InternalServerErrorException('下游服务异常');
}
}),
);
}超时处理
typescript
import { timeout, catchError, throwError } from 'rxjs';
import { TimeoutError } from 'rxjs';
findUser(id: number) {
return this.userClient.send('find_user', { id }).pipe(
timeout(3000),
catchError(err => {
if (err instanceof TimeoutError) {
return throwError(() => new ServiceUnavailableException('user-service 响应超时'));
}
return throwError(() => err);
}),
);
}五、各传输层配置详解
TCP(最简单)
typescript
// 服务端
NestFactory.createMicroservice({
transport: Transport.TCP,
options: { host: '0.0.0.0', port: 3001 },
});
// 客户端
ClientsModule.register([{
name: 'USER_SERVICE',
transport: Transport.TCP,
options: { host: 'user-service', port: 3001 },
}])Redis
typescript
// 服务端
NestFactory.createMicroservice({
transport: Transport.REDIS,
options: {
host: 'redis',
port: 6379,
password: process.env.REDIS_PASSWORD,
retryAttempts: 5,
retryDelay: 1000,
},
});RabbitMQ
bash
npm install amqplib amqp-connection-managertypescript
// 服务端
NestFactory.createMicroservice({
transport: Transport.RMQ,
options: {
urls: ['amqp://user:password@rabbitmq:5672'],
queue: 'user_service_queue',
queueOptions: {
durable: true, // 队列重启后保留
},
prefetchCount: 1, // 每次只取一条消息(公平分发)
noAck: false, // 手动确认消息(确保不丢失)
},
});Kafka
bash
npm install kafkajstypescript
// 服务端
NestFactory.createMicroservice({
transport: Transport.KAFKA,
options: {
client: {
brokers: ['kafka:9092'],
clientId: 'user-service',
},
consumer: {
groupId: 'user-service-consumer', // 消费组
},
},
});
// Kafka 事件模式(使用 topic 名称)
@EventPattern('user-events') // topic 名称
async handleUserEvent(@Payload() message: KafkaMessage) {
const { key, value } = message;
// value 是 Kafka 消息体
}gRPC
bash
npm install @grpc/grpc-js @grpc/proto-loaderprotobuf
// user.proto
syntax = "proto3";
package user;
service UserService {
rpc FindOne (FindOneRequest) returns (User);
rpc CreateUser (CreateUserRequest) returns (User);
rpc FindAll (FindAllRequest) returns (UserList);
}
message User {
int32 id = 1;
string email = 2;
string name = 3;
}
message FindOneRequest { int32 id = 1; }typescript
// gRPC 服务端
NestFactory.createMicroservice({
transport: Transport.GRPC,
options: {
package: 'user',
protoPath: join(__dirname, '../proto/user.proto'),
url: '0.0.0.0:50051',
},
});
// gRPC 服务端 Controller(使用 @GrpcMethod 而非 @MessagePattern)
import { GrpcMethod } from '@nestjs/microservices';
@Controller()
export class UsersGrpcController {
@GrpcMethod('UserService', 'FindOne')
findOne(data: { id: number }) {
return this.usersService.findOne(data.id);
}
}六、消息确认与幂等性
RabbitMQ 手动确认
typescript
@EventPattern('order_created')
async handleOrderCreated(
@Payload() data: any,
@Ctx() context: RmqContext,
) {
const channel = context.getChannelRef();
const message = context.getMessage();
try {
await this.processOrder(data);
channel.ack(message); // 处理成功,确认消息
} catch (err) {
channel.nack(message, false, true); // 处理失败,重新入队
}
}幂等性处理(避免重复消费)
typescript
@EventPattern('user_created')
async handleUserCreated(@Payload() data: { id: number; eventId: string }) {
// 检查事件是否已处理(Redis 或数据库)
const processed = await this.redis.get(`event:${data.eventId}`);
if (processed) {
console.log(`事件 ${data.eventId} 已处理,跳过`);
return;
}
// 处理业务逻辑
await this.createWelcomeContent(data.id);
// 标记已处理(TTL 设为消息最大延迟时间的 2 倍)
await this.redis.set(`event:${data.eventId}`, '1', 'EX', 86400);
}可运行 Demo:
practice/05-microservices— TCP Transport MessagePattern/EventPattern 示例
常见错误
| 错误 | 原因 | 解决 |
|---|---|---|
ClientProxy.send() 永不 resolve | 没有订阅 Observable | 必须 firstValueFrom(client.send(...)) 或 .subscribe() |
@EventPattern 事件未触发 | 使用了 send()(请求-响应)而不是 emit()(事件) | 单向事件用 client.emit(),对应 @EventPattern |
| RabbitMQ 消息重复消费 | 消费者未手动 ack | noAck: false + 处理完成后 channel.ack(message) |
| Redis 传输层消息丢失 | Redis 重启后 Pub/Sub 连接断开 | 生产环境用支持持久化的 RabbitMQ 或 Kafka |