NATS JetStream is an extension that provides advanced message streaming and data storage capabilities, expanding upon NATS’s core messaging functionality to meet more complex application requirements. JetStream is primarily used for persistent message storage, durability, message reprocessing, and stream processing.
Key Use Cases for JetStream.
- Durable Messaging: JetStream persistently stores messages on disk, allowing clients to retrieve messages later. This ensures no message loss, enabling access to messages even if subscribers were disconnected at the time of publication. This functionality is managed through streams, each of which defines a message retention policy and can set limitations such as retention duration, storage size, and subscriber interest.
- Message Reprocessing: JetStream provides the ability to replay previously sent messages, which is useful in applications requiring data analysis or message reprocessing. Streams in JetStream consume standard NATS subjects, capturing all messages published to a given subject within a defined storage system that can be replayed later.
- Stream Processing: JetStream enables managing sequential messages as streams, helping in the construction of complex data flows and stream-processing applications. Streams provide a structure for message retention, allowing applications to efficiently manage and analyze large volumes of data.
- Message Order Guarantee: JetStream ensures message order, supporting applications where consistency in message sequence is crucial for correct processing.
- Enhanced Publishing with Acknowledgments: While standard NATS publish calls can be used to send messages to a stream, JetStream-specific publishing calls are more reliable. This method provides a confirmation from the JetStream server that messages have been successfully stored, offering a high level of delivery guarantee.
- Scalability and Performance: JetStream maintains NATS’s lightweight and high-performance characteristics while adding storage and processing capabilities to support more complex tasks. It scales well to meet the demands of complex distributed systems, ensuring low-latency messaging. This translation covers each aspect in detail, matching your original structure and intent. Let me know if there’s anything else you’d like translated or expanded!
These advanced features make JetStream especially valuable for log management, event sourcing, streaming data analysis, and as a message queue in distributed systems. The ability to store messages with custom retention policies and to provide reliable message acknowledgments further solidifies JetStream’s role in critical application infrastructure.
1. Running a NATS server
Download NATS Server Binaries: Download the latest version of the NATS server binaries from GitHub.
wget https://github.com/nats-io/nats-server/releases/download/v2.10.2/nats-server-v2.10.2-linux-amd64.tar.gz
Unzip: Unzip the downloaded file.
tar -xzf nats-server-v2.10.2-linux-amd64.tar.gz
Run Server: Runs the NATS server with default settings.
nats-server-v2.10.2-linux-amd64/nats-server
2. Python Test
All examples use Python’s nats-py library. The nats-py library can be installed with the following command
pip install nats-py
3. JetStream
In NATS JetStream, a stream is a core concept that extends beyond the basic NATS messaging capabilities, offering advanced message storage and management functionalities. A stream acts as a logical message repository that defines how messages are retained and processed, providing options for reuse or reprocessing.
A stream in NATS JetStream captures all messages published to a specific subject (topic) and persistently stores them, allowing applications to access these messages later. This enables JetStream to offer persistent and durable messaging, along with replayable messaging, thereby facilitating more reliable message flow management.
A stream in NATS JetStream captures all messages published to a specific subject (topic) and persistently stores them, allowing applications to access these messages later. This enables JetStream to offer persistent and durable messaging, along with replayable messaging, thereby facilitating more reliable message flow management.
Key Features of Streams in JetStream
- Message Retention:
> Streams store messages published to specific subjects, which can later be replayed.
> A retention policy defines how long messages are kept, with options based on duration, message count, or total storage size. - Retention Policies:
> Limit-Based Retention: Retains messages until specific limits, such as message count or stream size, are reached.
> Interest-Based Retention: Retains messages as long as there are active subscribers.
> Work Queue Retention: Deletes messages once they are successfully delivered to a consumer. - Message Order Guarantee:
> Streams store messages in the order they are published, ensuring message sequence consistency. This supports applications where message order is critical for accurate processing. - Storage Options: JetStream provides various storage options for streams.
> Memory Storage: Stores messages in memory for quick access but is volatile.
> File Storage: Stores messages on disk, ensuring durability and persistence. - Subjects:
> A stream is associated with one or more subjects. All messages published to these subjects are stored in the stream.
> For example, setting up a stream for the subject orders.* will capture all messages published to subjects like orders. created and orders. updated. - Stream Configuration: When creating a stream, multiple parameters can be specified.
> Name: A unique identifier for the stream.
> Subjects: The subject(s) or wildcard patterns for capturing messages.
> Retention Policy: Determines how long messages are retained.
> Storage Type: Choice of memory or file storage.
> Maximum Age: Specifies how long each message is retained.
> Limits: Defines the maximum message count or storage capacity for the stream.
Creating a Stream in NATS JetStream (stream.py)
import asyncio
from nats.aio.client import Client as NATS
from nats.js.api import StreamConfig, RetentionPolicy, StorageType
async def main():
# Create a NATS client instance
nc = NATS()
# Connect to NATS server
await nc.connect(servers=["nats://localhost:4222"])
# Create JetStream context
js = nc.jetstream()
stream_name = "example_stream"
# Check if the stream already exists
try:
stream_info = await js.stream_info(stream_name)
# If it exists, delete the stream
await js.delete_stream(stream_name)
print(f"Existing stream '{stream_name}' deleted.")
except Exception as e:
# Stream does not exist
print(f"Stream '{stream_name}' does not exist. Proceeding to create a new one.")
# Define stream configuration with correct enums and integer max_age
config = StreamConfig(
name=stream_name, # Stream name
subjects=["my.subject.*"], # Subjects to subscribe
retention=RetentionPolicy.LIMITS, # Retention policy: LIMITS, INTEREST, or WORK_QUEUE
storage=StorageType.FILE, # Storage type: MEMORY or FILE
max_age=9 * 10**9, # Max age in nanoseconds (9 seconds)
max_msgs=1000, # Max number of messages
max_bytes=10 * 1024 * 1024, # Max storage size in bytes (10MB)
)
# Add stream to JetStream context
try:
await js.add_stream(config=config)
print(f"Stream '{config.name}' created successfully.")
except Exception as e:
print(f"Error creating stream: {e}")
# Close the NATS connection
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
How to run
# Run the setup
python stream.py
4. Publish-Subscribe
A Publisher sends messages to a specific subject.
Subscribers listen to that subject and receive messages.
.
Publisher (publisher.py)
import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers
async def publish_messages(js, subject):
"""
Publishes a series of messages to the specified subject.
"""
try:
for i in range(1, 6):
message = f"Message {i}"
# Publish the message to the subject
await js.publish(subject, message.encode())
print(f"Published: {message}")
await asyncio.sleep(1) # Wait for a second before sending the next message
except Exception as e:
print(f"Error publishing message: {e}")
async def main():
"""
Main coroutine that sets up the NATS client, connects to the server,
and publishes messages to a subject.
"""
nc = NATS()
try:
# Connect to the NATS server
await nc.connect(servers=["nats://localhost:4222"])
print("Publisher connected to NATS server.")
# Create JetStream context
js = nc.jetstream()
# Define the subject to publish to
subject = "my.subject.test"
# Publish messages
await publish_messages(js, subject)
except ErrNoServers as e:
print(f"Could not connect to NATS server: {e}")
except Exception as e:
print(f"An error occurred: {e}")
finally:
# Close the NATS connection
await nc.close()
print("Publisher disconnected from NATS server.")
if __name__ == '__main__':
asyncio.run(main())
Subscriber (publisher.py)
import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers
async def message_handler(msg):
"""
Callback function to handle received messages.
"""
data = msg.data.decode()
print(f"Received a message on '{msg.subject}': {data}")
await msg.ack() # Acknowledge the message
async def subscribe_to_subject(js, subject):
"""
Subscribes to the specified subject and handles incoming messages.
"""
try:
# Subscribe to the subject without a queue group
await js.subscribe(subject, cb=message_handler)
print(f"Subscriber is listening on '{subject}'.")
except Exception as e:
print(f"Error subscribing to subject: {e}")
async def main():
"""
Main coroutine that sets up the NATS client, connects to the server,
and subscribes to a subject to receive messages.
"""
nc = NATS()
try:
# Connect to the NATS server
await nc.connect(servers=["nats://localhost:4222"])
print("Subscriber connected to NATS server.")
# Create JetStream context
js = nc.jetstream()
# Define the subject to subscribe to
subject = "my.subject.*"
# Subscribe to the subject
await subscribe_to_subject(js, subject)
# Keep the subscriber running to listen for incoming messages
print("Subscriber is running. Press Ctrl+C to exit.")
while True:
await asyncio.sleep(1)
except ErrNoServers as e:
print(f"Could not connect to NATS server: {e}")
except asyncio.CancelledError:
pass # Handle task cancellation gracefully
except Exception as e:
print(f"An error occurred: {e}")
finally:
# Close the NATS connection
await nc.close()
print("Subscriber disconnected from NATS server.")
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nSubscriber stopped by user.")
How to run
# Run multiple subscribers in several different terminals
python subscriber.py
python subscriber.py
python subscriber.py
# In another terminal, Run the Publisher
python publisher.py
5. Request-Reply
Requester sends a request message to a subject and waits for a reply.
Replier subscribes to that subject and responds to requests.

Requester (requester.py)
import asyncio
import nats
from nats.aio.msg import Msg
import uuid
from functools import partial
# Subscribe to the reply subject to receive responses from consumers
async def response_handler(msg: Msg, responses: list):
# Decode the response message
response = msg.data.decode()
print(f"Received response: {response}")
responses.append(response)
async def collect_responses(responses):
# Wait until the expected number of responses are received
expected_responses = 1 # Set this based on the number of consumers
while len(responses) < expected_responses:
await asyncio.sleep(0.1)
print("Collected all responses")
async def main():
# Connect to the NATS server
nc = await nats.connect("nats://localhost:4222")
# Create a JetStream context
js = nc.jetstream()
# Create a unique inbox (reply subject) for receiving responses
reply_subject = nc.new_inbox()
responses = []
# Subscribe to the reply subject
handler = partial(response_handler, responses=responses)
await nc.subscribe(reply_subject, cb=handler)
# Include the reply subject in the message headers
headers = {'reply-to': reply_subject}
# Publish a request message with the reply subject included in headers
await js.publish(
subject='my.subject.requests',
payload=b'Hello, Consumers!',
headers=headers
)
print("Published request with reply subject in headers:", reply_subject)
# Wait for responses with a timeout
try:
await asyncio.wait_for(collect_responses(responses), timeout=5)
except asyncio.TimeoutError:
print("Timed out waiting for responses")
# Print all collected responses
print("All responses:", responses)
# Close the NATS connection
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
Replier (replier.py)
import asyncio
import nats
from nats.aio.msg import Msg
import uuid
from functools import partial
# Define the message handler coroutine
async def message_handler(msg: Msg, consumer_name: str, nc: nats.aio.client.Client):
# Decode and process the message
request = msg.data.decode()
print(f"{consumer_name} received request: {request}")
# Extract the reply subject from the message headers
reply_subject = msg.header.get('reply-to') if msg.header else None
# Generate a response
response = f"Response from {consumer_name} : {request.upper()}"
# Send the response back to the reply subject if it exists
if reply_subject:
await nc.publish(reply_subject, response.encode())
print(f"{consumer_name} sent response")
# Acknowledge the message to confirm processing
await msg.ack()
async def main():
# Connect to the NATS server
nc = await nats.connect("nats://localhost:4222")
# Create a JetStream context
js = nc.jetstream()
# Generate a unique durable consumer name
consumer_name = f'consumer_{uuid.uuid4()}'
print(f"{consumer_name} is listening for messages...")
# Subscribe to the 'requests' subject using push-based subscription
handler = partial(message_handler, consumer_name=consumer_name, nc=nc)
await js.subscribe(
subject='my.subject.requests',
durable=consumer_name,
stream='example_stream',
cb=handler,
manual_ack=True
)
# Keep the consumer running
await asyncio.Event().wait()
if __name__ == '__main__':
asyncio.run(main())
How to run
# Run the Subscriber
python replier.py
# In another terminal, Run the Publisher
python requester.py
6. Queue Groups (Load Balancing)
Multiple workers subscribe to the same subject within a queue group.
Messages are load-balanced among the workers; each message is delivered to only one worker in the group.

Task Publisher (task_publisher.py)
import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers
async def publish_messages(js, subject):
"""
Publishes a series of messages to the specified subject.
"""
try:
for i in range(1, 6):
message = f"Message {i}"
# Publish the message to the subject
await js.publish(subject, message.encode())
print(f"Published: {message}")
await asyncio.sleep(1) # Wait for a second before sending the next message
except Exception as e:
print(f"Error publishing message: {e}")
async def main():
"""
Main coroutine that sets up the NATS client, connects to the server,
and publishes messages to a subject.
"""
nc = NATS()
try:
# Connect to the NATS server
await nc.connect(servers=["nats://localhost:4222"])
print("Publisher connected to NATS server.")
# Create JetStream context
js = nc.jetstream()
# Define the subject to publish to
subject = "my.subject.test"
# Publish messages
await publish_messages(js, subject)
except ErrNoServers as e:
print(f"Could not connect to NATS server: {e}")
except Exception as e:
print(f"An error occurred: {e}")
finally:
# Close the NATS connection
await nc.close()
print("Publisher disconnected from NATS server.")
if __name__ == '__main__':
asyncio.run(main())
Worker (worker.py)
import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers
from functools import partial
async def message_handler(msg, name):
"""
Callback function to handle received messages.
"""
data = msg.data.decode()
print(f"[{name}] Received a message on '{msg.subject}': {data}")
await msg.ack() # Acknowledge the message
async def subscribe_to_subject(js, subject, queue_group, name):
"""
Subscribes to the specified subject with a given queue group and handles incoming messages.
"""
handler = partial(message_handler, name=name)
try:
# Subscribe to the subject with the specified queue group
await js.subscribe(subject, queue=queue_group, cb=handler)
print(f"Subscriber is listening on '{subject}' with queue group '{queue_group}'.")
except Exception as e:
print(f"Error subscribing to subject: {e}")
async def main(name):
"""
Main coroutine that sets up the NATS client, connects to the server,
and subscribes to a subject to receive messages.
"""
nc = NATS()
try:
# Connect to the NATS server
await nc.connect(servers=["nats://nats.nats:4222"])
print("Subscriber connected to NATS server.")
# Create JetStream context
js = nc.jetstream()
# Define the subject to subscribe to and the queue group
subject = "my.subject.*"
queue_group = "workers"
# Subscribe to the subject
await subscribe_to_subject(js, subject, queue_group, name)
# Keep the subscriber running to listen for incoming messages
print(f"Subscriber '{name}' is running. Press Ctrl+C to exit.")
while True:
await asyncio.sleep(1)
except ErrNoServers as e:
print(f"Could not connect to NATS server: {e}")
except asyncio.CancelledError:
pass # Handle task cancellation gracefully
except Exception as e:
print(f"An error occurred: {e}")
finally:
# Close the NATS connection
await nc.close()
print("Subscriber disconnected from NATS server.")
if __name__ == '__main__':
# Define the queue group name
queue_group = "order_group"
# Create two consumer tasks in the same queue group
consumers = [
main("Consumer1"),
main("Consumer2"),
]
# Run both consumers concurrently
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(asyncio.gather(*consumers))
except KeyboardInterrupt:
print("Consumers are shutting down.")
How to run
# Start multiple workers
python worker.py
# In another terminal, Run the Task Publisher
python task_publisher.py