Extending Authentication and Authorization in Pulsar

Pulsar 提供了实现自定义认证和授权的机制。

认证

Pulsar supports mutual TLS and Athenz authentication plugins. For how to use these authentication plugins, you can refer to the description in Security.

你能够使用自定义的认证机制,只要实现以下两个插件的具体实现。 一个插件用于客户端库,另一个插件用于Pulsar Proxy 和 Pulsar Broker 来验证凭据。

客户端认证插件

For the client library, you need to implement org.apache.pulsar.client.api.Authentication. By entering the command below you can pass this class when you create a Pulsar client:

  1. PulsarClient client = PulsarClient.builder()
  2. .serviceUrl("pulsar://localhost:6650")
  3. .authentication(new MyAuthentication())
  4. .build();

You can use 2 interfaces to implement on the client side:

这反过来需要以 org.apache.pulsar.client.api.AuthentationDataProvider 的形式提供客户端凭据。 这样就有机会为不同类型的连接返回不同类型的身份验证令牌,或者通过传递证书链来为TLS使用。

你可以找到客户端认证实现的一些例子:

Proxy/Broker 认证插件

在 Proxy/Broker 这一侧,你需要配置相应的插件去检验客户端发送过来的凭据信息。 Proxy 和 Broker 可以同时支持多个身份认证提供者。

在配置文件conf/broker.conf中,你能够指定一个有效提供者列表:

  1. # 认证提供者名字列表, 用逗号分隔类名
  2. authenticationProviders=

在单个接口实现org.apache.pulsar.broker.authentication.AuthenticationProvider

  1. /**
  2. * 认证机制提供者
  3. */
  4. public interface AuthenticationProvider extends Closeable {
  5. /**
  6. * 初始化认证提供者
  7. *
  8. * @param config
  9. * Broker 配置对象
  10. * @throws IOException
  11. * 如果初始化失败,则抛出异常
  12. */
  13. void initialize(ServiceConfiguration config) throws IOException;
  14. /**
  15. * @return 返回本认证提供者的名称
  16. */
  17. String getAuthMethodName();
  18. /**
  19. * 使用指定的身份验证数据验证给定凭据
  20. *
  21. * @param authData
  22. * provider specific authentication data
  23. * @return the "role" string for the authenticated connection, if the authentication was successful
  24. * @throws AuthenticationException
  25. * if the credentials are not valid
  26. */
  27. String authenticate(AuthenticationDataSource authData) throws AuthenticationException;
  28. }

以下是 Broker 认证插件的示例:

授权

授权是检查某个“角色”或“主体”是否具有执行某个操作权限的操作。

默认情况下,你能够使用 Pulsar 自带的授权提供商。 你也可以通过插件的形式,配置不同的授权提供商。 注意,尽管认证插件被设计成可以在 Proxy 和 Broker 中使用。但是授权插件却被设计为只能在 Broker 中使用。所以,如果启用了授权机制,Proxy 只会执行一些简单的针对角色的授权检查。

要提供自定义提供者,你必须实现org.apache.pulsar.broker.authorization.AuthorizationProvider接口。并将该类放到 Pulsar 所用的 classpath中,并将类名配置到conf/broker.conf中。

  1. # Authorization provider fully qualified class-name
  2. authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
  1. /**
  2. * 授权策略提供者接口
  3. */
  4. public interface AuthorizationProvider extends Closeable {
  5. /**
  6. * 初始化授权提供者
  7. *
  8. * @param conf
  9. * broker config object
  10. * @param configCache
  11. * pulsar zk configuration cache service
  12. * @throws IOException
  13. * if the initialization fails
  14. */
  15. void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException;
  16. /**
  17. * 检查指定角色是否具有向指定的完全限定主题名称发送消息的权限
  18. *
  19. * @param topicName
  20. * the fully qualified topic name associated with the topic.
  21. * @param role
  22. * the app id used to send messages to the topic.
  23. */
  24. CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
  25. AuthenticationDataSource authenticationData);
  26. /**
  27. * 检查指定角色是否具有从指定的完全限定主题接收消息的权限.
  28. *
  29. * @param topicName
  30. * the fully qualified topic name associated with the topic.
  31. * @param role
  32. * the app id used to receive messages from the topic.
  33. * @param subscription
  34. * the subscription name defined by the client
  35. */
  36. CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role,
  37. AuthenticationDataSource authenticationData, String subscription);
  38. /**
  39. * 检查指定角色是否可以执行指定主题的查找。
  40. *
  41. * 要调用它必须拥有生产者或消费者的权限.
  42. *
  43. * @param topicName
  44. * @param role
  45. * @return
  46. * @throws Exception
  47. */
  48. CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role,
  49. AuthenticationDataSource authenticationData);
  50. /**
  51. *
  52. * 在命名空间级别授予客户端授权操作的权限。
  53. *
  54. * @param namespace
  55. * @param actions
  56. * @param role
  57. * @param authDataJson
  58. * additional authdata in json format
  59. * @return CompletableFuture
  60. * @completesWith <br/>
  61. * IllegalArgumentException when namespace not found<br/>
  62. * IllegalStateException when failed to grant permission
  63. */
  64. CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions, String role,
  65. String authDataJson);
  66. /**
  67. * 在主题上授予客户端授权操作的权限。
  68. *
  69. * @param topicName
  70. * @param role
  71. * @param authDataJson
  72. * additional authdata in json format
  73. * @return CompletableFuture
  74. * @completesWith <br/>
  75. * IllegalArgumentException when namespace not found<br/>
  76. * IllegalStateException when failed to grant permission
  77. */
  78. CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions, String role,
  79. String authDataJson);
  80. }