使用emqx,TDengine,Grafana,docker,实现数据可视化
Authorization 的值为 Basic + TDengine 的 {username}:{password} 经过 Base64 编码之后的字符串, 例如 root:taosdata 编码后实际填入的值为:Basic cm9vdDp0YW9zZGF0YQ==组件安装完成,模拟数据写入成功后,按照 Grafana 可视化界面的操作指引,完成业务所需数据可视化配置。print(f"连接到EMQ
访问 EMQX 下载 页面下载适合您操作系统的安装包
emqx-ee-4.4.32-otp24.3.4.2-4-ubuntu22.04-amd64.deb
安装
sudo apt install ./emqx-ee-4.4.32-otp24.3.4.2-4-ubuntu22.04-amd64.deb
登陆本机
http://localhost:18083
账户 admin 密码public
安装 TDengine
为了方便测试使用通过 Docker 进行安装(需映射网络端口)
## 拉取并启动容器
docker run -d --name tdengine -p 6030-6041:6030-6041 tdengine/tdengine:latest
## 启动后检查容器运行状态
docker ps -a
安装 Grafana
使用以下命令通过 Docker 安装并启动 Grafana
docker run -d --name=grafana -p 3000:3000 -e "GF_INSTALL_PLUGINS=tdengine-datasource" grafana/grafana
登陆网页
http://127.0.0.1:3000
账户 admin
密码 admin
TDengine 创建数据库与数据表
进入TDengine Docker 容器:
taos
create database test;
use test;
CREATE TABLE sensor_data (
ts timestamp,
temperature float,
humidity float,
volume float,
PM10 float,
pm25 float,
SO2 float,
NO2 float,
CO float,
sensor_id NCHAR(255),
area TINYINT,
coll_time timestamp
);
查看数据表
SELECT COUNT(*) FROM sensor_data;
配置 EMQX 规则引擎
打开 EMQX Dashboared,进入 规则引擎 -> 规则 页面,点击 创建 按钮进入创建页面。
规则 SQL
规则 SQL 用于 EMQX 消息以及事件筛选,以下 SQL 表示从 sensor/data 主题筛选出 payload 数据:
SELECT
payload
FROM
"sensor/data"
响应动作
关联资源:HTTP 服务器配置信息
消息内容模板:此处为携带数据的 INSERT SQL
INSERT INTO test.sensor_data VALUES(
now,
${payload.temperature},
${payload.humidity},
${payload.volume},
${payload.PM10},
${payload.pm25},
${payload.SO2},
${payload.NO2},
${payload.CO},
'${payload.id}',
${payload.area},
${payload.ts}
)
点击响应动作下的 添加 按钮,在弹出框内选择 发送数据到 Web 服务,点击 新建资源 新建一个 WebHook 资源。
资源类型选择 Webhook,请求 URL 填写 http://127.0.0.1:6041/rest/sql,请求方法选择 POST, 还需添加 Authorization 请求头作为认证信息 。
Authorization 的值为 Basic + TDengine 的 {username}:{password} 经过 Base64 编码之后的字符串, 例如 root:taosdata 编码后实际填入的值为:Basic cm9vdDp0YW9zZGF0YQ==
安装mattx工具,连接emqx,发送数据
主题:
sensor/data
数据
{
"temperature": 50,
"humidity": 20,
"volume": 10,
"PM10": 10,
"pm25": 30,
"SO2": 1,
"NO2": 2,
"CO": 4,
"id": "10-c6-1f-1a-1f-47",
"area": 1,
"ts": 1596157444170
}
或者使用python脚本发送
import json
import random
import time
import paho.mqtt.client as mqtt
from datetime import datetime
# EMQX服务器配置
EMQX_BROKER = "localhost" # 如果EMQX运行在本机
EMQX_PORT = 1883 # MQTT默认端口
EMQX_TOPIC = "sensor/data" # MQTT主题
# 创建MQTT客户端
client = mqtt.Client()
client.username_pw_set("mqtt_user_1", "xxx")
# 连接到EMQX
try:
client.connect(EMQX_BROKER, EMQX_PORT, 60)
print(f"连接到EMQX服务器 {EMQX_BROKER}:{EMQX_PORT}")
except Exception as e:
print(f"连接失败: {e}")
exit(1)
# 生成模拟传感器数据
def generate_sensor_data():
# 当前时间戳
timestamp = datetime.now().isoformat()
# 生成模拟数据
data = {
"temperature": round(random.uniform(20.0, 30.0), 2),
"humidity": round(random.uniform(40.0, 80.0), 2),
"volume": round(random.uniform(10.0, 80.0), 2),
"PM10": round(random.uniform(20.0, 100.0), 2),
"pm25": round(random.uniform(30.0, 80.0), 2),
"SO2": round(random.uniform(50.0, 80.0), 2),
"NO2":round(random.uniform(60.0, 80.0), 2),
"CO":round(random.uniform(70.0, 90.0), 2),
"id": "10-c6-1f-1a-1f-47",
"area": 1,
"ts": 1596157444170
}
return data
# 主循环
try:
print("开始发送模拟数据到EMQX (按Ctrl+C停止)...")
while True:
# 生成数据
sensor_data = generate_sensor_data()
# 转换为JSON字符串
payload = json.dumps(sensor_data)
# 发布到MQTT主题
client.publish(EMQX_TOPIC, payload)
# 打印发送的信息
print(f"发送数据: {payload}")
# 等待一段时间(例如每2秒发送一次)
time.sleep(2)
except KeyboardInterrupt:
print("停止发送数据")
finally:
# 断开连接
client.disconnect()
查看 数据库
SELECT COUNT(*) FROM sensor_data;
正常可以看到有数据
可视化配置
组件安装完成,模拟数据写入成功后,按照 Grafana 可视化界面的操作指引,完成业务所需数据可视化配置。
# 获取TDengine容器的IP地址
docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' 0a79c4ea2ca5
添加数据源(Add data source)
查询需要的数据
select * from test.sensor_data
点击刷新即可看到曲线
更多推荐


所有评论(0)