Wildcard Subscriptions

There is no special code to subscribe with a wildcard subject. Wildcards are a normal part of the subject name. However, it is a common technique to use the subject provided with the incoming message to determine what to do with the message.

For example, you can subscribe using * and then act based on the actual subject.

{% tabs %} {% tab title=”Go” %}

  1. nc, err := nats.Connect("demo.nats.io")
  2. if err != nil {
  3. log.Fatal(err)
  4. }
  5. defer nc.Close()
  6. // Use a WaitGroup to wait for 2 messages to arrive
  7. wg := sync.WaitGroup{}
  8. wg.Add(2)
  9. // Subscribe
  10. if _, err := nc.Subscribe("time.*.east", func(m *nats.Msg) {
  11. log.Printf("%s: %s", m.Subject, m.Data)
  12. wg.Done()
  13. }); err != nil {
  14. log.Fatal(err)
  15. }
  16. // Wait for the 2 messages to come in
  17. wg.Wait()

{% endtab %}

{% tab title=”Java” %}

  1. Connection nc = Nats.connect("nats://demo.nats.io:4222");
  2. // Use a latch to wait for 2 messages to arrive
  3. CountDownLatch latch = new CountDownLatch(2);
  4. // Create a dispatcher and inline message handler
  5. Dispatcher d = nc.createDispatcher((msg) -> {
  6. String subject = msg.getSubject();
  7. String str = new String(msg.getData(), StandardCharsets.UTF_8);
  8. System.out.println(subject + ": " + str);
  9. latch.countDown();
  10. });
  11. // Subscribe
  12. d.subscribe("time.*.east");
  13. // Wait for messages to come in
  14. latch.await();
  15. // Close the connection
  16. nc.close();

{% endtab %}

{% tab title=”JavaScript” %}

  1. nc.subscribe("time.us.*", (_err, msg) => {
  2. // converting timezones correctly in node requires a library
  3. // this doesn't take into account *many* things.
  4. let time;
  5. switch (msg.subject) {
  6. case "time.us.east":
  7. time = new Date().toLocaleTimeString("en-us", {
  8. timeZone: "America/New_York",
  9. });
  10. break;
  11. case "time.us.central":
  12. time = new Date().toLocaleTimeString("en-us", {
  13. timeZone: "America/Chicago",
  14. });
  15. break;
  16. case "time.us.mountain":
  17. time = new Date().toLocaleTimeString("en-us", {
  18. timeZone: "America/Denver",
  19. });
  20. break;
  21. case "time.us.west":
  22. time = new Date().toLocaleTimeString("en-us", {
  23. timeZone: "America/Los_Angeles",
  24. });
  25. break;
  26. default:
  27. time = "I don't know what you are talking about Willis";
  28. }
  29. t.log(subject, time);
  30. });

{% endtab %}

{% tab title=”Python” %}

  1. nc = NATS()
  2. await nc.connect(servers=["nats://demo.nats.io:4222"])
  3. # Use queue to wait for 2 messages to arrive
  4. queue = asyncio.Queue()
  5. async def cb(msg):
  6. await queue.put_nowait(msg)
  7. await nc.subscribe("time.*.east", cb=cb)
  8. # Send 2 messages and wait for them to come in
  9. await nc.publish("time.A.east", b'A')
  10. await nc.publish("time.B.east", b'B')
  11. msg_A = await queue.get()
  12. msg_B = await queue.get()
  13. print("Msg A:", msg_A)
  14. print("Msg B:", msg_B)

{% endtab %}

{% tab title=”Ruby” %}

  1. require 'nats/client'
  2. require 'fiber'
  3. NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
  4. Fiber.new do
  5. f = Fiber.current
  6. nc.subscribe("time.*.east") do |msg, reply|
  7. f.resume Time.now
  8. end
  9. nc.publish("time.A.east", "A")
  10. nc.publish("time.B.east", "B")
  11. # Use the response
  12. msg_A = Fiber.yield
  13. puts "Msg A: #{msg_A}"
  14. msg_B = Fiber.yield
  15. puts "Msg B: #{msg_B}"
  16. end.resume
  17. end

{% endtab %}

{% tab title=”C” %}

  1. static void
  2. onMsg(natsConnection *conn, natsSubscription *sub, natsMsg *msg, void *closure)
  3. {
  4. printf("Received msg: %s - %.*s\n",
  5. natsMsg_GetSubject(msg),
  6. natsMsg_GetDataLength(msg),
  7. natsMsg_GetData(msg));
  8. // Need to destroy the message!
  9. natsMsg_Destroy(msg);
  10. }
  11. (...)
  12. natsConnection *conn = NULL;
  13. natsSubscription *sub = NULL;
  14. natsStatus s;
  15. s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
  16. if (s == NATS_OK)
  17. s = natsConnection_Subscribe(&sub, conn, "time.*.east", onMsg, NULL);
  18. (...)
  19. // Destroy objects that were created
  20. natsSubscription_Destroy(sub);
  21. natsConnection_Destroy(conn);

{% endtab %} {% endtabs %}

or do something similar with >:

{% tabs %} {% tab title=”Go” %}

  1. nc, err := nats.Connect("demo.nats.io")
  2. if err != nil {
  3. log.Fatal(err)
  4. }
  5. defer nc.Close()
  6. // Use a WaitGroup to wait for 4 messages to arrive
  7. wg := sync.WaitGroup{}
  8. wg.Add(4)
  9. // Subscribe
  10. if _, err := nc.Subscribe("time.>", func(m *nats.Msg) {
  11. log.Printf("%s: %s", m.Subject, m.Data)
  12. wg.Done()
  13. }); err != nil {
  14. log.Fatal(err)
  15. }
  16. // Wait for the 4 messages to come in
  17. wg.Wait()
  18. // Close the connection
  19. nc.Close()

{% endtab %}

{% tab title=”Java” %}

  1. Connection nc = Nats.connect("nats://demo.nats.io:4222");
  2. // Use a latch to wait for 4 messages to arrive
  3. CountDownLatch latch = new CountDownLatch(4);
  4. // Create a dispatcher and inline message handler
  5. Dispatcher d = nc.createDispatcher((msg) -> {
  6. String subject = msg.getSubject();
  7. String str = new String(msg.getData(), StandardCharsets.UTF_8);
  8. System.out.println(subject + ": " + str);
  9. latch.countDown();
  10. });
  11. // Subscribe
  12. d.subscribe("time.>");
  13. // Wait for messages to come in
  14. latch.await();
  15. // Close the connection
  16. nc.close();

{% endtab %}

{% tab title=”JavaScript” %}

  1. let nc = NATS.connect({
  2. url: "nats://demo.nats.io:4222"
  3. });
  4. nc.subscribe('time.>', (msg, reply, subject) => {
  5. // converting timezones correctly in node requires a library
  6. // this doesn't take into account *many* things.
  7. let time = "";
  8. switch (subject) {
  9. case 'time.us.east':
  10. time = new Date().toLocaleTimeString("en-us", {timeZone: "America/New_York"});
  11. break;
  12. case 'time.us.central':
  13. time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Chicago"});
  14. break;
  15. case 'time.us.mountain':
  16. time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Denver"});
  17. break;
  18. case 'time.us.west':
  19. time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Los_Angeles"});
  20. break;
  21. default:
  22. time = "I don't know what you are talking about Willis";
  23. }
  24. t.log(subject, time);
  25. });

{% endtab %}

{% tab title=”Python” %}

  1. nc = NATS()
  2. await nc.connect(servers=["nats://demo.nats.io:4222"])
  3. # Use queue to wait for 4 messages to arrive
  4. queue = asyncio.Queue()
  5. async def cb(msg):
  6. await queue.put(msg)
  7. await nc.subscribe("time.>", cb=cb)
  8. # Send 2 messages and wait for them to come in
  9. await nc.publish("time.A.east", b'A')
  10. await nc.publish("time.B.east", b'B')
  11. await nc.publish("time.C.west", b'C')
  12. await nc.publish("time.D.west", b'D')
  13. for i in range(0, 4):
  14. msg = await queue.get()
  15. print("Msg:", msg)
  16. await nc.close()

{% endtab %}

{% tab title=”Ruby” %}

  1. require 'nats/client'
  2. require 'fiber'
  3. NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
  4. Fiber.new do
  5. f = Fiber.current
  6. nc.subscribe("time.>") do |msg, reply|
  7. f.resume Time.now.to_f
  8. end
  9. nc.publish("time.A.east", "A")
  10. nc.publish("time.B.east", "B")
  11. nc.publish("time.C.west", "C")
  12. nc.publish("time.D.west", "D")
  13. # Use the response
  14. 4.times do
  15. msg = Fiber.yield
  16. puts "Msg: #{msg}"
  17. end
  18. end.resume
  19. end

{% endtab %}

{% tab title=”TypeScript” %}

  1. await nc.subscribe('time.>', (err, msg) => {
  2. // converting timezones correctly in node requires a library
  3. // this doesn't take into account *many* things.
  4. let time = "";
  5. switch (msg.subject) {
  6. case 'time.us.east':
  7. time = new Date().toLocaleTimeString("en-us", {timeZone: "America/New_York"});
  8. break;
  9. case 'time.us.central':
  10. time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Chicago"});
  11. break;
  12. case 'time.us.mountain':
  13. time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Denver"});
  14. break;
  15. case 'time.us.west':
  16. time = new Date().toLocaleTimeString("en-us", {timeZone: "America/Los_Angeles"});
  17. break;
  18. default:
  19. time = "I don't know what you are talking about Willis";
  20. }
  21. t.log(msg.subject, time);
  22. });

{% endtab %}

{% tab title=”C” %}

  1. static void
  2. onMsg(natsConnection *conn, natsSubscription *sub, natsMsg *msg, void *closure)
  3. {
  4. printf("Received msg: %s - %.*s\n",
  5. natsMsg_GetSubject(msg),
  6. natsMsg_GetDataLength(msg),
  7. natsMsg_GetData(msg));
  8. // Need to destroy the message!
  9. natsMsg_Destroy(msg);
  10. }
  11. (...)
  12. natsConnection *conn = NULL;
  13. natsSubscription *sub = NULL;
  14. natsStatus s;
  15. s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
  16. if (s == NATS_OK)
  17. s = natsConnection_Subscribe(&sub, conn, "time.>", onMsg, NULL);
  18. (...)
  19. // Destroy objects that were created
  20. natsSubscription_Destroy(sub);
  21. natsConnection_Destroy(conn);

{% endtab %} {% endtabs %}

The following example can be used to test these two subscribers. The * subscriber should receive at most 2 messages, while the > subscriber receives 4. More importantly the time.*.east subscriber won’t receive on time.us.east.atlanta because that won’t match.

{% tabs %} {% tab title=”Go” %}

  1. nc, err := nats.Connect("demo.nats.io")
  2. if err != nil {
  3. log.Fatal(err)
  4. }
  5. defer nc.Close()
  6. zoneID, err := time.LoadLocation("America/New_York")
  7. if err != nil {
  8. log.Fatal(err)
  9. }
  10. now := time.Now()
  11. zoneDateTime := now.In(zoneID)
  12. formatted := zoneDateTime.String()
  13. nc.Publish("time.us.east", []byte(formatted))
  14. nc.Publish("time.us.east.atlanta", []byte(formatted))
  15. zoneID, err = time.LoadLocation("Europe/Warsaw")
  16. if err != nil {
  17. log.Fatal(err)
  18. }
  19. zoneDateTime = now.In(zoneID)
  20. formatted = zoneDateTime.String()
  21. nc.Publish("time.eu.east", []byte(formatted))
  22. nc.Publish("time.eu.east.warsaw", []byte(formatted))

{% endtab %}

{% tab title=”Java” %}

  1. Connection nc = Nats.connect("nats://demo.nats.io:4222");
  2. ZoneId zoneId = ZoneId.of("America/New_York");
  3. ZonedDateTime zonedDateTime = ZonedDateTime.ofInstant(Instant.now(), zoneId);
  4. String formatted = zonedDateTime.format(DateTimeFormatter.ISO_ZONED_DATE_TIME);
  5. nc.publish("time.us.east", formatted.getBytes(StandardCharsets.UTF_8));
  6. nc.publish("time.us.east.atlanta", formatted.getBytes(StandardCharsets.UTF_8));
  7. zoneId = ZoneId.of("Europe/Warsaw");
  8. zonedDateTime = ZonedDateTime.ofInstant(Instant.now(), zoneId);
  9. formatted = zonedDateTime.format(DateTimeFormatter.ISO_ZONED_DATE_TIME);
  10. nc.publish("time.eu.east", formatted.getBytes(StandardCharsets.UTF_8));
  11. nc.publish("time.eu.east.warsaw", formatted.getBytes(StandardCharsets.UTF_8));
  12. nc.flush(Duration.ZERO);
  13. nc.close();

{% endtab %}

{% tab title=”JavaScript” %}

  1. nc.publish('time.us.east');
  2. nc.publish('time.us.central');
  3. nc.publish('time.us.mountain');
  4. nc.publish('time.us.west');

{% endtab %}

{% tab title=”Python” %}

  1. nc = NATS()
  2. await nc.connect(servers=["nats://demo.nats.io:4222"])
  3. await nc.publish("time.us.east", b'...')
  4. await nc.publish("time.us.east.atlanta", b'...')
  5. await nc.publish("time.eu.east", b'...')
  6. await nc.publish("time.eu.east.warsaw", b'...')
  7. await nc.close()

{% endtab %}

{% tab title=”Ruby” %}

  1. NATS.start do |nc|
  2. nc.publish("time.us.east", '...')
  3. nc.publish("time.us.east.atlanta", '...')
  4. nc.publish("time.eu.east", '...')
  5. nc.publish("time.eu.east.warsaw", '...')
  6. nc.drain
  7. end

{% endtab %}

{% tab title=”TypeScript” %}

  1. nc.publish('time.us.east');
  2. nc.publish('time.us.central');
  3. nc.publish('time.us.mountain');
  4. nc.publish('time.us.west');

{% endtab %} {% endtabs %}