Distributed Tracing Basics & Its Use in gf (GoFrame)

Preface

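In a microservice architecture, a request passes through many services and the call chains become tangled. When a problem or exception occurs, it is hard to track down and locate. A tracing system follows each request through the system and records key information such as the order and duration of every call. This helps us pinpoint failing services and find performance bottlenecks.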

OpenTelemetry

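OpenTelemetry is an open-source observability framework that helps developers collect, process, and export telemetry data from their applications. For distributed tracing, it mainly:

  • Links service calls along the path of each request and records how long each call takes.

  • Keeps the necessary context and uses Spans to tie scattered log events together.

  • Helps you understand system behavior, debug, and troubleshoot performance problems.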

Further reading on OpenTelemetry and OpenTracing:

https://johng.cn/observability/opentelemetry-introduction

https://johng.cn/observability/opentracing-introduction

OpenTracing

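OpenTracing is a specification standard for distributed tracing. It differs from most standards because it does not define a transport protocol or a message format. Instead, it is an API standard at the programming-language level.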

OpenTracing Data Model

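  • Trace (call chain): broadly speaking, a Trace represents the execution of one transaction or workflow in a (distributed) system. A Trace is a directed acyclic graph (DAG) of Spans. Each Span is a named, timed, contiguous segment of execution within the Trace.

  • Span: a Span is a logical unit of work in the system that has a start time and a duration, i.e. one logical operation in the application. Spans establish causal relationships with each other by nesting or by running in sequence. A Span can be a method call, the execution of a block of code, or an RPC or database access. Any program operation with a complete start and end can be treated as a Span.

  • Logs: each Span can record Logs many times. Every Logs operation needs a timestamped event name, plus an optional storage structure of arbitrary size.

  • Tags: each Span can have multiple Tags in key:value form. Tags have no timestamp and simply annotate and supplement the Span.

  • SpanContext: the SpanContext is more of a "concept" than a useful piece of functionality at the generic OpenTracing layer. It plays an essential role when creating a Span, and when injecting call-chain information into a transport protocol (Inject) or extracting it from one (Extract).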
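The data model above can be sketched as plain Go types. This is an illustrative, hypothetical model; the type and field names do not come from any tracing library. Every Span carries a SpanContext with the shared trace ID and its own span ID. It also holds timeless Tags, timestamped Logs, and references (ChildOf or FollowsFrom) to the span it depends on.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"time"
)

// SpanContext is the part of a span that must cross process boundaries.
type SpanContext struct {
	TraceID string // shared by every span in the same trace
	SpanID  string // unique to this span
}

// RefType distinguishes the two OpenTracing reference types.
type RefType string

const (
	ChildOf     RefType = "ChildOf"
	FollowsFrom RefType = "FollowsFrom"
)

type Reference struct {
	Type    RefType
	Context SpanContext
}

// LogRecord is a timestamped event attached to a span.
type LogRecord struct {
	Time   time.Time
	Event  string
	Fields map[string]interface{}
}

type Span struct {
	OperationName string
	Context       SpanContext
	References    []Reference
	Start, Finish time.Time
	Tags          map[string]interface{} // tags carry no timestamp
	Logs          []LogRecord
}

func randomHex(n int) string {
	b := make([]byte, n)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return hex.EncodeToString(b)
}

// StartSpan creates a root span when parent is nil; otherwise the new span
// inherits the parent's trace ID and references the parent with ref.
func StartSpan(name string, parent *Span, ref RefType) *Span {
	s := &Span{OperationName: name, Start: time.Now(), Tags: map[string]interface{}{}}
	s.Context.SpanID = randomHex(8)
	if parent == nil {
		s.Context.TraceID = randomHex(16)
	} else {
		s.Context.TraceID = parent.Context.TraceID
		s.References = append(s.References, Reference{Type: ref, Context: parent.Context})
	}
	return s
}

// Log appends a timestamped event to the span.
func (s *Span) Log(event string, fields map[string]interface{}) {
	s.Logs = append(s.Logs, LogRecord{Time: time.Now(), Event: event, Fields: fields})
}

func main() {
	root := StartSpan("A", nil, "")
	child := StartSpan("C", root, ChildOf)
	child.Tags["http.method"] = "GET"
	child.Log("cache.miss", map[string]interface{}{"key": "userInfo:1"})
	child.Finish = time.Now()
	root.Finish = time.Now()
	fmt.Println(child.Context.TraceID == root.Context.TraceID, child.References[0].Type) // true ChildOf
}
```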

Relationships between the Spans of one Trace


        [Span A]  ←←←(the root span)
            |
     +------+------+
     |             |
 [Span B]      [Span C] ←←←(Span C is a child of Span A, ChildOf)
     |             |
 [Span D]      +---+-------+
               |           |
           [Span E]    [Span F] >>> [Span G] >>> [Span H]
                                       ↑
                                       ↑
                                       ↑
                   (Span G is called after Span F, FollowsFrom)

The Spans of a Trace on a timeline


––|–––––––|–––––––|–––––––|–––––––|–––––––|–––––––|–––––––|–> time

 [Span A···················································]
   [Span B··············································]
      [Span D··········································]
    [Span C········································]
         [Span E·······]        [Span F··] [Span G··] [Span H··]


Comparison of open-source distributed tracing systems:

Jaeger 

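Jaeger is a distributed tracing system developed by Uber and written in Go. It is one of the systems that support OpenTelemetry. It monitors and diagnoses request flows in microservice architectures, which helps developers quickly locate performance bottlenecks, debug problems, and optimize the system.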

Jaeger Architecture

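Jaeger's architecture consists mainly of the following components:

  • Jaeger Client: OpenTracing-compliant SDKs for different languages. The application writes data through the API. The client library then forwards trace information to jaeger-agent according to the sampling strategy that the application configures.

  • Agent: a network daemon that listens on UDP ports for span data and sends it to the collector in batches. It is designed as an infrastructure component and is deployed on every host. The agent decouples the client library from the collector, so the client library does not need to know how collectors are routed to or discovered.

  • Collector: receives the data sent by jaeger-agent and writes it to backend storage. The collector is designed to be stateless, so you can run any number of collector instances.

  • Data Store: backend storage is a pluggable component. It supports writing data to Cassandra, Elasticsearch, and others.

  • Query: receives query requests, retrieves traces from backend storage, and displays them in the UI. Query is stateless, so you can run multiple instances behind a load balancer such as nginx.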
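The Jaeger Client bullet mentions sampling strategies. One common strategy is probabilistic sampling decided from the trace ID. Every service then makes the same decision for the same trace without coordinating. The Go sketch below illustrates the idea. The function name, and the choice to read the low 8 bytes of the trace ID, are assumptions for illustration; this is not Jaeger's exact implementation.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"math"
)

// shouldSample makes a deterministic sampling decision: the lower 8 bytes of a
// 16-byte trace ID are read as a uint64 and compared with a boundary derived
// from the sampling rate (0.0 to 1.0).
func shouldSample(traceID [16]byte, rate float64) bool {
	if rate >= 1 {
		return true
	}
	if rate <= 0 {
		return false
	}
	boundary := uint64(rate * float64(math.MaxUint64))
	x := binary.BigEndian.Uint64(traceID[8:])
	return x < boundary
}

func main() {
	sampled := 0
	for i := 0; i < 10000; i++ {
		var id [16]byte
		// Spread synthetic trace IDs evenly over the uint64 range.
		binary.BigEndian.PutUint64(id[8:], uint64(i)*(math.MaxUint64/10000))
		if shouldSample(id, 0.25) {
			sampled++
		}
	}
	fmt.Printf("sampled %d of 10000 synthetic trace IDs at rate 0.25\n", sampled)
}
```

Because the decision depends only on the trace ID, a downstream service that receives the same ID reaches the same verdict, so a trace is either kept whole or dropped whole.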

Data Flow

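When Kafka is used as a buffer between the collector and storage, the flow is as follows:

  • The client reports data to the agent on UDP port 6831.

  • The agent sends the data to the collector on port 14250 (gRPC).

  • The collector writes the data to Kafka.

  • The Ingester reads the data from Kafka and writes it to the storage backend.

  • Query retrieves the data from the storage backend and displays it.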
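The streaming flow above can be simulated in a single process with Go channels, using one goroutine per hop. This is a toy model for illustration only. The channel standing in for Kafka, the map standing in for the storage backend, and all names are hypothetical. Real Jaeger components talk over the network using the ports listed below.

```go
package main

import (
	"fmt"
	"sync"
)

type span struct {
	TraceID   string
	Operation string
}

// runPipeline pushes spans through client -> agent -> collector -> "Kafka" ->
// ingester -> storage and returns the stored spans indexed by trace ID.
func runPipeline(input []span, batchSize int) map[string][]span {
	toAgent := make(chan span)
	toCollector := make(chan []span)
	kafka := make(chan span, 16)
	storage := map[string][]span{}
	var wg sync.WaitGroup

	// Agent: batches spans before forwarding them to the collector.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(toCollector)
		var batch []span
		for s := range toAgent {
			batch = append(batch, s)
			if len(batch) == batchSize {
				toCollector <- batch
				batch = nil
			}
		}
		if len(batch) > 0 {
			toCollector <- batch
		}
	}()

	// Collector: stateless, writes every span of each batch to the queue.
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer close(kafka)
		for batch := range toCollector {
			for _, s := range batch {
				kafka <- s
			}
		}
	}()

	// Ingester: the only writer of storage; reads from the queue.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for s := range kafka {
			storage[s.TraceID] = append(storage[s.TraceID], s)
		}
	}()

	// Client: reports spans to the agent, then signals it is done.
	for _, s := range input {
		toAgent <- s
	}
	close(toAgent)
	wg.Wait()
	return storage
}

func main() {
	stored := runPipeline([]span{
		{"t1", "StartRequests"}, {"t1", "HelloHandler"}, {"t2", "GetUser"},
	}, 2)
	// Query: look up all spans of each trace.
	fmt.Println(len(stored["t1"]), len(stored["t2"])) // 2 1
}
```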

agent

jaeger-agent is the client-side agent and must be deployed on every host.

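| Port  | Protocol | Function |
|-------|----------|----------|
| 6831  | UDP      | Receives jaeger.thrift data over the compact Thrift protocol; used by most clients |
| 6832  | UDP      | Receives jaeger.thrift data over the binary Thrift protocol; a separate port for the Node.js client, which does not support the compact protocol |
| 5778  | HTTP     | Serves configuration (e.g. sampling strategies) |
| 5775  | UDP      | Receives zipkin.thrift over the compact Thrift protocol, for Zipkin compatibility (deprecated, legacy clients only) |
| 14271 | HTTP     | Health check and metrics |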

collector

The collector; multiple instances can be deployed. It receives the data sent by agents and writes it to the database or to Kafka.

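| Port  | Protocol | Function |
|-------|----------|----------|
| 14250 | gRPC     | jaeger-agent sends the collected spans to the collector on this port in model.proto format |
| 14268 | HTTP     | Clients can send spans directly to the collector on this port |
| 9411  | HTTP     | Zipkin compatibility (enabled via COLLECTOR_ZIPKIN_HOST_PORT, as in the Docker command below) |
| 14269 | HTTP     | Health check and metrics |
| 4317  | gRPC     | Receives OTLP over gRPC (used by the otlpgrpc examples below) |
| 4318  | HTTP     | Receives OTLP over HTTP (used by the otlphttp examples below) |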

query

Serves the UI and is mainly responsible for displaying data.

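| Port  | Protocol | Function |
|-------|----------|----------|
| 16686 | HTTP     | Jaeger UI; default URL http://localhost:16686 |
| 16685 | gRPC     | gRPC query service |
| 16687 | HTTP     | Health check and metrics |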
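Besides the UI page, jaeger-query serves JSON endpoints on port 16686 that the UI itself calls. As far as we know, these endpoints, such as `/api/services`, are internal to the UI and not a stable public API. Treat the Go sketch below as an assumption-laden example. It assumes a `{"data": [...]}` response envelope and lists the services that have reported spans. The decoding step is split out so it can be exercised without a running Jaeger.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"time"
)

// servicesResponse assumes the {"data": [...]} envelope returned by /api/services.
type servicesResponse struct {
	Data []string `json:"data"`
}

// decodeServices extracts service names from an /api/services response body.
func decodeServices(body []byte) ([]string, error) {
	var out servicesResponse
	if err := json.Unmarshal(body, &out); err != nil {
		return nil, err
	}
	return out.Data, nil
}

// listServices asks jaeger-query at baseURL (e.g. "http://localhost:16686")
// which services have reported spans.
func listServices(baseURL string) ([]string, error) {
	client := &http.Client{Timeout: 5 * time.Second}
	resp, err := client.Get(baseURL + "/api/services")
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status: %s", resp.Status)
	}
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}
	return decodeServices(body)
}

func main() {
	services, err := listServices("http://localhost:16686")
	if err != nil {
		fmt.Println("query failed:", err)
		return
	}
	fmt.Println(services)
}
```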

Installation

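All-in-one is an executable designed for quick local testing. It starts the Jaeger UI, collector, query, and agent together, backed by an in-memory storage component. Here it is started from a Docker image. Once it is running, open the Jaeger UI at http://localhost:16686 in a browser.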

Installing via Docker

  docker run -d --name jaeger \
  -e COLLECTOR_ZIPKIN_HOST_PORT=:9411 \
  -p 16686:16686 \
  -p 4317:4317 \
  -p 4318:4318 \
  -p 14250:14250 \
  -p 14268:14268 \
  -p 14269:14269 \
  -p 9411:9411 \
  jaegertracing/all-in-one:1.64.0


Examples with the GoFrame Framework

HTTP service

Distributed tracing for an HTTP service in GoFrame

Server

package main

import (
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/net/ghttp"
	"github.com/gogf/gf/v2/net/gtrace"
	"github.com/gogf/gf/v2/os/gctx"

	"github.com/gogf/gf/contrib/trace/otlphttp/v2"
)

const (
	serviceName = "otlp-http-server" // service name shown in Jaeger
	endpoint    = "localhost:4318"
	path        = ""
)

func main() {
	var (
		ctx           = gctx.New()
		shutdown, err = otlphttp.Init(serviceName, endpoint, path)
	)
	if err != nil {
		g.Log().Fatal(ctx, err)
	}
	defer shutdown(ctx)

	// Start HTTP server
	s := g.Server()
	s.Group("/", func(group *ghttp.RouterGroup) {
		group.GET("/hello", HelloHandler)
	})
	s.SetPort(8199)
	s.Run()
}

func HelloHandler(r *ghttp.Request) {
	// Create a new trace span
	ctx, span := gtrace.NewSpan(r.Context(), "HelloHandler")
	defer span.End()

	// Get baggage value for tracing
	value := gtrace.GetBaggageVar(ctx, "name").String()

	// Return response
	r.Response.Write("hello:", value)
}
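In `HelloHandler`, `r.Context()` already carries the caller's trace context and baggage. GoFrame's HTTP server tracing is expected to extract them from the incoming request headers. Assuming the default W3C Trace Context and Baggage propagators, those headers look like `traceparent: 00-<trace-id>-<parent-span-id>-<flags>` and `baggage: name=GoFrame`. The stdlib-only sketch below parses them by hand to show what the extraction involves. `parseTraceparent` and `baggageValue` are hypothetical helpers, not GoFrame or OpenTelemetry APIs.

```go
package main

import (
	"encoding/hex"
	"errors"
	"fmt"
	"net/http"
	"net/url"
	"strings"
)

// parseTraceparent splits a W3C traceparent header
// (version-traceid-parentid-flags) and reports whether the sampled bit is set.
func parseTraceparent(h string) (traceID, parentID string, sampled bool, err error) {
	parts := strings.Split(h, "-")
	if len(parts) != 4 || len(parts[0]) != 2 || len(parts[1]) != 32 || len(parts[2]) != 16 || len(parts[3]) != 2 {
		return "", "", false, errors.New("malformed traceparent")
	}
	flags, err := hex.DecodeString(parts[3])
	if err != nil {
		return "", "", false, err
	}
	if _, err := hex.DecodeString(parts[1] + parts[2]); err != nil {
		return "", "", false, err
	}
	// All-zero trace or parent IDs are invalid.
	if strings.Trim(parts[1], "0") == "" || strings.Trim(parts[2], "0") == "" {
		return "", "", false, errors.New("all-zero trace or parent id")
	}
	return parts[1], parts[2], flags[0]&0x01 == 1, nil
}

// baggageValue returns the percent-decoded value of key in a W3C baggage
// header such as "k1=v1,k2=v2;prop", or "" if the key is absent.
func baggageValue(h, key string) string {
	for _, member := range strings.Split(h, ",") {
		kv := strings.SplitN(strings.SplitN(member, ";", 2)[0], "=", 2)
		if len(kv) == 2 && strings.TrimSpace(kv[0]) == key {
			v, err := url.PathUnescape(strings.TrimSpace(kv[1]))
			if err != nil {
				return ""
			}
			return v
		}
	}
	return ""
}

func main() {
	req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:8199/hello", nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("traceparent", "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
	req.Header.Set("baggage", "name=GoFrame")

	traceID, parentID, sampled, err := parseTraceparent(req.Header.Get("traceparent"))
	fmt.Println(traceID, parentID, sampled, err)
	fmt.Println("hello:", baggageValue(req.Header.Get("baggage"), "name"))
}
```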

Client

package main

import (
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/net/gtrace"
	"github.com/gogf/gf/v2/os/gctx"

	"github.com/gogf/gf/contrib/trace/otlphttp/v2"
)

const (
	serviceName = "otlp-http-client" // service name shown in Jaeger
	endpoint    = "localhost:4318"
	path        = ""
)

// main initializes and starts an HTTP client with tracing
func main() {
	var (
		ctx           = gctx.New()
		shutdown, err = otlphttp.Init(serviceName, endpoint, path)
	)
	if err != nil {
		g.Log().Fatal(ctx, err)
	}
	defer shutdown(ctx)

	StartRequests()
}

func StartRequests() {
	// Create a new trace span
	ctx, span := gtrace.NewSpan(gctx.New(), "StartRequests")
	defer span.End()

	// Set baggage; it travels to the server together with the trace context
	ctx = gtrace.SetBaggageValue(ctx, "name", "GoFrame")

	// Make HTTP request with tracing
	response, err := g.Client().Get(ctx, "http://127.0.0.1:8199/hello")
	if err != nil {
		g.Log().Error(ctx, err)
		return
	}
	defer response.Close()

	g.Log().Info(ctx, response.ReadAllString())
}
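On the client side, `g.Client()` is expected to inject two things into the outgoing request headers: the current span's context, and the baggage set with `gtrace.SetBaggageValue`. That is how the server's span joins the same trace and can read `name`. Assuming W3C Trace Context and Baggage propagation, the stdlib-only sketch below builds the same two headers by hand. `newTraceparent` is a hypothetical helper, not a GoFrame API.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"net/http"
	"net/url"
)

// newTraceparent formats a W3C traceparent header: version 00, a 16-byte
// trace ID, the 8-byte ID of the current (parent) span and the sampled flag.
func newTraceparent(traceID [16]byte, spanID [8]byte, sampled bool) string {
	flags := "00"
	if sampled {
		flags = "01"
	}
	return fmt.Sprintf("00-%s-%s-%s", hex.EncodeToString(traceID[:]), hex.EncodeToString(spanID[:]), flags)
}

func main() {
	var traceID [16]byte
	var spanID [8]byte
	// A real tracer generates these IDs; random bytes are enough for the sketch.
	if _, err := rand.Read(traceID[:]); err != nil {
		panic(err)
	}
	if _, err := rand.Read(spanID[:]); err != nil {
		panic(err)
	}

	req, err := http.NewRequest(http.MethodGet, "http://127.0.0.1:8199/hello", nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("traceparent", newTraceparent(traceID, spanID, true))
	// Baggage values are percent-encoded.
	req.Header.Set("baggage", "name="+url.PathEscape("GoFrame"))
	fmt.Println(req.Header.Get("traceparent"))
	fmt.Println(req.Header.Get("baggage"))
}
```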

http-db

Distributed tracing for an HTTP service that interacts with a database in GoFrame

Server

package main

import (
	"context"
	"fmt"
	"time"

	_ "github.com/gogf/gf/contrib/drivers/mysql/v2"
	_ "github.com/gogf/gf/contrib/nosql/redis/v2"

	"github.com/gogf/gf/v2/database/gdb"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/net/ghttp"
	"github.com/gogf/gf/v2/os/gcache"
	"github.com/gogf/gf/v2/os/gctx"

	"github.com/gogf/gf/contrib/trace/otlphttp/v2"
)

type cTrace struct{}

const (
	serviceName = "otlp-http-server-with-db" // Name of the service for tracing
	endpoint    = "localhost:4318"
	path        = ""
)

func main() {
	var (
		ctx           = gctx.New()
		shutdown, err = otlphttp.Init(serviceName, endpoint, path)
	)
	if err != nil {
		g.Log().Fatal(ctx, err)
	}
	defer shutdown(ctx)

	g.DB().GetCache().SetAdapter(gcache.NewAdapterRedis(g.Redis()))

	s := g.Server()
	s.Use(ghttp.MiddlewareHandlerResponse)
	s.Group("/", func(group *ghttp.RouterGroup) {
		group.ALL("/user", new(cTrace))
	})
	s.SetPort(8199)
	s.Run()
}

type InsertReq struct {
	Name string `v:"required#Please input user name."`
}

type InsertRes struct {
	ID int64
}

func (c *cTrace) Insert(ctx context.Context, req *InsertReq) (res *InsertRes, err error) {
	result, err := g.Model("user").Ctx(ctx).Insert(req)
	if err != nil {
		return nil, err
	}
	id, _ := result.LastInsertId()
	res = &InsertRes{
		ID: id,
	}
	return
}

type QueryReq struct {
	ID int `v:"min:1#User id is required for querying"`
}

type QueryRes struct {
	User gdb.Record
}

func (c *cTrace) Query(ctx context.Context, req *QueryReq) (res *QueryRes, err error) {
	one, err := g.Model("user").Ctx(ctx).Cache(gdb.CacheOption{
		Duration: 5 * time.Second,
		Name:     c.userCacheKey(req.ID),
		Force:    false,
	}).WherePri(req.ID).One()
	if err != nil {
		return nil, err
	}
	res = &QueryRes{
		User: one,
	}
	return
}

type DeleteReq struct {
	Id int `v:"min:1#User id is required for deleting."`
}

type DeleteRes struct{}

func (c *cTrace) Delete(ctx context.Context, req *DeleteReq) (res *DeleteRes, err error) {
	_, err = g.Model("user").Ctx(ctx).Cache(gdb.CacheOption{
		Duration: -1,
		Name:     c.userCacheKey(req.Id),
		Force:    false,
	}).WherePri(req.Id).Delete()
	if err != nil {
		return nil, err
	}
	return
}

func (c *cTrace) userCacheKey(id int) string {
	return fmt.Sprintf(`userInfo:%d`, id)
}
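The `Query` and `Delete` handlers above rely on the ORM's query cache. `Query` caches the row under `userInfo:<id>` for 5 seconds. `Delete` passes a negative `Duration`, which clears the cache entry with that name. The stdlib-only sketch below mimics this with an in-memory map to make the semantics explicit. `ttlCache`, `loadUser`, and the map-based fake database are hypothetical and are not GoFrame APIs.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type entry struct {
	value    string
	expireAt time.Time
}

// ttlCache is a tiny in-memory cache with per-entry expiry.
type ttlCache struct {
	mu   sync.Mutex
	data map[string]entry
}

func newTTLCache() *ttlCache { return &ttlCache{data: map[string]entry{}} }

func (c *ttlCache) get(key string) (string, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	e, ok := c.data[key]
	if !ok || time.Now().After(e.expireAt) {
		return "", false
	}
	return e.value, true
}

// set stores value for ttl; a negative ttl deletes the key instead,
// mirroring CacheOption{Duration: -1}.
func (c *ttlCache) set(key, value string, ttl time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if ttl < 0 {
		delete(c.data, key)
		return
	}
	c.data[key] = entry{value: value, expireAt: time.Now().Add(ttl)}
}

func userCacheKey(id int) string { return fmt.Sprintf("userInfo:%d", id) }

// loadUser is cache-aside: try the cache, fall back to the "database"
// and cache the row for 5 seconds.
func loadUser(c *ttlCache, db map[int]string, id int) (name string, fromCache bool) {
	if v, ok := c.get(userCacheKey(id)); ok {
		return v, true
	}
	name, found := db[id]
	if found {
		// Like Force: false, missing rows are not cached.
		c.set(userCacheKey(id), name, 5*time.Second)
	}
	return name, false
}

func main() {
	cache := newTTLCache()
	db := map[int]string{1: "john"}
	fmt.Println(loadUser(cache, db, 1)) // john false (read from the database)
	fmt.Println(loadUser(cache, db, 1)) // john true (served from the cache)
	delete(db, 1)
	cache.set(userCacheKey(1), "", -1) // what Delete's Duration: -1 does
	fmt.Println(loadUser(cache, db, 1)) // cache cleared and row gone: not found
}
```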

Client

package main

import (
	"github.com/gogf/gf/v2/database/gdb"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/net/ghttp"
	"github.com/gogf/gf/v2/net/gtrace"
	"github.com/gogf/gf/v2/os/gctx"

	"github.com/gogf/gf/contrib/trace/otlphttp/v2"
)

const (
	serviceName = "otlp-http-client-with-db" // Name of the service for tracing
	endpoint    = "localhost:4318"
	path        = ""
)

func main() {
	var (
		ctx           = gctx.New()
		shutdown, err = otlphttp.Init(serviceName, endpoint, path)
	)
	if err != nil {
		g.Log().Fatal(ctx, err)
	}
	defer shutdown(ctx)

	StartRequests()
}

func StartRequests() {

	ctx, span := gtrace.NewSpan(gctx.New(), "StartRequests")
	defer span.End()

	var (
		err    error
		client = g.Client()
	)

	var insertRes = struct {
		ghttp.DefaultHandlerResponse
		Data struct{ ID int64 } `json:"data"`
	}{}
	err = client.PostVar(ctx, "http://127.0.0.1:8199/user/insert", g.Map{
		"name": "john",
	}).Scan(&insertRes)
	if err != nil {
		panic(err)
	}
	g.Log().Info(ctx, "insert result:", insertRes)
	if insertRes.Data.ID == 0 {
		g.Log().Error(ctx, "retrieve empty id string")
		return
	}

	var queryRes = struct {
		ghttp.DefaultHandlerResponse
		Data struct{ User gdb.Record } `json:"data"`
	}{}
	err = client.GetVar(ctx, "http://127.0.0.1:8199/user/query", g.Map{
		"id": insertRes.Data.ID,
	}).Scan(&queryRes)
	if err != nil {
		panic(err)
	}
	g.Log().Info(ctx, "query result:", queryRes)

	var deleteRes = struct {
		ghttp.DefaultHandlerResponse
	}{}
	err = client.PostVar(ctx, "http://127.0.0.1:8199/user/delete", g.Map{
		"id": insertRes.Data.ID,
	}).Scan(&deleteRes)
	if err != nil {
		panic(err)
	}
	g.Log().Info(ctx, "delete result:", deleteRes)
}
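The server installs `ghttp.MiddlewareHandlerResponse`, so each handler's result is wrapped in a JSON envelope with `code`, `message`, and `data` fields. That is why the client embeds `ghttp.DefaultHandlerResponse` and reads the payload from `Data`. The stdlib-only sketch below decodes such an envelope and treats a non-zero `code` as an error. The local `envelope` type only mirrors those field names, and the sample body is made up for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// envelope mirrors the JSON shape written by MiddlewareHandlerResponse.
type envelope[T any] struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
	Data    T      `json:"data"`
}

type insertData struct {
	ID int64 `json:"ID"`
}

// decode unwraps the envelope and turns a non-zero code into an error.
func decode[T any](body []byte) (T, error) {
	var env envelope[T]
	if err := json.Unmarshal(body, &env); err != nil {
		var zero T
		return zero, err
	}
	if env.Code != 0 {
		return env.Data, fmt.Errorf("server error %d: %s", env.Code, env.Message)
	}
	return env.Data, nil
}

func main() {
	body := []byte(`{"code":0,"message":"","data":{"ID":1}}`)
	data, err := decode[insertData](body)
	fmt.Println(data.ID, err) // 1 <nil>
}
```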

inprocess

Distributed tracing inside a single process, using GoFrame and the HTTP-based OpenTelemetry exporter

package main

import (
	"context"

	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/net/gtrace"
	"github.com/gogf/gf/v2/os/gctx"
	"github.com/gogf/gf/v2/util/gutil"

	"github.com/gogf/gf/contrib/trace/otlphttp/v2"
)

const (
	serviceName = "inprocess"
	endpoint    = "localhost:4318"
	path        = ""
)

func main() {
	var (
		ctx           = gctx.New()
		shutdown, err = otlphttp.Init(serviceName, endpoint, path)
	)
	if err != nil {
		g.Log().Fatal(ctx, err)
	}
	defer shutdown(ctx)

	// Create root span for the entire process
	ctx, span := gtrace.NewSpan(ctx, "main")
	defer span.End()

	user1 := GetUser(ctx, 1)
	g.Dump(user1)

	user100 := GetUser(ctx, 100)
	g.Dump(user100)
}

func GetUser(ctx context.Context, id int) g.Map {
	ctx, span := gtrace.NewSpan(ctx, "GetUser")
	defer span.End()

	// Merge user data from different sources
	m := g.Map{}
	gutil.MapMerge(
		m,
		GetInfo(ctx, id),
		GetDetail(ctx, id),
		GetScores(ctx, id),
	)
	return m
}

func GetInfo(ctx context.Context, id int) g.Map {
	ctx, span := gtrace.NewSpan(ctx, "GetInfo")
	defer span.End()

	if id == 100 {
		return g.Map{
			"id":     100,
			"name":   "carter",
			"gender": 1,
		}
	}
	return nil
}

func GetDetail(ctx context.Context, id int) g.Map {
	ctx, span := gtrace.NewSpan(ctx, "GetDetail")
	defer span.End()

	if id == 100 {
		return g.Map{
			"site":  "https://goframe.org",
			"email": "123@test.com",
		}
	}
	return nil
}

func GetScores(ctx context.Context, id int) g.Map {
	ctx, span := gtrace.NewSpan(ctx, "GetScores")
	defer span.End()

	if id == 100 {
		return g.Map{
			"math":    100,
			"english": 60,
			"chinese": 50,
		}
	}
	return nil
}

gRPC Call-Chain Tracing Example

Distributed tracing for a gRPC service that interacts with a database in GoFrame

Proto definition


syntax = "proto3";

package user;

option go_package = "protobuf/user";


service User {
  rpc Insert(InsertReq) returns (InsertRes) {}
  rpc Query(QueryReq) returns (QueryRes) {}
  rpc Delete(DeleteReq) returns (DeleteRes) {}
}

message InsertReq {
  string Name = 1; 
}
message InsertRes {
  int32 Id = 1;
}

message QueryReq {
  int32 Id = 1; 
}
message QueryRes {
  int32  Id = 1;
  string Name = 2;
}

message DeleteReq {
  int32 Id = 1; 
}
message DeleteRes {}

server

package main

import (
	"context"
	"fmt"
	"time"

	_ "github.com/gogf/gf/contrib/drivers/mysql/v2"
	_ "github.com/gogf/gf/contrib/nosql/redis/v2"

	"github.com/gogf/gf/v2/database/gdb"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/os/gcache"
	"github.com/gogf/gf/v2/os/gctx"

	"github.com/gogf/gf/contrib/registry/etcd/v2"
	"github.com/gogf/gf/contrib/rpc/grpcx/v2"
	"github.com/gogf/gf/contrib/trace/otlpgrpc/v2"

	"main/protobuf/user"
)

type Controller struct {
	user.UnimplementedUserServer
}

const (
	serviceName = "otlp-grpc-server"
	endpoint    = "localhost:4317" // note: OTLP over gRPC uses port 4317; the otlphttp examples use 4318
	traceToken  = ""
)

func main() {
	// Configure service discovery
	grpcx.Resolver.Register(etcd.New("127.0.0.1:2379"))

	var (
		ctx           = gctx.New()
		shutdown, err = otlpgrpc.Init(serviceName, endpoint, traceToken)
	)

	if err != nil {
		g.Log().Fatal(ctx, err)
	}
	defer shutdown(ctx)

	g.DB().GetCache().SetAdapter(gcache.NewAdapterRedis(g.Redis()))

	s := grpcx.Server.New()
	user.RegisterUserServer(s.Server, &Controller{})
	s.Run()
}

func (s *Controller) Insert(ctx context.Context, req *user.InsertReq) (res *user.InsertRes, err error) {

	result, err := g.Model("user").Ctx(ctx).Insert(g.Map{
		"name": req.Name,
	})
	if err != nil {
		return nil, err
	}

	// Get and return the inserted ID
	id, _ := result.LastInsertId()
	res = &user.InsertRes{
		Id: int32(id),
	}
	return
}

func (s *Controller) Query(ctx context.Context, req *user.QueryReq) (res *user.QueryRes, err error) {

	if err = g.Model("user").Ctx(ctx).Cache(gdb.CacheOption{
		Duration: 5 * time.Second,
		Name:     s.userCacheKey(req.Id),
		Force:    false,
	}).WherePri(req.Id).Scan(&res); err != nil {
		return nil, err
	}
	return
}

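func (s *Controller) Delete(ctx context.Context, req *user.DeleteReq) (res *user.DeleteRes, err error) {
	// Delete the row and clear its cache entry (a negative Duration clears the cache)
	result, err := g.Model("user").Ctx(ctx).Cache(gdb.CacheOption{
		Duration: -1,
		Name:     s.userCacheKey(req.Id),
		Force:    false,
	}).WherePri(req.Id).Delete()
	if err != nil {
		return nil, err
	}
	// Report a missing user as an error so the failed call is visible in the trace
	if n, _ := result.RowsAffected(); n == 0 {
		return nil, fmt.Errorf("user %d not found", req.Id)
	}
	return &user.DeleteRes{}, nil
}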

func (s *Controller) userCacheKey(id int32) string {
	return fmt.Sprintf(`userInfo:%d`, id)
}

client

package main

import (
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/net/gtrace"
	"github.com/gogf/gf/v2/os/gctx"

	"github.com/gogf/gf/contrib/registry/etcd/v2"
	"github.com/gogf/gf/contrib/rpc/grpcx/v2"
	"github.com/gogf/gf/contrib/trace/otlpgrpc/v2"

	"main/protobuf/user"
)

// Service configuration constants

const (
	serviceName = "otlp-grpc-client" // Name of the service for tracing
	endpoint    = "localhost:4317"   // note: OTLP over gRPC uses port 4317; the otlphttp examples use 4318
	traceToken  = ""
)

// main initializes and starts a gRPC client with tracing
func main() {
	// Configure service discovery
	grpcx.Resolver.Register(etcd.New("127.0.0.1:2379"))

	var (
		ctx           = gctx.New()
		shutdown, err = otlpgrpc.Init(serviceName, endpoint, traceToken)
	)

	if err != nil {
		g.Log().Fatal(ctx, err)
	}
	defer shutdown(ctx)

	// Start making requests with tracing
	StartRequests()
}

func StartRequests() {
	// Create a new trace span
	ctx, span := gtrace.NewSpan(gctx.New(), "StartRequests")
	defer span.End()

	// Create a gRPC client
	client := user.NewUserClient(grpcx.Client.MustNewGrpcClientConn("demo"))

	// Set baggage value for tracing
	// This value will be propagated to all child spans
	ctx = gtrace.SetBaggageValue(ctx, "uid", 100)

	// Insert a new user
	// This operation will be traced
	insertRes, err := client.Insert(ctx, &user.InsertReq{
		Name: "john",
	})
	if err != nil {
		g.Log().Fatalf(ctx, `%+v`, err)
	}
	g.Log().Info(ctx, "insert id:", insertRes.Id)

	// Query the inserted user
	// This operation will be traced
	queryRes, err := client.Query(ctx, &user.QueryReq{
		Id: insertRes.Id,
	})
	if err != nil {
		g.Log().Errorf(ctx, `%+v`, err)
		return
	}
	g.Log().Info(ctx, "query result:", queryRes)

	// Delete the user
	// This operation will be traced
	if _, err = client.Delete(ctx, &user.DeleteReq{
		Id: insertRes.Id,
	}); err != nil {
		g.Log().Errorf(ctx, `%+v`, err)
		return
	}
	g.Log().Info(ctx, "delete id:", insertRes.Id)

	// Try to delete a non-existent user
	// This will generate an error that will be traced
	if _, err = client.Delete(ctx, &user.DeleteReq{
		Id: -1,
	}); err != nil {
		g.Log().Errorf(ctx, `%+v`, err)
		return
	}
	g.Log().Info(ctx, "delete id:", -1)
}
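Over gRPC, the trace context and baggage travel in request metadata instead of HTTP headers. OpenTelemetry propagators write into any carrier that has `Get`, `Set`, and `Keys` methods, which is the method set of `propagation.TextMapCarrier` in the Go SDK. The stdlib-only sketch below defines an equivalent local interface. It also defines a metadata-like carrier with lowercase keys, as gRPC metadata uses. The `inject`/`extract` helpers and the sample values are hypothetical, for illustration only.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// TextMapCarrier mirrors the method set of OpenTelemetry's propagation.TextMapCarrier.
type TextMapCarrier interface {
	Get(key string) string
	Set(key, value string)
	Keys() []string
}

// mdCarrier stores values like gRPC metadata: lowercase keys, multiple values.
type mdCarrier map[string][]string

func (m mdCarrier) Get(key string) string {
	if v := m[strings.ToLower(key)]; len(v) > 0 {
		return v[0]
	}
	return ""
}

func (m mdCarrier) Set(key, value string) { m[strings.ToLower(key)] = []string{value} }

func (m mdCarrier) Keys() []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// inject writes trace context and baggage into any carrier (client side).
func inject(c TextMapCarrier, traceparent, baggage string) {
	c.Set("Traceparent", traceparent)
	if baggage != "" {
		c.Set("Baggage", baggage)
	}
}

// extract reads them back from the carrier (server side).
func extract(c TextMapCarrier) (traceparent, baggage string) {
	return c.Get("traceparent"), c.Get("baggage")
}

func main() {
	md := mdCarrier{}
	inject(md, "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01", "uid=100")
	tp, bg := extract(md)
	fmt.Println(md.Keys(), tp != "", bg) // [baggage traceparent] true uid=100
}
```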
