用MQTT+Python复刻工业腐蚀监测仪表盘
这周我看到一条新闻:英国的 CorrosionRADAR 和巴西工程公司 Tecnofink 合作,要在巴西推广保温层下腐蚀(CUI)的实时监测方案。他们的系统核心是一个叫 CLARITY 的仪表盘平台,能从传感器实时拉取腐蚀和湿度数据,帮运维人员提前发现问题。
如果你和我一样是做后端或物联网的,第一反应可能是:这不就是一个 MQTT + 时序数据库 + 前端图表的事儿吗? 对,但工业场景的坑远比Demo多。不过没关系,今天我们先从原型入手,让你1小时内跑通一个“低配版CLARITY”。
这套系统到底在干什么
CUI(Corrosion Under Insulation)是石化、能源行业的隐形杀手。保温材料一旦进水,钢管表面就会加速腐蚀,而且从外部根本看不见。传统方法是定期拆开保温层检查,费时费力。
CorrosionRADAR 的核心方案是在保温层下埋入无线传感器,实时监测金属电位、湿度、温度等参数,数据通过网关上传到云端的 CLARITY 平台。运维人员就能在网页上看到腐蚀风险的实时热力图、报警和趋势曲线。
从技术角度看,这就是一个典型的工业 IoT 架构:传感器 → 边缘网关 → MQTT Broker → 后端处理 → 数据库 → 前端仪表盘。
开发者最该关注的三个技术细节
1. 为什么是MQTT而不是HTTP?
工业传感器通常是低功耗设备,可能用电池供电好几年。MQTT协议非常轻量,连接开销小,支持遗嘱消息(Will Message)检测设备掉线,还能设置QoS保证数据不丢。相比之下,HTTP的请求-响应模型太浪费了。
2. 边缘计算少不了
直接往云端扔原始数据是愚蠢的。一条腐蚀数据可能包含高精度的电压、阻抗等信号,每秒采样几十次。工业现场通常会做一个边缘聚合:比如每分钟计算一次平均值或变化率,只在超过阈值时上传完整波形。CorrosionRADAR 的传感器内置微处理器,在节点端就做了初步诊断,这大大减少了通信和云存储成本。
3. 时序数据库是基操
CLARITY 后端大概率用的是 InfluxDB 或 TimescaleDB。腐蚀数据是典型的时间序列:每个传感器ID + 时间戳 + 指标值。用 PostgreSQL 硬查也不是不行,但时间分区、连续聚合、保留策略会让你写不少代码。专业时序数据库已经把这一切内建好了。
动手:20分钟搭建一个简易腐蚀监测仪表盘
下面我用 Python 模拟一个传感器,通过 MQTT 上报数据,后端用 Flask + InfluxDB 存储,前端用 Dash 展示曲线。所有代码在 GitHub 可找到完整版(稍微修改就能接入真实传感器)。
准备工作
你需要安装:
pip install paho-mqtt influxdb-client dash
docker run -p 8086:8086 influxdb:2.0 # 或者用云服务
1. 模拟传感器,每秒发送随机腐蚀率数据
import paho.mqtt.client as mqtt
import time
import random
import json
client = mqtt.Client()
client.connect("localhost", 1883, 60)
while True:
data = {
"sensor_id": "CUI-001",
"corrosion_rate_mmy": round(random.uniform(0.01, 0.5), 3), # mm/year
"humidity_pct": round(random.uniform(20, 95), 1),
"timestamp": int(time.time())
}
client.publish("corrosion/CUI-001", json.dumps(data))
time.sleep(1)
2. MQTT订阅并写入InfluxDB(使用InfluxDB 2.0 API)
# subscriber.py
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import paho.mqtt.client as mqtt
import json
bucket = "corrosion"
org = "demo"
token = "my-token"
client_influx = InfluxDBClient(url="http://localhost:8086", token=token, org=org)
write_api = client_influx.write_api(write_type=SYNCHRONOUS)
def on_message(client, userdata, msg):
data = json.loads(msg.payload)
point = Point("cui_data") \
.tag("sensor_id", data["sensor_id"]) \
.field("corrosion_rate", data["corrosion_rate_mmy"]) \
.field("humidity", data["humidity_pct"]) \
.time(data["timestamp"], write_precision="s")
write_api.write(bucket=bucket, record=point)
print(f"Written: {data}")
client_mqtt = mqtt.Client()
client_mqtt.on_message = on_message
client_mqtt.connect("localhost", 1883, 60)
client_mqtt.subscribe("corrosion/#")
client_mqtt.loop_forever()
3. 用Dash展示最近一小时的曲线
import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import plotly.graph_objs as go
from influxdb_client import InfluxDBClient
import pandas as pd
app = dash.Dash(__name__)
def query_influx():
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="demo")
query = '''from(bucket: "corrosion")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cui_data")
|> filter(fn: (r) => r._field == "corrosion_rate")'''
tables = client.query_api().query(query)
if not tables:
return pd.DataFrame()
points = [(record.get_time(), record.get_value()) for table in tables for record in table.records]
return pd.DataFrame(points, columns=["time", "corrosion_rate"])
app.layout = html.Div([
dcc.Graph(id="live-graph"),
dcc.Interval(id="interval", interval=5*1000) # 5秒刷新
])
@app.callback(
Output("live-graph", "figure"),
[Input("interval", "n_intervals")]
)
def update_graph(n):
df = query_influx()
fig = go.Figure([go.Scatter(x=df["time"], y=df["corrosion_rate"], mode="lines+markers")])
fig.update_layout(title="CUI腐蚀速率实时监控", xaxis_title="时间", yaxis_title="腐蚀速率 (mm/year)")
return fig
if __name__ == "__main__":
app.run_server(debug=True)
运行后,浏览器访问 http://localhost:8050 就能看到实时跳动的腐蚀曲线了。
我的看法
这条新闻揭示了一个趋势:工业资产管理正在从“定期巡检”转向“实时数据驱动”。CorrosionRADAR 的技术本身并不神秘,其核心竞争力在于传感器在极端环境下的可靠性(防爆、耐高温)以及与工程公司合作绑定的行业知识。对于普通开发者来说,与其追逐那些听起来高大上的数字孪生概念,不如先把 MQTT + 时序数据库 + 轻量仪表盘这一套基础工具练熟。因为不管前端界面多炫酷,底层的原始数据管道永远是那三板斧。
下一步,如果你想把它变得“工业级”,可以尝试添加:
- 边缘侧用 MicroPython 跑在 ESP32 上模拟传感器
- 集成 Anomaly Detection(比如用 Isolation Forest 实时检测腐蚀率突变)
- 前端加入热力图(用 Plotly 的 Heatmap 对多个传感器坐标定位)
你可以把上面的代码直接 clone 下来做实验。工业 IoT 的门槛没有想象中那么高,缺的就是一个能亲自跑起来的 Demo。