NATS – Messaging System

NATS, developed in the Go programming language, is a high-performance, lightweight open-source messaging system designed for cloud-native applications, IoT messaging, microservice architectures, and more. It is based on a simple and highly scalable Pub/Sub model and offers various other messaging patterns and features.
In the throughput comparison of several message queues shown in the graph, NATS sustains more than roughly 180,000 messages per second for both publishing and subscribing. Kafka, the next fastest, publishes at around 90,000 and subscribes at about 60,000 messages per second, which is at most about half the NATS rate.
* Basic Messaging Patterns
  > Publish-Subscribe: A publisher sends messages to a subject, and subscribers listening to that subject receive the messages.
  > Request-Reply: A client sends a request and waits for a response from a service.
  > Queue Groups: Messages are load-balanced among a group of subscribers, ensuring that each message is processed by only one subscriber in the group.
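
The difference between plain subscriptions and queue groups can be sketched with a toy in-memory broker. This is only an illustrative model, not how NATS is implemented: `ToyBroker`, `demo`, and the subscriber names are hypothetical, and picking a queue-group member at random is an assumption made to keep the sketch short.

```python
import random
from collections import defaultdict
from typing import Callable, Dict, List, Optional

Handler = Callable[[str, bytes], None]


class ToyBroker:
    """In-memory stand-in for a message broker (illustration only, not NATS)."""

    def __init__(self) -> None:
        # subject -> plain subscribers; each of them receives every message
        self._plain: Dict[str, List[Handler]] = defaultdict(list)
        # subject -> queue group name -> members; one member receives each message
        self._queues: Dict[str, Dict[str, List[Handler]]] = defaultdict(
            lambda: defaultdict(list)
        )

    def subscribe(self, subject: str, handler: Handler, queue: Optional[str] = None) -> None:
        if queue is None:
            self._plain[subject].append(handler)
        else:
            self._queues[subject][queue].append(handler)

    def publish(self, subject: str, data: bytes) -> None:
        # Publish-subscribe: fan the message out to every plain subscriber.
        for handler in self._plain.get(subject, []):
            handler(subject, data)
        # Queue groups: deliver to exactly one member of each group.
        for members in self._queues.get(subject, {}).values():
            random.choice(members)(subject, data)


def demo() -> Dict[str, List[bytes]]:
    """Publish a few messages and record what each subscriber received."""
    received: Dict[str, List[bytes]] = defaultdict(list)
    broker = ToyBroker()

    def make_handler(name: str) -> Handler:
        def handler(subject: str, data: bytes) -> None:
            received[name].append(data)
        return handler

    # Two plain subscribers on "greetings", two queue-group workers on "tasks".
    broker.subscribe("greetings", make_handler("sub-a"))
    broker.subscribe("greetings", make_handler("sub-b"))
    broker.subscribe("tasks", make_handler("worker-1"), queue="workers")
    broker.subscribe("tasks", make_handler("worker-2"), queue="workers")

    broker.publish("greetings", b"hello")
    for i in range(4):
        broker.publish("tasks", f"task {i}".encode("utf-8"))
    return received


if __name__ == "__main__":
    for name, messages in sorted(demo().items()):
        print(name, messages)
```

Both plain subscribers see the single greeting, while the four tasks are split between the two workers with no task delivered twice.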

1. Running a NATS server

Download NATS Server Binaries: Download the NATS server binaries from GitHub. This example uses release v2.10.2; check the releases page for a newer version.
				
					wget https://github.com/nats-io/nats-server/releases/download/v2.10.2/nats-server-v2.10.2-linux-amd64.tar.gz

				
			
Extract: Extract the downloaded archive.
				
					tar -xzf nats-server-v2.10.2-linux-amd64.tar.gz

				
			
Run Server: Run the NATS server with default settings. By default it listens for clients on port 4222.
				
					nats-server-v2.10.2-linux-amd64/nats-server
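
One way to confirm the server is up before running the examples: a NATS server sends an `INFO {...}` line with a JSON payload as soon as a client connects, so a plain TCP connection is enough to check it. The sketch below assumes the address 127.0.0.1:4222 used throughout this post; `parse_info_line` and `fetch_server_info` are hypothetical helper names, not part of any NATS tool.

```python
import json
import socket
from typing import Optional


def parse_info_line(line: bytes) -> dict:
    """Parse the `INFO {...}` line a NATS server sends right after a client connects."""
    text = line.decode("utf-8").strip()
    verb, _, payload = text.partition(" ")
    if verb.upper() != "INFO":
        raise ValueError(f"expected an INFO line, got: {text[:40]!r}")
    return json.loads(payload)


def fetch_server_info(
    host: str = "127.0.0.1", port: int = 4222, timeout: float = 2.0
) -> Optional[dict]:
    """Return the server's INFO fields, or None if no NATS server answers."""
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            with sock.makefile("rb") as stream:
                return parse_info_line(stream.readline())
    except (OSError, ValueError):
        # Connection refused, timeout, or a reply that is not a NATS INFO line.
        return None


if __name__ == "__main__":
    info = fetch_server_info()
    if info is None:
        print("No NATS server reachable on 127.0.0.1:4222")
    else:
        print(f"NATS server {info.get('version')} is up, max_payload={info.get('max_payload')}")
```

If the script reports that no server is reachable, check that the `nats-server` process from the previous step is still running.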
				
			

2. Python Test

All examples use Python’s nats-py library, which can be installed with the following command:
				
					pip install nats-py

				
			

3. Publish-Subscribe

A Publisher sends messages to a specific subject.
Subscribers listen to that subject and receive messages.
Publisher (publisher.py)
				
import asyncio
from nats.aio.client import Client as NATS

async def main():
    nc = NATS()
    await nc.connect("nats://127.0.0.1:4222")

    # Message to publish
    message = "Hello, this is a NATS message."

    # Publish the message
    await nc.publish("greetings", message.encode('utf-8'))
    print("Published a message.")

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
				
			
Subscriber (subscriber.py)
				
import asyncio
from nats.aio.client import Client as NATS

async def message_handler(msg):
    data = msg.data.decode('utf-8')
    print(f"Received message: {data}")

async def main():
    nc = NATS()

    await nc.connect("nats://127.0.0.1:4222")

    # Subscribe to the subject
    await nc.subscribe("greetings", cb=message_handler)
    print("Subscribed to subject 'greetings'.")

    # Keep the subscriber running
    while True:
        await asyncio.sleep(1)

if __name__ == '__main__':
    asyncio.run(main())

				
			
How to run
				
# Run the Subscriber first
python subscriber.py

# In another terminal, run the Publisher
python publisher.py
				
			

4. Request-Reply

Requester sends a request message to a subject and waits for a reply.
Replier subscribes to that subject and responds to requests.
Requester (requester.py)
				
import asyncio
from nats.aio.client import Client as NATS
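# nats-py 2.x raises the exceptions defined in nats.errors
from nats.errors import NoRespondersError, TimeoutError as NatsTimeoutError

async def main():
    nc = NATS()
    await nc.connect("nats://127.0.0.1:4222")

    # Request message
    message = "What is the current time?"

    try:
        # Send a request and wait up to 2 seconds for a reply
        response = await nc.request("time.request", message.encode('utf-8'), timeout=2)
        print(f"Received response: {response.data.decode('utf-8')}")
    except NoRespondersError:
        # Raised when no replier is subscribed to the subject
        print("No replier is listening on 'time.request'.")
    except NatsTimeoutError:
        print("Request timed out.")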

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
				
			
Replier (replier.py)
				
import asyncio
from datetime import datetime
from nats.aio.client import Client as NATS

async def main():
    nc = NATS()

    async def request_handler(msg):
        # Print the request message
        request = msg.data.decode('utf-8')
        print(f"Received request: {request}")

        # Generate current time
        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        response = f"The current time is {current_time}."

        # Send the response
        print(f"response subject: {msg.reply}")
        await nc.publish(msg.reply, response.encode('utf-8'))
        print("Sent a response.")

    await nc.connect("nats://127.0.0.1:4222")

    # Subscribe to the subject
    await nc.subscribe("time.request", cb=request_handler)
    print("Subscribed to subject 'time.request' and ready to handle requests.")

    # Keep the replier running
    while True:
        await asyncio.sleep(1)

if __name__ == '__main__':
    asyncio.run(main())
				
			
How to run
				
# Run the Replier
python replier.py

# In another terminal, run the Requester
python requester.py
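
As a variation on the replier, nats-py messages also offer `msg.respond()`, which publishes to `msg.reply` for you, so the handler does not need access to `nc`. The following is a minimal sketch, assuming the same subject and server address as above; `build_time_reply` is a hypothetical helper, and the nats import sits inside `main()` only so the helper can be tried without nats-py installed.

```python
import asyncio
from datetime import datetime


def build_time_reply(now: datetime) -> bytes:
    """Format the reply body (hypothetical helper, kept separate so it is easy to test)."""
    return f"The current time is {now.strftime('%Y-%m-%d %H:%M:%S')}.".encode("utf-8")


async def request_handler(msg) -> None:
    # msg.respond() sends the data to the reply subject carried by the request.
    await msg.respond(build_time_reply(datetime.now()))


async def main() -> None:
    # Imported here so the helpers above work even without nats-py installed.
    from nats.aio.client import Client as NATS

    nc = NATS()
    await nc.connect("nats://127.0.0.1:4222")
    await nc.subscribe("time.request", cb=request_handler)
    print("Ready to handle requests on 'time.request'.")

    # Keep the replier running until the process is interrupted.
    await asyncio.Event().wait()


# To start the replier, call: asyncio.run(main())
```

The requester from this section works against this replier unchanged, since the reply still arrives on the subject the request carried.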
				
			

5. Queue Groups (Load Balancing)

Multiple workers subscribe to the same subject within a queue group.
Messages are load-balanced among the workers; each message is delivered to only one worker in the group.
Task Publisher (task_publisher.py)
				
import asyncio
from nats.aio.client import Client as NATS

async def main():
    nc = NATS()
    await nc.connect("nats://127.0.0.1:4222")

    # Publish tasks
    for i in range(1, 11):
        task = f"Task number {i}"
        await nc.publish("tasks", task.encode('utf-8'))
        print(f"Published task: {task}")
        await asyncio.sleep(0.5)

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
				
			
Worker (worker.py)
				
import asyncio
import sys
from nats.aio.client import Client as NATS

async def main(worker_id):
    nc = NATS()

    async def task_handler(msg):
        task = msg.data.decode('utf-8')
        print(f"Worker {worker_id} processing {task}...")
        # Simulate task processing
        await asyncio.sleep(1)
        print(f"Worker {worker_id} completed {task}")

    await nc.connect("nats://127.0.0.1:4222")

    # Subscribe to the subject within a queue group
    await nc.subscribe("tasks", queue="workers", cb=task_handler)
    print(f"Worker {worker_id} has started.")

    # Keep the worker running
    while True:
        await asyncio.sleep(1)

if __name__ == '__main__':
    worker_id = sys.argv[1] if len(sys.argv) > 1 else '1'
    asyncio.run(main(worker_id))
				
			
How to run
				
# Start multiple workers in separate terminals
python worker.py 1
python worker.py 2
python worker.py 3

# In another terminal, run the Task Publisher
python task_publisher.py
				
			

6. Message Headers

Messages can include headers to pass metadata along with the message body.
Publisher with Headers (header_publisher.py)
				
import asyncio
from nats.aio.client import Client as NATS

async def main():
    nc = NATS()
    await nc.connect("nats://127.0.0.1:4222")

    # Set headers
    headers = {
        "Content-Type": "application/json",
        "X-Request-ID": "12345"
    }

    # Message to publish
    message = '{"message": "This message includes headers."}'

    # Publish the message with headers
    await nc.publish("updates", message.encode('utf-8'), headers=headers)
    print("Published a message with headers.")

    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
				
			
Subscriber Reading Headers (header_subscriber.py)
				
import asyncio
from nats.aio.client import Client as NATS

async def main():
    nc = NATS()

    async def message_handler(msg):
        data = msg.data.decode('utf-8')
        headers = msg.headers

        print(f"Received message: {data}")
        if headers:
            for key, value in headers.items():
                print(f"Header {key}: {value}")
        else:
            print("No headers received.")

    await nc.connect("nats://127.0.0.1:4222")

    # Subscribe to the subject
    await nc.subscribe("updates", cb=message_handler)
    print("Subscribed to subject 'updates'.")

    # Keep the subscriber running
    while True:
        await asyncio.sleep(1)

if __name__ == '__main__':
    asyncio.run(main())
				
			
How to run
				
# Run the subscriber
python header_subscriber.py

# In another terminal, run the publisher
python header_publisher.py
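
Headers become more useful when the subscriber acts on them. The sketch below decodes the body as JSON only when the `Content-Type` header says so, and uses `X-Request-ID` for logging. `decode_payload` is a hypothetical helper, and the code assumes header names arrive with the same casing the publisher used.

```python
import json
from typing import Any, Dict, Optional


def decode_payload(data: bytes, headers: Optional[Dict[str, str]]) -> Any:
    """Decode a message body according to its Content-Type header."""
    content_type = (headers or {}).get("Content-Type", "")
    text = data.decode("utf-8")
    if content_type.startswith("application/json"):
        return json.loads(text)
    # Without a JSON content type, fall back to the plain text.
    return text


async def message_handler(msg) -> None:
    # msg.headers is None when the publisher sent no headers.
    body = decode_payload(msg.data, msg.headers)
    request_id = (msg.headers or {}).get("X-Request-ID", "unknown")
    print(f"[{request_id}] {body!r}")
```

This `message_handler` can be passed as `cb=message_handler` to `nc.subscribe("updates", ...)` in place of the handler in header_subscriber.py.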
				
			
