访问 EMQX 下载 页面下载适合您操作系统的安装包
emqx-ee-4.4.32-otp24.3.4.2-4-ubuntu22.04-amd64.deb
安装
sudo apt install ./emqx-ee-4.4.32-otp24.3.4.2-4-ubuntu22.04-amd64.deb

登陆本机
http://localhost:18083

账户 admin  密码public

安装 TDengine
为了方便测试使用通过 Docker 进行安装(需映射网络端口)
## 拉取并启动容器
docker run -d --name tdengine -p 6030-6041:6030-6041 tdengine/tdengine:latest

## 启动后检查容器运行状态
docker ps -a

安装 Grafana
使用以下命令通过 Docker 安装并启动 Grafana

docker run -d --name=grafana -p 3000:3000 -e "GF_INSTALL_PLUGINS=tdengine-datasource" grafana/grafana

登陆网页

http://127.0.0.1:3000 
账户 admin
密码 admin


TDengine 创建数据库与数据表
进入TDengine Docker 容器:

taos
create database test;

use test;
CREATE TABLE sensor_data (
  ts timestamp,
  temperature float,
  humidity float,
  volume float,
  PM10 float,
  pm25 float,
  SO2 float,
  NO2 float,
  CO float,
  sensor_id NCHAR(255), 
  area TINYINT,
  coll_time timestamp
);

查看数据表
SELECT COUNT(*) FROM sensor_data;


配置 EMQX 规则引擎
打开 EMQX Dashboared,进入 规则引擎 -> 规则 页面,点击 创建 按钮进入创建页面。

规则 SQL
规则 SQL 用于 EMQX 消息以及事件筛选,以下 SQL 表示从 sensor/data 主题筛选出 payload 数据:

SELECT
  payload
FROM
  "sensor/data"
  
  
  响应动作
  
  关联资源:HTTP 服务器配置信息
  消息内容模板:此处为携带数据的 INSERT SQL
  
  INSERT INTO test.sensor_data VALUES(
  now,
  ${payload.temperature},
  ${payload.humidity},
  ${payload.volume},
  ${payload.PM10},
  ${payload.pm25},
  ${payload.SO2},
  ${payload.NO2},
  ${payload.CO},
  '${payload.id}',
  ${payload.area},
  ${payload.ts}
)
  
  
 点击响应动作下的 添加 按钮,在弹出框内选择 发送数据到 Web 服务,点击 新建资源 新建一个 WebHook 资源。
 
 资源类型选择 Webhook,请求 URL 填写 http://127.0.0.1:6041/rest/sql,请求方法选择 POST, 还需添加 Authorization 请求头作为认证信息 。

Authorization 的值为 Basic + TDengine 的 {username}:{password} 经过 Base64 编码之后的字符串, 例如 root:taosdata 编码后实际填入的值为:Basic cm9vdDp0YW9zZGF0YQ==


 安装mattx工具,连接emqx,发送数据
 主题:
 sensor/data
 数据
 {
  "temperature": 50,
  "humidity": 20,
  "volume": 10,
  "PM10": 10,
  "pm25": 30,
  "SO2": 1,
  "NO2": 2,
  "CO": 4,
  "id": "10-c6-1f-1a-1f-47",
  "area": 1,
  "ts": 1596157444170
  }

或者使用python脚本发送


import json
import random
import time
import paho.mqtt.client as mqtt
from datetime import datetime

# EMQX服务器配置
EMQX_BROKER = "localhost"  # 如果EMQX运行在本机
EMQX_PORT = 1883           # MQTT默认端口
EMQX_TOPIC = "sensor/data"  # MQTT主题

# 创建MQTT客户端
client = mqtt.Client()
client.username_pw_set("mqtt_user_1", "xxx")
# 连接到EMQX
try:
    client.connect(EMQX_BROKER, EMQX_PORT, 60)
    print(f"连接到EMQX服务器 {EMQX_BROKER}:{EMQX_PORT}")
except Exception as e:
    print(f"连接失败: {e}")
    exit(1)

# 生成模拟传感器数据
def generate_sensor_data():
    # 当前时间戳
    timestamp = datetime.now().isoformat()
    
    # 生成模拟数据
    data = {   
        "temperature": round(random.uniform(20.0, 30.0), 2),
        "humidity": round(random.uniform(40.0, 80.0), 2),
        "volume": round(random.uniform(10.0, 80.0), 2),
        "PM10": round(random.uniform(20.0, 100.0), 2),
        "pm25": round(random.uniform(30.0, 80.0), 2),
        "SO2": round(random.uniform(50.0, 80.0), 2),
        "NO2":round(random.uniform(60.0, 80.0), 2),
        "CO":round(random.uniform(70.0, 90.0), 2),
        "id": "10-c6-1f-1a-1f-47",
        "area": 1,
        "ts": 1596157444170
    }
    return data

# 主循环
try:
    print("开始发送模拟数据到EMQX (按Ctrl+C停止)...")
    while True:
        # 生成数据
        sensor_data = generate_sensor_data()
        
        # 转换为JSON字符串
        payload = json.dumps(sensor_data)
        
        # 发布到MQTT主题
        client.publish(EMQX_TOPIC, payload)
        
        # 打印发送的信息
        print(f"发送数据: {payload}")
        
        # 等待一段时间(例如每2秒发送一次)
        time.sleep(2)

except KeyboardInterrupt:
    print("停止发送数据")
finally:
    # 断开连接
    client.disconnect()
    
  
  查看  数据库
  

  SELECT COUNT(*) FROM sensor_data;
  正常可以看到有数据
  
  可视化配置
组件安装完成,模拟数据写入成功后,按照 Grafana 可视化界面的操作指引,完成业务所需数据可视化配置。
     # 获取TDengine容器的IP地址
docker inspect -f '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' 0a79c4ea2ca5 

添加数据源(Add data source)


查询需要的数据

select * from test.sensor_data

点击刷新即可看到曲线


 

Logo

永洪科技,致力于打造全球领先的数据技术厂商,具备从数据应用方案咨询、BI、AIGC智能分析、数字孪生、数据资产、数据治理、数据实施的端到端大数据价值服务能力。

更多推荐