mirror of
https://gitee.com/js-yhsec/energy_storage.git
synced 2026-05-27 18:59:26 +08:00
调试修改MQTT通讯解析
This commit is contained in:
@@ -5,85 +5,44 @@
|
||||
#include "app/AppData.h"
|
||||
#include "app/Station.h"
|
||||
#include "app/Device.h"
|
||||
#include "app/DataStruct.h"
|
||||
|
||||
#define TIMEOUT 10000L
|
||||
|
||||
std::string REGAddrOffset(std::string addr, int offset)
|
||||
{
|
||||
unsigned int val;
|
||||
std::stringstream ss;
|
||||
ss << std::hex << addr;
|
||||
ss >> val;
|
||||
return Utils::toHexStr(val + offset);
|
||||
}
|
||||
|
||||
static std::map<std::string, std::map<std::string, REGInfo>> g_mapRegInfo;
|
||||
|
||||
void MqttClient::loadDataStruct(std::string filename)
|
||||
{
|
||||
njson json;
|
||||
JSON::load(filename, json);
|
||||
|
||||
// 遍历 JSON 对象
|
||||
for (auto& jsonitem : json.items())
|
||||
{
|
||||
std::string name = jsonitem.key();
|
||||
auto& jsonnodeItem = jsonitem.value();
|
||||
//int count = jsonnodeItem["count"];
|
||||
auto jsonaddrs = jsonnodeItem["addr"];
|
||||
|
||||
auto& mapItem = g_mapRegInfo[name];
|
||||
int size = 0;
|
||||
for (int i = 0; i<2; ++i)
|
||||
{
|
||||
for (auto& item : jsonaddrs)
|
||||
{
|
||||
std::string addr = item["key"];
|
||||
if (i > 0)
|
||||
{
|
||||
addr = REGAddrOffset(addr, size*i);
|
||||
}
|
||||
mapItem[addr] = REGInfo(addr, item["datatype"], item["remark"]);
|
||||
if (i ==0)
|
||||
{
|
||||
size += mapItem[addr].bytes;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
//
|
||||
|
||||
int MqttClient::init(string addr, string clientId, string username, string password)
|
||||
{
|
||||
if (isConnected)
|
||||
{
|
||||
return MQTTASYNC_SUCCESS;
|
||||
}
|
||||
if (addr.empty())
|
||||
{
|
||||
return MQTTASYNC_FAILURE;
|
||||
}
|
||||
|
||||
if (isConnected)
|
||||
{
|
||||
return MQTTASYNC_SUCCESS;
|
||||
}
|
||||
isConnected = false;
|
||||
this->addr = addr;
|
||||
this->clientId = clientId;
|
||||
|
||||
this->mapTopicInfo["EMS_YX"] = TopicInfo("EMS_YX", 101);
|
||||
//this->mapTopicInfo["EMS_YX"] = TopicInfo("EMS_YX", 101);
|
||||
this->mapTopicInfo["EMS_YC"] = TopicInfo("EMS_YC", 101);
|
||||
this->mapTopicInfo["EMS_YT"] = TopicInfo("EMS_YT", 101);
|
||||
this->mapTopicInfo["PCS_YX"] = TopicInfo("PCS_YX", 102, 1);
|
||||
this->mapTopicInfo["PCS_YC"] = TopicInfo("PCS_YC", 102, 1);
|
||||
this->mapTopicInfo["PCU_YX"] = TopicInfo("PCU_YX", 103);
|
||||
this->mapTopicInfo["PCU_YC"] = TopicInfo("PCU_YC", 103);
|
||||
this->mapTopicInfo["BMS_YX"] = TopicInfo("BMS_YX", 104);
|
||||
this->mapTopicInfo["BMS_YC"] = TopicInfo("BMS_YC", 104);
|
||||
this->mapTopicInfo["BCU_YX"] = TopicInfo("BCU_YX", 105, 1);
|
||||
this->mapTopicInfo["BCU_YC"] = TopicInfo("BCU_YC", 105, 1);
|
||||
this->mapTopicInfo["MEM_YC"] = TopicInfo("MEM_YC", 3);
|
||||
this->mapTopicInfo["Cooling_YC"] = TopicInfo("Cooling_YC", 110);
|
||||
this->mapTopicInfo["TH_YC"] = TopicInfo("TH_YC", 111);
|
||||
this->mapTopicInfo["Gateway_YX"] = TopicInfo("Gateway_YX", 112);
|
||||
this->mapTopicInfo["Charger_YC"] = TopicInfo("Charger_YC", 113);
|
||||
//this->mapTopicInfo["EMS_YT"] = TopicInfo("EMS_YT", 101);
|
||||
//this->mapTopicInfo["PCS_YX"] = TopicInfo("PCS_YX", 102, 1);
|
||||
//this->mapTopicInfo["PCS_YC"] = TopicInfo("PCS_YC", 102, 1);
|
||||
//this->mapTopicInfo["PCU_YX"] = TopicInfo("PCU_YX", 103);
|
||||
//this->mapTopicInfo["PCU_YC"] = TopicInfo("PCU_YC", 103);
|
||||
//this->mapTopicInfo["BMS_YX"] = TopicInfo("BMS_YX", 104); // BMS没有遥信
|
||||
//this->mapTopicInfo["BMS_YC"] = TopicInfo("BMS_YC", 104);
|
||||
//this->mapTopicInfo["BCU_YX"] = TopicInfo("BCU_YX", 105, 1);
|
||||
//this->mapTopicInfo["BCU_YC"] = TopicInfo("BCU_YC", 105, 1);
|
||||
//this->mapTopicInfo["MEM_YC"] = TopicInfo("MEM_YC", 3, 1);
|
||||
//this->mapTopicInfo["TH_YC"] = TopicInfo("TH_YC", 10, 1);
|
||||
//this->mapTopicInfo["Fire40_YX"] = TopicInfo("Fire40_YX", 7, 1);
|
||||
this->mapTopicInfo["Cooling_YC"] = TopicInfo("Cooling_YC", 14, 1);
|
||||
this->mapTopicInfo["Cooling_YX"] = TopicInfo("Cooling_YX", 14, 1);
|
||||
//this->mapTopicInfo["Gateway_YX"] = TopicInfo("Gateway_YX", 15, 1);
|
||||
//this->mapTopicInfo["Gateway_YC"] = TopicInfo("Gateway_YC", 15, 1);
|
||||
//this->mapTopicInfo["Charger_YC"] = TopicInfo("Charger_YC", 106, 1);
|
||||
|
||||
MQTTAsync_connectOptions option = MQTTAsync_connectOptions_initializer;
|
||||
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
|
||||
@@ -211,11 +170,11 @@ int MqttClient::publish(std::string topic, std::string text)
|
||||
int rc = MQTTAsync_sendMessage(client, topicName.c_str(), &msg, &options);
|
||||
if (rc == MQTTASYNC_SUCCESS)
|
||||
{
|
||||
spdlog::info("[mqtt] publish MQTTAsync_sendMessage success, topic={}, text={}", topicName, text);
|
||||
spdlog::info("[mqtt] publish success, topic={}, text={}", topicName, text);
|
||||
}
|
||||
else
|
||||
{
|
||||
spdlog::error("[mqtt] publish MQTTAsync_sendMessage error, topic={}, text={}", topicName, text);
|
||||
spdlog::error("[mqtt] publish error, topic={}, text={}", topicName, text);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
@@ -224,6 +183,7 @@ int MqttClient::polling()
|
||||
{
|
||||
if (!isConnected)
|
||||
{
|
||||
spdlog::error("[mqtt] poll error, mqtt is not connected, clientId={}", clientId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -233,21 +193,31 @@ int MqttClient::polling()
|
||||
|
||||
auto& appdata = Application::data();
|
||||
auto station = appdata.getStationByCode(clientId);
|
||||
if (!station)
|
||||
{
|
||||
spdlog::error("[mqtt] poll error, get station NULL, mqtt clientId={}", clientId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
for (auto& item: mapTopicInfo)
|
||||
{
|
||||
auto& topicInfo = item.second;
|
||||
if (topicInfo.polling)
|
||||
{
|
||||
if (station)
|
||||
std::vector<std::shared_ptr<Device>> vecDevice;
|
||||
station->getDeviceByType(topicInfo.deviceType, vecDevice);
|
||||
for (auto device: vecDevice)
|
||||
{
|
||||
std::vector<std::shared_ptr<Device>> vecDevice;
|
||||
station->getDeviceByType(topicInfo.deviceType, vecDevice);
|
||||
for (auto device: vecDevice)
|
||||
json["no"] = Utils::toInt(device->code);
|
||||
if (topicInfo.name == "Gateway_YC")
|
||||
{
|
||||
json["no"] = Utils::toInt(device->code);
|
||||
this->publish(topicInfo.name, json.dump());
|
||||
json["addr"] = {"40001", "40002", "40021", "40038"};
|
||||
}
|
||||
else
|
||||
{
|
||||
json["addr"] = njson::array();
|
||||
}
|
||||
this->publish(topicInfo.name, json.dump());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -278,6 +248,116 @@ std::string GetSubStr(std::string c, std::string& str)
|
||||
return v;
|
||||
}
|
||||
|
||||
// 交付完成回调(可选)
|
||||
void MqttClient::onDeliveryComplete(MQTTAsync_token token)
|
||||
{
|
||||
//spdlog::info("MQTT delivery complete, token={}", token);
|
||||
}
|
||||
|
||||
void MqttClient::onConnectSuccess( MQTTAsync_successData* resp)
|
||||
{
|
||||
spdlog::info("[mqtt] connect to {} success, clientId={}.", addr, clientId);
|
||||
this->isConnected = true;
|
||||
this->subscribe();
|
||||
}
|
||||
|
||||
void MqttClient::onConnectFaiure(MQTTAsync_failureData* resp)
|
||||
{
|
||||
spdlog::error("[mqtt] connect to {} error, clientId={}.", addr, clientId);
|
||||
this->isConnected = false;
|
||||
this->destory();
|
||||
}
|
||||
|
||||
|
||||
void MqttClient::ParseArrivedMessage(njson& json, string clientId, string command, std::shared_ptr<Station> station)
|
||||
{
|
||||
std::string stationNo = clientId;
|
||||
|
||||
auto mapRegPtr = REGAddr::getRegMap(command);
|
||||
if (!mapRegPtr)
|
||||
{
|
||||
spdlog::error("[mqtt] get register add info error, clientId={}, stationId={}, command={}", clientId, stationNo, command);
|
||||
return;
|
||||
}
|
||||
|
||||
auto iterTopic = mapTopicInfo.find(command);
|
||||
if (iterTopic == mapTopicInfo.end())
|
||||
{
|
||||
spdlog::error("[mqtt] get topic info error, clientId={}, stationId={}, command={}", clientId, stationNo, command);
|
||||
return;
|
||||
}
|
||||
TopicInfo& topicInfo = iterTopic->second;
|
||||
|
||||
int deviceNo = -1;
|
||||
JSON::read(json, "no", deviceNo);
|
||||
auto device = station->getDeviceByType(topicInfo.deviceType, Utils::toStr(deviceNo));
|
||||
if (!device)
|
||||
{
|
||||
return;
|
||||
}
|
||||
for (auto& item: json.items())
|
||||
{
|
||||
std::string key = item.key();
|
||||
if (key != "ts" && key != "no")
|
||||
{
|
||||
auto data = json.at(key);
|
||||
if (data.is_array())
|
||||
{
|
||||
std::string addrText;
|
||||
auto iter = mapRegPtr->find(key);
|
||||
for (int i = 0; i<data.size(); ++i)
|
||||
{
|
||||
if (iter != mapRegPtr->end())
|
||||
{
|
||||
|
||||
auto addr = iter->first;
|
||||
int val = data[i];
|
||||
device->setParam(addr, val);
|
||||
spdlog::info("[mqtt] read [{}]={},{}", addr, val, iter->second.remark);
|
||||
|
||||
if (command == "EMS_YC" && addr == "0x110C")
|
||||
{
|
||||
int a = 30;
|
||||
a = 100;
|
||||
}
|
||||
|
||||
if (command == "EMS_YC")
|
||||
{
|
||||
station->setRuntimeData(addr, val);
|
||||
}
|
||||
else if (command == "Fire40_YX")
|
||||
{
|
||||
station->setFire40Data(deviceNo, addr, val);
|
||||
}
|
||||
else if (command == "TH_YC")
|
||||
{
|
||||
station->setTHData(deviceNo, addr, val);
|
||||
}
|
||||
else if (command == "Cooling_YX" || command == "Cooling_YC")
|
||||
{
|
||||
station->setCoolingData(deviceNo, addr, val);
|
||||
}
|
||||
else if (command == "Gateway_YX")
|
||||
{
|
||||
//if (key == "CDZ") "CDZ": 1, //充电桩 1:在线,0:离线
|
||||
//else if (key == "EMU") //储能 1:在线,0:离线
|
||||
}
|
||||
++iter;
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (data.is_number())
|
||||
{
|
||||
device->setParam(key, data.get<int>());
|
||||
}
|
||||
else if (data.is_string())
|
||||
{
|
||||
device->setParam(key, Utils::toInt(data.get<std::string>()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int MqttClient::onMessageArrived(char* topic, int topicLen, MQTTAsync_message* msg)
|
||||
{
|
||||
std::string topicStr = topic;
|
||||
@@ -308,62 +388,32 @@ int MqttClient::onMessageArrived(char* topic, int topicLen, MQTTAsync_message* m
|
||||
return 1;
|
||||
}
|
||||
|
||||
auto iter = g_mapRegInfo.find(command);
|
||||
if (iter == g_mapRegInfo.end())
|
||||
if (command == "Gateway_YC")
|
||||
{
|
||||
spdlog::error("[mqtt] get register add info error, clientId={}, stationId={}, command={}", clientId, stationNo, command);
|
||||
return 1;
|
||||
}
|
||||
std::map<std::string, REGInfo>& mapRegInfo = iter->second;
|
||||
|
||||
|
||||
auto iterTopic = mapTopicInfo.find(command);
|
||||
if (iterTopic == mapTopicInfo.end())
|
||||
{
|
||||
spdlog::error("[mqtt] get topic info error, clientId={}, stationId={}, command={}", clientId, stationNo, command);
|
||||
return 1;
|
||||
}
|
||||
TopicInfo& topicInfo = iterTopic->second;
|
||||
|
||||
int deviceNo = -1;
|
||||
JSON::read(json, "no", deviceNo);
|
||||
auto device = station->getDeviceByType(topicInfo.deviceType, Utils::toStr(deviceNo));
|
||||
if (!device)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
|
||||
for (auto& item: json.items())
|
||||
{
|
||||
std::string key = item.key();
|
||||
if (key != "ts" && key != "no")
|
||||
for (auto& item: json.items())
|
||||
{
|
||||
auto data = json.at(key);
|
||||
if (data.is_array())
|
||||
{
|
||||
auto iter = mapRegInfo.find(key);
|
||||
for (int i = 0; i<data.size(); ++i)
|
||||
{
|
||||
if (iter != mapRegInfo.end())
|
||||
{
|
||||
auto addr = iter->first;
|
||||
auto& val = data[i];
|
||||
//spdlog::info("[mqtt] read register addr: [{}]={}, {}", addr, val, iter->second.remark);
|
||||
device->setParam(addr, val);
|
||||
++iter;
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (data.is_number())
|
||||
{
|
||||
device->setParam(key, data.get<int>());
|
||||
}
|
||||
else if (data.is_string())
|
||||
{
|
||||
device->setParam(key, Utils::toInt(data.get<std::string>()));
|
||||
}
|
||||
std::string key = item.key();
|
||||
auto& val = item.value();
|
||||
if (key == "40001") { spdlog::info("[mqtt] read register addr: [{}]={}, {}", key, val.dump(), "模式"); }
|
||||
else if (key == "40002") { spdlog::info("[mqtt] read register addr: [{}]={}, {}", key, val.dump(), "峰谷时间段"); }
|
||||
else if (key == "40021") { spdlog::info("[mqtt] read register addr: [{}]={}, {}", key, val.dump(), "自定时间段"); }
|
||||
else if (key == "40038") { spdlog::info("[mqtt] read register addr: [{}]={}, {}", key, val.dump(), "其他参数"); }
|
||||
}
|
||||
}
|
||||
else if (command == "Gateway_YX")
|
||||
{
|
||||
for (auto& item: json.items())
|
||||
{
|
||||
std::string key = item.key();
|
||||
auto& val = item.value();
|
||||
if (key == "cdz") { spdlog::info("[mqtt] read register addr: [{}]={}, {}", key, val.dump(), "充电桩通讯状态"); }
|
||||
else if (key == "emu") { spdlog::info("[mqtt] read register addr: [{}]={}, {}", key, val.dump(), "充电桩通讯状态"); }
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ParseArrivedMessage(json, clientId, command, station);
|
||||
}
|
||||
|
||||
// 必须释放消息内存!
|
||||
MQTTAsync_freeMessage(&msg);
|
||||
@@ -371,67 +421,6 @@ int MqttClient::onMessageArrived(char* topic, int topicLen, MQTTAsync_message* m
|
||||
return 1; // 1表示消息已经处理
|
||||
}
|
||||
|
||||
// 交付完成回调(可选)
|
||||
void MqttClient::onDeliveryComplete(MQTTAsync_token token)
|
||||
{
|
||||
//spdlog::info("MQTT delivery complete, token={}", token);
|
||||
}
|
||||
|
||||
void MqttClient::onConnectSuccess( MQTTAsync_successData* resp)
|
||||
{
|
||||
spdlog::info("[mqtt] connect to {} success, clientId={}.", addr, clientId);
|
||||
this->isConnected = true;
|
||||
this->subscribe();
|
||||
}
|
||||
|
||||
void MqttClient::onConnectFaiure(MQTTAsync_failureData* resp)
|
||||
{
|
||||
spdlog::error("[mqtt] connect to {} error, clientId={}.", addr, clientId);
|
||||
this->isConnected = false;
|
||||
this->destory();
|
||||
}
|
||||
|
||||
void MqttClient::parseEMS_YX(std::shared_ptr<Station> station, njson& json, std::map<std::string, REGInfo>& mapRegInfo)
|
||||
{
|
||||
int deviceNo = -1;
|
||||
JSON::read(json, "no", deviceNo);
|
||||
auto device = station->getDeviceByType(101, Utils::toStr(deviceNo));
|
||||
if (!device)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
for (auto& item: json.items())
|
||||
{
|
||||
std::string key = item.key();
|
||||
if (key != "ts" && key != "no")
|
||||
{
|
||||
auto data = json.at(key);
|
||||
if (data.is_array())
|
||||
{
|
||||
auto iter = mapRegInfo.find(key);
|
||||
for (int i = 0; i<data.size(); ++i)
|
||||
{
|
||||
if (iter != mapRegInfo.end())
|
||||
{
|
||||
auto addr = iter->first;
|
||||
device->mapParams[addr] = JSON::readStr(data[i], addr);
|
||||
++iter;
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (data.is_number())
|
||||
{
|
||||
device->mapParams[key] = Utils::toStr(data.get<int>());
|
||||
}
|
||||
else if (data.is_string())
|
||||
{
|
||||
device->mapParams[key] = Utils::toStr(data.get<int>());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
string MQTT::pack(std::string name)
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user