mirror of
https://gitee.com/js-yhsec/energy_storage.git
synced 2026-05-27 18:59:26 +08:00
实现MQTT功能, 修改HTTP接口
This commit is contained in:
@@ -4,17 +4,31 @@
|
||||
|
||||
#define TIMEOUT 10000L
|
||||
|
||||
int MqttClient::init(string addr, string client_id, string username, string password, std::vector<std::string> vecTopic)
|
||||
int MqttClient::init(string addr, string clientId, string username, string password)
|
||||
{
|
||||
this->addr = addr;
|
||||
this->vecTopic = vecTopic;
|
||||
this->clientId = clientId;
|
||||
this->vecTopic = {
|
||||
"up/json/" + clientId + "/EMS_YX",
|
||||
"up/json/" + clientId + "/EMS_YC",
|
||||
"up/json/" + clientId + "/EMS_YT",
|
||||
"up/json/" + clientId + "/PCU_YX",
|
||||
"up/json/" + clientId + "/PCU_YC",
|
||||
"up/json/" + clientId + "/PCS_YX",
|
||||
"up/json/" + clientId + "/PCS_YC",
|
||||
"up/json/" + clientId + "/BCU_YX",
|
||||
"up/json/" + clientId + "/BCU_YC",
|
||||
"up/json/" + clientId + "/BMS_YX",
|
||||
"up/json/" + clientId + "/BMS_YC",
|
||||
"up/json/" + clientId + "/MEM_YC",
|
||||
};
|
||||
|
||||
MQTTAsync_connectOptions option = MQTTAsync_connectOptions_initializer;
|
||||
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
|
||||
int rc {0};
|
||||
|
||||
// "tcp://localhost:1883"
|
||||
rc = MQTTAsync_create(&client, addr.c_str(), client_id.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL);
|
||||
rc = MQTTAsync_create(&client, addr.c_str(), clientId.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL);
|
||||
if (rc != MQTTASYNC_SUCCESS)
|
||||
{
|
||||
spdlog::error("[mqtt] MQTTAsync_create error: {}", rc);
|
||||
@@ -42,6 +56,7 @@ int MqttClient::init(string addr, string client_id, string username, string pass
|
||||
if (rc != MQTTASYNC_SUCCESS)
|
||||
{
|
||||
spdlog::error("[mqtt] MQTTAsync_setCallbacks error");
|
||||
this->destory();
|
||||
return rc;
|
||||
}
|
||||
|
||||
@@ -70,55 +85,43 @@ int MqttClient::init(string addr, string client_id, string username, string pass
|
||||
//MQTTAsync_destroy(&client);
|
||||
}
|
||||
|
||||
void MqttClient::destory()
|
||||
{
|
||||
if (client)
|
||||
{
|
||||
MQTTAsync_destroy(&client);
|
||||
client = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
struct SubscribInfo
|
||||
{
|
||||
std::function<void(int id)> callback;
|
||||
};
|
||||
|
||||
void MqttClient::subscribe(std::vector<std::string> vecTopics, std::function<void(int)> callback)
|
||||
void MqttClient::subscribe()
|
||||
{
|
||||
SubscribInfo* info = new SubscribInfo();
|
||||
info->callback = callback;
|
||||
MQTTAsync_onSuccess* funcSuccess = [](void* context, MQTTAsync_successData* response)
|
||||
{
|
||||
spdlog::info("[mqtt] subscribe {} success.", (char*)context);
|
||||
};
|
||||
MQTTAsync_onFailure* funcFailure = [](void* context, MQTTAsync_failureData* response)
|
||||
{
|
||||
spdlog::error("[mqtt] subscribe {} failed.", (char*)context);
|
||||
};
|
||||
|
||||
MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer;
|
||||
options.context = info;
|
||||
options.onSuccess = [](void* context, MQTTAsync_successData* response)
|
||||
{
|
||||
spdlog::info("[mqtt] subscribe success.");
|
||||
SubscribInfo* info = (SubscribInfo*)context;
|
||||
info->callback(0);
|
||||
delete info;
|
||||
|
||||
};
|
||||
options.onFailure = [](void* context, MQTTAsync_failureData* response)
|
||||
{
|
||||
spdlog::error("[mqtt] subscribe failed.");
|
||||
SubscribInfo* info = (SubscribInfo*)context;
|
||||
info->callback(-1);
|
||||
delete info;
|
||||
};
|
||||
|
||||
|
||||
int count = 3;
|
||||
char* topicsTmp[] = {
|
||||
"topic/aa",
|
||||
"topic/bb",
|
||||
"topic/cc"
|
||||
};
|
||||
std::vector<int> qosTmp(count, 1); // 为每个主题指定 QoS
|
||||
|
||||
if (count > 0)
|
||||
options.onSuccess = funcSuccess;
|
||||
options.onFailure = funcFailure;
|
||||
for (auto& topic: vecTopic)
|
||||
{
|
||||
int rc = MQTTAsync_subscribeMany(client, count, topicsTmp, qosTmp.data(), &options);
|
||||
options.context = topic.data();
|
||||
int rc = MQTTAsync_subscribe(client, topic.data(), qos, &options);
|
||||
if (rc != MQTTASYNC_SUCCESS)
|
||||
{
|
||||
spdlog::error("[mqtt] subscribe failed, err={}", rc);
|
||||
spdlog::error("[mqtt] subscribe [{},{}] failed, err={}", topic, qos, rc);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
delete info;
|
||||
}
|
||||
}
|
||||
|
||||
int MqttClient::publish(string topic, string text)
|
||||
@@ -151,18 +154,45 @@ int MqttClient::publish(string topic, string text)
|
||||
void MqttClient::onConnectionLost(char* cause)
|
||||
{
|
||||
this->isConnected = false;
|
||||
this->destory();
|
||||
spdlog::error("MQTT connection lost, cause={}", cause);
|
||||
}
|
||||
|
||||
std::string GetSubStr(std::string c, std::string& str)
|
||||
{
|
||||
std::string v;
|
||||
int pos = str.find_first_of("/");
|
||||
if (pos != string::npos)
|
||||
{
|
||||
v = str.substr(0, pos);
|
||||
str = str.substr(pos);
|
||||
}
|
||||
return v;
|
||||
}
|
||||
|
||||
int MqttClient::onMessageArrived(char* topic, int topicLen, MQTTAsync_message* msg)
|
||||
{
|
||||
std::string topicStr = topic;
|
||||
int len = msg->payloadlen;
|
||||
char* payload = (char*)msg->payload;
|
||||
std::string payload = (char*)msg->payload;
|
||||
spdlog::info("MQTT message arrived: topic=[{},{}], payload len={}, payload msg={}", topic, msg->qos, len, payload);
|
||||
|
||||
// <数据方向>/<数据格式>/<厂家ID>/<指合>/<设备标识,上行可选>
|
||||
std::string direction = GetSubStr("/", topicStr);
|
||||
std::string datatype = GetSubStr("/", topicStr);
|
||||
std::string stationId = GetSubStr("/", topicStr);
|
||||
std::string command = GetSubStr("/", topicStr);
|
||||
std::string deviceCode = GetSubStr("/", topicStr);
|
||||
|
||||
if (command == "EMS_YX") {}
|
||||
else if (command == "EMS_YC") {}
|
||||
else if (command == "PCU_YX") { this->parsePCU_YX(payload); }
|
||||
else if (command == "PCU_YC") {}
|
||||
|
||||
// 必须释放消息内存!
|
||||
MQTTAsync_freeMessage(&msg);
|
||||
MQTTAsync_free(topic);
|
||||
|
||||
return 1; // 1表示消息已经处理
|
||||
}
|
||||
|
||||
@@ -175,8 +205,9 @@ void MqttClient::onDeliveryComplete(MQTTAsync_token token)
|
||||
|
||||
void MqttClient::onConnectSuccess( MQTTAsync_successData* resp)
|
||||
{
|
||||
spdlog::info("[mqtt] connect to {} success.", addr);
|
||||
this->isConnected = true;
|
||||
//spdlog::info("[mqtt] connect success: {}", addr);
|
||||
this->subscribe();
|
||||
//MQTTAsync_responseOptions options = MQTTAsync_responseOptions_initializer;
|
||||
//options.context = this;
|
||||
//options.onSuccess = [](void* context, MQTTAsync_successData* response)
|
||||
@@ -203,63 +234,23 @@ void MqttClient::onConnectSuccess( MQTTAsync_successData* resp)
|
||||
}
|
||||
void MqttClient::onConnectFaiure(MQTTAsync_failureData* resp)
|
||||
{
|
||||
spdlog::error("[mqtt] connect to {} error.", addr);
|
||||
this->isConnected = false;
|
||||
this->destory();
|
||||
}
|
||||
|
||||
string MQTT::packEquipmentInfo(mqtt::EquipmentInfo& info)
|
||||
|
||||
void MqttClient::parseEMS_YC(std::string& text)
|
||||
{
|
||||
|
||||
}
|
||||
void MqttClient::parsePCU_YX(std::string& text)
|
||||
{
|
||||
NJsonNode jsonroot;
|
||||
jsonroot["EquipmentID"] = info.EquipmentID.c_str();
|
||||
jsonroot["ManufacturerID"] = info.ManufacturerID.c_str();
|
||||
jsonroot["EquipmentModel"] = info.EquipmentModel.c_str();
|
||||
jsonroot["ProductionDate"] = info.ProductionDate.c_str();
|
||||
jsonroot["OpenForBusinessDate"] = info.OpenForBusinessDate.c_str();
|
||||
jsonroot["EquipmentType"] = info.EquipmentType;
|
||||
return jsonroot.dump();
|
||||
}
|
||||
|
||||
|
||||
string MQTT::packSwapEquipmentStatusInfo(string node_id)
|
||||
string MQTT::packEquipmentInfo()
|
||||
{
|
||||
NJsonNode jsonroot;
|
||||
return jsonroot.dump();
|
||||
}
|
||||
|
||||
|
||||
string MQTT::packNotifyStationInfo()
|
||||
{
|
||||
NJsonNode jsonroot;
|
||||
return jsonroot.dump();
|
||||
}
|
||||
|
||||
|
||||
string MQTT::packNotifyAlarm()
|
||||
{
|
||||
NJsonNode jsonroot;
|
||||
return jsonroot.dump();
|
||||
}
|
||||
|
||||
string MQTT::packNotifyChargeStatus()
|
||||
{
|
||||
NJsonNode jsonroot;
|
||||
return jsonroot.dump();
|
||||
}
|
||||
|
||||
string MQTT::packNotifySwapStatus()
|
||||
{
|
||||
NJsonNode jsonroot;
|
||||
return jsonroot.dump();
|
||||
}
|
||||
|
||||
string MQTT::packNotifyChargeOrder()
|
||||
{
|
||||
NJsonNode jsonroot;
|
||||
return jsonroot.dump();
|
||||
}
|
||||
|
||||
|
||||
string MQTT::packNotifySwapOrder()
|
||||
{
|
||||
NJsonNode jsonroot;
|
||||
return jsonroot.dump();
|
||||
}
|
||||
Reference in New Issue
Block a user