#include "MqttEntity.h" #include "common/Spdlogger.h" #include "common/JsonN.h" #include "common/Utils.h" #define TIMEOUT 10000L static std::map> g_mapRegInfo; void MqttClient::loadDataStruct(std::string filename) { njson json; JSON::load(filename, json); // 遍历 JSON 对象 for (auto& jsonitem : json.items()) { auto& mapItem = g_mapRegInfo[jsonitem.key()]; for (auto& itemaddrs : jsonitem.value().items()) { auto& jsonreg = itemaddrs.value(); mapItem[jsonreg["key"]] = REGInfo(jsonreg["key"], jsonreg["datatype"], jsonreg["remark"]); } } } int MqttClient::init(string addr, string clientId, string username, string password) { this->addr = addr; this->clientId = clientId; this->vecTopic = { "up/json/" + clientId + "/EMS_YX", "up/json/" + clientId + "/EMS_YC", "up/json/" + clientId + "/EMS_YT", "up/json/" + clientId + "/PCU_YX", "up/json/" + clientId + "/PCU_YC", "up/json/" + clientId + "/PCS_YX", "up/json/" + clientId + "/PCS_YC", "up/json/" + clientId + "/BCU_YX", "up/json/" + clientId + "/BCU_YC", "up/json/" + clientId + "/BMS_YX", "up/json/" + clientId + "/BMS_YC", "up/json/" + clientId + "/MEM_YC", }; MQTTAsync_connectOptions option = MQTTAsync_connectOptions_initializer; MQTTAsync_message pubmsg = MQTTAsync_message_initializer; int rc {0}; // "tcp://localhost:1883" std::string str = "ESS-" + std::to_string(Utils::random(1000, 9999)) + "-" +clientId; rc = MQTTAsync_create(&client, addr.c_str(), str.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL); if (rc != MQTTASYNC_SUCCESS) { spdlog::error("[mqtt] MQTTAsync_create error: {}", rc); return rc; } MQTTAsync_connectionLost* onConnectionLost = [](void* context, char* cause) { static_cast(context)->onConnectionLost(cause); }; MQTTAsync_messageArrived* onMessageArrived = [](void* context, char* topicName, int topicLen, MQTTAsync_message* message)->int { return static_cast(context)->onMessageArrived(topicName, topicLen, message); }; MQTTAsync_deliveryComplete* onDeliveryComplete = [](void* context, MQTTAsync_token token) { }; //设置连接丢失、接受消息的回调函数 rc = MQTTAsync_setCallbacks(client, this, onConnectionLost, onMessageArrived, onDeliveryComplete); if (rc != MQTTASYNC_SUCCESS) { spdlog::error("[mqtt] MQTTAsync_setCallbacks error"); this->destory(); return rc; } option.keepAliveInterval = 20; option.cleansession = 1; option.onSuccess = [](void* context, MQTTAsync_successData* resp) { static_cast(context)->onConnectSuccess(resp); };; option.onFailure = [](void* context, MQTTAsync_failureData* resp) { static_cast(context)->onConnectFaiure(resp); }; option.context = this; option.username = username.c_str(); option.password = password.c_str(); //断开重连设置 option.automaticReconnect = 1;//设置非零,断开自动重连 option.minRetryInterval = 5; //单位秒,重连间隔次数,每次重新连接失败时,重试间隔都会加倍,直到最大间隔 option.maxRetryInterval = 60;//单位秒,最大重连尝试间隔 rc = MQTTAsync_connect(client, &option); if (rc != MQTTASYNC_SUCCESS) { spdlog::error("[mqtt] MQTTAsync_connect error"); return rc; } return 0; //MQTTAsync_disconnect(client, NULL); //MQTTAsync_destroy(&client); } void MqttClient::destory() { return; if (client) { MQTTAsync_destroy(&client); } } struct SubscribInfo { std::function callback; }; void MqttClient::subscribe() { MQTTAsync_onSuccess* funcSuccess = [](void* context, MQTTAsync_successData* response) { spdlog::info("[mqtt] subscribe {} success.", (char*)context); }; MQTTAsync_onFailure* funcFailure = [](void* context, MQTTAsync_failureData* response) { spdlog::error("[mqtt] subscribe {} failed.", (char*)context); }; MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer; options.onSuccess = funcSuccess; options.onFailure = funcFailure; for (auto& topic: vecTopic) { options.context = topic.data(); int rc = MQTTAsync_subscribe(client, topic.data(), qos, &options); if (rc != MQTTASYNC_SUCCESS) { spdlog::error("[mqtt] subscribe [{},{}] failed, err={}", topic, qos, rc); } } } int MqttClient::publish(string topic, string text) { spdlog::info("MQTT publish: topic={}, text={}", topic, text); MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer; //options.onSuccess = onSend; //options.onFailure = onSendFailure; options.context = this; MQTTAsync_message msg = MQTTAsync_message_initializer; msg.qos = 1; msg.payload = text.data(); msg.payloadlen = text.size(); msg.retained = 0; int rc = MQTTAsync_sendMessage(client, topic.c_str(), &msg, &options); if (rc == MQTTASYNC_SUCCESS) { spdlog::info("MQTT send message success, topic={}, text={}", topic, text); } else { spdlog::error("MQTT send message error, topic={}, text={}", topic, text); } return rc; } void MqttClient::onConnectionLost(char* cause) { this->isConnected = false; //this->destory(); spdlog::error("MQTT connection lost, cause={}", cause); } std::string GetSubStr(std::string c, std::string& str) { std::string v; int pos = str.find_first_of("/"); if (pos != string::npos) { v = str.substr(0, pos); str = str.substr(pos); } return v; } int MqttClient::onMessageArrived(char* topic, int topicLen, MQTTAsync_message* msg) { std::string topicStr = topic; int len = msg->payloadlen; std::string payload = (char*)msg->payload; spdlog::info("MQTT message arrived: topic=[{},{}], payload len={}, payload msg={}", topic, msg->qos, len, payload); // <数据方向>/<数据格式>/<厂家ID>/<指合>/<设备标识,上行可选> std::string direction = GetSubStr("/", topicStr); std::string datatype = GetSubStr("/", topicStr); std::string stationId = GetSubStr("/", topicStr); std::string command = GetSubStr("/", topicStr); std::string deviceCode = GetSubStr("/", topicStr); //EMS遥信 //EMS遥测 //EMS遥调 //PCU遥信 //PCU遥测 //PCS遥信 //PCS遥测 //BMS遥测 //BCU遥信 //BCU遥测 //电表遥测 //温湿度遥测 //消防遥信4.0 //冷机遥信 //冷机遥测 //充电桩遥测 //网关遥信 //网关遥测 //网关遥调 //台区 if (command == "EMS_YX") { this->parseEMS_YX(payload); } else if (command == "EMS_YC") { this->parseEMS_YC(payload); } else if (command == "PCU_YX") { this->parsePCU_YX(payload); } else if (command == "PCU_YC") { this->parsePCU_YC(payload); } else if (command == "PCS_YX") { this->parsePCS_YX(payload); } else if (command == "PCS_YC") { this->parsePCS_YC(payload); } else if (command == "BMS_YC") { this->parseBMS_YC(payload); } else if (command == "BCU_YX") { this->parseBCU_YX(payload); } else if (command == "BCU_YC") { this->parseBCU_YC(payload); } else if (command == "MEM_YC") { this->parseMEM_YC(payload); } else if (command == "TH_YC") { this->parseTH_YC(payload); } else if (command == "Fire40_YX") { this->parseFire40_YX(payload); } else if (command == "Cooling_YX") { this->parseCooling_YX(payload); } else if (command == "Cooling_YC") { this->parseCooling_YC(payload); } else if (command == "Charger_YC") { this->parseCharger_YC(payload); } else if (command == "Gateway_YX") { this->parseGateway_YX(payload); } else if (command == "Gateway_YC") { this->parseGateway_YC(payload); } else if (command == "Gateway_YT") { this->parseGateway_YT(payload); } else if (command == "TQ") { this->parseTQ(payload); } // 必须释放消息内存! MQTTAsync_freeMessage(&msg); MQTTAsync_free(topic); return 1; // 1表示消息已经处理 } // 交付完成回调(可选) void MqttClient::onDeliveryComplete(MQTTAsync_token token) { //spdlog::info("MQTT delivery complete, token={}", token); } void MqttClient::onConnectSuccess( MQTTAsync_successData* resp) { spdlog::info("[mqtt] connect to {} success, clientId={}.", addr, clientId); this->isConnected = true; this->subscribe(); //MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer; //options.context = this; //options.onSuccess = [](void* context, MQTTAsync_successData* response) // { // spdlog::info("[mqtt] subscribe success."); // }; //options.onFailure = [](void* context, MQTTAsync_failureData* response) // { // spdlog::info("[mqtt] subscribe failed."); // }; // //for (auto& topic: vecTopic) //{ // int rc = MQTTAsync_subscribe(client, topic.c_str(), qos, &options); // if (rc != MQTTASYNC_SUCCESS) // { // spdlog::error("[mqtt] subscribe [{},{}] failed, err={}", topic, qos, rc); // } // else // { // spdlog::info("[mqtt] subscribe [{},{}] success", topic, qos); // } //} } void MqttClient::onConnectFaiure(MQTTAsync_failureData* resp) { spdlog::error("[mqtt] connect to {} error, clientId={}.", addr, clientId); this->isConnected = false; this->destory(); } void MqttClient::parseEMS_YX(std::string& text){} void MqttClient::parseEMS_YC(std::string& text) {}; void MqttClient::parsePCU_YX(std::string& text) {}; void MqttClient::parsePCU_YC(std::string& text) {}; void MqttClient::parsePCS_YX(std::string& text) {}; void MqttClient::parsePCS_YC(std::string& text) {}; void MqttClient::parseBMS_YC(std::string& text) {}; void MqttClient::parseBCU_YX(std::string& text) {}; void MqttClient::parseBCU_YC(std::string& text) {}; void MqttClient::parseMEM_YC(std::string& text) {}; void MqttClient::parseTH_YC(std::string& text) {}; void MqttClient::parseFire40_YX(std::string& text) {}; void MqttClient::parseCooling_YX(std::string& text) {}; void MqttClient::parseCooling_YC(std::string& text) {}; void MqttClient::parseCharger_YC(std::string& text) {}; void MqttClient::parseGateway_YX(std::string& text) {}; void MqttClient::parseGateway_YC(std::string& text) {}; void MqttClient::parseGateway_YT(std::string& text) {}; void MqttClient::parseTQ(std::string& text) {}; string MQTT::packEquipmentInfo() { njson jsonroot; return jsonroot.dump(); }