分布式事件总线Kafka集成

本文解释了**如何配置Kafka**做为分布式总线提供程序. 参阅分布式事件总线文档了解如何使用分布式事件总线系统.

安装

使用ABP CLI添加[Volo.Abp.EventBus.KafkaVolo.Abp.EventBus.KafkaNuGet包到你的项目:

  • 安装ABP CLI,如果你还没有安装.
  • 在你想要安装 Volo.Abp.EventBus.Kafka 包的 .csproj 文件目录打开命令行(终端).
  • 运行 abp add-package Volo.Abp.EventBus.Kafka 命令.

如果你想要手动安装,安装Volo.Abp.EventBus.Kafka NuGet 包到你的项目然后添加 [DependsOn(typeof(AbpEventBusKafkaModule))] 到你的项目模块类.

配置

可以使用配置使用标准的配置系统,如 appsettings.json 文件,或选项类.

appsettings.json 文件配置

这是配置Kafka设置最简单的方法. 它也非常强大,因为你可以使用由AspNet Core支持的的任何其他配置源(如环境变量).

示例:最小化配置与默认配置连接到本地的Kafka服务器

  1. {
  2. "Kafka": {
  3. "EventBus": {
  4. "GroupId": "MyGroupId",
  5. "TopicName": "MyTopicName"
  6. }
  7. }
  8. }
  • MyGroupId 是应用程序的名称,用于Kafka的GroupId.
  • MyTopicNametopic名称.

参阅Kafka文档更好的了解这些选项.

连接

如果需要连接到本地主机以外的另一台服务器,需要配置连接属性.

示例: 指定主机名 (如IP地址)

  1. {
  2. "Kafka": {
  3. "Connections": {
  4. "Default": {
  5. "BootstrapServers": "123.123.123.123:9092"
  6. }
  7. },
  8. "EventBus": {
  9. "GroupId": "MyGroupId",
  10. "TopicName": "MyTopicName"
  11. }
  12. }
  13. }

允许定义多个连接. 在这种情况下,你可以指定用于事件总线的连接.

示例: 声明两个连接并将其中一个用于事件总线

  1. {
  2. "Kafka": {
  3. "Connections": {
  4. "Default": {
  5. "BootstrapServers": "123.123.123.123:9092"
  6. },
  7. "SecondConnection": {
  8. "BootstrapServers": "321.321.321.321:9092"
  9. }
  10. },
  11. "EventBus": {
  12. "GroupId": "MyGroupId",
  13. "TopicName": "MyTopicName",
  14. "ConnectionName": "SecondConnection"
  15. }
  16. }
  17. }

这允许你可以在你的应用程序使用多个Kafka服务器,但将其中一个做为事件总线.

你可以使用任何ClientConfig属性作为连接属性.

示例: 指定socket超时时间

  1. {
  2. "Kafka": {
  3. "Connections": {
  4. "Default": {
  5. "BootstrapServers": "123.123.123.123:9092",
  6. "SocketTimeoutMs": 60000
  7. }
  8. }
  9. }
  10. }

选项类

AbpKafkaOptionsAbpKafkaEventBusOptions 类用于配置Kafka的连接字符串和事件总线选项.

你可以在你的模块ConfigureServices 方法配置选项.

示例: 配置连接

  1. Configure<AbpKafkaOptions>(options =>
  2. {
  3. options.Connections.Default.BootstrapServers = "123.123.123.123:9092";
  4. options.Connections.Default.SaslUsername = "user";
  5. options.Connections.Default.SaslPassword = "pwd";
  6. });

示例: 配置 consumer config

  1. Configure<AbpKafkaOptions>(options =>
  2. {
  3. options.ConfigureConsumer = config =>
  4. {
  5. config.GroupId = "MyGroupId";
  6. config.EnableAutoCommit = false;
  7. };
  8. });

示例: 配置 producer config

  1. Configure<AbpKafkaOptions>(options =>
  2. {
  3. options.ConfigureProducer = config =>
  4. {
  5. config.MessageTimeoutMs = 6000;
  6. config.Acks = Acks.All;
  7. };
  8. });

示例: 配置 topic specification

  1. Configure<AbpKafkaOptions>(options =>
  2. {
  3. options.ConfigureTopic = specification =>
  4. {
  5. specification.ReplicationFactor = 3;
  6. specification.NumPartitions = 3;
  7. };
  8. });

使用这些选项类可以与 appsettings.json 组合在一起. 在代码中配置选项属性会覆盖配置文件中的值.