Pulsar SQL configuration and deployment

You can configure the Pulsar Trino plugin and deploy a cluster by following the instructions below.

Configure Pulsar Trino plugin

You can configure the Pulsar Trino plugin in the ${project.root}/trino/conf/catalog/pulsar.properties file. The connector settings and their default values are as follows.

  # name of the connector to be displayed in the catalog
  connector.name=pulsar
  # the URL of the Pulsar broker web service
  pulsar.web-service-url=http://localhost:8080
  # the URL of the Pulsar broker binary service
  pulsar.broker-binary-service-url=pulsar://localhost:6650
  # the URI of the ZooKeeper cluster
  pulsar.zookeeper-uri=localhost:2181
  # minimum number of entries to read at a single time
  pulsar.entry-read-batch-size=100
  # default number of splits to use per query
  pulsar.target-num-splits=4
  # max size of one batch message (default value is 5MB)
  pulsar.max-message-size=5242880
  # number of splits used when querying data from Pulsar
  pulsar.target-num-splits=2
  # size of the queue that buffers entries read from Pulsar
  pulsar.max-split-entry-queue-size=1000
  # size of the queue that buffers messages extracted from entries
  pulsar.max-split-message-queue-size=10000
  # stats provider used to record connector metrics
  pulsar.stats-provider=org.apache.bookkeeper.stats.NullStatsProvider
  # configuration for the stats provider in map format, e.g. {"key1":"val1","key2":"val2"}
  pulsar.stats-provider-configs={}
  # whether to rewrite Pulsar's default topic delimiter '/'
  pulsar.namespace-delimiter-rewrite-enable=false
  # delimiter used to rewrite Pulsar's default delimiter '/'; use it if the default causes incompatibility with other systems such as Superset
  pulsar.rewrite-namespace-delimiter="/"
  # maximum thread pool size for the ledger offloader
  pulsar.managed-ledger-offload-max-threads=2
  # driver used to offload or read cold data to or from long-term storage
  pulsar.managed-ledger-offload-driver=null
  # directory from which to load offloader NAR files
  pulsar.offloaders-directory="./offloaders"
  # properties and configurations of a specific offloader implementation in map format, e.g. {"key1":"val1","key2":"val2"}
  pulsar.offloader-properties={}
  # authentication plugin used to authenticate to the Pulsar cluster
  pulsar.auth-plugin=null
  # authentication parameters used to authenticate to the Pulsar cluster, as a string, e.g. "key1:val1,key2:val2"
  pulsar.auth-params=null
  # whether the Pulsar client accepts an untrusted TLS certificate from the broker
  pulsar.tls-allow-insecure-connection=null
  # whether to enable hostname verification when a client connects to the broker over TLS
  pulsar.tls-hostname-verification-enable=null
  # path of the trusted TLS certificate file of the Pulsar broker
  pulsar.tls-trust-cert-file-path=null
  # whether to enable Pulsar authorization
  pulsar.authorization-enabled=false
  # threshold for BookKeeper request throttling; disabled by default
  pulsar.bookkeeper-throttle-value=0
  # number of IO threads
  pulsar.bookkeeper-num-io-threads=2 * Runtime.getRuntime().availableProcessors()
  # number of worker threads
  pulsar.bookkeeper-num-worker-threads=Runtime.getRuntime().availableProcessors()
  # whether to use the BookKeeper V2 wire protocol
  pulsar.bookkeeper-use-v2-protocol=true
  # interval at which to check whether an explicit LAC needs to be sent; disabled by default
  pulsar.bookkeeper-explicit-interval=0
  # size of the managed ledger entry cache (in MB)
  pulsar.managed-ledger-cache-size-MB=0
  # number of threads used for dispatching managed ledger tasks
  pulsar.managed-ledger-num-worker-threads=Runtime.getRuntime().availableProcessors()
  # number of threads used for managed ledger scheduled tasks
  pulsar.managed-ledger-num-scheduler-threads=Runtime.getRuntime().availableProcessors()
  # directory used to store extracted NAR files
  pulsar.nar-extraction-directory=System.getProperty("java.io.tmpdir")
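
The listing above assigns pulsar.target-num-splits twice. If the file is loaded with java.util.Properties semantics, the last assignment of a key replaces the earlier ones, so it can help to check a catalog file for repeated keys before deploying it. The following is a minimal shell sketch. dup_keys is a hypothetical helper, not part of Pulsar or Trino, and the sample file is written to a temporary location for illustration only.

```shell
# dup_keys (hypothetical helper): print every key that is assigned more than
# once in a properties file, skipping comment lines and lines without '='.
dup_keys() {
  grep -v '^[[:space:]]*#' "$1" | grep '=' | cut -d= -f1 | sort | uniq -d
}

# Illustration only: a small sample file with one repeated key.
sample=$(mktemp)
cat > "$sample" <<'EOF'
# default number of splits to use per query
pulsar.target-num-splits=4
pulsar.max-message-size=5242880
pulsar.target-num-splits=2
EOF

dup_keys "$sample"   # prints: pulsar.target-num-splits
rm -f "$sample"
```

To check a real deployment, pass the path of your pulsar.properties file to dup_keys instead of the sample file.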

Enable authentication and authorization between Pulsar and Pulsar SQL

By default, the authentication and authorization between Pulsar and Pulsar SQL are disabled.

To enable them, set the following configurations in the ${project.root}/trino/conf/catalog/pulsar.properties file:

  pulsar.authorization-enabled=true
  pulsar.broker-binary-service-url=pulsar://localhost:6650
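
One way to apply these two settings to an existing catalog file without leaving a stale assignment behind is sketched below. set_prop is a hypothetical helper, not a Pulsar tool, and the demonstration runs against a temporary file rather than a real pulsar.properties file.

```shell
# set_prop (hypothetical helper): set KEY=VALUE in a properties file.
# Any existing uncommented assignment of KEY is dropped first, so the key
# ends up defined exactly once.
set_prop() {
  file=$1
  key=$2
  value=$3
  tmp=$(mktemp)
  # Keep every line that does not start with "KEY=" (exact string match).
  awk -v k="$key" 'index($0, k "=") != 1' "$file" > "$tmp"
  printf '%s=%s\n' "$key" "$value" >> "$tmp"
  # Write back in place so the original file permissions are preserved.
  cat "$tmp" > "$file"
  rm -f "$tmp"
}

# Illustration only: start from a file in which authorization is disabled.
conf=$(mktemp)
printf 'connector.name=pulsar\npulsar.authorization-enabled=false\n' > "$conf"

set_prop "$conf" pulsar.authorization-enabled true
set_prop "$conf" pulsar.broker-binary-service-url pulsar://localhost:6650
cat "$conf"
rm -f "$conf"
```

The helper appends the new assignment at the end of the file, so the order of keys may change. That does not matter for a properties file, but it does mean a comment above the old line stays where it was.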

Connect Trino to Pulsar with multiple hosts

You can connect Trino to a Pulsar cluster with multiple hosts.

  • To configure multiple hosts for brokers, add multiple URLs to pulsar.web-service-url.
  • To configure multiple hosts for ZooKeeper, add multiple URIs to pulsar.zookeeper-uri.

The following is an example.

  pulsar.web-service-url=http://localhost:8080,localhost:8081,localhost:8082
  pulsar.zookeeper-uri=localhost1,localhost2:2181
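
When the host list comes from an inventory, the comma-separated values can be assembled instead of typed by hand. A minimal sketch follows. join_hosts is a hypothetical helper, and the zk1, zk2, and zk3 host names are placeholders.

```shell
# join_hosts (hypothetical helper): join all arguments with commas.
join_hosts() {
  out=""
  for h in "$@"; do
    if [ -z "$out" ]; then
      out=$h
    else
      out="$out,$h"
    fi
  done
  printf '%s\n' "$out"
}

# As in the example above, the scheme appears once, before the first broker.
echo "pulsar.web-service-url=http://$(join_hosts localhost:8080 localhost:8081 localhost:8082)"
# Placeholder ZooKeeper hosts.
echo "pulsar.zookeeper-uri=$(join_hosts zk1:2181 zk2:2181 zk3:2181)"
```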

Get the last message in a topic

Note

By default, Pulsar SQL does not get the last message in a topic. This is by design and is controlled by settings. By default, the BookKeeper LAC (LastAddConfirmed) only advances when subsequent entries are added. If no subsequent entry is added, the last written entry is not visible to readers until the ledger is closed. This is not a problem for Pulsar, which uses managed ledgers, but Pulsar SQL reads directly from BookKeeper ledgers.

If you want to get the last message in a topic, set the following configurations:

  1. For the broker configuration, set bookkeeperExplicitLacIntervalInMills > 0 in broker.conf or standalone.conf.

  2. For the Trino configuration, set pulsar.bookkeeper-explicit-interval > 0 and pulsar.bookkeeper-use-v2-protocol=false.

However, setting pulsar.bookkeeper-use-v2-protocol=false makes Pulsar SQL use the BookKeeper V3 protocol, which introduces additional GC overhead to BookKeeper because it uses Protobuf.
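
The Trino-side half of this setup can be verified with a small check, sketched below under stated assumptions. get_prop and can_read_last_message are hypothetical helpers, the interval value 10 is an arbitrary example, and the check does not look at the broker configuration.

```shell
# get_prop (hypothetical helper): print the last value assigned to KEY
# in a properties file, or nothing if KEY is not assigned.
get_prop() {
  awk -v k="$2" 'index($0, k "=") == 1 { v = substr($0, length(k) + 2) }
                 END { print v }' "$1"
}

# can_read_last_message (hypothetical helper): succeed only if the file sets
# pulsar.bookkeeper-explicit-interval to a value greater than 0 and
# pulsar.bookkeeper-use-v2-protocol to false.
can_read_last_message() {
  interval=$(get_prop "$1" pulsar.bookkeeper-explicit-interval)
  v2=$(get_prop "$1" pulsar.bookkeeper-use-v2-protocol)
  [ "${interval:-0}" -gt 0 ] 2>/dev/null && [ "$v2" = "false" ]
}

# Illustration only: a temporary file with both settings applied.
conf=$(mktemp)
printf '%s\n' \
  'pulsar.bookkeeper-explicit-interval=10' \
  'pulsar.bookkeeper-use-v2-protocol=false' > "$conf"

if can_read_last_message "$conf"; then
  echo "Trino side is configured to read the last message"
else
  echo "Trino side still uses the defaults"
fi
rm -f "$conf"
```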

Query data from existing Trino clusters

If you already have a Trino cluster compatible with version 363, you can copy the Pulsar Trino plugin to your existing cluster. Download the archived plugin package with the following command.

  wget https://archive.apache.org/dist/pulsar/pulsar-3.1.1/apache-pulsar-3.1.1-bin.tar.gz

Deploy a new cluster

Because Pulsar SQL is powered by Trino, the deployment configuration of a Pulsar SQL worker is the same as that of a Trino worker.

Note

To set up a standalone single-node environment, refer to Query data.

You can use the same CLI arguments as the Trino launcher.

The default configuration for the cluster is located in ${project.root}/trino/conf. You can customize your deployment by modifying the default configuration.

You can set the worker to read from a different configuration directory, or set a different directory to write data.

  ./bin/pulsar sql-worker run --etc-dir /tmp/pulsar/trino/conf --data-dir /tmp/trino-1

You can start the worker as a daemon process.

  ./bin/pulsar sql-worker start

Deploy a cluster on multiple nodes

You can deploy a Pulsar SQL cluster or Trino cluster on multiple nodes. The following example shows how to deploy a cluster on three nodes.

  1. Copy the Pulsar binary distribution to three nodes.

The first node runs as the Trino coordinator. The minimal configuration required in the ${project.root}/trino/conf/config.properties file is as follows.

  coordinator=true
  node-scheduler.include-coordinator=true
  http-server.http.port=8080
  query.max-memory=50GB
  query.max-memory-per-node=1GB
  discovery-server.enabled=true
  discovery.uri=<coordinator-url>

The other two nodes serve as worker nodes. You can use the following configuration for them.

  coordinator=false
  http-server.http.port=8080
  query.max-memory=50GB
  query.max-memory-per-node=1GB
  discovery.uri=<coordinator-url>

  2. Modify the pulsar.web-service-url and pulsar.zookeeper-uri settings in the ${project.root}/trino/conf/catalog/pulsar.properties file accordingly on the three nodes.

  3. Start the coordinator node:

     ./bin/pulsar sql-worker run

  4. Start the worker nodes:

     ./bin/pulsar sql-worker run

  5. Start the SQL CLI and check the status of your cluster:

     ./bin/pulsar sql --server <coordinator-url>

  6. Check the status of your nodes:

     trino> SELECT * FROM system.runtime.nodes;
      node_id |        http_uri         | node_version | coordinator | state
     ---------+-------------------------+--------------+-------------+--------
      1       | http://192.168.2.1:8081 | testversion  | true        | active
      3       | http://192.168.2.2:8081 | testversion  | false       | active
      2       | http://192.168.2.3:8081 | testversion  | false       | active
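
The coordinator and worker configurations in step 1 differ in only three keys, so both can be produced from one template. The following is a minimal sketch. render_config is a hypothetical helper, and http://coordinator.example:8080 is a placeholder for your coordinator URL.

```shell
# render_config (hypothetical helper): print a config.properties body for
# the given role ("coordinator" or anything else for a worker) and
# discovery URI, using the values from the example in step 1.
render_config() {
  role=$1
  uri=$2
  if [ "$role" = "coordinator" ]; then
    printf '%s\n' \
      'coordinator=true' \
      'node-scheduler.include-coordinator=true'
  else
    printf '%s\n' 'coordinator=false'
  fi
  printf '%s\n' \
    'http-server.http.port=8080' \
    'query.max-memory=50GB' \
    'query.max-memory-per-node=1GB'
  if [ "$role" = "coordinator" ]; then
    # Only the coordinator runs the embedded discovery server.
    printf '%s\n' 'discovery-server.enabled=true'
  fi
  printf 'discovery.uri=%s\n' "$uri"
}

# Placeholder URL; replace it with the address of your coordinator.
render_config coordinator http://coordinator.example:8080
echo "---"
render_config worker http://coordinator.example:8080
```

Redirect the output of each call to the config.properties file of the corresponding node. Every node points discovery.uri at the same coordinator URL.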

For more information about the deployment in Trino, refer to Trino deployment.

Note

The broker does not advance the LAC, so when Pulsar SQL bypasses brokers to query data, it can only read entries up to the LAC that all the bookies have learned. You can enable periodic LAC writes on the broker by setting bookkeeperExplicitLacIntervalInMills in the broker.conf file.