Pulsar adaptor for Apache Kafka

Pulsar provides an easy migration option for applications that are written against the Apache Kafka Java client API.

Using the Pulsar Kafka compatibility wrapper

In an existing application, replace the regular Kafka client dependency with the Pulsar Kafka wrapper. Remove the following dependency from pom.xml:

  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.1</version>
  </dependency>

Then include the Pulsar Kafka wrapper dependency:

  <dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client-kafka</artifactId>
    <version>2.7.2</version>
  </dependency>

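With the new dependency, the existing code works without any changes. You only need to adjust the configuration so that producers and consumers connect to Pulsar rather than Kafka and use a specific Pulsar topic.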
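As a minimal sketch of that configuration change, the helper below copies an existing Kafka client configuration and repoints `bootstrap.servers` at a Pulsar service URL. The class name `PulsarWrapperConfig`, the method `toPulsar`, and the Kafka broker address are hypothetical and used only for illustration; the `bootstrap.servers` key and the `pulsar://localhost:6650` URL are taken from the examples on this page.

```java
import java.util.Properties;

public class PulsarWrapperConfig {

    // Hypothetical helper: returns a copy of the Kafka configuration
    // whose bootstrap.servers entry points to a Pulsar service URL.
    public static Properties toPulsar(Properties kafkaProps, String pulsarServiceUrl) {
        Properties pulsarProps = new Properties();
        pulsarProps.putAll(kafkaProps);
        pulsarProps.setProperty("bootstrap.servers", pulsarServiceUrl);
        return pulsarProps;
    }

    public static void main(String[] args) {
        Properties kafkaProps = new Properties();
        // Assumed pre-migration setting; the host name is a placeholder.
        kafkaProps.setProperty("bootstrap.servers", "kafka-broker:9092");
        kafkaProps.setProperty("client.id", "my-app");

        Properties pulsarProps = toPulsar(kafkaProps, "pulsar://localhost:6650");
        System.out.println(pulsarProps.getProperty("bootstrap.servers"));
    }
}
```

The original `Properties` object is left untouched, which can help while both configurations are still in use during a migration.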

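Using the Pulsar Kafka compatibility wrapper together with the existing Kafka client

While migrating from Kafka to Pulsar, an application may need to use the original Kafka client and the Pulsar Kafka wrapper at the same time. In that case, consider using the unshaded Pulsar Kafka client wrapper.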

  <dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client-kafka-original</artifactId>
    <version>2.7.2</version>
  </dependency>

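When using this dependency, construct producers with org.apache.kafka.clients.producer.PulsarKafkaProducer instead of org.apache.kafka.clients.producer.KafkaProducer, and construct consumers with org.apache.kafka.clients.consumer.PulsarKafkaConsumer instead of org.apache.kafka.clients.consumer.KafkaConsumer.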

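Producer example

  import java.util.Properties;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.Producer;
  import org.apache.kafka.clients.producer.ProducerRecord;
  import org.apache.kafka.common.serialization.IntegerSerializer;
  import org.apache.kafka.common.serialization.StringSerializer;

  // The topic should be a regular Pulsar topic
  String topic = "persistent://public/default/my-topic";
  Properties props = new Properties();
  // Point to a Pulsar service
  props.put("bootstrap.servers", "pulsar://localhost:6650");
  props.put("key.serializer", IntegerSerializer.class.getName());
  props.put("value.serializer", StringSerializer.class.getName());
  Producer<Integer, String> producer = new KafkaProducer<>(props);
  for (int i = 0; i < 10; i++) {
      producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i));
      log.info("Message {} sent successfully", i);
  }
  producer.close();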

Consumer example

  import java.util.Arrays;
  import java.util.Properties;
  import org.apache.kafka.clients.consumer.Consumer;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.serialization.IntegerDeserializer;
  import org.apache.kafka.common.serialization.StringDeserializer;

  String topic = "persistent://public/default/my-topic";
  Properties props = new Properties();
  // Point to a Pulsar service
  props.put("bootstrap.servers", "pulsar://localhost:6650");
  // group.id maps to the Pulsar subscription name
  props.put("group.id", "my-subscription-name");
  props.put("enable.auto.commit", "false");
  props.put("key.deserializer", IntegerDeserializer.class.getName());
  props.put("value.deserializer", StringDeserializer.class.getName());
  Consumer<Integer, String> consumer = new KafkaConsumer<>(props);
  consumer.subscribe(Arrays.asList(topic));
  while (true) {
      ConsumerRecords<Integer, String> records = consumer.poll(100);
      records.forEach(record -> {
          log.info("Received record: {}", record);
      });
      // Commit the last offset
      consumer.commitSync();
  }

Complete examples

You can find the complete producer and consumer examples here.

Compatibility matrix

Currently, the Pulsar Kafka wrapper supports most of the operations offered by the Kafka API.

Producer

APIs:

| Producer method | Supported | Notes |
|---|---|---|
| Future<RecordMetadata> send(ProducerRecord<K, V> record) | | |
| Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) | | |
| void flush() | | |
| List<PartitionInfo> partitionsFor(String topic) | | |
| Map<MetricName, ? extends Metric> metrics() | | |
| void close() | | |
| void close(long timeout, TimeUnit unit) | | |

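Properties:

| Config property | Supported | Notes |
|---|---|---|
| acks | Ignored | Durability and quorum writes are configured at the namespace level. |
| auto.offset.reset | | Defaults to latest if you do not provide a specific setting. |
| batch.size | Ignored | |
| bootstrap.servers | | |
| buffer.memory | Ignored | |
| client.id | Ignored | |
| compression.type | | Allows gzip and lz4. Snappy is not supported. |
| connections.max.idle.ms | | Only supports an idle time of up to 2,147,483,647,000 ms (Integer.MAX_VALUE * 1000). |
| interceptor.classes | | |
| key.serializer | | |
| linger.ms | | Controls the group commit time when batching messages. |
| max.block.ms | Ignored | |
| max.in.flight.requests.per.connection | Ignored | In Pulsar, ordering is maintained even with multiple requests in flight. |
| max.request.size | Ignored | |
| metric.reporters | Ignored | |
| metrics.num.samples | Ignored | |
| metrics.sample.window.ms | Ignored | |
| partitioner.class | | |
| receive.buffer.bytes | Ignored | |
| reconnect.backoff.ms | Ignored | |
| request.timeout.ms | Ignored | |
| retries | Ignored | The Pulsar client retries with exponential backoff until the send timeout expires. |
| send.buffer.bytes | Ignored | |
| timeout.ms | | |
| value.serializer | | |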
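Two of the constraints in the producer property table can be checked before the configuration is handed to the wrapper. The sketch below is only an illustration: the class `ProducerPropertyChecks` and its methods are hypothetical, and how the wrapper itself reacts to an unsupported value is not described on this page.

```java
import java.util.Properties;

public class ProducerPropertyChecks {

    // Upper bound stated in the table: Integer.MAX_VALUE * 1000 milliseconds.
    public static final long MAX_IDLE_MS = Integer.MAX_VALUE * 1000L;

    // True if the configuration asks for snappy, which the table lists as not supported.
    public static boolean usesSnappy(Properties props) {
        return "snappy".equalsIgnoreCase(props.getProperty("compression.type"));
    }

    // True if connections.max.idle.ms is unset or within the stated bound.
    public static boolean idleTimeInRange(Properties props) {
        String value = props.getProperty("connections.max.idle.ms");
        return value == null || Long.parseLong(value) <= MAX_IDLE_MS;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("compression.type", "lz4");
        props.setProperty("connections.max.idle.ms", "540000");

        System.out.println("snappy requested: " + usesSnappy(props));
        System.out.println("idle time in range: " + idleTimeInRange(props));
    }
}
```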

Consumer

The following table lists the consumer APIs.

| Consumer method | Supported | Notes |
|---|---|---|
| Set<TopicPartition> assignment() | | |
| Set<String> subscription() | | |
| void subscribe(Collection<String> topics) | | |
| void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) | | |
| void assign(Collection<TopicPartition> partitions) | | |
| void subscribe(Pattern pattern, ConsumerRebalanceListener callback) | | |
| void unsubscribe() | | |
| ConsumerRecords<K, V> poll(long timeoutMillis) | | |
| void commitSync() | | |
| void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) | | |
| void commitAsync() | | |
| void commitAsync(OffsetCommitCallback callback) | | |
| void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) | | |
| void seek(TopicPartition partition, long offset) | | |
| void seekToBeginning(Collection<TopicPartition> partitions) | | |
| void seekToEnd(Collection<TopicPartition> partitions) | | |
| long position(TopicPartition partition) | | |
| OffsetAndMetadata committed(TopicPartition partition) | | |
| Map<MetricName, ? extends Metric> metrics() | | |
| List<PartitionInfo> partitionsFor(String topic) | | |
| Map<String, List<PartitionInfo>> listTopics() | | |
| Set<TopicPartition> paused() | | |
| void pause(Collection<TopicPartition> partitions) | | |
| void resume(Collection<TopicPartition> partitions) | | |
| Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) | | |
| Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) | | |
| Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) | | |
| void close() | | |
| void close(long timeout, TimeUnit unit) | | |
| void wakeup() | | |

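Properties:

| Config property | Supported | Notes |
|---|---|---|
| group.id | | Maps to a Pulsar subscription name. |
| max.poll.records | | |
| max.poll.interval.ms | Ignored | Messages are "pushed" from the broker. |
| session.timeout.ms | Ignored | |
| heartbeat.interval.ms | Ignored | |
| bootstrap.servers | | Specifies the Pulsar service URL. |
| enable.auto.commit | | |
| auto.commit.interval.ms | Ignored | With auto-commit, acks are sent to the broker immediately. |
| partition.assignment.strategy | Ignored | |
| auto.offset.reset | | Only earliest and latest are supported. |
| fetch.min.bytes | Ignored | |
| fetch.max.bytes | Ignored | |
| fetch.max.wait.ms | Ignored | |
| interceptor.classes | | |
| metadata.max.age.ms | Ignored | |
| max.partition.fetch.bytes | Ignored | |
| send.buffer.bytes | Ignored | |
| receive.buffer.bytes | Ignored | |
| client.id | Ignored | |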
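The consumer properties that carry Pulsar-specific meaning can be collected in one place. The following is a minimal sketch under the assumptions stated in the table: `group.id` becomes the subscription name and `auto.offset.reset` accepts only `earliest` or `latest`. The class `ConsumerConfigSketch` and the method `consumerProps` are hypothetical names, and rejecting other values with an exception is a choice made for this example, not documented wrapper behavior.

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    // Builds a consumer configuration for the Pulsar Kafka wrapper.
    // subscriptionName is stored under group.id, which maps to a Pulsar subscription.
    public static Properties consumerProps(String serviceUrl, String subscriptionName, String offsetReset) {
        if (!"earliest".equals(offsetReset) && !"latest".equals(offsetReset)) {
            throw new IllegalArgumentException(
                "auto.offset.reset must be 'earliest' or 'latest', got: " + offsetReset);
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", serviceUrl);
        props.setProperty("group.id", subscriptionName);
        props.setProperty("auto.offset.reset", offsetReset);
        props.setProperty("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("pulsar://localhost:6650", "my-subscription-name", "earliest");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Deserializer settings are omitted here; they are set exactly as in the consumer example earlier on this page.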

Customize Pulsar configurations

You can configure the Pulsar authentication provider directly through the Kafka properties.

Pulsar client properties

| Config property | Default | Notes |
|---|---|---|
| pulsar.authentication.class | | The authentication provider class, for example, org.apache.pulsar.client.impl.auth.AuthenticationTls. |
| pulsar.authentication.params.map | | A map of parameters for the authentication plugin. |
| pulsar.authentication.params.string | | A string of parameters for the authentication plugin, for example, key1:val1,key2:val2. |
| pulsar.use.tls | false | Enables TLS transport encryption. |
| pulsar.tls.trust.certs.file.path | | The path to the TLS trust certificate store. |
| pulsar.tls.allow.insecure.connection | false | Accepts self-signed certificates from brokers. |
| pulsar.operation.timeout.ms | 30000 | The general operation timeout. |
| pulsar.stats.interval.seconds | 60 | The interval at which the Pulsar client library prints stats. |
| pulsar.num.io.threads | 1 | The number of Netty IO threads to use. |
| pulsar.connections.per.broker | 1 | The maximum number of connections to each broker. |
| pulsar.use.tcp.nodelay | true | Enables TCP no-delay. |
| pulsar.concurrent.lookup.requests | 50000 | The maximum number of concurrent topic lookups. |
| pulsar.max.number.rejected.request.per.connection | 50 | The error threshold at which a connection is forcefully closed. |
| pulsar.keepalive.interval.ms | 30000 | The keep-alive interval for each client-broker connection. |
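To illustrate how the authentication properties fit together, here is a hedged sketch that enables TLS transport and TLS authentication through plain Kafka-style properties. The class `TlsAuthConfigSketch`, the method `tlsProps`, the certificate paths, and the `pulsar+ssl://localhost:6651` URL are assumptions for this example. The `tlsCertFile` and `tlsKeyFile` parameter names follow the `key1:val1,key2:val2` string format from the table and are assumed to be the ones the `AuthenticationTls` plugin expects.

```java
import java.util.Properties;

public class TlsAuthConfigSketch {

    // Adds TLS transport and TLS authentication settings to a new Properties object.
    // All file paths are supplied by the caller; none are defaults of the wrapper.
    public static Properties tlsProps(String serviceUrl, String trustCertsPath,
                                      String clientCertPath, String clientKeyPath) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", serviceUrl);
        props.setProperty("pulsar.use.tls", "true");
        props.setProperty("pulsar.tls.trust.certs.file.path", trustCertsPath);
        props.setProperty("pulsar.authentication.class",
                "org.apache.pulsar.client.impl.auth.AuthenticationTls");
        // key:value pairs separated by commas, as described in the table
        props.setProperty("pulsar.authentication.params.string",
                "tlsCertFile:" + clientCertPath + ",tlsKeyFile:" + clientKeyPath);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder URL and paths, chosen only for illustration.
        Properties props = tlsProps("pulsar+ssl://localhost:6651",
                "/path/to/ca.cert.pem", "/path/to/client.cert.pem", "/path/to/client.key.pem");
        System.out.println(props.getProperty("pulsar.authentication.params.string"));
    }
}
```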

Pulsar producer properties

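| Config property | Default | Notes |
|---|---|---|
| pulsar.producer.name | | Specifies the producer name. |
| pulsar.producer.initial.sequence.id | | Specifies the baseline for the sequence ID of this producer. |
| pulsar.producer.max.pending.messages | 1000 | Sets the maximum size of the queue of messages pending acknowledgment from the broker. |
| pulsar.producer.max.pending.messages.across.partitions | 50000 | Sets the maximum number of pending messages across all partitions. |
| pulsar.producer.batching.enabled | true | Controls whether automatic batching of messages is enabled for the producer. |
| pulsar.producer.batching.max.messages | 1000 | The maximum number of messages in a batch. |
| pulsar.block.if.producer.queue.full | | Specifies whether the producer blocks when the queue is full. |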

Pulsar consumer properties

| Config property | Default | Notes |
|---|---|---|
| pulsar.consumer.name | | Specifies the consumer name. |
| pulsar.consumer.receiver.queue.size | 1000 | Sets the size of the consumer receiver queue. |
| pulsar.consumer.acknowledgments.group.time.millis | 100 | Sets the maximum time for which the consumer groups acknowledgments before sending them to the broker. |
| pulsar.consumer.total.receiver.queue.size.across.partitions | 50000 | Sets the maximum total size of the receiver queue across partitions. |
| pulsar.consumer.subscription.topics.mode | PersistentOnly | Sets the subscription topic mode for consumers. |
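The Pulsar-specific producer and consumer properties are passed in the same `Properties` object as the regular Kafka settings. The sketch below overrides two of the documented defaults; the class `PulsarTuningSketch`, the method `withTuning`, and the chosen values are hypothetical and are not tuning recommendations.

```java
import java.util.Properties;

public class PulsarTuningSketch {

    // Returns a copy of base with two Pulsar-specific overrides applied.
    public static Properties withTuning(Properties base, int batchingMaxMessages, int receiverQueueSize) {
        Properties props = new Properties();
        props.putAll(base);
        // Producer side: cap the number of messages per batch (documented default: 1000)
        props.setProperty("pulsar.producer.batching.max.messages",
                Integer.toString(batchingMaxMessages));
        // Consumer side: size of the receiver queue (documented default: 1000)
        props.setProperty("pulsar.consumer.receiver.queue.size",
                Integer.toString(receiverQueueSize));
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("bootstrap.servers", "pulsar://localhost:6650");

        Properties tuned = withTuning(base, 500, 2000);
        tuned.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```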