Quarkus - Amazon SNS Client

Amazon Simple Notification Service (SNS) is a highly available and fully managed pub/sub messaging service. It provides topics for high-throughput, push-based, many-to-many messaging. Messages can fan out to a large number of subscriber endpoints for parallel processing, including Amazon SQS queues, AWS Lambda functions, and HTTP/S webhooks. Additionally, SNS can be used to fan out notifications to end users using mobile push, SMS and email.

You can find more information about SNS at the Amazon SNS website.

The SNS extension is based on AWS Java SDK 2.x. It’s a major rewrite of the 1.x code base that offers two programming models (Blocking & Async).

This technology is considered preview.

In preview, backward compatibility and presence in the ecosystem are not guaranteed. Specific improvements might require changes to configuration or APIs, and plans to become stable are under way. Feedback is welcome on our mailing list or as issues in our GitHub issue tracker.

For a full list of possible extension statuses, check our FAQ entry.

The Quarkus extension supports two programming models:

  • Blocking access using the URL Connection HTTP client (by default) or the Apache HTTP Client

  • Asynchronous programming based on JDK’s CompletableFuture objects and the Netty HTTP client.

In this guide, we see how you can get your REST services to use SNS locally and on AWS.

Prerequisites

To complete this guide, you need:

  • JDK 1.8+ installed with JAVA_HOME configured appropriately

  • an IDE

  • Apache Maven 3.6.2+

  • An AWS Account to access the SNS service

  • Optionally, Docker for your system to run SNS locally for testing purposes

Set up SNS locally

The easiest way to start working with SNS is to run a local instance as a container.

    docker run -it --publish 8009:4575 -e SERVICES=sns -e START_WEB=0 localstack/localstack:0.11.1

This starts an SNS instance that is accessible on port 8009.

Create an AWS profile for your local instance using AWS CLI:

    $ aws configure --profile localstack
    AWS Access Key ID [None]: test-key
    AWS Secret Access Key [None]: test-secret
    Default region name [None]: us-east-1
    Default output format [None]:

Create an SNS topic

Create an SNS topic using the AWS CLI and store the result in the TOPIC_ARN environment variable:

    TOPIC_ARN=`aws sns create-topic --name=QuarksCollider --profile localstack --endpoint-url=http://localhost:8009 --query TopicArn --output text`

If you want to run the demo using SNS on your AWS account, you can create the topic using the default AWS profile:

    TOPIC_ARN=`aws sns create-topic --name=QuarksCollider --query TopicArn --output text`

Solution

The application built here lets you shoot elementary particles (quarks) into a QuarksCollider topic in AWS SNS. Additionally, we create a resource that subscribes to the QuarksCollider topic in order to receive the published quarks.

We recommend that you follow the instructions in the next sections and create the application step by step. However, you can go right to the completed example.

Clone the Git repository: git clone https://github.com/quarkusio/quarkus-quickstarts.git, or download an archive.

The solution is located in the amazon-sns-quickstart directory.

Creating the Maven project

First, we need a new project. Create a new project with the following command:

    mvn io.quarkus:quarkus-maven-plugin:1.7.6.Final:create \
        -DprojectGroupId=org.acme \
        -DprojectArtifactId=amazon-sns-quickstart \
        -DclassName="org.acme.sns.QuarksCannonSyncResource" \
        -Dpath="/sync-cannon" \
        -Dextensions="resteasy-jsonb,amazon-sns,resteasy-mutiny"
    cd amazon-sns-quickstart

This command generates a Maven structure importing the RESTEasy/JAX-RS, Mutiny and Amazon SNS Client extensions. After this, the amazon-sns extension has been added to your pom.xml as well as the Mutiny support for RESTEasy.

Creating JSON REST service

In this example, we create an application that publishes quarks. The example application demonstrates the two programming models supported by the extension.

First, let’s create the Quark bean as follows:

    package org.acme.sns.model;

    import io.quarkus.runtime.annotations.RegisterForReflection;
    import java.util.Objects;

    @RegisterForReflection
    public class Quark {

        private String flavor;
        private String spin;

        public Quark() {
        }

        public String getFlavor() {
            return flavor;
        }

        public void setFlavor(String flavor) {
            this.flavor = flavor;
        }

        public String getSpin() {
            return spin;
        }

        public void setSpin(String spin) {
            this.spin = spin;
        }

        // Two quarks are considered equal when they have the same flavor.
        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof Quark)) {
                return false;
            }
            Quark other = (Quark) obj;
            return Objects.equals(other.flavor, this.flavor);
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.flavor);
        }
    }

Then, create an org.acme.sns.QuarksCannonSyncResource that provides an API to shoot quarks into the SNS topic via the SNS synchronous client.

    package org.acme.sns;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.ObjectWriter;
    import javax.inject.Inject;
    import javax.ws.rs.Consumes;
    import javax.ws.rs.POST;
    import javax.ws.rs.Path;
    import javax.ws.rs.Produces;
    import javax.ws.rs.core.MediaType;
    import javax.ws.rs.core.Response;
    import org.acme.sns.model.Quark;
    import org.eclipse.microprofile.config.inject.ConfigProperty;
    import org.jboss.logging.Logger;
    import software.amazon.awssdk.services.sns.SnsClient;
    import software.amazon.awssdk.services.sns.model.PublishResponse;

    @Path("/sync/cannon")
    @Produces(MediaType.TEXT_PLAIN)
    public class QuarksCannonSyncResource {

        private static final Logger LOGGER = Logger.getLogger(QuarksCannonSyncResource.class);

        @Inject
        SnsClient sns;

        @ConfigProperty(name = "topic.arn")
        String topicArn;

        static ObjectWriter QUARK_WRITER = new ObjectMapper().writerFor(Quark.class);

        @POST
        @Path("/shoot")
        @Consumes(MediaType.APPLICATION_JSON)
        public Response publish(Quark quark) throws Exception {
            // SNS messages are plain strings, so serialize the quark to JSON first.
            String message = QUARK_WRITER.writeValueAsString(quark);
            PublishResponse response = sns.publish(p -> p.topicArn(topicArn).message(message));
            LOGGER.infov("Fired Quark[{0}, {1}]", quark.getFlavor(), quark.getSpin());
            return Response.ok().entity(response.messageId()).build();
        }
    }

Because a published message must be a plain String, we use Jackson’s ObjectWriter to serialize our Quark objects into a String.

The missing piece is the subscriber that will receive the messages published to our topic. Before implementing subscribers, we need to define POJO classes representing messages posted by the AWS SNS.

Let’s create two classes that represent SNS Notification and SNS Subscription Confirmation messages, based on the AWS SNS message and JSON formats.

Create the org.acme.sns.model.SnsNotification class:

    package org.acme.sns.model;

    import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
    import com.fasterxml.jackson.annotation.JsonProperty;

    @JsonIgnoreProperties(ignoreUnknown = true)
    public class SnsNotification {

        @JsonProperty("Message")
        private String message;

        @JsonProperty("MessageId")
        private String messageId;

        @JsonProperty("Signature")
        private String signature;

        @JsonProperty("SignatureVersion")
        private String signatureVersion;

        @JsonProperty("SigningCertURL")
        private String signingCertUrl;

        @JsonProperty("Subject")
        private String subject;

        @JsonProperty("Timestamp")
        private String timestamp;

        @JsonProperty("TopicArn")
        private String topicArn;

        @JsonProperty("Type")
        private String type;

        @JsonProperty("UnsubscribeURL")
        private String unsubscribeURL;

        public String getMessage() {
            return message;
        }

        public void setMessage(String message) {
            this.message = message;
        }

        public String getMessageId() {
            return messageId;
        }

        public void setMessageId(String messageId) {
            this.messageId = messageId;
        }

        public String getSignature() {
            return signature;
        }

        public void setSignature(String signature) {
            this.signature = signature;
        }

        public String getSignatureVersion() {
            return signatureVersion;
        }

        public void setSignatureVersion(String signatureVersion) {
            this.signatureVersion = signatureVersion;
        }

        public String getSigningCertUrl() {
            return signingCertUrl;
        }

        public void setSigningCertUrl(String signingCertUrl) {
            this.signingCertUrl = signingCertUrl;
        }

        public String getSubject() {
            return subject;
        }

        public void setSubject(String subject) {
            this.subject = subject;
        }

        public String getTimestamp() {
            return timestamp;
        }

        public void setTimestamp(String timestamp) {
            this.timestamp = timestamp;
        }

        public String getTopicArn() {
            return topicArn;
        }

        public void setTopicArn(String topicArn) {
            this.topicArn = topicArn;
        }

        public String getType() {
            return type;
        }

        public void setType(String type) {
            this.type = type;
        }

        public String getUnsubscribeURL() {
            return unsubscribeURL;
        }

        public void setUnsubscribeURL(String unsubscribeURL) {
            this.unsubscribeURL = unsubscribeURL;
        }
    }

Then, create the org.acme.sns.model.SnsSubscriptionConfirmation class:

    package org.acme.sns.model;

    import com.fasterxml.jackson.annotation.JsonProperty;

    public class SnsSubscriptionConfirmation {

        @JsonProperty("Message")
        private String message;

        @JsonProperty("MessageId")
        private String messageId;

        @JsonProperty("Signature")
        private String signature;

        @JsonProperty("SignatureVersion")
        private String signatureVersion;

        @JsonProperty("SigningCertURL")
        private String signingCertUrl;

        @JsonProperty("SubscribeURL")
        private String subscribeUrl;

        @JsonProperty("Timestamp")
        private String timestamp;

        @JsonProperty("Token")
        private String token;

        @JsonProperty("TopicArn")
        private String topicArn;

        @JsonProperty("Type")
        private String type;

        public String getMessage() {
            return message;
        }

        public void setMessage(String message) {
            this.message = message;
        }

        public String getMessageId() {
            return messageId;
        }

        public void setMessageId(String messageId) {
            this.messageId = messageId;
        }

        public String getSignature() {
            return signature;
        }

        public void setSignature(String signature) {
            this.signature = signature;
        }

        public String getSignatureVersion() {
            return signatureVersion;
        }

        public void setSignatureVersion(String signatureVersion) {
            this.signatureVersion = signatureVersion;
        }

        public String getSigningCertUrl() {
            return signingCertUrl;
        }

        public void setSigningCertUrl(String signingCertUrl) {
            this.signingCertUrl = signingCertUrl;
        }

        public String getSubscribeUrl() {
            return subscribeUrl;
        }

        public void setSubscribeUrl(String subscribeUrl) {
            this.subscribeUrl = subscribeUrl;
        }

        public String getTimestamp() {
            return timestamp;
        }

        public void setTimestamp(String timestamp) {
            this.timestamp = timestamp;
        }

        public String getToken() {
            return token;
        }

        public void setToken(String token) {
            this.token = token;
        }

        public String getTopicArn() {
            return topicArn;
        }

        public void setTopicArn(String topicArn) {
            this.topicArn = topicArn;
        }

        public String getType() {
            return type;
        }

        public void setType(String type) {
            this.type = type;
        }
    }

Now, create an org.acme.sns.QuarksShieldSyncResource REST resource that:

  • subscribes itself to our SNS topic

  • unsubscribes from the SNS topic

  • receives notifications from the subscribed SNS topic

Keep in mind that AWS SNS supports multiple types of subscribers (such as web servers, email addresses, AWS SQS queues, and AWS Lambda functions), but for the sake of the quickstart we will show how to subscribe an HTTP endpoint served by our application.

    package org.acme.sns;

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.ObjectReader;
    import java.util.HashMap;
    import java.util.Map;
    import javax.inject.Inject;
    import javax.ws.rs.Consumes;
    import javax.ws.rs.HeaderParam;
    import javax.ws.rs.POST;
    import javax.ws.rs.Path;
    import javax.ws.rs.core.MediaType;
    import javax.ws.rs.core.Response;
    import org.acme.sns.model.Quark;
    import org.acme.sns.model.SnsNotification;
    import org.acme.sns.model.SnsSubscriptionConfirmation;
    import org.eclipse.microprofile.config.inject.ConfigProperty;
    import org.jboss.logging.Logger;
    import software.amazon.awssdk.services.sns.SnsClient;
    import software.amazon.awssdk.services.sns.model.SubscribeResponse;

    @Path("/sync/shield")
    public class QuarksShieldSyncResource {

        private static final Logger LOGGER = Logger.getLogger(QuarksShieldSyncResource.class);

        private static final String NOTIFICATION_TYPE = "Notification";
        private static final String SUBSCRIPTION_CONFIRMATION_TYPE = "SubscriptionConfirmation";
        private static final String UNSUBSCRIPTION_CONFIRMATION_TYPE = "UnsubscribeConfirmation";

        @Inject
        SnsClient sns;

        @ConfigProperty(name = "topic.arn")
        String topicArn;

        @ConfigProperty(name = "quarks.shield.base.url")
        String quarksShieldBaseUrl;

        private volatile String subscriptionArn;

        // One reader per message type that this resource has to deserialize.
        static Map<Class<?>, ObjectReader> READERS = new HashMap<>();

        static {
            READERS.put(SnsNotification.class, new ObjectMapper().readerFor(SnsNotification.class));
            READERS.put(SnsSubscriptionConfirmation.class, new ObjectMapper().readerFor(SnsSubscriptionConfirmation.class));
            READERS.put(Quark.class, new ObjectMapper().readerFor(Quark.class));
        }

        @POST
        @Consumes({MediaType.TEXT_PLAIN})
        public Response notificationEndpoint(@HeaderParam("x-amz-sns-message-type") String messageType, String message) throws JsonProcessingException {
            if (messageType == null) {
                return Response.status(400).build();
            }

            if (messageType.equals(NOTIFICATION_TYPE)) {
                // The quark is carried as a JSON string inside the notification envelope.
                SnsNotification notification = readObject(SnsNotification.class, message);
                Quark quark = readObject(Quark.class, notification.getMessage());
                LOGGER.infov("Quark[{0}, {1}] collision with the shield.", quark.getFlavor(), quark.getSpin());
            } else if (messageType.equals(SUBSCRIPTION_CONFIRMATION_TYPE)) {
                SnsSubscriptionConfirmation subConf = readObject(SnsSubscriptionConfirmation.class, message);
                sns.confirmSubscription(cs -> cs.topicArn(topicArn).token(subConf.getToken()));
                LOGGER.info("Subscription confirmed. Ready for quarks collisions.");
            } else if (messageType.equals(UNSUBSCRIPTION_CONFIRMATION_TYPE)) {
                LOGGER.info("We are unsubscribed");
            } else {
                return Response.status(400).entity("Unknown messageType").build();
            }

            return Response.ok().build();
        }

        @POST
        @Path("/subscribe")
        public Response subscribe() {
            String notificationEndpoint = notificationEndpoint();
            SubscribeResponse response = sns.subscribe(s -> s.topicArn(topicArn).protocol("http").endpoint(notificationEndpoint));
            subscriptionArn = response.subscriptionArn();
            LOGGER.infov("Subscribed Quarks shield <{0}> : {1} ", notificationEndpoint, response.subscriptionArn());
            return Response.ok().entity(response.subscriptionArn()).build();
        }

        @POST
        @Path("/unsubscribe")
        public Response unsubscribe() {
            if (subscriptionArn != null) {
                sns.unsubscribe(s -> s.subscriptionArn(subscriptionArn));
                LOGGER.infov("Unsubscribed quarks shield for id = {0}", subscriptionArn);
                // Forget the ARN so that a repeated call reports "Not subscribed yet".
                subscriptionArn = null;
                return Response.ok().build();
            } else {
                LOGGER.info("Not subscribed yet");
                return Response.status(400).entity("Not subscribed yet").build();
            }
        }

        private String notificationEndpoint() {
            return quarksShieldBaseUrl + "/sync/shield";
        }

        private <T> T readObject(Class<T> clazz, String message) {
            T object = null;
            try {
                object = READERS.get(clazz).readValue(message);
            } catch (JsonProcessingException e) {
                LOGGER.errorv("Unable to deserialize message <{0}> to Class <{1}>", message, clazz.getSimpleName());
                throw new RuntimeException(e);
            }
            return object;
        }
    }

  • subscribe() subscribes to our topic by providing the URL of the POST endpoint that receives SNS notification requests.

  • unsubscribe() removes our subscription, so messages published to the topic are no longer routed to our endpoint.

  • notificationEndpoint() is called by SNS for each new message while the endpoint is subscribed. See Amazon SNS message and JSON formats for details about the format of the messages SNS can submit.
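The branching on the x-amz-sns-message-type header can be looked at in isolation. The following is a minimal, JDK-only sketch of that decision and not part of the quickstart; the class and method names (SnsMessageTypeRouter, statusFor) are hypothetical, and it only returns the HTTP status that the resource above would answer with.

```java
/** Illustrative sketch: maps the x-amz-sns-message-type header value to an HTTP status. */
final class SnsMessageTypeRouter {

    // Header values handled by the shield resource.
    static final String NOTIFICATION = "Notification";
    static final String SUBSCRIPTION_CONFIRMATION = "SubscriptionConfirmation";
    static final String UNSUBSCRIBE_CONFIRMATION = "UnsubscribeConfirmation";

    /** Returns the status code for a request carrying the given header value. */
    static int statusFor(String messageType) {
        if (messageType == null) {
            return 400; // header missing, so the request did not come from SNS
        }
        switch (messageType) {
            case NOTIFICATION:
            case SUBSCRIPTION_CONFIRMATION:
            case UNSUBSCRIBE_CONFIRMATION:
                return 200; // known type: the resource processes it and answers OK
            default:
                return 400; // unknown message type
        }
    }

    public static void main(String[] args) {
        System.out.println(statusFor(NOTIFICATION));
        System.out.println(statusFor("SomethingElse"));
    }
}
```

Keeping this decision in a small pure function makes it easy to unit test without a running SNS instance.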

Configuring SNS clients

Both SNS clients (sync and async) are configurable via the application.properties file in the src/main/resources directory. Additionally, you need to add a suitable implementation of the sync client to the classpath. By default the extension uses the URL connection HTTP client, so you need to add the URL connection client dependency to the pom.xml file:

    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>url-connection-client</artifactId>
    </dependency>

If you want to use the Apache HTTP client instead, configure it as follows:

    quarkus.sns.sync-client.type=apache

And add the following dependency to the application pom.xml:

    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>apache-client</artifactId>
    </dependency>

If you’re going to use a local SNS instance, configure it as follows:

    quarkus.sns.endpoint-override=http://localhost:8009
    quarkus.sns.aws.region=us-east-1
    quarkus.sns.aws.credentials.type=static
    quarkus.sns.aws.credentials.static-provider.access-key-id=test-key
    quarkus.sns.aws.credentials.static-provider.secret-access-key=test-secret

  • quarkus.sns.aws.region - It’s required by the client, but since you’re using a local SNS instance you can pick any valid AWS region.

  • quarkus.sns.aws.credentials.type - Set the static credentials provider with any values for access-key-id and secret-access-key.

  • quarkus.sns.endpoint-override - Override the SNS client to use a local instance instead of the AWS service.

If you want to work with an AWS account, configure it as follows:

    quarkus.sns.aws.region=<YOUR_REGION>
    quarkus.sns.aws.credentials.type=default

  • quarkus.sns.aws.region - Set it to the region where you provisioned the SNS topic.

  • quarkus.sns.aws.credentials.type - Use the default credentials provider chain, which looks for credentials in this order:

    • Java System Properties - aws.accessKeyId and aws.secretAccessKey

    • Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY

    • Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI

    • Credentials delivered through Amazon ECS if the AWS_CONTAINER_CREDENTIALS_RELATIVE_URI environment variable is set and the security manager has permission to access the variable

    • Instance profile credentials delivered through the Amazon EC2 metadata service
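The lookup order above can be pictured as a chain that stops at the first source supplying a complete key pair. The following is a minimal, JDK-only sketch of that idea covering only the first two sources. It is not the AWS SDK implementation; CredentialsLookupSketch and firstSource are hypothetical names, and the two maps stand in for the real system properties and environment so that the logic can be exercised in isolation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Illustrative sketch of a "first source wins" credentials lookup. */
final class CredentialsLookupSketch {

    /** Returns a label for the first source that holds both the key id and the secret. */
    static Optional<String> firstSource(Map<String, String> systemProperties,
                                        Map<String, String> environment) {
        if (hasPair(systemProperties, "aws.accessKeyId", "aws.secretAccessKey")) {
            return Optional.of("system-properties");
        }
        if (hasPair(environment, "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")) {
            return Optional.of("environment");
        }
        // The real chain would continue with the profile file, ECS and EC2 sources.
        return Optional.empty();
    }

    private static boolean hasPair(Map<String, String> source, String idKey, String secretKey) {
        return source.get(idKey) != null && source.get(secretKey) != null;
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<>();
        env.put("AWS_ACCESS_KEY_ID", "test-key");
        env.put("AWS_SECRET_ACCESS_KEY", "test-secret");
        System.out.println(firstSource(new HashMap<>(), env));
    }
}
```

When both sources are populated, the system properties win because they are checked first, which mirrors the ordering of the list above.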

Next steps

Packaging

Packaging your application is as simple as ./mvnw clean package. It can be run with java -Dtopic.arn=$TOPIC_ARN -jar target/amazon-sns-quickstart-1.0-SNAPSHOT-runner.jar.

With GraalVM installed, you can also create a native executable binary: ./mvnw clean package -Dnative. Depending on your system, that will take some time.

Going asynchronous

Thanks to the AWS SDK v2.x used by the Quarkus extension, you can use the asynchronous programming model out of the box.

Create an org.acme.sns.QuarksCannonAsyncResource REST resource that is similar to our QuarksCannonSyncResource but uses the asynchronous programming model.

    package org.acme.sns;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.ObjectWriter;
    import io.smallrye.mutiny.Uni;
    import javax.inject.Inject;
    import javax.ws.rs.Consumes;
    import javax.ws.rs.POST;
    import javax.ws.rs.Path;
    import javax.ws.rs.Produces;
    import javax.ws.rs.core.MediaType;
    import javax.ws.rs.core.Response;
    import org.acme.sns.model.Quark;
    import org.eclipse.microprofile.config.inject.ConfigProperty;
    import org.jboss.logging.Logger;
    import software.amazon.awssdk.services.sns.SnsAsyncClient;
    import software.amazon.awssdk.services.sns.model.PublishResponse;

    @Path("/async/cannon")
    @Produces(MediaType.APPLICATION_JSON)
    @Consumes(MediaType.APPLICATION_JSON)
    public class QuarksCannonAsyncResource {

        private static final Logger LOGGER = Logger.getLogger(QuarksCannonAsyncResource.class);

        @Inject
        SnsAsyncClient sns;

        @ConfigProperty(name = "topic.arn")
        String topicArn;

        static ObjectWriter QUARK_WRITER = new ObjectMapper().writerFor(Quark.class);

        @POST
        @Path("/shoot")
        @Consumes(MediaType.APPLICATION_JSON)
        public Uni<Response> publish(Quark quark) throws Exception {
            String message = QUARK_WRITER.writeValueAsString(quark);
            // Wrap the CompletableFuture returned by the async client into a Uni.
            return Uni.createFrom()
                    .completionStage(sns.publish(p -> p.topicArn(topicArn).message(message)))
                    .onItem().invoke(item -> LOGGER.infov("Fired Quark[{0}, {1}]", quark.getFlavor(), quark.getSpin()))
                    .onItem().transform(PublishResponse::messageId)
                    .onItem().transform(id -> Response.ok().entity(id).build());
        }
    }

We create Uni instances from the CompletionStage objects returned by the asynchronous SNS client, and then transform the emitted item.
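To see the shape of this pipeline without Mutiny or the AWS SDK on the classpath, here is a minimal, JDK-only analogue. It is a sketch under stated assumptions: fakePublish is a hypothetical stand-in for the asynchronous client call, and thenApply plays roughly the role that onItem().transform() plays in the resource above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/** Illustrative sketch: transform the result of an asynchronous call without blocking. */
final class CompletionStagePipelineSketch {

    /** Hypothetical stand-in for the async client: completes with a made-up message id. */
    static CompletionStage<String> fakePublish(String message) {
        return CompletableFuture.supplyAsync(() -> "id-" + message.length());
    }

    /** Maps the eventual message id to a response body once the stage completes. */
    static CompletionStage<String> shoot(String message) {
        return fakePublish(message)
                .thenApply(id -> "published:" + id);
    }

    public static void main(String[] args) {
        // join() is used only to print the result in this demo; a resource would return the stage.
        System.out.println(shoot("{\"flavor\":\"Charm\"}").toCompletableFuture().join());
    }
}
```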

And the corresponding asynchronous subscriber, org.acme.sns.QuarksShieldAsyncResource:

    package org.acme.sns;

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.ObjectReader;
    import io.smallrye.mutiny.Uni;
    import java.util.HashMap;
    import java.util.Map;
    import javax.inject.Inject;
    import javax.ws.rs.Consumes;
    import javax.ws.rs.HeaderParam;
    import javax.ws.rs.POST;
    import javax.ws.rs.Path;
    import javax.ws.rs.core.MediaType;
    import javax.ws.rs.core.Response;
    import org.acme.sns.model.Quark;
    import org.acme.sns.model.SnsNotification;
    import org.acme.sns.model.SnsSubscriptionConfirmation;
    import org.eclipse.microprofile.config.inject.ConfigProperty;
    import org.jboss.logging.Logger;
    import software.amazon.awssdk.services.sns.SnsAsyncClient;
    import software.amazon.awssdk.services.sns.model.SubscribeResponse;

    @Path("/async/shield")
    public class QuarksShieldAsyncResource {

        private static final Logger LOGGER = Logger.getLogger(QuarksShieldAsyncResource.class);

        private static final String NOTIFICATION_TYPE = "Notification";
        private static final String SUBSCRIPTION_CONFIRMATION_TYPE = "SubscriptionConfirmation";
        private static final String UNSUBSCRIPTION_CONFIRMATION_TYPE = "UnsubscribeConfirmation";

        @Inject
        SnsAsyncClient sns;

        @ConfigProperty(name = "topic.arn")
        String topicArn;

        @ConfigProperty(name = "quarks.shield.base.url")
        String quarksShieldBaseUrl;

        private volatile String subscriptionArn;

        static Map<Class<?>, ObjectReader> READERS = new HashMap<>();

        static {
            READERS.put(SnsNotification.class, new ObjectMapper().readerFor(SnsNotification.class));
            READERS.put(SnsSubscriptionConfirmation.class, new ObjectMapper().readerFor(SnsSubscriptionConfirmation.class));
            READERS.put(Quark.class, new ObjectMapper().readerFor(Quark.class));
        }

        @POST
        @Consumes({MediaType.TEXT_PLAIN})
        public Uni<Response> notificationEndpoint(@HeaderParam("x-amz-sns-message-type") String messageType, String message) {
            if (messageType == null) {
                return Uni.createFrom().item(Response.status(400).build());
            }

            if (messageType.equals(NOTIFICATION_TYPE)) {
                return Uni.createFrom().item(readObject(SnsNotification.class, message))
                        .onItem().transform(notification -> readObject(Quark.class, notification.getMessage()))
                        .onItem().invoke(quark -> LOGGER.infov("Quark[{0}, {1}] collision with the shield.", quark.getFlavor(), quark.getSpin()))
                        .onItem().transform(quark -> Response.ok().build());
            } else if (messageType.equals(SUBSCRIPTION_CONFIRMATION_TYPE)) {
                return Uni.createFrom().item(readObject(SnsSubscriptionConfirmation.class, message))
                        .onItem().transformToUni(msg ->
                            Uni.createFrom().completionStage(
                                sns.confirmSubscription(confirm -> confirm.topicArn(topicArn).token(msg.getToken())))
                        )
                        .onItem().invoke(resp -> LOGGER.info("Subscription confirmed. Ready for quarks collisions."))
                        .onItem().transform(resp -> Response.ok().build());
            } else if (messageType.equals(UNSUBSCRIPTION_CONFIRMATION_TYPE)) {
                LOGGER.info("We are unsubscribed");
                return Uni.createFrom().item(Response.ok().build());
            }

            return Uni.createFrom().item(Response.status(400).entity("Unknown messageType").build());
        }

        @POST
        @Path("/subscribe")
        public Uni<Response> subscribe() {
            return Uni.createFrom()
                    .completionStage(sns.subscribe(s -> s.topicArn(topicArn).protocol("http").endpoint(notificationEndpoint())))
                    .onItem().transform(SubscribeResponse::subscriptionArn)
                    .onItem().invoke(this::setSubscriptionArn)
                    .onItem().invoke(arn -> LOGGER.infov("Subscribed Quarks shield with id = {0} ", arn))
                    .onItem().transform(arn -> Response.ok().entity(arn).build());
        }

        @POST
        @Path("/unsubscribe")
        public Uni<Response> unsubscribe() {
            if (subscriptionArn != null) {
                return Uni.createFrom()
                        .completionStage(sns.unsubscribe(s -> s.subscriptionArn(subscriptionArn)))
                        .onItem().invoke(arn -> LOGGER.infov("Unsubscribed quarks shield for id = {0}", subscriptionArn))
                        .onItem().invoke(arn -> subscriptionArn = null)
                        .onItem().transform(arn -> Response.ok().build());
            } else {
                return Uni.createFrom().item(Response.status(400).entity("Not subscribed yet").build());
            }
        }

        private String notificationEndpoint() {
            return quarksShieldBaseUrl + "/async/shield";
        }

        private void setSubscriptionArn(String arn) {
            this.subscriptionArn = arn;
        }

        private <T> T readObject(Class<T> clazz, String message) {
            T object = null;
            try {
                object = READERS.get(clazz).readValue(message);
            } catch (JsonProcessingException e) {
                LOGGER.errorv("Unable to deserialize message <{0}> to Class <{1}>", message, clazz.getSimpleName());
                throw new RuntimeException(e);
            }
            return object;
        }
    }

We also need to add the Netty HTTP client dependency to the pom.xml:

    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>netty-nio-client</artifactId>
    </dependency>

Configuration Reference

About the Duration format

The format for durations uses the standard java.time.Duration format. You can learn more about it in the Duration#parse() javadoc.

You can also provide duration values starting with a number. In this case, if the value consists only of a number, the converter treats the value as seconds. Otherwise, PT is implicitly prepended to the value to obtain a standard java.time.Duration format.
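The rule described above can be sketched in a few lines. This is only an illustration of the stated behaviour, not the converter that Quarkus ships; DurationValueSketch and parse are hypothetical names.

```java
import java.time.Duration;

/** Illustrative sketch of the duration rule: digits only means seconds, a leading digit gets "PT" prepended. */
final class DurationValueSketch {

    static Duration parse(String value) {
        String trimmed = value.trim();
        if (trimmed.isEmpty()) {
            throw new IllegalArgumentException("Duration value must not be empty");
        }
        if (trimmed.matches("[0-9]+")) {
            // Only a number: treat it as a count of seconds.
            return Duration.ofSeconds(Long.parseLong(trimmed));
        }
        if (Character.isDigit(trimmed.charAt(0))) {
            // Starts with a number: prepend PT to obtain the standard format.
            return Duration.parse("PT" + trimmed);
        }
        // Anything else is expected to already be in the java.time.Duration format.
        return Duration.parse(trimmed);
    }

    public static void main(String[] args) {
        System.out.println(parse("10"));
        System.out.println(parse("1H30M"));
        System.out.println(parse("PT20S"));
    }
}
```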

About the MemorySize format

A size configuration option recognizes strings in this format (shown as a regular expression): [0-9]+[KkMmGgTtPpEeZzYy]?. If no suffix is given, the value is interpreted as bytes.
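As an illustration of this format, the following JDK-only sketch converts such a string to a number of bytes. It is not the Quarkus converter. MemorySizeSketch and toBytes are hypothetical names, and the multiplier is an assumption: the text above does not state whether a suffix step is 1024 or 1000, and this sketch assumes 1024. BigInteger is used because the Z and Y suffixes exceed the range of a long.

```java
import java.math.BigInteger;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative sketch: parse a memory size such as "512", "2k" or "1M" into bytes. */
final class MemorySizeSketch {

    // The format from the text, with groups for the number and the optional suffix.
    private static final Pattern FORMAT = Pattern.compile("([0-9]+)([KkMmGgTtPpEeZzYy]?)");
    private static final String SUFFIXES = "KMGTPEZY";
    // Assumption: each suffix step multiplies by 1024 (binary units).
    private static final BigInteger STEP = BigInteger.valueOf(1024);

    static BigInteger toBytes(String value) {
        Matcher matcher = FORMAT.matcher(value);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Not a valid memory size: " + value);
        }
        BigInteger number = new BigInteger(matcher.group(1));
        String suffix = matcher.group(2).toUpperCase(Locale.ROOT);
        if (suffix.isEmpty()) {
            return number; // no suffix: the value is already in bytes
        }
        int power = SUFFIXES.indexOf(suffix) + 1; // K -> 1, M -> 2, ...
        return number.multiply(STEP.pow(power));
    }

    public static void main(String[] args) {
        System.out.println(toBytes("512"));
        System.out.println(toBytes("2k"));
        System.out.println(toBytes("1M"));
    }
}
```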