Pulsar Encryption

Pulsar encryption 允许应用程序在生产者处加密消息并在消费者处执行解密。 Encryption 会使用应用程序配置的公钥/私钥对执行加解密操作. 加密后的消息只能被使用正确秘钥的消费者解密。

对称加密与非对称加密

Pulsar使用动态生成的对称AES秘钥来加密消息(数据)。 AES 秘钥(数据秘钥) 使用应用程序提供的 ECDSA/RSA 秘钥来加密,因此没有必要与所有人共享秘钥。

Key is a public/private key pair used for encryption/decryption. The producer key is the public key, and the consumer key is the private key of the key pair.

应用程序用公钥配置生产者。 这个密钥用于加密 AES 数据密钥。 加密数据秘钥作为消息头部的一部分发送。 只有拥有私钥的实体(例子中的消费者) 才能解密用于解密消息的数据秘钥。

A message can be encrypted with more than one key. Any one of the keys used for encrypting the message is sufficient to decrypt the message

Pulsar does not store the encryption key anywhere in the pulsar service. If you lose/delete the private key, your message is irretrievably lost, and is unrecoverable

生产者(Producer)

alt text

消费者(Consumer)

alt text

操作步骤:

  1. 创建 ECDSA 或者 RSA 公钥/私钥对。
  1. openssl ecparam -name secp521r1 -genkey -param_enc explicit -out test_ecdsa_privkey.pem
  2. openssl ec -in test_ecdsa_privkey.pem -pubout -outform pem -out test_ecdsa_pubkey.pem
  1. 将公钥和私钥添加到秘钥管理中,并且配置你的生产者去得到公钥,消费者去得到私钥。
  2. 生产者实现 CryptoKeyReader::getPublicKey() 接口,消费者实现 CryptoKeyReader::getPrivateKey() 接口,Pulsar 客户端会调用这两个接口加载秘钥。
  3. 生产者配置加密秘钥:conf.addEncryptionKey(“myapp.key”)
  4. 生产者/消费者配置 CryptoKeyReader 的实现:conf.setCryptoKeyReader(keyReader)
  5. 生产者代码示例:
  1. class RawFileKeyReader implements CryptoKeyReader {
  2. String publicKeyFile = "";
  3. String privateKeyFile = "";
  4. RawFileKeyReader(String pubKeyFile, String privKeyFile) {
  5. publicKeyFile = pubKeyFile;
  6. privateKeyFile = privKeyFile;
  7. }
  8. @Override
  9. public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
  10. EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
  11. try {
  12. keyInfo.setKey(Files.readAllBytes(Paths.get(publicKeyFile)));
  13. } catch (IOException e) {
  14. System.out.println("ERROR: Failed to read public key from file " + publicKeyFile);
  15. e.printStackTrace();
  16. }
  17. return keyInfo;
  18. }
  19. @Override
  20. public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
  21. EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
  22. try {
  23. keyInfo.setKey(Files.readAllBytes(Paths.get(privateKeyFile)));
  24. } catch (IOException e) {
  25. System.out.println("ERROR: Failed to read private key from file " + privateKeyFile);
  26. e.printStackTrace();
  27. }
  28. return keyInfo;
  29. }
  30. }
  31. PulsarClient pulsarClient = PulsarClient.create("http://localhost:8080");
  32. ProducerConfiguration prodConf = new ProducerConfiguration();
  33. prodConf.setCryptoKeyReader(new RawFileKeyReader("test_ecdsa_pubkey.pem", "test_ecdsa_privkey.pem"));
  34. prodConf.addEncryptionKey("myappkey");
  35. Producer producer = pulsarClient.createProducer("persistent://my-tenant/my-ns/my-topic", prodConf);
  36. for (int i = 0; i < 10; i++) {
  37. producer.send("my-message".getBytes());
  38. }
  39. pulsarClient.close();
  1. 简单的消费者示例
  1. class RawFileKeyReader implements CryptoKeyReader {
  2. String publicKeyFile = "";
  3. String privateKeyFile = "";
  4. RawFileKeyReader(String pubKeyFile, String privKeyFile) {
  5. publicKeyFile = pubKeyFile;
  6. privateKeyFile = privKeyFile;
  7. }
  8. @Override
  9. public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
  10. EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
  11. try {
  12. keyInfo.setKey(Files.readAllBytes(Paths.get(publicKeyFile)));
  13. } catch (IOException e) {
  14. System.out.println("ERROR: Failed to read public key from file " + publicKeyFile);
  15. e.printStackTrace();
  16. }
  17. return keyInfo;
  18. }
  19. @Override
  20. public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
  21. EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
  22. try {
  23. keyInfo.setKey(Files.readAllBytes(Paths.get(privateKeyFile)));
  24. } catch (IOException e) {
  25. System.out.println("ERROR: Failed to read private key from file " + privateKeyFile);
  26. e.printStackTrace();
  27. }
  28. return keyInfo;
  29. }
  30. }
  31. ConsumerConfiguration consConf = new ConsumerConfiguration();
  32. consConf.setCryptoKeyReader(new RawFileKeyReader("test_ecdsa_pubkey.pem", "test_ecdsa_privkey.pem"));
  33. PulsarClient pulsarClient = PulsarClient.create("http://localhost:8080");
  34. Consumer consumer = pulsarClient.subscribe("persistent://my-tenant//my-ns/my-topic", "my-subscriber-name", consConf);
  35. Message msg = null;
  36. for (int i = 0; i < 10; i++) {
  37. msg = consumer.receive();
  38. // do something
  39. System.out.println("Received: " + new String(msg.getData()));
  40. }
  41. // Acknowledge the consumption of all messages at once
  42. consumer.acknowledgeCumulative(msg);
  43. pulsarClient.close();

秘钥更新

Pulsar 每四个小时或者发送了一批消息后会生成新的 AES 数据秘钥。 生产者通过调用 CryptoKeyReader::getPublicKey() 检索最新版本,每4小时自动获取非对称公钥。

生产者程序启用加密:

如果生产和消费的程序不是同一个, 你需要确保消费者能够访问能够解密消息的私钥。 This can be done in two ways:

  1. The consumer application provides you access to their public key, which you add to your producer keys
  2. 你可以授权对 producer 使用的某个秘钥的访问权。

在某些情况下,生产者可能想要用多个密钥加密消息。 For this, add all such keys to the config. Consumer will be able to decrypt the message, as long as it has access to at least one of the keys.

例如:如果消息需要使用如下两个秘钥加密: myapp.messagekey1 和 myapp.messagekey2,

  1. conf.addEncryptionKey("myapp.messagekey1");
  2. conf.addEncryptionKey("myapp.messagekey2");

消费者程序启用解密:

消费者必须能够获得生产者用来加密消息的所有私钥的其中一个,才能解密消息。 如果你想接收加密消息,需要先创建公钥和私钥,然后把公钥给生产者应用程序,生产者使用该公钥加密消息。

失败处理:

  • 生产者/消费者无法访问秘钥
    • 生产者将会操作失败,并提示失败的原因。 在这种情况下,应用可以选择继续发送未加密的消息。 通过配置 conf.setCryptoFailureAction(ProducerCryptoFailureAction) 来控制失败后的后续动作。 默认的行为是请求失败。
    • 如果消费者因为解密失败或者丢失秘钥导致消费失败,应用程序可以选择消费加密的消息或者丢弃它。 通过配置 conf.setCryptoFailureAction(ConsumerCryptoFailureAction) 来控制失败后的后续动作。 默认的行为是请求失败。 如果私钥永久丢失,应用将永远不能解密消息。
  • 批量消息
    • 如果消息包含有批量消息,并且解密失败,客户端将会无法在在批次中搜索单独的消息。即使将 conf.setCryptoFailureAction() 设置为 CONSUME,消息消费还是会失败。
  • 如果解密失败,消息消费将会停止,应用程序会在客户端日志中记录解密失败的信息,并且会观察到积压的增长。 如果应用无法使用秘钥去解密消息,唯一的选项是跳过/丢弃已经积压的消息。