Extending Authentication and Authorization in Pulsar
Pulsar provides a way to use custom authentication and authorization mechanisms.
Authentication
Pulsar supports mutual TLS and Athenz authentication plugins. For how to use these plugins, see the description in Security.
You can choose to use a custom authentication mechanism by providing the implementation in the form of two plugins. One plugin is for the Client library and the other plugin is for the Pulsar Broker to validate the credentials.
Client authentication plugin
For the client library, you need to implement org.apache.pulsar.client.api.Authentication. You can then pass an instance of this class when you create a Pulsar client:
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .authentication(new MyAuthentication())
    .build();
You can implement the following two interfaces on the client side:
- Authentication -> http://pulsar.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/Authentication.html
- AuthenticationDataProvider -> http://pulsar.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/AuthenticationDataProvider.html
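This in turn requires you to provide the client credentials in the form of org.apache.pulsar.client.api.AuthenticationDataProvider. This makes it possible to return different kinds of authentication tokens for different types of connections, or to pass a certificate chain to use for TLS.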
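The relationship between the two client-side interfaces can be sketched as follows. This is a minimal illustration, not the Pulsar implementation: to stay self-contained it declares simplified stand-ins for Authentication and AuthenticationDataProvider (the real interfaces live in org.apache.pulsar.client.api and declare more methods and checked exceptions). The "my-auth" method name, the "key" parameter, and the X-My-Auth-Key header are hypothetical names chosen for the example.

```java
import java.io.Closeable;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Simplified stand-in for org.apache.pulsar.client.api.AuthenticationDataProvider.
// Assumption: only the binary-protocol and HTTP accessors are mirrored here.
interface AuthenticationDataProvider {
    default boolean hasDataFromCommand() { return false; }
    default String getCommandData() { return null; }
    default boolean hasDataForHttp() { return false; }
    default Set<Map.Entry<String, String>> getHttpHeaders() { return null; }
}

// Simplified stand-in for org.apache.pulsar.client.api.Authentication.
interface Authentication extends Closeable {
    String getAuthMethodName();
    AuthenticationDataProvider getAuthData();
    void configure(Map<String, String> authParams);
    void start();
}

// Hypothetical plugin that presents a pre-shared key as its credential.
class MyAuthentication implements Authentication {
    private String key = "";

    @Override
    public String getAuthMethodName() {
        // The broker-side plugin must report the same method name.
        return "my-auth";
    }

    @Override
    public void configure(Map<String, String> authParams) {
        key = authParams.getOrDefault("key", "");
    }

    @Override
    public void start() {
        // Nothing to prepare for a static key.
    }

    @Override
    public AuthenticationDataProvider getAuthData() {
        final String current = key;
        return new AuthenticationDataProvider() {
            // Credential for connections over the binary protocol.
            @Override public boolean hasDataFromCommand() { return true; }
            @Override public String getCommandData() { return current; }

            // Credential for HTTP connections, sent as a header.
            @Override public boolean hasDataForHttp() { return true; }
            @Override public Set<Map.Entry<String, String>> getHttpHeaders() {
                Map<String, String> headers = new LinkedHashMap<>();
                headers.put("X-My-Auth-Key", current);
                return headers.entrySet();
            }
        };
    }

    @Override
    public void close() {
        // No resources to release.
    }
}
```

Returning a fresh AuthenticationDataProvider from getAuthData() is what lets one plugin hand out a different credential per connection type.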
You can find examples of client authentication implementations here:
- Mutual TLS Auth — https://github.com/apache/pulsar/tree/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth
- Athenz — https://github.com/apache/pulsar/tree/master/pulsar-client-auth-athenz/src/main/java/org/apache/pulsar/client/impl/auth
Broker authentication plugin
On the broker side, you need the corresponding plugin to validate the credentials that the client passes. The broker can support multiple authentication providers at the same time.
In conf/broker.conf, you can specify a list of valid providers:
# Authentication provider name list, which is a comma-separated list of class names
authenticationProviders=
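For illustration only, a filled-in entry might look like the fragment below. AuthenticationProviderTls is the Mutual TLS provider linked from this page; com.example.MyAuthenticationProvider is a hypothetical class name standing in for a custom plugin. The fragment also assumes that authentication is switched on through the authenticationEnabled setting.

```properties
# Enable authentication on the broker (assumed to be required alongside the provider list)
authenticationEnabled=true
# Two providers at once: the bundled TLS provider and a hypothetical custom one
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderTls,com.example.MyAuthenticationProvider
```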
To do so, implement a single interface, org.apache.pulsar.broker.authentication.AuthenticationProvider:
/**
 * Provider of authentication mechanism
 */
public interface AuthenticationProvider extends Closeable {
    /**
     * Perform initialization for the authentication provider
     *
     * @param config
     *            broker config object
     * @throws IOException
     *             if the initialization fails
     */
    void initialize(ServiceConfiguration config) throws IOException;

    /**
     * @return the authentication method name supported by this provider
     */
    String getAuthMethodName();

    /**
     * Validate the authentication for the given credentials with the specified authentication data
     *
     * @param authData
     *            provider specific authentication data
     * @return the "role" string for the authenticated connection, if the authentication was successful
     * @throws AuthenticationException
     *             if the credentials are not valid
     */
    String authenticate(AuthenticationDataSource authData) throws AuthenticationException;
}
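As a rough sketch of what an implementation of this interface could look like, the example below maps pre-shared keys to roles. It is not taken from Pulsar: ServiceConfiguration, AuthenticationDataSource, and AuthenticationProvider are redeclared as simplified stand-ins so that the code compiles on its own, and the "myAuthKeys" property with its "key:role" format is an invented convention.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import javax.naming.AuthenticationException;

// Simplified stand-in for the broker's ServiceConfiguration (assumption: only
// a generic properties bag is modelled).
class ServiceConfiguration {
    private final Properties properties = new Properties();
    Properties getProperties() { return properties; }
}

// Simplified stand-in for AuthenticationDataSource (binary-protocol data only).
interface AuthenticationDataSource {
    default boolean hasDataFromCommand() { return false; }
    default String getCommandData() { return null; }
}

// Same three methods as the interface shown above.
interface AuthenticationProvider extends Closeable {
    void initialize(ServiceConfiguration config) throws IOException;
    String getAuthMethodName();
    String authenticate(AuthenticationDataSource authData) throws AuthenticationException;
}

// Hypothetical provider that resolves a pre-shared key to a role.
class MyAuthenticationProvider implements AuthenticationProvider {
    private final Map<String, String> roleByKey = new HashMap<>();

    @Override
    public void initialize(ServiceConfiguration config) throws IOException {
        // "myAuthKeys" is an invented property, e.g. "key1:role1,key2:role2".
        String raw = config.getProperties().getProperty("myAuthKeys", "");
        for (String entry : raw.split(",")) {
            String[] parts = entry.trim().split(":", 2);
            if (parts.length == 2 && !parts[0].isEmpty()) {
                roleByKey.put(parts[0], parts[1]);
            }
        }
        if (roleByKey.isEmpty()) {
            throw new IOException("No keys configured for my-auth");
        }
    }

    @Override
    public String getAuthMethodName() {
        // Must match the method name reported by the client plugin.
        return "my-auth";
    }

    @Override
    public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
        if (!authData.hasDataFromCommand()) {
            throw new AuthenticationException("No credentials provided");
        }
        String role = roleByKey.get(authData.getCommandData());
        if (role == null) {
            throw new AuthenticationException("Unknown key");
        }
        // The returned role is what authorization checks are later made against.
        return role;
    }

    @Override
    public void close() {
        roleByKey.clear();
    }
}
```

Rejecting a credential by throwing, rather than returning null, follows the contract in the Javadoc above: a return value always means a successfully authenticated role.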
The following are examples of broker authentication plugins:
- Mutual TLS — https://github.com/apache/pulsar/blob/master/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTls.java
- Athenz — https://github.com/apache/pulsar/blob/master/pulsar-broker-auth-athenz/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenz.java
Authorization
Authorization is the operation that checks whether a particular "role" or "principal" has permission to perform a certain operation.
By default, Pulsar provides an embedded authorization provider, though you can also configure a different one through a plugin.
To provide a custom provider, you need to implement the org.apache.pulsar.broker.authorization.AuthorizationProvider interface, put this class in the Pulsar broker classpath, and configure the class name in conf/broker.conf:
# Authorization provider fully qualified class-name
authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
/**
 * Provider of authorization mechanism
 */
public interface AuthorizationProvider extends Closeable {
    /**
     * Perform initialization for the authorization provider
     *
     * @param conf
     *            broker config object
     * @param configCache
     *            pulsar zk configuration cache service
     * @throws IOException
     *             if the initialization fails
     */
    void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException;

    /**
     * Check if the specified role has permission to send messages to the specified fully qualified topic name.
     *
     * @param topicName
     *            the fully qualified topic name associated with the topic.
     * @param role
     *            the app id used to send messages to the topic.
     */
    CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
            AuthenticationDataSource authenticationData);

    /**
     * Check if the specified role has permission to receive messages from the specified fully qualified topic name.
     *
     * @param topicName
     *            the fully qualified topic name associated with the topic.
     * @param role
     *            the app id used to receive messages from the topic.
     * @param subscription
     *            the subscription name defined by the client
     */
    CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role,
            AuthenticationDataSource authenticationData, String subscription);

    /**
     * Check whether the specified role can perform a lookup for the specified topic.
     *
     * For that, the caller needs to have producer or consumer permission.
     *
     * @param topicName
     * @param role
     * @return
     * @throws Exception
     */
    CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role,
            AuthenticationDataSource authenticationData);

    /**
     * Grant authorization-action permission on a namespace to the given client
     *
     * @param namespace
     * @param actions
     * @param role
     * @param authDataJson
     *            additional authdata in json format
     * @return CompletableFuture
     * @completesWith <br/>
     *                IllegalArgumentException when namespace not found<br/>
     *                IllegalStateException when failed to grant permission
     */
    CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions, String role,
            String authDataJson);

    /**
     * Grant authorization-action permission on a topic to the given client
     *
     * @param topicName
     * @param actions
     * @param role
     * @param authDataJson
     *            additional authdata in json format
     * @return CompletableFuture
     * @completesWith <br/>
     *                IllegalArgumentException when namespace not found<br/>
     *                IllegalStateException when failed to grant permission
     */
    CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions, String role,
            String authDataJson);
}
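The check-and-grant pattern behind this interface can be illustrated with a small in-memory sketch. It deliberately does not implement the real AuthorizationProvider: topics and namespaces are plain strings instead of TopicName and NamespaceName, the AuthenticationDataSource and authDataJson arguments are left out, AuthAction is a two-value stand-in enum, and only namespace-level grants are stored. The class name InMemoryAuthorization and the helper namespaceOf are hypothetical.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Stand-in for Pulsar's AuthAction (assumption: only two actions are modelled).
enum AuthAction { produce, consume }

// Hypothetical in-memory store of namespace-level grants.
class InMemoryAuthorization {
    // namespace -> role -> granted actions
    private final Map<String, Map<String, Set<AuthAction>>> grants = new HashMap<>();

    synchronized CompletableFuture<Void> grantPermissionAsync(
            String namespace, Set<AuthAction> actions, String role) {
        grants.computeIfAbsent(namespace, ns -> new HashMap<>())
              .computeIfAbsent(role, r -> EnumSet.noneOf(AuthAction.class))
              .addAll(actions);
        return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<Boolean> canProduceAsync(String topic, String role) {
        return CompletableFuture.completedFuture(isGranted(topic, role, AuthAction.produce));
    }

    // The subscription name is accepted but ignored in this sketch.
    CompletableFuture<Boolean> canConsumeAsync(String topic, String role, String subscription) {
        return CompletableFuture.completedFuture(isGranted(topic, role, AuthAction.consume));
    }

    // Lookup is allowed when the role may either produce or consume.
    CompletableFuture<Boolean> canLookupAsync(String topic, String role) {
        return canProduceAsync(topic, role)
                .thenCombine(canConsumeAsync(topic, role, null), (p, c) -> p || c);
    }

    private synchronized boolean isGranted(String topic, String role, AuthAction action) {
        Map<String, Set<AuthAction>> byRole = grants.get(namespaceOf(topic));
        if (byRole == null) {
            return false;
        }
        Set<AuthAction> actions = byRole.get(role);
        return actions != null && actions.contains(action);
    }

    // Assumes names shaped like "persistent://tenant/namespace/topic".
    static String namespaceOf(String topic) {
        int scheme = topic.indexOf("://");
        int lastSlash = topic.lastIndexOf('/');
        if (scheme < 0 || lastSlash <= scheme + 2) {
            throw new IllegalArgumentException("Not a fully qualified topic name: " + topic);
        }
        return topic.substring(scheme + 3, lastSlash);
    }
}
```

A role granted only the consume action on "tenant/ns" would pass canConsumeAsync and canLookupAsync for topics in that namespace, and fail canProduceAsync.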