记住一条主线:Langfuse 的数据流是「客户端缓冲 → 批量异步上报 → 服务端落库到 ClickHouse」。生产里几乎所有坑都长在这条链路的某个环节——丢数据是因为缓冲没 flush,成本不准是因为服务端匹配不到价格,PII 泄露是因为脱敏没发生在「离开客户端之前」。先把这条链路记牢,后面每个问题都能定位到具体环节。

一、LangChain 集成:CallbackHandler 自动追踪

如果你用 LangChain(或 LangGraph)编排,不要再手写 @observe 包每一步。Langfuse 提供了 CallbackHandler,挂到 chain/agent 的 config={"callbacks": [handler]} 上,LangChain 的每一次 LLM 调用、每一个 tool、每一层 chain 都会被自动转成对应的 generation / span,嵌套结构和你在 LangChain 里写的拓扑一一对应。

# Langfuse v3 Python SDK + LangChain 集成包
pip install langfuse langchain langchain-openai
import os
from langfuse.langchain import CallbackHandler
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

# 1) Langfuse 凭据走环境变量(推荐),SDK 会自动读取
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-..."
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-..."
os.environ["LANGFUSE_HOST"] = "https://cloud.langfuse.com"  # 自托管换成你的域名

# 2) 一个普通的 LangChain 链路,注意这里完全没有 Langfuse 侵入
prompt = ChatPromptTemplate.from_template(
    "用一句话解释这个概念给{audience}听:{topic}"
)
model = ChatOpenAI(model="gpt-4o-mini")
chain = prompt | model

# 3) 唯一的接入点:构造 handler,挂到 config.callbacks
langfuse_handler = CallbackHandler()

result = chain.invoke(
    {"audience": "五岁小孩", "topic": "向量数据库"},
    config={
        "callbacks": [langfuse_handler],
        # 关键:通过 metadata 注入业务维度,键名是 Langfuse 约定的固定字符串
        "metadata": {
            "langfuse_session_id": "chat-session-8842",  # 会话维度
            "langfuse_user_id": "user_007",              # 用户维度
            "langfuse_tags": ["prod", "onboarding"],      # 标签便于过滤
        },
    },
)
print(result.content)

二、成本与 Token:让 Langfuse 算对钱

Langfuse 的成本计算是服务端行为:每条 generation 落库时,按 model 名匹配后台的 Model Definition(价格表),结合 usage(input/output token 数)算出 cost。所以「成本不准」永远是两件事之一:要么 token 数没上报对,要么模型名匹配不到价格表。

你的场景Langfuse 怎么拿到 tokenLangfuse 怎么拿到 cost
langfuse.openai 或 OpenAI 兼容 APISDK 自动从 response.usage 抓按内置模型价格表自动算
LangChain CallbackHandler + 主流模型从 LLM 返回的 usage_metadata 自动抓匹配内置价格表自动算
自部署 / 私有模型(vLLM、本地模型)你在 generation 里手动传 usage_details建自定义 Model Definition,按你设的单价算
完全自己算好了的成本可不传 usage,直接传 cost_details用你上报的 cost_details,不再二次计算
口诀匹配不上价格表 → cost 为 0;token 没上报 → 算不出 cost。两个入口:usage_details(让它算)或 cost_details(直接给)。
from langfuse import Langfuse, observe, get_client

langfuse = Langfuse()  # 读环境变量

# 场景一:私有/自定义模型,手动上报 token usage,让服务端按价格表算钱
@observe(as_type="generation")
def call_private_llm(prompt: str):
    # ... 调用你自托管的模型,拿到 output 和 token 统计 ...
    output = "模型回答"
    prompt_tokens, completion_tokens = 128, 64

    client = get_client()
    # 在当前 generation 上更新模型名与 usage
    client.update_current_generation(
        model="my-private-llama-3-70b",   # 必须与你建的 Model Definition 名字匹配
        usage_details={
            "input": prompt_tokens,
            "output": completion_tokens,
            "total": prompt_tokens + completion_tokens,
        },
    )
    return output


# 场景二:成本你自己算好了,直接上报 cost_details(单位:美元),跳过价格表
@observe(as_type="generation")
def call_with_known_cost(prompt: str):
    output = "模型回答"
    client = get_client()
    client.update_current_generation(
        model="my-private-llama-3-70b",
        usage_details={"input": 128, "output": 64},
        cost_details={
            "input": 0.000064,   # 这次输入的美元成本
            "output": 0.000096,  # 这次输出的美元成本
            "total": 0.00016,
        },
    )
    return output

langfuse.flush()  # 脚本退出前务必 flush,见下一节
  1. 建自定义 Model Definition:进 Langfuse 项目 → Settings → Models → New model,填 model name(用于匹配,支持正则)、input price / output price(每 token 单价)、tokenizer。建好后所有 model 字段匹配上的 generation 会自动算 cost。
  2. 查 Token / Cost 趋势:Dashboard 默认有 Total cost、Token usage 的时间序列图,可切时间窗看每天的花费曲线,定位某天成本异常飙升。
  3. 按用户/模型分组:在 Dashboard 或 Metrics 里把 cost 指标按 user_idmodeltags 做 group by——这就是为什么前面注入 user_id 这么重要,没有它就做不了「哪个大客户最烧钱」这种归因。

三、生产部署踩坑

短生命周期进程丢 trace(Lambda / CLI 脚本)

典型表现
脚本本地跑正常,部署到 Lambda 或当 cron 一次性脚本跑后,Langfuse 里看不到 trace 或只有一部分。
判断标准
进程退出前所有缓冲数据都已成功上报,Langfuse UI 能看到完整 trace。
解决方向
Langfuse SDK 是异步批量上报,数据先进内存 buffer,后台线程定时/按量发送。短进程在 buffer 还没 flush 时就退出,数据就丢了。解决:进程结束前显式调用 langfuse.flush()(阻塞直到队列清空);进程级彻底退出用 langfuse.shutdown()。Lambda 里放在 handler 末尾或 finally 块。

高吞吐下上报成本/压力过大

典型表现
QPS 很高的服务,trace 量爆炸,既占带宽又在 Langfuse 攒了海量低价值数据。
判断标准
只采样一部分 trace,整体观测信号仍然可用,客户端开销可控。
解决方向
用客户端采样:初始化时设 Langfuse(sample_rate=0.2)(只上报 20% 的 trace,按 trace 维度整条保留或整条丢弃,不会出现半条 trace)。采样发生在客户端,被丢弃的 trace 根本不发出去,省带宽也省服务端存储。环境变量等价物是 LANGFUSE_SAMPLE_RATE

自托管 ClickHouse / Redis 成为瓶颈

典型表现
自托管 Langfuse 写入变慢、ingestion 积压、Dashboard 查询超时。
判断标准
ingestion 不积压,查询延迟稳定,数据有备份可恢复。
解决方向
Langfuse v3 的有状态组件是 Postgres(业务元数据)+ ClickHouse(trace/observation 大表)+ Redis(队列与缓存)+ S3/兼容对象存储(事件原始体)。扩容方向:ClickHouse 给足磁盘和内存、按需上集群分片;Redis 做持久化与主从;ingestion worker 水平扩副本消费队列。备份:Postgres 走常规 dump,ClickHouse 用 BACKUP 语句或对接对象存储,S3 桶本身做版本化与跨区。

PII 明文进了 Langfuse

典型表现
trace 的 input/output 里出现手机号、邮箱、身份证等敏感信息,合规风险。
判断标准
敏感字段在离开客户端之前就被脱敏,Langfuse 服务端永远存不到明文。
解决方向
初始化时传 mask 回调:Langfuse(mask=my_mask_fn)。SDK 在序列化每条 input/output 之前调用这个函数,你在里面用正则把手机号/邮箱替换成占位符。关键是脱敏发生在上报前,所以即使用 Langfuse Cloud 也不会把明文发出去。
import re
from langfuse import Langfuse, observe

# 在上报前对 input/output 做 PII 脱敏的 mask 回调
def pii_mask(data):
    # data 是即将上报的 input 或 output,可能是 str / dict / list
    if isinstance(data, str):
        # 手机号
        data = re.sub(r"1[3-9]\d{9}", "[PHONE]", data)
        # 邮箱
        data = re.sub(r"[\w.+-]+@[\w-]+\.[\w.-]+", "[EMAIL]", data)
        return data
    if isinstance(data, dict):
        return {k: pii_mask(v) for k, v in data.items()}
    if isinstance(data, list):
        return [pii_mask(v) for v in data]
    return data

# mask 在上报前对每条记录生效;sample_rate 控制采样;flush_at/flush_interval 调批量节奏
langfuse = Langfuse(
    mask=pii_mask,
    sample_rate=0.2,        # 只上报 20% 的 trace(高吞吐降本)
    flush_at=50,            # 攒满 50 条事件就发一批
    flush_interval=2.0,     # 或每 2 秒发一批(二者先到先发)
)

@observe()
def handle_user_message(text: str):
    # 即使这里 input 含手机号 13800001111,上报到 Langfuse 也会变成 [PHONE]
    return "已处理"

handle_user_message("我的手机号是 13800001111,邮箱 a@b.com")
langfuse.flush()  # 短进程/脚本退出前必须 flush
推荐做法
  • 短生命周期进程(Lambda、cron、CLI 脚本)在 finally 块里 flush(),进程彻底退出用 shutdown()
  • 高吞吐服务用 sample_rate 做客户端采样,并按业务节奏调 flush_at / flush_interval
  • PII 一律在 mask 回调里、即上报前脱敏,永不依赖服务端清洗
  • 自定义模型先建 Model Definition 再上线,保证 cost 从第一条数据就准
不推荐
  • 不要在每次调用后都同步 flush()——那会退化成阻塞串行,把异步优势全吃掉
  • 不要在 LangChain metadata 里写 user_id 而漏掉 langfuse_ 前缀,维度会聚不起来
  • 不要指望服务端帮你脱敏 PII,Langfuse 不会替你清洗业务明文
  • 不要给私有模型用一个匹配不到价格表的随意 model 名,cost 会一直是 0
常见误区
  • 长驻服务(Web Server)通常不用手动 flush,SDK 后台线程会持续上报;但优雅停机时仍应 shutdown 防丢尾批
  • sample_rate 是 trace 级整丢整留,不要误以为能按 span 采样
  • 改了 Model Definition 价格只对之后的新数据生效,历史 trace 的 cost 不会自动回溯重算

压测高峰下 ingestion 不积压、成本归因按用户/模型可分组、抽查任意一条线上 trace 都看不到 PII 明文、短进程零丢 trace。

可观测性的价值不在于「能看到」,而在于「看到的是对的、且没多看到不该看的」——成本要算准,PII 要看不到,trace 一条都不能丢。

— Langfuse 生产实践小结