// energy_storage/src/protocol/MqttEntity.cpp
// MQTT client wrapper (Eclipse Paho MQTTAsync) for the energy-storage application.
#include "MqttEntity.h"

#include <cstddef>
#include <exception>

#include "common/Spdlogger.h"
#include "common/Utils.h"
#include "app/Application.h"
#include "app/AppData.h"
#include "app/Station.h"
#include "app/Device.h"
#include "app/DataStruct.h"
#define TIMEOUT 10000L
bool MqttClient::load(std::string filename)
{
njson jsonroot;
bool ret = JSON::load(filename, jsonroot);
if (!ret)
{
spdlog::error("[mqtt] load config file failed, filename={}", filename);
return false;
}
return true;
}
// Definition of the static topic table shared by every MqttClient instance.
// Keys are the topic/command names: subscribe() appends them to the topic path
// and ParseArrivedMessage() looks entries up by the received command.
std::map<std::string, TopicInfo> MqttClient::s_mapTopicInfo;
/// Creates the Paho async client and starts an asynchronous connect.
///
/// The connection result is delivered later on a Paho thread through
/// onConnectSuccess() / onConnectFaiure(); a successful return only means the
/// connect request was accepted.
///
/// @param addr      Broker URI, e.g. "tcp://localhost:1883". Must not be empty.
/// @param clientId  Identifier stored in this->clientId; also used to build the
///                  MQTT client id and the topic paths.
/// @param username  Broker user name.
/// @param password  Broker password.
/// @return MQTTASYNC_SUCCESS when already connected or when the connect request
///         was queued; MQTTASYNC_FAILURE for an empty address; otherwise the
///         Paho error code of the failing call.
int MqttClient::init(string addr, string clientId, string username, string password)
{
    if (addr.empty())
    {
        return MQTTASYNC_FAILURE;
    }
    if (isConnected)
    {
        return MQTTASYNC_SUCCESS;
    }
    this->addr = addr;
    this->clientId = clientId;

    // The MQTT client id gets a timestamp suffix so every init() uses a fresh id.
    const std::string mqttClientId = "ESS-" + clientId + "-" + std::to_string(Utils::time());
    int rc = MQTTAsync_create(&client, addr.c_str(), mqttClientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr);
    if (rc != MQTTASYNC_SUCCESS)
    {
        spdlog::error("[mqtt] MQTTAsync_create error: {}", rc);
        return rc;
    }

    // Capture-less lambdas convert to the C callback types Paho expects;
    // `this` travels through the context pointer.
    MQTTAsync_connectionLost* lostCb =
        [](void* context, char* cause)
        {
            static_cast<MqttClient*>(context)->onConnectionLost(cause);
        };
    MQTTAsync_messageArrived* arrivedCb =
        [](void* context, char* topicName, int topicLen, MQTTAsync_message* message) -> int
        {
            return static_cast<MqttClient*>(context)->onMessageArrived(topicName, topicLen, message);
        };
    MQTTAsync_deliveryComplete* deliveredCb =
        [](void* context, MQTTAsync_token token)
        {
            static_cast<MqttClient*>(context)->onDeliveryComplete(token);
        };

    // Register the connection-lost / message-arrived / delivery callbacks.
    rc = MQTTAsync_setCallbacks(client, this, lostCb, arrivedCb, deliveredCb);
    if (rc != MQTTASYNC_SUCCESS)
    {
        spdlog::error("[mqtt] MQTTAsync_setCallbacks error: {}", rc);
        this->destory();
        return rc;
    }

    // The connect-options onSuccess callback is tied to the explicit
    // MQTTAsync_connect() call; background automatic reconnects are reported
    // through the "connected" callback instead. Without it isConnected would
    // stay false after the first connection loss and, because cleansession is
    // set, the subscriptions would never be re-issued.
    MQTTAsync_connected* connectedCb =
        [](void* context, char* /*cause*/)
        {
            auto* self = static_cast<MqttClient*>(context);
            // Skip when onSuccess has already handled this (initial) connect.
            if (!self->isConnected)
            {
                self->onConnectSuccess(nullptr);
            }
        };
    rc = MQTTAsync_setConnected(client, this, connectedCb);
    if (rc != MQTTASYNC_SUCCESS)
    {
        spdlog::error("[mqtt] MQTTAsync_setConnected error: {}", rc);
        this->destory();
        return rc;
    }

    MQTTAsync_connectOptions option = MQTTAsync_connectOptions_initializer;
    option.keepAliveInterval = 20;
    option.cleansession = 1;
    option.onSuccess = [](void* context, MQTTAsync_successData* resp) { static_cast<MqttClient*>(context)->onConnectSuccess(resp); };
    option.onFailure = [](void* context, MQTTAsync_failureData* resp) { static_cast<MqttClient*>(context)->onConnectFaiure(resp); };
    option.context = this;
    option.username = username.c_str();
    option.password = password.c_str();
    // Reconnect settings.
    option.automaticReconnect = 1; // non-zero: reconnect automatically after a connection loss
    option.minRetryInterval = 5;   // seconds; the interval doubles after each failed attempt, up to the maximum
    option.maxRetryInterval = 60;  // seconds; upper bound for the retry interval

    rc = MQTTAsync_connect(client, &option);
    if (rc != MQTTASYNC_SUCCESS)
    {
        spdlog::error("[mqtt] MQTTAsync_connect error: {}", rc);
        this->destory(); // same cleanup hook as the setCallbacks failure path
        return rc;
    }
    return MQTTASYNC_SUCCESS;
}
/// Cleanup hook for the Paho client handle; currently a deliberate no-op.
///
/// The previous body returned unconditionally before its teardown
/// (`if (client) MQTTAsync_destroy(&client);`), so that code never ran. The
/// unreachable statements are removed here and the behaviour is unchanged.
///
/// NOTE(review): this hook is invoked from onConnectFaiure(), i.e. from a Paho
/// callback. Presumably the teardown was disabled because destroying the
/// client from inside its own callback is unsafe - confirm before re-enabling.
void MqttClient::destory()
{
}
// Holder for a per-subscription callback that receives an integer id.
// NOTE(review): not referenced anywhere in the visible part of this file.
struct SubscribInfo
{
std::function<void(int id)> callback;
};
void MqttClient::subscribe()
{
MQTTAsync_onSuccess* funcSuccess = [](void* context, MQTTAsync_successData* response)
{
spdlog::info("[mqtt] subscribe {} success.", (char*)context);
};
MQTTAsync_onFailure* funcFailure = [](void* context, MQTTAsync_failureData* response)
{
spdlog::error("[mqtt] subscribe {} failed.", (char*)context);
};
MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer;
options.onSuccess = funcSuccess;
options.onFailure = funcFailure;
for (auto& item: MqttClient::s_mapTopicInfo)
{
if (item.second.enabled)
{
std::string topic = "up/json/" + clientId + "/" + item.first;
options.context = (void*)&item.first;
int rc = MQTTAsync_subscribe(client, topic.data(), qos, &options);
if (rc != MQTTASYNC_SUCCESS)
{
spdlog::error("[mqtt] subscribe [{},{}] failed, err={}", topic, qos, rc);
}
else
{
spdlog::info("[mqtt] subscribe [{},{}] ", topic, qos);
}
}
}
}
int MqttClient::publish(std::string topic, std::string text)
{
if (!client) return 0;
MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer;
options.onSuccess = [](void* context, MQTTAsync_successData* response) {};
options.onFailure = [](void* context, MQTTAsync_failureData* response) {};
options.context = this;
MQTTAsync_message msg = MQTTAsync_message_initializer;
msg.qos = this->qos;
msg.payload = text.data();
msg.payloadlen = text.size();
msg.retained = 0;
std::string topicName = "down/json/" + clientId + "/" + topic;
int rc = MQTTAsync_sendMessage(client, topicName.c_str(), &msg, &options);
if (rc == MQTTASYNC_SUCCESS)
{
spdlog::info("[mqtt] publish success, topic={}, text={}", topicName, text);
}
else
{
spdlog::error("[mqtt] publish error, topic={}, text={}", topicName, text);
}
return 0;
}
int MqttClient::polling()
{
if (!isConnected)
{
spdlog::error("[mqtt] poll error, mqtt is not connected, clientId={}", clientId);
return 0;
}
njson json;
json["ts"] = Utils::time();
json["no"] = 0; // 设备编号
auto& appdata = Application::data();
auto station = appdata.getStationByCode(clientId);
if (!station)
{
spdlog::error("[mqtt] poll error, get station NULL, mqtt clientId={}", clientId);
return 0;
}
for (auto& item: MqttClient::s_mapTopicInfo)
{
auto& topicInfo = item.second;
if (topicInfo.polling && topicInfo.enabled)
{
std::vector<std::shared_ptr<Device>> vecDevice;
station->getDeviceByType(topicInfo.deviceType, vecDevice);
for (auto device: vecDevice)
{
json["no"] = Utils::toInt(device->code);
if (topicInfo.name == "Gateway_YC")
{
json["addr"] = {"40001", "40002", "40021", "40038"};
}
else
{
json["addr"] = njson::array();
}
this->publish(topicInfo.name, json.dump());
}
}
}
tsPolling = Utils::time();
return 0;
}
void MqttClient::onConnectionLost(char* cause)
{
this->isConnected = false;
//this->destory();
spdlog::error("MQTT connection lost, cause={}", cause);
}
/// Splits off the leading token of `str`.
///
/// The token ends at the first character of `str` that appears in `c`. The
/// token is returned and `str` is reduced to the text after that delimiter.
/// When no delimiter is found (or `c` is empty) the whole string is returned
/// and `str` becomes empty.
///
/// @param c    Set of delimiter characters, e.g. "/".
/// @param str  In/out: input text on entry, remaining text on return.
/// @return The leading token (possibly empty).
std::string GetSubStr(std::string c, std::string& str)
{
    // Keep the position in the string's own size type so npos compares exactly.
    const auto pos = str.find_first_of(c);
    if (pos == std::string::npos)
    {
        std::string whole;
        whole.swap(str); // hand out everything, leave `str` empty
        return whole;
    }

    std::string head = str.substr(0, pos);
    str.erase(0, pos + 1);
    return head;
}
/// Delivery-complete callback (optional); intentionally does nothing.
///
/// @param token  Delivery token of the completed message (unused).
void MqttClient::onDeliveryComplete(MQTTAsync_token token)
{
    static_cast<void>(token); // nothing to do; logging is disabled
    //spdlog::info("MQTT delivery complete, token={}", token);
}
/// Connect-success handler: marks the client as connected, subscribes to the
/// enabled topics and sends an initial round of poll requests.
///
/// @param resp  Success data from Paho; not read by this handler.
void MqttClient::onConnectSuccess( MQTTAsync_successData* resp)
{
    isConnected = true;
    spdlog::info("[mqtt] connect to {} success, clientId={}.", addr, clientId);

    subscribe();
    polling();
}
/// Connect-failure handler: logs the failure (including the code and message
/// reported by Paho, when present), clears the connected flag and invokes the
/// destory() cleanup hook.
///
/// @param resp  Failure data from Paho; may be null, and its message may be null.
void MqttClient::onConnectFaiure(MQTTAsync_failureData* resp)
{
    const int code = resp ? resp->code : 0;
    const char* reason = (resp && resp->message) ? resp->message : "";
    spdlog::error("[mqtt] connect to {} error, clientId={}, code={}, message={}.", addr, clientId, code, reason);
    this->isConnected = false;
    this->destory();
}
int MqttClient::onMessageArrived(char* topic, int topicLen, MQTTAsync_message* msg)
{
std::string topicStr = topic;
int len = msg->payloadlen;
std::string payload((const char*)msg->payload, len);
// <数据方向>/<数据格式>/<厂家ID>/<指合>/<设备标识,上行可选>
std::string direction = GetSubStr("/", topicStr);
std::string datatype = GetSubStr("/", topicStr);
std::string stationNo = GetSubStr("/", topicStr);
std::string command = GetSubStr("/", topicStr);
std::string deviceCode = GetSubStr("/", topicStr);
spdlog::info("[mqtt] <<< message arrived: topic=[{},{}], len={}, payload={}", topic, msg->qos, len, payload);
njson json;
bool ret = JSON::parse(payload, json);
if (!ret)
{
spdlog::error("[mqtt] json parse error.");
return 1;
}
auto station = Application::data().getStationByCode(stationNo);
if (!station)
{
spdlog::error("[mqtt] get station error, clientId={}, stationId={}", clientId, stationNo);
return 1;
}
if (command == "Gateway_YC")
{
for (auto& item: json.items())
{
std::string key = item.key();
auto& val = item.value();
if (key == "40001")
{
station->readGatewayMode(val.get<int>());
}
//else if (key == "40002") { spdlog::info("[mqtt] read register addr: [{}]={}, {}", key, val.dump(), "峰谷时间段"); }
//else if (key == "40021") { spdlog::info("[mqtt] read register addr: [{}]={}, {}", key, val.dump(), "自定时间段"); }
//else if (key == "40038") { spdlog::info("[mqtt] read register addr: [{}]={}, {}", key, val.dump(), "其他参数"); }
}
}
else if (command == "Gateway_YX")
{
int cdzStatus = -1;
int emuStatus = -1;
JSON::read(json, "cdz", cdzStatus);
JSON::read(json, "emu", emuStatus);
station->readGatewayStatus(cdzStatus, emuStatus);
}
else
{
ParseArrivedMessage(json, command, station);
}
// 必须释放消息内存!
MQTTAsync_freeMessage(&msg);
MQTTAsync_free(topic);
return 1; // 1表示消息已经处理
}
/// Applies a decoded device message to the matching device of `station`.
///
/// The device is selected by the topic entry's device type and the "no" field
/// of the message. Every other field except "ts" is then processed:
///  - array value: the elements are stored into consecutive entries of the
///    command's register map, starting at the entry whose key equals the field
///    name. For "Charger_YC" the names "1"/"2" are mapped to "11"/"21" first.
///    For "BCU_YC" a fixed set of keys is forwarded to Device::setBCUUnit
///    instead. Alert registers with a positive value are reported through
///    Station::readAlert, and some commands additionally feed station-level
///    readers.
///  - number value: stored under the field name.
///  - string value: converted with Utils::toInt and stored under the field name.
///
/// Unknown command, device or register map: an error is logged and nothing is
/// applied.
///
/// NOTE(review): array elements are read with get<int>(); presumably njson is
/// nlohmann::json, in which case a non-numeric element throws - confirm that
/// callers guard against it.
///
/// @param json     Parsed message payload.
/// @param command  Command name taken from the topic.
/// @param station  Station owning the target device.
void MqttClient::ParseArrivedMessage(njson& json, string command, std::shared_ptr<Station> station)
{
    const std::string stationNo = clientId;

    auto iterTopic = MqttClient::s_mapTopicInfo.find(command);
    if (iterTopic == MqttClient::s_mapTopicInfo.end())
    {
        spdlog::error("[mqtt] get topic info error, clientId={}, stationId={}, command={}", clientId, stationNo, command);
        return;
    }
    TopicInfo& topicInfo = iterTopic->second;

    int deviceNo = -1;
    JSON::read(json, "no", deviceNo);
    auto device = station->getDeviceByType(topicInfo.deviceType, Utils::toStr(deviceNo));
    if (!device)
    {
        spdlog::error("[mqtt] get device info error, clientId={}, stationId={}, command={}", clientId, stationNo, command);
        return;
    }

    auto mapRegPtr = REGAddr::getRegMap(command);
    if (!mapRegPtr)
    {
        spdlog::error("[mqtt] get register add info error, clientId={}, stationId={}, command={}", clientId, stationNo, command);
        return;
    }

    const bool isBcuYc = (command == "BCU_YC");
    const bool isChargerYc = (command == "Charger_YC");

    for (auto& item : json.items())
    {
        std::string key = item.key();
        if (key == "ts" || key == "no")
        {
            continue;
        }

        // Refer to the element in place instead of deep-copying it.
        const auto& data = item.value();

        if (data.is_array())
        {
            const int dataSize = static_cast<int>(data.size());
            if (isChargerYc)
            {
                if (key == "1") key = "11";
                else if (key == "2") key = "21";
            }

            // Loop-invariant: these BCU_YC keys go to setBCUUnit as a block.
            const bool isBcuUnitKey = isBcuYc
                && (key == "0x0056" || key == "0x043E" || key == "0x0826" || key == "0x0C0E" || key == "0x0FF6");

            auto iter = mapRegPtr->find(key);
            for (int i = 0; i < dataSize; ++i)
            {
                const int val = data[i].get<int>();
                if (isBcuUnitKey)
                {
                    device->setBCUUnit(key, i, val, dataSize);
                    continue;
                }
                if (iter == mapRegPtr->end())
                {
                    // Start key unknown, or the register map is exhausted.
                    continue;
                }

                auto addr = iter->first;
                auto& regUnit = iter->second;
                spdlog::debug("[mqtt] read [{}]={}, {}{}", addr, val, regUnit.name, regUnit.remark);
                if (regUnit.alert && val > 0)
                {
                    station->readAlert(device, addr, val, "[" + command + "]" + regUnit.name + "(" + addr + ")");
                }
                device->setParam(addr, val);

                if (command == "MEM_YC") { station->readRuntimeData(deviceNo, addr, val); }
                else if (command == "Fire40_YX") { station->readFire40Data(deviceNo, addr, val); }
                else if (command == "TH_YC") { station->readTHData(deviceNo, addr, val); }
                else if (command == "Cooling_YX" || command == "Cooling_YC") { station->readCoolingData(deviceNo, addr, val); }

                ++iter; // the next array element belongs to the next register entry
            }
        }
        else if (data.is_number())
        {
            device->setParam(key, data.get<int>());
        }
        else if (data.is_string())
        {
            device->setParam(key, Utils::toInt(data.get<std::string>()));
        }
    }
}
// Parameter keys for charger gun 1 and gun 2. ParseMessageCharge() stores the
// i-th element of the message's "1" / "2" array under the i-th key of the
// matching table.
std::vector<std::string> KEY_CHARGER_1 = {"31071", "31073", "31075", "31077", "31079", "31081", "31083"};
std::vector<std::string> KEY_CHARGER_2 = {"31072", "31074", "31076", "31078", "31080", "31082", "31084"};
/// Stores the per-gun charger values of a message into `device`.
///
/// If the message has an array under "1" (gun 1) or "2" (gun 2), the i-th
/// element is written to the device parameter named by KEY_CHARGER_1[i] /
/// KEY_CHARGER_2[i]. Elements beyond the table length are ignored.
///
/// @param json     Parsed message payload.
/// @param command  Not used by this function.
/// @param station  Not used by this function.
/// @param device   Target device; nothing happens when it is null.
void MqttClient::ParseMessageCharge(njson& json, string command, std::shared_ptr<Station> station, std::shared_ptr<Device> device)
{
    if (!device)
    {
        return;
    }

    // Shared handling for both guns: `label` is the gun tag used in the log line.
    auto applyGun = [&device](njson& root, const std::string& gunKey, const char* label, std::vector<std::string>& addrs)
    {
        if (!root.contains(gunKey))
        {
            return;
        }
        auto& values = root[gunKey];
        if (!values.is_array())
        {
            return;
        }
        // Stop at the shorter of the two sequences.
        for (std::size_t i = 0; i < values.size() && i < addrs.size(); ++i)
        {
            auto& addr = addrs[i];
            const auto val = values[i].get<int>();
            device->setParam(addr, val);
            spdlog::info("[mqtt] read: {} [{}]={}", label, addr, val);
        }
    };

    applyGun(json, "1", "枪1", KEY_CHARGER_1);
    applyGun(json, "2", "枪2", KEY_CHARGER_2);
}