Pulsar adaptor for Apache Kafka

Pulsar 为使用 Apache Kafka Java 客户端 API 编写的应用程序提供了一个简单的解决方案。

使用 Pulsar Kafka 兼容封装器

In an existing application, change the regular Kafka client dependency and replace it with the Pulsar Kafka wrapper. Remove the following dependency in pom.xml:

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>0.10.2.1</version>
  5. </dependency>

引入 Pulsar Kafka 包装器

  1. <dependency>
  2. <groupId>org.apache.pulsar</groupId>
  3. <artifactId>pulsar-client-kafka</artifactId>
  4. <version>2.8.0</version>
  5. </dependency>

无需更改代码即可使用新的依赖。 需要修改配置信息,并确保将 producer 和 consumer 接入 Pulsar 并使用特定的 Pulsar topic,而不是接入 Kafka。

使用 Pulsar Kafka 兼容封装器和 Kafka 客户端

从 Kafka 迁移到 Pulsar 的过程中,应用程序可能需要同时使用原始 Kafka 客户端和 Pulsar Kafka 封装器。 这种情况下,可以考虑使用透明的 Pulsar Kafka 封装器。

  1. <dependency>
  2. <groupId>org.apache.pulsar</groupId>
  3. <artifactId>pulsar-client-kafka-original</artifactId>
  4. <version>2.8.0</version>
  5. </dependency>

当使用这个依赖时,应使用 org.apache.kafka.clients.producer.PulsarKafkaProducer 而不是 org.apache.kafka.clients.producer.KafkaProducer 创建 producer,使用 org.apache.kafka.clients.producer.PulsarKafkaConsumer 创建 consumer。

生产者示例

  1. // 主题应为常规 Pulsar topic
  2. String topic = "persistent://public/default/my-topic";
  3. Properties props = new Properties();
  4. // 指向一个 Pulsar 服务
  5. props.put("bootstrap.servers", "pulsar://localhost:6650");
  6. props.put("key.serializer", IntegerSerializer.class.getName());
  7. props.put("value.serializer", StringSerializer.class.getName());
  8. Producer<Integer, String> producer = new KafkaProducer<>(props);
  9. for (int i = 0; i < 10; i++) {
  10. producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i));
  11. log.info("Message {} sent successfully", i);
  12. }
  13. producer.close();

消费者示例

  1. String topic = "persistent://public/default/my-topic";
  2. Properties props = new Properties();
  3. // 指向一个 Pulsar 服务
  4. props.put("bootstrap.servers", "pulsar://localhost:6650");
  5. props.put("group.id", "my-subscription-name");
  6. props.put("enable.auto.commit", "false");
  7. props.put("key.deserializer", IntegerDeserializer.class.getName());
  8. props.put("value.deserializer", StringDeserializer.class.getName());
  9. Consumer<Integer, String> consumer = new KafkaConsumer<>(props);
  10. consumer.subscribe(Arrays.asList(topic));
  11. while (true) {
  12. ConsumerRecords<Integer, String> records = consumer.poll(100);
  13. records.forEach(record -> {
  14. log.info("Received record: {}", record);
  15. });
  16. // 提交最近的 offset
  17. consumer.commitSync();
  18. }

完整示例

这里你可以找到完整的生产者和消费者示例。

兼容性列表

目前,Pulsar Kafka 封装器支持 Kafka API 提供的多数操作。

生产者(Producer)

API:

Producer 方法支持备注
Future<RecordMetadata> send(ProducerRecord<K, V> record)
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
void flush()
List<PartitionInfo> partitionsFor(String topic)
Map<MetricName, ? extends Metric> metrics()
void close()
void close(long timeout, TimeUnit unit)

属性:

配置属性支持备注
acks忽略已在命名空间级别配置持久性和 quorum write
auto.offset.reset如果你没有给出一个特定的设置,它将使用一个默认值earliest
batch.size忽略
bootstrap.servers
buffer.memory忽略
client.id忽略
compression.typeAllows gzip and lz4. No snappy.
connections.max.idle.ms仅支持最多2 147 483 647 000 (Integrer.MAX_VALUE * 1000)毫秒的闲置时间
interceptor.classes
key.serializer
linger.ms批处理时控制提交时间
max.block.ms忽略
max.in.flight.requests.per.connection忽略Pulsar中即使有多个请求也会保证顺序
max.request.size忽略
metric.reporters忽略
metrics.num.samples忽略
metrics.sample.window.ms忽略
partitioner.class
receive.buffer.bytes忽略
reconnect.backoff.ms忽略
request.timeout.ms忽略
retries忽略Pulsar client retries with exponential backoff until the send timeout expires.
send.buffer.bytes忽略
timeout.ms
value.serializer

消费者(Consumer)

下表为 consumer API 列表。

Consumer 方法支持备注
Set<TopicPartition> assignment()
Set<String> subscription()
void subscribe(Collection<String> topics)
void subscribe(Collection<String> topics, ConsumerRebalanceListener callback)
void assign(Collection<TopicPartition> partitions)
void subscribe(Pattern pattern, ConsumerRebalanceListener callback)
void unsubscribe()
ConsumerRecords<K, V> poll(long timeoutMillis)
void commitSync()
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
void commitAsync()
void commitAsync(OffsetCommitCallback callback)
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)
void seek(TopicPartition partition, long offset)
void seekToBeginning(Collection<TopicPartition> partitions)
void seekToEnd(Collection<TopicPartition> partitions)
long position(TopicPartition partition)
OffsetAndMetadata committed(TopicPartition partition)
Map<MetricName, ? extends Metric> metrics()
List<PartitionInfo> partitionsFor(String topic)
Map<String, List<PartitionInfo>> listTopics()
Set<TopicPartition> paused()
void pause(Collection<TopicPartition> partitions)
void resume(Collection<TopicPartition> partitions)
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions)
Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions)
void close()
void close(long timeout, TimeUnit unit)
void wakeup()

属性:

配置属性支持备注
group.id到 Pulsar 订阅名称的映射
max.poll.records
max.poll.interval.ms忽略broker “推送”的消息
session.timeout.ms忽略
heartbeat.interval.ms忽略
bootstrap.servers指定 Pulsar 服务地址
enable.auto.commit
auto.commit.interval.ms忽略自动提交后,立即向 broker 发送 ack。
partition.assignment.strategy忽略
auto.offset.reset仅支持最早和最新消息。
fetch.min.bytes忽略
fetch.max.bytes忽略
fetch.max.wait.ms忽略
interceptor.classes
metadata.max.age.ms忽略
max.partition.fetch.bytes忽略
send.buffer.bytes忽略
receive.buffer.bytes忽略
client.id忽略

自定义 Pulsar 配置

可以通过 Kafka 的属性直接配置 Pulsar 的身份认证。

Pulsar client properties

配置属性默认值备注
pulsar.authentication.classConfigure to auth provider. For example, org.apache.pulsar.client.impl.auth.AuthenticationTls.
pulsar.authentication.params.map表示 Authentication-Plugin 参数的Map.
pulsar.authentication.params.string表示 Authentication-Plugin 参数的字符串, 例如. key1:val1,key2:val2.
pulsar.use.tlsfalse启用 TLS 传输加密。
pulsar.tls.trust.certs.file.pathTLS 证书文件的路径。
pulsar.tls.allow.insecure.connectionfalse接受 broker 的自签名证书。
pulsar.operation.timeout.ms30000一般操作超时时间
pulsar.stats.interval.seconds60Pulsar 客户端数据统计信息打印间隔时间。
pulsar.num.io.threads1要使用的 Netty IO 线程数量。
pulsar.connections.per.broker1每个 broker 的最大连接数。
pulsar.use.tcp.nodelaytrueTCP无延时。
pulsar.concurrent.lookup.requests50000并发查询 topic 的最大连接数。
pulsar.max.number.rejected.request.per.connection50强制关闭连接的错误阈值。
pulsar.keepalive.interval.ms30000每个 client-broker 连接的 keep-alive 时间间隔。

Pulsar producer properties

配置属性默认值备注
pulsar.producer.name指定 producer 名称。
pulsar.producer.initial.sequence.id为 producer 指定序列 id 的基线。
pulsar.producer.max.pending.messages1000设置等待接收 broker 确认的消息队列的最大值。
pulsar.producer.max.pending.messages.across.partitions50000设置所有分区等待确认消息的最大值。
pulsar.producer.batching.enabledtrue是否允许自动批量接收消息。
pulsar.producer.batching.max.messages1000批消息中消息数量的最大值。
pulsar.block.if.producer.queue.full如果队列已满,则指定区块生产者。

Pulsar consumer Properties

配置属性默认值备注
pulsar.consumer.name指定 consumer 名称。
pulsar.consumer.receiver.queue.size1000设置消费者接收队列的大小。
pulsar.consumer.acknowledgments.group.time.millis100设置消费者向 broker 发送确认的最大分组时间。
pulsar.consumer.total.receiver.queue.size.across.partitions50000设置跨分区总接收队列的最大尺寸。
pulsar.consumer.subscription.topics.modePersistentOnly设置消费者订阅主题的模式。