记住一条主线:Langfuse 的数据流是「客户端缓冲 → 批量异步上报 → 服务端落库到 ClickHouse」。生产里几乎所有坑都长在这条链路的某个环节——丢数据是因为缓冲没 flush,成本不准是因为服务端匹配不到价格,PII 泄露是因为脱敏没发生在「离开客户端之前」。先把这条链路记牢,后面每个问题都能定位到具体环节。
一、LangChain 集成:CallbackHandler 自动追踪
如果你用 LangChain(或 LangGraph)编排,不要再手写 @observe 包每一步。Langfuse 提供了 CallbackHandler,挂到 chain/agent 的 config={"callbacks": [handler]} 上,LangChain 的每一次 LLM 调用、每一个 tool、每一层 chain 都会被自动转成对应的 generation / span,嵌套结构和你在 LangChain 里写的拓扑一一对应。
# Langfuse v3 Python SDK + LangChain 集成包
pip install langfuse langchain langchain-openai
import os
from langfuse.langchain import CallbackHandler
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
# 1) Langfuse 凭据走环境变量(推荐),SDK 会自动读取
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-..."
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-..."
os.environ["LANGFUSE_HOST"] = "https://cloud.langfuse.com" # 自托管换成你的域名
# 2) 一个普通的 LangChain 链路,注意这里完全没有 Langfuse 侵入
prompt = ChatPromptTemplate.from_template(
"用一句话解释这个概念给{audience}听:{topic}"
)
model = ChatOpenAI(model="gpt-4o-mini")
chain = prompt | model
# 3) 唯一的接入点:构造 handler,挂到 config.callbacks
langfuse_handler = CallbackHandler()
result = chain.invoke(
{"audience": "五岁小孩", "topic": "向量数据库"},
config={
"callbacks": [langfuse_handler],
# 关键:通过 metadata 注入业务维度,键名是 Langfuse 约定的固定字符串
"metadata": {
"langfuse_session_id": "chat-session-8842", # 会话维度
"langfuse_user_id": "user_007", # 用户维度
"langfuse_tags": ["prod", "onboarding"], # 标签便于过滤
},
},
)
print(result.content)
二、成本与 Token:让 Langfuse 算对钱
Langfuse 的成本计算是服务端行为:每条 generation 落库时,按
model名匹配后台的 Model Definition(价格表),结合 usage(input/output token 数)算出 cost。所以「成本不准」永远是两件事之一:要么 token 数没上报对,要么模型名匹配不到价格表。
| 你的场景 | Langfuse 怎么拿到 token | Langfuse 怎么拿到 cost |
|---|---|---|
用 langfuse.openai 或 OpenAI 兼容 API | SDK 自动从 response.usage 抓 | 按内置模型价格表自动算 |
| LangChain CallbackHandler + 主流模型 | 从 LLM 返回的 usage_metadata 自动抓 | 匹配内置价格表自动算 |
| 自部署 / 私有模型(vLLM、本地模型) | 你在 generation 里手动传 usage_details | 建自定义 Model Definition,按你设的单价算 |
| 完全自己算好了的成本 | 可不传 usage,直接传 cost_details | 用你上报的 cost_details,不再二次计算 |
口诀匹配不上价格表 → cost 为 0;token 没上报 → 算不出 cost。两个入口:usage_details(让它算)或 cost_details(直接给)。
from langfuse import Langfuse, observe, get_client
langfuse = Langfuse() # 读环境变量
# 场景一:私有/自定义模型,手动上报 token usage,让服务端按价格表算钱
@observe(as_type="generation")
def call_private_llm(prompt: str):
# ... 调用你自托管的模型,拿到 output 和 token 统计 ...
output = "模型回答"
prompt_tokens, completion_tokens = 128, 64
client = get_client()
# 在当前 generation 上更新模型名与 usage
client.update_current_generation(
model="my-private-llama-3-70b", # 必须与你建的 Model Definition 名字匹配
usage_details={
"input": prompt_tokens,
"output": completion_tokens,
"total": prompt_tokens + completion_tokens,
},
)
return output
# 场景二:成本你自己算好了,直接上报 cost_details(单位:美元),跳过价格表
@observe(as_type="generation")
def call_with_known_cost(prompt: str):
output = "模型回答"
client = get_client()
client.update_current_generation(
model="my-private-llama-3-70b",
usage_details={"input": 128, "output": 64},
cost_details={
"input": 0.000064, # 这次输入的美元成本
"output": 0.000096, # 这次输出的美元成本
"total": 0.00016,
},
)
return output
langfuse.flush() # 脚本退出前务必 flush,见下一节
- 建自定义 Model Definition:进 Langfuse 项目 → Settings → Models → New model,填
model name(用于匹配,支持正则)、input price / output price(每 token 单价)、tokenizer。建好后所有model字段匹配上的 generation 会自动算 cost。 - 查 Token / Cost 趋势:Dashboard 默认有 Total cost、Token usage 的时间序列图,可切时间窗看每天的花费曲线,定位某天成本异常飙升。
- 按用户/模型分组:在 Dashboard 或 Metrics 里把
cost指标按user_id、model、tags做 group by——这就是为什么前面注入user_id这么重要,没有它就做不了「哪个大客户最烧钱」这种归因。
三、生产部署踩坑
短生命周期进程丢 trace(Lambda / CLI 脚本)
- 典型表现
- 脚本本地跑正常,部署到 Lambda 或当 cron 一次性脚本跑后,Langfuse 里看不到 trace 或只有一部分。
- 判断标准
- 进程退出前所有缓冲数据都已成功上报,Langfuse UI 能看到完整 trace。
- 解决方向
- Langfuse SDK 是异步批量上报,数据先进内存 buffer,后台线程定时/按量发送。短进程在 buffer 还没 flush 时就退出,数据就丢了。解决:进程结束前显式调用
langfuse.flush()(阻塞直到队列清空);进程级彻底退出用langfuse.shutdown()。Lambda 里放在 handler 末尾或 finally 块。
高吞吐下上报成本/压力过大
- 典型表现
- QPS 很高的服务,trace 量爆炸,既占带宽又在 Langfuse 攒了海量低价值数据。
- 判断标准
- 只采样一部分 trace,整体观测信号仍然可用,客户端开销可控。
- 解决方向
- 用客户端采样:初始化时设
Langfuse(sample_rate=0.2)(只上报 20% 的 trace,按 trace 维度整条保留或整条丢弃,不会出现半条 trace)。采样发生在客户端,被丢弃的 trace 根本不发出去,省带宽也省服务端存储。环境变量等价物是LANGFUSE_SAMPLE_RATE。
自托管 ClickHouse / Redis 成为瓶颈
- 典型表现
- 自托管 Langfuse 写入变慢、ingestion 积压、Dashboard 查询超时。
- 判断标准
- ingestion 不积压,查询延迟稳定,数据有备份可恢复。
- 解决方向
- Langfuse v3 的有状态组件是 Postgres(业务元数据)+ ClickHouse(trace/observation 大表)+ Redis(队列与缓存)+ S3/兼容对象存储(事件原始体)。扩容方向:ClickHouse 给足磁盘和内存、按需上集群分片;Redis 做持久化与主从;ingestion worker 水平扩副本消费队列。备份:Postgres 走常规 dump,ClickHouse 用 BACKUP 语句或对接对象存储,S3 桶本身做版本化与跨区。
PII 明文进了 Langfuse
- 典型表现
- trace 的 input/output 里出现手机号、邮箱、身份证等敏感信息,合规风险。
- 判断标准
- 敏感字段在离开客户端之前就被脱敏,Langfuse 服务端永远存不到明文。
- 解决方向
- 初始化时传
mask回调:Langfuse(mask=my_mask_fn)。SDK 在序列化每条 input/output 之前调用这个函数,你在里面用正则把手机号/邮箱替换成占位符。关键是脱敏发生在上报前,所以即使用 Langfuse Cloud 也不会把明文发出去。
import re
from langfuse import Langfuse, observe
# 在上报前对 input/output 做 PII 脱敏的 mask 回调
def pii_mask(data):
# data 是即将上报的 input 或 output,可能是 str / dict / list
if isinstance(data, str):
# 手机号
data = re.sub(r"1[3-9]\d{9}", "[PHONE]", data)
# 邮箱
data = re.sub(r"[\w.+-]+@[\w-]+\.[\w.-]+", "[EMAIL]", data)
return data
if isinstance(data, dict):
return {k: pii_mask(v) for k, v in data.items()}
if isinstance(data, list):
return [pii_mask(v) for v in data]
return data
# mask 在上报前对每条记录生效;sample_rate 控制采样;flush_at/flush_interval 调批量节奏
langfuse = Langfuse(
mask=pii_mask,
sample_rate=0.2, # 只上报 20% 的 trace(高吞吐降本)
flush_at=50, # 攒满 50 条事件就发一批
flush_interval=2.0, # 或每 2 秒发一批(二者先到先发)
)
@observe()
def handle_user_message(text: str):
# 即使这里 input 含手机号 13800001111,上报到 Langfuse 也会变成 [PHONE]
return "已处理"
handle_user_message("我的手机号是 13800001111,邮箱 a@b.com")
langfuse.flush() # 短进程/脚本退出前必须 flush
✓推荐做法
- 短生命周期进程(Lambda、cron、CLI 脚本)在 finally 块里
flush(),进程彻底退出用shutdown() - 高吞吐服务用
sample_rate做客户端采样,并按业务节奏调flush_at/flush_interval - PII 一律在
mask回调里、即上报前脱敏,永不依赖服务端清洗 - 自定义模型先建 Model Definition 再上线,保证 cost 从第一条数据就准
✗不推荐
- 不要在每次调用后都同步
flush()——那会退化成阻塞串行,把异步优势全吃掉 - 不要在 LangChain metadata 里写
user_id而漏掉langfuse_前缀,维度会聚不起来 - 不要指望服务端帮你脱敏 PII,Langfuse 不会替你清洗业务明文
- 不要给私有模型用一个匹配不到价格表的随意 model 名,cost 会一直是 0
⚠常见误区
- 长驻服务(Web Server)通常不用手动 flush,SDK 后台线程会持续上报;但优雅停机时仍应 shutdown 防丢尾批
- sample_rate 是 trace 级整丢整留,不要误以为能按 span 采样
- 改了 Model Definition 价格只对之后的新数据生效,历史 trace 的 cost 不会自动回溯重算
压测高峰下 ingestion 不积压、成本归因按用户/模型可分组、抽查任意一条线上 trace 都看不到 PII 明文、短进程零丢 trace。
可观测性的价值不在于「能看到」,而在于「看到的是对的、且没多看到不该看的」——成本要算准,PII 要看不到,trace 一条都不能丢。
— Langfuse 生产实践小结