微服务

基本

「微服务」在这里不是合适的词。实际上,Nest microservice 只是一个使用不同传输层(不是HTTP)的应用程序。、

微服务 - 图1

Nest 提供两种通信支持 - TCP 和 Redis pub/sub,但通过 CustomTransportStrategy 接口很容易实现新的传输策略。要创建 Nest 微服务,我们使用包 @nestjs/core 中的 NestFactory 。

我们来创建一个简单的微服务,它将通过 TCP 协议来侦听消息。我们要从 bootstrap() 功能开始。

main.ts

  1. import { NestFactory } from '@nestjs/core';
  2. import { ApplicationModule } from './app.module';
  3. import { Transport } from '@nestjs/microservices';
  4. async function bootstrap() {
  5. const app = await NestFactory.createMicroservice(ApplicationModule, {
  6. transport: Transport.TCP,
  7. });
  8. app.listen(() => console.log('Microservice is listening'));
  9. }
  10. bootstrap();

!> Transport 是一个帮助器枚举。

该 createMicroservice() 方法的第二个参数是一个选项对象。该对象可能有3个成员:

transport 指定传输方法(Transport.TCP 或者 Transport.REDIS 可以立即使用)
port 确定要使用的端口
url 确定使用 * 仅用于 Redis 的 url。
strategy 确定使用 * 仅用于自定义策略的策略

模式(Patterns)

Nest 的 microService 通过识别消息模式。该模式是一个普通的 JavaScript 值 - 对象,字符串甚至数字。
要创建模式处理程序, 我们正在使用从 @nestjs/microservices 包中导入的 @MessagePattern () 修饰器。

math.controller.ts

  1. import { Controller } from '@nestjs/common';
  2. import { MessagePattern } from '@nestjs/microservices';
  3. @Controller()
  4. export class MathController {
  5. @MessagePattern({ cmd: 'sum' })
  6. sum(data: number[]): number {
  7. return (data || []).reduce((a, b) => a + b);
  8. }
  9. }

!> 我们只能在 @Controller() 类中注册模式处理程序。

上述处理程序正在监听满足该c md: ‘sum’ 模式的消息。每个模式处理程序都有一个参数,data 从客户端传递。在这种情况下,数据是一组数字,必须累积。

异步响应

每个模式处理程序都可以async,所以你可以返回 Promise。而且,你可以返回 RxJS Observable,所以这些值将被返回,直到流完成。

math.controller.ts

  1. @MessagePattern({ cmd: 'sum' })
  2. sum(data: number[]): Observable<number> {
  3. return Observable.from([1, 2, 3]);
  4. }

上面的消息处理程序将响应3次(来自数组中的每个项)。

客户端

要连接 Nest microservice,我们使用的是 ClientProxy 类,由 @Client()decorator 将该实例分配给属性。这个装饰器接受与 Nest microservice 选项对象相同的对象的单个参数。

  1. @Client({ transport: Transport.TCP, port: 5667 })
  2. client: ClientProxy

!> 这两种 @Client() 装饰器和 ClientProxy 从 @nestjs/microservices 包引入。

该 ClientProxy 有一个 send() 方法。此方法旨在调用微服务并返回 Observable 响应。

  1. @Get()
  2. call(): Observable<number> {
  3. const pattern = { cmd: 'sum' };
  4. const data = [1, 2, 3, 4, 5];
  5. return this.client.send<number>(pattern, data);
  6. }

它需要2个参数, 模式 和 data。模式必须与 @MessagePattern() 修饰符中声明的样式相同。就这样。

Redis

还有另一种方式可以与 Nest 微服务 一起使用。我们可以使用伟大的 Redis 功能 - 发布/订阅 (publish / subscribe)来代替 TCP 通信 。

微服务 - 图2

要从TCP传输策略切换到 Redis pub/sub,我们只需要更改传递给 createMicroservice() 方法的选项对象即可。

main.ts

  1. const app = await NestFactory.createMicroservice(ApplicationModule, {
  2. transport: Transport.REDIS,
  3. url: 'redis://localhost:6379',
  4. });

?> 不要忘记更改 Transport.REDIS 的 @Client() 装饰器。

异常过滤器

websockets 异常层的工作原理与 prime 层完全相同。唯一的区别是 不要使用 HttpException,你应该使用 RpcException 。

  1. throw new RpcException('Invalid credentials.');

?> 该 RpcException 类需要引入 @nestjs/microservices 包。

Nest 会处理这个异常并使用 exception 发出带有以下数据的消息:

  1. {
  2. status: 'error',
  3. message: 'Invalid credentials.'
  4. }

该异常过滤器是也很相似,其工作方式与主程序完全相同。唯一的区别是该catch()方法应该返回一个 Observable。

RPC-exception.filter.ts

  1. import { Catch, RpcExceptionFilter } from '@nestjs/common';
  2. import { Observable } from 'rxjs/Observable';
  3. import { RpcException } from '@nestjs/microservices';
  4. import 'rxjs/add/observable/throw';
  5. @Catch(RpcException)
  6. export class ExceptionFilter implements RpcExceptionFilter {
  7. catch(exception: RpcException): Observable<any> {
  8. return Observable.throw(exception.getError());
  9. }
  10. }

!> 全局设置微服务异常过滤器是不可能的。

管道

微服务管道和普通管道没有区别。你应该知道的唯一一件事情就是,不要使用 HttpException,你应该使用 RpcException。

!> RpcException 需要引入 @nestjs/microservices。

看守器

常规看守器和微服务看守器之间有一个区别。微服务看守器 data 从客户端传递而不是将 expressjs 请求对象作为 canActivate() 函数的参数。另外,当看守器返回时 false,它会抛出 RpcException(而不是 HttpException)。

!> RpcException 需要引入 @nestjs/microservices。

拦截器

常规拦截器和微服务拦截器 之间有一个区别。微服务拦截器 data 从客户端传递而不是将 expressjs 请求对象作为 intercept() 函数的参数。

自定义传输

Nest具有通过 TCP 和 Redis 的内置传输,但其他通信方案可以通过 CustomTransportStrategy 接口实现。出于演示目的,我们将使用 ampqlib 库移植RabbitMQ 传输策略。

服务器

让我们从将传入消息与正确消息处理程序匹配的 RabbitMQServer 开始。

rabbitmq-server.ts

  1. import * as amqp from 'amqplib';
  2. import { Server, CustomTransportStrategy } from '@nestjs/microservices';
  3. import { Observable } from 'rxjs/Observable';
  4. export class RabbitMQServer extends Server implements CustomTransportStrategy {
  5. private server: amqp.Connection = null;
  6. private channel: amqp.Channel = null;
  7. constructor(
  8. private readonly host: string,
  9. private readonly queue: string) {
  10. super();
  11. }
  12. public async listen(callback: () => void) {
  13. await this.init();
  14. this.channel.consume(`${this.queue}_sub`, this.handleMessage.bind(this), {
  15. noAck: true,
  16. });
  17. }
  18. public close() {
  19. this.channel && this.channel.close();
  20. this.server && this.server.close();
  21. }
  22. private async handleMessage(message) {
  23. const { content } = message;
  24. const messageObj = JSON.parse(content.toString());
  25. const handlers = this.getHandlers();
  26. const pattern = JSON.stringify(messageObj.pattern);
  27. if (!this.messageHandlers[pattern]) {
  28. return;
  29. }
  30. const handler = this.messageHandlers[pattern];
  31. const response$ = this.transformToObservable(await handler(messageObj.data)) as Observable<any>;
  32. response$ && this.send(response$, (data) => this.sendMessage(data));
  33. }
  34. private sendMessage(message) {
  35. const buffer = Buffer.from(JSON.stringify(message));
  36. this.channel.sendToQueue(`${this.queue}_pub`, buffer);
  37. }
  38. private async init() {
  39. this.server = await amqp.connect(this.host);
  40. this.channel = await this.server.createChannel();
  41. this.channel.assertQueue(`${this.queue}_sub`, { durable: false });
  42. this.channel.assertQueue(`${this.queue}_pub`, { durable: false });
  43. }
  44. }

CustomTransportStrategy 强制执行两种基本方法 listen() 和 close()。此外, RabbitMQServer 将扩展抽象 server 类。此类提供核心getHandlers()和send()方法, 以及帮助器 transformToObservable () 方法。

最后一步是设置 RabbitMQServer:

main.ts

  1. const app = await NestFactory.createMicroservice(ApplicationModule, {
  2. strategy: new RabbitMQServer('amqp://localhost', 'channel'),
  3. });

客户端

RabbitMQ 服务器正在侦听消息。现在是创建客户端类的时候了, 它将扩展抽象 ClientProxy 类。为了使其正常工作, 我们只需要重写 sendSingleMessage() 方法。

rabbitmq-client.ts

  1. import * as amqp from 'amqplib';
  2. import { ClientProxy } from '@nestjs/microservices';
  3. export class RabbitMQClient extends ClientProxy {
  4. constructor(
  5. private readonly host: string,
  6. private readonly queue: string) {
  7. super();
  8. }
  9. protected async sendSingleMessage(messageObj, callback: (err, result, disposed?: boolean) => void) {
  10. const server = await amqp.connect(this.host);
  11. const channel = await server.createChannel();
  12. const { sub, pub } = this.getQueues();
  13. channel.assertQueue(sub, { durable: false });
  14. channel.assertQueue(pub, { durable: false });
  15. channel.consume(pub, (message) => this.handleMessage(message, server, callback), { noAck: true });
  16. channel.sendToQueue(sub, Buffer.from(JSON.stringify(messageObj)));
  17. }
  18. private handleMessage(message, server, callback: (err, result, disposed?: boolean) => void) {
  19. const { content } = message;
  20. const { err, response, disposed } = JSON.parse(content.toString());
  21. if (disposed) {
  22. server.close();
  23. }
  24. callback(err, response, disposed);
  25. }
  26. private getQueues() {
  27. return { pub: `${this.queue}_pub`, sub: `${this.queue}_sub` };
  28. }
  29. }

早些时候, Nest 负责创建客户端类的实例。我们一直在使用 @Client() 装饰器。现在, 当我们创建了自己的解决方案时, 我们只需使用 new 运算符就可以直接创建 RabbitMQClient 实例。

  1. this.client = new RabbitMQClient('amqp://localhost', 'example');