从一条突发新闻说起

2026年5月26日,比利时Buggenhout镇发生火车与校车相撞事故,多人死亡。新闻在几分钟内传遍全球,但如果你是一个需要实时跟踪这类事件的开发者(比如做风险分析、情报聚合、自动化新闻简报),你不能只靠手动刷页面。你需要一个事件监控Agent——它能持续抓取RSS、解析内容、提取关键字段、评估影响,甚至自动生成摘要推送到你的Slack。

这篇文章不做空洞的概念介绍。我会用这条新闻作为完整案例,拆解一个生产级别的新闻监控Agent应该怎么设计。你会看到:

  • 架构全景图(事件流、规划、工具、记忆)
  • 核心模块的伪代码(真正能跑的逻辑)
  • 我在实际项目中踩过的三个坑
  • 一个简化版实现,你可以30分钟内跑起来

news monitoring agent pipeline diagram


为什么需要Agent,而不是一个简单的爬虫+LLM调用?

单次对话能做:给定一篇新闻,让LLM总结。但你要监控持续变化的事件流,并且根据事件类型执行不同动作(比如火车事故需要计算死亡人数趋势,而股票新闻需要对比价格波动)。纯爬虫+LLM方案有两个硬伤:

  1. 状态缺失:昨天已处理过的事件今天又出现,需要去重和增量更新。
  2. 任务僵化:每个事件的处理步骤写死在代码里,无法根据内容动态调整(比如事故新闻需要查历史同类事件对比,而娱乐新闻不需要)。

Agent架构通过规划器动态决定步骤,通过记忆模块保留历史状态,通过工具调用获取外部数据。这就是为什么虽然单次LLM调用也能做摘要,但面对真实事件流,你必须用Agent。


Agent架构拆解:四个模块一条管道

整个系统分为四个核心模块:

1. 事件流接入层(工具:RSS/API监听)

输入源可以是NewsAPI、特定RSS、Twitter流(X API)等。工具的作用是定期轮询或通过Webhook接收新事件,返回原始文本。

python
1 2 3 4 5 6 7 8 9 10 11 12 13 14
# 伪代码:RSS轮询工具
class RSSPoller:
    def fetch_new_entries(self, feed_url: str, last_check: datetime) -> List[dict]:
        feed = feedparser.parse(feed_url)
        new_items = []
        for entry in feed.entries:
            if entry.published_parsed > last_check:
                new_items.append({
                    'title': entry.title,
                    'body': entry.summary,
                    'url': entry.link,
                    'published': entry.published_parsed
                })
        return new_items

关键点:这里要处理时区、编码、HTML标签剥离。我踩的第一个坑就是某些RSS的摘要带大量HTML,直接喂给LLM会导致Token浪费和格式污染。所以工具必须暴露一个clean_text方法。

2. 规划器(LLM驱动的决策者)

规划器读取新事件,根据事件类型和优先级决定后续步骤。比如对于“火车事故”这类高影响力事件,规划器可能输出:

  1. 提取关键字段(伤亡人数、地点、原因)
  2. 查询历史相似事件(记忆模块)
  3. 计算风险等级(调用计算器工具)
  4. 生成报告并推送

规划器通常是一个LLM调用,输入事件原始文本 + 系统指令,输出JSON格式的计划。

python
1 2 3 4 5 6 7 8 9 10 11 12 13
def plan(event_text: str) -> dict:
    prompt = f"""
    你是一个新闻监控规划器。给定以下事件文本,输出一个JSON列表表示需要执行的步骤,每个步骤包含:
    - step_name: 唯一标识
    - tool: 需要调用的工具名(可选:extractor, memory_searcher, calculator, reporter)
    - params: 传递给该工具的参数
    
    事件文本:{event_text}
    
    输出格式:{{'steps': [{{'step_name': 'extract_fields', 'tool': 'extractor', 'params': {{'fields': ['deaths','location']}} }}]}}
    """
    response = llm.chat(prompt)
    return json.loads(response)

实际踩坑:LLM经常输出不完整的JSON,或者包含注释。需要在后置处理中用正则修正或使用结构化输出(如用JSON mode或function calling)。我推荐直接用OpenAI的function calling,让LLM以工具调用的形式返回计划。

3. 记忆模块(短期+长期)

  • 短期记忆:当前事件的处理上下文,比如已经提取的字段,正在生成中的报告。用字典存储,每个事件ID对应一个上下文。
  • 长期记忆:历史事件的embedding向量,用于相似度检索。我们使用FAISS或简单的ChromaDB,将过往新闻标题+摘要向量化,新事件到来后计算余弦相似度,找出最相关的历史事件。
python
1 2 3 4 5 6 7 8
class Memory:
    def __init__(self):
        self.short = {}  # event_id -> context dict
        self.long = vector_store(...)  # 存储过往事件的embedding
    
    def find_similar(self, event_text: str, top_k=3):
        query_embed = get_embedding(event_text)
        return self.long.search(query_embed, top_k)

为什么需要长期记忆:比利时这条新闻如果单独看,你不会知道它是不是最近第五起铁路事故。记忆模块能告诉你“过去30天欧洲火车事故有3起,死亡总数上升趋势”,从而影响风险评估。

4. 执行器与工具集

执行器接收规划器的步骤列表,依次调用工具并收集结果。每个工具是一个独立函数,可以同步或异步。

常用工具:

  • extractor:调用LLM从原始文本中提取结构化字段(如死亡人数、地点、时间)
  • calculator:做数值运算(如死亡率趋势计算)
  • reporter:生成最终摘要并推送到Slack/邮件
  • search:搜索外部知识库(比如维基百科、新闻数据库)
python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
class Executor:
    def __init__(self):
        self.tools = {
            'extractor': ExtractorTool(),
            'memory_searcher': MemoryTool(),
            'calculator': CalculatorTool(),
            'reporter': ReporterTool()
        }
    
    def execute(self, plan: dict, event_id: str):
        context = {}
        for step in plan['steps']:
            tool = self.tools[step['tool']]
            result = tool.run(**step['params'], context=context)
            context[step['step_name']] = result
        return context

核心流程图:从事件到报告

下图描述了Agent处理比利时火车事故的完整流程。

flowchart of event processing steps

文字描述

  1. RSS轮询工具推送新事件(比利时火车事故)到Agent入口。
  2. 规划器接收事件文本,决定需要提取字段、查询记忆、生成报告。
  3. 提取器调用LLM,输出:{ "deaths": "several", "location": "Buggenhout, Belgium", "time": "2026-05-26 09:36" }。注意“several”是不精确的,规划器随后会决定(根据历史记忆)是否需要等待后续官方数据再更新。
  4. 记忆模块检索相似事件,返回最近欧洲铁路事故列表。
  5. 计算器根据历史死亡人数中位数,估算本次“several”对应的可能范围(比如4-8人)。
  6. 报告器合并以上信息,生成一条带风险等级的推送:“比利时火车撞校车事故,死亡数若干(估计4-8人),为近30天欧洲第4起铁路事故,建议关注救援进展。”
  7. 报告推送到Slack,同时更新短期记忆标记该事件为“已处理”。

关键实现细节和踩坑记录

坑1:LLM提取字段的不稳定性

当原文说“several people”,LLM可能会提取为“0”或空字符串。我们在extractor工具中加入了验证层:如果数值字段不是整数或明确范围,则标记为unknown并传给规划器,让规划器决定是等待后续更新还是使用默认值。

python
1 2 3 4 5 6 7 8
class ExtractorTool:
    def run(self, fields, text):
        result = llm.extract(text, fields)
        for key in fields:
            if key in ['deaths', 'injured']:
                if not result[key] or not result[key].isdigit():
                    result[key] = 'unknown'
        return result

坑2:事件去重

同一则新闻可能被多个RSS源重复推送。我们的方法是对新闻URL做SHA256哈希作为事件ID,并在记忆模块的短期存储中记录已处理ID。如果新事件的ID已存在,则直接跳过。

坑3:异步与并发

如果你监听多个RSS源,每个新事件都需要独立走一遍规划-执行流程。使用异步事件循环(asyncio)并行处理,但要注意规划器LLM调用是IO密集型,应该用协程池。Python的asyncio.gather配合semaphore控制并发数,避免API限速。


简化版动手实现:30分钟跑起来

以下是一个最小化但可运行的Agent核心。只需要Python 3.9+,openai库和feedparser

步骤1:安装依赖

bash
1
pip install openai feedparser python-dotenv

步骤2:配置.env

text
1 2
OPENAI_API_KEY=sk-...
RSS_URL=https://rss.nytimes.com/services/xml/rss/nyt/World.xml

步骤3:写主文件agent.py

python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
import os
import json
import hashlib
from datetime import datetime
import feedparser
from openai import OpenAI

client = OpenAI()
MEMORY = {}  # event_id -> context

def fetch_news():
    feed = feedparser.parse(os.getenv('RSS_URL'))
    entries = []
    for entry in feed.entries:
        entries.append({
            'title': entry.title,
            'summary': entry.summary,
            'link': entry.link,
            'published': entry.published_parsed
        })
    return entries

def plan(event_text: str) -> dict:
    response = client.chat.completions.create(
        model='gpt-4',
        messages=[{
            "role": "system",
            "content": "你是一个新闻规划器。输出JSON列表的步骤,每个步骤有step_name和tool。可能工具:extractor, reporter"
        },{
            "role": "user",
            "content": f"事件:{event_text}"
        }],
        response_format={"type": "json_object"}
    )
    return json.loads(response.choices[0].message.content)

def run(event: dict):
    event_id = hashlib.sha256(event['link'].encode()).hexdigest()
    if event_id in MEMORY:
        return
    text = f"标题:{event['title']}\n摘要:{event['summary']}"
    plan_steps = plan(text)
    context = {}
    for step in plan_steps.get('steps', []):
        if step['tool'] == 'extractor':
            fields = step['params']['fields']
            # 简化:直接调用LLM提取
            resp = client.chat.completions.create(
                model='gpt-3.5-turbo',
                messages=[{"role": "user", "content": f"从以下文本提取{fields}并返回JSON:{text}"}],
                response_format={"type": "json_object"}
            )
            extracted = json.loads(resp.choices[0].message.content)
            context['extracted'] = extracted
        elif step['tool'] == 'reporter':
            report = f"事故报告:{context.get('extracted', {})}\n来源:{event['link']}"
            print(report)
    MEMORY[event_id] = context

if __name__ == '__main__':
    entries = fetch_news()
    for e in entries:
        run(e)

用法:运行python agent.py,它会抓取最新世界新闻,对每条自动规划提取和报告。你可以扩展工具(比如添加记忆检索)和规划逻辑。


你现在应该做什么

  1. 如果你在构建实时事件系统:参考本文的四个模块,先从事件接入和提取开始,不要一开始就做完美记忆。
  2. 如果你只是想快速验证想法:运行上面的简化版,用你的新闻源替换RSS,观察Agent是如何自动处理不同类型的新闻的。
  3. 注意风险:Agent自动处理新闻可能产生误报。务必加入人工审核通道(比如报告推送到Slack后由人确认)。

记住,Agent的价值在于动态适应而非固定流水线。从比利时火车事故这个案例可以看出,当事件信息不完整时(“several”),Agent会调用记忆和计算来估算,而不是停滞。这是传统爬虫做不到的。

现在,打开你的编辑器,动手吧。