Pulsar C++ client

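You can use the Pulsar C++ client to create Pulsar producers and consumers in C++.

All the methods in the producer, consumer, and reader of the C++ client are thread-safe.

Supported platforms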

Pulsar C++ client is supported on Linux and MacOS platforms.

For the Doxygen-generated API docs of the C++ client, see here.

System requirements

You need to install the following components before using the C++ client:

Linux

Compile

  1. Clone the Pulsar repository.
       $ git clone https://github.com/apache/pulsar
  2. Install all necessary dependencies.
       $ apt-get install cmake libssl-dev libcurl4-openssl-dev liblog4cxx-dev \
           libprotobuf-dev protobuf-compiler libboost-all-dev google-mock libgtest-dev libjsoncpp-dev
  3. Compile and install Google Test.
       # libgtest-dev version is 1.18.0 or above
       $ cd /usr/src/googletest
       $ sudo cmake .
       $ sudo make
       $ sudo cp ./googlemock/libgmock.a ./googlemock/gtest/libgtest.a /usr/lib/
       # less than 1.18.0
       $ cd /usr/src/gtest
       $ sudo cmake .
       $ sudo make
       $ sudo cp libgtest.a /usr/lib
       $ cd /usr/src/gmock
       $ sudo cmake .
       $ sudo make
       $ sudo cp libgmock.a /usr/lib
  4. Compile the Pulsar client library for C++ inside the Pulsar repository.
       $ cd pulsar-client-cpp
       $ cmake .
       $ make

After the build succeeds, the files libpulsar.so and libpulsar.a are in the lib folder of the repository. The tools perfProducer and perfConsumer are in the perf directory.

Install dependencies

Since 2.1.0 release, Pulsar ships pre-built RPM and Debian packages. You can download and install those packages directly.

After you download and install the RPM or DEB package, the libpulsar.so, libpulsarnossl.so, libpulsar.a, and libpulsarwithdeps.a libraries are in your /usr/lib directory.

By default, they are built in code path ${PULSAR_HOME}/pulsar-client-cpp. You can build with the command below.

     cmake . -DBUILD_TESTS=OFF -DLINK_STATIC=ON && make pulsarShared pulsarSharedNossl pulsarStatic pulsarStaticWithDeps -j 3

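These libraries depend on some other libraries. For the detailed versions of the dependencies, see the RPM or DEB files.

  1. libpulsar.so is a shared library that contains statically linked boost and openssl. It dynamically links all other necessary libraries. You can use this Pulsar library with the command below.
       g++ --std=c++11 PulsarTest.cpp -o test /usr/lib/libpulsar.so -I/usr/local/ssl/include
  2. libpulsarnossl.so is a shared library similar to libpulsar.so, except that the openssl and crypto libraries are dynamically linked. You can use this Pulsar library with the command below.
       g++ --std=c++11 PulsarTest.cpp -o test /usr/lib/libpulsarnossl.so -lssl -lcrypto -I/usr/local/ssl/include -L/usr/local/ssl/lib
  3. libpulsar.a is a static library. You need to load its dependencies before using this library. You can use this Pulsar library with the command below.
       g++ --std=c++11 PulsarTest.cpp -o test /usr/lib/libpulsar.a -lssl -lcrypto -ldl -lpthread -I/usr/local/ssl/include -L/usr/local/ssl/lib -lboost_system -lboost_regex -lcurl -lprotobuf -lzstd -lz
  4. libpulsarwithdeps.a is a static library based on libpulsar.a. It also archives the dependencies libboost_regex, libboost_system, libcurl, libprotobuf, libzstd, and libz. You can use this Pulsar library with the command below.
       g++ --std=c++11 PulsarTest.cpp -o test /usr/lib/libpulsarwithdeps.a -lssl -lcrypto -ldl -lpthread -I/usr/local/ssl/include -L/usr/local/ssl/lib

libpulsarwithdeps.a does not include the openssl-related libraries libssl and libcrypto, because these two libraries are security-related. It is simpler and more reasonable to use the versions provided by the local system, so that security fixes and library upgrades are handled there.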

Install RPM

  1. Download an RPM package from the links in the table.

       Link               Crypto files
       client             asc, sha512
       client-debuginfo   asc, sha512
       client-devel       asc, sha512

  2. Install the package using the following command.
       $ rpm -ivh apache-pulsar-client*.rpm

After you install the RPM successfully, the Pulsar libraries are in the /usr/lib directory.

Install Debian

  1. Download a Debian package from the links in the table.

       Link           Crypto files
       client         asc, sha512
       client-devel   asc, sha512

  2. Install the package using the following command.
       $ apt install ./apache-pulsar-client*.deb

After you install the DEB successfully, the Pulsar libraries are in the /usr/lib directory.

Build

If you want to build RPM and Debian packages from the latest master, follow the instructions below. You should run all the instructions at the root directory of your cloned Pulsar repository.

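You can use the recipes below to build binary packages that contain libpulsar.so, libpulsarnossl.so, libpulsar.a, and libpulsarwithdeps.a together with all required dependencies.

To build the C++ library packages, you need to build the Java packages first.

     mvn install -DskipTests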

RPM

To build the RPM inside a Docker container, use the command below. The RPMs are in the pulsar-client-cpp/pkg/rpm/RPMS/x86_64/ path.

     pulsar-client-cpp/pkg/rpm/docker-build-rpm.sh

     Package name              Content
     pulsar-client             Shared libraries libpulsar.so and libpulsarnossl.so
     pulsar-client-devel       Static libraries libpulsar.a and libpulsarwithdeps.a, and the C++ and C headers
     pulsar-client-debuginfo   Debug symbols for libpulsar.so

Debian

To build Debian packages, enter the following command.

     pulsar-client-cpp/pkg/deb/docker-build-deb.sh

The built DEB packages are saved in the pulsar-client-cpp/pkg/deb/BUILD/DEB/ directory.

     Package name        Content
     pulsar-client       Shared libraries libpulsar.so and libpulsarnossl.so
     pulsar-client-dev   Static libraries libpulsar.a and libpulsarwithdeps.a, and the C++ and C headers

MacOS

Compile

  1. Clone the Pulsar repository.
       $ git clone https://github.com/apache/pulsar
  2. Install all necessary dependencies.
       # OpenSSL installation
       $ brew install openssl
       $ export OPENSSL_INCLUDE_DIR=/usr/local/opt/openssl/include/
       $ export OPENSSL_ROOT_DIR=/usr/local/opt/openssl/
       # Protocol Buffers installation
       $ brew tap homebrew/versions
       $ brew install protobuf260
       $ brew install boost
       $ brew install log4cxx
       # Google Test installation
       $ git clone https://github.com/google/googletest.git
       $ cd googletest
       $ cmake .
       $ make install
  3. Compile the Pulsar client library in the repository that you cloned.
       $ cd pulsar-client-cpp
       $ cmake .
       $ make

Install libpulsar

Pulsar releases are available in the Homebrew core repository. You can install the C++ client library with the following command. The package installs both the library and the headers.

     brew install libpulsar

Connection URLs

To connect to Pulsar using the client library, you need to specify a Pulsar protocol URL.

Pulsar protocol URLs are assigned to specific clusters and use the pulsar URI scheme. The default port is 6650. The following is an example for localhost.

     pulsar://localhost:6650

In a production Pulsar cluster, the URL looks as follows.

     pulsar://pulsar.us-west.example.com:6650

If you use TLS authentication, you need to add +ssl to the scheme, and the default port is 6651. The following is an example.

     pulsar+ssl://pulsar.us-west.example.com:6651
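
To make the URL rules above concrete, here is a minimal sketch of how a service URL could be split into scheme, host, and port, falling back to the default ports 6650 and 6651 mentioned in this section. The ServiceUrl struct and the parseServiceUrl function are hypothetical names introduced only for illustration; they are not part of the Pulsar C++ client, which does its own URL handling internally. The sketch uses only the C++11 standard library and does not handle IPv6 literals.

```cpp
#include <cctype>
#include <string>

// Hypothetical helper type for illustration; not part of the Pulsar C++ client API.
struct ServiceUrl {
    bool tls;          // true when the scheme is pulsar+ssl
    std::string host;  // broker host name
    int port;          // explicit port, or the default for the scheme
};

// Parses "pulsar://host[:port]" or "pulsar+ssl://host[:port]" into `out`.
// Returns false for any other scheme, an empty host, or an invalid port.
// IPv6 literals are deliberately not handled in this sketch.
bool parseServiceUrl(const std::string& url, ServiceUrl& out) {
    const std::string plain = "pulsar://";
    const std::string secure = "pulsar+ssl://";

    std::string rest;
    if (url.compare(0, secure.size(), secure) == 0) {
        out.tls = true;
        rest = url.substr(secure.size());
    } else if (url.compare(0, plain.size(), plain) == 0) {
        out.tls = false;
        rest = url.substr(plain.size());
    } else {
        return false;
    }

    // Default ports as stated in this document: 6650 (plain), 6651 (TLS).
    out.port = out.tls ? 6651 : 6650;

    const std::string::size_type colon = rest.rfind(':');
    if (colon == std::string::npos) {
        out.host = rest;
    } else {
        out.host = rest.substr(0, colon);
        const std::string portText = rest.substr(colon + 1);
        if (portText.empty() || portText.size() > 5) return false;
        int port = 0;
        for (char c : portText) {
            if (!std::isdigit(static_cast<unsigned char>(c))) return false;
            port = port * 10 + (c - '0');
        }
        if (port < 1 || port > 65535) return false;
        out.port = port;
    }
    return !out.host.empty();
}
```

With this sketch, parsing pulsar://localhost:6650 yields a non-TLS URL on port 6650, while pulsar+ssl://pulsar.us-west.example.com without an explicit port falls back to 6651. A URL with any other scheme, such as http://localhost:6650, is rejected.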

Create a consumer

To use Pulsar as a consumer, you need to create a consumer on the C++ client. The following is an example.

     Client client("pulsar://localhost:6650");

     // Subscribe to the topic; subscribe() fills in the consumer on success.
     Consumer consumer;
     Result result = client.subscribe("my-topic", "my-subscription-name", consumer);
     if (result != ResultOk) {
         LOG_ERROR("Failed to subscribe: " << result);
         return -1;
     }

     // Receive messages in a loop and acknowledge each one after processing.
     Message msg;
     while (true) {
         consumer.receive(msg);
         LOG_INFO("Received: " << msg
                  << " with payload '" << msg.getDataAsString() << "'");
         consumer.acknowledge(msg);
     }

     client.close();

Create a producer

To use Pulsar as a producer, you need to create a producer on the C++ client. The following is an example.

     Client client("pulsar://localhost:6650");

     // Create the producer; createProducer() fills in the producer on success.
     Producer producer;
     Result result = client.createProducer("my-topic", producer);
     if (result != ResultOk) {
         LOG_ERROR("Error creating producer: " << result);
         return -1;
     }

     // Publish 10 messages to the topic
     for (int i = 0; i < 10; i++) {
         Message msg = MessageBuilder().setContent("my-message").build();
         Result res = producer.send(msg);
         LOG_INFO("Message sent: " << res);
     }

     client.close();

Enable authentication in connection URLs

If you use TLS authentication when connecting to Pulsar, you need to use the pulsar+ssl scheme in the connection URL, and the default port is 6651. The following is an example.

     ClientConfiguration config = ClientConfiguration();
     config.setUseTls(true);
     config.setTlsTrustCertsFilePath("/path/to/cacert.pem");
     config.setTlsAllowInsecureConnection(false);
     config.setAuth(pulsar::AuthTls::create(
         "/path/to/client-cert.pem", "/path/to/client-key.pem"));
     Client client("pulsar+ssl://my-broker.com:6651", config);

For complete examples, refer to C++ client examples.

Schema

This section describes some examples about schema. For more information about schema, see Pulsar schema.

Create producer with Avro schema

The following example shows how to create a producer with an Avro schema.

     static const std::string exampleSchema =
         "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
         "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
     Producer producer;
     ProducerConfiguration producerConf;
     producerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
     client.createProducer("topic-avro", producerConf, producer);

Create consumer with Avro schema

The following example shows how to create a consumer with an Avro schema.

     static const std::string exampleSchema =
         "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
         "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";
     ConsumerConfiguration consumerConf;
     Consumer consumer;
     consumerConf.setSchema(SchemaInfo(AVRO, "Avro", exampleSchema));
     client.subscribe("topic-avro", "sub-2", consumerConf, consumer);
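
The escaped JSON in the schema definitions above is easy to get wrong by hand. As a possible alternative, the same Avro schema text can be written with a C++11 raw string literal, which needs no backslash escapes. The sketch below only builds the two strings with the standard library so that they can be compared; the names escapedSchema, rawSchema, and schemasMatch are illustrative and not part of the Pulsar API.

```cpp
#include <string>

// Escaped form, as written in the schema examples above.
static const std::string escapedSchema =
    "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\","
    "\"fields\":[{\"name\":\"a\",\"type\":\"int\"},{\"name\":\"b\",\"type\":\"int\"}]}";

// The same schema as raw string literals: the quotes inside need no escaping.
// Adjacent literals are concatenated by the compiler, as with ordinary strings.
static const std::string rawSchema =
    R"({"type":"record","name":"Example","namespace":"test",)"
    R"("fields":[{"name":"a","type":"int"},{"name":"b","type":"int"}]})";

// Returns true when both spellings produce the identical schema text.
bool schemasMatch() {
    return escapedSchema == rawSchema;
}
```

Either string can then be passed as the schema definition in the SchemaInfo(AVRO, "Avro", ...) calls shown above. The producer and the consumer in these examples use the same schema text.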