产品 Demo 效果

想象你是一个银行的风控开发者,每天有上万个贷款审批请求经过你的AI模型。FSB新规出来后,董事会要求在24小时内提供任何一笔拒绝贷款的完整决策链:用了哪个模型版本、输入了什么特征、输出了什么分数、最终决策是谁触发的。

下图展示了一个简单的审计日志面板,每一行记录一次模型推理的完整上下文:

金融AI审计日志界面

你可以按时间、用户、模型版本过滤,点开详情能看到原始请求JSON和模型响应。这就是今天你要交付的东西。


技术选型

核心需求:每个推理请求都要立即持久化,并且后续不能被任何人篡改(包括DBA)。

  • 后端框架:FastAPI(Python 3.10+),异步处理日志写入,不影响主推理延迟。
  • 存储:SQLite 作为演示,生产推荐 PostgreSQL(支持行级安全与WAL归档)。
  • 防篡改:对每条日志计算 sha256 哈希,并把前一条的哈希链入当前记录(区块链思路)。
  • 日志结构:包含 request_id, timestamp, user_id, model_id, model_version, input_hash, output_hash, latency_ms, previous_hash, current_hash。

选型理由:SQLite够轻量,演示完全够用;哈希链保证了日志的不可抵赖性;FastAPI的依赖注入可以轻松集成到任意AI服务中。


核心代码实现

1. 数据模型与哈希链函数

python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
import hashlib, json, time
from pydantic import BaseModel
from sqlalchemy import Column, String, Float, Text, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class AuditLog(Base):
    __tablename__ = "audit_logs"
    request_id = Column(String(64), primary_key=True)
    timestamp = Column(Float, nullable=False)
    user_id = Column(String(64), nullable=False)
    model_id = Column(String(128), nullable=False)
    model_version = Column(String(32), nullable=False)
    input_data = Column(Text, nullable=False)   # 序列化JSON
    output_data = Column(Text, nullable=False)  # 序列化JSON
    input_hash = Column(String(64), nullable=False)
    output_hash = Column(String(64), nullable=False)
    previous_hash = Column(String(64), default="0"*64)
    current_hash = Column(String(64), nullable=False)

    def compute_hash(self, prev_hash):
        raw = f"{self.request_id}{self.timestamp}{self.user_id}{self.model_id}{self.model_version}{self.input_hash}{self.output_hash}{prev_hash}"
        return hashlib.sha256(raw.encode()).hexdigest()

2. 审计中间件(装饰器)

python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
from functools import wraps
from datetime import datetime

def audit_log(model_id: str, model_version: str, db_session):
    def decorator(func):
        @wraps(func)
        async def wrapper(user_id: str, input_data: dict, *args, **kwargs):
            # 序列化输入输出
            input_str = json.dumps(input_data, sort_keys=True)
            input_hash = hashlib.sha256(input_str.encode()).hexdigest()
            
            # 执行模型推理
            start = time.time()
            output_data = await func(input_data, *args, **kwargs)
            latency = time.time() - start
            
            output_str = json.dumps(output_data, sort_keys=True)
            output_hash = hashlib.sha256(output_str.encode()).hexdigest()
            
            # 创建日志记录
            log_entry = AuditLog(
                request_id = str(uuid.uuid4()),
                timestamp = datetime.utcnow().timestamp(),
                user_id = user_id,
                model_id = model_id,
                model_version = model_version,
                input_data = input_str,
                output_data = output_str,
                input_hash = input_hash,
                output_hash = output_hash,
            )
            # 获取上一条日志(按时间降序取第一条)
            prev = db_session.query(AuditLog).order_by(AuditLog.timestamp.desc()).first()
            prev_hash = prev.current_hash if prev else "0"*64
            log_entry.previous_hash = prev_hash
            log_entry.current_hash = log_entry.compute_hash(prev_hash)
            
            db_session.add(log_entry)
            db_session.commit()
            
            return output_data
        return wrapper
    return decorator

3. 使用示例

python
1 2 3 4 5 6 7 8 9 10 11
from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session

app = FastAPI()

# 假设有一个风控模型
@audit_log(model_id="credit-risk-v1", model_version="2.1.0", db_session=get_db)
async def predict_credit_risk(features: dict):
    # 实际调用ML模型
    risk_score = some_model.predict(features)
    return {"risk_score": risk_score, "decision": "approve" if risk_score < 0.7 else "reject"}

项目结构和配置

text
1 2 3 4 5 6 7 8
├── app
│   ├── main.py          # FastAPI 入口,路由
│   ├── models.py        # SQLAlchemy 模型
│   ├── audit.py         # 审计日志中间件
│   ├── database.py      # 数据库连接配置
│   └── schemas.py       # Pydantic schemas
├── requirements.txt
└── .env                 # DATABASE_URL=sqlite:///./logs.db

配置方面只需要一个环境变量 DATABASE_URL。生产部署建议 postgresql://user:pass@host/db,并开启 ssl_mode=require


上线要注意的坑

  1. 哈希链的完整性不能依赖代码——恶意管理员可能直接修改数据库。更稳妥的方案是定期将日志哈希发送到区块链(如以太坊)或另存到AWS S3的Glacier归档,不可变存储。
  2. 性能开销:每次推理多一次DB写操作,对高并发(>1000 QPS)场景可能成为瓶颈。解决方案:使用异步写入队列(如Redis List + 后台批处理),但必须在重启前确保队列写入完成。
  3. 日志保留期限:不同司法管辖区要求不同(通常5-7年)。SQLite不适合海量长期存储,建议按时间分区表(PostgreSQL的分区表或ClickHouse)。
  4. 敏感数据脱敏:input_data中可能包含身份证号、收入等PII。写入日志前必须脱敏(如用 replace() 掩码)或加密存储,密钥由HSM管理。
  5. 审计日志与主数据库事务一致性:如果模型推理成功但日志写入失败,应该回滚还是继续?建议使用两阶段提交或Saga模式。对大多数场景,先写日志再推理,若推理失败则标记日志状态为failed

我的看法:FSB这个框架虽然目前是咨询文件,但金融业AI治理的收紧是必然趋势。开发者不能等到合规部门找上门才动手——现在就开始为每个模型推理打上审计戳,成本最低。上面这套方案我已在两个Demo项目中跑通,从零到集成不到一天。别再问“要不要做”,问“明天能不能上线”。