#include "MqttEntity.h" #include "common/Spdlogger.h" #include "common/Utils.h" #include "app/Application.h" #include "app/AppData.h" #include "app/Station.h" #include "app/Device.h" #include "app/DataStruct.h" #define TIMEOUT 10000L bool MqttClient::load(std::string filename) { njson jsonroot; bool ret = JSON::load(filename, jsonroot); if (!ret) { spdlog::error("[mqtt] load config file failed, filename={}", filename); return false; } return true; } std::map MqttClient::s_mapTopicInfo; int MqttClient::init(string addr, string clientId, string username, string password) { if (addr.empty()) { return MQTTASYNC_FAILURE; } if (isConnected) { return MQTTASYNC_SUCCESS; } isConnected = false; this->addr = addr; this->clientId = clientId; MQTTAsync_connectOptions option = MQTTAsync_connectOptions_initializer; MQTTAsync_message pubmsg = MQTTAsync_message_initializer; int rc {0}; // "tcp://localhost:1883" std::string str = "ESS-" + std::to_string(Utils::random(1000, 9999)) + "-" +clientId; rc = MQTTAsync_create(&client, addr.c_str(), str.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL); if (rc != MQTTASYNC_SUCCESS) { spdlog::error("[mqtt] MQTTAsync_create error: {}", rc); return rc; } MQTTAsync_connectionLost* onConnectionLost = [](void* context, char* cause) { static_cast(context)->onConnectionLost(cause); }; MQTTAsync_messageArrived* onMessageArrived = [](void* context, char* topicName, int topicLen, MQTTAsync_message* message)->int { return static_cast(context)->onMessageArrived(topicName, topicLen, message); }; MQTTAsync_deliveryComplete* onDeliveryComplete = [](void* context, MQTTAsync_token token) { }; //设置连接丢失、接受消息的回调函数 rc = MQTTAsync_setCallbacks(client, this, onConnectionLost, onMessageArrived, onDeliveryComplete); if (rc != MQTTASYNC_SUCCESS) { spdlog::error("[mqtt] MQTTAsync_setCallbacks error"); this->destory(); return rc; } option.keepAliveInterval = 20; option.cleansession = 1; option.onSuccess = [](void* context, MQTTAsync_successData* resp) { static_cast(context)->onConnectSuccess(resp); };; option.onFailure = [](void* context, MQTTAsync_failureData* resp) { static_cast(context)->onConnectFaiure(resp); }; option.context = this; option.username = username.c_str(); option.password = password.c_str(); //断开重连设置 option.automaticReconnect = 1;//设置非零,断开自动重连 option.minRetryInterval = 5; //单位秒,重连间隔次数,每次重新连接失败时,重试间隔都会加倍,直到最大间隔 option.maxRetryInterval = 60;//单位秒,最大重连尝试间隔 rc = MQTTAsync_connect(client, &option); if (rc != MQTTASYNC_SUCCESS) { spdlog::error("[mqtt] MQTTAsync_connect error"); return rc; } return 0; //MQTTAsync_disconnect(client, NULL); //MQTTAsync_destroy(&client); } void MqttClient::destory() { return; if (client) { MQTTAsync_destroy(&client); } } struct SubscribInfo { std::function callback; }; void MqttClient::subscribe() { MQTTAsync_onSuccess* funcSuccess = [](void* context, MQTTAsync_successData* response) { spdlog::info("[mqtt] subscribe {} success.", (char*)context); }; MQTTAsync_onFailure* funcFailure = [](void* context, MQTTAsync_failureData* response) { spdlog::error("[mqtt] subscribe {} failed.", (char*)context); }; MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer; options.onSuccess = funcSuccess; options.onFailure = funcFailure; for (auto& item: MqttClient::s_mapTopicInfo) { if (item.second.enabled) { std::string topic = "up/json/" + clientId + "/" + item.first; options.context = (void*)&item.first; int rc = MQTTAsync_subscribe(client, topic.data(), qos, &options); if (rc != MQTTASYNC_SUCCESS) { spdlog::error("[mqtt] subscribe [{},{}] failed, err={}", topic, qos, rc); } else { spdlog::info("[mqtt] subscribe [{},{}] ", topic, qos); } } } } int MqttClient::publish(std::string topic, std::string text) { if (!client) return 0; MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer; options.onSuccess = [](void* context, MQTTAsync_successData* response) {}; options.onFailure = [](void* context, MQTTAsync_failureData* response) {}; options.context = this; MQTTAsync_message msg = MQTTAsync_message_initializer; msg.qos = this->qos; msg.payload = text.data(); msg.payloadlen = text.size(); msg.retained = 0; std::string topicName = "down/json/" + clientId + "/" + topic; int rc = MQTTAsync_sendMessage(client, topicName.c_str(), &msg, &options); if (rc == MQTTASYNC_SUCCESS) { spdlog::info("[mqtt] publish success, topic={}, text={}", topicName, text); } else { spdlog::error("[mqtt] publish error, topic={}, text={}", topicName, text); } return 0; } int MqttClient::polling() { if (!isConnected) { spdlog::error("[mqtt] poll error, mqtt is not connected, clientId={}", clientId); return 0; } njson json; json["ts"] = Utils::time(); json["no"] = 0; // 设备编号 auto& appdata = Application::data(); auto station = appdata.getStationByCode(clientId); if (!station) { spdlog::error("[mqtt] poll error, get station NULL, mqtt clientId={}", clientId); return 0; } for (auto& item: MqttClient::s_mapTopicInfo) { auto& topicInfo = item.second; if (topicInfo.polling && topicInfo.enabled) { std::vector> vecDevice; station->getDeviceByType(topicInfo.deviceType, vecDevice); for (auto device: vecDevice) { json["no"] = Utils::toInt(device->code); if (topicInfo.name == "Gateway_YC") { json["addr"] = {"40001", "40002", "40021", "40038"}; } else { json["addr"] = njson::array(); } this->publish(topicInfo.name, json.dump()); } } } return 0; } void MqttClient::onConnectionLost(char* cause) { this->isConnected = false; //this->destory(); spdlog::error("MQTT connection lost, cause={}", cause); } std::string GetSubStr(std::string c, std::string& str) { std::string v; int pos = str.find_first_of("/"); if (pos != string::npos) { v = str.substr(0, pos); str = str.substr(pos+1); } else { v = str; str = ""; } return v; } // 交付完成回调(可选) void MqttClient::onDeliveryComplete(MQTTAsync_token token) { //spdlog::info("MQTT delivery complete, token={}", token); } void MqttClient::onConnectSuccess( MQTTAsync_successData* resp) { spdlog::info("[mqtt] connect to {} success, clientId={}.", addr, clientId); this->isConnected = true; this->subscribe(); } void MqttClient::onConnectFaiure(MQTTAsync_failureData* resp) { spdlog::error("[mqtt] connect to {} error, clientId={}.", addr, clientId); this->isConnected = false; this->destory(); } int MqttClient::onMessageArrived(char* topic, int topicLen, MQTTAsync_message* msg) { std::string topicStr = topic; int len = msg->payloadlen; std::string payload((const char*)msg->payload, len); // <数据方向>/<数据格式>/<厂家ID>/<指合>/<设备标识,上行可选> std::string direction = GetSubStr("/", topicStr); std::string datatype = GetSubStr("/", topicStr); std::string stationNo = GetSubStr("/", topicStr); std::string command = GetSubStr("/", topicStr); std::string deviceCode = GetSubStr("/", topicStr); spdlog::info("[mqtt] <<< message arrived: topic=[{},{}], len={}, payload={}", topic, msg->qos, len, payload); njson json; bool ret = JSON::parse(payload, json); if (!ret) { spdlog::error("[mqtt] json parse error."); return 1; } auto station = Application::data().getStationByCode(stationNo); if (!station) { spdlog::error("[mqtt] get station error, clientId={}, stationId={}", clientId, stationNo); return 1; } if (command == "Gateway_YC") { for (auto& item: json.items()) { std::string key = item.key(); auto& val = item.value(); if (key == "40001") { station->readGatewayMode(val.get()); } else if (key == "40002") { spdlog::info("[mqtt] read register addr: [{}]={}, {}", key, val.dump(), "峰谷时间段"); } else if (key == "40021") { spdlog::info("[mqtt] read register addr: [{}]={}, {}", key, val.dump(), "自定时间段"); } else if (key == "40038") { spdlog::info("[mqtt] read register addr: [{}]={}, {}", key, val.dump(), "其他参数"); } } } else if (command == "Gateway_YX") { int cdzStatus = -1; int emuStatus = -1; JSON::read(json, "cdz", cdzStatus); JSON::read(json, "emu", cdzStatus); station->readGatewayStatus(cdzStatus, emuStatus); } else { ParseArrivedMessage(json, command, station); } // 必须释放消息内存! MQTTAsync_freeMessage(&msg); MQTTAsync_free(topic); return 1; // 1表示消息已经处理 } void MqttClient::ParseArrivedMessage(njson& json, string command, std::shared_ptr station) { std::string stationNo = clientId; auto iterTopic = MqttClient::s_mapTopicInfo.find(command); if (iterTopic == MqttClient::s_mapTopicInfo.end()) { spdlog::error("[mqtt] get topic info error, clientId={}, stationId={}, command={}", clientId, stationNo, command); return; } TopicInfo& topicInfo = iterTopic->second; int deviceNo = -1; JSON::read(json, "no", deviceNo); auto device = station->getDeviceByType(topicInfo.deviceType, Utils::toStr(deviceNo)); if (!device) { spdlog::error("[mqtt] get device info error, clientId={}, stationId={}, command={}", clientId, stationNo, command); return; } auto mapRegPtr = REGAddr::getRegMap(command); if (!mapRegPtr) { spdlog::error("[mqtt] get register add info error, clientId={}, stationId={}, command={}", clientId, stationNo, command); return; } for (auto& item: json.items()) { std::string key = item.key(); if (key == "ts" || key == "no") { continue; } auto data = json[key]; if (data.is_array()) { int dataSize = data.size(); if (command == "Charger_YC") { if (key == "1") key = "11"; else if (key == "2") key = "21"; } std::string addrText; auto iter = mapRegPtr->find(key); for (int i = 0; isetBCUUnit(key, i, val, dataSize); } else { if (iter != mapRegPtr->end()) { auto addr = iter->first; auto& regUnit = iter->second; if (regUnit.alert && val > 0) { station->readAlert(device, addr, val, "[" + command + "]" + regUnit.name + "(" + addr + ")"); } device->setParam(addr, val); spdlog::debug("[mqtt] read [{}]={}, {}{}", addr, val, regUnit.name, regUnit.remark); if (command == "MEM_YC") { station->readRuntimeData(deviceNo, addr, val); } else if (command == "Fire40_YX") { station->readFire40Data(deviceNo, addr, val); } else if (command == "TH_YC") { station->readTHData(deviceNo, addr, val); } else if (command == "Cooling_YX" || command == "Cooling_YC") { station->readCoolingData(deviceNo, addr, val); } else if (command == "Gateway_YX") { //if (key == "CDZ") "CDZ": 1, //充电桩 1:在线,0:离线 //else if (key == "EMU") //储能 1:在线,0:离线 } ++iter; } } } } else if (data.is_number()) { device->setParam(key, data.get()); } else if (data.is_string()) { device->setParam(key, Utils::toInt(data.get())); } } } std::vector KEY_CHARGER_1 = {"31071", "31073", "31075", "31077", "31079", "31081", "31083"}; std::vector KEY_CHARGER_2 = {"31072", "31074", "31076", "31078", "31080", "31082", "31084"}; void MqttClient::ParseMessageCharge(njson& json, string command, std::shared_ptr station, std::shared_ptr device) { if (json.contains("1")) { auto& jsondata = json["1"]; if (jsondata.is_array()) { for (int i = 0; i(); device->setParam(addr, val); spdlog::info("[mqtt] read: 枪1 [{}]={}", addr, val); } } } } if (json.contains("2")) { auto& jsondata = json["2"]; if (jsondata.is_array()) { for (int i = 0; i(); device->setParam(addr, val); spdlog::info("[mqtt] read: 枪2 [{}]={}", addr, val); } } } } }