Unsubscribing

The client libraries provide a means to unsubscribe from a previously created subscription.

This process requires an interaction with the server, so for an asynchronous subscription there may be a small window of time where a message comes through as the unsubscribe is processed by the library. Ignoring that slight edge case, the client library will clean up any outstanding messages and tell the server that the subscription is no longer used.

Go

  1. // Connect to the demo server; abort on failure
  2. nc, err := nats.Connect("demo.nats.io")
  3. if err != nil {
  4.     log.Fatal(err)
  5. }
  6. defer nc.Close()
  7. // Synchronous subscription: create it, then remove it again
  8. syncSub, err := nc.SubscribeSync("updates")
  9. if err != nil {
  10.     log.Fatal(err)
  11. }
  12. err = syncSub.Unsubscribe()
  13. if err != nil {
  14.     log.Fatal(err)
  15. }
  16. // Asynchronous subscription with a no-op handler: create it, then remove it again
  17. asyncSub, err := nc.Subscribe("updates", func(_ *nats.Msg) {})
  18. if err != nil {
  19.     log.Fatal(err)
  20. }
  21. err = asyncSub.Unsubscribe()
  22. if err != nil {
  23.     log.Fatal(err)
  24. }

Java

  1. Connection nc = Nats.connect("nats://demo.nats.io:4222");
  2. // Dispatcher whose handler prints each message payload decoded as UTF-8
  3. Dispatcher dispatcher = nc.createDispatcher((msg) -> {
  4.     System.out.println(new String(msg.getData(), StandardCharsets.UTF_8));
  5. });
  6. // Sync subscription: unsubscribe through the Subscription handle
  7. Subscription syncSub = nc.subscribe("updates");
  8. syncSub.unsubscribe();
  9. // Async subscription: subscribe and unsubscribe through the dispatcher, by subject
  10. dispatcher.subscribe("updates");
  11. dispatcher.unsubscribe("updates");
  12. // Close the connection
  13. nc.close();

JavaScript

  1. let nc = NATS.connect({
  2. url: "nats://demo.nats.io:4222"
  3. });
  4. // set up a subscription to process a request
  5. let sub = nc.subscribe(NATS.createInbox(), (msg, reply) => {
  6. if (msg.reply) {
  7. nc.publish(reply, new Date().toLocaleTimeString());
  8. }
  9. });
  10. // without arguments the subscription will cancel when the server receives it
  11. // you can also specify how many messages are expected by the subscription
  12. nc.unsubscribe(sub);

Python

  1. nc = NATS()
  2. await nc.connect(servers=["nats://demo.nats.io:4222"])
  3. future = asyncio.Future()
  4. async def cb(msg):
  5. nonlocal future
  6. future.set_result(msg)
  7. sid = await nc.subscribe("updates", cb=cb)
  8. await nc.publish("updates", b'All is Well')
  9. # Remove interest in subject
  10. await nc.unsubscribe(sid)
  11. # Won't be received...
  12. await nc.publish("updates", b'...')

Ruby

  1. require 'nats/client'
  2. require 'fiber'
  3. NATS.start(servers:["nats://127.0.0.1:4222"]) do |nc|
  4. Fiber.new do
  5. f = Fiber.current
  6. sid = nc.subscribe("time") do |msg, reply|
  7. f.resume Time.now # wake the fiber parked at Fiber.yield, handing it the current time
  8. end
  9. nc.publish("time", 'What is the time?', NATS.create_inbox)
  10. # Suspend this fiber until the subscription callback resumes it, then use the response
  11. msg = Fiber.yield
  12. puts "Reply: #{msg}"
  13. nc.unsubscribe(sid) # remove interest using the id returned by nc.subscribe
  14. # Won't be received: the subscription on "time" was removed above
  15. nc.publish("time", 'What is the time?', NATS.create_inbox)
  16. end.resume # start running the fiber body right away
  17. end

TypeScript

  1. // set up a subscription to process a request
  2. let sub = await nc.subscribe(createInbox(), (err, msg) => {
  3.     if (err) {
  4.         // check the error first: msg must not be dereferenced when the callback reports one
  5.         t.log('got an error while waiting for a request: ' + err);
  6.     } else if (msg.reply) {
  7.         nc.publish(msg.reply, new Date().toLocaleTimeString());
  8.     } else {
  9.         t.log('got a request for the time, but no reply subject was set.');
  10.     }
  11. });
  12. // without arguments the subscription will cancel when the server receives it
  13. // you can also specify how many messages are expected by the subscription
  14. sub.unsubscribe();

C

  1. natsConnection *conn = NULL;
  2. natsSubscription *sub = NULL;
  3. natsStatus s = NATS_OK;
  4. s = natsConnection_ConnectTo(&conn, NATS_DEFAULT_URL);
  5. // Subscribe synchronously to "updates" (skipped if the connect call did not return NATS_OK)
  6. if (s == NATS_OK)
  7. s = natsConnection_SubscribeSync(&sub, conn, "updates");
  8. // Unsubscribe (skipped if any earlier step did not return NATS_OK)
  9. if (s == NATS_OK)
  10. s = natsSubscription_Unsubscribe(sub);
  11. (...)
  12. // Destroy objects that were created; runs unconditionally, so sub/conn may still be NULL here - NOTE(review): confirm the Destroy functions accept NULL
  13. natsSubscription_Destroy(sub);
  14. natsConnection_Destroy(conn);