#include "MqttEntity.h" #include "common/Spdlogger.h" #include "common/Utils.h" #include "app/Application.h" #include "app/AppData.h" #include "app/Station.h" #include "app/Device.h" #include "app/DataStruct.h" #define TIMEOUT 10000L int MqttClient::init(string addr, string clientId, string username, string password) { if (addr.empty()) { return MQTTASYNC_FAILURE; } if (isConnected) { return MQTTASYNC_SUCCESS; } isConnected = false; this->addr = addr; this->clientId = clientId; //this->mapTopicInfo["EMS_YX"] = TopicInfo("EMS_YX", 101); this->mapTopicInfo["EMS_YC"] = TopicInfo("EMS_YC", 101); //this->mapTopicInfo["EMS_YT"] = TopicInfo("EMS_YT", 101); //this->mapTopicInfo["PCS_YX"] = TopicInfo("PCS_YX", 102, 1); //this->mapTopicInfo["PCS_YC"] = TopicInfo("PCS_YC", 102, 1); //this->mapTopicInfo["PCU_YX"] = TopicInfo("PCU_YX", 103); //this->mapTopicInfo["PCU_YC"] = TopicInfo("PCU_YC", 103); //this->mapTopicInfo["BMS_YX"] = TopicInfo("BMS_YX", 104); // BMS没有遥信 //this->mapTopicInfo["BMS_YC"] = TopicInfo("BMS_YC", 104); //this->mapTopicInfo["BCU_YX"] = TopicInfo("BCU_YX", 105, 1); //this->mapTopicInfo["BCU_YC"] = TopicInfo("BCU_YC", 105, 1); //this->mapTopicInfo["MEM_YC"] = TopicInfo("MEM_YC", 3, 1); //this->mapTopicInfo["TH_YC"] = TopicInfo("TH_YC", 10, 1); //this->mapTopicInfo["Fire40_YX"] = TopicInfo("Fire40_YX", 7, 1); this->mapTopicInfo["Cooling_YC"] = TopicInfo("Cooling_YC", 14, 1); this->mapTopicInfo["Cooling_YX"] = TopicInfo("Cooling_YX", 14, 1); //this->mapTopicInfo["Gateway_YX"] = TopicInfo("Gateway_YX", 15, 1); //this->mapTopicInfo["Gateway_YC"] = TopicInfo("Gateway_YC", 15, 1); //this->mapTopicInfo["Charger_YC"] = TopicInfo("Charger_YC", 106, 1); MQTTAsync_connectOptions option = MQTTAsync_connectOptions_initializer; MQTTAsync_message pubmsg = MQTTAsync_message_initializer; int rc {0}; // "tcp://localhost:1883" std::string str = "ESS-" + std::to_string(Utils::random(1000, 9999)) + "-" +clientId; rc = MQTTAsync_create(&client, addr.c_str(), str.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL); if (rc != MQTTASYNC_SUCCESS) { spdlog::error("[mqtt] MQTTAsync_create error: {}", rc); return rc; } MQTTAsync_connectionLost* onConnectionLost = [](void* context, char* cause) { static_cast(context)->onConnectionLost(cause); }; MQTTAsync_messageArrived* onMessageArrived = [](void* context, char* topicName, int topicLen, MQTTAsync_message* message)->int { return static_cast(context)->onMessageArrived(topicName, topicLen, message); }; MQTTAsync_deliveryComplete* onDeliveryComplete = [](void* context, MQTTAsync_token token) { }; //设置连接丢失、接受消息的回调函数 rc = MQTTAsync_setCallbacks(client, this, onConnectionLost, onMessageArrived, onDeliveryComplete); if (rc != MQTTASYNC_SUCCESS) { spdlog::error("[mqtt] MQTTAsync_setCallbacks error"); this->destory(); return rc; } option.keepAliveInterval = 20; option.cleansession = 1; option.onSuccess = [](void* context, MQTTAsync_successData* resp) { static_cast(context)->onConnectSuccess(resp); };; option.onFailure = [](void* context, MQTTAsync_failureData* resp) { static_cast(context)->onConnectFaiure(resp); }; option.context = this; option.username = username.c_str(); option.password = password.c_str(); //断开重连设置 option.automaticReconnect = 1;//设置非零,断开自动重连 option.minRetryInterval = 5; //单位秒,重连间隔次数,每次重新连接失败时,重试间隔都会加倍,直到最大间隔 option.maxRetryInterval = 60;//单位秒,最大重连尝试间隔 rc = MQTTAsync_connect(client, &option); if (rc != MQTTASYNC_SUCCESS) { spdlog::error("[mqtt] MQTTAsync_connect error"); return rc; } return 0; //MQTTAsync_disconnect(client, NULL); //MQTTAsync_destroy(&client); } void MqttClient::destory() { return; if (client) { MQTTAsync_destroy(&client); } } struct SubscribInfo { std::function callback; }; void MqttClient::subscribe() { MQTTAsync_onSuccess* funcSuccess = [](void* context, MQTTAsync_successData* response) { spdlog::info("[mqtt] subscribe {} success.", (char*)context); }; MQTTAsync_onFailure* funcFailure = [](void* context, MQTTAsync_failureData* response) { spdlog::error("[mqtt] subscribe {} failed.", (char*)context); }; MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer; options.onSuccess = funcSuccess; options.onFailure = funcFailure; for (auto& item: mapTopicInfo) { std::string topic = "up/json/" + clientId + "/" + item.first; options.context = (void*)&item.first; int rc = MQTTAsync_subscribe(client, topic.data(), qos, &options); if (rc != MQTTASYNC_SUCCESS) { spdlog::error("[mqtt] subscribe [{},{}] failed, err={}", topic, qos, rc); } else { spdlog::info("[mqtt] subscribe [{},{}] ", topic, qos); } } } int MqttClient::publish(std::string topic, std::string text) { if (!client) return 0; MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer; options.onSuccess = [](void* context, MQTTAsync_successData* response) {}; options.onFailure = [](void* context, MQTTAsync_failureData* response) {}; options.context = this; MQTTAsync_message msg = MQTTAsync_message_initializer; msg.qos = this->qos; msg.payload = text.data(); msg.payloadlen = text.size(); msg.retained = 0; std::string topicName = "down/json/" + clientId + "/" + topic; int rc = MQTTAsync_sendMessage(client, topicName.c_str(), &msg, &options); if (rc == MQTTASYNC_SUCCESS) { spdlog::info("[mqtt] publish success, topic={}, text={}", topicName, text); } else { spdlog::error("[mqtt] publish error, topic={}, text={}", topicName, text); } return 0; } int MqttClient::polling() { if (!isConnected) { spdlog::error("[mqtt] poll error, mqtt is not connected, clientId={}", clientId); return 0; } njson json; json["ts"] = Utils::time(); json["no"] = 0; // 设备编号 auto& appdata = Application::data(); auto station = appdata.getStationByCode(clientId); if (!station) { spdlog::error("[mqtt] poll error, get station NULL, mqtt clientId={}", clientId); return 0; } for (auto& item: mapTopicInfo) { auto& topicInfo = item.second; if (topicInfo.polling) { std::vector> vecDevice; station->getDeviceByType(topicInfo.deviceType, vecDevice); for (auto device: vecDevice) { json["no"] = Utils::toInt(device->code); if (topicInfo.name == "Gateway_YC") { json["addr"] = {"40001", "40002", "40021", "40038"}; } else { json["addr"] = njson::array(); } this->publish(topicInfo.name, json.dump()); } } } return 0; } void MqttClient::onConnectionLost(char* cause) { this->isConnected = false; //this->destory(); spdlog::error("MQTT connection lost, cause={}", cause); } std::string GetSubStr(std::string c, std::string& str) { std::string v; int pos = str.find_first_of("/"); if (pos != string::npos) { v = str.substr(0, pos); str = str.substr(pos+1); } else { v = str; str = ""; } return v; } // 交付完成回调(可选) void MqttClient::onDeliveryComplete(MQTTAsync_token token) { //spdlog::info("MQTT delivery complete, token={}", token); } void MqttClient::onConnectSuccess( MQTTAsync_successData* resp) { spdlog::info("[mqtt] connect to {} success, clientId={}.", addr, clientId); this->isConnected = true; this->subscribe(); } void MqttClient::onConnectFaiure(MQTTAsync_failureData* resp) { spdlog::error("[mqtt] connect to {} error, clientId={}.", addr, clientId); this->isConnected = false; this->destory(); } void MqttClient::ParseArrivedMessage(njson& json, string clientId, string command, std::shared_ptr station) { std::string stationNo = clientId; auto mapRegPtr = REGAddr::getRegMap(command); if (!mapRegPtr) { spdlog::error("[mqtt] get register add info error, clientId={}, stationId={}, command={}", clientId, stationNo, command); return; } auto iterTopic = mapTopicInfo.find(command); if (iterTopic == mapTopicInfo.end()) { spdlog::error("[mqtt] get topic info error, clientId={}, stationId={}, command={}", clientId, stationNo, command); return; } TopicInfo& topicInfo = iterTopic->second; int deviceNo = -1; JSON::read(json, "no", deviceNo); auto device = station->getDeviceByType(topicInfo.deviceType, Utils::toStr(deviceNo)); if (!device) { return; } for (auto& item: json.items()) { std::string key = item.key(); if (key != "ts" && key != "no") { auto data = json.at(key); if (data.is_array()) { std::string addrText; auto iter = mapRegPtr->find(key); for (int i = 0; iend()) { auto addr = iter->first; int val = data[i]; device->setParam(addr, val); spdlog::info("[mqtt] read [{}]={},{}", addr, val, iter->second.remark); if (command == "EMS_YC" && addr == "0x110C") { int a = 30; a = 100; } if (command == "EMS_YC") { station->setRuntimeData(addr, val); } else if (command == "Fire40_YX") { station->setFire40Data(deviceNo, addr, val); } else if (command == "TH_YC") { station->setTHData(deviceNo, addr, val); } else if (command == "Cooling_YX" || command == "Cooling_YC") { station->setCoolingData(deviceNo, addr, val); } else if (command == "Gateway_YX") { //if (key == "CDZ") "CDZ": 1, //充电桩 1:在线,0:离线 //else if (key == "EMU") //储能 1:在线,0:离线 } ++iter; } } } else if (data.is_number()) { device->setParam(key, data.get()); } else if (data.is_string()) { device->setParam(key, Utils::toInt(data.get())); } } } } int MqttClient::onMessageArrived(char* topic, int topicLen, MQTTAsync_message* msg) { std::string topicStr = topic; int len = msg->payloadlen; std::string payload((const char*)msg->payload, len); // <数据方向>/<数据格式>/<厂家ID>/<指合>/<设备标识,上行可选> std::string direction = GetSubStr("/", topicStr); std::string datatype = GetSubStr("/", topicStr); std::string stationNo = GetSubStr("/", topicStr); std::string command = GetSubStr("/", topicStr); std::string deviceCode = GetSubStr("/", topicStr); spdlog::info("[mqtt] <<<<<<<<<< message arrived: topic=[{},{}], len={}, payload={}", topic, msg->qos, len, payload); njson json; bool ret = JSON::parse(payload, json); if (!ret) { spdlog::error("[mqtt] json parse error."); return 1; } auto station = Application::data().getStationByCode(stationNo); if (!station) { spdlog::error("[mqtt] get station error, clientId={}, stationId={}", clientId, stationNo); return 1; } if (command == "Gateway_YC") { for (auto& item: json.items()) { std::string key = item.key(); auto& val = item.value(); if (key == "40001") { spdlog::info("[mqtt] read register addr: [{}]={}, {}", key, val.dump(), "模式"); } else if (key == "40002") { spdlog::info("[mqtt] read register addr: [{}]={}, {}", key, val.dump(), "峰谷时间段"); } else if (key == "40021") { spdlog::info("[mqtt] read register addr: [{}]={}, {}", key, val.dump(), "自定时间段"); } else if (key == "40038") { spdlog::info("[mqtt] read register addr: [{}]={}, {}", key, val.dump(), "其他参数"); } } } else if (command == "Gateway_YX") { for (auto& item: json.items()) { std::string key = item.key(); auto& val = item.value(); if (key == "cdz") { spdlog::info("[mqtt] read register addr: [{}]={}, {}", key, val.dump(), "充电桩通讯状态"); } else if (key == "emu") { spdlog::info("[mqtt] read register addr: [{}]={}, {}", key, val.dump(), "充电桩通讯状态"); } } } else { ParseArrivedMessage(json, clientId, command, station); } // 必须释放消息内存! MQTTAsync_freeMessage(&msg); MQTTAsync_free(topic); return 1; // 1表示消息已经处理 } string MQTT::pack(std::string name) { njson json; json["ts"] = Utils::time(); json["no"] = 1; if (name == "EMS_YC") { //A相电压 R uint32 1V 0x107E //B相电压 R uint32 1V 0x1080 //C相电压 R uint32 1V 0x1082 //A相电流 R int32 1A 0x1084 //B相电流 R int32 1A 0x1086 //C相电流 R int32 1A 0x1088 //储能系统SOC R uint16 0.1 0x107A //储能系统SOH R uint16 0.1 0x107B json["addr"] = {"0x107A", "0x107B", "0x107E", "0x1080", "0x1082", "0x1084", "0x1086", "0x1088"}; } else if (name == "PCS_YC") { //总充电量 R uint32 1kWh 0x0003 //总放电量 R uint32 1kWh 0x0005 //A相电压 R int16 1V 0x0010 //B相电压 R int16 1V 0x0011 //C相电压 R int16 1V 0x0012 //A相电流 R int16 1A 0x0019 //B相电流 R int16 1A 0x001A //C相电流 R int16 1A 0x001B //三相总有功功率 R int16 1kW 0x0025 //三相总无功功率 R int16 1kVar 0x0026 //三相总视在功率 R int16 1kVA 0x0027 //三相总功率因数 R int16 1 0x0028 //充电功率 R int16 1kW 0x002C //放电功率 R int16 1kW 0x002D json["addr"] = {"0x0003", "0x0005", "0x0010", "0x0011", "0x0012", "0x0019", "0x001A", "0x001B", "0x0025", "0x0026", "0x0027", "0x0028", "0x002C", "0x002D"}; } else if (name == "PCU_YC") { //PCS侧线A相电压 R int16 1v 0x0013 //PCS侧线B相电压 R int16 1v 0x0014 //PCS侧线C相电压 R int16 1v 0x0015 //PCS侧功率因数A R int16 1 0x0019 //PCS侧功率因数B R int16 1 0x001A //PCS侧功率因数C R int16 1 0x001B //PCS侧相电流A R int16 1A 0x001C //PCS侧相电流B R int16 1A 0x001D //PCS侧相电流C R int16 1A 0x001E //PCS侧三相总有功功率 R int16 1kW 0x0028 //PCS侧三相总无功功率 R int16 1kVar 0x0029 //PCS侧三相总视在功率 R int16 1kVA 0x002A //PCS侧三相总功率因数 R int16 1 0x002B json["addr"] = {"0x0013", "0x0014", "0x0015", "0x1080", "0x1082", "0x1084", "0x1086", "0x1088"}; } else if (name == "BMS_YC") { //SOC R uint16 0.1 0x0001 //SOH R uint16 0.1 0x0002 //电压 R uint32 0.1V 0x0003 //电流 R int32 0.1A 0x0005 //可充电量 R uint32 1kWh 0x0007 //可放电量 R uint32 1kWh 0x0009 //可充电状态 R uint16 1:可充电;0:不可充电 0x0047 //可放电状态 R uint16 1:可放电;0:不可放电 0x0048 //运行状态 R uint16 运行状态 0-正常 1-告警 2-保护 0x0049 //充放电状态 R uint16 0-待机 1-充电 2-放电 0x004A json["addr"] = {"0x0001", "0x0002", "0x0003", "0x0005", "0x0007", "0x0009", "0x0047", "0x0048", "0x0049", "0x004A"}; } else if (name == "BCU_YC") { //电表类型 R uint16 "0:储能站总表 1:逆变前侧电表 2:逆变后侧电表 3:配电柜电表 4:并网口电表" 0x0008 //A相电压 R uint32 1V 0x000B //B相电压 R uint32 1V 0x000D //C相电压 R uint32 1V 0x000F //A相电流 R int32 1A 0x0011 //B相电流 R int32 1A 0x0013 //C相电流 R int32 1A 0x0015 //尖段电价 R uint32 1RMB 0x0027 //峰段电价 R uint32 1RMB 0x0029 //平段电价 R uint32 1RMB 0x002B //谷段电价 R uint32 1RMB 0x002D //日充电电量 R uint32 1kWh 0x002F //日放电电量 R uint32 1kWh 0x0031 //日充电费用 R uint32 1RMB 0x0033 //日放电费用 R uint32 1RMB 0x0035 //日收益 R int32 1RMB 0x0037 //总充电电量 R uint32 1kWh 0x004D //总放电电量 R uint32 1kWh 0x004F //总充电费用 R uint32 1RMB 0x0051 //总放电费用 R uint32 1RMB 0x0053 //总收益 R int32 1RMB 0x0055 } else if (name == "TH_YC") { //所属通道号 R uint16 1 0x0001 //所属温湿度号 R uint16 1~10 0x0002 //温度 R int16 0.1℃ 0x0003 //湿度 R int16 0.1℃ 0x0004 } return json.dump(); }