Extending Authentication and Authorization in Pulsar

Pulsar 提供了实现自定义认证和授权的机制。

认证

Pulsar supports mutual TLS and Athenz authentication plugins. For how to use these authentication plugins, you can refer to the description in Security.

You can choose to use a custom authentication mechanism by providing the implementation in the form of two plugins. One plugin is for the Client library and the other plugin is for the Pulsar Broker to validate the credentials.

客户端认证插件

For client library, you need to implement org.apache.pulsar.client.api.Authentication. By entering the command below you can pass this class when you create a Pulsar client:

  1. PulsarClient client = PulsarClient.builder()
  2. .serviceUrl("pulsar://localhost:6650")
  3. .authentication(new MyAuthentication())
  4. .build();

You can use 2 interfaces to implement on the client side:

这反过来需要以 org.apache.pulsar.client.api.AuthentationDataProvider 的形式提供客户端凭据。 这样就有机会为不同类型的连接返回不同类型的身份验证令牌,或者通过传递证书链来为TLS使用。

你可以找到客户端认证实现的一些例子:

Broker authentication plugin

On broker side, you need the corresponding plugin to validate the credentials that the client passes. Broker can support multiple authentication providers at the same time.

在配置文件conf/broker.conf中,你能够指定一个有效提供者列表:

  1. # Autentication provider name list, which is comma separated list of class names
  2. authenticationProviders=

在单个接口实现org.apache.pulsar.broker.authentication.AuthenticationProvider

  1. /**
  2. * 认证机制提供者
  3. */
  4. public interface AuthenticationProvider extends Closeable {
  5. /**
  6. * 初始化认证提供者
  7. *
  8. * @param config
  9. * Broker 配置对象
  10. * @throws IOException
  11. * 如果初始化失败,则抛出异常
  12. */
  13. void initialize(ServiceConfiguration config) throws IOException;
  14. /**
  15. * @return 返回本认证提供者的名称
  16. */
  17. String getAuthMethodName();
  18. /**
  19. * 使用指定的身份验证数据验证给定凭据
  20. *
  21. * @param authData
  22. * provider specific authentication data
  23. * @return the "role" string for the authenticated connection, if the authentication was successful
  24. * @throws AuthenticationException
  25. * if the credentials are not valid
  26. */
  27. String authenticate(AuthenticationDataSource authData) throws AuthenticationException;
  28. }

以下是 Broker 认证插件的示例:

授权

Authorization is the operation that checks whether a particular “role” or “principal” has a permission to perform a certain operation.

By default, Pulsar provides an embedded authorization, though configuring a different one through a plugin is also an alternative choice.

要提供自定义提供者,你必须实现org.apache.pulsar.broker.authorization.AuthorizationProvider接口。并将该类放到 Pulsar 所用的 classpath中,并将类名配置到conf/broker.conf中。

  1. # Authorization provider fully qualified class-name
  2. authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
  1. /**
  2. * Provider of authorization mechanism
  3. */
  4. public interface AuthorizationProvider extends Closeable {
  5. /**
  6. * Perform initialization for the authorization provider
  7. *
  8. * @param config
  9. * broker config object
  10. * @param configCache
  11. * pulsar zk configuration cache service
  12. * @throws IOException
  13. * if the initialization fails
  14. */
  15. void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException;
  16. /**
  17. * Check if the specified role has permission to send messages to the specified fully qualified topic name.
  18. *
  19. * @param topicName
  20. * the fully qualified topic name associated with the topic.
  21. * @param role
  22. * the app id used to send messages to the topic.
  23. */
  24. CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
  25. AuthenticationDataSource authenticationData);
  26. /**
  27. * 检查指定角色是否具有从指定的完全限定主题接收消息的权限.
  28. *
  29. * @param topicName
  30. * the fully qualified topic name associated with the topic.
  31. * @param role
  32. * the app id used to receive messages from the topic.
  33. * @param subscription
  34. * the subscription name defined by the client
  35. */
  36. CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role,
  37. AuthenticationDataSource authenticationData, String subscription);
  38. /**
  39. * 检查指定角色是否可以执行指定主题的查找。
  40. *
  41. * 要调用它必须拥有生产者或消费者的权限.
  42. *
  43. * @param topicName
  44. * @param role
  45. * @return
  46. * @throws Exception
  47. */
  48. CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role,
  49. AuthenticationDataSource authenticationData);
  50. /**
  51. *
  52. * 在命名空间级别授予客户端授权操作的权限。
  53. *
  54. * @param namespace
  55. * @param actions
  56. * @param role
  57. * @param authDataJson
  58. * additional authdata in json format
  59. * @return CompletableFuture
  60. * @completesWith <br/>
  61. * IllegalArgumentException when namespace not found<br/>
  62. * IllegalStateException when failed to grant permission
  63. */
  64. CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions, String role,
  65. String authDataJson);
  66. /**
  67. * 在主题上授予客户端授权操作的权限。
  68. *
  69. * @param topicName
  70. * @param role
  71. * @param authDataJson
  72. * additional authdata in json format
  73. * @return CompletableFuture
  74. * @completesWith <br/>
  75. * IllegalArgumentException when namespace not found<br/>
  76. * IllegalStateException when failed to grant permission
  77. */
  78. CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions, String role,
  79. String authDataJson);
  80. }