NATS JetStream 介绍及实际应用

前言

在现代微服务架构中,消息队列扮演着至关重要的角色,它负责服务间的解耦、异步通信和流量削峰。NATS 是一个追求极致性能、简洁和可扩展性的消息系统。然而,核心 NATS (Core NATS) 是一个纯粹的内存消息总线,不提供消息持久化,这意味着如果服务下线或重启,消息就会丢失。

为了解决这个问题,NATS 团队推出了 JetStream——一个内建于 NATS Server 的持久化引擎。JetStream 不仅弥补了 Core NATS 在持久化上的短板,还提供了“至少一次”的消息交付语义、灵活的消息消费模型等强大功能,使其成为构建可靠、高可用的分布式系统的理想选择。

我们探讨 NATS JetStream 的核心原理,并通过全新的 Go 代码示例,带你一步步上手 JetStream 的实际使用。

JetStream 核心概念

JetStream 的架构主要围绕两个核心概念构建:StreamConsumer

![JetStream Architecture] (图片来源: NATS 官方文档)

1. Stream (流)

Stream 是 JetStream 的基本存储单元。你可以把它想象成一个特定主题(Subject)下消息的持久化日志。当一条消息发布到 NATS,如果其 Subject 匹配了某个 Stream 的配置,这条消息就会被捕获并存储在 Stream 中。

关键配置:

  • Subjects: 定义此 Stream 关心哪些主题的消息。支持通配符,例如 ORDERS.* 可以捕获 ORDERS.new, ORDERS.shipped 等所有以 ORDERS. 开头的消息。
  • Storage: 存储方式。可以是 File (磁盘) 或 Memory (内存)。生产环境通常使用 File 存储以保证持久性。
  • Retention Policy: 消息保留策略。
    • Limits: 当消息数量、总大小或消息存活时间超过限制时,删除旧消息。
    • Interest: 只有当所有消费者都确认消费了某条消息后,才删除它。
    • WorkQueue: 只要有一个消费者确认消费了某条消息,就可以删除它。
  • Replicas: 副本数。在集群模式下,可以设置大于 1 的副本数,以实现数据的高可用。

2. Consumer (消费者)

Consumer 是从 Stream 中读取消息的“视图”。它以一种可追踪、可扩展的方式来消费 Stream 中的消息。JetStream 的强大之处在于其灵活的消费者模型。

关键特性:

  • 持久化状态 (Durable Name): Consumer 可以是持久化的。这意味着即使消费者应用下线再上线,它也能从上次消费的位置继续处理,不会丢失或重复消费消息。
  • 消息确认 (Ack Policy): JetStream 支持显式的消息确认。消费者在处理完一条消息后,需要发送一个 Ack 回执给服务器。如果在指定时间内(AckWait)服务器没有收到 Ack,它会认为消息处理失败,并会重新投递该消息,从而保证“至少一次”的交付。
  • 消费模型:
    • Push (推送) 模式: NATS 服务器主动将消息推送给消费者。这种模式延迟低,适用于需要实时处理消息的场景。
    • Pull (拉取) 模式: 消费者根据自己的处理能力,主动向服务器请求(Fetch)一批消息。这种模式非常适合工作队列(Work Queue)场景,可以有效防止慢消费者被大量消息压垮。

构建一个订单处理系统

下面,我们将通过 Go 语言构建一个简单的订单处理系统,来演示 JetStream 的使用。

步骤 1: 启动 NATS Server (JetStream 模式)

首先,你需要一个运行中的 NATS Server,并启用 JetStream。使用 Docker 是最便捷的方式:

docker run --network host -p 4222:4222 nats -js

-js 参数告诉 NATS Server 启用 JetStream 功能。

步骤 2: 编写 Go 代码

我们将编写一个完整的 Go 程序,包含创建 Stream、发布消息、以及分别使用 Pull 和 Push 消费者来处理消息。

首先,确保你的 Go 环境已经准备好,并获取 NATS Go 客户端库:

go get github.com/nats-io/nats.go

以下是完整的示例代码 main.go

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	// 连接到 NATS 服务器
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatalf("无法连接到 NATS: %v", err)
	}
	defer nc.Close()

	// 1. 获取 JetStream 上下文
	js, err := nc.JetStream()
	if err != nil {
		log.Fatalf("无法获取 JetStream 上下文: %v", err)
	}

	// 2. 创建或更新 Stream
	streamName := "ORDERS"
	err = createOrUpdateStream(js, streamName)
	if err != nil {
		log.Fatalf("创建 Stream 失败: %v", err)
	}

	// 3. 发布一些消息
	publishOrders(js, streamName, 5)

	// 4. 创建一个 Pull Consumer 来处理新订单
	go processOrdersWithPullConsumer(js, streamName)

	// 5. 创建一个 Push Consumer 来监控所有订单
	go monitorOrdersWithPushConsumer(js, streamName)

	// 阻塞主 goroutine,以便消费者可以持续运行
	select {}
}

// createOrUpdateStream 负责创建或更新一个 Stream
func createOrUpdateStream(js nats.JetStreamContext, streamName string) error {
	stream, err := js.StreamInfo(streamName)
	// 如果 Stream 不存在,则创建它
	if err != nil {
		fmt.Printf("Stream %s 不存在,正在创建...\n", streamName)
		_, err = js.AddStream(&nats.StreamConfig{
			Name:     streamName,
			Subjects: []string{fmt.Sprintf("%s.*", streamName)}, // 监听 "ORDERS.*"
			Storage:  nats.FileStorage,
		})
		if err != nil {
			return err
		}
	} else {
		fmt.Printf("发现已存在的 Stream: %s\n", stream.Config.Name)
	}
	return nil
}

// publishOrders 发布模拟的订单消息
func publishOrders(js nats.JetStreamContext, streamName string, count int) {
	fmt.Println("\n--- 开始发布订单 ---")
	for i := 1; i <= count; i++ {
		subject := fmt.Sprintf("%s.new", streamName)
		payload := []byte(fmt.Sprintf("订单 #%d 的数据", i))
		_, err := js.Publish(subject, payload)
		if err != nil {
			log.Printf("发布订单 #%d 失败: %v", i, err)
		} else {
			fmt.Printf("已发布消息到主题 [%s]\n", subject)
		}
		time.Sleep(100 * time.Millisecond)
	}
	fmt.Println("--- 订单发布完成 ---")
}

// processOrdersWithPullConsumer 使用 Pull Consumer 来获取和处理订单
func processOrdersWithPullConsumer(js nats.JetStreamContext, streamName string) {
	fmt.Println("\n--- [Worker] Pull Consumer 正在启动 ---")
	// 创建一个持久化的 Pull Consumer,名为 "worker"
	sub, err := js.PullSubscribe(
		fmt.Sprintf("%s.new", streamName), // 只订阅新订单
		"worker",                          // 持久化名称
	)
	if err != nil {
		log.Printf("[Worker] 创建 Pull Consumer 失败: %v", err)
		return
	}

	for {
		// 拉取最多 2 条消息,等待时间最长为 1 秒
		msgs, err := sub.Fetch(2, nats.MaxWait(1*time.Second))
		if err != nil {
			// 如果没有消息,这是一个超时错误,是正常的
			if err == nats.ErrTimeout {
				continue
			}
			log.Printf("[Worker] 拉取消息失败: %v", err)
			continue
		}

		for _, msg := range msgs {
			fmt.Printf("[Worker] 收到新订单: %s\n", string(msg.Data))
			// 模拟处理工作
			time.Sleep(500 * time.Millisecond)
			// 确认消息,通知 JetStream 此消息已成功处理
			msg.Ack()
			fmt.Printf("[Worker] 已确认订单: %s\n", string(msg.Data))
		}
	}
}

// monitorOrdersWithPushConsumer 使用 Push Consumer 来监控所有订单事件
func monitorOrdersWithPushConsumer(js nats.JetStreamContext, streamName string) {
	fmt.Println("\n--- [Monitor] Push Consumer 正在启动 ---")
	// 创建一个临时的 Push Consumer
	// DeliverAll() 确保我们从头开始接收所有消息
	_, err := js.Subscribe(
		fmt.Sprintf("%s.*", streamName), // 订阅所有订单相关的消息
		func(msg *nats.Msg) {
			fmt.Printf("[Monitor] 监控到事件 [%s]: %s\n", msg.Subject, string(msg.Data))
			// Push consumer 可以配置自动或手动 Ack,这里我们只是监控,不进行 Ack
		},
		nats.Durable("monitor"),   // 给它一个持久化名称
		nats.DeliverAll(),         // 从 Stream 的第一条消息开始消费
	)
	if err != nil {
		log.Printf("[Monitor] 创建 Push Consumer 失败: %v", err)
	}
}

代码解析

  1. createOrUpdateStream: 这个函数首先检查名为 ORDERS 的 Stream 是否存在。如果不存在,它会创建一个新的 Stream,该 Stream 会监听 ORDERS.* 主题下的所有消息,并将它们存储在磁盘上。
  2. publishOrders: 这个函数模拟发布了 5 个新订单。消息被发布到 ORDERS.new 主题,因此它们会被我们创建的 ORDERS Stream 捕获。
  3. processOrdersWithPullConsumer:
    • 这里我们创建了一个 Pull Consumerjs.PullSubscribe 的第二个参数 "worker" 是持久化名称(Durable Name),这使得我们的消费者是有状态的。
    • for 循环中,我们使用 sub.Fetch(2, ...) 主动从服务器拉取最多 2 条消息。这给了消费者完全的控制权,可以根据自己的负载来决定何时拉取消息。
    • 处理完消息后,必须调用 msg.Ack() 来通知服务器。否则,服务器会在 AckWait 超时后重新投递该消息。
  4. monitorOrdersWithPushConsumer:
    • 这里我们创建了一个 Push Consumer。它订阅了 ORDERS.*,可以收到所有订单相关的事件(例如,未来我们可能还会发布 ORDERS.shippedORDERS.cancelled)。
    • nats.DeliverAll() 选项告诉 JetStream,这个消费者希望从 Stream 的第一条可用消息开始接收,而不是只接收订阅之后产生的新消息。这对于需要完整历史记录的监控或审计场景非常有用。
    • 消息由 NATS 服务器主动推送,并通过回调函数 func(msg *nats.Msg) 进行处理。

运行结果

当你运行上面的 Go 程序,你将看到类似下面的输出:

Stream ORDERS 不存在,正在创建...

--- 开始发布订单 ---
已发布消息到主题 [ORDERS.new]
已发布消息到主题 [ORDERS.new]
已发布消息到主题 [ORDERS.new]
已发布消息到主题 [ORDERS.new]
已发布消息到主题 [ORDERS.new]
--- 订单发布完成 ---

--- [Worker] Pull Consumer 正在启动 ---

--- [Monitor] Push Consumer 正在启动 ---
[Monitor] 监控到事件 [ORDERS.new]: 订单 #1 的数据
[Monitor] 监控到事件 [ORDERS.new]: 订单 #2 的数据
[Monitor] 监控到事件 [ORDERS.new]: 订单 #3 的数据
[Monitor] 监控到事件 [ORDERS.new]: 订单 #4 的数据
[Monitor] 监控到事件 [ORDERS.new]: 订单 #5 的数据
[Worker] 收到新订单: 订单 #1 的数据
[Worker] 收到新订单: 订单 #2 的数据
[Worker] 已确认订单: 订单 #1 的数据
[Worker] 已确认订单: 订单 #2 的数据
[Worker] 收到新订单: 订单 #3 的数据
[Worker] 收到新订单: 订单 #4 的数据
[Worker] 已确认订单: 订单 #3 的数据
[Worker] 已确认订单: 订单 #4 的数据
[Worker] 收到新订单: 订单 #5 的数据
[Worker] 已确认订单: 订单 #5 的数据

从输出中可以清晰地看到:

  • Monitor (Push Consumer) 几乎立刻收到了所有 5 条消息,因为它被动接收推送。
  • Worker (Pull Consumer) 按照自己的节奏(每次最多拉取 2 条,处理一条需要 500ms)分批处理并确认了所有 5 条订单消息。

构建高性能的实时协作后端

前面的示例展示了 JetStream 的基本功能。现在,让我们来看一个更贴近生产环境的复杂案例:一个基于 WebSocket 和 JetStream 的高性能实时协作后端。

这个后端服务器允许多个用户通过 WebSocket 连接到同一个“房间”(由 docId 区分),他们发送的消息会被广播给房间内的所有其他用户。即使服务器重启,聊天记录也不会丢失,并且新加入的用户可以看到最近的历史消息。

这个案例的精妙之处在于它并非简单地将 WebSocket 和 JetStream 连接起来,而是通过内存缓存异步工作池等模式,在保证数据持久性的同时,实现了极高的性能和响应速度。

架构设计与核心思想

  1. WebSocket 层: 负责与客户端进行双向通信。每个客户端连接后,会根据 URL 中的 docId 参数被分配到一个特定的房间。
  2. 内存房间 (Room): 每个 docId 对应一个 Room 结构体,它在内存中维护两样东西:
    • 当前所有连接到这个房间的 WebSocket 客户端列表。
    • 一个固定大小的历史消息队列(例如,最近 100 条)。
  3. 发布工作池 (jsPubCh): 从 WebSocket 读到的消息不会直接发布到 JetStream。而是被放入一个 Go channel (jsPubCh) 中。一个或多个后台 goroutine(工作者)会从这个 channel 中取出消息,再将其发布到 JetStream。
    • 核心优势: 这种异步处理方式将 WebSocket 的读操作与 JetStream 的网络写操作完全解耦,避免了 JetStream 网络延迟或阻塞影响到 WebSocket 的心跳和响应,极大地提升了系统的稳定性和吞吐量。
  4. JetStream 层:
    • Stream: 一个名为 COLLAB 的 Stream 会捕获所有房间(例如 collab.chat.*)的消息,并将其持久化到磁盘。
    • Consumer: 整个应用只有一个持久化的 Push Consumer。它订阅通配符主题 collab.chat.*,接收所有房间的消息。
  5. 消息闭环:
    • 当这个全局唯一的 Consumer 收到来自 JetStream 的消息后,它会解析出消息所属的 docId
    • 然后,找到内存中对应的 Room,将消息广播给该房间内的所有 WebSocket 客户端。
    • 同时,这条消息也会被存入该 Room 的内存历史队列中。
  6. 新用户加入: 当一个新客户端连接时,服务器会立即从对应 Room 的内存历史队列中取出最近的消息,直接发送给这个新客户端,使其能快速同步房间状态。这个过程完全不涉及 JetStream,速度极快。

完整代码示例 (main.go)

以下是这个协作后端的完整 Go 代码实现。

package main

import (
	"fmt"
	"log"
	"net/http"
	"runtime"
	"strings"
	"sync"
	"time"

	"github.com/gorilla/websocket"
	"github.com/nats-io/nats.go"
)

var (
	upgrader      = websocket.Upgrader{}
	nc            *nats.Conn
	js            nats.JetStreamContext
	streamName    = "COLLAB"
	subjectPrefix = "collab.chat"
)

// WebSocket 超时配置
const (
	writeWait = 10 * time.Second
	pongWait  = 60 * time.Second
)

// 最近消息历史(用于新连接回放,避免每连接创建 JetStream 消费者)
const historySize = 100

// 房间结构:每个文档一个房间
type Room struct {
	clients map[*websocket.Conn]bool
	mu      sync.Mutex
	history [][]byte
}

var (
	rooms   = make(map[string]*Room)
	roomsMu sync.RWMutex
)

// JetStream 发布工作池(解耦 WS 读与 JS 发布,避免同步发布阻塞读循环)
type publishItem struct {
	subject string
	payload []byte
}

var jsPubCh chan publishItem

func getOrCreateRoom(docID string) *Room {
	roomsMu.RLock()
	room, ok := rooms[docID]
	roomsMu.RUnlock()
	if ok {
		return room
	}
	roomsMu.Lock()
	defer roomsMu.Unlock()
	if room, ok = rooms[docID]; ok {
		return room
	}
	room = &Room{clients: make(map[*websocket.Conn]bool)}
	rooms[docID] = room
	return room
}

func appendHistoryToRoom(room *Room, message []byte) {
	copied := append([]byte(nil), message...)
	room.mu.Lock()
	if len(room.history) >= historySize {
		room.history = room.history[1:]
	}
	room.history = append(room.history, copied)
	room.mu.Unlock()
}

// 广播到房间内的所有 WebSocket 客户端
func broadcastToRoom(room *Room, message []byte) {
	room.mu.Lock()
	defer room.mu.Unlock()
	for client := range room.clients {
		_ = client.SetWriteDeadline(time.Now().Add(writeWait))
		if err := client.WriteMessage(websocket.TextMessage, message); err != nil {
			log.Println("写入失败:", err)
			client.Close()
			delete(room.clients, client)
		}
	}
}

func wsHandler(w http.ResponseWriter, r *http.Request) {
	// 通过 docId 划分房间
	docID := r.URL.Query().Get("docId")
	if docID == "" {
		http.Error(w, "missing docId", http.StatusBadRequest)
		return
	}
	upgrader.CheckOrigin = func(r *http.Request) bool { return true }
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Println("WS 升级失败:", err)
		return
	}

	room := getOrCreateRoom(docID)
	room.mu.Lock()
	room.clients[conn] = true
	room.mu.Unlock()

	// 回放当前房间历史
	room.mu.Lock()
	for _, m := range room.history {
		_ = conn.SetWriteDeadline(time.Now().Add(writeWait))
		if err := conn.WriteMessage(websocket.TextMessage, m); err != nil {
			log.Println("历史消息写入失败:", err)
			delete(room.clients, conn)
			room.mu.Unlock()
			conn.Close()
			return
		}
	}
	room.mu.Unlock()

	// 读控制:限制消息大小、心跳保持与断连检测
	conn.SetReadLimit(1 << 20) // 1MB
	_ = conn.SetReadDeadline(time.Now().Add(pongWait))
	conn.SetPongHandler(func(string) error {
		return conn.SetReadDeadline(time.Now().Add(pongWait))
	})

	// 心跳 Ping(写通道侧,避免长时间空闲连接悬挂)
	go func(c *websocket.Conn) {
		ticker := time.NewTicker(pongWait / 2)
		defer ticker.Stop()
		for range ticker.C {
			_ = c.SetWriteDeadline(time.Now().Add(writeWait))
			if err := c.WriteControl(websocket.PingMessage, nil, time.Now().Add(writeWait)); err != nil {
				return
			}
		}
	}(conn)

	for {
		_, msg, err := conn.ReadMessage()
		if err != nil {
			log.Println("客户端断开:", err)
			room.mu.Lock()
			delete(room.clients, conn)
			room.mu.Unlock()
			return
		}

		// 投递到发布队列(非阻塞,避免背压拖垮读循环),按文档路由
		select {
		case jsPubCh <- publishItem{subject: subjectPrefix + "." + docID, payload: msg}:
		default:
			log.Println("JS 发布队列已满,丢弃消息")
		}
	}
}

func main() {
	var err error
	// 连接 NATS
	nc, err = nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Drain()

	// 获取 JetStream 上下文(可配置异步发布上限)
	js, err = nc.JetStream(nats.PublishAsyncMaxPending(1024))
	if err != nil {
		log.Fatal(err)
	}

	// 创建 Stream(若不存在),通配所有文档 subject
	_, err = js.AddStream(&nats.StreamConfig{
		Name:     streamName,
		Subjects: []string{subjectPrefix + ".*"},
		Storage:  nats.FileStorage, // 持久化
	})
	if err != nil && err != nats.ErrStreamNameAlreadyInUse {
		log.Fatal(err)
	}

	// 订阅所有文档消息,按 subject 路由到对应房间
	_, err = js.Subscribe(subjectPrefix+".*", func(m *nats.Msg) {
		docID := strings.TrimPrefix(m.Subject, subjectPrefix+".")
		if docID == "" || strings.Contains(docID, ".") {
			m.Ack()
			return
		}
		room := getOrCreateRoom(docID)
		appendHistoryToRoom(room, m.Data)
		broadcastToRoom(room, m.Data)
		m.Ack()
	}, nats.Durable("push"), nats.ManualAck())
	if err != nil {
		log.Fatal(err)
	}

	// 启动 JetStream 发布工作池
	jsPubCh = make(chan publishItem, 1000)
	workerCount := runtime.NumCPU()
	if workerCount < 2 {
		workerCount = 2
	}
	for i := 0; i < workerCount; i++ {
		go func() {
			for item := range jsPubCh {
				if _, err := js.Publish(item.subject, item.payload); err != nil {
					log.Println("推送到 JetStream 失败:", err)
				}
			}
		}()
	}

	// 启动 WebSocket 服务
	http.HandleFunc("/ws", wsHandler)
	fmt.Println("WebSocket 服务启动: ws://localhost:8080/ws")
	log.Fatal(http.ListenAndServe(":8080", nil))

	select {}
}

运行和测试

  1. 启动 NATS Server:

    docker run --network host -p 4222:4222 nats -js
    
  2. 运行 Go 服务:

    go run main.go
    

    你将看到输出 WebSocket 服务启动: ws://localhost:8080/ws

  3. 使用 WebSocket 客户端连接: 你可以使用任何 WebSocket 测试工具(如 Postman、Simple WebSocket Client 插件等),或者创建一个简单的前端页面来连接服务。

    • 连接地址 1 (房间 A): ws://localhost:8080/ws?docId=roomA
    • 连接地址 2 (房间 B): ws://localhost:8080/ws?docId=roomB

    在连接 A 中发送消息,只有连接 A 的其他客户端能收到。在连接 B 中发送消息,只有连接 B 的客户端能收到。如果你断开一个客户端再重新连接,会立即收到该房间的最近历史消息。

总结

NATS JetStream 为高性能的 NATS 核心带来了强大的持久化和可靠性保证。通过其灵活的 Stream 和 Consumer 模型,特别是 Pull 和 Push 两种消费模式,开发者可以轻松构建出适应不同业务场景的、健壮的分布式系统。

与传统的消息队列(如 Kafka, RabbitMQ)相比,JetStream 继承了 NATS 的简洁和易于运维的特点,同时提供了企业级的消息传递功能。


更多资源:

打 赏