#include "MqttEntity.h"
#include "common/Spdlogger.h"
#include "common/JsonN.h"

#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#define TIMEOUT 10000L

// Creates the Paho async client, registers the callbacks and issues an
// asynchronous connect request with automatic reconnect enabled.
//
// addr       broker URI, e.g. "tcp://localhost:1883"
// client_id  MQTT client identifier passed to MQTTAsync_create
// username   user name placed in the connect options
// password   password placed in the connect options
// vecTopic   topic list stored on the object (the subscribe-on-connect code
//            that would consume it is currently commented out in
//            onConnectSuccess)
//
// Returns 0 when the connect request was accepted, otherwise the Paho error
// code of the step that failed. The connection result itself arrives later
// through onConnectSuccess() / onConnectFaiure().
int MqttClient::init(string addr, string client_id, string username, string password, std::vector<string> vecTopic)
{
    this->addr = addr;
    this->vecTopic = vecTopic;

    int rc = MQTTAsync_create(&client, addr.c_str(), client_id.c_str(), MQTTCLIENT_PERSISTENCE_NONE, nullptr);
    if (rc != MQTTASYNC_SUCCESS)
    {
        spdlog::error("[mqtt] MQTTAsync_create error: {}", rc);
        return rc;
    }

    // Captureless lambdas act as C trampolines: Paho hands back the `this`
    // pointer registered as context and the call is forwarded to the member.
    MQTTAsync_connectionLost* onConnectionLost = [](void* context, char* cause)
    {
        static_cast<MqttClient*>(context)->onConnectionLost(cause);
    };
    MQTTAsync_messageArrived* onMessageArrived = [](void* context, char* topicName, int topicLen, MQTTAsync_message* message) -> int
    {
        return static_cast<MqttClient*>(context)->onMessageArrived(topicName, topicLen, message);
    };
    MQTTAsync_deliveryComplete* onDeliveryComplete = [](void* context, MQTTAsync_token token)
    {
        static_cast<MqttClient*>(context)->onDeliveryComplete(token);
    };

    // Register the connection-lost, message-arrived and delivery-complete callbacks.
    rc = MQTTAsync_setCallbacks(client, this, onConnectionLost, onMessageArrived, onDeliveryComplete);
    if (rc != MQTTASYNC_SUCCESS)
    {
        spdlog::error("[mqtt] MQTTAsync_setCallbacks error");
        MQTTAsync_destroy(&client); // do not leak the handle created above
        return rc;
    }

    MQTTAsync_connectOptions option = MQTTAsync_connectOptions_initializer;
    option.keepAliveInterval = 20;
    option.cleansession = 1;
    option.onSuccess = [](void* context, MQTTAsync_successData* resp)
    {
        static_cast<MqttClient*>(context)->onConnectSuccess(resp);
    };
    option.onFailure = [](void* context, MQTTAsync_failureData* resp)
    {
        static_cast<MqttClient*>(context)->onConnectFaiure(resp);
    };
    option.context = this;
    option.username = username.c_str();
    option.password = password.c_str();

    // Automatic reconnect settings.
    option.automaticReconnect = 1; // non-zero: reconnect automatically after a disconnect
    option.minRetryInterval = 5;   // seconds; initial retry interval, doubled after each failed attempt up to the maximum
    option.maxRetryInterval = 60;  // seconds; upper bound of the retry interval

    rc = MQTTAsync_connect(client, &option);
    if (rc != MQTTASYNC_SUCCESS)
    {
        spdlog::error("[mqtt] MQTTAsync_connect error");
        MQTTAsync_destroy(&client); // do not leak the handle created above
        return rc;
    }

    return 0;
    //MQTTAsync_disconnect(client, NULL);
    //MQTTAsync_destroy(&client);
}

// Context handed to the Paho subscribe callbacks; carries the user callback
// until exactly one of onSuccess / onFailure has run.
struct SubscribInfo
{
    std::function<void(int)> callback;
};

// Subscribes to every topic in vecTopics with QoS 1.
//
// callback receives 0 once the broker acknowledged the subscription and -1
// when the subscription failed (including when Paho rejects the request
// immediately, in which case the callback runs before this function returns).
// An empty callback is allowed. An empty topic list sends no request and does
// not invoke the callback.
void MqttClient::subscribe(std::vector<string> vecTopics, std::function<void(int)> callback)
{
    if (vecTopics.empty())
    {
        spdlog::warn("[mqtt] subscribe called with no topics.");
        return;
    }

    auto info = std::make_unique<SubscribInfo>();
    info->callback = std::move(callback);

    MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer;
    options.context = info.get();
    options.onSuccess = [](void* context, MQTTAsync_successData* /*response*/)
    {
        spdlog::info("[mqtt] subscribe success.");
        // Re-adopt the context so it is released when this callback returns.
        std::unique_ptr<SubscribInfo> owned(static_cast<SubscribInfo*>(context));
        if (owned->callback)
        {
            owned->callback(0);
        }
    };
    options.onFailure = [](void* context, MQTTAsync_failureData* /*response*/)
    {
        spdlog::error("[mqtt] subscribe failed.");
        std::unique_ptr<SubscribInfo> owned(static_cast<SubscribInfo*>(context));
        if (owned->callback)
        {
            owned->callback(-1);
        }
    };

    // The C API takes an array of mutable char pointers although it only reads
    // the topic strings, hence the const_cast.
    std::vector<char*> topicPtrs;
    topicPtrs.reserve(vecTopics.size());
    for (const auto& topic : vecTopics)
    {
        topicPtrs.push_back(const_cast<char*>(topic.c_str()));
    }
    const int count = static_cast<int>(topicPtrs.size());
    std::vector<int> qosList(topicPtrs.size(), 1); // QoS for each topic

    const int rc = MQTTAsync_subscribeMany(client, count, topicPtrs.data(), qosList.data(), &options);
    if (rc != MQTTASYNC_SUCCESS)
    {
        spdlog::error("[mqtt] subscribe failed, err={}", rc);
        // The request was rejected, so neither Paho callback will run: report
        // the failure here and let `info` free the context on return.
        if (info->callback)
        {
            info->callback(-1);
        }
        return;
    }

    // Request accepted: the context now belongs to the onSuccess/onFailure callback.
    info.release();
}

// Queues `text` for publication on `topic` with QoS 1, not retained.
//
// Returns the MQTTAsync_sendMessage result; MQTTASYNC_SUCCESS only means the
// message was accepted for sending, no delivery callbacks are registered here.
int MqttClient::publish(string topic, string text)
{
    spdlog::info("MQTT publish: topic={}, text={}", topic, text);

    MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer;
    //options.onSuccess = onSend;
    //options.onFailure = onSendFailure;
    options.context = this;

    MQTTAsync_message msg = MQTTAsync_message_initializer;
    msg.qos = 1;
    msg.payload = const_cast<char*>(text.data());
    msg.payloadlen = static_cast<int>(text.size());
    msg.retained = 0;

    const int rc = MQTTAsync_sendMessage(client, topic.c_str(), &msg, &options);
    if (rc == MQTTASYNC_SUCCESS)
    {
        spdlog::info("MQTT send message success, topic={}, text={}", topic, text);
    }
    else
    {
        spdlog::error("MQTT send message error, topic={}, text={}", topic, text);
    }
    return rc;
}

// Connection-lost callback: marks the client as disconnected and logs the cause.
void MqttClient::onConnectionLost(char* cause)
{
    this->isConnected = false;
    // Paho documents `cause` as currently always NULL; a null C string must
    // not be handed to the formatter.
    spdlog::error("MQTT connection lost, cause={}", cause != nullptr ? cause : "unknown");
}

// Message-arrived callback: logs the message and releases the Paho-owned
// topic and message buffers.
//
// Returns 1 to tell Paho the message has been handled.
int MqttClient::onMessageArrived(char* topic, int topicLen, MQTTAsync_message* msg)
{
    const int len = msg->payloadlen;
    const char* payload = static_cast<const char*>(msg->payload);

    // Paho passes topicLen == 0 for an ordinary NUL-terminated topic and the
    // real length only when the topic contains embedded NUL characters.
    const std::string topicText = (topicLen > 0)
        ? std::string(topic, static_cast<std::size_t>(topicLen))
        : std::string(topic);
    // The payload is a raw byte buffer without a terminating NUL, so it has to
    // be copied with its explicit length before it can be logged as text.
    const std::string payloadText = (payload != nullptr && len > 0)
        ? std::string(payload, static_cast<std::size_t>(len))
        : std::string();

    spdlog::info("MQTT message arrived: topic=[{},{}], payload len={}, payload msg={}", topicText, msg->qos, len, payloadText);

    // The message memory must be released.
    MQTTAsync_freeMessage(&msg);
    MQTTAsync_free(topic);
    return 1; // 1 means the message has been handled
}

// Delivery-complete callback (optional); currently does nothing.
void MqttClient::onDeliveryComplete(MQTTAsync_token token)
{
    //spdlog::info("MQTT delivery complete, token={}", token);
}

// Connect-success callback: marks the client as connected.
// The subscribe-on-connect logic below is disabled.
void MqttClient::onConnectSuccess(MQTTAsync_successData* resp)
{
    this->isConnected = true;
    //spdlog::info("[mqtt] connect success: {}", addr);

    //MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer;
    //options.context = this;
    //options.onSuccess = [](void* context, MQTTAsync_successData* response)
    //    {
    //        spdlog::info("[mqtt] subscribe success.");
    //    };
    //options.onFailure = [](void* context, MQTTAsync_failureData* response)
    //    {
    //        spdlog::info("[mqtt] subscribe failed.");
    //    };
    //
    //for (auto& topic: vecTopic)
    //{
    //    int rc = MQTTAsync_subscribe(client, topic.c_str(), qos, &options);
    //    if (rc != MQTTASYNC_SUCCESS)
    //    {
    //        spdlog::error("[mqtt] subscribe [{},{}] failed, err={}", topic, qos, rc);
    //    }
    //    else
    //    {
    //        spdlog::info("[mqtt] subscribe [{},{}] success", topic, qos);
    //    }
    //}
}

// Connect-failure callback: marks the client as disconnected.
void MqttClient::onConnectFaiure(MQTTAsync_failureData* resp)
{
    this->isConnected = false;
}

// Serialises the equipment description into a JSON string with the keys
// EquipmentID, ManufacturerID, EquipmentModel, ProductionDate,
// OpenForBusinessDate and EquipmentType.
string MQTT::packEquipmentInfo(mqtt::EquipmentInfo& info)
{
    NJsonNode jsonroot;
    jsonroot["EquipmentID"] = info.EquipmentID.c_str();
    jsonroot["ManufacturerID"] = info.ManufacturerID.c_str();
    jsonroot["EquipmentModel"] = info.EquipmentModel.c_str();
    jsonroot["ProductionDate"] = info.ProductionDate.c_str();
    jsonroot["OpenForBusinessDate"] = info.OpenForBusinessDate.c_str();
    jsonroot["EquipmentType"] = info.EquipmentType;
    return jsonroot.dump();
}

// Placeholder: dumps a default-constructed JSON node; node_id is not used yet.
string MQTT::packSwapEquipmentStatusInfo(string node_id)
{
    NJsonNode jsonroot;
    return jsonroot.dump();
}

// Placeholder: dumps a default-constructed JSON node.
string MQTT::packNotifyStationInfo()
{
    NJsonNode jsonroot;
    return jsonroot.dump();
}

// Placeholder: dumps a default-constructed JSON node.
string MQTT::packNotifyAlarm()
{
    NJsonNode jsonroot;
    return jsonroot.dump();
}

// Placeholder: dumps a default-constructed JSON node.
string MQTT::packNotifyChargeStatus()
{
    NJsonNode jsonroot;
    return jsonroot.dump();
}

// Placeholder: dumps a default-constructed JSON node.
string MQTT::packNotifySwapStatus()
{
    NJsonNode jsonroot;
    return jsonroot.dump();
}

// Placeholder: dumps a default-constructed JSON node.
string MQTT::packNotifyChargeOrder()
{
    NJsonNode jsonroot;
    return jsonroot.dump();
}

// Placeholder: dumps a default-constructed JSON node.
string MQTT::packNotifySwapOrder()
{
    NJsonNode jsonroot;
    return jsonroot.dump();
}