{"id":504,"date":"2024-11-11T06:05:01","date_gmt":"2024-11-11T06:05:01","guid":{"rendered":"https:\/\/realstudy.net\/?p=504"},"modified":"2024-11-15T05:10:49","modified_gmt":"2024-11-15T05:10:49","slug":"nats-jetstream-2","status":"publish","type":"post","link":"https:\/\/realstudy.net\/?p=504","title":{"rendered":"NATS &#8211; JetStream"},"content":{"rendered":"\t\t<div data-elementor-type=\"wp-post\" data-elementor-id=\"504\" class=\"elementor elementor-504\" data-elementor-post-type=\"post\">\n\t\t\t\t<div class=\"elementor-element elementor-element-6c8f41d e-flex e-con-boxed e-con e-parent\" data-id=\"6c8f41d\" data-element_type=\"container\">\n\t\t\t\t\t<div class=\"e-con-inner\">\n\t\t\t\t<div class=\"elementor-element elementor-element-a926922 elementor-widget elementor-widget-text-editor\" data-id=\"a926922\" data-element_type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<div>NATS JetStream is an extension that provides advanced message streaming and data storage capabilities, expanding upon NATS&#8217;s core messaging functionality to meet more complex application requirements. 
JetStream is primarily used for <strong>persistent message storage, durability, message reprocessing, and stream processing<\/strong>.<\/div>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t<div class=\"elementor-element elementor-element-ff8f01c e-flex e-con-boxed e-con e-parent\" data-id=\"ff8f01c\" data-element_type=\"container\">\n\t\t\t\t\t<div class=\"e-con-inner\">\n\t\t\t\t<div class=\"elementor-element elementor-element-b22c4f6 elementor-widget elementor-widget-text-editor\" data-id=\"b22c4f6\" data-element_type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<div><strong>Key Use Cases for JetStream.<\/strong><\/div>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-d39e918 elementor-widget elementor-widget-text-editor\" data-id=\"d39e918\" data-element_type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<ol><li><strong>Durable Messaging:<\/strong> JetStream persistently stores messages on disk, allowing clients to retrieve messages later. This ensures no message loss, enabling access to messages even if subscribers were disconnected at the time of publication. This functionality is managed through streams, each of which defines a message retention policy and can set limitations such as retention duration, storage size, and subscriber interest.<\/li><li><strong>Message Reprocessing:<\/strong> JetStream provides the ability to replay previously sent messages, which is useful in applications requiring data analysis or message reprocessing. 
Streams in JetStream consume standard NATS subjects, capturing all messages published to a given subject within a defined storage system that can be replayed later.<\/li><li><strong>Stream Processing:<\/strong> JetStream enables managing sequential messages as streams, helping in the construction of complex data flows and stream-processing applications. Streams provide a structure for message retention, allowing applications to efficiently manage and analyze large volumes of data.<\/li><li><strong>Message Order Guarantee:<\/strong> JetStream ensures message order, supporting applications where consistency in message sequence is crucial for correct processing.<\/li><li><strong>Enhanced Publishing with Acknowledgments:<\/strong> While standard NATS publish calls can be used to send messages to a stream, JetStream-specific publishing calls are more reliable. This method provides a confirmation from the JetStream server that messages have been successfully stored, offering a high level of delivery guarantee.<\/li><li><strong>Scalability and Performance:<\/strong> JetStream maintains NATS&#8217;s lightweight and high-performance characteristics while adding storage and processing capabilities to support more complex tasks. It scales well to meet the demands of complex distributed systems, ensuring low-latency messaging.<\/li><\/ol>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-a926b69 elementor-widget elementor-widget-text-editor\" data-id=\"a926b69\" data-element_type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<div>These advanced features make JetStream especially valuable for <strong>log management, event sourcing, streaming data analysis<\/strong>, and as a <strong>message queue<\/strong> in distributed systems. The ability to store messages with custom retention policies and to provide reliable message acknowledgments further solidifies JetStream\u2019s role in critical application infrastructure.<\/div>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t<div class=\"elementor-element elementor-element-b34a46c e-flex e-con-boxed e-con e-parent\" data-id=\"b34a46c\" data-element_type=\"container\">\n\t\t\t\t\t<div class=\"e-con-inner\">\n\t\t\t\t<div class=\"elementor-element elementor-element-6aa75ac elementor-widget elementor-widget-heading\" data-id=\"6aa75ac\" data-element_type=\"widget\" data-widget_type=\"heading.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t<h2 class=\"elementor-heading-title elementor-size-default\">1. 
Running a NATS server<\/h2>\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-ff7d644 elementor-widget elementor-widget-text-editor\" data-id=\"ff7d644\" data-element_type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<div><strong>Download NATS Server Binaries:<\/strong> Download the NATS server release archive from GitHub (this example uses v2.10.2 for Linux amd64).<\/div>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-c82b06b elementor-widget elementor-widget-code-highlight\" data-id=\"c82b06b\" data-element_type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-tomorrow copy-to-clipboard word-wrap\">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-bash line-numbers\">\n\t\t\t\t<code readonly=\"true\" class=\"language-bash\">\n\t\t\t\t\t<xmp>wget https:\/\/github.com\/nats-io\/nats-server\/releases\/download\/v2.10.2\/nats-server-v2.10.2-linux-amd64.tar.gz\n<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-35d2e40 elementor-widget elementor-widget-text-editor\" data-id=\"35d2e40\" data-element_type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<div><strong>Extract:<\/strong> Extract the downloaded archive.<\/div>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-b0347c9 elementor-widget elementor-widget-code-highlight\" data-id=\"b0347c9\" data-element_type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-tomorrow copy-to-clipboard word-wrap\">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-bash 
\n\t\t\t\t<code">
line-numbers\">\n\t\t\t\t<code readonly=\"true\" class=\"language-bash\">\n\t\t\t\t\t<xmp>tar -xzf nats-server-v2.10.2-linux-amd64.tar.gz\n<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-8de5381 elementor-widget elementor-widget-text-editor\" data-id=\"8de5381\" data-element_type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<div><strong>Run Server:<\/strong> Run the NATS server with JetStream enabled. JetStream is disabled by default, so the -js flag is required for the examples in this post.<\/div>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-d767afd elementor-widget elementor-widget-code-highlight\" data-id=\"d767afd\" data-element_type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-tomorrow copy-to-clipboard word-wrap\">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-bash line-numbers\">\n\t\t\t\t<code readonly=\"true\" class=\"language-bash\">\n\t\t\t\t\t<xmp>nats-server-v2.10.2-linux-amd64\/nats-server -js<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t<div class=\"elementor-element elementor-element-01f4dae e-flex e-con-boxed e-con e-parent\" data-id=\"01f4dae\" data-element_type=\"container\">\n\t\t\t\t\t<div class=\"e-con-inner\">\n\t\t\t\t<div class=\"elementor-element elementor-element-54854e8 elementor-widget elementor-widget-heading\" data-id=\"54854e8\" data-element_type=\"widget\" data-widget_type=\"heading.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t<h2 class=\"elementor-heading-title elementor-size-default\">2. 
Python Test<\/h2>\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-de3bb11 elementor-widget elementor-widget-text-editor\" data-id=\"de3bb11\" data-element_type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<div>All examples use Python&#8217;s nats-py library, which can be installed with the following command:<\/div>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-59a345a elementor-widget elementor-widget-code-highlight\" data-id=\"59a345a\" data-element_type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-tomorrow copy-to-clipboard word-wrap\">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-bash line-numbers\">\n\t\t\t\t<code readonly=\"true\" class=\"language-bash\">\n\t\t\t\t\t<xmp>pip install nats-py\n<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t<div class=\"elementor-element elementor-element-234f958 e-flex e-con-boxed e-con e-parent\" data-id=\"234f958\" data-element_type=\"container\">\n\t\t\t\t\t<div class=\"e-con-inner\">\n\t\t\t\t<div class=\"elementor-element elementor-element-0f0d92f elementor-widget elementor-widget-heading\" data-id=\"0f0d92f\" data-element_type=\"widget\" data-widget_type=\"heading.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t<h2 class=\"elementor-heading-title elementor-size-default\">3. 
JetStream<\/h2>\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-1f6fbe2 elementor-widget elementor-widget-text-editor\" data-id=\"1f6fbe2\" data-element_type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<div>In NATS JetStream, a stream is a core concept that extends beyond the basic NATS messaging capabilities, offering advanced message storage and management functionalities. A stream acts as a logical message repository that defines how messages are retained and processed, providing options for reuse or reprocessing.<br \/>A stream in NATS JetStream captures all messages published to a specific subject (topic) and persistently stores them, allowing applications to access these messages later. This enables JetStream to offer persistent and durable messaging, along with replayable messaging, thereby facilitating more reliable message flow management.<\/div>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-fd58e91 elementor-widget elementor-widget-text-editor\" data-id=\"fd58e91\" data-element_type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<div><strong>Key Features of Streams in JetStream<\/strong><\/div>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-fd0319e elementor-widget elementor-widget-text-editor\" data-id=\"fd0319e\" data-element_type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<ol><li><strong>Message Retention<\/strong>:<br \/>&gt; Streams store messages published to specific subjects, which can later be <strong>replayed<\/strong>.<br \/>&gt; A <strong>retention policy<\/strong> defines how long messages are kept, with options based on <strong>duration<\/strong>, <strong>message 
count<\/strong>, or <strong>total storage size<\/strong>.<\/li><li><strong>Retention Policies<\/strong>:<br \/>&gt; <strong>Limit-Based Retention<\/strong>: Retains messages until specific limits, such as message count, stream size, or message age, are reached.<br \/>&gt; <strong>Interest-Based Retention<\/strong>: Retains messages only while there are consumers that have not yet acknowledged them.<br \/>&gt; <strong>Work Queue Retention<\/strong>: Deletes each message once a consumer acknowledges it.<\/li><li><strong>Message Order Guarantee<\/strong>:<br \/>&gt; Streams store messages in the order they are published, ensuring message sequence consistency. This supports applications where message order is critical for accurate processing.<\/li><li><strong>Storage Options<\/strong>: JetStream provides two storage options for streams.<br \/>&gt; <strong>Memory Storage<\/strong>: Stores messages in memory for quick access but is volatile.<br \/>&gt; <strong>File Storage<\/strong>: Stores messages on disk, ensuring durability and persistence.<\/li><li><strong>Subjects<\/strong>:<br \/>&gt; A stream is associated with one or more subjects. All messages published to these subjects are stored in the stream.<br \/>&gt; For example, setting up a stream for the subject orders.* will capture all messages published to subjects like orders.created and orders.updated.<\/li><li><strong>Stream Configuration<\/strong>: When creating a stream, multiple parameters can be specified.<br \/>&gt; <strong>Name<\/strong>: A unique identifier for the stream.<br \/>&gt; <strong>Subjects<\/strong>: The subject(s) or wildcard patterns for capturing messages.<br \/>&gt; <strong>Retention Policy<\/strong>: Determines how long messages are retained.<br \/>&gt; <strong>Storage Type:<\/strong> Choice of memory or file storage.<br \/>&gt; <strong>Maximum Age<\/strong>: Specifies how long each message is retained.<br \/>&gt; <strong>Limits<\/strong>: Defines the maximum message count or storage capacity for the stream.<\/li><\/ol>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-13cc847 elementor-widget elementor-widget-text-editor\" data-id=\"13cc847\" data-element_type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<div><strong>Creating a Stream in NATS JetStream<\/strong> (stream.py)<\/div>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-2d471e7 elementor-widget elementor-widget-code-highlight\" data-id=\"2d471e7\" data-element_type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-tomorrow copy-to-clipboard word-wrap\">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-python line-numbers\">\n\t\t\t\t<code readonly=\"true\" class=\"language-python\">\n\t\t\t\t\t<xmp>import asyncio\nfrom nats.aio.client import Client as NATS\nfrom nats.js.api import StreamConfig, RetentionPolicy, StorageType\n\nasync def main():\n    # Create a NATS client instance\n    nc = NATS()\n\n    # Connect to NATS server\n    await nc.connect(servers=[\"nats:\/\/localhost:4222\"])\n\n    # Create JetStream context\n    js = nc.jetstream()\n\n    stream_name = \"example_stream\"\n\n    # 
Check if the stream already exists\n    try:\n        await js.stream_info(stream_name)\n        # If it exists, delete the stream\n        await js.delete_stream(stream_name)\n        print(f\"Existing stream '{stream_name}' deleted.\")\n    except Exception:\n        # Stream does not exist\n        print(f\"Stream '{stream_name}' does not exist. Proceeding to create a new one.\")\n\n    # Define stream configuration with enum values and max_age in seconds\n    config = StreamConfig(\n        name=stream_name,                    # Stream name\n        subjects=[\"my.subject.*\"],           # Subjects captured by the stream\n        retention=RetentionPolicy.LIMITS,    # Retention policy: LIMITS, INTEREST, or WORK_QUEUE\n        storage=StorageType.FILE,            # Storage type: MEMORY or FILE\n        max_age=9,                           # Max age in seconds (nats-py converts to nanoseconds)\n        max_msgs=1000,                       # Max number of messages\n        max_bytes=10 * 1024 * 1024,          # Max storage size in bytes (10MB)\n    )\n\n    # Add stream to JetStream context\n    try:\n        await js.add_stream(config=config)\n        print(f\"Stream '{config.name}' created successfully.\")\n    except Exception as e:\n        print(f\"Error creating stream: {e}\")\n\n    # Close the NATS connection\n    await nc.close()\n\nif __name__ == '__main__':\n    asyncio.run(main())\n<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-8984f72 elementor-widget elementor-widget-text-editor\" data-id=\"8984f72\" data-element_type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<div><strong>How to run<\/strong><\/div>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-22f7830 elementor-widget elementor-widget-code-highlight\" data-id=\"22f7830\" 
data-element_type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-tomorrow copy-to-clipboard word-wrap\">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-python line-numbers\">\n\t\t\t\t<code readonly=\"true\" class=\"language-python\">\n\t\t\t\t\t<xmp># Run the setup\npython stream.py<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t<div class=\"elementor-element elementor-element-dcc5250 e-flex e-con-boxed e-con e-parent\" data-id=\"dcc5250\" data-element_type=\"container\">\n\t\t\t\t\t<div class=\"e-con-inner\">\n\t\t\t\t<div class=\"elementor-element elementor-element-f6b204f elementor-widget elementor-widget-heading\" data-id=\"f6b204f\" data-element_type=\"widget\" data-widget_type=\"heading.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t<h2 class=\"elementor-heading-title elementor-size-default\">4. 
Publish-Subscribe<\/h2>\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-8a741b4 elementor-widget elementor-widget-text-editor\" data-id=\"8a741b4\" data-element_type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<div><div>A\u00a0<strong>Publisher<\/strong>\u00a0sends messages to a specific subject.<\/div><div><strong>Subscribers<\/strong>\u00a0listen to that subject and receive messages.<\/div><\/div>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-fb8b670 elementor-widget elementor-widget-image\" data-id=\"fb8b670\" data-element_type=\"widget\" data-widget_type=\"image.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t<img fetchpriority=\"high\" decoding=\"async\" width=\"359\" height=\"152\" src=\"https:\/\/realstudy.net\/wp-content\/uploads\/2024\/11\/nats_pubsub.jpg\" class=\"attachment-large size-large wp-image-384\" alt=\"\" srcset=\"https:\/\/realstudy.net\/wp-content\/uploads\/2024\/11\/nats_pubsub.jpg 359w, https:\/\/realstudy.net\/wp-content\/uploads\/2024\/11\/nats_pubsub-300x127.jpg 300w\" sizes=\"(max-width: 359px) 100vw, 359px\" \/>\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-1feb62d elementor-widget elementor-widget-text-editor\" data-id=\"1feb62d\" data-element_type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<div><strong>Publisher<\/strong> (publisher.py)<\/div>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-c678424 elementor-widget elementor-widget-code-highlight\" data-id=\"c678424\" data-element_type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div 
class=\"prismjs-tomorrow copy-to-clipboard word-wrap\">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-python line-numbers\">\n\t\t\t\t<code readonly=\"true\" class=\"language-python\">\n\t\t\t\t\t<xmp>import asyncio\nfrom nats.aio.client import Client as NATS\nfrom nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers\n\nasync def publish_messages(js, subject):\n    \"\"\"\n    Publishes a series of messages to the specified subject.\n    \"\"\"\n    try:\n        for i in range(1, 6):\n            message = f\"Message {i}\"\n            # Publish the message to the subject\n            await js.publish(subject, message.encode())\n            print(f\"Published: {message}\")\n            await asyncio.sleep(1)  # Wait for a second before sending the next message\n    except Exception as e:\n        print(f\"Error publishing message: {e}\")\n\nasync def main():\n    \"\"\"\n    Main coroutine that sets up the NATS client, connects to the server,\n    and publishes messages to a subject.\n    \"\"\"\n    nc = NATS()\n\n    try:\n        # Connect to the NATS server\n        await nc.connect(servers=[\"nats:\/\/localhost:4222\"])\n        print(\"Publisher connected to NATS server.\")\n\n        # Create JetStream context\n        js = nc.jetstream()\n\n        # Define the subject to publish to\n        subject = \"my.subject.test\"\n\n        # Publish messages\n        await publish_messages(js, subject)\n\n    except ErrNoServers as e:\n        print(f\"Could not connect to NATS server: {e}\")\n    except Exception as e:\n        print(f\"An error occurred: {e}\")\n    finally:\n        # Close the NATS connection\n        await nc.close()\n        print(\"Publisher disconnected from NATS server.\")\n\nif __name__ == '__main__':\n    asyncio.run(main())\n<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-309b4fd elementor-widget 
elementor-widget-text-editor\" data-id=\"309b4fd\" data-element_type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<div><strong>Subscriber<\/strong> (publisher.py)<\/div>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-001c13e elementor-widget elementor-widget-code-highlight\" data-id=\"001c13e\" data-element_type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-tomorrow copy-to-clipboard word-wrap\">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-python line-numbers\">\n\t\t\t\t<code readonly=\"true\" class=\"language-python\">\n\t\t\t\t\t<xmp>import asyncio\nfrom nats.aio.client import Client as NATS\nfrom nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers\n\nasync def message_handler(msg):\n    \"\"\"\n    Callback function to handle received messages.\n    \"\"\"\n    data = msg.data.decode()\n    print(f\"Received a message on '{msg.subject}': {data}\")\n    await msg.ack()  # Acknowledge the message\n\nasync def subscribe_to_subject(js, subject):\n    \"\"\"\n    Subscribes to the specified subject and handles incoming messages.\n    \"\"\"\n    try:\n        # Subscribe to the subject without a queue group\n        await js.subscribe(subject, cb=message_handler)\n        print(f\"Subscriber is listening on '{subject}'.\")\n    except Exception as e:\n        print(f\"Error subscribing to subject: {e}\")\n\nasync def main():\n    \"\"\"\n    Main coroutine that sets up the NATS client, connects to the server,\n    and subscribes to a subject to receive messages.\n    \"\"\"\n    nc = NATS()\n\n    try:\n        # Connect to the NATS server\n        await nc.connect(servers=[\"nats:\/\/localhost:4222\"])\n        print(\"Subscriber connected to NATS server.\")\n\n        # Create JetStream context\n        js 
= nc.jetstream()\n\n        # Define the subject to subscribe to\n        subject = \"my.subject.*\"\n\n        # Subscribe to the subject\n        await subscribe_to_subject(js, subject)\n\n        # Keep the subscriber running to listen for incoming messages\n        print(\"Subscriber is running. Press Ctrl+C to exit.\")\n        while True:\n            await asyncio.sleep(1)\n\n    except ErrNoServers as e:\n        print(f\"Could not connect to NATS server: {e}\")\n    except asyncio.CancelledError:\n        pass  # Handle task cancellation gracefully\n    except Exception as e:\n        print(f\"An error occurred: {e}\")\n    finally:\n        # Close the NATS connection\n        await nc.close()\n        print(\"Subscriber disconnected from NATS server.\")\n\nif __name__ == '__main__':\n    try:\n        asyncio.run(main())\n    except KeyboardInterrupt:\n        print(\"\\nSubscriber stopped by user.\")\n<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-e33a3fa elementor-widget elementor-widget-text-editor\" data-id=\"e33a3fa\" data-element_type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<div><strong>How to run<\/strong><\/div>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-abd2bc9 elementor-widget elementor-widget-code-highlight\" data-id=\"abd2bc9\" data-element_type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-tomorrow copy-to-clipboard word-wrap\">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-python line-numbers\">\n\t\t\t\t<code readonly=\"true\" class=\"language-python\">\n\t\t\t\t\t<xmp># Run multiple subscribers in several different terminals\npython subscriber.py\npython subscriber.py\npython 
subscriber.py\n\n# In another terminal, Run the Publisher\npython publisher.py<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t<div class=\"elementor-element elementor-element-b62153a e-flex e-con-boxed e-con e-parent\" data-id=\"b62153a\" data-element_type=\"container\">\n\t\t\t\t\t<div class=\"e-con-inner\">\n\t\t\t\t<div class=\"elementor-element elementor-element-e443d4f elementor-widget elementor-widget-heading\" data-id=\"e443d4f\" data-element_type=\"widget\" data-widget_type=\"heading.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t<h2 class=\"elementor-heading-title elementor-size-default\">5. Request-Reply<\/h2>\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-af8c21f elementor-widget elementor-widget-text-editor\" data-id=\"af8c21f\" data-element_type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<div><strong>Requester<\/strong>\u00a0sends a request message to a subject and waits for a reply.<\/div><div><strong>Replier<\/strong> subscribes to that subject and responds to requests.<\/div>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-4e81ca0 elementor-widget elementor-widget-image\" data-id=\"4e81ca0\" data-element_type=\"widget\" data-widget_type=\"image.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t<img decoding=\"async\" width=\"359\" height=\"195\" src=\"https:\/\/realstudy.net\/wp-content\/uploads\/2024\/11\/nats_reqrep.jpg\" class=\"attachment-large size-large wp-image-389\" alt=\"\" srcset=\"https:\/\/realstudy.net\/wp-content\/uploads\/2024\/11\/nats_reqrep.jpg 359w, https:\/\/realstudy.net\/wp-content\/uploads\/2024\/11\/nats_reqrep-300x163.jpg 300w\" sizes=\"(max-width: 359px) 100vw, 359px\" 
\/>\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-c625ae4 elementor-widget elementor-widget-text-editor\" data-id=\"c625ae4\" data-element_type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<div><strong>Requester<\/strong> (requester.py)<\/div>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-6031b3d elementor-widget elementor-widget-code-highlight\" data-id=\"6031b3d\" data-element_type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-tomorrow copy-to-clipboard word-wrap\">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-python line-numbers\">\n\t\t\t\t<code readonly=\"true\" class=\"language-python\">\n\t\t\t\t\t<xmp>import asyncio\nimport nats\nfrom nats.aio.msg import Msg\nimport uuid\nfrom functools import partial\n\n# Subscribe to the reply subject to receive responses from consumers\nasync def response_handler(msg: Msg, responses: list):\n    # Decode the response message\n    response = msg.data.decode()\n    print(f\"Received response: {response}\")\n    responses.append(response)\n\nasync def collect_responses(responses):\n    # Wait until the expected number of responses are received\n    expected_responses = 1  # Set this based on the number of consumers\n    while len(responses) < expected_responses:\n        await asyncio.sleep(0.1)\n    print(\"Collected all responses\")\n\nasync def main():\n    # Connect to the NATS server\n    nc = await nats.connect(\"nats:\/\/localhost:4222\")\n\n    # Create a JetStream context\n    js = nc.jetstream()\n\n    # Create a unique inbox (reply subject) for receiving responses\n    reply_subject = nc.new_inbox()\n    responses = []\n\n    # Subscribe to the reply subject\n    handler = partial(response_handler, 
responses=responses)\n    await nc.subscribe(reply_subject, cb=handler)\n\n    # Include the reply subject in the message headers\n    headers = {'reply-to': reply_subject}\n\n    # Publish a request message with the reply subject included in headers\n    await js.publish(\n        subject='my.subject.requests',\n        payload=b'Hello, Consumers!',\n        headers=headers\n    )\n    print(\"Published request with reply subject in headers:\", reply_subject)\n\n    # Wait for responses with a timeout\n    try:\n        await asyncio.wait_for(collect_responses(responses), timeout=5)\n    except asyncio.TimeoutError:\n        print(\"Timed out waiting for responses\")\n\n    # Print all collected responses\n    print(\"All responses:\", responses)\n\n    # Close the NATS connection\n    await nc.close()\n\nif __name__ == '__main__':\n    asyncio.run(main())\n<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-19a37d7 elementor-widget elementor-widget-text-editor\" data-id=\"19a37d7\" data-element_type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<div><strong>Replier<\/strong> (replier.py)<\/div>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-b67ba59 elementor-widget elementor-widget-code-highlight\" data-id=\"b67ba59\" data-element_type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-tomorrow copy-to-clipboard word-wrap\">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-python line-numbers\">\n\t\t\t\t<code readonly=\"true\" class=\"language-python\">\n\t\t\t\t\t<xmp>import asyncio\nimport nats\nfrom nats.aio.msg import Msg\nimport uuid\nfrom functools import partial\n\n# Define the message handler coroutine\nasync def 
message_handler(msg: Msg, consumer_name: str, nc: nats.aio.client.Client):\n    # Decode and process the message\n    request = msg.data.decode()\n    print(f\"{consumer_name} received request: {request}\")\n\n    # Extract the reply subject from the message headers\n    reply_subject = msg.header.get('reply-to') if msg.header else None\n\n    # Generate a response\n    response = f\"Response from {consumer_name} : {request.upper()}\"\n\n    # Send the response back to the reply subject if it exists\n    if reply_subject:\n        await nc.publish(reply_subject, response.encode())\n        print(f\"{consumer_name} sent response\")\n\n    # Acknowledge the message to confirm processing\n    await msg.ack()\n\nasync def main():\n    # Connect to the NATS server\n    nc = await nats.connect(\"nats:\/\/localhost:4222\")\n\n    # Create a JetStream context\n    js = nc.jetstream()\n\n    # Generate a unique durable consumer name\n    consumer_name = f'consumer_{uuid.uuid4()}'\n\n    print(f\"{consumer_name} is listening for messages...\")\n\n    # Subscribe to the 'requests' subject using push-based subscription\n    handler = partial(message_handler, consumer_name=consumer_name, nc=nc)\n    await js.subscribe(\n        subject='my.subject.requests',\n        durable=consumer_name,\n        stream='example_stream',\n        cb=handler,\n        manual_ack=True\n    )\n\n    # Keep the consumer running\n    await asyncio.Event().wait()\n\nif __name__ == '__main__':\n    asyncio.run(main())\n<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-4c7d57c elementor-widget elementor-widget-text-editor\" data-id=\"4c7d57c\" data-element_type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<div><strong>How to run<\/strong><\/div>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element 
elementor-element-b67b389 elementor-widget elementor-widget-code-highlight\" data-id=\"b67b389\" data-element_type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-tomorrow copy-to-clipboard word-wrap\">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-python line-numbers\">\n\t\t\t\t<code readonly=\"true\" class=\"language-python\">\n\t\t\t\t\t<xmp># Start the replier\npython replier.py\n\n# In another terminal, run the requester\npython requester.py<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t<div class=\"elementor-element elementor-element-7a66462 e-flex e-con-boxed e-con e-parent\" data-id=\"7a66462\" data-element_type=\"container\">\n\t\t\t\t\t<div class=\"e-con-inner\">\n\t\t\t\t<div class=\"elementor-element elementor-element-390ecb7 elementor-widget elementor-widget-heading\" data-id=\"390ecb7\" data-element_type=\"widget\" data-widget_type=\"heading.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t<h2 class=\"elementor-heading-title elementor-size-default\">6. 
Queue Groups (Load Balancing)<\/h2>\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-65385c4 elementor-widget elementor-widget-text-editor\" data-id=\"65385c4\" data-element_type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<div>Multiple workers subscribe to the same subject within a queue group.<\/div><div>Messages are load-balanced among the workers; each message is delivered to only one worker in the group.<\/div>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-e543fb7 elementor-widget elementor-widget-image\" data-id=\"e543fb7\" data-element_type=\"widget\" data-widget_type=\"image.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t<img decoding=\"async\" width=\"385\" height=\"152\" src=\"https:\/\/realstudy.net\/wp-content\/uploads\/2024\/11\/nats_queue.jpg\" class=\"attachment-large size-large wp-image-393\" alt=\"\" srcset=\"https:\/\/realstudy.net\/wp-content\/uploads\/2024\/11\/nats_queue.jpg 385w, https:\/\/realstudy.net\/wp-content\/uploads\/2024\/11\/nats_queue-300x118.jpg 300w\" sizes=\"(max-width: 385px) 100vw, 385px\" \/>\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-8100a40 elementor-widget elementor-widget-text-editor\" data-id=\"8100a40\" data-element_type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<div><strong>Task Publisher<\/strong> (task_publisher.py)<\/div>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-1ef9e4f elementor-widget elementor-widget-code-highlight\" data-id=\"1ef9e4f\" data-element_type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div 
class=\"prismjs-tomorrow copy-to-clipboard word-wrap\">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-python line-numbers\">\n\t\t\t\t<code readonly=\"true\" class=\"language-python\">\n\t\t\t\t\t<xmp>import asyncio\nfrom nats.aio.client import Client as NATS\nfrom nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers\n\nasync def publish_messages(js, subject):\n    \"\"\"\n    Publishes a series of messages to the specified subject.\n    \"\"\"\n    try:\n        for i in range(1, 6):\n            message = f\"Message {i}\"\n            # Publish the message to the subject\n            await js.publish(subject, message.encode())\n            print(f\"Published: {message}\")\n            await asyncio.sleep(1)  # Wait for a second before sending the next message\n    except Exception as e:\n        print(f\"Error publishing message: {e}\")\n\nasync def main():\n    \"\"\"\n    Main coroutine that sets up the NATS client, connects to the server,\n    and publishes messages to a subject.\n    \"\"\"\n    nc = NATS()\n\n    try:\n        # Connect to the NATS server\n        await nc.connect(servers=[\"nats:\/\/localhost:4222\"])\n        print(\"Publisher connected to NATS server.\")\n\n        # Create JetStream context\n        js = nc.jetstream()\n\n        # Define the subject to publish to\n        subject = \"my.subject.test\"\n\n        # Publish messages\n        await publish_messages(js, subject)\n\n    except ErrNoServers as e:\n        print(f\"Could not connect to NATS server: {e}\")\n    except Exception as e:\n        print(f\"An error occurred: {e}\")\n    finally:\n        # Close the NATS connection\n        await nc.close()\n        print(\"Publisher disconnected from NATS server.\")\n\nif __name__ == '__main__':\n    asyncio.run(main())\n<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-9ffeba0 elementor-widget 
elementor-widget-text-editor\" data-id=\"9ffeba0\" data-element_type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<div><strong>Worker<\/strong> (worker.py)<\/div>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-6cc821d elementor-widget elementor-widget-code-highlight\" data-id=\"6cc821d\" data-element_type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-tomorrow copy-to-clipboard word-wrap\">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-python line-numbers\">\n\t\t\t\t<code readonly=\"true\" class=\"language-python\">\n\t\t\t\t\t<xmp>import asyncio\nfrom nats.aio.client import Client as NATS\nfrom nats.aio.errors import ErrConnectionClosed, ErrTimeout, ErrNoServers\nfrom functools import partial\n\nasync def message_handler(msg, name):\n    \"\"\"\n    Callback function to handle received messages.\n    \"\"\"\n    data = msg.data.decode()\n    print(f\"[{name}] Received a message on '{msg.subject}': {data}\")\n    await msg.ack()  # Acknowledge the message\n\nasync def subscribe_to_subject(js, subject, queue_group, name):\n    \"\"\"\n    Subscribes to the specified subject with a given queue group and handles incoming messages.\n    \"\"\"\n    handler = partial(message_handler, name=name)\n    try:\n        # Subscribe to the subject with the specified queue group\n        await js.subscribe(subject, queue=queue_group, cb=handler)\n        print(f\"Subscriber is listening on '{subject}' with queue group '{queue_group}'.\")\n    except Exception as e:\n        print(f\"Error subscribing to subject: {e}\")\n\nasync def main(name):\n    \"\"\"\n    Main coroutine that sets up the NATS client, connects to the server,\n    and subscribes to a subject to receive messages.\n    \"\"\"\n    nc = NATS()\n\n    try:\n        # Connect 
to the NATS server\n        await nc.connect(servers=[\"nats:\/\/localhost:4222\"])\n        print(\"Subscriber connected to NATS server.\")\n\n        # Create JetStream context\n        js = nc.jetstream()\n\n        # Define the subject to subscribe to and the queue group\n        subject = \"my.subject.*\"\n        queue_group = \"workers\"\n\n        # Subscribe to the subject\n        await subscribe_to_subject(js, subject, queue_group, name)\n\n        # Keep the subscriber running to listen for incoming messages\n        print(f\"Subscriber '{name}' is running. Press Ctrl+C to exit.\")\n        while True:\n            await asyncio.sleep(1)\n\n    except ErrNoServers as e:\n        print(f\"Could not connect to NATS server: {e}\")\n    except asyncio.CancelledError:\n        pass  # Handle task cancellation gracefully\n    except Exception as e:\n        print(f\"An error occurred: {e}\")\n    finally:\n        # Close the NATS connection\n        await nc.close()\n        print(\"Subscriber disconnected from NATS server.\")\n\nasync def run_consumers():\n    # Run two consumers concurrently; both join the \"workers\" queue group set in main()\n    await asyncio.gather(main(\"Consumer1\"), main(\"Consumer2\"))\n\nif __name__ == '__main__':\n    try:\n        asyncio.run(run_consumers())\n    except KeyboardInterrupt:\n        print(\"Consumers are shutting down.\")\n<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div class=\"elementor-element elementor-element-fa30fa1 elementor-widget elementor-widget-text-editor\" data-id=\"fa30fa1\" data-element_type=\"widget\" data-widget_type=\"text-editor.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t\t\t<div><strong>How to run<\/strong><\/div>\t\t\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<div 
class=\"elementor-element elementor-element-f9108ad elementor-widget elementor-widget-code-highlight\" data-id=\"f9108ad\" data-element_type=\"widget\" data-widget_type=\"code-highlight.default\">\n\t\t\t\t<div class=\"elementor-widget-container\">\n\t\t\t\t\t\t\t<div class=\"prismjs-tomorrow copy-to-clipboard word-wrap\">\n\t\t\t<pre data-line=\"\" class=\"highlight-height language-python line-numbers\">\n\t\t\t\t<code readonly=\"true\" class=\"language-python\">\n\t\t\t\t\t<xmp># Start multiple workers\npython worker.py\n\n# In another terminal, Run the Task Publisher\npython task_publisher.py<\/xmp>\n\t\t\t\t<\/code>\n\t\t\t<\/pre>\n\t\t<\/div>\n\t\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t\t\t<\/div>\n\t\t","protected":false},"excerpt":{"rendered":"<p>NATS JetStream is an extension that provides advanced message streaming and data storage capabilities, expanding upon NATS&#8217;s core messaging functionality&hellip;<\/p>\n","protected":false},"author":1,"featured_media":434,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"om_disable_all_campaigns":false,"_monsterinsights_skip_tracking":false,"_monsterinsights_sitenote_active":false,"_monsterinsights_sitenote_note":"","_monsterinsights_sitenote_category":0,"footnotes":""},"categories":[21],"tags":[],"class_list":["post-504","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-computer_en"],"aioseo_notices":[],"jetpack_featured_media_url":"https:\/\/realstudy.net\/wp-content\/uploads\/2024\/11\/nats_jetstream.webp","_links":{"self":[{"href":"https:\/\/realstudy.net\/index.php?rest_route=\/wp\/v2\/posts\/504","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/realstudy.net\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/realstudy.net\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/realstudy.net\/index.php?re
st_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/realstudy.net\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=504"}],"version-history":[{"count":19,"href":"https:\/\/realstudy.net\/index.php?rest_route=\/wp\/v2\/posts\/504\/revisions"}],"predecessor-version":[{"id":529,"href":"https:\/\/realstudy.net\/index.php?rest_route=\/wp\/v2\/posts\/504\/revisions\/529"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/realstudy.net\/index.php?rest_route=\/wp\/v2\/media\/434"}],"wp:attachment":[{"href":"https:\/\/realstudy.net\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=504"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/realstudy.net\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=504"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/realstudy.net\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=504"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}