// energy_storage/src/protocol/MqttEntity.cpp
#include "MqttEntity.h"
#include "common/Spdlogger.h"
2025-09-05 19:44:26 +08:00
#include "common/Utils.h"
#include "app/Application.h"
#include "app/AppData.h"
#include "app/Station.h"
#include "app/Device.h"
2025-09-16 19:38:46 +08:00
#include "app/DataStruct.h"
#define TIMEOUT 10000L
2025-09-19 18:54:36 +08:00
bool MqttClient::load(std::string filename)
{
njson jsonroot;
bool ret = JSON::load(filename, jsonroot);
if (!ret)
{
spdlog::error("[mqtt] load config file failed, filename={}", filename);
return false;
}
return true;
}
// Definition of the topic table shared by all MqttClient instances.
// The key is the last topic segment used when subscribing ("up/json/<clientId>/<key>")
// and is also the command name looked up when an arrived message is parsed.
std::map<std::string, TopicInfo> MqttClient::s_mapTopicInfo;
2025-09-05 19:44:26 +08:00
2025-09-16 19:38:46 +08:00
/// Creates the Paho async client and starts an asynchronous connect.
///
/// The connect result is delivered later through onConnectSuccess() or
/// onConnectFaiure(); a successful return only means the attempt was started.
///
/// @param addr     broker URI, e.g. "tcp://localhost:1883"; must not be empty
/// @param clientId station identifier; stored and used as part of topic names
/// @param username user name placed in the connect options
/// @param password password placed in the connect options
/// @return MQTTASYNC_SUCCESS when already connected or when the connect was
///         started; MQTTASYNC_FAILURE for an empty address; otherwise the Paho
///         error code of the step that failed.
int MqttClient::init(string addr, string clientId, string username, string password)
{
    if (addr.empty())
    {
        return MQTTASYNC_FAILURE;
    }
    if (isConnected)
    {
        return MQTTASYNC_SUCCESS;
    }

    this->addr = addr;
    this->clientId = clientId;

    // NOTE(review): if init() is called again while an earlier handle still
    // exists (connect pending or failed), that handle is overwritten below
    // without being destroyed — confirm how callers retry.

    // The id sent to the broker gets a random component in front of the station id.
    const std::string brokerClientId = "ESS-" + std::to_string(Utils::random(1000, 9999)) + "-" + clientId;
    int rc = MQTTAsync_create(&client, addr.c_str(), brokerClientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL);
    if (rc != MQTTASYNC_SUCCESS)
    {
        spdlog::error("[mqtt] MQTTAsync_create error: {}", rc);
        return rc;
    }

    // C-style trampolines: `context` is the MqttClient passed to setCallbacks.
    MQTTAsync_connectionLost* lostCb = [](void* context, char* cause)
    {
        static_cast<MqttClient*>(context)->onConnectionLost(cause);
    };
    MQTTAsync_messageArrived* arrivedCb = [](void* context, char* topicName, int topicLen, MQTTAsync_message* message) -> int
    {
        return static_cast<MqttClient*>(context)->onMessageArrived(topicName, topicLen, message);
    };
    MQTTAsync_deliveryComplete* deliveredCb = [](void* context, MQTTAsync_token token)
    {
        static_cast<MqttClient*>(context)->onDeliveryComplete(token);
    };

    // Register the connection-lost / message-arrived / delivery-complete callbacks.
    rc = MQTTAsync_setCallbacks(client, this, lostCb, arrivedCb, deliveredCb);
    if (rc != MQTTASYNC_SUCCESS)
    {
        spdlog::error("[mqtt] MQTTAsync_setCallbacks error");
        // destory() is currently a no-op, so release the just-created handle here.
        MQTTAsync_destroy(&client);
        return rc;
    }

    MQTTAsync_connectOptions option = MQTTAsync_connectOptions_initializer;
    option.keepAliveInterval = 20;
    option.cleansession = 1;
    option.onSuccess = [](void* context, MQTTAsync_successData* resp)
    {
        static_cast<MqttClient*>(context)->onConnectSuccess(resp);
    };
    option.onFailure = [](void* context, MQTTAsync_failureData* resp)
    {
        static_cast<MqttClient*>(context)->onConnectFaiure(resp);
    };
    option.context = this;
    option.username = username.c_str();
    option.password = password.c_str();

    // Reconnect settings.
    option.automaticReconnect = 1; // non-zero: reconnect automatically after a disconnect
    option.minRetryInterval = 5;   // seconds; doubled after every failed attempt up to the maximum
    option.maxRetryInterval = 60;  // seconds; upper bound of the retry interval

    rc = MQTTAsync_connect(client, &option);
    if (rc != MQTTASYNC_SUCCESS)
    {
        spdlog::error("[mqtt] MQTTAsync_connect error");
        // The connect was never started, so no callback will use the handle: free it.
        MQTTAsync_destroy(&client);
        return rc;
    }
    return MQTTASYNC_SUCCESS;
}
2025-09-04 19:31:04 +08:00
/// Releases the Paho client handle — currently a no-op.
///
/// NOTE(review): the original body returned unconditionally before the
/// teardown, which looks like a deliberate switch-off (this function is called
/// from a connect-failure callback) — confirm before re-enabling the code below.
void MqttClient::destory()
{
#if 0
    if (client != nullptr)
    {
        MQTTAsync_destroy(&client);
    }
#endif
}
/// Holder for a subscription callback that receives an integer id.
struct SubscribInfo
{
    using Callback = std::function<void(int id)>;

    Callback callback; // empty until a callable is assigned
};
2025-09-04 19:31:04 +08:00
void MqttClient::subscribe()
{
2025-09-04 19:31:04 +08:00
MQTTAsync_onSuccess* funcSuccess = [](void* context, MQTTAsync_successData* response)
{
2025-09-04 19:31:04 +08:00
spdlog::info("[mqtt] subscribe {} success.", (char*)context);
};
2025-09-04 19:31:04 +08:00
MQTTAsync_onFailure* funcFailure = [](void* context, MQTTAsync_failureData* response)
{
2025-09-04 19:31:04 +08:00
spdlog::error("[mqtt] subscribe {} failed.", (char*)context);
};
2025-09-04 19:31:04 +08:00
MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer;
options.onSuccess = funcSuccess;
options.onFailure = funcFailure;
2025-09-19 18:54:36 +08:00
for (auto& item: MqttClient::s_mapTopicInfo)
{
2025-09-19 18:54:36 +08:00
if (item.second.enabled)
{
2025-09-19 18:54:36 +08:00
std::string topic = "up/json/" + clientId + "/" + item.first;
options.context = (void*)&item.first;
int rc = MQTTAsync_subscribe(client, topic.data(), qos, &options);
if (rc != MQTTASYNC_SUCCESS)
{
spdlog::error("[mqtt] subscribe [{},{}] failed, err={}", topic, qos, rc);
}
else
{
spdlog::info("[mqtt] subscribe [{},{}] ", topic, qos);
}
}
}
}
2025-09-12 18:44:34 +08:00
int MqttClient::publish(std::string topic, std::string text)
{
if (!client) return 0;
MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer;
2025-09-12 18:44:34 +08:00
options.onSuccess = [](void* context, MQTTAsync_successData* response) {};
options.onFailure = [](void* context, MQTTAsync_failureData* response) {};
options.context = this;
MQTTAsync_message msg = MQTTAsync_message_initializer;
msg.qos = this->qos;
msg.payload = text.data();
msg.payloadlen = text.size();
msg.retained = 0;
2025-09-12 18:44:34 +08:00
std::string topicName = "down/json/" + clientId + "/" + topic;
int rc = MQTTAsync_sendMessage(client, topicName.c_str(), &msg, &options);
if (rc == MQTTASYNC_SUCCESS)
{
2025-09-16 19:38:46 +08:00
spdlog::info("[mqtt] publish success, topic={}, text={}", topicName, text);
2025-09-12 18:44:34 +08:00
}
else
{
2025-09-16 19:38:46 +08:00
spdlog::error("[mqtt] publish error, topic={}, text={}", topicName, text);
2025-09-12 18:44:34 +08:00
}
return 0;
}
int MqttClient::polling()
{
if (!isConnected)
{
2025-09-16 19:38:46 +08:00
spdlog::error("[mqtt] poll error, mqtt is not connected, clientId={}", clientId);
2025-09-12 18:44:34 +08:00
return 0;
}
njson json;
json["ts"] = Utils::time();
json["no"] = 0; // 设备编号
auto& appdata = Application::data();
auto station = appdata.getStationByCode(clientId);
2025-09-16 19:38:46 +08:00
if (!station)
{
spdlog::error("[mqtt] poll error, get station NULL, mqtt clientId={}", clientId);
return 0;
}
2025-09-12 18:44:34 +08:00
2025-09-19 18:54:36 +08:00
for (auto& item: MqttClient::s_mapTopicInfo)
2025-09-12 18:44:34 +08:00
{
auto& topicInfo = item.second;
2025-09-19 18:54:36 +08:00
if (topicInfo.polling && topicInfo.enabled)
{
2025-09-16 19:38:46 +08:00
std::vector<std::shared_ptr<Device>> vecDevice;
station->getDeviceByType(topicInfo.deviceType, vecDevice);
for (auto device: vecDevice)
2025-09-12 18:44:34 +08:00
{
2025-09-16 19:38:46 +08:00
json["no"] = Utils::toInt(device->code);
if (topicInfo.name == "Gateway_YC")
{
json["addr"] = {"40001", "40002", "40021", "40038"};
}
else
2025-09-12 18:44:34 +08:00
{
2025-09-16 19:38:46 +08:00
json["addr"] = njson::array();
2025-09-12 18:44:34 +08:00
}
2025-09-16 19:38:46 +08:00
this->publish(topicInfo.name, json.dump());
2025-09-12 18:44:34 +08:00
}
}
}
2025-09-24 19:06:31 +08:00
tsPolling = Utils::time();
return 0;
}
void MqttClient::onConnectionLost(char* cause)
{
this->isConnected = false;
2025-09-05 19:44:26 +08:00
//this->destory();
spdlog::error("MQTT connection lost, cause={}", cause);
}
2025-09-04 19:31:04 +08:00
/// Splits the leading token off `str`.
///
/// Returns the text before the first delimiter character and removes that text
/// plus the delimiter from `str`. When no delimiter is present, the whole
/// string is returned and `str` becomes empty.
///
/// @param c   delimiter characters; any character contained in `c` ends the token
/// @param str input/output: the remaining text after the token is stored back here
/// @return the extracted token (may be empty)
std::string GetSubStr(std::string c, std::string& str)
{
    // Keep the position as size_t: storing it in an int only matched npos by
    // accident of the implicit conversion.
    const std::string::size_type pos = str.find_first_of(c);
    if (pos == std::string::npos)
    {
        std::string token = str;
        str.clear();
        return token;
    }

    std::string token = str.substr(0, pos);
    str = str.substr(pos + 1);
    return token;
}
2025-09-16 19:38:46 +08:00
// Delivery-complete callback (optional).
void MqttClient::onDeliveryComplete(MQTTAsync_token token)
{
    // Nothing is done on delivery confirmation; the token is ignored.
    (void)token;
}
2025-09-16 19:38:46 +08:00
/// Connect-success callback: logs the connection, marks the client as
/// connected, then subscribes and sends the first round of polling requests.
void MqttClient::onConnectSuccess(MQTTAsync_successData* resp)
{
    (void)resp; // the success data is not inspected

    spdlog::info("[mqtt] connect to {} success, clientId={}.", addr, clientId);

    // The flag has to be set before polling(), which refuses to run while disconnected.
    isConnected = true;
    subscribe();
    polling();
}
2025-09-16 19:38:46 +08:00
/// Connect-failure callback: logs the failure, marks the client as
/// disconnected and calls destory().
void MqttClient::onConnectFaiure(MQTTAsync_failureData* resp)
{
    (void)resp; // the failure data is not inspected

    spdlog::error("[mqtt] connect to {} error, clientId={}.", addr, clientId);
    isConnected = false;
    destory();
}
2025-09-04 19:31:04 +08:00
2025-09-16 19:38:46 +08:00
int MqttClient::onMessageArrived(char* topic, int topicLen, MQTTAsync_message* msg)
{
2025-09-16 19:38:46 +08:00
std::string topicStr = topic;
int len = msg->payloadlen;
std::string payload((const char*)msg->payload, len);
2025-09-12 18:44:34 +08:00
2025-09-16 19:38:46 +08:00
// <数据方向>/<数据格式>/<厂家ID>/<指合>/<设备标识,上行可选>
std::string direction = GetSubStr("/", topicStr);
std::string datatype = GetSubStr("/", topicStr);
std::string stationNo = GetSubStr("/", topicStr);
std::string command = GetSubStr("/", topicStr);
std::string deviceCode = GetSubStr("/", topicStr);
2025-09-22 20:01:41 +08:00
spdlog::info("[mqtt] <<< message arrived: topic=[{},{}], len={}, payload={}", topic, msg->qos, len, payload);
2025-09-16 19:38:46 +08:00
njson json;
bool ret = JSON::parse(payload, json);
if (!ret)
{
2025-09-16 19:38:46 +08:00
spdlog::error("[mqtt] json parse error.");
return 1;
}
auto station = Application::data().getStationByCode(stationNo);
if (!station)
{
spdlog::error("[mqtt] get station error, clientId={}, stationId={}", clientId, stationNo);
return 1;
}
2025-09-16 19:38:46 +08:00
if (command == "Gateway_YC")
{
2025-09-16 19:38:46 +08:00
for (auto& item: json.items())
{
2025-09-16 19:38:46 +08:00
std::string key = item.key();
auto& val = item.value();
2025-09-19 18:54:36 +08:00
if (key == "40001") { station->readGatewayMode(val.get<int>()); }
2025-09-24 19:06:31 +08:00
//else if (key == "40002") { spdlog::info("[mqtt] read register addr: [{}]={}, {}", key, val.dump(), "峰谷时间段"); }
//else if (key == "40021") { spdlog::info("[mqtt] read register addr: [{}]={}, {}", key, val.dump(), "自定时间段"); }
//else if (key == "40038") { spdlog::info("[mqtt] read register addr: [{}]={}, {}", key, val.dump(), "其他参数"); }
2025-09-16 19:38:46 +08:00
}
}
else if (command == "Gateway_YX")
{
2025-09-19 18:54:36 +08:00
int cdzStatus = -1;
int emuStatus = -1;
JSON::read(json, "cdz", cdzStatus);
JSON::read(json, "emu", cdzStatus);
station->readGatewayStatus(cdzStatus, emuStatus);
}
2025-09-16 19:38:46 +08:00
else
{
2025-09-19 18:54:36 +08:00
ParseArrivedMessage(json, command, station);
2025-09-16 19:38:46 +08:00
}
// 必须释放消息内存!
MQTTAsync_freeMessage(&msg);
MQTTAsync_free(topic);
return 1; // 1表示消息已经处理
}
2025-09-19 18:54:36 +08:00
/// Applies the values of an arrived message to the addressed device.
///
/// The device is found through the topic entry for `command` and the "no"
/// field of the message. Every other field except "ts" is then processed:
///  - a number or string is stored on the device under its own key;
///  - an array is mapped onto consecutive entries of the register map for
///    `command`, starting at the entry whose key equals the field name
///    (for "Charger_YC" the keys "1"/"2" are first mapped to "11"/"21");
///  - for "BCU_YC" the five unit blocks are passed element by element to
///    Device::setBCUUnit instead.
///
/// @param json    parsed message payload
/// @param command command segment of the topic
/// @param station station the message belongs to; must not be null
void MqttClient::ParseArrivedMessage(njson& json, string command, std::shared_ptr<Station> station)
{
    std::string stationNo = clientId;

    auto iterTopic = MqttClient::s_mapTopicInfo.find(command);
    if (iterTopic == MqttClient::s_mapTopicInfo.end())
    {
        spdlog::error("[mqtt] get topic info error, clientId={}, stationId={}, command={}", clientId, stationNo, command);
        return;
    }
    TopicInfo& topicInfo = iterTopic->second;

    int deviceNo = -1;
    JSON::read(json, "no", deviceNo);
    auto device = station->getDeviceByType(topicInfo.deviceType, Utils::toStr(deviceNo));
    if (!device)
    {
        spdlog::error("[mqtt] get device info error, clientId={}, stationId={}, command={}", clientId, stationNo, command);
        return;
    }

    auto mapRegPtr = REGAddr::getRegMap(command);
    if (!mapRegPtr)
    {
        spdlog::error("[mqtt] get register add info error, clientId={}, stationId={}, command={}", clientId, stationNo, command);
        return;
    }

    for (auto& item : json.items())
    {
        std::string key = item.key();
        if (key == "ts" || key == "no")
        {
            continue;
        }

        // Use the value the iterator already refers to: no deep copy per field,
        // and no string-key lookup that would throw if the document is an array.
        auto& data = item.value();

        if (data.is_number())
        {
            device->setParam(key, data.get<int>());
            continue;
        }
        if (data.is_string())
        {
            device->setParam(key, Utils::toInt(data.get<std::string>()));
            continue;
        }
        if (!data.is_array())
        {
            continue;
        }

        int dataSize = static_cast<int>(data.size());
        if (command == "Charger_YC")
        {
            if (key == "1") key = "11";
            else if (key == "2") key = "21";
        }

        const bool isBcuUnitBlock = command == "BCU_YC"
            && (key == "0x0056" || key == "0x043E" || key == "0x0826" || key == "0x0C0E" || key == "0x0FF6");

        auto iter = mapRegPtr->find(key);
        for (int i = 0; i < dataSize; ++i)
        {
            auto& element = data[i];
            if (!element.is_number())
            {
                // Converting a non-number would throw; skip the element but keep
                // the register iterator aligned with the array index.
                spdlog::warn("[mqtt] skip non-numeric value, command={}, key={}, index={}", command, key, i);
                if (!isBcuUnitBlock && iter != mapRegPtr->end())
                {
                    ++iter;
                }
                continue;
            }

            int val = element.get<int>();
            if (isBcuUnitBlock)
            {
                device->setBCUUnit(key, i, val, dataSize);
                continue;
            }
            if (iter == mapRegPtr->end())
            {
                continue; // more values than known registers: nothing to map to
            }

            auto addr = iter->first;
            auto& regUnit = iter->second;
            spdlog::debug("[mqtt] read [{}]={}, {}{}", addr, val, regUnit.name, regUnit.remark);
            if (regUnit.alert && val > 0)
            {
                station->readAlert(device, addr, val, "[" + command + "]" + regUnit.name + "(" + addr + ")");
            }
            device->setParam(addr, val);

            if (command == "MEM_YC") { station->readRuntimeData(deviceNo, addr, val); }
            else if (command == "Fire40_YX") { station->readFire40Data(deviceNo, addr, val); }
            else if (command == "TH_YC") { station->readTHData(deviceNo, addr, val); }
            else if (command == "Cooling_YX" || command == "Cooling_YC") { station->readCoolingData(deviceNo, addr, val); }
            // "Gateway_YX" needs no extra handling here.

            ++iter;
        }
    }
}
// Parameter keys used by MqttClient::ParseMessageCharge: element i of the JSON
// array stored under "1" is written to KEY_CHARGER_1[i], and element i of the
// array under "2" to KEY_CHARGER_2[i]. Array elements beyond these lists are ignored.
std::vector<std::string> KEY_CHARGER_1 = {"31071", "31073", "31075", "31077", "31079", "31081", "31083"};
std::vector<std::string> KEY_CHARGER_2 = {"31072", "31074", "31076", "31078", "31080", "31082", "31084"};
/// Stores the per-gun value arrays of a charger message on the device.
///
/// The array under "1" is written to the keys in KEY_CHARGER_1 and the array
/// under "2" to the keys in KEY_CHARGER_2, position by position. Surplus array
/// elements are ignored, and a field that is missing or not an array is skipped.
///
/// @param json    parsed message payload
/// @param command not used by this function
/// @param station not used by this function
/// @param device  device that receives the values
void MqttClient::ParseMessageCharge(njson& json, string command, std::shared_ptr<Station> station, std::shared_ptr<Device> device)
{
    // Shared worker for both guns; only the JSON key, the key table and the
    // log line differ between them.
    const auto applyGun = [&json, &device](const std::string& gunKey,
                                           std::vector<std::string>& registers,
                                           void (*logRead)(const std::string&, int))
    {
        if (!json.contains(gunKey))
        {
            return;
        }
        auto& values = json[gunKey];
        if (!values.is_array())
        {
            return;
        }

        const std::size_t available = values.size();
        const std::size_t usable = (available < registers.size()) ? available : registers.size();
        for (std::size_t idx = 0; idx < usable; ++idx)
        {
            auto& addr = registers[idx];
            auto val = values[idx].get<int>();
            device->setParam(addr, val);
            logRead(addr, val);
        }
    };

    applyGun("1", KEY_CHARGER_1, [](const std::string& addr, int val)
    {
        spdlog::info("[mqtt] read: 枪1 [{}]={}", addr, val);
    });
    applyGun("2", KEY_CHARGER_2, [](const std::string& addr, int val)
    {
        spdlog::info("[mqtt] read: 枪2 [{}]={}", addr, val);
    });
}