Pulsar provides a way to use custom authentication and authorization mechanisms.

Authentication

Pulsar supports mutual TLS and Athenz authentication plugins. For details on how to use them, see the Security documentation.

You can also use a custom authentication mechanism by providing its implementation as two plugins: one for the client library, and one for the Pulsar broker to validate the credentials.

Client authentication plugin

For the client library, you need to implement org.apache.pulsar.client.api.Authentication. You can then pass an instance of this class when you create a Pulsar client:

    PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .authentication(new MyAuthentication())
        .build();

On the client side, you implement two interfaces: org.apache.pulsar.client.api.Authentication and org.apache.pulsar.client.api.AuthenticationDataProvider.

The Authentication implementation in turn provides the client credentials in the form of an org.apache.pulsar.client.api.AuthenticationDataProvider. This makes it possible to return different kinds of authentication tokens for different types of connections, or to pass a certificate chain to use for TLS.
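
The relationship between the two client-side interfaces can be sketched as follows. This is a minimal illustration, not the library's code: so that it compiles without the Pulsar jars, it declares simplified stand-ins for Authentication and AuthenticationDataProvider that contain only the methods used here (the real interfaces in org.apache.pulsar.client.api declare more). The MyAuthenticationData class, the "my-auth" method name, and the "secret" parameter key are all hypothetical.

```java
import java.util.Map;

// Simplified stand-ins (assumptions) for the Pulsar client interfaces.
// With the real library, import them from org.apache.pulsar.client.api instead.
interface AuthenticationDataProvider {
    default boolean hasDataFromCommand() { return false; }
    default String getCommandData() { return null; }
}

interface Authentication {
    String getAuthMethodName();
    AuthenticationDataProvider getAuthData();
    void configure(Map<String, String> authParams);
}

// Hypothetical credential holder: exposes a shared secret as command data.
class MyAuthenticationData implements AuthenticationDataProvider {
    private final String secret;

    MyAuthenticationData(String secret) {
        this.secret = secret;
    }

    @Override
    public boolean hasDataFromCommand() {
        return true;
    }

    @Override
    public String getCommandData() {
        return secret;
    }
}

// Hypothetical plugin: names the auth method and hands out the credentials.
class MyAuthentication implements Authentication {
    private String secret = "";

    @Override
    public String getAuthMethodName() {
        return "my-auth"; // hypothetical name; the broker plugin must report the same one
    }

    @Override
    public void configure(Map<String, String> authParams) {
        // "secret" is a hypothetical parameter key chosen for this sketch.
        secret = authParams.getOrDefault("secret", "");
    }

    @Override
    public AuthenticationDataProvider getAuthData() {
        return new MyAuthenticationData(secret);
    }
}
```

With the real Pulsar client on the classpath, an instance of such a class is what gets passed to the authentication(...) call on the client builder.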

Examples of client authentication providers can be found in the Pulsar source tree, for the mutual TLS and Athenz mechanisms mentioned above.

Broker authentication plugin

On the broker side, you need a corresponding plugin to validate the credentials that the client passes. The broker can support multiple authentication providers at the same time.

In conf/broker.conf, you can specify a list of valid providers:

    # Authentication provider name list, which is a comma-separated list of class names
    authenticationProviders=

To write a broker plugin, you implement a single interface, org.apache.pulsar.broker.authentication.AuthenticationProvider:

    /**
     * Provider of authentication mechanism
     */
    public interface AuthenticationProvider extends Closeable {

        /**
         * Perform initialization for the authentication provider
         *
         * @param config
         *            broker config object
         * @throws IOException
         *             if the initialization fails
         */
        void initialize(ServiceConfiguration config) throws IOException;

        /**
         * @return the authentication method name supported by this provider
         */
        String getAuthMethodName();

        /**
         * Validate the authentication for the given credentials with the specified authentication data
         *
         * @param authData
         *            provider specific authentication data
         * @return the "role" string for the authenticated connection, if the authentication was successful
         * @throws AuthenticationException
         *             if the credentials are not valid
         */
        String authenticate(AuthenticationDataSource authData) throws AuthenticationException;
    }
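
As an illustration of what an authenticate implementation might look like, the sketch below maps a shared token to a role. It is not a drop-in plugin: AuthenticationDataSource is declared here as a simplified stand-in with only two methods, the CommandData helper and the StaticTokenAuthenticationProvider class are hypothetical, and the token table is passed through the constructor for brevity, whereas a real provider would load it in initialize(ServiceConfiguration). The exception type is the JDK's javax.naming.AuthenticationException.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import javax.naming.AuthenticationException;

// Simplified stand-in (assumption) for Pulsar's AuthenticationDataSource.
interface AuthenticationDataSource {
    boolean hasDataFromCommand();
    String getCommandData();
}

// Hypothetical helper that wraps the credential string sent by a client.
class CommandData implements AuthenticationDataSource {
    private final String data;

    CommandData(String data) {
        this.data = data;
    }

    @Override
    public boolean hasDataFromCommand() {
        return data != null;
    }

    @Override
    public String getCommandData() {
        return data;
    }
}

// Hypothetical provider: looks the presented token up in a fixed table.
class StaticTokenAuthenticationProvider implements Closeable {
    private final Map<String, String> rolesByToken;

    StaticTokenAuthenticationProvider(Map<String, String> rolesByToken) {
        this.rolesByToken = Map.copyOf(rolesByToken);
    }

    public String getAuthMethodName() {
        return "my-auth"; // hypothetical; must match the name the client plugin reports
    }

    // Returns the role for a known token, otherwise rejects the connection.
    public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
        if (authData == null || !authData.hasDataFromCommand()) {
            throw new AuthenticationException("No credentials supplied");
        }
        String token = authData.getCommandData();
        String role = (token == null) ? null : rolesByToken.get(token);
        if (role == null) {
            throw new AuthenticationException("Unknown token");
        }
        return role;
    }

    @Override
    public void close() throws IOException {
        // Nothing to release in this sketch.
    }
}
```

The role string returned here is what the authorization layer later receives when it decides whether the connection may produce or consume.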

Examples of broker authentication plugins are the mutual TLS and Athenz providers that ship with Pulsar.

Authorization

Authorization is the operation that checks whether a particular “role” or “principal” has permission to perform a certain operation.

By default, Pulsar uses an embedded authorization provider. You can also configure a different provider through a plugin.

To provide a custom provider, you need to implement the org.apache.pulsar.broker.authorization.AuthorizationProvider interface, put this class in the Pulsar broker classpath and configure the class in conf/broker.conf:

    # Authorization provider fully qualified class-name
    authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider

The AuthorizationProvider interface is defined as follows:

    /**
     * Provider of authorization mechanism
     */
    public interface AuthorizationProvider extends Closeable {

        /**
         * Perform initialization for the authorization provider
         *
         * @param conf
         *            broker config object
         * @param configCache
         *            pulsar zk configuration cache service
         * @throws IOException
         *             if the initialization fails
         */
        void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException;

        /**
         * Check if the specified role has permission to send messages to the specified fully qualified topic name.
         *
         * @param topicName
         *            the fully qualified topic name associated with the topic.
         * @param role
         *            the app id used to send messages to the topic.
         */
        CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
                AuthenticationDataSource authenticationData);

        /**
         * Check if the specified role has permission to receive messages from the specified fully qualified topic name.
         *
         * @param topicName
         *            the fully qualified topic name associated with the topic.
         * @param role
         *            the app id used to receive messages from the topic.
         * @param subscription
         *            the subscription name defined by the client
         */
        CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role,
                AuthenticationDataSource authenticationData, String subscription);

        /**
         * Check whether the specified role can perform a lookup for the specified topic.
         *
         * For that the caller needs to have producer or consumer permission.
         *
         * @param topicName
         * @param role
         * @return
         * @throws Exception
         */
        CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role,
                AuthenticationDataSource authenticationData);

        /**
         *
         * Grant authorization-action permission on a namespace to the given client
         *
         * @param namespace
         * @param actions
         * @param role
         * @param authDataJson
         *            additional authdata in json format
         * @return CompletableFuture
         * @completesWith <br/>
         *                IllegalArgumentException when namespace not found<br/>
         *                IllegalStateException when failed to grant permission
         */
        CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions, String role,
                String authDataJson);

        /**
         * Grant authorization-action permission on a topic to the given client
         *
         * @param topicName
         * @param role
         * @param authDataJson
         *            additional authdata in json format
         * @return CompletableFuture
         * @completesWith <br/>
         *                IllegalArgumentException when namespace not found<br/>
         *                IllegalStateException when failed to grant permission
         */
        CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions, String role,
                String authDataJson);
    }
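
To make the shape of these checks concrete, here is a minimal in-memory sketch of the decision logic. It does not implement the real interface: topics and namespaces are plain strings instead of TopicName and NamespaceName, the AuthAction enum is a two-value stand-in, the authentication-data and authDataJson parameters are omitted, and InMemoryAuthorizer and namespaceOf are hypothetical names. It does follow the rule stated in the Javadoc above that a lookup needs either producer or consumer permission.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in (assumption) for Pulsar's AuthAction; only two actions are modelled.
enum AuthAction { produce, consume }

// Hypothetical in-memory authorizer keyed by namespace and role.
class InMemoryAuthorizer {
    // namespace -> role -> granted actions
    private final Map<String, Map<String, Set<AuthAction>>> grants = new ConcurrentHashMap<>();

    // Derives "tenant/namespace" from a name such as "persistent://tenant/namespace/topic".
    static String namespaceOf(String topic) {
        int scheme = topic.indexOf("://");
        String rest = scheme >= 0 ? topic.substring(scheme + 3) : topic;
        int lastSlash = rest.lastIndexOf('/');
        return lastSlash >= 0 ? rest.substring(0, lastSlash) : rest;
    }

    CompletableFuture<Void> grantPermissionAsync(String namespace, Set<AuthAction> actions, String role) {
        grants.computeIfAbsent(namespace, ns -> new ConcurrentHashMap<>())
              .computeIfAbsent(role, r -> ConcurrentHashMap.newKeySet())
              .addAll(actions);
        return CompletableFuture.completedFuture(null);
    }

    private boolean has(String namespace, String role, AuthAction action) {
        if (namespace == null || role == null) {
            return false; // unauthenticated or malformed requests get no access
        }
        return grants.getOrDefault(namespace, Map.of())
                     .getOrDefault(role, Set.of())
                     .contains(action);
    }

    CompletableFuture<Boolean> canProduceAsync(String topic, String role) {
        return CompletableFuture.completedFuture(has(namespaceOf(topic), role, AuthAction.produce));
    }

    CompletableFuture<Boolean> canConsumeAsync(String topic, String role) {
        return CompletableFuture.completedFuture(has(namespaceOf(topic), role, AuthAction.consume));
    }

    // Lookup is allowed when the role may produce or consume on the topic.
    CompletableFuture<Boolean> canLookupAsync(String topic, String role) {
        return canProduceAsync(topic, role)
                .thenCombine(canConsumeAsync(topic, role), (p, c) -> p || c);
    }
}
```

Returning CompletableFuture from every check, as the interface does, lets a provider backed by a remote store answer without blocking broker threads; this sketch simply completes the futures immediately.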