NATS – JetStream

NATS JetStream은 고급 메시지 스트리밍 및 데이터 저장 기능을 제공하는 확장 기능으로, NATS의 기본 메시징 기능을 확장하여 더 복잡한 애플리케이션 요구 사항을 만족할수 있게 만들어 줍니다. JetStream은 주로 지속적인 메시지 저장, 내구성, 메시지 재처리, 스트림 처리를 위해 사용됩니다.
JetStream의 주요 용도.
  1. 내구성 있는 메시징: JetStream은 메시지를 디스크에 영구 저장하여 클라이언트가 이후에 메시지를 다시 가져올 수 있도록 합니다. 이를 통해 메시지가 손실되지 않으며, 메시지가 발행되었을 때 구독자가 연결되어 있지 않았더라도 나중에 접근할 수 있게 합니다. 이러한 기능은 스트림을 통해 관리되며, 각 스트림은 메시지 보존 방식을 정의하고, 보존 기간, 크기, 구독자 관심도와 같은 제한을 설정할 수 있습니다.
  2. 메시지 재처리: JetStream은 이전에 전송된 메시지를 재생할 수 있는 기능을 제공합니다. 이는 데이터 분석이나 메시지 재처리가 필요한 애플리케이션에서 유용하게 사용할 수 있습니다. JetStream의 스트림은 일반적인 NATS 주제를 소비하며, 해당 주제로 발행된 모든 메시지를 정의된 저장 시스템에 캡처하고 나중에 재생할 수 있습니다.
  3. 스트림 처리: JetStream은 연속적인 메시지를 스트림으로 관리할 수 있게 하여 복잡한 데이터 흐름 및 스트림 처리 애플리케이션을 구축하는 데 도움을 줍니다. 스트림은 메시지 보존을 위한 구조를 제공하며, 이를 통해 애플리케이션이 대량의 데이터를 효율적으로 관리하고 분석할 수 있습니다.
  4. 메시지 순서 보장: JetStream은 메시지의 순서를 보장하여, 메시지 순서 일관성이 필요한 애플리케이션에서 올바르게 처리될 수 있도록 지원합니다.
  5. 확인 응답을 통한 향상된 발행: 스트림에 메시지를 보내기 위해 일반 NATS 발행 호출을 사용할 수 있지만, JetStream 전용 발행 호출을 사용하는 것이 더 신뢰할 수 있습니다. 이 방법은 JetStream 서버가 메시지가 성공적으로 저장되었음을 확인하는 응답을 제공하여 높은 수준의 전송 보장을 제공합니다.
  6. 확장성 및 성능: JetStream은 NATS의 경량 및 높은 성능 특성을 유지하면서도, 더 복잡한 작업을 지원하기 위한 추가 저장 및 처리 기능을 제공합니다. 복잡한 분산 시스템의 요구를 충족시키기 위해 잘 확장되며, 여전히 저지연 메시징을 보장합니다.
이러한 고급 기능들은 JetStream을 로그 관리, 이벤트 소싱, 스트리밍 데이터 분석 및 분산 시스템의 메시지 큐로서 특히 가치 있게 만듭니다. 메시지를 사용자 정의된 보존 정책으로 저장하고 신뢰할 수 있는 메시지 확인을 제공하는 기능은 중요한 애플리케이션 인프라에서 JetStream의 역할을 더욱 강화합니다.

1. NATS 서버 실행

NATS 서버 바이너리 다운로드: 최신 버전의 NATS 서버 바이너리를 GitHub에서 다운로드합니다.
				
					wget https://github.com/nats-io/nats-server/releases/download/v2.10.2/nats-server-v2.10.2-linux-amd64.tar.gz

				
			
압축 해제: 다운로드한 파일을 압축 해제합니다.
				
					tar -xzf nats-server-v2.10.2-linux-amd64.tar.gz

				
			
서버 실행: NATS 서버를 기본 설정으로 실행.(-js option을 지정해야 함)
				
					nats-server-v2.10.2-linux-amd64/nats-server -js
				
			

2. Python 테스트

모든 예제는 Python의 nats-py 라이브러리를 사용합니다. nats-py 라이브러리는 다음 명령어로 설치할 수 있습니다.
				
					pip install nats-py

				
			

3. JetStream

NATS JetStream에서 스트림(stream)은 기본 NATS 메시징 기능을 넘어서, 고급 메시지 저장 및 관리 기능을 제공하는 핵심 개념입니다. 스트림은 논리적 메시지 저장소로서 메시지가 어떻게 보존되고 처리되는지 정의하며, 이를 재사용하거나 재처리할 수 있는 기능을 제공합니다.
스트림은 NATS JetStream에서 특정 주제(토픽)로 발행된 모든 메시지를 캡처하고, 이를 지속적으로 저장하여 애플리케이션이 나중에 이 메시지를 사용할 수 있도록 하는 메시지 저장소입니다. 이를 통해 JetStream은 지속적이고 내구성 있는 메시징과 재생 가능한 메시징을 제공하여, 더 안정적인 메시지 흐름을 관리할 수 있게 합니다.
스트림의 주요 기능
  1. 메시지 보존 :
    > 스트림은 특정 주제로 발행된 메시지를 저장하며, 이를 나중에 재생할 수 있습니다.
    > 보존 정책을 설정하여 메시지를 얼마나 오래 보존할지 정의할 수 있습니다. 보존 정책은 시간(duration), 메시지 수 또는 총 저장 용량에 따라 설정할 수 있습니다.
  2. 보존 정책 :
    한계 기반 보존: 메시지 수, 스트림 크기 등 특정 한계에 도달할 때까지 메시지를 보존합니다.
    관심 기반 보존: 활성 구독자가 존재하는 동안 메시지를 보존합니다.
    작업 대기열 보존: 메시지가 소비자에게 성공적으로 전달되면 삭제됩니다.
  3. 메시지 순서 보장 :
    > 스트림은 메시지가 발행된 순서대로 저장되며, 이를 통해 메시지 순서를 보장합니다. 이는 메시지 순서가 중요한 애플리케이션에서 일관성 있게 처리될 수 있도록 지원합니다.
  4. 저장 옵션 : 스트림 저장을 위한 다양한 저장 옵션을 제공.
    메모리 저장: 메시지를 메모리에 저장하여 빠르게 액세스할 수 있지만, 휘발성이 있습니다.
    파일 저장: 메시지를 디스크에 저장하여 내구성 및 지속성을 보장합니다.
  5. 주제(Subjects) :
    > 스트림은 하나 이상의 주제와 연관되어 있습니다. 해당 주제로 발행된 모든 메시지는 스트림에 저장됩니다.
    > 예를 들어, orders.* 주제에 대해 스트림을 설정하면 orders.created, orders.updated 등과 같은 주제로 발행된 모든 메시지가 저장됩니다.
  6. 스트림 구성 : 스트림을 생성할 때 다양한 매개변수를 지정 할 수 있음.
    이름(Name): 스트림의 고유 이름.
    주제(Subjects): 캡처할 메시지의 주제 또는 와일드카드 패턴.
    > 보존 정책(Retention Policy): 메시지를 얼마나 오래 보존할지 결정.
    저장 유형(Storage Type): memory 또는 file 저장 방식 선택.
    최대 보존 시간(Maximum Age): 각 메시지를 얼마나 오래 보관할지 설정.
    한계(Limits): 스트림이 보관할 수 있는 최대 메시지 수 또는 저장 용량.
스트림 생성 (stream.py)
				
					import asyncio
from nats.aio.client import Client as NATS
from nats.js.api import StreamConfig, RetentionPolicy, StorageType

async def main():
    # Create a NATS client instance
    nc = NATS()

    # Connect to NATS server
    await nc.connect(servers=["nats://localhost:4222"])

    # Create JetStream context
    js = nc.jetstream()

    stream_name = "example_stream"

    # Check if the stream already exists
    try:
        stream_info = await js.stream_info(stream_name)
        # If it exists, delete the stream
        await js.delete_stream(stream_name)
        print(f"Existing stream '{stream_name}' deleted.")
    except Exception as e:
        # Stream does not exist
        print(f"Stream '{stream_name}' does not exist. Proceeding to create a new one.")

    # Define stream configuration with correct enums and integer max_age
    config = StreamConfig(
        name=stream_name,                    # Stream name
        subjects=["my.subject.*"],           # Subjects to subscribe
        retention=RetentionPolicy.LIMITS,    # Retention policy: LIMITS, INTEREST, or WORK_QUEUE
        storage=StorageType.FILE,            # Storage type: MEMORY or FILE
        max_age=9 * 10**9,                   # Max age in nanoseconds (9 seconds)
        max_msgs=1000,                       # Max number of messages
        max_bytes=10 * 1024 * 1024,          # Max storage size in bytes (10MB)
    )

    # Add stream to JetStream context
    try:
        await js.add_stream(config=config)
        print(f"Stream '{config.name}' created successfully.")
    except Exception as e:
        print(f"Error creating stream: {e}")

    # Close the NATS connection
    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())

				
			
테스트 실행
				
					# Run the setup
python stream.py
				
			

4. Publish-Subscribe

발행자 : 특정 주제로 메시지를 발행.
구독자 : 특정 주제를 구독해서 메시지를 수신.
발행자 (publisher.py)
				
					import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers

async def publish_messages(js, subject):
    """
    Publishes a series of messages to the specified subject.
    """
    try:
        for i in range(1, 6):
            message = f"Message {i}"
            # Publish the message to the subject
            await js.publish(subject, message.encode())
            print(f"Published: {message}")
            await asyncio.sleep(1)  # Wait for a second before sending the next message
    except Exception as e:
        print(f"Error publishing message: {e}")

async def main():
    """
    Main coroutine that sets up the NATS client, connects to the server,
    and publishes messages to a subject.
    """
    nc = NATS()

    try:
        # Connect to the NATS server
        await nc.connect(servers=["nats://localhost:4222"])
        print("Publisher connected to NATS server.")

        # Create JetStream context
        js = nc.jetstream()

        # Define the subject to publish to
        subject = "my.subject.test"

        # Publish messages
        await publish_messages(js, subject)

    except ErrNoServers as e:
        print(f"Could not connect to NATS server: {e}")
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        # Close the NATS connection
        await nc.close()
        print("Publisher disconnected from NATS server.")

if __name__ == '__main__':
    asyncio.run(main())

				
			
구독자 (subscriber.py)
				
					import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers

async def message_handler(msg):
    """
    Callback function to handle received messages.
    """
    data = msg.data.decode()
    print(f"Received a message on '{msg.subject}': {data}")
    await msg.ack()  # Acknowledge the message

async def subscribe_to_subject(js, subject):
    """
    Subscribes to the specified subject and handles incoming messages.
    """
    try:
        # Subscribe to the subject without a queue group
        await js.subscribe(subject, cb=message_handler)
        print(f"Subscriber is listening on '{subject}'.")
    except Exception as e:
        print(f"Error subscribing to subject: {e}")

async def main():
    """
    Main coroutine that sets up the NATS client, connects to the server,
    and subscribes to a subject to receive messages.
    """
    nc = NATS()

    try:
        # Connect to the NATS server
        await nc.connect(servers=["nats://localhost:4222"])
        print("Subscriber connected to NATS server.")

        # Create JetStream context
        js = nc.jetstream()

        # Define the subject to subscribe to
        subject = "my.subject.*"

        # Subscribe to the subject
        await subscribe_to_subject(js, subject)

        # Keep the subscriber running to listen for incoming messages
        print("Subscriber is running. Press Ctrl+C to exit.")
        while True:
            await asyncio.sleep(1)

    except ErrNoServers as e:
        print(f"Could not connect to NATS server: {e}")
    except asyncio.CancelledError:
        pass  # Handle task cancellation gracefully
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        # Close the NATS connection
        await nc.close()
        print("Subscriber disconnected from NATS server.")

if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nSubscriber stopped by user.")

				
			
테스트 실행
				
					# Run multiple subscribers in several different terminals
python subscriber.py
python subscriber.py
python subscriber.py

# In another terminal, Run the Publisher
python publisher.py
				
			

5. Request-Reply

요청자 : 특정 주제로 요청 메시지를 송신.
응답자 : 특정 주제를 구독하고 요청에 응답.
요청자 (requester.py)
				
					import asyncio
import nats
from nats.aio.msg import Msg
import uuid
from functools import partial

# Subscribe to the reply subject to receive responses from consumers
async def response_handler(msg: Msg, responses: list):
    # Decode the response message
    response = msg.data.decode()
    print(f"Received response: {response}")
    responses.append(response)

async def collect_responses(responses):
    # Wait until the expected number of responses are received
    expected_responses = 1  # Set this based on the number of consumers
    while len(responses) < expected_responses:
        await asyncio.sleep(0.1)
    print("Collected all responses")

async def main():
    # Connect to the NATS server
    nc = await nats.connect("nats://localhost:4222")

    # Create a JetStream context
    js = nc.jetstream()

    # Create a unique inbox (reply subject) for receiving responses
    reply_subject = nc.new_inbox()
    responses = []

    # Subscribe to the reply subject
    handler = partial(response_handler, responses=responses)
    await nc.subscribe(reply_subject, cb=handler)

    # Include the reply subject in the message headers
    headers = {'reply-to': reply_subject}

    # Publish a request message with the reply subject included in headers
    await js.publish(
        subject='my.subject.requests',
        payload=b'Hello, Consumers!',
        headers=headers
    )
    print("Published request with reply subject in headers:", reply_subject)

    # Wait for responses with a timeout
    try:
        await asyncio.wait_for(collect_responses(responses), timeout=5)
    except asyncio.TimeoutError:
        print("Timed out waiting for responses")

    # Print all collected responses
    print("All responses:", responses)

    # Close the NATS connection
    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())

				
			
응답자 (replier.py)
				
					import asyncio
import nats
from nats.aio.msg import Msg
import uuid
from functools import partial

# Define the message handler coroutine
async def message_handler(msg: Msg, consumer_name: str, nc: nats.aio.client.Client):
    # Decode and process the message
    request = msg.data.decode()
    print(f"{consumer_name} received request: {request}")

    # Extract the reply subject from the message headers
    reply_subject = msg.header.get('reply-to') if msg.header else None

    # Generate a response
    response = f"Response from {consumer_name} : {request.upper()}"

    # Send the response back to the reply subject if it exists
    if reply_subject:
        await nc.publish(reply_subject, response.encode())
        print(f"{consumer_name} sent response")

    # Acknowledge the message to confirm processing
    await msg.ack()

async def main():
    # Connect to the NATS server
    nc = await nats.connect("nats://localhost:4222")

    # Create a JetStream context
    js = nc.jetstream()

    # Generate a unique durable consumer name
    consumer_name = f'consumer_{uuid.uuid4()}'

    print(f"{consumer_name} is listening for messages...")

    # Subscribe to the 'requests' subject using push-based subscription
    handler = partial(message_handler, consumer_name=consumer_name, nc=nc)
    await js.subscribe(
        subject='my.subject.requests',
        durable=consumer_name,
        stream='example_stream',
        cb=handler,
        manual_ack=True
    )

    # Keep the consumer running
    await asyncio.Event().wait()

if __name__ == '__main__':
    asyncio.run(main())

				
			
테스트 실행
				
					# Run the Subscriber
python replier.py

# In another terminal, Run the Publisher
python requester.py
				
			

6. Queue Groups (로드 밸런싱)

여러 작업자(워커)가 같은 큐 그룹에 가입하여 작업을 분산 처리.
메시지는 큐 그룹 내의 한 작업자에게만 전달됨.
작업 발행자 (task_publisher.py)
				
					import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers

async def publish_messages(js, subject):
    """
    Publishes a series of messages to the specified subject.
    """
    try:
        for i in range(1, 6):
            message = f"Message {i}"
            # Publish the message to the subject
            await js.publish(subject, message.encode())
            print(f"Published: {message}")
            await asyncio.sleep(1)  # Wait for a second before sending the next message
    except Exception as e:
        print(f"Error publishing message: {e}")

async def main():
    """
    Main coroutine that sets up the NATS client, connects to the server,
    and publishes messages to a subject.
    """
    nc = NATS()

    try:
        # Connect to the NATS server
        await nc.connect(servers=["nats://localhost:4222"])
        print("Publisher connected to NATS server.")

        # Create JetStream context
        js = nc.jetstream()

        # Define the subject to publish to
        subject = "my.subject.test"

        # Publish messages
        await publish_messages(js, subject)

    except ErrNoServers as e:
        print(f"Could not connect to NATS server: {e}")
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        # Close the NATS connection
        await nc.close()
        print("Publisher disconnected from NATS server.")

if __name__ == '__main__':
    asyncio.run(main())

				
			
작업자 (worker.py)
				
					import asyncio
from nats.aio.client import Client as NATS
from nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers
from functools import partial

async def message_handler(msg, name):
    """
    Callback function to handle received messages.
    """
    data = msg.data.decode()
    print(f"[{name}] Received a message on '{msg.subject}': {data}")
    await msg.ack()  # Acknowledge the message

async def subscribe_to_subject(js, subject, queue_group, name):
    """
    Subscribes to the specified subject with a given queue group and handles incoming messages.
    """
    handler = partial(message_handler, name=name)
    try:
        # Subscribe to the subject with the specified queue group
        await js.subscribe(subject, queue=queue_group, cb=handler)
        print(f"Subscriber is listening on '{subject}' with queue group '{queue_group}'.")
    except Exception as e:
        print(f"Error subscribing to subject: {e}")

async def main(name):
    """
    Main coroutine that sets up the NATS client, connects to the server,
    and subscribes to a subject to receive messages.
    """
    nc = NATS()

    try:
        # Connect to the NATS server
        await nc.connect(servers=["nats://nats.nats:4222"])
        print("Subscriber connected to NATS server.")

        # Create JetStream context
        js = nc.jetstream()

        # Define the subject to subscribe to and the queue group
        subject = "my.subject.*"
        queue_group = "workers"

        # Subscribe to the subject
        await subscribe_to_subject(js, subject, queue_group, name)

        # Keep the subscriber running to listen for incoming messages
        print(f"Subscriber '{name}' is running. Press Ctrl+C to exit.")
        while True:
            await asyncio.sleep(1)

    except ErrNoServers as e:
        print(f"Could not connect to NATS server: {e}")
    except asyncio.CancelledError:
        pass  # Handle task cancellation gracefully
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        # Close the NATS connection
        await nc.close()
        print("Subscriber disconnected from NATS server.")

if __name__ == '__main__':
    # Define the queue group name
    queue_group = "order_group"

    # Create two consumer tasks in the same queue group
    consumers = [
        main("Consumer1"),
        main("Consumer2"),
    ]

    # Run both consumers concurrently
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(asyncio.gather(*consumers))
    except KeyboardInterrupt:
        print("Consumers are shutting down.")

				
			
테스트 실행
				
					# Start multiple workers
python worker.py

# In another terminal, Run the Task Publisher
python task_publisher.py
				
			

답글 남기기

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다