#include "MqttEntity.h"

#include <functional>
#include <string>

#include "common/Spdlogger.h"
#include "common/JsonN.h"

#define TIMEOUT 10000L

// Creates the Paho async client, registers the callbacks and starts an
// asynchronous connect to the broker.
//
// addr      broker URI, e.g. "tcp://localhost:1883"
// clientId  MQTT client id; also used to build the subscribed topic names
// username  broker user name
// password  broker password
//
// Returns 0 when the connect request was accepted, otherwise the Paho error
// code of the call that failed. The outcome of the connect itself is reported
// later through onConnectSuccess() / onConnectFaiure().
int MqttClient::init(string addr, string clientId, string username, string password)
{
    this->addr = addr;
    this->clientId = clientId;

    // Topics subscribed after a successful connect: up/json/<clientId>/<command>
    const std::string prefix = "up/json/" + clientId + "/";
    this->vecTopic = {
        prefix + "EMS_YX",
        prefix + "EMS_YC",
        prefix + "EMS_YT",
        prefix + "PCU_YX",
        prefix + "PCU_YC",
        prefix + "PCS_YX",
        prefix + "PCS_YC",
        prefix + "BCU_YX",
        prefix + "BCU_YC",
        prefix + "BMS_YX",
        prefix + "BMS_YC",
        prefix + "MEM_YC",
    };

    int rc = MQTTAsync_create(&client, addr.c_str(), clientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr);
    if (rc != MQTTASYNC_SUCCESS)
    {
        spdlog::error("[mqtt] MQTTAsync_create error: {}", rc);
        return rc;
    }

    // C-style trampolines: the library hands back `this` as the context pointer.
    MQTTAsync_connectionLost* lostCb = [](void* context, char* cause)
        {
            static_cast<MqttClient*>(context)->onConnectionLost(cause);
        };
    MQTTAsync_messageArrived* arrivedCb = [](void* context, char* topicName, int topicLen, MQTTAsync_message* message) -> int
        {
            return static_cast<MqttClient*>(context)->onMessageArrived(topicName, topicLen, message);
        };
    MQTTAsync_deliveryComplete* deliveredCb = [](void* context, MQTTAsync_token token)
        {
            static_cast<MqttClient*>(context)->onDeliveryComplete(token);
        };

    // Register the connection-lost / message-arrived / delivery-complete callbacks.
    rc = MQTTAsync_setCallbacks(client, this, lostCb, arrivedCb, deliveredCb);
    if (rc != MQTTASYNC_SUCCESS)
    {
        spdlog::error("[mqtt] MQTTAsync_setCallbacks error");
        this->destory();
        return rc;
    }

    // Automatic reconnect is enabled below and cleansession drops the
    // subscriptions on every new session, so a background reconnect has to
    // restore the connected flag and subscribe again. The isConnected check
    // avoids repeating that work when the connect onSuccess callback has
    // already handled the initial connect.
    MQTTAsync_connected* connectedCb = [](void* context, char* /*cause*/)
        {
            MqttClient* self = static_cast<MqttClient*>(context);
            if (!self->isConnected)
            {
                self->onConnectSuccess(nullptr);
            }
        };
    rc = MQTTAsync_setConnected(client, this, connectedCb);
    if (rc != MQTTASYNC_SUCCESS)
    {
        spdlog::error("[mqtt] MQTTAsync_setConnected error");
        this->destory();
        return rc;
    }

    MQTTAsync_connectOptions option = MQTTAsync_connectOptions_initializer;
    option.keepAliveInterval = 20;
    option.cleansession = 1;
    option.onSuccess = [](void* context, MQTTAsync_successData* resp)
        {
            static_cast<MqttClient*>(context)->onConnectSuccess(resp);
        };
    option.onFailure = [](void* context, MQTTAsync_failureData* resp)
        {
            static_cast<MqttClient*>(context)->onConnectFaiure(resp);
        };
    option.context = this;
    option.username = username.c_str();
    option.password = password.c_str();

    // Reconnect settings.
    option.automaticReconnect = 1; // non-zero: reconnect automatically after a lost connection
    option.minRetryInterval = 5;   // seconds; the interval doubles after every failed attempt
    option.maxRetryInterval = 60;  // seconds; upper bound for the retry interval

    rc = MQTTAsync_connect(client, &option);
    if (rc != MQTTASYNC_SUCCESS)
    {
        spdlog::error("[mqtt] MQTTAsync_connect error");
        // Release the handle here as well, like the setCallbacks failure path does.
        this->destory();
        return rc;
    }

    return 0;
}

// Destroys the Paho client handle if one exists and resets it to nullptr.
void MqttClient::destory()
{
    if (client)
    {
        MQTTAsync_destroy(&client);
        client = nullptr;
    }
}

// NOTE(review): the template arguments of this std::function were missing in
// the reviewed text; void() is a placeholder. The type is not used anywhere in
// this file -- confirm the intended callback signature.
struct SubscribInfo
{
    std::function<void()> callback;
};

// Subscribes to every topic in vecTopic with the configured qos.
// The result of each subscription is logged from the Paho callbacks.
void MqttClient::subscribe()
{
    MQTTAsync_onSuccess* funcSuccess = [](void* context, MQTTAsync_successData* /*response*/)
        {
            spdlog::info("[mqtt] subscribe {} success.", static_cast<const char*>(context));
        };
    MQTTAsync_onFailure* funcFailure = [](void* context, MQTTAsync_failureData* /*response*/)
        {
            spdlog::error("[mqtt] subscribe {} failed.", static_cast<const char*>(context));
        };

    MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer;
    options.onSuccess = funcSuccess;
    options.onFailure = funcFailure;

    for (auto& topic : vecTopic)
    {
        // The callbacks only need the topic name for logging. The pointer refers
        // to the string stored in vecTopic, so vecTopic must not be modified
        // while subscriptions are still pending.
        options.context = topic.data();
        const int rc = MQTTAsync_subscribe(client, topic.c_str(), qos, &options);
        if (rc != MQTTASYNC_SUCCESS)
        {
            spdlog::error("[mqtt] subscribe [{},{}] failed, err={}", topic, qos, rc);
        }
    }
}

// Sends `text` to `topic` with QoS 1, not retained.
// Returns the result code of MQTTAsync_sendMessage (MQTTASYNC_SUCCESS on success).
int MqttClient::publish(string topic, string text)
{
    spdlog::info("MQTT publish: topic={}, text={}", topic, text);

    MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer;
    //options.onSuccess = onSend;
    //options.onFailure = onSendFailure;
    options.context = this;

    MQTTAsync_message msg = MQTTAsync_message_initializer;
    msg.qos = 1;
    msg.payload = text.data();
    msg.payloadlen = static_cast<int>(text.size());
    msg.retained = 0;

    const int rc = MQTTAsync_sendMessage(client, topic.c_str(), &msg, &options);
    if (rc == MQTTASYNC_SUCCESS)
    {
        spdlog::info("MQTT send message success, topic={}, text={}", topic, text);
    }
    else
    {
        spdlog::error("MQTT send message error, topic={}, text={}", topic, text);
    }
    return rc;
}

// Connection-lost callback: marks the client as disconnected and logs the cause.
void MqttClient::onConnectionLost(char* cause)
{
    this->isConnected = false;

    // The handle is deliberately NOT destroyed here: init() enables
    // automaticReconnect, and the library needs the handle to reconnect.
    // `cause` may be NULL, which must not be passed to the formatter.
    spdlog::error("MQTT connection lost, cause={}", cause != nullptr ? cause : "unknown");
}

// Removes the leading segment of `str` up to the delimiter `c` and returns it.
//
// The delimiter itself is consumed. When `str` contains no delimiter, the whole
// remainder is returned and `str` becomes empty, so the final segment of a
// topic is returned as well. An empty `str` yields an empty result.
std::string GetSubStr(std::string c, std::string& str)
{
    const std::string::size_type pos = c.empty() ? std::string::npos : str.find(c);
    if (pos == std::string::npos)
    {
        std::string last;
        last.swap(str); // hand out the remainder, leave str empty
        return last;
    }

    std::string segment = str.substr(0, pos);
    str.erase(0, pos + c.size());
    return segment;
}

// Message-arrived callback: logs the message, splits the topic into its
// segments and dispatches the payload by command.
//
// Always returns 1, after releasing the message and the topic name.
int MqttClient::onMessageArrived(char* topic, int topicLen, MQTTAsync_message* msg)
{
    // topicLen is only non-zero when the topic contains embedded NUL characters;
    // otherwise the topic is a regular C string.
    std::string topicStr = (topicLen > 0)
        ? std::string(topic, static_cast<std::string::size_type>(topicLen))
        : std::string(topic);

    // The payload is raw bytes of length payloadlen, not a NUL-terminated string.
    const int len = msg->payloadlen;
    std::string payload;
    if (msg->payload != nullptr && len > 0)
    {
        payload.assign(static_cast<const char*>(msg->payload), static_cast<std::string::size_type>(len));
    }

    spdlog::info("MQTT message arrived: topic=[{},{}], payload len={}, payload msg={}", topicStr, msg->qos, len, payload);

    // Topic layout:
    // <direction>/<data format>/<vendor ID>/<command>/<device id, optional for uplink>
    std::string direction = GetSubStr("/", topicStr);
    std::string datatype = GetSubStr("/", topicStr);
    std::string stationId = GetSubStr("/", topicStr);
    std::string command = GetSubStr("/", topicStr);
    std::string deviceCode = GetSubStr("/", topicStr);

    if (command == "EMS_YX")
    {
    }
    else if (command == "EMS_YC")
    {
    }
    else if (command == "PCU_YX")
    {
        this->parsePCU_YX(payload);
    }
    else if (command == "PCU_YC")
    {
    }

    // The message and the topic name must be released by the application.
    MQTTAsync_freeMessage(&msg);
    MQTTAsync_free(topic);
    return 1; // 1: the message has been handled
}

// Delivery-complete callback (optional); currently does nothing.
void MqttClient::onDeliveryComplete(MQTTAsync_token token)
{
    //spdlog::info("MQTT delivery complete, token={}", token);
}

// Called once a connection to the broker is established: marks the client as
// connected and subscribes to all topics in vecTopic.
void MqttClient::onConnectSuccess(MQTTAsync_successData* resp)
{
    spdlog::info("[mqtt] connect to {} success.", addr);
    this->isConnected = true;
    this->subscribe();
}

// Called when the connect attempt failed: logs the failure, clears the
// connected flag and releases the client handle.
void MqttClient::onConnectFaiure(MQTTAsync_failureData* resp)
{
    spdlog::error("[mqtt] connect to {} error.", addr);
    this->isConnected = false;
    // NOTE(review): this destroys the handle from inside a library callback --
    // confirm that Paho allows MQTTAsync_destroy to be called at this point.
    this->destory();
}

// Parser for EMS_YC payloads; not implemented yet.
void MqttClient::parseEMS_YC(std::string& text)
{
}

// Parser for PCU_YX payloads; not implemented yet.
void MqttClient::parsePCU_YX(std::string& text)
{
}

// Serialises a default-constructed JSON node and returns the result.
string MQTT::packEquipmentInfo()
{
    NJsonNode jsonroot;
    return jsonroot.dump();
}