NATS, developed in the Go programming language, is a high-performance, lightweight open-source messaging system designed for cloud-native applications, IoT messaging, microservice architectures, and more. It is based on a simple and highly scalable Pub/Sub model and offers various other messaging patterns and features.
The graph below compares the throughput of various message queues. NATS publishes and subscribes at roughly 180,000 messages per second each. Kafka comes second, with a publishing throughput of around 90,000 and a subscribing throughput of about 60,000 messages per second, which is half the publishing rate of NATS and a third of its subscribing rate.

* Basic Messaging Patterns
> Publish-Subscribe: A publisher sends messages to a subject, and subscribers listening to that subject receive the messages.
> Request-Reply: A client sends a request and waits for a response from a service.
> Queue Groups: Messages are load-balanced among a group of subscribers, ensuring that each message is processed by only one subscriber in the group.
1. Running a NATS server
Download NATS Server Binaries: Download the latest version of the NATS server binaries from GitHub.
wget https://github.com/nats-io/nats-server/releases/download/v2.10.2/nats-server-v2.10.2-linux-amd64.tar.gz
Extract: Extract the downloaded archive.
tar -xzf nats-server-v2.10.2-linux-amd64.tar.gz
Run Server: Run the NATS server with default settings (it listens on port 4222 by default).
nats-server-v2.10.2-linux-amd64/nats-server
2. Python Test
All examples use Python’s nats-py library, which can be installed with the following command:
pip install nats-py
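Before running the examples, it can help to confirm that the server is actually reachable. The following is a minimal sketch using only the standard library; the helper name `is_port_open` is hypothetical, and it assumes the server runs locally on the default port 4222 used throughout this post. It only checks that something accepts TCP connections on that port, not that it speaks the NATS protocol.

```python
import socket


def is_port_open(host: str = "127.0.0.1", port: int = 4222, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        # create_connection raises OSError (refused, unreachable, timed out) on failure
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    print("NATS server reachable:", is_port_open())
```

If this prints `False`, start the server first (step 1) before running any of the scripts below.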
3. Publish-Subscribe
A Publisher sends messages to a specific subject.
Subscribers listen to that subject and receive messages.

Publisher (publisher.py)
import asyncio
from nats.aio.client import Client as NATS

async def main():
    nc = NATS()
    await nc.connect("nats://127.0.0.1:4222")
    # Message to publish
    message = "Hello, this is a NATS message."
    # Publish the message (payloads are bytes, so encode the string)
    await nc.publish("greetings", message.encode('utf-8'))
    print("Published a message.")
    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
Subscriber (subscriber.py)
import asyncio
from nats.aio.client import Client as NATS

async def message_handler(msg):
    # Called once for every message received on the subject
    data = msg.data.decode('utf-8')
    print(f"Received message: {data}")

async def main():
    nc = NATS()
    await nc.connect("nats://127.0.0.1:4222")
    # Subscribe to the subject
    await nc.subscribe("greetings", cb=message_handler)
    print("Subscribed to subject 'greetings'.")
    # Keep the subscriber running
    while True:
        await asyncio.sleep(1)

if __name__ == '__main__':
    asyncio.run(main())
How to run
# Run the Subscriber
python subscriber.py
# In another terminal, Run the Publisher
python publisher.py
4. Request-Reply
Requester sends a request message to a subject and waits for a reply.
Replier subscribes to that subject and responds to requests.
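The mechanics behind this pattern can be sketched without a server. In the toy model below, a request is just a publish that carries a one-off reply subject, and the replier answers by publishing to that subject. `ToyBus` and its methods are hypothetical names for illustration only; this is not how nats-py or the NATS server is implemented, and the inbox naming is an assumption.

```python
import uuid
from collections import defaultdict


class ToyBus:
    """In-memory stand-in for a message bus (illustration only, not NATS itself)."""

    def __init__(self):
        self._subs = defaultdict(list)  # subject -> list of callbacks

    def subscribe(self, subject, cb):
        self._subs[subject].append(cb)

    def publish(self, subject, data, reply=None):
        # Deliver to every callback registered on the subject
        for cb in list(self._subs.get(subject, [])):
            cb(subject, data, reply)

    def request(self, subject, data):
        # A request is a publish that carries a unique, temporary reply subject
        inbox = f"_INBOX.{uuid.uuid4().hex}"
        replies = []
        self.subscribe(inbox, lambda _subject, payload, _reply: replies.append(payload))
        self.publish(subject, data, reply=inbox)
        del self._subs[inbox]  # the inbox is only needed for this one request
        return replies[0] if replies else None


bus = ToyBus()


def time_service(subject, data, reply):
    # The replier answers by publishing to the reply subject it was given
    if reply:
        bus.publish(reply, b"reply to: " + data)


bus.subscribe("time.request", time_service)
answer = bus.request("time.request", b"What is the current time?")
print(answer)
```

In the real examples below, `nc.request` plays the role of `request` here, and the replier reads the reply subject from `msg.reply`.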

Requester (requester.py)
import asyncio
from nats import errors
from nats.aio.client import Client as NATS

async def main():
    nc = NATS()
    await nc.connect("nats://127.0.0.1:4222")
    # Request message
    message = "What is the current time?"
    try:
        # Send a request and wait up to 2 seconds for the reply
        response = await nc.request("time.request", message.encode('utf-8'), timeout=2)
        print(f"Received response: {response.data.decode('utf-8')}")
    except errors.TimeoutError:
        print("Request timed out.")
    except errors.NoRespondersError:
        # Nobody is subscribed to the subject; start replier.py first
        print("No replier is listening on 'time.request'.")
    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
Replier (replier.py)
import asyncio
from datetime import datetime
from nats.aio.client import Client as NATS

async def main():
    nc = NATS()

    async def request_handler(msg):
        # Print the request message
        request = msg.data.decode('utf-8')
        print(f"Received request: {request}")
        # Generate current time
        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        response = f"The current time is {current_time}."
        # Send the response to the reply subject carried by the request
        print(f"response subject: {msg.reply}")
        await nc.publish(msg.reply, response.encode('utf-8'))
        print("Sent a response.")

    await nc.connect("nats://127.0.0.1:4222")
    # Subscribe to the subject
    await nc.subscribe("time.request", cb=request_handler)
    print("Subscribed to subject 'time.request' and ready to handle requests.")
    # Keep the replier running
    while True:
        await asyncio.sleep(1)

if __name__ == '__main__':
    asyncio.run(main())
How to run
# Run the Replier
python replier.py
# In another terminal, Run the Requester
python requester.py
5. Queue Groups (Load Balancing)
Multiple workers subscribe to the same subject within a queue group.
Messages are load-balanced among the workers; each message is delivered to only one worker in the group.
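The difference between a plain subscription and a queue group can be illustrated with a small in-memory model. `ToyBroker` is a hypothetical class written for this post, not part of nats-py, and picking a random group member per message is a simplifying assumption about how the load is spread.

```python
import random
from collections import Counter, defaultdict


class ToyBroker:
    """In-memory model of subject delivery (illustration only, not NATS itself)."""

    def __init__(self, seed=None):
        self._plain = defaultdict(list)  # subject -> callbacks
        self._groups = defaultdict(lambda: defaultdict(list))  # subject -> queue -> callbacks
        self._rng = random.Random(seed)

    def subscribe(self, subject, cb, queue=None):
        if queue is None:
            self._plain[subject].append(cb)
        else:
            self._groups[subject][queue].append(cb)

    def publish(self, subject, data):
        # Plain subscribers each receive every message
        for cb in self._plain.get(subject, []):
            cb(data)
        # Each queue group receives the message once, via a single member
        for members in self._groups.get(subject, {}).values():
            self._rng.choice(members)(data)


broker = ToyBroker(seed=0)
handled = Counter()  # worker id -> number of tasks processed
audit = []           # a plain subscriber that sees everything

for worker_id in ("1", "2", "3"):
    broker.subscribe("tasks", lambda data, w=worker_id: handled.update([w]), queue="workers")
broker.subscribe("tasks", audit.append)

for i in range(1, 11):
    broker.publish("tasks", f"Task number {i}".encode("utf-8"))

print("tasks handled by the group:", sum(handled.values()))
print("tasks seen by the plain subscriber:", len(audit))
```

Ten tasks are published, the three workers handle ten tasks between them, and the plain subscriber sees all ten. How the tasks are split among the individual workers varies from run to run in a real deployment.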

Task Publisher (task_publisher.py)
import asyncio
from nats.aio.client import Client as NATS

async def main():
    nc = NATS()
    await nc.connect("nats://127.0.0.1:4222")
    # Publish ten tasks, one every half second
    for i in range(1, 11):
        task = f"Task number {i}"
        await nc.publish("tasks", task.encode('utf-8'))
        print(f"Published task: {task}")
        await asyncio.sleep(0.5)
    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
Worker (worker.py)
import asyncio
import sys
from nats.aio.client import Client as NATS

async def main(worker_id):
    nc = NATS()

    async def task_handler(msg):
        task = msg.data.decode('utf-8')
        print(f"Worker {worker_id} processing {task}...")
        # Simulate task processing
        await asyncio.sleep(1)
        print(f"Worker {worker_id} completed {task}")

    await nc.connect("nats://127.0.0.1:4222")
    # Subscribe to the subject within a queue group
    await nc.subscribe("tasks", queue="workers", cb=task_handler)
    print(f"Worker {worker_id} has started.")
    # Keep the worker running
    while True:
        await asyncio.sleep(1)

if __name__ == '__main__':
    # The worker ID is taken from the first command-line argument (default '1')
    worker_id = sys.argv[1] if len(sys.argv) > 1 else '1'
    asyncio.run(main(worker_id))
How to run
# Start multiple workers in separate terminals
python worker.py 1
python worker.py 2
python worker.py 3
# In another terminal, Run the Task Publisher
python task_publisher.py
6. Message Headers
Messages can include headers to pass metadata along with the message body.
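One common use of such metadata is telling the receiver how to interpret the body. The sketch below is a hypothetical helper (`decode_payload` is not part of nats-py) that parses the body as JSON when a `Content-Type` header says so and otherwise returns plain text. It assumes headers arrive as a dictionary of strings, or `None` when the message has no headers.

```python
import json
from typing import Any, Mapping, Optional


def decode_payload(data: bytes, headers: Optional[Mapping[str, str]]) -> Any:
    """Decode a message body according to its Content-Type header, if any."""
    content_type = (headers or {}).get("Content-Type", "")
    text = data.decode("utf-8")
    # Ignore parameters such as "; charset=utf-8" when comparing the media type
    if content_type.split(";")[0].strip().lower() == "application/json":
        return json.loads(text)
    return text


body = b'{"message": "This message includes headers."}'
print(decode_payload(body, {"Content-Type": "application/json", "X-Request-ID": "12345"}))
print(decode_payload(b"plain text", None))
```

Inside a subscriber callback this could be called as `decode_payload(msg.data, msg.headers)`.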
Publisher with Headers (header_publisher.py)
import asyncio
from nats.aio.client import Client as NATS

async def main():
    nc = NATS()
    await nc.connect("nats://127.0.0.1:4222")
    # Set headers
    headers = {
        "Content-Type": "application/json",
        "X-Request-ID": "12345"
    }
    # Message to publish
    message = '{"message": "This message includes headers."}'
    # Publish the message with headers
    await nc.publish("updates", message.encode('utf-8'), headers=headers)
    print("Published a message with headers.")
    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
Subscriber Reading Headers (header_subscriber.py)
import asyncio
from nats.aio.client import Client as NATS

async def main():
    nc = NATS()

    async def message_handler(msg):
        data = msg.data.decode('utf-8')
        headers = msg.headers
        print(f"Received message: {data}")
        # Headers are optional, so check before iterating
        if headers:
            for key, value in headers.items():
                print(f"Header {key}: {value}")
        else:
            print("No headers received.")

    await nc.connect("nats://127.0.0.1:4222")
    # Subscribe to the subject
    await nc.subscribe("updates", cb=message_handler)
    print("Subscribed to subject 'updates'.")
    # Keep the subscriber running
    while True:
        await asyncio.sleep(1)

if __name__ == '__main__':
    asyncio.run(main())
How to run
# Run the subscriber
python header_subscriber.py
# In another terminal, Run the publisher
python header_publisher.py