Pulsar Encryption

应用程序可以使用 Pulsar 加密在生产者侧加密消息,并在消费者侧解密消息。 你可以使用应用程序配置的公钥私钥对进行加密。 只有拥有有效密钥的消费者可以解密加密过的消息。

对称加密与非对称加密

Pulsar使用动态生成的对称AES秘钥来加密消息(数据)。 你可以使用应用程序提供的 ECDSA/RSA 密钥对来加密 AES 密钥(数据密钥), 所以你不必与大家分享秘密。

Key is a public and private key pair used for encryption or decryption. The producer key is the public key of the key pair, and the consumer key is the private key of the key pair.

应用程序用公钥配置生产者。 你可以使用此密钥来加密 AES 数据密钥。 加密数据秘钥作为消息头部的一部分发送。 只有拥有私钥的实体 (在这种情况下是消费者) 才能解密用于解密消息的数据密钥。

You can encrypt a message with more than one key. Any one of the keys used for encrypting the message is sufficient to decrypt the message.

Pulsar does not store the encryption key anywhere in the Pulsar service. If you lose or delete the private key, your message is irretrievably lost, and is unrecoverable.

生产者(Producer)

alt text

消费者(Consumer)

alt text

开始

  1. 输入下面的命令来创建你的 ECDSA 或 RSA 公钥私钥对。
  1. openssl ecparam -name secp521r1 -genkey -param_enc explicit -out test_ecdsa_privkey.pem
  2. openssl ec -in test_ecdsa_privkey.pem -pubout -outform pem -out test_ecdsa_pubkey.pem
  1. 将公钥和私钥添加到秘钥管理中,并且配置你的生产者去得到公钥,消费者去得到私钥。

  2. 实现 CryptoKeyReader 接口,尤其是生产者的 CryptoKeyReader.getPublicKey() 和消费者的 CryptoKeyReader.getPrivateKey() ,Pulsar 客户端会调用这两个方法来加载密钥。

  3. 向生产者 builder 添加加密密钥名称: PulsarClient.newProducer().addEncryptionKey(“myapp.key”)。

  4. 将 CryptoKeyReader 实现添加到生产者或消费者 builder: PulsarClient.newProducer().cryptoKeyReader(keyReader) / PulsarClient.newConsumer().cryptoKeyReader(keyReader)。

  5. 生产者代码示例:

  1. class RawFileKeyReader implements CryptoKeyReader {
  2. String publicKeyFile = "";
  3. String privateKeyFile = "";
  4. RawFileKeyReader(String pubKeyFile, String privKeyFile) {
  5. publicKeyFile = pubKeyFile;
  6. privateKeyFile = privKeyFile;
  7. }
  8. @Override
  9. public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
  10. EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
  11. try {
  12. keyInfo.setKey(Files.readAllBytes(Paths.get(publicKeyFile)));
  13. } catch (IOException e) {
  14. System.out.println("ERROR: Failed to read public key from file " + publicKeyFile);
  15. e.printStackTrace();
  16. }
  17. return keyInfo;
  18. }
  19. @Override
  20. public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
  21. EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
  22. try {
  23. keyInfo.setKey(Files.readAllBytes(Paths.get(privateKeyFile)));
  24. } catch (IOException e) {
  25. System.out.println("ERROR: Failed to read private key from file " + privateKeyFile);
  26. e.printStackTrace();
  27. }
  28. return keyInfo;
  29. }
  30. }
  31. PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
  32. Producer producer = pulsarClient.newProducer()
  33. .topic("persistent://my-tenant/my-ns/my-topic")
  34. .addEncryptionKey("myappkey")
  35. .cryptoKeyReader(new RawFileKeyReader("test_ecdsa_pubkey.pem", "test_ecdsa_privkey.pem"))
  36. .create();
  37. for (int i = 0; i < 10; i++) {
  38. producer.send("my-message".getBytes());
  39. }
  40. producer.close();
  41. pulsarClient.close();
  1. 简单的消费者示例
  1. class RawFileKeyReader implements CryptoKeyReader {
  2. String publicKeyFile = "";
  3. String privateKeyFile = "";
  4. RawFileKeyReader(String pubKeyFile, String privKeyFile) {
  5. publicKeyFile = pubKeyFile;
  6. privateKeyFile = privKeyFile;
  7. }
  8. @Override
  9. public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
  10. EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
  11. try {
  12. keyInfo.setKey(Files.readAllBytes(Paths.get(publicKeyFile)));
  13. } catch (IOException e) {
  14. System.out.println("ERROR: Failed to read public key from file " + publicKeyFile);
  15. e.printStackTrace();
  16. }
  17. return keyInfo;
  18. }
  19. @Override
  20. public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
  21. EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
  22. try {
  23. keyInfo.setKey(Files.readAllBytes(Paths.get(privateKeyFile)));
  24. } catch (IOException e) {
  25. System.out.println("ERROR: Failed to read private key from file " + privateKeyFile);
  26. e.printStackTrace();
  27. }
  28. return keyInfo;
  29. }
  30. }
  31. PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
  32. Consumer consumer = pulsarClient.newConsumer()
  33. .topic("persistent://my-tenant/my-ns/my-topic")
  34. .subscriptionName("my-subscriber-name")
  35. .cryptoKeyReader(new RawFileKeyReader("test_ecdsa_pubkey.pem", "test_ecdsa_privkey.pem"))
  36. .subscribe();
  37. Message msg = null;
  38. for (int i = 0; i < 10; i++) {
  39. msg = consumer.receive();
  40. // do something
  41. System.out.println("Received: " + new String(msg.getData()));
  42. }
  43. // Acknowledge the consumption of all messages at once
  44. consumer.acknowledgeCumulative(msg);
  45. consumer.close();
  46. pulsarClient.close();

秘钥更新

Pulsar 每隔 4 小时或在发布一定数量的消息后生成一个新的 AES 数据密钥。 生产者通过调用 CryptoKeyReader.getPublicKey() 获取最新版本,以此来每隔 4 小时获取一次不对称公钥。

在生产者应用程序中启用加密

如果生产和消费的程序不是同一个, 你需要确保消费者能够访问能够解密消息的私钥。 You can do this in two ways:

  1. 消费者应用程序提供了访问他们公钥的权限,你可以将公钥添加到你的生产者密钥上。
  2. 你授予访问权限给生产者使用的密钥对中的其中一个私钥。

When producers want to encrypt the messages with multiple keys, producers add all such keys to the config. Consumer can decrypt the message as long as the consumer has access to at least one of the keys.

如果你需要使用两个键(myapp.messagekey1和myapp.messagekey2)加密消息,请参阅下面的例子。

  1. PulsarClient.newProducer().addEncryptionKey("myapp.messagekey1").addEncryptionKey("myapp.messagekey2");

解密消费者应用程序中的加密消息

消费者需要访问其中一个私钥才能解密生产者所产生的信息。 如果你想要接收加密信件, 创建一个公钥或私钥,并将你的公钥交给生产者应用程序来使用你的公钥加密消息。

处理失败

  • 生产者/消费者无法访问秘钥
    • 生产者操作失败指明失败的原因。 在这种情况下,应用可以选择继续发送未加密的消息。 调用 PulsarClient.newProducer().cryptoFailureAction(ProducerCryptoFailureAction) 控制生产者的行为。 默认的行为是请求失败。
    • 如果消费因解密失败或消费者缺失密钥而失败,应用程序可以选择消费加密的消息或丢弃它。 调用 PulsarClient.newConsumer().cryptoFailureAction(ConsumerCryptoFailureAction) 控制消费者的行为。 默认的行为是请求失败。 如果私钥永久丢失,应用程序永远无法解密消息。
  • 批量消息
    • 如果解密失败且消息包含批量消息,客户端会无法在批次中检索单独的消息, 因此,即使将 cryptoFailureAction() 设置为 ConsumerCryptoFailureAction.CONSUME,消息消费还是会失败。
  • 如果解密失败,消息消费会停止,应用程序除了在客户端日志中记录解密失败消息外还会通知积压增加。 如果应用程序不能访问私钥来解密消息,唯一的选项是跳过或丢弃已经积压的消息。