应用程序可以使用 Pulsar 加密在生产者侧加密消息,并在消费者侧解密消息。 你可以使用应用程序配置的公钥私钥对进行加密。 只有拥有有效密钥的消费者可以解密加密过的消息。

对称加密与非对称加密

Pulsar使用动态生成的AES秘钥来加密消息(数据)。 你可以使用应用程序提供的 ECDSA/RSA 密钥对来加密 AES 密钥(数据密钥), 所以你不必与大家分享秘密。

Key is a public and private key pair used for encryption or decryption. The producer key is the public key of the key pair, and the consumer key is the private key of the key pair.

应用程序用公钥配置生产者。 你可以使用此密钥来加密 AES 数据密钥。 The encrypted data key is sent as part of message header. 只有拥有私钥的实体 (在这种情况下是消费者) 才能解密用于解密消息的数据密钥。

You can encrypt a message with more than one key. Any one of the keys used for encrypting the message is sufficient to decrypt the message.

Pulsar does not store the encryption key anywhere in the Pulsar service. If you lose or delete the private key, your message is irretrievably lost, and is unrecoverable.

Producer

alt text

Consumer

alt text

Get started

  1. 输入下面的命令来创建你的 ECDSA 或 RSA 公钥私钥对。
  1. openssl ecparam -name secp521r1 -genkey -param_enc explicit -out test_ecdsa_privkey.pem
  2. openssl ec -in test_ecdsa_privkey.pem -pubout -outform pkcs8 -out test_ecdsa_pubkey.pem
  1. Add the public and private key to the key management and configure your producers to retrieve public keys and consumers clients to retrieve private keys.

  2. Implement CryptoKeyReader::getPublicKey() interface from producer and CryptoKeyReader::getPrivateKey() interface from consumer, which Pulsar client invokes to load the key.

  3. Add encryption key to producer configuration: conf.addEncryptionKey(“myapp.key”).

  4. Add CryptoKeyReader implementation to producer or consumer config: conf.setCryptoKeyReader(keyReader).

  5. Sample producer application:

  1. class RawFileKeyReader implements CryptoKeyReader {
  2. String publicKeyFile = "";
  3. String privateKeyFile = "";
  4. RawFileKeyReader(String pubKeyFile, String privKeyFile) {
  5. publicKeyFile = pubKeyFile;
  6. privateKeyFile = privKeyFile;
  7. }
  8. @Override
  9. public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
  10. EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
  11. try {
  12. keyInfo.setKey(Files.readAllBytes(Paths.get(publicKeyFile)));
  13. } catch (IOException e) {
  14. System.out.println("ERROR: Failed to read public key from file " + publicKeyFile);
  15. e.printStackTrace();
  16. }
  17. return keyInfo;
  18. }
  19. @Override
  20. public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
  21. EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
  22. try {
  23. keyInfo.setKey(Files.readAllBytes(Paths.get(privateKeyFile)));
  24. } catch (IOException e) {
  25. System.out.println("ERROR: Failed to read private key from file " + privateKeyFile);
  26. e.printStackTrace();
  27. }
  28. return keyInfo;
  29. }
  30. }
  31. PulsarClient pulsarClient = PulsarClient.create("http://localhost:8080");
  32. ProducerConfiguration prodConf = new ProducerConfiguration();
  33. prodConf.setCryptoKeyReader(new RawFileKeyReader("test_ecdsa_pubkey.pem", "test_ecdsa_privkey.pem"));
  34. prodConf.addEncryptionKey("myappkey");
  35. Producer producer = pulsarClient.createProducer("persistent://my-tenant/my-ns/my-topic", prodConf);
  36. for (int i = 0; i < 10; i++) {
  37. producer.send("my-message".getBytes());
  38. }
  39. pulsarClient.close();
  1. 简单的消费者示例
  1. class RawFileKeyReader implements CryptoKeyReader {
  2. String publicKeyFile = "";
  3. String privateKeyFile = "";
  4. RawFileKeyReader(String pubKeyFile, String privKeyFile) {
  5. publicKeyFile = pubKeyFile;
  6. privateKeyFile = privKeyFile;
  7. }
  8. @Override
  9. public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
  10. EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
  11. try {
  12. keyInfo.setKey(Files.readAllBytes(Paths.get(publicKeyFile)));
  13. } catch (IOException e) {
  14. System.out.println("ERROR: Failed to read public key from file " + publicKeyFile);
  15. e.printStackTrace();
  16. }
  17. return keyInfo;
  18. }
  19. @Override
  20. public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
  21. EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
  22. try {
  23. keyInfo.setKey(Files.readAllBytes(Paths.get(privateKeyFile)));
  24. } catch (IOException e) {
  25. System.out.println("ERROR: Failed to read private key from file " + privateKeyFile);
  26. e.printStackTrace();
  27. }
  28. return keyInfo;
  29. }
  30. }
  31. ConsumerConfiguration consConf = new ConsumerConfiguration();
  32. consConf.setCryptoKeyReader(new RawFileKeyReader("test_ecdsa_pubkey.pem", "test_ecdsa_privkey.pem"));
  33. PulsarClient pulsarClient = PulsarClient.create("http://localhost:8080");
  34. Consumer consumer = pulsarClient.subscribe("persistent://my-tenant/my-ns/my-topic", "my-subscriber-name", consConf);
  35. Message msg = null;
  36. for (int i = 0; i < 10; i++) {
  37. msg = consumer.receive();
  38. // do something
  39. System.out.println("Received: " + new String(msg.getData()));
  40. }
  41. // Acknowledge the consumption of all messages at once
  42. consumer.acknowledgeCumulative(msg);
  43. pulsarClient.close();

Key rotation

Pulsar generates new AES data key every 4 hours or after publishing a certain number of messages. A producer fetches the asymmetric public key every 4 hours by calling CryptoKeyReader::getPublicKey() to retrieve the latest version.

在生产者应用程序中启用加密

If you produce messages that are consumed across application boundaries, you need to ensure that consumers in other applications have access to one of the private keys that can decrypt the messages. You can do this in two ways:

  1. 消费者应用程序提供了访问他们公钥的权限,你可以将公钥添加到你的生产者密钥上。
  2. 你授予访问权限给生产者使用的密钥对中的其中一个私钥。

当生产者想要用多个密钥加密消息时,生产者将所有这些密钥添加到配置中。 只要消费者能够访问至少一个密钥,消费者就可以解密消息。

如果你需要使用两个键(myapp.messagekey1和myapp.messagekey2)加密消息,请参阅下面的例子。

  1. conf.addEncryptionKey("myapp.messagekey1");
  2. conf.addEncryptionKey("myapp.messagekey2");

解密消费者应用程序中的加密消息

消费者需要访问其中一个私钥才能解密生产者所产生的信息。 如果你想要接收加密信件, 创建一个公钥或私钥,并将你的公钥交给生产者应用程序来使用你的公钥加密消息。

处理失败

  • Producer/ Consumer loses access to the key
    • 生产者操作失败指明失败的原因。 Application has the option to proceed with sending unencrypted message in such cases. Call conf.setCryptoFailureAction(ProducerCryptoFailureAction) to control the producer behavior. The default behavior is to fail the request.
    • 如果消费因解密失败或消费者缺失密钥而失败,应用程序可以选择消费加密的消息或丢弃它。 Call conf.setCryptoFailureAction(ConsumerCryptoFailureAction) to control the consumer behavior. The default behavior is to fail the request. 如果私钥永久丢失,应用程序永远无法解密消息。
  • Batch messaging
    • If decryption fails and the message contains batch messages, client is not able to retrieve individual messages in the batch, hence message consumption fails even if conf.setCryptoFailureAction() is set to CONSUME.
  • 如果解密失败,消息消费会停止,应用程序除了在客户端日志中记录解密失败消息外还会通知积压增加。 如果应用程序不能访问私钥来解密消息,唯一的选项是跳过或丢弃已经积压的消息。