实现MQTT协议消息订阅和消息解析流程

This commit is contained in:
lixiaoyuan
2025-09-08 19:34:12 +08:00
parent 566a3b050c
commit e2995eff92
17 changed files with 642 additions and 288 deletions

View File

@@ -82,7 +82,7 @@ static void GetRequestParam(const httplib::Request& req, const std::vector<std::
{
njson json;
JSON::parse(req.body, json);
JsonToFields(json, vecKeys, fields);
}
}
@@ -169,6 +169,10 @@ static std::map<std::string, HandlerOptions> g_mapHttpHandlerGet =
{"/queryStatDayList", HandlerOptions(&HttpEntity::queryStatDayList, {})},
{"/queryEnvironment", HandlerOptions(&HttpEntity::queryEnvironment, { "station_id"})},
{"/queryServiceApiList", HandlerOptions(&HttpEntity::queryServiceApiList, {})},
{"/deleteServiceApi", HandlerOptions(&HttpEntity::deleteServiceApi, {"api_id"})},
//{"/insert", HandlerOptions(&HttpEntity::insert, {})},
//{"/update", HandlerOptions(&HttpEntity::update, {})},
@@ -194,6 +198,9 @@ static std::map<std::string, HandlerOptions> g_mapHttpHandlerPost
{"/insertPolicy", HandlerOptions(&HttpEntity::insertPolicy, { DMPolicy::NAME})},
{"/updatePolicy", HandlerOptions(&HttpEntity::updatePolicy, { DMPolicy::POLICY_ID})},
{"/insertServiceApi", HandlerOptions(&HttpEntity::insertServiceApi, {})},
{"/updateServiceApi", HandlerOptions(&HttpEntity::updateServiceApi, {"api_id"})},
};
bool CheckHttpToken(const httplib::Request& req)
@@ -495,6 +502,10 @@ Errcode HttpEntity::updateStation(const httplib::Request& req, njson& json, std:
{
Fields params;
GetRequestParam(req, {"station_id", "name", "address", "lon", "lat", "tel", "capacity", "status"}, params);
params.check("capacity", "", "0.0");
params.check("lon", "", "0.0");
params.check("lat", "", "0.0");
params.check("status", "", "1");
return DAO::updateStationById(params);
};
@@ -683,6 +694,8 @@ Errcode HttpEntity::queryDevicByCategory(const httplib::Request& req, njson& jso
jsonnode["name"] = device->name;
jsonnode["code"] = device->code;
jsonnode["type"] = device->type;
jsonnode["typename"] = Application::data().getDeviceNameById(device->type);
jsonnode["view"] = 1;
jsonnode["is_online"] = device->online;// ? "在线" : "离线";
jsonnode["is_error"] = device->err;// ? "故障" : "正常";
@@ -799,19 +812,25 @@ Errcode HttpEntity::updateAlertLog(const httplib::Request& req, njson& json, std
Errcode HttpEntity::queryPredictionDetail(const httplib::Request& req, njson& json, std::string& errmsg)
{
njson jsonData = njson::array();
for (int i = 1; i<=5; i++)
std::vector<std::string> vecStoreIn(144), vecStoreOut(144), vecCharge(144), vecSolar(144), vecSolarP(144);
for (int i = 0; i<144; ++i)
{
njson jnode;
jnode["datatype"] = i;
njson jsonValues = njson::array();
for (int i = 0; i<1440; ++i)
{
jsonValues.push_back(float(Utils::random(50, 100)));
}
jnode["values"] = jsonValues;
jsonData.push_back(jnode);
vecStoreIn[i] = Utils::toStr(float(Utils::random(50, 100)));
vecStoreOut[i] = Utils::toStr(float(Utils::random(50, 100)));
vecCharge[i] = Utils::toStr(float(Utils::random(50, 100)));
vecSolar[i] = Utils::toStr(float(Utils::random(50, 100)));
vecSolarP[i] = Utils::toStr(float(Utils::random(50, 100)));
}
json["data"] = jsonData;
json["data"] = {
{"W_store_in", vecStoreIn},
{"W_store_out", vecStoreOut},
{"W_charge", vecCharge},
{"W_solar", vecSolar},
{"P_solar", vecSolarP }
};
return Errcode::OK;
}
@@ -1010,4 +1029,42 @@ Errcode HttpEntity::queryEnvironment(const httplib::Request& req, njson& json, s
}
json["data"] = jsondata;
return Errcode::OK;
}
Errcode HttpEntity::queryServiceApiList(const httplib::Request& req, njson& json, std::string& errmsg)
{
PageInfo pageinfo;
pageinfo.index = Utils::toInt(req.get_param_value("page"));
pageinfo.size = Utils::toInt(req.get_param_value("page_size"));
std::vector<Fields> result;
std::string sql = "SELECT * FROM serviceapi;" ;
auto err = DAO::exec(NULL, sql, result);
HttpHelper::setPagination(pageinfo, result, json);
return err;
}
Errcode HttpEntity::insertServiceApi(const httplib::Request& req, njson& json, std::string& errmsg)
{
Fields params;
GetRequestParam(req, {"name", "describe", "params", "is_open"}, params);
if (params.size() == 0) { return Errcode::ERR_PARAM; }
return DAO::insert(NULL, "serviceapi", params);
}
Errcode HttpEntity::updateServiceApi(const httplib::Request& req, njson& json, std::string& errmsg)
{
Fields params;
GetRequestParam(req, {"api_id", "name", "describe", "params", "is_open"}, params);
if (params.size() == 0) { return Errcode::ERR_PARAM; }
return DAO::update(NULL, "serviceapi", params, "api_id");
}
Errcode HttpEntity::deleteServiceApi(const httplib::Request& req, njson& json, std::string& errmsg)
{
Fields params;
GetRequestParam(req, {"api_id"}, params);
return DAO::remove(NULL, "serviceapi", "api_id", params.value("api_id"));
}

View File

@@ -85,4 +85,10 @@ public:
Errcode queryStatDayList(const httplib::Request& req, njson& json, std::string& errmsg);
Errcode queryEnvironment(const httplib::Request& req, njson& json, std::string& errmsg);
Errcode queryServiceApiList(const httplib::Request& req, njson& json, std::string& errmsg);
Errcode insertServiceApi(const httplib::Request& req, njson& json, std::string& errmsg);
Errcode updateServiceApi(const httplib::Request& req, njson& json, std::string& errmsg);
Errcode deleteServiceApi(const httplib::Request& req, njson& json, std::string& errmsg);
};

View File

@@ -1,10 +1,21 @@
#include "MqttEntity.h"
#include "common/Spdlogger.h"
#include "common/JsonN.h"
#include "common/Utils.h"
#include "app/Application.h"
#include "app/AppData.h"
#include "app/Station.h"
#include "app/Device.h"
#define TIMEOUT 10000L
std::string REGAddrOffset(std::string addr, int offset)
{
unsigned int val;
std::stringstream ss;
ss << std::hex << addr;
ss >> val;
return Utils::toHexStr(val + offset);
}
static std::map<std::string, std::map<std::string, REGInfo>> g_mapRegInfo;
@@ -16,12 +27,28 @@ void MqttClient::loadDataStruct(std::string filename)
// 遍历 JSON 对象
for (auto& jsonitem : json.items())
{
auto& mapItem = g_mapRegInfo[jsonitem.key()];
std::string name = jsonitem.key();
auto jsonnodeItem = jsonitem.value();
int count = jsonnodeItem["count"];
auto jsonaddrs = jsonnodeItem["addr"];
for (auto& itemaddrs : jsonitem.value().items())
auto& mapItem = g_mapRegInfo[name];
int size = 0;
for (int i = 0; i<2; ++i)
{
auto& jsonreg = itemaddrs.value();
mapItem[jsonreg["key"]] = REGInfo(jsonreg["key"], jsonreg["datatype"], jsonreg["remark"]);
for (auto& item : jsonaddrs)
{
std::string addr = item["key"];
if (i > 0)
{
addr = REGAddrOffset(addr, size*i);
}
mapItem[addr] = REGInfo(addr, item["datatype"], item["remark"]);
if (i ==0)
{
size += mapItem[addr].bytes;
}
}
}
}
}
@@ -188,7 +215,12 @@ std::string GetSubStr(std::string c, std::string& str)
if (pos != string::npos)
{
v = str.substr(0, pos);
str = str.substr(pos);
str = str.substr(pos+1);
}
else
{
v = str;
str = "";
}
return v;
}
@@ -208,51 +240,69 @@ int MqttClient::onMessageArrived(char* topic, int topicLen, MQTTAsync_message* m
std::string deviceCode = GetSubStr("/", topicStr);
//EMS遥信
//EMS遥测
//EMS遥调
//PCU遥信
//PCU遥测
//PCS遥信
//PCS遥测
//BMS遥测
//BCU遥信
//BCU遥测
//电表遥测
//温湿度遥测
//消防遥信4.0
//冷机遥信
//冷机遥测
//充电桩遥测
//网关遥信
//网关遥测
//网关遥调
//台区
njson json;
bool ret = JSON::parse(payload, json);
if (!ret)
{
spdlog::error("[mqtt] json parse error.");
return 1;
}
auto station = Application::data().getStation(Utils::toInt(stationId));
if (!station)
{
spdlog::error("[mqtt] get station error, clientId={}, stationId={}", clientId, stationId);
return 1;
}
if (command == "EMS_YX") { this->parseEMS_YX(payload); }
else if (command == "EMS_YC") { this->parseEMS_YC(payload); }
else if (command == "PCU_YX") { this->parsePCU_YX(payload); }
else if (command == "PCU_YC") { this->parsePCU_YC(payload); }
else if (command == "PCS_YX") { this->parsePCS_YX(payload); }
else if (command == "PCS_YC") { this->parsePCS_YC(payload); }
else if (command == "BMS_YC") { this->parseBMS_YC(payload); }
else if (command == "BCU_YX") { this->parseBCU_YX(payload); }
else if (command == "BCU_YC") { this->parseBCU_YC(payload); }
else if (command == "MEM_YC") { this->parseMEM_YC(payload); }
else if (command == "TH_YC") { this->parseTH_YC(payload); }
else if (command == "Fire40_YX") { this->parseFire40_YX(payload); }
else if (command == "Cooling_YX") { this->parseCooling_YX(payload); }
else if (command == "Cooling_YC") { this->parseCooling_YC(payload); }
else if (command == "Charger_YC") { this->parseCharger_YC(payload); }
else if (command == "Gateway_YX") { this->parseGateway_YX(payload); }
else if (command == "Gateway_YC") { this->parseGateway_YC(payload); }
else if (command == "Gateway_YT") { this->parseGateway_YT(payload); }
else if (command == "TQ") { this->parseTQ(payload); }
auto iter = g_mapRegInfo.find(command);
if (iter == g_mapRegInfo.end())
{
spdlog::error("[mqtt] get register add info error, clientId={}, stationId={}, command={}", clientId, stationId, command);
return 1;
}
std::map<std::string, REGInfo>& mapRegInfo = iter->second;
int deviceNo = -1;
JSON::read(json, "no", deviceNo);
auto device = station->getDeviceByType(101, Utils::toStr(deviceNo));
if (!device)
{
return 1;
}
for (auto& item: json.items())
{
std::string key = item.key();
if (key != "ts" && key != "no")
{
auto data = json.at(key);
if (data.is_array())
{
auto iter = mapRegInfo.find(key);
for (int i = 0; i<data.size(); ++i)
{
if (iter != mapRegInfo.end())
{
auto addr = iter->first;
device->setParam(addr, JSON::readStr(data[i], addr));
++iter;
}
}
}
else if (data.is_number())
{
device->setParam(key, Utils::toStr(data.get<int>()));
}
else if (data.is_string())
{
device->setParam(key, Utils::toStr(data.get<int>()));
}
}
}
// 必须释放消息内存!
MQTTAsync_freeMessage(&msg);
MQTTAsync_free(topic);
return 1; // 1表示消息已经处理
}
@@ -299,28 +349,159 @@ void MqttClient::onConnectFaiure(MQTTAsync_failureData* resp)
this->destory();
}
void MqttClient::parseEMS_YX(std::string& text){}
void MqttClient::parseEMS_YC(std::string& text) {};
void MqttClient::parsePCU_YX(std::string& text) {};
void MqttClient::parsePCU_YC(std::string& text) {};
void MqttClient::parsePCS_YX(std::string& text) {};
void MqttClient::parsePCS_YC(std::string& text) {};
void MqttClient::parseBMS_YC(std::string& text) {};
void MqttClient::parseBCU_YX(std::string& text) {};
void MqttClient::parseBCU_YC(std::string& text) {};
void MqttClient::parseMEM_YC(std::string& text) {};
void MqttClient::parseTH_YC(std::string& text) {};
void MqttClient::parseFire40_YX(std::string& text) {};
void MqttClient::parseCooling_YX(std::string& text) {};
void MqttClient::parseCooling_YC(std::string& text) {};
void MqttClient::parseCharger_YC(std::string& text) {};
void MqttClient::parseGateway_YX(std::string& text) {};
void MqttClient::parseGateway_YC(std::string& text) {};
void MqttClient::parseGateway_YT(std::string& text) {};
void MqttClient::parseTQ(std::string& text) {};
string MQTT::packEquipmentInfo()
void MqttClient::parseEMS_YX(std::shared_ptr<Station> station, njson& json, std::map<std::string, REGInfo>& mapRegInfo)
{
njson jsonroot;
return jsonroot.dump();
int deviceNo = -1;
JSON::read(json, "no", deviceNo);
auto device = station->getDeviceByType(101, Utils::toStr(deviceNo));
if (!device)
{
return;
}
for (auto& item: json.items())
{
std::string key = item.key();
if (key != "ts" && key != "no")
{
auto data = json.at(key);
if (data.is_array())
{
auto iter = mapRegInfo.find(key);
for (int i = 0; i<data.size(); ++i)
{
if (iter != mapRegInfo.end())
{
auto addr = iter->first;
device->mapParams[addr] = JSON::readStr(data[i], addr);
++iter;
}
}
}
else if (data.is_number())
{
device->mapParams[key] = Utils::toStr(data.get<int>());
}
else if (data.is_string())
{
device->mapParams[key] = Utils::toStr(data.get<int>());
}
}
}
}
string MQTT::pack(std::string name)
{
njson json;
json["ts"] = Utils::time();
json["no"] = 1;
if (name == "EMS_YC")
{
//A相电压 R uint32 1V 0x107E
//B相电压 R uint32 1V 0x1080
//C相电压 R uint32 1V 0x1082
//A相电流 R int32 1A 0x1084
//B相电流 R int32 1A 0x1086
//C相电流 R int32 1A 0x1088
//储能系统SOC R uint16 0.1 0x107A
//储能系统SOH R uint16 0.1 0x107B
json["addr"] = {"0x107A", "0x107B", "0x107E", "0x1080", "0x1082", "0x1084", "0x1086", "0x1088"};
}
else if (name == "PCS_YC")
{
//总充电量 R uint32 1kWh 0x0003
//总放电量 R uint32 1kWh 0x0005
//A相电压 R int16 1V 0x0010
//B相电压 R int16 1V 0x0011
//C相电压 R int16 1V 0x0012
//A相电流 R int16 1A 0x0019
//B相电流 R int16 1A 0x001A
//C相电流 R int16 1A 0x001B
//三相总有功功率 R int16 1kW 0x0025
//三相总无功功率 R int16 1kVar 0x0026
//三相总视在功率 R int16 1kVA 0x0027
//三相总功率因数 R int16 1 0x0028
//充电功率 R int16 1kW 0x002C
//放电功率 R int16 1kW 0x002D
json["addr"] = {"0x0003", "0x0005", "0x0010", "0x0011", "0x0012", "0x0019", "0x001A", "0x001B", "0x0025", "0x0026", "0x0027", "0x0028", "0x002C", "0x002D"};
}
else if (name == "PCU_YC")
{
//PCS侧线A相电压 R int16 1v 0x0013
//PCS侧线B相电压 R int16 1v 0x0014
//PCS侧线C相电压 R int16 1v 0x0015
//PCS侧功率因数A R int16 1 0x0019
//PCS侧功率因数B R int16 1 0x001A
//PCS侧功率因数C R int16 1 0x001B
//PCS侧相电流A R int16 1A 0x001C
//PCS侧相电流B R int16 1A 0x001D
//PCS侧相电流C R int16 1A 0x001E
//PCS侧三相总有功功率 R int16 1kW 0x0028
//PCS侧三相总无功功率 R int16 1kVar 0x0029
//PCS侧三相总视在功率 R int16 1kVA 0x002A
//PCS侧三相总功率因数 R int16 1 0x002B
json["addr"] = {"0x0013", "0x0014", "0x0015", "0x1080", "0x1082", "0x1084", "0x1086", "0x1088"};
}
else if (name == "BMS_YC")
{
//SOC R uint16 0.1 0x0001
//SOH R uint16 0.1 0x0002
//电压 R uint32 0.1V 0x0003
//电流 R int32 0.1A 0x0005
//可充电量 R uint32 1kWh 0x0007
//可放电量 R uint32 1kWh 0x0009
//可充电状态 R uint16 1可充电0不可充电 0x0047
//可放电状态 R uint16 1可放电0不可放电 0x0048
//运行状态 R uint16 运行状态 0-正常 1-告警 2-保护 0x0049
//充放电状态 R uint16 0-待机 1-充电 2-放电 0x004A
json["addr"] = {"0x0001", "0x0002", "0x0003", "0x0005", "0x0007", "0x0009", "0x0047", "0x0048", "0x0049", "0x004A"};
}
else if (name == "BCU_YC")
{
//电表类型 R uint16 "0储能站总表 1逆变前侧电表 2逆变后侧电表 3配电柜电表 4并网口电表" 0x0008
//A相电压 R uint32 1V 0x000B
//B相电压 R uint32 1V 0x000D
//C相电压 R uint32 1V 0x000F
//A相电流 R int32 1A 0x0011
//B相电流 R int32 1A 0x0013
//C相电流 R int32 1A 0x0015
//尖段电价 R uint32 1RMB 0x0027
//峰段电价 R uint32 1RMB 0x0029
//平段电价 R uint32 1RMB 0x002B
//谷段电价 R uint32 1RMB 0x002D
//日充电电量 R uint32 1kWh 0x002F
//日放电电量 R uint32 1kWh 0x0031
//日充电费用 R uint32 1RMB 0x0033
//日放电费用 R uint32 1RMB 0x0035
//日收益 R int32 1RMB 0x0037
//总充电电量 R uint32 1kWh 0x004D
//总放电电量 R uint32 1kWh 0x004F
//总充电费用 R uint32 1RMB 0x0051
//总放电费用 R uint32 1RMB 0x0053
//总收益 R int32 1RMB 0x0055
}
else if (name == "TH_YC")
{
//所属通道号 R uint16 1 0x0001
//所属温湿度号 R uint16 1~10 0x0002
//温度 R int16 0.1℃ 0x0003
//湿度 R int16 0.1℃ 0x0004
}
return json.dump();
}

View File

@@ -4,6 +4,9 @@
#include <vector>
#include <functional>
#include "MQTTAsync.h"
#include "common/JsonN.h"
class Station;
struct REGInfo
{
@@ -11,6 +14,7 @@ struct REGInfo
std::string datatype;
int bytes {0};
std::string remark;
int ratio {1};
REGInfo() {}
REGInfo(std::string key, std::string datatype, std::string remark)
@@ -41,32 +45,32 @@ public:
void onConnectSuccess(MQTTAsync_successData* resp);
void onConnectFaiure(MQTTAsync_failureData* resp);
void parseEMS_YX(std::string& text);
void parseEMS_YC(std::string& text);
void parsePCU_YX(std::string& text);
void parsePCU_YC(std::string& text);
void parsePCS_YX(std::string& text);
void parsePCS_YC(std::string& text);
void parseBMS_YC(std::string& text);
void parseBCU_YX(std::string& text);
void parseBCU_YC(std::string& text);
void parseMEM_YC(std::string& text);
void parseTH_YC(std::string& text);
void parseFire40_YX(std::string& text);
void parseCooling_YX(std::string& text);
void parseCooling_YC(std::string& text);
void parseCharger_YC(std::string& text);
void parseGateway_YX(std::string& text);
void parseGateway_YC(std::string& text);
void parseGateway_YT(std::string& text);
void parseTQ(std::string& text);
void parseEMS_YX(std::shared_ptr<Station> station, njson& json, std::map<std::string, REGInfo>& mapRegInfo);
//void parseEMS_YC(std::shared_ptr<Station> station, njson& json, std::map<std::string, REGInfo>& mapRegInfo);
//void parsePCU_YX(std::shared_ptr<Station> station, njson& json, std::map<std::string, REGInfo>& mapRegInfo);
//void parsePCU_YC(std::shared_ptr<Station> station, njson& json, std::map<std::string, REGInfo>& mapRegInfo);
//void parsePCS_YX(std::shared_ptr<Station> station, njson& json, std::map<std::string, REGInfo>& mapRegInfo);
//void parsePCS_YC(std::shared_ptr<Station> station, njson& json, std::map<std::string, REGInfo>& mapRegInfo);
//void parseBMS_YC(std::shared_ptr<Station> station, njson& json, std::map<std::string, REGInfo>& mapRegInfo);
//void parseBCU_YX(std::shared_ptr<Station> station, njson& json, std::map<std::string, REGInfo>& mapRegInfo);
//void parseBCU_YC(std::shared_ptr<Station> station, njson& json, std::map<std::string, REGInfo>& mapRegInfo);
//void parseMEM_YC(std::shared_ptr<Station> station, njson& json, std::map<std::string, REGInfo>& mapRegInfo);
//void parseTH_YC(std::shared_ptr<Station> station, njson& json, std::map<std::string, REGInfo>& mapRegInfo);
//void parseFire40_YX(std::string& text);
//void parseCooling_YX(std::string& text);
//void parseCooling_YC(std::string& text);
//void parseCharger_YC(std::string& text);
//void parseGateway_YX(std::string& text);
//void parseGateway_YC(std::string& text);
//void parseGateway_YT(std::string& text);
//void parseTQ(std::string& text);
public:
std::string clientId;
MQTTAsync client = nullptr;
std::vector<std::string> vecTopic;
std::string addr; // "tcp://localhost:1883"
int qos {1};
int qos {0};
bool isConnected {false};
bool isSubscribed {false};
};
@@ -95,5 +99,5 @@ public:
public:
static string packEquipmentInfo();
static string pack(std::string name);
};