在现代微服务架构中,消息队列扮演着至关重要的角色,它负责服务间的解耦、异步通信和流量削峰。NATS 是一个追求极致性能、简洁和可扩展性的消息系统。然而,核心 NATS (Core NATS) 是一个纯粹的内存消息总线,不提供消息持久化,这意味着如果服务下线或重启,消息就会丢失。
为了解决这个问题,NATS 团队推出了 JetStream——一个内建于 NATS Server 的持久化引擎。JetStream 不仅弥补了 Core NATS 在持久化上的短板,还提供了“至少一次”的消息交付语义、灵活的消息消费模型等强大功能,使其成为构建可靠、高可用的分布式系统的理想选择。
我们探讨 NATS JetStream 的核心原理,并通过全新的 Go 代码示例,带你一步步上手 JetStream 的实际使用。
JetStream 的架构主要围绕两个核心概念构建:Stream 和 Consumer。
![JetStream Architecture]
(图片来源: NATS 官方文档)
Stream 是 JetStream 的基本存储单元。你可以把它想象成一个特定主题(Subject)下消息的持久化日志。当一条消息发布到 NATS,如果其 Subject 匹配了某个 Stream 的配置,这条消息就会被捕获并存储在 Stream 中。
关键配置:
ORDERS.*
可以捕获 ORDERS.new
, ORDERS.shipped
等所有以 ORDERS.
开头的消息。File
(磁盘) 或 Memory
(内存)。生产环境通常使用 File
存储以保证持久性。Limits
: 当消息数量、总大小或消息存活时间超过限制时,删除旧消息。Interest
: 只有当所有消费者都确认消费了某条消息后,才删除它。WorkQueue
: 只要有一个消费者确认消费了某条消息,就可以删除它。Consumer 是从 Stream 中读取消息的“视图”。它以一种可追踪、可扩展的方式来消费 Stream 中的消息。JetStream 的强大之处在于其灵活的消费者模型。
关键特性:
Ack
回执给服务器。如果在指定时间内(AckWait
)服务器没有收到 Ack
,它会认为消息处理失败,并会重新投递该消息,从而保证“至少一次”的交付。Fetch
)一批消息。这种模式非常适合工作队列(Work Queue)场景,可以有效防止慢消费者被大量消息压垮。下面,我们将通过 Go 语言构建一个简单的订单处理系统,来演示 JetStream 的使用。
首先,你需要一个运行中的 NATS Server,并启用 JetStream。使用 Docker 是最便捷的方式:
docker run --network host -p 4222:4222 nats -js
-js
参数告诉 NATS Server 启用 JetStream 功能。
我们将编写一个完整的 Go 程序,包含创建 Stream、发布消息、以及分别使用 Pull 和 Push 消费者来处理消息。
首先,确保你的 Go 环境已经准备好,并获取 NATS Go 客户端库:
go get github.com/nats-io/nats.go
以下是完整的示例代码 main.go
。
package main
import (
"fmt"
"log"
"time"
"github.com/nats-io/nats.go"
)
func main() {
// 连接到 NATS 服务器
nc, err := nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatalf("无法连接到 NATS: %v", err)
}
defer nc.Close()
// 1. 获取 JetStream 上下文
js, err := nc.JetStream()
if err != nil {
log.Fatalf("无法获取 JetStream 上下文: %v", err)
}
// 2. 创建或更新 Stream
streamName := "ORDERS"
err = createOrUpdateStream(js, streamName)
if err != nil {
log.Fatalf("创建 Stream 失败: %v", err)
}
// 3. 发布一些消息
publishOrders(js, streamName, 5)
// 4. 创建一个 Pull Consumer 来处理新订单
go processOrdersWithPullConsumer(js, streamName)
// 5. 创建一个 Push Consumer 来监控所有订单
go monitorOrdersWithPushConsumer(js, streamName)
// 阻塞主 goroutine,以便消费者可以持续运行
select {}
}
// createOrUpdateStream 负责创建或更新一个 Stream
func createOrUpdateStream(js nats.JetStreamContext, streamName string) error {
stream, err := js.StreamInfo(streamName)
// 如果 Stream 不存在,则创建它
if err != nil {
fmt.Printf("Stream %s 不存在,正在创建...\n", streamName)
_, err = js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{fmt.Sprintf("%s.*", streamName)}, // 监听 "ORDERS.*"
Storage: nats.FileStorage,
})
if err != nil {
return err
}
} else {
fmt.Printf("发现已存在的 Stream: %s\n", stream.Config.Name)
}
return nil
}
// publishOrders 发布模拟的订单消息
func publishOrders(js nats.JetStreamContext, streamName string, count int) {
fmt.Println("\n--- 开始发布订单 ---")
for i := 1; i <= count; i++ {
subject := fmt.Sprintf("%s.new", streamName)
payload := []byte(fmt.Sprintf("订单 #%d 的数据", i))
_, err := js.Publish(subject, payload)
if err != nil {
log.Printf("发布订单 #%d 失败: %v", i, err)
} else {
fmt.Printf("已发布消息到主题 [%s]\n", subject)
}
time.Sleep(100 * time.Millisecond)
}
fmt.Println("--- 订单发布完成 ---")
}
// processOrdersWithPullConsumer 使用 Pull Consumer 来获取和处理订单
func processOrdersWithPullConsumer(js nats.JetStreamContext, streamName string) {
fmt.Println("\n--- [Worker] Pull Consumer 正在启动 ---")
// 创建一个持久化的 Pull Consumer,名为 "worker"
sub, err := js.PullSubscribe(
fmt.Sprintf("%s.new", streamName), // 只订阅新订单
"worker", // 持久化名称
)
if err != nil {
log.Printf("[Worker] 创建 Pull Consumer 失败: %v", err)
return
}
for {
// 拉取最多 2 条消息,等待时间最长为 1 秒
msgs, err := sub.Fetch(2, nats.MaxWait(1*time.Second))
if err != nil {
// 如果没有消息,这是一个超时错误,是正常的
if err == nats.ErrTimeout {
continue
}
log.Printf("[Worker] 拉取消息失败: %v", err)
continue
}
for _, msg := range msgs {
fmt.Printf("[Worker] 收到新订单: %s\n", string(msg.Data))
// 模拟处理工作
time.Sleep(500 * time.Millisecond)
// 确认消息,通知 JetStream 此消息已成功处理
msg.Ack()
fmt.Printf("[Worker] 已确认订单: %s\n", string(msg.Data))
}
}
}
// monitorOrdersWithPushConsumer 使用 Push Consumer 来监控所有订单事件
func monitorOrdersWithPushConsumer(js nats.JetStreamContext, streamName string) {
fmt.Println("\n--- [Monitor] Push Consumer 正在启动 ---")
// 创建一个临时的 Push Consumer
// DeliverAll() 确保我们从头开始接收所有消息
_, err := js.Subscribe(
fmt.Sprintf("%s.*", streamName), // 订阅所有订单相关的消息
func(msg *nats.Msg) {
fmt.Printf("[Monitor] 监控到事件 [%s]: %s\n", msg.Subject, string(msg.Data))
// Push consumer 可以配置自动或手动 Ack,这里我们只是监控,不进行 Ack
},
nats.Durable("monitor"), // 给它一个持久化名称
nats.DeliverAll(), // 从 Stream 的第一条消息开始消费
)
if err != nil {
log.Printf("[Monitor] 创建 Push Consumer 失败: %v", err)
}
}
createOrUpdateStream
: 这个函数首先检查名为 ORDERS
的 Stream 是否存在。如果不存在,它会创建一个新的 Stream,该 Stream 会监听 ORDERS.*
主题下的所有消息,并将它们存储在磁盘上。publishOrders
: 这个函数模拟发布了 5 个新订单。消息被发布到 ORDERS.new
主题,因此它们会被我们创建的 ORDERS
Stream 捕获。processOrdersWithPullConsumer
:
js.PullSubscribe
的第二个参数 "worker"
是持久化名称(Durable Name),这使得我们的消费者是有状态的。for
循环中,我们使用 sub.Fetch(2, ...)
主动从服务器拉取最多 2 条消息。这给了消费者完全的控制权,可以根据自己的负载来决定何时拉取消息。msg.Ack()
来通知服务器。否则,服务器会在 AckWait
超时后重新投递该消息。monitorOrdersWithPushConsumer
:
ORDERS.*
,可以收到所有订单相关的事件(例如,未来我们可能还会发布 ORDERS.shipped
或 ORDERS.cancelled
)。nats.DeliverAll()
选项告诉 JetStream,这个消费者希望从 Stream 的第一条可用消息开始接收,而不是只接收订阅之后产生的新消息。这对于需要完整历史记录的监控或审计场景非常有用。func(msg *nats.Msg)
进行处理。当你运行上面的 Go 程序,你将看到类似下面的输出:
Stream ORDERS 不存在,正在创建...
--- 开始发布订单 ---
已发布消息到主题 [ORDERS.new]
已发布消息到主题 [ORDERS.new]
已发布消息到主题 [ORDERS.new]
已发布消息到主题 [ORDERS.new]
已发布消息到主题 [ORDERS.new]
--- 订单发布完成 ---
--- [Worker] Pull Consumer 正在启动 ---
--- [Monitor] Push Consumer 正在启动 ---
[Monitor] 监控到事件 [ORDERS.new]: 订单 #1 的数据
[Monitor] 监控到事件 [ORDERS.new]: 订单 #2 的数据
[Monitor] 监控到事件 [ORDERS.new]: 订单 #3 的数据
[Monitor] 监控到事件 [ORDERS.new]: 订单 #4 的数据
[Monitor] 监控到事件 [ORDERS.new]: 订单 #5 的数据
[Worker] 收到新订单: 订单 #1 的数据
[Worker] 收到新订单: 订单 #2 的数据
[Worker] 已确认订单: 订单 #1 的数据
[Worker] 已确认订单: 订单 #2 的数据
[Worker] 收到新订单: 订单 #3 的数据
[Worker] 收到新订单: 订单 #4 的数据
[Worker] 已确认订单: 订单 #3 的数据
[Worker] 已确认订单: 订单 #4 的数据
[Worker] 收到新订单: 订单 #5 的数据
[Worker] 已确认订单: 订单 #5 的数据
从输出中可以清晰地看到:
前面的示例展示了 JetStream 的基本功能。现在,让我们来看一个更贴近生产环境的复杂案例:一个基于 WebSocket 和 JetStream 的高性能实时协作后端。
这个后端服务器允许多个用户通过 WebSocket 连接到同一个“房间”(由 docId
区分),他们发送的消息会被广播给房间内的所有其他用户。即使服务器重启,聊天记录也不会丢失,并且新加入的用户可以看到最近的历史消息。
这个案例的精妙之处在于它并非简单地将 WebSocket 和 JetStream 连接起来,而是通过内存缓存和异步工作池等模式,在保证数据持久性的同时,实现了极高的性能和响应速度。
docId
参数被分配到一个特定的房间。Room
): 每个 docId
对应一个 Room
结构体,它在内存中维护两样东西:
jsPubCh
): 从 WebSocket 读到的消息不会直接发布到 JetStream。而是被放入一个 Go channel (jsPubCh
) 中。一个或多个后台 goroutine(工作者)会从这个 channel 中取出消息,再将其发布到 JetStream。
COLLAB
的 Stream 会捕获所有房间(例如 collab.chat.*
)的消息,并将其持久化到磁盘。collab.chat.*
,接收所有房间的消息。docId
。Room
,将消息广播给该房间内的所有 WebSocket 客户端。Room
的内存历史队列中。Room
的内存历史队列中取出最近的消息,直接发送给这个新客户端,使其能快速同步房间状态。这个过程完全不涉及 JetStream,速度极快。main.go
)以下是这个协作后端的完整 Go 代码实现。
package main
import (
"fmt"
"log"
"net/http"
"runtime"
"strings"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/nats-io/nats.go"
)
var (
upgrader = websocket.Upgrader{}
nc *nats.Conn
js nats.JetStreamContext
streamName = "COLLAB"
subjectPrefix = "collab.chat"
)
// WebSocket 超时配置
const (
writeWait = 10 * time.Second
pongWait = 60 * time.Second
)
// 最近消息历史(用于新连接回放,避免每连接创建 JetStream 消费者)
const historySize = 100
// 房间结构:每个文档一个房间
type Room struct {
clients map[*websocket.Conn]bool
mu sync.Mutex
history [][]byte
}
var (
rooms = make(map[string]*Room)
roomsMu sync.RWMutex
)
// JetStream 发布工作池(解耦 WS 读与 JS 发布,避免同步发布阻塞读循环)
type publishItem struct {
subject string
payload []byte
}
var jsPubCh chan publishItem
func getOrCreateRoom(docID string) *Room {
roomsMu.RLock()
room, ok := rooms[docID]
roomsMu.RUnlock()
if ok {
return room
}
roomsMu.Lock()
defer roomsMu.Unlock()
if room, ok = rooms[docID]; ok {
return room
}
room = &Room{clients: make(map[*websocket.Conn]bool)}
rooms[docID] = room
return room
}
func appendHistoryToRoom(room *Room, message []byte) {
copied := append([]byte(nil), message...)
room.mu.Lock()
if len(room.history) >= historySize {
room.history = room.history[1:]
}
room.history = append(room.history, copied)
room.mu.Unlock()
}
// 广播到房间内的所有 WebSocket 客户端
func broadcastToRoom(room *Room, message []byte) {
room.mu.Lock()
defer room.mu.Unlock()
for client := range room.clients {
_ = client.SetWriteDeadline(time.Now().Add(writeWait))
if err := client.WriteMessage(websocket.TextMessage, message); err != nil {
log.Println("写入失败:", err)
client.Close()
delete(room.clients, client)
}
}
}
func wsHandler(w http.ResponseWriter, r *http.Request) {
// 通过 docId 划分房间
docID := r.URL.Query().Get("docId")
if docID == "" {
http.Error(w, "missing docId", http.StatusBadRequest)
return
}
upgrader.CheckOrigin = func(r *http.Request) bool { return true }
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println("WS 升级失败:", err)
return
}
room := getOrCreateRoom(docID)
room.mu.Lock()
room.clients[conn] = true
room.mu.Unlock()
// 回放当前房间历史
room.mu.Lock()
for _, m := range room.history {
_ = conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := conn.WriteMessage(websocket.TextMessage, m); err != nil {
log.Println("历史消息写入失败:", err)
delete(room.clients, conn)
room.mu.Unlock()
conn.Close()
return
}
}
room.mu.Unlock()
// 读控制:限制消息大小、心跳保持与断连检测
conn.SetReadLimit(1 << 20) // 1MB
_ = conn.SetReadDeadline(time.Now().Add(pongWait))
conn.SetPongHandler(func(string) error {
return conn.SetReadDeadline(time.Now().Add(pongWait))
})
// 心跳 Ping(写通道侧,避免长时间空闲连接悬挂)
go func(c *websocket.Conn) {
ticker := time.NewTicker(pongWait / 2)
defer ticker.Stop()
for range ticker.C {
_ = c.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.WriteControl(websocket.PingMessage, nil, time.Now().Add(writeWait)); err != nil {
return
}
}
}(conn)
for {
_, msg, err := conn.ReadMessage()
if err != nil {
log.Println("客户端断开:", err)
room.mu.Lock()
delete(room.clients, conn)
room.mu.Unlock()
return
}
// 投递到发布队列(非阻塞,避免背压拖垮读循环),按文档路由
select {
case jsPubCh <- publishItem{subject: subjectPrefix + "." + docID, payload: msg}:
default:
log.Println("JS 发布队列已满,丢弃消息")
}
}
}
func main() {
var err error
// 连接 NATS
nc, err = nats.Connect(nats.DefaultURL)
if err != nil {
log.Fatal(err)
}
defer nc.Drain()
// 获取 JetStream 上下文(可配置异步发布上限)
js, err = nc.JetStream(nats.PublishAsyncMaxPending(1024))
if err != nil {
log.Fatal(err)
}
// 创建 Stream(若不存在),通配所有文档 subject
_, err = js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{subjectPrefix + ".*"},
Storage: nats.FileStorage, // 持久化
})
if err != nil && err != nats.ErrStreamNameAlreadyInUse {
log.Fatal(err)
}
// 订阅所有文档消息,按 subject 路由到对应房间
_, err = js.Subscribe(subjectPrefix+".*", func(m *nats.Msg) {
docID := strings.TrimPrefix(m.Subject, subjectPrefix+".")
if docID == "" || strings.Contains(docID, ".") {
m.Ack()
return
}
room := getOrCreateRoom(docID)
appendHistoryToRoom(room, m.Data)
broadcastToRoom(room, m.Data)
m.Ack()
}, nats.Durable("push"), nats.ManualAck())
if err != nil {
log.Fatal(err)
}
// 启动 JetStream 发布工作池
jsPubCh = make(chan publishItem, 1000)
workerCount := runtime.NumCPU()
if workerCount < 2 {
workerCount = 2
}
for i := 0; i < workerCount; i++ {
go func() {
for item := range jsPubCh {
if _, err := js.Publish(item.subject, item.payload); err != nil {
log.Println("推送到 JetStream 失败:", err)
}
}
}()
}
// 启动 WebSocket 服务
http.HandleFunc("/ws", wsHandler)
fmt.Println("WebSocket 服务启动: ws://localhost:8080/ws")
log.Fatal(http.ListenAndServe(":8080", nil))
select {}
}
启动 NATS Server:
docker run --network host -p 4222:4222 nats -js
运行 Go 服务:
go run main.go
你将看到输出 WebSocket 服务启动: ws://localhost:8080/ws
。
使用 WebSocket 客户端连接: 你可以使用任何 WebSocket 测试工具(如 Postman、Simple WebSocket Client 插件等),或者创建一个简单的前端页面来连接服务。
ws://localhost:8080/ws?docId=roomA
ws://localhost:8080/ws?docId=roomB
在连接 A 中发送消息,只有连接 A 的其他客户端能收到。在连接 B 中发送消息,只有连接 B 的客户端能收到。如果你断开一个客户端再重新连接,会立即收到该房间的最近历史消息。
NATS JetStream 为高性能的 NATS 核心带来了强大的持久化和可靠性保证。通过其灵活的 Stream 和 Consumer 模型,特别是 Pull 和 Push 两种消费模式,开发者可以轻松构建出适应不同业务场景的、健壮的分布式系统。
与传统的消息队列(如 Kafka, RabbitMQ)相比,JetStream 继承了 NATS 的简洁和易于运维的特点,同时提供了企业级的消息传递功能。
更多资源:
如果您喜欢我的文章,请点击下面按钮随意打赏,您的支持是我最大的动力。
最新评论