NATS JetStream은 고급 메시지 스트리밍 및 데이터 저장 기능을 제공하는 확장 기능으로, NATS의 기본 메시징 기능을 확장하여 더 복잡한 애플리케이션 요구 사항을 만족할수 있게 만들어 줍니다. JetStream은 주로 지속적인 메시지 저장, 내구성, 메시지 재처리, 스트림 처리를 위해 사용됩니다.
JetStream의 주요 용도.
- 내구성 있는 메시징: JetStream은 메시지를 디스크에 영구 저장하여 클라이언트가 이후에 메시지를 다시 가져올 수 있도록 합니다. 이를 통해 메시지가 손실되지 않으며, 메시지가 발행되었을 때 구독자가 연결되어 있지 않았더라도 나중에 접근할 수 있게 합니다. 이러한 기능은 스트림을 통해 관리되며, 각 스트림은 메시지 보존 방식을 정의하고, 보존 기간, 크기, 구독자 관심도와 같은 제한을 설정할 수 있습니다.
- 메시지 재처리: JetStream은 이전에 전송된 메시지를 재생할 수 있는 기능을 제공합니다. 이는 데이터 분석이나 메시지 재처리가 필요한 애플리케이션에서 유용하게 사용할 수 있습니다. JetStream의 스트림은 일반적인 NATS 주제를 소비하며, 해당 주제로 발행된 모든 메시지를 정의된 저장 시스템에 캡처하고 나중에 재생할 수 있습니다.
- 스트림 처리: JetStream은 연속적인 메시지를 스트림으로 관리할 수 있게 하여 복잡한 데이터 흐름 및 스트림 처리 애플리케이션을 구축하는 데 도움을 줍니다. 스트림은 메시지 보존을 위한 구조를 제공하며, 이를 통해 애플리케이션이 대량의 데이터를 효율적으로 관리하고 분석할 수 있습니다.
- 메시지 순서 보장: JetStream은 메시지의 순서를 보장하여, 메시지 순서 일관성이 필요한 애플리케이션에서 올바르게 처리될 수 있도록 지원합니다.
- 확인 응답을 통한 향상된 발행: 스트림에 메시지를 보내기 위해 일반 NATS 발행 호출을 사용할 수 있지만, JetStream 전용 발행 호출을 사용하는 것이 더 신뢰할 수 있습니다. 이 방법은 JetStream 서버가 메시지가 성공적으로 저장되었음을 확인하는 응답을 제공하여 높은 수준의 전송 보장을 제공합니다.
- 확장성 및 성능: JetStream은 NATS의 경량 및 높은 성능 특성을 유지하면서도, 더 복잡한 작업을 지원하기 위한 추가 저장 및 처리 기능을 제공합니다. 복잡한 분산 시스템의 요구를 충족시키기 위해 잘 확장되며, 여전히 저지연 메시징을 보장합니다.
이러한 고급 기능들은 JetStream을 로그 관리, 이벤트 소싱, 스트리밍 데이터 분석 및 분산 시스템의 메시지 큐로서 특히 가치 있게 만듭니다. 메시지를 사용자 정의된 보존 정책으로 저장하고 신뢰할 수 있는 메시지 확인을 제공하는 기능은 중요한 애플리케이션 인프라에서 JetStream의 역할을 더욱 강화합니다.
1. NATS 서버 실행
NATS 서버 바이너리 다운로드: 최신 버전의 NATS 서버 바이너리를 GitHub에서 다운로드합니다.
wget https://github.com/nats-io/nats-server/releases/download/v2.10.2/nats-server-v2.10.2-linux-amd64.tar.gz
압축 해제: 다운로드한 파일을 압축 해제합니다.
tar -xzf nats-server-v2.10.2-linux-amd64.tar.gz
서버 실행: NATS 서버를 기본 설정으로 실행.(-js option을 지정해야 함)
nats-server-v2.10.2-linux-amd64/nats-server -js
2. Python 테스트
모든 예제는 Python의 nats-py 라이브러리를 사용합니다. nats-py 라이브러리는 다음 명령어로 설치할 수 있습니다.
pip install nats-py
3. JetStream
NATS JetStream에서 스트림(stream)은 기본 NATS 메시징 기능을 넘어서, 고급 메시지 저장 및 관리 기능을 제공하는 핵심 개념입니다. 스트림은 논리적 메시지 저장소로서 메시지가 어떻게 보존되고 처리되는지 정의하며, 이를 재사용하거나 재처리할 수 있는 기능을 제공합니다.
스트림은 NATS JetStream에서 특정 주제(토픽)로 발행된 모든 메시지를 캡처하고, 이를 지속적으로 저장하여 애플리케이션이 나중에 이 메시지를 사용할 수 있도록 하는 메시지 저장소입니다. 이를 통해 JetStream은 지속적이고 내구성 있는 메시징과 재생 가능한 메시징을 제공하여, 더 안정적인 메시지 흐름을 관리할 수 있게 합니다.
스트림의 주요 기능
- 메시지 보존 :
> 스트림은 특정 주제로 발행된 메시지를 저장하며, 이를 나중에 재생할 수 있습니다.
> 보존 정책을 설정하여 메시지를 얼마나 오래 보존할지 정의할 수 있습니다. 보존 정책은 시간(duration), 메시지 수 또는 총 저장 용량에 따라 설정할 수 있습니다. - 보존 정책 :
> 한계 기반 보존: 메시지 수, 스트림 크기 등 특정 한계에 도달할 때까지 메시지를 보존합니다.
> 관심 기반 보존: 활성 구독자가 존재하는 동안 메시지를 보존합니다.
> 작업 대기열 보존: 메시지가 소비자에게 성공적으로 전달되면 삭제됩니다. - 메시지 순서 보장 :
> 스트림은 메시지가 발행된 순서대로 저장되며, 이를 통해 메시지 순서를 보장합니다. 이는 메시지 순서가 중요한 애플리케이션에서 일관성 있게 처리될 수 있도록 지원합니다. - 저장 옵션 : 스트림 저장을 위한 다양한 저장 옵션을 제공.
> 메모리 저장: 메시지를 메모리에 저장하여 빠르게 액세스할 수 있지만, 휘발성이 있습니다.
> 파일 저장: 메시지를 디스크에 저장하여 내구성 및 지속성을 보장합니다. - 주제(Subjects) :
> 스트림은 하나 이상의 주제와 연관되어 있습니다. 해당 주제로 발행된 모든 메시지는 스트림에 저장됩니다.
> 예를 들어,orders.*
주제에 대해 스트림을 설정하면orders.created
,orders.updated
등과 같은 주제로 발행된 모든 메시지가 저장됩니다. - 스트림 구성 : 스트림을 생성할 때 다양한 매개변수를 지정 할 수 있음.
> 이름(Name): 스트림의 고유 이름.
> 주제(Subjects): 캡처할 메시지의 주제 또는 와일드카드 패턴.
> 보존 정책(Retention Policy): 메시지를 얼마나 오래 보존할지 결정.
> 저장 유형(Storage Type):memory
또는file
저장 방식 선택.
> 최대 보존 시간(Maximum Age): 각 메시지를 얼마나 오래 보관할지 설정.
> 한계(Limits): 스트림이 보관할 수 있는 최대 메시지 수 또는 저장 용량.
스트림 생성 (stream.py)
import asyncio
from nats.aio.client import Client as NATS
from nats.js.api import StreamConfig, RetentionPolicy, StorageType
async def main():
# Create a NATS client instance
nc = NATS()
# Connect to NATS server
await nc.connect(servers=["nats://localhost:4222"])
# Create JetStream context
js = nc.jetstream()
stream_name = "example_stream"
# Check if the stream already exists
try:
stream_info = await js.stream_info(stream_name)
# If it exists, delete the stream
await js.delete_stream(stream_name)
print(f"Existing stream '{stream_name}' deleted.")
except Exception as e:
# Stream does not exist
print(f"Stream '{stream_name}' does not exist. Proceeding to create a new one.")
# Define stream configuration with correct enums and integer max_age
config = StreamConfig(
name=stream_name, # Stream name
subjects=["my.subject.*"], # Subjects to subscribe
retention=RetentionPolicy.LIMITS, # Retention policy: LIMITS, INTEREST, or WORK_QUEUE
storage=StorageType.FILE, # Storage type: MEMORY or FILE
max_age=9 * 10**9, # Max age in nanoseconds (9 seconds)
max_msgs=1000, # Max number of messages
max_bytes=10 * 1024 * 1024, # Max storage size in bytes (10MB)
)
# Add stream to JetStream context
try:
await js.add_stream(config=config)
print(f"Stream '{config.name}' created successfully.")
except Exception as e:
print(f"Error creating stream: {e}")
# Close the NATS connection
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
테스트 실행
# Run the setup
python stream.py
4. Publish-Subscribe
발행자 : 특정 주제로 메시지를 발행.
구독자 : 특정 주제를 구독해서 메시지를 수신.

발행자 (publisher.py)
import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers
async def publish_messages(js, subject):
"""
Publishes a series of messages to the specified subject.
"""
try:
for i in range(1, 6):
message = f"Message {i}"
# Publish the message to the subject
await js.publish(subject, message.encode())
print(f"Published: {message}")
await asyncio.sleep(1) # Wait for a second before sending the next message
except Exception as e:
print(f"Error publishing message: {e}")
async def main():
"""
Main coroutine that sets up the NATS client, connects to the server,
and publishes messages to a subject.
"""
nc = NATS()
try:
# Connect to the NATS server
await nc.connect(servers=["nats://localhost:4222"])
print("Publisher connected to NATS server.")
# Create JetStream context
js = nc.jetstream()
# Define the subject to publish to
subject = "my.subject.test"
# Publish messages
await publish_messages(js, subject)
except ErrNoServers as e:
print(f"Could not connect to NATS server: {e}")
except Exception as e:
print(f"An error occurred: {e}")
finally:
# Close the NATS connection
await nc.close()
print("Publisher disconnected from NATS server.")
if __name__ == '__main__':
asyncio.run(main())
구독자 (subscriber.py)
import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers
async def message_handler(msg):
"""
Callback function to handle received messages.
"""
data = msg.data.decode()
print(f"Received a message on '{msg.subject}': {data}")
await msg.ack() # Acknowledge the message
async def subscribe_to_subject(js, subject):
"""
Subscribes to the specified subject and handles incoming messages.
"""
try:
# Subscribe to the subject without a queue group
await js.subscribe(subject, cb=message_handler)
print(f"Subscriber is listening on '{subject}'.")
except Exception as e:
print(f"Error subscribing to subject: {e}")
async def main():
"""
Main coroutine that sets up the NATS client, connects to the server,
and subscribes to a subject to receive messages.
"""
nc = NATS()
try:
# Connect to the NATS server
await nc.connect(servers=["nats://localhost:4222"])
print("Subscriber connected to NATS server.")
# Create JetStream context
js = nc.jetstream()
# Define the subject to subscribe to
subject = "my.subject.*"
# Subscribe to the subject
await subscribe_to_subject(js, subject)
# Keep the subscriber running to listen for incoming messages
print("Subscriber is running. Press Ctrl+C to exit.")
while True:
await asyncio.sleep(1)
except ErrNoServers as e:
print(f"Could not connect to NATS server: {e}")
except asyncio.CancelledError:
pass # Handle task cancellation gracefully
except Exception as e:
print(f"An error occurred: {e}")
finally:
# Close the NATS connection
await nc.close()
print("Subscriber disconnected from NATS server.")
if __name__ == '__main__':
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\nSubscriber stopped by user.")
테스트 실행
# Run multiple subscribers in several different terminals
python subscriber.py
python subscriber.py
python subscriber.py
# In another terminal, Run the Publisher
python publisher.py
5. Request-Reply
요청자 : 특정 주제로 요청 메시지를 송신.
응답자 : 특정 주제를 구독하고 요청에 응답.

요청자 (requester.py)
import asyncio
import nats
from nats.aio.msg import Msg
import uuid
from functools import partial
# Subscribe to the reply subject to receive responses from consumers
async def response_handler(msg: Msg, responses: list):
# Decode the response message
response = msg.data.decode()
print(f"Received response: {response}")
responses.append(response)
async def collect_responses(responses):
# Wait until the expected number of responses are received
expected_responses = 1 # Set this based on the number of consumers
while len(responses) < expected_responses:
await asyncio.sleep(0.1)
print("Collected all responses")
async def main():
# Connect to the NATS server
nc = await nats.connect("nats://localhost:4222")
# Create a JetStream context
js = nc.jetstream()
# Create a unique inbox (reply subject) for receiving responses
reply_subject = nc.new_inbox()
responses = []
# Subscribe to the reply subject
handler = partial(response_handler, responses=responses)
await nc.subscribe(reply_subject, cb=handler)
# Include the reply subject in the message headers
headers = {'reply-to': reply_subject}
# Publish a request message with the reply subject included in headers
await js.publish(
subject='my.subject.requests',
payload=b'Hello, Consumers!',
headers=headers
)
print("Published request with reply subject in headers:", reply_subject)
# Wait for responses with a timeout
try:
await asyncio.wait_for(collect_responses(responses), timeout=5)
except asyncio.TimeoutError:
print("Timed out waiting for responses")
# Print all collected responses
print("All responses:", responses)
# Close the NATS connection
await nc.close()
if __name__ == '__main__':
asyncio.run(main())
응답자 (replier.py)
import asyncio
import nats
from nats.aio.msg import Msg
import uuid
from functools import partial
# Define the message handler coroutine
async def message_handler(msg: Msg, consumer_name: str, nc: nats.aio.client.Client):
# Decode and process the message
request = msg.data.decode()
print(f"{consumer_name} received request: {request}")
# Extract the reply subject from the message headers
reply_subject = msg.header.get('reply-to') if msg.header else None
# Generate a response
response = f"Response from {consumer_name} : {request.upper()}"
# Send the response back to the reply subject if it exists
if reply_subject:
await nc.publish(reply_subject, response.encode())
print(f"{consumer_name} sent response")
# Acknowledge the message to confirm processing
await msg.ack()
async def main():
# Connect to the NATS server
nc = await nats.connect("nats://localhost:4222")
# Create a JetStream context
js = nc.jetstream()
# Generate a unique durable consumer name
consumer_name = f'consumer_{uuid.uuid4()}'
print(f"{consumer_name} is listening for messages...")
# Subscribe to the 'requests' subject using push-based subscription
handler = partial(message_handler, consumer_name=consumer_name, nc=nc)
await js.subscribe(
subject='my.subject.requests',
durable=consumer_name,
stream='example_stream',
cb=handler,
manual_ack=True
)
# Keep the consumer running
await asyncio.Event().wait()
if __name__ == '__main__':
asyncio.run(main())
테스트 실행
# Run the Subscriber
python replier.py
# In another terminal, Run the Publisher
python requester.py
6. Queue Groups (로드 밸런싱)
여러 작업자(워커)가 같은 큐 그룹에 가입하여 작업을 분산 처리.
메시지는 큐 그룹 내의 한 작업자에게만 전달됨.

작업 발행자 (task_publisher.py)
import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers
async def publish_messages(js, subject):
"""
Publishes a series of messages to the specified subject.
"""
try:
for i in range(1, 6):
message = f"Message {i}"
# Publish the message to the subject
await js.publish(subject, message.encode())
print(f"Published: {message}")
await asyncio.sleep(1) # Wait for a second before sending the next message
except Exception as e:
print(f"Error publishing message: {e}")
async def main():
"""
Main coroutine that sets up the NATS client, connects to the server,
and publishes messages to a subject.
"""
nc = NATS()
try:
# Connect to the NATS server
await nc.connect(servers=["nats://localhost:4222"])
print("Publisher connected to NATS server.")
# Create JetStream context
js = nc.jetstream()
# Define the subject to publish to
subject = "my.subject.test"
# Publish messages
await publish_messages(js, subject)
except ErrNoServers as e:
print(f"Could not connect to NATS server: {e}")
except Exception as e:
print(f"An error occurred: {e}")
finally:
# Close the NATS connection
await nc.close()
print("Publisher disconnected from NATS server.")
if __name__ == '__main__':
asyncio.run(main())
작업자 (worker.py)
import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers
from functools import partial
async def message_handler(msg, name):
"""
Callback function to handle received messages.
"""
data = msg.data.decode()
print(f"[{name}] Received a message on '{msg.subject}': {data}")
await msg.ack() # Acknowledge the message
async def subscribe_to_subject(js, subject, queue_group, name):
"""
Subscribes to the specified subject with a given queue group and handles incoming messages.
"""
handler = partial(message_handler, name=name)
try:
# Subscribe to the subject with the specified queue group
await js.subscribe(subject, queue=queue_group, cb=handler)
print(f"Subscriber is listening on '{subject}' with queue group '{queue_group}'.")
except Exception as e:
print(f"Error subscribing to subject: {e}")
async def main(name):
"""
Main coroutine that sets up the NATS client, connects to the server,
and subscribes to a subject to receive messages.
"""
nc = NATS()
try:
# Connect to the NATS server
await nc.connect(servers=["nats://nats.nats:4222"])
print("Subscriber connected to NATS server.")
# Create JetStream context
js = nc.jetstream()
# Define the subject to subscribe to and the queue group
subject = "my.subject.*"
queue_group = "workers"
# Subscribe to the subject
await subscribe_to_subject(js, subject, queue_group, name)
# Keep the subscriber running to listen for incoming messages
print(f"Subscriber '{name}' is running. Press Ctrl+C to exit.")
while True:
await asyncio.sleep(1)
except ErrNoServers as e:
print(f"Could not connect to NATS server: {e}")
except asyncio.CancelledError:
pass # Handle task cancellation gracefully
except Exception as e:
print(f"An error occurred: {e}")
finally:
# Close the NATS connection
await nc.close()
print("Subscriber disconnected from NATS server.")
if __name__ == '__main__':
# Define the queue group name
queue_group = "order_group"
# Create two consumer tasks in the same queue group
consumers = [
main("Consumer1"),
main("Consumer2"),
]
# Run both consumers concurrently
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(asyncio.gather(*consumers))
except KeyboardInterrupt:
print("Consumers are shutting down.")
테스트 실행
# Start multiple workers
python worker.py
# In another terminal, Run the Task Publisher
python task_publisher.py