Spans queues in otel-trace

Using the same locally built tracing-benchmark test environment, I noticed during testing that Jaeger exposes different metrics in v1 and v2, and both versions include statistics about the number of spans.
The test results show that under high load both versions actively drop spans. I was curious about the exact number of dropped spans, whether the dropping happens on the client side or the server side, and whether it can be disabled.
So I extracted the core otel trace logic into a standalone test, and along the way traced the complete lifecycle of a single span from creation to storage.

Problem Simulation

Original Results

The Jaeger server runs jaeger-collector:1.64.0 and jaeger:2.1.0 respectively, both with elasticsearch:8.15.3 as the storage backend. The results of a wrk -c8 -t8 -d30 run are:

| jaeger version | total requests | total spans | saved spans | dropped spans | dropped percent | raw metrics |
| --- | --- | --- | --- | --- | --- | --- |
| 1.64.0 | 435476 | 1306436 | 247340 | 1059096 | 81.07% | 127.0.0.1:14269/metrics |
| 2.1.0 | 519428 | 1558292* | 197122 | 1361170* | 87.35% | 127.0.0.1:8888/metrics |

Based on the completeness of the data in the 1.64.0 metrics, the span dropping appears to happen on the server side. The 2.1.0 metrics, however, only expose two equal counters, otelcol_receiver_accepted_spans and otelcol_exporter_sent_spans, while the related counters otelcol_receiver_refused_spans and otelcol_exporter_send_failed_spans are both zero, which suggests the spans are dropped on the client side instead. So I built a simplified simulation of the problem.
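To compare these counters side by side, a small throwaway scraper like the one below can dump every span-related metric from both endpoints listed in the raw metrics column (a convenience sketch only, not part of the original benchmark):

package main

import (
	"bufio"
	"fmt"
	"net/http"
	"strings"
)

func main() {
	// Endpoints taken from the raw metrics column above: jaeger v1 exposes admin
	// metrics on :14269, jaeger v2 exposes collector telemetry on :8888.
	for _, url := range []string{"http://127.0.0.1:14269/metrics", "http://127.0.0.1:8888/metrics"} {
		resp, err := http.Get(url)
		if err != nil {
			panic(err)
		}
		fmt.Println("##", url)
		scanner := bufio.NewScanner(resp.Body)
		for scanner.Scan() {
			// keep only non-comment lines that mention spans
			if line := scanner.Text(); !strings.HasPrefix(line, "#") && strings.Contains(line, "spans") {
				fmt.Println(line)
			}
		}
		resp.Body.Close()
	}
}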

Environment Setup

In the original test environment, the Golang application is a web app built with the Gin framework; each API request produces one trace containing three spans: gin, redis, and mysql.
The simulation drops the actual request path, copies the span information manually from the test environment, and uses goroutines to spawn multiple workers writing in parallel. The relevant Jaeger deployment configuration and Golang code are as follows:

compose.yaml
name: jaeger

services:
  es:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.15.3
    restart: always
    volumes:
      - es_data:/usr/share/elasticsearch/data
    environment:
      - node.name=es
      - discovery.type=single-node
      - ES_JAVA_OPTS=-Xms1g -Xmx1g
      - ELASTIC_PASSWORD=sHueH6Ut38ATxe4u0XvJ
      - xpack.security.enabled=true
      - xpack.security.http.ssl.enabled=false
    ports:
      - 9200:9200
    healthcheck:
      test:
        [
          "CMD-SHELL",
          "curl -s http://localhost:9200 | grep -q 'missing authentication credentials'",
        ]
      interval: 10s
      timeout: 10s
      retries: 120
    cpus: '2.000'
    mem_limit: 4gb

  v1:
    image: jaegertracing/jaeger-collector:1.64.0
    restart: always
    environment:
      - GOMAXPROCS=2
      - SPAN_STORAGE_TYPE=elasticsearch
      - ES_SERVER_URLS=http://es:9200
      - ES_USERNAME=elastic
      - ES_PASSWORD=sHueH6Ut38ATxe4u0XvJ
    tmpfs:
      - /tmp
    ports:
      - 4318:4318
      - 14269:14269
    depends_on:
      es:
        condition: service_healthy
    cpus: '2.000'
    mem_limit: 4gb

  v2:
    image: jaegertracing/jaeger:2.1.0
    restart: always
    environment:
      - GOMAXPROCS=2
      - ES_SERVER_URLS=http://es:9200
      - ES_USERNAME=elastic
      - ES_PASSWORD=sHueH6Ut38ATxe4u0XvJ
    volumes:
      - ./jaeger-config.yaml:/cmd/jaeger/config.yaml
    tmpfs:
      - /tmp
    command: ["--config", "/cmd/jaeger/config.yaml"]
    ports:
      - 4319:4318
      - 8888:8888
    depends_on:
      es:
        condition: service_healthy
    cpus: '2.000'
    mem_limit: 4gb

volumes:
  es_data: {}
jaeger-config.yaml
# https://github.com/jaegertracing/jaeger/blob/v2.1.0/cmd/jaeger/config-badger.yaml
# https://github.com/jaegertracing/jaeger/blob/v2.1.0/cmd/jaeger/internal/all-in-one.yaml
service:
  extensions: [jaeger_storage, jaeger_query, healthcheckv2]
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [jaeger_storage_exporter]
  telemetry:
    resource:
      service.name: jaeger
    metrics:
      level: basic
      address: 0.0.0.0:8888
    logs:
      level: info

extensions:
  healthcheckv2:
    use_v2: true
    http:
      endpoint: 0.0.0.0:13133

  jaeger_query:
    storage:
      traces: some_storage
    http:
      endpoint: 0.0.0.0:16686

  jaeger_storage:
    backends:
      some_storage:
        elasticsearch:
          server_urls:
            - "${env:ES_SERVER_URLS}"
          auth:
            basic:
              username: "${env:ES_USERNAME}" 
              password: "${env:ES_PASSWORD}"
          indices:
            index_prefix: "jaeger2-main"
            spans:
              date_layout: "2006-01-02"
              rollover_frequency: "day"
              shards: 5
              replicas: 1
            services:
              date_layout: "2006-01-02"
              rollover_frequency: "day"
              shards: 5
              replicas: 1
            dependencies:
              date_layout: "2006-01-02"
              rollover_frequency: "day"
              shards: 5
              replicas: 1
            sampling:
              date_layout: "2006-01-02"
              rollover_frequency: "day"
              shards: 5
              replicas: 1

receivers:
  otlp:
    protocols:
      http:
        endpoint: 0.0.0.0:4318

processors:
  batch:

exporters:
  jaeger_storage_exporter:
    trace_storage: some_storage
main.go
package main

import (
	"context"
	"flag"
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/go-logr/stdr"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
	oteltrace "go.opentelemetry.io/otel/trace"
)

var CFG struct {
	Debug    bool
	Workers  int
	Traces   int
	Service  string
	Endpoint string
}

func init() {
	flag.BoolVar(&CFG.Debug, "debug", false, "Enable debug output")
	flag.IntVar(&CFG.Workers, "workers", 1, "Number of workers (goroutines)")
	flag.IntVar(&CFG.Traces, "traces", 1, "Number of traces for each worker")
	flag.StringVar(&CFG.Service, "service", "oteltrace", "Service name")
	flag.StringVar(&CFG.Endpoint, "endpoint", "http://127.0.0.1:4318", "OTLP http trace exporter endpoint")
	flag.Parse()
}

func main() {
	if CFG.Debug {
		stdr.SetVerbosity(8)
	}

	exporter, err := otlptracehttp.New(context.Background(), otlptracehttp.WithEndpointURL(CFG.Endpoint))
	panicIf(err)

	rsc, err := resource.Merge(
		resource.Default(),
		resource.NewWithAttributes(
			semconv.SchemaURL,
			semconv.ServiceName(CFG.Service),
		),
	)
	panicIf(err)

	provider := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(rsc),
	)
	otel.SetTracerProvider(provider)
	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))

	wg := new(sync.WaitGroup)
	wg.Add(CFG.Workers)
	for i := 0; i < CFG.Workers; i++ {
		go handle(wg, i)
	}
	wg.Wait()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
	defer cancel()
	panicIf(provider.Shutdown(ctx))
}

func handle(wg *sync.WaitGroup, index int) {
	defer wg.Done()
	log.Printf("start  worker %d", index)

	for i := 0; i < CFG.Traces; i++ {
		// parent span
		ctx, span := otel.GetTracerProvider().Tracer(
			"go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin",
			oteltrace.WithInstrumentationVersion("0.56.0"),
		).Start(context.Background(), "/ping/GRM", oteltrace.WithSpanKind(oteltrace.SpanKindServer))
		span.SetAttributes(ginAttributes...)

		// child span (redis)
		_, redisSpan := otel.GetTracerProvider().Tracer(
			"github.com/redis/go-redis/extra/redisotel",
			oteltrace.WithInstrumentationVersion("semver:9.7.0"),
		).Start(ctx, "ping", oteltrace.WithSpanKind(oteltrace.SpanKindClient))
		redisSpan.SetAttributes(redisAttributes...)
		time.Sleep(time.Microsecond * 100)
		redisSpan.End()

		// child span (mysql)
		_, mysqlSpan := otel.GetTracerProvider().Tracer(
			"gorm.io/plugin/opentelemetry",
			oteltrace.WithInstrumentationVersion("0.1.8"),
		).Start(ctx, "gorm.Raw", oteltrace.WithSpanKind(oteltrace.SpanKindClient))
		mysqlSpan.SetAttributes(mysqlAttributes...)
		time.Sleep(time.Microsecond * 100)
		mysqlSpan.End()

		span.End()
	}
	log.Printf("finish worker %d", index)
}

func panicIf(err error) {
	if err != nil {
		panic(err)
	}
}

var (
	ginAttributes = []attribute.KeyValue{
		semconv.HTTPRequestMethodGet,
		semconv.HTTPRoute("/ping/GRM"),
		attribute.String("http.scheme", "http"),          // exists in go.opentelemetry.io/otel/semconv/v1.20.0
		attribute.Int("http.status_code", http.StatusOK), // exists in go.opentelemetry.io/otel/semconv/v1.20.0
		attribute.String("http.target", "/ping/GRM"),     // exists in go.opentelemetry.io/otel/semconv/v1.20.0
		semconv.NetworkTypeIpv4,
		semconv.NetworkLocalAddress("172.18.0.4"),
		semconv.NetworkLocalPort(8080),
		semconv.NetworkProtocolName("http"),
		semconv.NetworkProtocolVersion("1.1"),
		semconv.NetworkPeerAddress("172.18.0.1"),
		semconv.NetworkPeerPort(58868),
	}
	redisAttributes = []attribute.KeyValue{
		semconv.CodeFilepath("github.com/whoisnian/tracing-benchmark/router/handler.go"),
		semconv.CodeFunction("router.pingRedis"),
		semconv.CodeLineNumber(49),
		semconv.DBSystemRedis,
		attribute.String("db.connection_string", "redis://redis:6379"), // exists in go.opentelemetry.io/otel/semconv/v1.24.0
		semconv.ServerAddress("redis"),
		semconv.ServerPort(6379),
		semconv.DBOperationName("ping"),
		semconv.DBQueryText("ping"),
	}
	mysqlAttributes = []attribute.KeyValue{
		semconv.DBSystemMySQL,
		semconv.ServerAddress("mysql"),
		semconv.ServerPort(3306),
		semconv.DBOperationName("select"),
		semconv.DBQueryText("SELECT 1"),
		attribute.Int("db.rows_affected", 0), // exists in gorm.io/plugin/[email protected]
	}
)

Reproduction

| jaeger version | duration | total spans | saved spans | dropped spans | dropped percent |
| --- | --- | --- | --- | --- | --- |
| 1.64.0 | 18s | 240000 | 227611 | 12389 | 5.16% |
| 2.1.0 | 18s | 240000* | 232106 | 7894* | 3.29% |
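Each trace in the simulation contains three spans, so the 240000 total spans correspond to 80000 traces. The exact workers/traces split is not recorded here; an illustrative invocation using the flags defined in main.go (ports 4318 and 4319 map to v1 and v2 in compose.yaml) would be:

go run main.go -workers 8 -traces 10000 -endpoint http://127.0.0.1:4318   # jaeger v1
go run main.go -workers 8 -traces 10000 -endpoint http://127.0.0.1:4319   # jaeger v2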

Root Cause Analysis

OpenTelemetry SDK

According to the Migration to OpenTelemetry SDK documentation, the Jaeger clients were already retired in 2022, with the OpenTelemetry SDK recommended as the replacement; the 1.64.0 documentation has likewise removed the Jaeger client material.
The OpenTelemetry SDK integration code in the simulation mainly follows OpenTelemetry-Go: Getting Started and splits into two parts, initializing the SDK and submitting spans, as implemented in main.go above.
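A condensed, self-contained sketch of just those two parts (the endpoint URL, tracer name, and span name below are placeholders):

package main

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	oteltrace "go.opentelemetry.io/otel/trace"
)

func main() {
	// Part 1: initialize the SDK by creating an OTLP HTTP exporter, wrapping it in
	// the default BatchSpanProcessor via WithBatcher, and registering the provider globally.
	exporter, err := otlptracehttp.New(context.Background(), otlptracehttp.WithEndpointURL("http://127.0.0.1:4318"))
	if err != nil {
		panic(err)
	}
	provider := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exporter))
	otel.SetTracerProvider(provider)
	defer func() { _ = provider.Shutdown(context.Background()) }()

	// Part 2: submit a span by getting a tracer from the global provider, starting a
	// span, and ending it; the BatchSpanProcessor queues it for the next export batch.
	_, span := otel.GetTracerProvider().Tracer("example").Start(context.Background(), "/ping/GRM", oteltrace.WithSpanKind(oteltrace.SpanKindServer))
	span.End()
}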

The source of batchSpanProcessor.exportSpans contains additional Debug logging, and the underlying logger is created with stdr.New; after checking the documentation, calling stdr.SetVerbosity(8) early in the program is enough to get these logs printed.
Rerunning the test with Debug logging enabled confirms that with the 1.64.0 server no spans are dropped on the client side, whereas after switching the server to 2.1.0 the drops do occur on the client side.
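For reference, enabling it boils down to a single call before the TracerProvider is created, which is what the -debug flag in main.go toggles:

func main() {
	// The otel SDK's global internal logger is stdr-based by default, so raising the
	// stdr verbosity is enough to surface batchSpanProcessor's Debug output, whose
	// batch-export lines include a running dropped-span count.
	stdr.SetVerbosity(8)
	// ... create the exporter and TracerProvider as in main.go above ...
}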

Jaeger v1

The entry point of the legacy jaeger-collector:1.64.0 is cmd/collector/main.go, and most of its functionality is implemented within Jaeger itself; the parts worth focusing on are summarized below.

In jaeger-collector:1.64.0 the otlpReceiver that accepts spans comes from opentelemetry-collector, and the received data is then handed to Jaeger's own SpanProcessor and storage plugin for further processing.
The SpanProcessor maintains a BoundedQueue; when slow consumption fills the queue, spans are actively dropped, yet the client only sees its requests answered normally and has no way to detect the server-side drops.
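The drop happens in BoundedQueue's non-blocking enqueue; the pattern looks roughly like this (illustrative only, not Jaeger's actual source):

package main

import "fmt"

// produce mimics the non-blocking enqueue used by the server-side queue: if the
// channel is full, the item is handed to a drop callback instead of blocking,
// while the otlpReceiver still returns a normal response to the client.
func produce(items chan int, item int, onDropped func(int)) bool {
	select {
	case items <- item:
		return true
	default:
		onDropped(item)
		return false
	}
}

func main() {
	queue := make(chan int, 2)
	dropped := 0
	for i := 0; i < 5; i++ {
		produce(queue, i, func(int) { dropped++ })
	}
	fmt.Println("queued:", len(queue), "dropped:", dropped) // queued: 2 dropped: 3
}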

Jaeger v2

The entry point of the new jaeger:2.1.0 is cmd/jaeger/main.go, and most of its functionality comes directly from opentelemetry-collector; the parts worth focusing on are summarized below.

In jaeger:2.1.0 both the receivers and the processors that handle spans come from opentelemetry-collector, while the jaeger_storage_exporter that persists the data is a wrapper around Jaeger's existing storage_v1.
The BatchProcessor maintains a singleShardBatcher queue that never actively drops spans; when the queue is full the server blocks incoming requests instead, presumably because opentelemetry prefers to leave the optimization of high-volume span handling to the client side.
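The contrast with v1 is in the enqueue itself: a plain blocking channel send instead of a non-blocking one, so back-pressure propagates to the client and it is the client-side BatchSpanProcessor queue that eventually overflows. A sketch of the blocking pattern (illustrative only, not the collector's actual source):

package main

import (
	"fmt"
	"time"
)

// enqueue mimics the v2 batch processor: a plain blocking channel send, so when
// the queue is full the OTLP request handler stalls until space frees up, and
// back-pressure propagates to the client instead of spans being dropped here.
func enqueue(items chan int, item int) {
	items <- item // blocks while the queue is full
}

func main() {
	queue := make(chan int, 2)
	go func() { // slow consumer draining one item per 100ms
		for range queue {
			time.Sleep(100 * time.Millisecond)
		}
	}()
	start := time.Now()
	for i := 0; i < 5; i++ {
		enqueue(queue, i)
	}
	fmt.Println("all enqueued after", time.Since(start)) // roughly 200ms of blocking with this toy consumer
}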

Enhancements

Jaeger v1

As the root cause analysis shows, with jaeger-collector:1.64.0 the drops happen actively on the server side and the client is completely unaware, so complete span counts can only be taken from the server-side metrics.
In addition, BoundedQueue exposes no configuration option to change this behavior, so disabling drops requires patching the source and additionally enabling BlockOnQueueFull on the client side.

// https://github.com/jaegertracing/jaeger/blob/65cff3c30823ea20d3dc48bae39d5685ae307da5/pkg/queue/bounded_queue.go#L104
// ProduceSync always performs a blocking channel send, so a full queue blocks
// the producer instead of dropping the item (the original Produce used a
// non-blocking select with a default branch that dropped).
func (q *BoundedQueue) ProduceSync(item any) bool {
	if q.stopped.Load() != 0 {
		q.onDroppedItem(item)
		return false
	}

	q.size.Add(1)
	*q.items <- item
	return true
}

// https://github.com/jaegertracing/jaeger/blob/65cff3c30823ea20d3dc48bae39d5685ae307da5/cmd/collector/app/span_processor.go#L219
func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat processor.SpanFormat, transport processor.InboundTransport, tenant string) bool {
	// ...
	return sp.queue.ProduceSync(item) // Produce() => ProduceSync()
}

func main() {
	// ...
	provider := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter, sdktrace.WithBlocking()),
		sdktrace.WithResource(rsc),
	)
	// ...
}

Jaeger v2

As the root cause analysis shows, with jaeger:2.1.0 the drops happen on the client side, so the server never knows how many spans were actually produced and the complete numbers have to be collected on the client.
For example, add a custom SpanProcessor that counts the total number of spans and wrap the exporter to count the spans actually sent; the difference between the two gives the actual number of drops.

type countExporter struct {
	*otlptrace.Exporter
	Sum *atomic.Int64
}

func (e *countExporter) ExportSpans(ctx context.Context, ss []sdktrace.ReadOnlySpan) error {
	e.Sum.Add(int64(len(ss)))
	return e.Exporter.ExportSpans(ctx, ss)
}
func (e *countExporter) Shutdown(ctx context.Context) error {
	log.Printf("shutdown countExporter %d", e.Sum.Load())
	return e.Exporter.Shutdown(ctx)
}

type countSpanProcessor struct {
	Sum *atomic.Int64
}

func (sp *countSpanProcessor) OnStart(parent context.Context, s sdktrace.ReadWriteSpan) {}
func (sp *countSpanProcessor) OnEnd(s sdktrace.ReadOnlySpan)                            { sp.Sum.Add(1) }
func (sp *countSpanProcessor) Shutdown(ctx context.Context) error {
	log.Printf("shutdown countSpanProcessor %d", sp.Sum.Load())
	return nil
}
func (sp *countSpanProcessor) ForceFlush(ctx context.Context) error { return nil }

func main() {
	// ...
	provider := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(&countExporter{exporter, new(atomic.Int64)}),
		sdktrace.WithSpanProcessor(&countSpanProcessor{new(atomic.Int64)}),
		sdktrace.WithResource(rsc),
	)
	// ...
}

In addition, setting BlockOnQueueFull on the client-side BatchSpanProcessor is enough to disable dropping.

func main() {
	// ...
	provider := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter, sdktrace.WithBlocking()),
		sdktrace.WithResource(rsc),
	)
	// ...
}

Extensions