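In a microservice architecture, a request passes through many services and the call chain becomes intricate, so when a failure or exception occurs it is hard to trace and locate. A tracing system records the order and timing of calls, along with other key information, as a request moves through the system, which helps locate faulty services and find performance bottlenecks.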
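**Distributed tracing** is one of the capabilities of OpenTelemetry, an open-source observability framework that helps developers collect, process, and export monitoring data from their applications. It mainly covers:
- Linking service calls along the request dimension and recording how long each one takes.
- Keeping the necessary information and using Spans to tie together scattered log events.
- Helping to understand system behavior, debug, and troubleshoot performance problems.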
::: Introductions to OpenTelemetry and OpenTracing:
https://johng.cn/observability/opentelemetry-introduction
https://johng.cn/observability/opentracing-introduction :::
OpenTracing is a specification for distributed tracing. Unlike most standards, it does not standardize a wire protocol or a message format; it is a standard for language-level APIs.
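- Trace (call chain): in the broad sense, a Trace represents the execution of a transaction or workflow through a (distributed) system. A Trace is a directed acyclic graph (DAG) of Spans, and each Span is a named, timed, contiguous segment of execution within the Trace.
- Span: a Span represents a logical unit of work in the system that has a start time and a duration, that is, one logical operation in the application. Spans establish logical causal relationships by being nested or ordered sequentially. A Span can be a method call, the execution of a block of code, an RPC, or a database access; anything with a complete time span can be treated as a Span.
- Logs: a Span can record any number of Logs. Each log entry requires a timestamped event name and may carry an optional structured payload of arbitrary size.
- Tags: a Span can carry multiple Tags in key:value form. Tags have no timestamp; they are simple annotations that supplement the Span.
- SpanContext: SpanContext is more of a "concept" than a useful piece of functionality at the generic OpenTracing layer. It plays an important role when creating a Span, when injecting trace information into a transport protocol (Inject), and when extracting it from one (Extract).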
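To make the vocabulary above concrete, here is a minimal sketch of how these concepts could be modelled as plain Go types. Every name in it (`Span`, `SpanContext`, `LogEntry`, `newSpan`, and the example IDs) is hypothetical and chosen for illustration; it is not the OpenTracing or OpenTelemetry API.

```go
package main

import (
	"fmt"
	"time"
)

// SpanContext holds the identifiers that have to cross process boundaries.
type SpanContext struct {
	TraceID string
	SpanID  string
}

// LogEntry is a timestamped event recorded on a span.
type LogEntry struct {
	Time   time.Time
	Event  string
	Fields map[string]any
}

// Span is a named, timed unit of work inside a trace.
type Span struct {
	Name     string
	Context  SpanContext
	ParentID string // empty for the root span
	Start    time.Time
	Duration time.Duration
	Tags     map[string]string // no timestamps, plain annotations
	Logs     []LogEntry
}

// newSpan creates a span. With a nil parent it becomes a root span that uses
// traceID; otherwise it inherits the parent's trace ID and records its span ID.
func newSpan(name, traceID, spanID string, parent *Span, start time.Time) *Span {
	s := &Span{
		Name:    name,
		Context: SpanContext{TraceID: traceID, SpanID: spanID},
		Start:   start,
		Tags:    map[string]string{},
	}
	if parent != nil {
		s.Context.TraceID = parent.Context.TraceID
		s.ParentID = parent.Context.SpanID
	}
	return s
}

// SetTag attaches a key:value annotation to the span.
func (s *Span) SetTag(key, value string) { s.Tags[key] = value }

// Log appends a timestamped event to the span.
func (s *Span) Log(at time.Time, event string, fields map[string]any) {
	s.Logs = append(s.Logs, LogEntry{Time: at, Event: event, Fields: fields})
}

// Finish fixes the span's duration.
func (s *Span) Finish(at time.Time) { s.Duration = at.Sub(s.Start) }

func main() {
	t0 := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	root := newSpan("GET /hello", "trace-1", "span-a", nil, t0)
	child := newSpan("db.query", "", "span-b", root, t0.Add(5*time.Millisecond))
	child.SetTag("db.system", "mysql")
	child.Log(t0.Add(7*time.Millisecond), "cache.miss", map[string]any{"key": "userInfo:1"})
	child.Finish(t0.Add(20 * time.Millisecond))
	root.Finish(t0.Add(30 * time.Millisecond))
	fmt.Println(child.Context.TraceID, child.ParentID, child.Duration, len(child.Logs))
}
```

The child never receives a trace ID of its own: it takes the one from its parent, which is what lets a backend group spans from different services into one trace.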
Relationships between the spans of a single trace:

            [Span A]  ←←←(the root span)
                |
         +------+------+
         |             |
     [Span B]      [Span C] ←←←(Span C is a child of Span A, ChildOf)
         |             |
     [Span D]      +---+-------+
                   |           |
               [Span E]    [Span F] >>> [Span G] >>> [Span H]
                                           ↑
                                           ↑
                                           ↑
                             (Span G is called after Span F, FollowsFrom)
The same trace and its spans on a time axis:

    ––|–––––––|–––––––|–––––––|–––––––|–––––––|–––––––|–––––––|–> time

     [Span A···················································]
       [Span B··············································]
          [Span D··········································]
        [Span C········································]
             [Span E·······] [Span F··] [Span G··] [Span H··]
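The relationships in the two diagrams can also be written down as data. The sketch below encodes the ChildOf and FollowsFrom references of spans A to H and walks from any span back to the root. The `Ref` type, the `trace` map and `pathToRoot` are hypothetical helpers for illustration, and the walk assumes the references form a DAG as described above.

```go
package main

import "fmt"

// RefType names the two reference types used in the diagram.
type RefType string

const (
	ChildOf     RefType = "ChildOf"
	FollowsFrom RefType = "FollowsFrom"
)

// Ref points from a span to the span it causally depends on.
type Ref struct {
	Type RefType
	To   string
}

// trace maps each span to its reference; the root span A has no entry.
var trace = map[string]Ref{
	"B": {ChildOf, "A"},
	"C": {ChildOf, "A"},
	"D": {ChildOf, "B"},
	"E": {ChildOf, "C"},
	"F": {ChildOf, "C"},
	"G": {FollowsFrom, "F"},
	"H": {FollowsFrom, "G"},
}

// pathToRoot follows references until it reaches a span without one.
// It assumes the references contain no cycle.
func pathToRoot(refs map[string]Ref, span string) []string {
	path := []string{span}
	for {
		ref, ok := refs[span]
		if !ok {
			return path
		}
		span = ref.To
		path = append(path, span)
	}
}

func main() {
	fmt.Println(pathToRoot(trace, "H")) // [H G F C A]
	fmt.Println(pathToRoot(trace, "D")) // [D B A]
}
```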
Comparison of open-source distributed tracing systems:
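Jaeger is a distributed tracing system originally developed at Uber and written in Go. It is one of the systems that supports OpenTelemetry, and it is used to monitor and diagnose request flows in microservice architectures, helping developers locate performance bottlenecks, debug problems, and optimize their systems.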
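The Jaeger architecture consists of the following components:
- Jaeger Client: OpenTracing-compliant SDKs implemented for different languages. The application writes data through the API, and the client library forwards trace information to jaeger-agent according to the sampling strategy specified by the application.
- Agent: a network daemon that listens on a UDP port for span data and sends it to the collector in batches. It is designed as an infrastructure component deployed on every host. The agent decouples the client library from the collector and hides the details of routing to and discovering collectors.
- Collector: receives the data sent by jaeger-agent and writes it to the storage backend. The collector is designed to be stateless, so any number of collectors can run side by side.
- Data Store: the storage backend is designed as a pluggable component and supports writing to Cassandra, Elasticsearch, and others.
- Query: receives query requests, retrieves traces from the storage backend, and displays them in the UI. Query is stateless, so multiple instances can be started and placed behind a load balancer such as nginx.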
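The client library decides which traces to report based on a sampling strategy. As a rough illustration, the sketch below shows one way a probabilistic strategy can work: the decision is derived from the trace ID, so every service that sees the same trace reaches the same verdict. The function `shouldSample` and the constant `maxRandom` are assumptions made for this example; this is not Jaeger's actual implementation.

```go
package main

import "fmt"

// maxRandom masks a trace ID down to its low 63 bits.
const maxRandom = ^(uint64(1) << 63)

// shouldSample reports whether a trace should be reported, given a sampling
// rate between 0 and 1. The same trace ID always yields the same answer.
func shouldSample(traceID uint64, rate float64) bool {
	if rate <= 0 {
		return false
	}
	if rate >= 1 {
		return true
	}
	boundary := uint64(float64(maxRandom) * rate)
	return traceID&maxRandom <= boundary
}

func main() {
	ids := []uint64{1, 1 << 40, 1 << 62, maxRandom}
	for _, id := range ids {
		fmt.Printf("trace %d sampled at 50%%: %v\n", id, shouldSample(id, 0.5))
	}
}
```

Keying the decision on the trace ID rather than on a fresh random number per service avoids traces in which only some of the spans were reported.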
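The data flows through the system as follows:
1. The client reports data to the agent on port 6831.
2. The agent sends the data to the collector on port 14250.
3. The collector writes the data to Kafka.
4. The ingester reads the data from Kafka and writes it to the storage backend.
5. Query reads the data from the storage backend and displays it.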
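The steps above can be sketched as a small in-memory pipeline. This is only a toy model of the data flow: a buffered channel stands in for Kafka, a map stands in for the storage backend, and all names (`agentBatch`, `collect`, `ingest`, `query`, `runPipeline`) are hypothetical.

```go
package main

import "fmt"

// Span is a minimal record that travels through the pipeline.
type Span struct {
	TraceID string
	Name    string
}

// agentBatch groups spans into batches of at most size, the way an agent
// would before forwarding them to a collector.
func agentBatch(spans []Span, size int) [][]Span {
	if size < 1 {
		size = 1
	}
	var batches [][]Span
	for len(spans) > 0 {
		n := size
		if len(spans) < n {
			n = len(spans)
		}
		batches = append(batches, spans[:n])
		spans = spans[n:]
	}
	return batches
}

// collect plays the collector: it writes every span to the queue and closes
// the queue when all batches are done.
func collect(batches [][]Span, queue chan<- Span) {
	for _, batch := range batches {
		for _, s := range batch {
			queue <- s
		}
	}
	close(queue)
}

// ingest plays the ingester: it drains the queue into storage, indexed by trace ID.
func ingest(queue <-chan Span, store map[string][]Span) {
	for s := range queue {
		store[s.TraceID] = append(store[s.TraceID], s)
	}
}

// query returns all stored spans that belong to one trace.
func query(store map[string][]Span, traceID string) []Span {
	return store[traceID]
}

// runPipeline wires the stages together and returns the filled store.
func runPipeline(spans []Span, batchSize int) map[string][]Span {
	queue := make(chan Span, len(spans)) // large enough that collect never blocks
	store := map[string][]Span{}
	collect(agentBatch(spans, batchSize), queue)
	ingest(queue, store)
	return store
}

func main() {
	spans := []Span{{"t1", "GET /user"}, {"t1", "db.query"}, {"t2", "GET /hello"}}
	store := runPipeline(spans, 2)
	fmt.Println(len(query(store, "t1")), len(query(store, "t2")))
}
```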
jaeger-agent is the client-side agent and needs to be deployed on every host.
Port | Protocol | Function |
---|---|---|
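6831 | UDP | Receives jaeger.thrift data in the compact Thrift protocol from clients; most clients use this port |
6832 | UDP | Receives jaeger.thrift data in the binary Thrift protocol; a separate port opened for the Node.js client, which does not support the compact protocol |
5778 | HTTP | Serves configuration |
5775 | UDP | Receives zipkin.thrift data in the compact Thrift protocol, for Zipkin compatibility |
14271 | HTTP | Health check and metrics |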
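jaeger-collector is the collector, and multiple instances can be deployed. It receives the data sent by the agents and writes it to the database or to Kafka.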
Port | Protocol | Function |
---|---|---|
14250 | gRPC | jaeger-agent uses this port to send collected spans to the collector in model.proto format |
14268 | HTTP | Clients can send spans directly to the collector through this port |
9411 | HTTP | Zipkin compatibility |
14269 | HTTP | Health check and metrics |
jaeger-query serves the UI and is mainly responsible for displaying data.
Port | Protocol | Function |
---|---|---|
16686 | HTTP | UI and HTTP API; default URL localhost:16686 |
16685 | gRPC | gRPC query service |
16687 | HTTP | Health check and metrics |
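All-in-one is an executable designed for quick local testing. It starts the Jaeger UI, collector, query service, and agent with an in-memory storage component. Here it is started from the Docker image; once it is running, open http://localhost:16686 in a browser to reach the Jaeger UI.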
Run it with Docker:
docker run -d --name jaeger \
-e COLLECTOR_ZIPKIN_HOST_PORT=:9411 \
-p 16686:16686 \
-p 4317:4317 \
-p 4318:4318 \
-p 14250:14250 \
-p 14268:14268 \
-p 14269:14269 \
-p 9411:9411 \
jaegertracing/all-in-one:1.64.0
The Jaeger UI:
Distributed tracing for an HTTP service in GoFrame
Server
package main
import (
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/ghttp"
"github.com/gogf/gf/v2/net/gtrace"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/contrib/trace/otlphttp/v2"
)
const (
serviceName = "otlp-http-server" // distinct name so the server can be told apart from the client in the Jaeger UI
endpoint = "localhost:4318"
path = ""
)
func main() {
var (
ctx = gctx.New()
shutdown, err = otlphttp.Init(serviceName, endpoint, path)
)
if err != nil {
g.Log().Fatal(ctx, err)
}
defer shutdown(ctx)
// Start HTTP server
s := g.Server()
s.Group("/", func(group *ghttp.RouterGroup) {
group.GET("/hello", HelloHandler)
})
s.SetPort(8199)
s.Run()
}
func HelloHandler(r *ghttp.Request) {
// Create a new trace span
ctx, span := gtrace.NewSpan(r.Context(), "HelloHandler")
defer span.End()
// Get baggage value for tracing
value := gtrace.GetBaggageVar(ctx, "name").String()
// Return response
r.Response.Write("hello:", value)
}
Client
package main
import (
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/gtrace"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/contrib/trace/otlphttp/v2"
)
const (
serviceName = "otlp-http-client" // distinct name so the client can be told apart from the server in the Jaeger UI
endpoint = "localhost:4318"
path = ""
)
// main initializes and starts an HTTP client with tracing
func main() {
var (
ctx = gctx.New()
shutdown, err = otlphttp.Init(serviceName, endpoint, path)
)
if err != nil {
g.Log().Fatal(ctx, err)
}
defer shutdown(ctx)
StartRequests()
}
func StartRequests() {
// Create a new trace span
ctx, span := gtrace.NewSpan(gctx.New(), "StartRequests")
defer span.End()
ctx = gtrace.SetBaggageValue(ctx, "name", "GoFrame")
// Make HTTP request with tracing
response, err := g.Client().Get(ctx, "http://127.0.0.1:8199/hello")
if err != nil {
g.Log().Error(ctx, err)
return
}
defer response.Close()
g.Log().Info(ctx, response.ReadAllString())
}
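The client sets a baggage value and the server reads it back, so the trace context has to travel inside the HTTP request. The framework takes care of this. The sketch below only illustrates the idea, assuming the W3C Trace Context (`traceparent`) and W3C Baggage (`baggage`) header formats. The helper names `inject`, `extract` and `roundTrip` are hypothetical, only the single sampled flag value `01` is handled, and this is not GoFrame's or OpenTelemetry's code.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
	"sort"
	"strings"
)

// SpanContext is the part of a span that crosses the process boundary.
type SpanContext struct {
	TraceID string // 32 hex characters
	SpanID  string // 16 hex characters
	Sampled bool
}

// inject writes the span context and the baggage into outgoing headers.
func inject(sc SpanContext, baggage map[string]string, h http.Header) {
	flags := "00"
	if sc.Sampled {
		flags = "01"
	}
	h.Set("traceparent", fmt.Sprintf("00-%s-%s-%s", sc.TraceID, sc.SpanID, flags))
	var pairs []string
	for k, v := range baggage {
		pairs = append(pairs, k+"="+url.PathEscape(v))
	}
	sort.Strings(pairs) // stable header value regardless of map order
	if len(pairs) > 0 {
		h.Set("baggage", strings.Join(pairs, ","))
	}
}

// extract reads the span context and the baggage from incoming headers.
func extract(h http.Header) (SpanContext, map[string]string, error) {
	parts := strings.Split(h.Get("traceparent"), "-")
	if len(parts) != 4 || len(parts[1]) != 32 || len(parts[2]) != 16 {
		return SpanContext{}, nil, fmt.Errorf("invalid traceparent %q", h.Get("traceparent"))
	}
	sc := SpanContext{TraceID: parts[1], SpanID: parts[2], Sampled: parts[3] == "01"}
	baggage := map[string]string{}
	for _, pair := range strings.Split(h.Get("baggage"), ",") {
		k, v, ok := strings.Cut(strings.TrimSpace(pair), "=")
		if !ok {
			continue
		}
		if decoded, err := url.PathUnescape(v); err == nil {
			baggage[k] = decoded
		}
	}
	return sc, baggage, nil
}

// roundTrip injects into a fresh header set and extracts again, standing in
// for one client-to-server hop.
func roundTrip(sc SpanContext, baggage map[string]string) (SpanContext, map[string]string, error) {
	h := http.Header{}
	inject(sc, baggage, h)
	return extract(h)
}

func main() {
	sc := SpanContext{TraceID: "4bf92f3577b34da6a3ce929d0e0e4736", SpanID: "00f067aa0ba902b7", Sampled: true}
	got, baggage, err := roundTrip(sc, map[string]string{"name": "GoFrame"})
	fmt.Println(got.TraceID == sc.TraceID, baggage["name"], err)
}
```

On the server side the extracted span ID becomes the parent of the span created for the request, which is how the two processes end up in the same trace.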
Distributed tracing for an HTTP service that talks to a database in GoFrame
Server
package main
import (
"context"
"fmt"
"time"
_ "github.com/gogf/gf/contrib/drivers/mysql/v2"
_ "github.com/gogf/gf/contrib/nosql/redis/v2" // registers the Redis adapter that g.Redis() needs
"github.com/gogf/gf/v2/database/gdb"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/ghttp"
"github.com/gogf/gf/v2/os/gcache"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/contrib/trace/otlphttp/v2"
)
type cTrace struct{}
const (
serviceName = "otlp-http-server-with-db" // Name of the service for tracing
endpoint = "localhost:4318"
path = ""
)
func main() {
var (
ctx = gctx.New()
shutdown, err = otlphttp.Init(serviceName, endpoint, path)
)
if err != nil {
g.Log().Fatal(ctx, err)
}
defer shutdown(ctx)
g.DB().GetCache().SetAdapter(gcache.NewAdapterRedis(g.Redis()))
s := g.Server()
s.Use(ghttp.MiddlewareHandlerResponse)
s.Group("/", func(group *ghttp.RouterGroup) {
group.ALL("/user", new(cTrace))
})
s.SetPort(8199)
s.Run()
}
type InsertReq struct {
Name string `v:"required#Please input user name."`
}
type InsertRes struct {
ID int64
}
func (c *cTrace) Insert(ctx context.Context, req *InsertReq) (res *InsertRes, err error) {
result, err := g.Model("user").Ctx(ctx).Insert(req)
if err != nil {
return nil, err
}
id, _ := result.LastInsertId()
res = &InsertRes{
ID: id,
}
return
}
type QueryReq struct {
ID int `v:"min:1#User id is required for querying"`
}
type QueryRes struct {
User gdb.Record
}
func (c *cTrace) Query(ctx context.Context, req *QueryReq) (res *QueryRes, err error) {
one, err := g.Model("user").Ctx(ctx).Cache(gdb.CacheOption{
Duration: 5 * time.Second,
Name: c.userCacheKey(req.ID),
Force: false,
}).WherePri(req.ID).One()
if err != nil {
return nil, err
}
res = &QueryRes{
User: one,
}
return
}
type DeleteReq struct {
Id int `v:"min:1#User id is required for deleting."`
}
type DeleteRes struct{}
func (c *cTrace) Delete(ctx context.Context, req *DeleteReq) (res *DeleteRes, err error) {
_, err = g.Model("user").Ctx(ctx).Cache(gdb.CacheOption{
Duration: -1,
Name: c.userCacheKey(req.Id),
Force: false,
}).WherePri(req.Id).Delete()
if err != nil {
return nil, err
}
return
}
func (c *cTrace) userCacheKey(id int) string {
return fmt.Sprintf(`userInfo:%d`, id)
}
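The handlers above pass a positive `Duration` to cache the query result and a negative one in `Delete` to drop the cached entry. The sketch below imitates that convention with a tiny in-memory cache so the behavior is easy to follow. The types `ttlCache` and `fakeClock`, the method `getOrLoad`, and the constants `queryTTL` and `clearTTL` are hypothetical, and the semantics are an assumption based on how the handlers use the option, not GoFrame's implementation.

```go
package main

import (
	"fmt"
	"time"
)

const (
	queryTTL = 5 * time.Second   // same TTL as the Query handler
	clearTTL = time.Duration(-1) // negative: remove the cached entry
)

type entry struct {
	value   string
	expires time.Time
}

// ttlCache is a minimal cache; now is injectable so expiry needs no sleeping.
type ttlCache struct {
	items map[string]entry
	now   func() time.Time
}

func newTTLCache(now func() time.Time) *ttlCache {
	return &ttlCache{items: map[string]entry{}, now: now}
}

// getOrLoad returns the cached value while it is fresh. With a negative
// duration it removes the key and runs load without caching the result.
func (c *ttlCache) getOrLoad(key string, d time.Duration, load func() string) string {
	if d < 0 {
		delete(c.items, key)
		return load()
	}
	if e, ok := c.items[key]; ok && c.now().Before(e.expires) {
		return e.value
	}
	v := load()
	c.items[key] = entry{value: v, expires: c.now().Add(d)}
	return v
}

// fakeClock is a manually advanced clock for demonstrations and tests.
type fakeClock struct{ t time.Time }

func (f *fakeClock) Now() time.Time          { return f.t }
func (f *fakeClock) Advance(d time.Duration) { f.t = f.t.Add(d) }

func main() {
	clock := &fakeClock{}
	cache := newTTLCache(clock.Now)
	loads := 0
	load := func() string { loads++; return "john" }

	cache.getOrLoad("userInfo:1", queryTTL, load) // miss: hits the database
	cache.getOrLoad("userInfo:1", queryTTL, load) // hit: served from cache
	clock.Advance(queryTTL)
	cache.getOrLoad("userInfo:1", queryTTL, load) // expired: hits the database again
	fmt.Println("database loads:", loads)
}
```

In a trace this shows up as the difference between a request with a database span and a request that only touches the cache.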
Client
package main
import (
"github.com/gogf/gf/v2/database/gdb"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/ghttp"
"github.com/gogf/gf/v2/net/gtrace"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/contrib/trace/otlphttp/v2"
)
const (
serviceName = "otlp-http-client-with-db" // Name of the service for tracing
endpoint = "localhost:4318"
path = ""
)
func main() {
var (
ctx = gctx.New()
shutdown, err = otlphttp.Init(serviceName, endpoint, path)
)
if err != nil {
g.Log().Fatal(ctx, err)
}
defer shutdown(ctx)
StartRequests()
}
func StartRequests() {
ctx, span := gtrace.NewSpan(gctx.New(), "StartRequests")
defer span.End()
var (
err error
client = g.Client()
)
var insertRes = struct {
ghttp.DefaultHandlerResponse
Data struct{ ID int64 } `json:"data"`
}{}
err = client.PostVar(ctx, "http://127.0.0.1:8199/user/insert", g.Map{
"name": "john",
}).Scan(&insertRes)
if err != nil {
panic(err)
}
g.Log().Info(ctx, "insert result:", insertRes)
if insertRes.Data.ID == 0 {
g.Log().Error(ctx, "retrieve empty id string")
return
}
var queryRes = struct {
ghttp.DefaultHandlerResponse
Data struct{ User gdb.Record } `json:"data"`
}{}
err = client.GetVar(ctx, "http://127.0.0.1:8199/user/query", g.Map{
"id": insertRes.Data.ID,
}).Scan(&queryRes)
if err != nil {
panic(err)
}
g.Log().Info(ctx, "query result:", queryRes)
var deleteRes = struct {
ghttp.DefaultHandlerResponse
}{}
err = client.PostVar(ctx, "http://127.0.0.1:8199/user/delete", g.Map{
"id": insertRes.Data.ID,
}).Scan(&deleteRes)
if err != nil {
panic(err)
}
g.Log().Info(ctx, "delete result:", deleteRes)
}
Distributed tracing inside a single process with GoFrame and the HTTP-based OpenTelemetry exporter
package main
import (
"context"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/gtrace"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/util/gutil"
"github.com/gogf/gf/contrib/trace/otlphttp/v2"
)
const (
serviceName = "inprocess"
endpoint = "localhost:4318"
path = ""
)
func main() {
var (
ctx = gctx.New()
shutdown, err = otlphttp.Init(serviceName, endpoint, path)
)
if err != nil {
g.Log().Fatal(ctx, err)
}
defer shutdown(ctx)
// Create root span for the entire process
ctx, span := gtrace.NewSpan(ctx, "main")
defer span.End()
user1 := GetUser(ctx, 1)
g.Dump(user1)
user100 := GetUser(ctx, 100)
g.Dump(user100)
}
func GetUser(ctx context.Context, id int) g.Map {
ctx, span := gtrace.NewSpan(ctx, "GetUser")
defer span.End()
// Merge user data from different sources
m := g.Map{}
gutil.MapMerge(
m,
GetInfo(ctx, id),
GetDetail(ctx, id),
GetScores(ctx, id),
)
return m
}
func GetInfo(ctx context.Context, id int) g.Map {
ctx, span := gtrace.NewSpan(ctx, "GetInfo")
defer span.End()
if id == 100 {
return g.Map{
"id": 100,
"name": "carter",
"gender": 1,
}
}
return nil
}
func GetDetail(ctx context.Context, id int) g.Map {
ctx, span := gtrace.NewSpan(ctx, "GetDetail")
defer span.End()
if id == 100 {
return g.Map{
"site": "https://goframe.org",
"email": "123@test.com",
}
}
return nil
}
func GetScores(ctx context.Context, id int) g.Map {
ctx, span := gtrace.NewSpan(ctx, "GetScores")
defer span.End()
if id == 100 {
return g.Map{
"math": 100,
"english": 60,
"chinese": 50,
}
}
return nil
}
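Each function above receives a `ctx`, creates its own span from it, and passes the returned `ctx` on to the functions it calls. That is what turns the spans into a tree. The following sketch shows the mechanism with nothing but the standard library. The `span` type and the helpers `startSpan`, `path` and `demo` are hypothetical stand-ins, not the `gtrace` API.

```go
package main

import (
	"context"
	"fmt"
)

// span is a minimal stand-in for a trace span that remembers its parent.
type span struct {
	name   string
	parent *span
}

// spanKey is the private context key under which the current span is stored.
type spanKey struct{}

// startSpan creates a span whose parent is the span found in ctx (if any) and
// returns a derived context that carries the new span.
func startSpan(ctx context.Context, name string) (context.Context, *span) {
	parent, _ := ctx.Value(spanKey{}).(*span)
	s := &span{name: name, parent: parent}
	return context.WithValue(ctx, spanKey{}, s), s
}

// path renders the chain from the root span down to this span.
func (s *span) path() string {
	if s.parent == nil {
		return s.name
	}
	return s.parent.path() + " > " + s.name
}

func getInfo(ctx context.Context) string {
	_, s := startSpan(ctx, "GetInfo")
	return s.path()
}

func getUser(ctx context.Context) string {
	ctx, _ = startSpan(ctx, "GetUser")
	return getInfo(ctx) // passing the derived ctx makes GetInfo a child of GetUser
}

// demo starts a root span and runs the nested calls beneath it.
func demo() string {
	ctx, _ := startSpan(context.Background(), "main")
	return getUser(ctx)
}

func main() {
	fmt.Println(demo()) // main > GetUser > GetInfo
}
```

Passing the original `ctx` instead of the derived one would make `GetInfo` a sibling of `GetUser` rather than its child, which is a common source of flat-looking traces.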
Distributed tracing for a gRPC service that talks to a database in GoFrame
Proto definition
syntax = "proto3";
package user;
option go_package = "protobuf/user";
service User {
rpc Insert(InsertReq) returns (InsertRes) {}
rpc Query(QueryReq) returns (QueryRes) {}
rpc Delete(DeleteReq) returns (DeleteRes) {}
}
message InsertReq {
string Name = 1;
}
message InsertRes {
int32 Id = 1;
}
message QueryReq {
int32 Id = 1;
}
message QueryRes {
int32 Id = 1;
string Name = 2;
}
message DeleteReq {
int32 Id = 1;
}
message DeleteRes {}
Server
package main
import (
"context"
"fmt"
"time"
_ "github.com/gogf/gf/contrib/drivers/mysql/v2"
_ "github.com/gogf/gf/contrib/nosql/redis/v2"
"github.com/gogf/gf/v2/database/gdb"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/gcache"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/contrib/registry/etcd/v2"
"github.com/gogf/gf/contrib/rpc/grpcx/v2"
"github.com/gogf/gf/contrib/trace/otlpgrpc/v2"
"main/protobuf/user"
)
type Controller struct {
user.UnimplementedUserServer
}
const (
serviceName = "otlp-grpc-server"
endpoint = "localhost:4317" // note the port: OTLP over gRPC uses 4317, the HTTP examples use 4318
traceToken = ""
)
func main() {
// Configure service discovery
grpcx.Resolver.Register(etcd.New("127.0.0.1:2379"))
var (
ctx = gctx.New()
shutdown, err = otlpgrpc.Init(serviceName, endpoint, traceToken)
)
if err != nil {
g.Log().Fatal(ctx, err)
}
defer shutdown(ctx)
g.DB().GetCache().SetAdapter(gcache.NewAdapterRedis(g.Redis()))
s := grpcx.Server.New()
user.RegisterUserServer(s.Server, &Controller{})
s.Run()
}
func (s *Controller) Insert(ctx context.Context, req *user.InsertReq) (res *user.InsertRes, err error) {
result, err := g.Model("user").Ctx(ctx).Insert(g.Map{
"name": req.Name,
})
if err != nil {
return nil, err
}
// Get and return the inserted ID
id, _ := result.LastInsertId()
res = &user.InsertRes{
Id: int32(id),
}
return
}
func (s *Controller) Query(ctx context.Context, req *user.QueryReq) (res *user.QueryRes, err error) {
if err = g.Model("user").Ctx(ctx).Cache(gdb.CacheOption{
Duration: 5 * time.Second,
Name: s.userCacheKey(req.Id),
Force: false,
}).WherePri(req.Id).Scan(&res); err != nil {
return nil, err
}
return
}
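func (s *Controller) Delete(ctx context.Context, req *user.DeleteReq) (res *user.DeleteRes, err error) {
	// Delete the row; the negative Duration also clears the cached entry.
	result, err := g.Model("user").Ctx(ctx).Cache(gdb.CacheOption{
		Duration: -1,
		Name:     s.userCacheKey(req.Id),
		Force:    false,
	}).WherePri(req.Id).Delete()
	if err != nil {
		return nil, err
	}
	// Report a missing row as an error so that it shows up in the trace.
	if n, _ := result.RowsAffected(); n == 0 {
		return nil, fmt.Errorf("user %d not found", req.Id)
	}
	return &user.DeleteRes{}, nil
}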
func (s *Controller) userCacheKey(id int32) string {
return fmt.Sprintf(`userInfo:%d`, id)
}
Client
package main
import (
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/gtrace"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/contrib/registry/etcd/v2"
"github.com/gogf/gf/contrib/rpc/grpcx/v2"
"github.com/gogf/gf/contrib/trace/otlpgrpc/v2"
"main/protobuf/user"
)
// Service configuration constants
const (
serviceName = "otlp-grpc-client" // Name of the service for tracing
endpoint = "localhost:4317" // note the port: OTLP over gRPC uses 4317, the HTTP examples use 4318
traceToken = ""
)
// main initializes and starts a gRPC client with tracing
func main() {
// Configure service discovery
grpcx.Resolver.Register(etcd.New("127.0.0.1:2379"))
var (
ctx = gctx.New()
shutdown, err = otlpgrpc.Init(serviceName, endpoint, traceToken)
)
if err != nil {
g.Log().Fatal(ctx, err)
}
defer shutdown(ctx)
// Start making requests with tracing
StartRequests()
}
func StartRequests() {
// Create a new trace span
ctx, span := gtrace.NewSpan(gctx.New(), "StartRequests")
defer span.End()
// Create a gRPC client
client := user.NewUserClient(grpcx.Client.MustNewGrpcClientConn("demo"))
// Set baggage value for tracing
// This value will be propagated to all child spans
ctx = gtrace.SetBaggageValue(ctx, "uid", 100)
// Insert a new user
// This operation will be traced
insertRes, err := client.Insert(ctx, &user.InsertReq{
Name: "john",
})
if err != nil {
g.Log().Fatalf(ctx, `%+v`, err)
}
g.Log().Info(ctx, "insert id:", insertRes.Id)
// Query the inserted user
// This operation will be traced
queryRes, err := client.Query(ctx, &user.QueryReq{
Id: insertRes.Id,
})
if err != nil {
g.Log().Errorf(ctx, `%+v`, err)
return
}
g.Log().Info(ctx, "query result:", queryRes)
// Delete the user
// This operation will be traced
if _, err = client.Delete(ctx, &user.DeleteReq{
Id: insertRes.Id,
}); err != nil {
g.Log().Errorf(ctx, `%+v`, err)
return
}
g.Log().Info(ctx, "delete id:", insertRes.Id)
// Try to delete a non-existent user
// This will generate an error that will be traced
if _, err = client.Delete(ctx, &user.DeleteReq{
Id: -1,
}); err != nil {
g.Log().Errorf(ctx, `%+v`, err)
return
}
g.Log().Info(ctx, "delete id:", -1)
}