实现HTTP服务架构

This commit is contained in:
lixiaoyuan
2025-08-31 14:38:53 +08:00
parent 4af4e670d2
commit e0b64a20c4
46 changed files with 1436 additions and 545 deletions

View File

@@ -11,7 +11,7 @@ std::shared_ptr<CommEntity> CommEntity::create(Fields& data)
if (commType == "TCP")
{
auto entity = std::make_shared<TcpEntity>();
entity->setHost(ip, port, isclient);
entity->setAddr(ip, port, isclient);
return entity;
}
else if (commType == "MODBUS")

View File

@@ -18,17 +18,16 @@ public:
// 启动通讯连接
virtual int start() { return 0; };
// 关闭通讯连接
virtual void close() { isCloseRequest_ = true; };
virtual void close() { isCloseRequest = true; };
std::string id() { return id_; }
bool isAlive() { return isAlive_; }
bool isConnected() { return isConnected_; }
public:
std::string id_;
bool isAlive_ = false;
bool isConnected_ = false;
bool isCloseRequest_ = false;
std::string type;
int commtype = 0;
bool alive = false;
bool isConnected = false;
bool isCloseRequest = false;
};

502
src/protocol/HttpEntity.cpp Normal file
View File

@@ -0,0 +1,502 @@
#include "HttpEntity.h"
#include "database/Dao.h"
#include <functional>
#include "common/Utils.h"
#include "common/Snowflake.h"
#include "app/Application.h"
#include "app/AppData.h"
static NJsonNode FieldsToJsonArray(std::vector<Fields> vecFields)
{
NJsonNode jsonnode = NJsonNode::array();
for (auto& fields : vecFields)
{
NJsonNode jnode;
for (auto& item : fields.map())
{
jnode[item.first] = item.second;
}
jsonnode.push_back(jnode);
}
return jsonnode;
}
static void GetRequestParam(const httplib::Request& req, const std::vector<std::string>& vecKeys, Fields& fields)
{
for (auto& key : vecKeys)
{
if (req.has_param(key))
{
fields.set(key, req.get_param_value(key));
}
}
}
class HttpHelper
{
public:
static bool CheckRequestParam(const httplib::Request& req, httplib::Response& resp, const std::vector<std::string>& vecKeys, std::string& errmsg)
{
errmsg = "";
for (auto& key : vecKeys)
{
if (!req.has_param(key))
{
if (!errmsg.empty()) { errmsg += ","; }
errmsg += "缺少参数[" + key + "]";
}
}
if (!errmsg.empty())
{
return false;
}
return true;
}
static void setPagination(PageInfo& pageinfo, std::vector<Fields> result, NJsonNode& json)
{
json["count"] = pageinfo.total;
json["page"] = pageinfo.index;
json["page_size"] = pageinfo.size;
json["data"] = FieldsToJsonArray(result);
}
};
using HandlerFunc = Errcode(HttpEntity::*)(const httplib::Request& req, httplib::Response& resp, NJsonNode& jnode);
struct HandlerOptions
{
HandlerFunc func;
std::vector<std::string> requiredKeys;
std::vector<std::string> keys;
HandlerOptions(HandlerFunc func, const std::vector<std::string>& requiredKeys)
: func(func), requiredKeys(requiredKeys)
{
}
};
static std::map<std::string, HandlerOptions> g_mapHttpHandler =
{
{"/login", HandlerOptions(&HttpEntity::login, {DMUser::ACCOUNT, DMUser::PASSWD})},
{"/queryUserList", HandlerOptions(&HttpEntity::queryUserList, {"token"})},
{"/insertUser", HandlerOptions(&HttpEntity::insertUser, {"token", DMUser::ACCOUNT})},
{"/updateUser", HandlerOptions(&HttpEntity::updateUser, {"token", DMUser::USER_ID})},
{"/deleteUser", HandlerOptions(&HttpEntity::deleteUser, {"token", DMUser::USER_ID})},
{"/queryPermissionList", HandlerOptions(&HttpEntity::queryPermissionList, {"token"})},
{"/insertPermission", HandlerOptions(&HttpEntity::insertPermission, {"token", DMPermission::NAME})},
{"/updatePermission", HandlerOptions(&HttpEntity::updatePermission, {"token", DMPermission::PERMISSION_ID})},
{"/deletePermission", HandlerOptions(&HttpEntity::deletePermission, {"token", DMPermission::PERMISSION_ID})},
{"/queryRoleList", HandlerOptions(&HttpEntity::queryRoleList, {"token"})},
{"/insertRole", HandlerOptions(&HttpEntity::insertRole, {"token", DMRole::NAME})},
{"/updateRole", HandlerOptions(&HttpEntity::updateRole, {"token", DMRole::ROLE_ID})},
{"/deleteRole", HandlerOptions(&HttpEntity::deleteRole, {"token", DMRole::ROLE_ID})},
{"/queryStationList", HandlerOptions(&HttpEntity::queryStationList, {"token"})},
{"/insertStation", HandlerOptions(&HttpEntity::insertStation, {"token", DMStation::NAME})},
{"/updateStation", HandlerOptions(&HttpEntity::updateStation, {"token", DMStation::STATION_ID})},
{"/deleteStation", HandlerOptions(&HttpEntity::deleteStation, {"token", DMStation::STATION_ID})},
{"/queryDeviceList", HandlerOptions(&HttpEntity::queryDeviceList, {"token"})},
{"/insertDevice", HandlerOptions(&HttpEntity::insertDevice, {"token", DMDevice::NAME})},
{"/updateDevice", HandlerOptions(&HttpEntity::updateDevice, {"token", DMDevice::DEVICE_ID})},
{"/deleteDevice", HandlerOptions(&HttpEntity::deleteDevice, {"token", DMDevice::DEVICE_ID})},
{"/queryDevicTypeDef", HandlerOptions(&HttpEntity::queryDevicTypeDef, {"token"})},
{"/queryPolicyList", HandlerOptions(&HttpEntity::queryPolicyList, {"token"})},
{"/insertPolicy", HandlerOptions(&HttpEntity::insertPolicy, {"token", DMPolicy::NAME})},
{"/updatePolicy", HandlerOptions(&HttpEntity::updatePolicy, {"token", DMPolicy::POLICY_ID})},
{"/deletePolicy", HandlerOptions(&HttpEntity::deletePolicy, {"token", DMPolicy::POLICY_ID})},
{"/querySystemLogList", HandlerOptions(&HttpEntity::querySystemLogList, {"token"})},
{"/queryAlertLogList", HandlerOptions(&HttpEntity::queryAlertLogList, {"token"})},
{"/queryPredictionDetail", HandlerOptions(&HttpEntity::queryPredictionDetail, {"token"})},
//{"/insert", HandlerOptions(&HttpEntity::insert, {})},
//{"/update", HandlerOptions(&HttpEntity::update, {})},
//{"/delete", HandlerOptions(&HttpEntity::delete, {})},
};
void HttpEntity::listen(std::string addr, int port)
{
for (auto& item : g_mapHttpHandler)
{
std::string name = item.first;
HandlerOptions& handler = item.second;
this->httpsvr.Get(name, [=, &handler](const httplib::Request& req, httplib::Response& resp)
{
NJsonNode json;
Errcode errcode = Errcode::OK;
if (name != "/login" && Config::option.useToken)
{
// 验证token
std::string token = req.get_param_value("token");
if (token.empty())
{
errcode = Errcode::ERR_TOKEN;
}
else
{
User user = Application::data().getUser(token);
if (user.userId.empty())
{
errcode = Errcode::ERR_TOKEN;
}
}
}
std::string errmsg;
if (errcode == Errcode::OK)
{
if (!HttpHelper::CheckRequestParam(req, resp, handler.requiredKeys, errmsg))
{
errcode = Errcode::ERR_PARAM;
}
else
{
errcode = (this->*(handler.func))(req, resp, json);
}
}
json["errcode"] = errcode;
json["errmsg"] = ErrcodeStr(errcode) + (errmsg.empty() ? "" : (":"+errmsg));
resp.set_content(json.dump(), "text/plain; charset=utf-8");
resp.status = 200;
});
}
if (addr.empty()) addr = "0.0.0.0";
httpsvr.listen(addr, port);
}
void HttpEntity::registGet(std::string name, void (HttpEntity::* func)(const httplib::Request& req, httplib::Response& resp))
{
this->httpsvr.Get(name, std::bind(func, this, std::placeholders::_1, std::placeholders::_2));
}
Errcode HttpEntity::login(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
std::string userId;
std::string token;
std::string account = req.get_param_value("account");
std::string passwd = req.get_param_value("passwd");
Fields fields;
auto dao = DaoEntity::create("");
Errcode err = DAO::login(dao, account, passwd, fields);
userId = fields.value(DMUser::USER_ID);
token = Application::data().userLogin(userId, account);
if (err == Errcode::OK)
{
json["token"] = token;
std::vector<Fields> vecPermission;
int roleId = fields.get<int>(DMRole::ROLE_ID);
DAO::queryRolePermission(dao, roleId, vecPermission);
NJsonNode jnode = NJsonNode::array();
for (auto& item : vecPermission) { jnode.push_back(item.value("name")); }
json["permission"] = jnode;
}
DAO::insertSystemLogUser(token, "用户登录:" + ErrcodeStr(err), (err==Errcode::OK) ? 0: 1);
return err;
}
Errcode HttpEntity::queryUserList(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
std::string token = req.get_param_value("token");
PageInfo pageinfo;
pageinfo.index = Utils::toInt(req.get_param_value("page"));
pageinfo.size = Utils::toInt(req.get_param_value("page_size"));
std::vector<Fields> result;
auto err = DAO::queryUserList(pageinfo, result);
if (err == Errcode::OK)
{
HttpHelper::setPagination(pageinfo, result, json);
}
DAO::insertSystemLogUser(token, "查询用户列表:" + ErrcodeStr(err), (err==Errcode::OK) ? 0 : 1);
return err;
}
Errcode HttpEntity::insertUser(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
Fields params;
GetRequestParam(req, {"account", "name", "gender", "age", "phone", "email", "role_id"}, params);
return DAO::insertUser(params);
}
Errcode HttpEntity::updateUser(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
Fields params;
GetRequestParam(req, {"user_id", "name", "gender", "age", "phone", "email", "role_id"}, params);
return DAO::updateUserById(params);
}
Errcode HttpEntity::deleteUser(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
std::string userId = req.get_param_value("user_id");
return DAO::deleteUserById(userId);
}
Errcode HttpEntity::queryPermissionList(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
PageInfo pageinfo;
pageinfo.index = Utils::toInt(req.get_param_value("page"));
pageinfo.size = Utils::toInt(req.get_param_value("page_size"));
std::vector<Fields> result;
auto err = DAO::queryPermissionList(pageinfo, result);
HttpHelper::setPagination(pageinfo, result, json);
return err;
}
Errcode HttpEntity::insertPermission(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
Fields params;
GetRequestParam(req, {"name", "describe", "is_open"}, params);
return DAO::insertPermission(params);
}
Errcode HttpEntity::updatePermission(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
Fields params;
GetRequestParam(req, {"permission_id", "name", "describe", "is_open"}, params);
return DAO::updatePermissionById(params);
}
Errcode HttpEntity::deletePermission(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
std::string permissionId = req.get_param_value("permission_id");
return DAO::deletePermissionById(permissionId);
}
Errcode HttpEntity::queryRoleList(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
std::string token = req.get_param_value("page");
PageInfo pageinfo;
pageinfo.index = Utils::toInt(req.get_param_value("page"));
pageinfo.size = Utils::toInt(req.get_param_value("page_size"));
std::vector<Fields> result;
auto err = DAO::queryRoleList(pageinfo, result);
// 查询所有的角色权限关联
if (err == Errcode::OK)
{
std::vector<Fields> vecPermission;
err = DAO::queryRolePermission(NULL, vecPermission);
if (err != Errcode::OK)
{
return err;
}
std::map<std::string, std::vector<NJsonNode>> mapPermission;
for (auto& item: vecPermission)
{
std::string roleId = item.value("role_id");
auto& v = mapPermission[roleId];
NJsonNode jnode;
jnode["id"] = item.value("permission_id");
jnode["name"] = item.value("permission_name");
v.push_back(jnode);
}
HttpHelper::setPagination(pageinfo, result, json);
if (json.contains("data"))
{
for (auto& item : json["data"])
{
std::string roleId = item["role_id"];
item["permission"] = mapPermission[roleId];
}
}
}
return err;
}
Errcode HttpEntity::insertRole(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
Fields params;
GetRequestParam(req, {"name", "describe", "is_open", "permission"}, params);
return DAO::insertRole(params);
};
Errcode HttpEntity::updateRole(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
Fields params;
GetRequestParam(req, {"role_id", "name", "describe", "is_open", "permission"}, params);
return DAO::updateRoleById(params);
};
Errcode HttpEntity::deleteRole(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
std::string roleId = req.get_param_value(DMRole::ROLE_ID);
return DAO::remove(NULL, DMRole::TABLENAME, DMRole::ROLE_ID, roleId);
};
Errcode HttpEntity::queryStationList(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
PageInfo pageinfo;
pageinfo.index = Utils::toInt(req.get_param_value("page"));
pageinfo.size = Utils::toInt(req.get_param_value("page_size"));
std::vector<Fields> result;
auto err = DAO::queryStationList(pageinfo, result);
HttpHelper::setPagination(pageinfo, result, json);
return err;
};
Errcode HttpEntity::insertStation(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
Fields params;
GetRequestParam(req, {"name", "address", "lon", "lat", "tel", "capacity", "status"}, params);
return DAO::insertStation(params);
};
Errcode HttpEntity::updateStation(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
Fields params;
GetRequestParam(req, {"station_id", "name", "address", "lon", "lat", "tel", "capacity", "status"}, params);
return DAO::updateStationById(params);
};
Errcode HttpEntity::deleteStation(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
std::string primaryKey = DMStation::STATION_ID;
return DAO::remove(NULL, DMStation::TABLENAME, primaryKey, req.get_param_value(primaryKey));
};
Errcode HttpEntity::queryDeviceList(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
PageInfo pageinfo;
pageinfo.index = Utils::toInt(req.get_param_value("page"));
pageinfo.size = Utils::toInt(req.get_param_value("page_size"));
std::vector<Fields> result;
auto err = DAO::queryDeviceList(pageinfo, result);
HttpHelper::setPagination(pageinfo, result, json);
return err;
};
Errcode HttpEntity::insertDevice(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
Fields params;
GetRequestParam(req, {"station_id", "type", "name", "code", "model", "factory", "factory_tel", "is_open", "attrs"}, params);
return DAO::insertDevice(params);
};
Errcode HttpEntity::updateDevice(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
Fields params;
GetRequestParam(req, {"device_id", "station_id", "type", "name", "code", "model", "factory", "factory_tel", "is_open", "attrs"}, params);
return DAO::updateDeviceById(params);
};
Errcode HttpEntity::deleteDevice(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
std::string primaryKey = DMDevice::DEVICE_ID;
return DAO::remove(NULL, DMDevice::TABLENAME, primaryKey, req.get_param_value(primaryKey));
};
Errcode HttpEntity::queryDevicTypeDef(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
std::string sql = "SELECT device_type_id, name FROM def_device_type;";
std::vector<Fields> result;
auto err = DAO::exec(NULL, sql, result);
json["data"] = FieldsToJsonArray(result);
return err;
}
Errcode HttpEntity::queryPolicyList(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
PageInfo pageinfo;
pageinfo.index = Utils::toInt(req.get_param_value("page"));
pageinfo.size = Utils::toInt(req.get_param_value("page_size"));
std::vector<Fields> result;
auto err = DAO::queryPolicyList(pageinfo, result);
HttpHelper::setPagination(pageinfo, result, json);
return err;
};
Errcode HttpEntity::insertPolicy(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
Fields params;
GetRequestParam(req, {"type", "name", "describe", "value", "is_open"}, params);
return DAO::insertPolicy(params);
};
Errcode HttpEntity::updatePolicy(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
Fields params;
GetRequestParam(req, {"policy_id", "type", "describe", "value", "is_open"}, params);
return DAO::updatePolicyById(params);
};
Errcode HttpEntity::deletePolicy(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
return DAO::deletePolicyById(req.get_param_value("prolicy_id"));
};
Errcode HttpEntity::querySystemLogList(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
PageInfo pageinfo;
pageinfo.index = Utils::toInt(req.get_param_value("page"));
pageinfo.size = Utils::toInt(req.get_param_value("page_size"));
std::vector<Fields> result;
auto err = DAO::querySystemLogList(pageinfo, result);
HttpHelper::setPagination(pageinfo, result, json);
return err;
}
//Errcode insertSystemLog(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode HttpEntity::updateSystemLog(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
Fields params;
GetRequestParam(req, {"log_id", "status"}, params);
return DAO::updateSystemLogById(params);
}
Errcode HttpEntity::queryAlertLogList(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
PageInfo pageinfo;
pageinfo.index = Utils::toInt(req.get_param_value("page"));
pageinfo.size = Utils::toInt(req.get_param_value("page_size"));
std::vector<Fields> result;
auto err = DAO::queryAlertLogList(pageinfo, result);
HttpHelper::setPagination(pageinfo, result, json);
return err;
}
//Errcode insertAlertLog(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode HttpEntity::updateAlertLog(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
Fields params;
GetRequestParam(req, {"log_id", "status"}, params);
return DAO::updateAlertLogById(params);
}
Errcode HttpEntity::queryPredictionDetail(const httplib::Request& req, httplib::Response& resp, NJsonNode& json)
{
NJsonNode jsonData = NJsonNode::array();
for (int i = 1; i<=5; i++)
{
NJsonNode jnode;
jnode["datatype"] = i;
NJsonNode jsonValues = NJsonNode::array();
for (int i = 0; i<1440; ++i)
{
jsonValues.push_back(float(Utils::random(50, 100)));
}
jnode["values"] = jsonValues;
jsonData.push_back(jnode);
}
json["data"] = jsonData;
return Errcode::OK;
}

58
src/protocol/HttpEntity.h Normal file
View File

@@ -0,0 +1,58 @@
#include "httplib.h"
#include "common/JsonN.h"
#include "errcode.h"
class HttpEntity
{
public:
httplib::Server httpsvr;
void listen(std::string addr, int port);
void registGet(std::string name, void (HttpEntity::* func)(const httplib::Request& req, httplib::Response& resp));
//void onGet(const httplib::Request& req, httplib::Response& resp);
Errcode login(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode queryUserList(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode insertUser(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode updateUser(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode deleteUser(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode queryPermissionList(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode insertPermission(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode updatePermission(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode deletePermission(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode queryRoleList(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode insertRole(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode updateRole(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode deleteRole(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode queryStationList(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode insertStation(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode updateStation(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode deleteStation(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode queryDeviceList(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode insertDevice(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode updateDevice(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode deleteDevice(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode queryDevicTypeDef(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode queryPolicyList(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode insertPolicy(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode updatePolicy(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode deletePolicy(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode querySystemLogList(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
//Errcode insertSystemLog(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode updateSystemLog(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode queryAlertLogList(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
//Errcode insertAlertLog(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode updateAlertLog(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
Errcode queryPredictionDetail(const httplib::Request& req, httplib::Response& resp, NJsonNode& json);
};

View File

@@ -15,25 +15,18 @@ static std::string ToHexText(std::string s)
return ss.str();
}
TcpEntity::TcpEntity(TcpHandler* handler)
: handler_(handler), isClient_(true)
TcpEntity::TcpEntity()
{
}
TcpEntity::~TcpEntity()
{
}
void TcpEntity::setHandler(TcpHandler* handler)
void TcpEntity::setAddr(string addr, int port, int commtype)
{
handler_ = handler;
}
void TcpEntity::setHost(string host, int port, bool isClient)
{
host_ = host;
port_ = port;
isClient_ = isClient;
this->addr = addr;
this->port = port;
this->commtype = commtype;
}
@@ -44,11 +37,8 @@ void TcpEntity::setReconnect(int ms)
int TcpEntity::start()
{
if (isAlive_)
{
return 1;
}
isAlive_ = true;
if (alive) { return 1; }
alive = true;
WSADATA wsaData;
if (WSAStartup(MAKEWORD(1, 1), &wsaData) != 0)
@@ -56,97 +46,88 @@ int TcpEntity::start()
return -1;
}
sockaddr_.sin_family = AF_INET;
sockaddr_.sin_port = htons(port_);
sockaddr_.sin_addr.S_un.S_addr = (isClient_ ? inet_addr(host_.c_str()) : htonl(INADDR_ANY));
sockaddr.sin_family = AF_INET;
sockaddr.sin_port = htons(port);
sockaddr.sin_addr.S_un.S_addr = ((commtype != 0) ? inet_addr(addr.c_str()) : htonl(INADDR_ANY));
std::thread([=]() { this->runThreadTcp(); }).detach();
std::thread([=]() {
//if (isRequestClose_) { break; }
//std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::cout << "TCP thread start ..." << std::endl;
if ((commtype != 0))
{
this->runClientLoop();
}
else
{
this->runServerLoop();
}
alive = false;
}).detach();
return 0;
}
void TcpEntity::runThreadTcp()
{
//if (isRequestClose_) { break; }
//std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::cout << "TCP thread start ..." << std::endl;
if (isClient_)
{
this->runClientLoop();
}
else
{
this->runServerLoop();
}
isAlive_ = false;
}
void TcpEntity::close()
{
isCloseRequest_ = true;
}
void TcpEntity::runServerLoop()
{
sock_ = ::socket(AF_INET, SOCK_STREAM, 0);
sock = ::socket(AF_INET, SOCK_STREAM, 0);
// 绑定套接字 【注意】functional中定义了bind与winsock2的定义发生重载导致异常这里需要使用::bind(加::)
if (::bind(sock_, (SOCKADDR*)&sockaddr_, sizeof(SOCKADDR)) == SOCKET_ERROR)
if (::bind(sock, (SOCKADDR*)&sockaddr, sizeof(SOCKADDR)) == SOCKET_ERROR)
{
std::cout << "TCP server bind [" << hostport() << "] failed." << std::endl;
std::cout << "TCP server bind [" << getAddrPort() << "] failed." << std::endl;
return;
}
// 启动监听,准备接收客户请求
if (::listen(sock_, 5) == SOCKET_ERROR)
if (::listen(sock, 5) == SOCKET_ERROR)
{
std::cout << "TCP server listen [" << hostport() << "] failed." << std::endl;
std::cout << "TCP server listen [" << getAddrPort() << "] failed." << std::endl;
return;
}
int addrlen = sizeof(SOCKADDR);
while (1)
{
if (isCloseRequest_) { break; }
if (isCloseRequest) { break; }
// 等待client连接请求
Client client;
// 等待客户请求到来
client.sock = ::accept(sock_, (SOCKADDR*)&client.sock_addr, &addrlen);
client.sock = ::accept(sock, (SOCKADDR*)&client.sockaddr, &addrlen);
if (client.sock == INVALID_SOCKET)
{
break;
}
client.host = inet_ntoa(client.sock_addr.sin_addr);
// 存储客户端的连接信息
vecClient_.push_back(client);
// client连接成功存储信息
client.host = inet_ntoa(client.sockaddr.sin_addr);
vecClient.push_back(client);
// 创建线程处理
// 创建client处理线程
std::thread th([=]() { this->runServerRecvLoop(client, client.host); });
th.detach();
}
::closesocket(sock_);
// 连接关闭
for (auto iter = vecClient_.begin(); iter != vecClient_.end(); ++iter)
// 客户端的连接关闭
for (auto iter = vecClient.begin(); iter != vecClient.end(); ++iter)
{
::closesocket(iter->sock);
vecClient_.erase(iter);
vecClient.erase(iter);
}
isCloseRequest_ = false;
// 关闭socket
::closesocket(sock);
isCloseRequest = false;
}
void TcpEntity::runServerRecvLoop(Client client, std::string client_name)
{
std::vector<char> buf(1024000, 0);
std::vector<char> buf(10240, 0);
while (1)
{
if (isCloseRequest_ || !isAlive_)
{
break;
}
memset(buf.data(), 0, buf.size());
if (isCloseRequest || !alive) { break; }
// 接收数据
memset(buf.data(), 0, buf.size());
int n = ::recv(client.sock, &buf[0], buf.size(), 0);
// 需要判断 errno是否等于 EINTR 。如果errno == EINTR 则说明recv函数是由于程序接收到信号后返回的socket连接还是正常的
if (n <= 0 && GetLastError() != EINTR)
@@ -159,11 +140,11 @@ void TcpEntity::runServerRecvLoop(Client client, std::string client_name)
}
}
// 连接关闭
for (auto iter = vecClient_.begin(); iter != vecClient_.end(); ++iter)
for (auto iter = vecClient.begin(); iter != vecClient.end(); ++iter)
{
if (iter->sock == client.sock)
{
vecClient_.erase(iter);
vecClient.erase(iter);
break;
}
}
@@ -172,31 +153,31 @@ void TcpEntity::runServerRecvLoop(Client client, std::string client_name)
void TcpEntity::runClientLoop()
{
// 数据缓存
std::vector<char> buf(1024000, 0);
std::vector<char> buf(10240, 0);
while (1)
{
if (isCloseRequest_) { break; }
if (isCloseRequest) { break; }
//创建套接字,向服务器发出连接请求
sock_ = ::socket(AF_INET, SOCK_STREAM, 0);
if (::connect(sock_, (SOCKADDR*)&sockaddr_, sizeof(SOCKADDR)) != SOCKET_ERROR)
sock = ::socket(AF_INET, SOCK_STREAM, 0);
if (::connect(sock, (SOCKADDR*)&sockaddr, sizeof(SOCKADDR)) != SOCKET_ERROR)
{
isConnected_ = true;
std::cout << "TCP client connect to [" << hostport() << "] success." << std::endl;
isConnected = true;
std::cout << "TCP client connect to [" << getAddrPort() << "] success." << std::endl;
// 连接服务器成功,循环等待接受消息
while (1)
{
if (isCloseRequest_) { break; }
if (isCloseRequest) { break; }
memset(buf.data(), 0, buf.size());
int n = ::recv(sock_, buf.data(), buf.size(), 0);
int n = ::recv(sock, buf.data(), buf.size(), 0);
if (n <= 0 && GetLastError() != EINTR)
{
// TCP通讯异常 关闭连接
::closesocket(sock_);
isConnected_ = false;
::closesocket(sock);
isConnected = false;
break;
}
else
@@ -207,75 +188,55 @@ void TcpEntity::runClientLoop()
}
else
{
isConnected_ = false;
std::cout << "TCP client connect to [" << hostport() << "] failed." << std::endl;
isConnected = false;
std::cout << "TCP client connect to [" << getAddrPort() << "] failed." << std::endl;
}
// 连接异常
if (tReconnect_ > 0)
{
// 重新连接
std::cout << "TCP client [" << hostport() << "] reconnect (" << tReconnect_ << ")." << std::endl;
std::cout << "TCP client [" << getAddrPort() << "] reconnect (" << tReconnect_ << ")." << std::endl;
//std::this_thread::sleep_for(std::chrono::microseconds(tReconnect_));
Sleep(tReconnect_);
}
else
{
// 关闭线程
std::cout << "TCP client [" << hostport() << "] close." << std::endl;
std::cout << "TCP client [" << getAddrPort() << "] close." << std::endl;
break;
}
}
if (sock_ != INVALID_SOCKET)
if (sock != INVALID_SOCKET)
{
::closesocket(sock_);
sock_ = INVALID_SOCKET;
isConnected_ = false;
::closesocket(sock);
sock = INVALID_SOCKET;
isConnected = false;
}
if (isCloseRequest_)
if (isCloseRequest)
{
}
isCloseRequest_ = false;
isCloseRequest = false;
}
bool TcpEntity::sendData(std::string data, std::string clientId)
bool TcpEntity::write(std::string data)
{
if (isClient_)
if (commtype == 0)
{
// #客户
if (sock_ == INVALID_SOCKET)
// #服务
if (vecClient.size() <= 0) { return false; }
for (auto& client : vecClient)
{
//Spdlogger::error("TCP client send data failed, connect error, invalid socket, device: {}:{}.", this->type_, client_code);
return false;
std::string clientAddr = inet_ntoa(client.sockaddr.sin_addr);
::send(client.sock, data.c_str(), data.size(), 0);
}
int len = ::send(sock_, data.c_str(), data.size(), 0);
//Spdlogger::info("TCP client send data success, data length={}, device: {}:{}.", len, this->type_, client_code);
return (len > 0);
return true;
}
else
{
if (vecClient_.size() <= 0)
{
return false;
}
for (auto& client : vecClient_)
{
std::string client_addr = inet_ntoa(client.sock_addr.sin_addr);
::send(client.sock, data.c_str(), data.size(), 0);
}
return true;
// #客户端
if (sock == INVALID_SOCKET) { return false; }
int len = ::send(sock, data.c_str(), data.size(), 0);
return (len > 0);
}
}
bool TcpEntity::isAlive()
{
return isAlive_;
}
bool TcpEntity::isConnected()
{
return isConnected_;
}

View File

@@ -11,6 +11,12 @@
using namespace std;
enum class ETcpType
{
SERVER = 0,
CLIENT = 1,
};
enum class ETcpEvent
{
NUL = 0, //
@@ -32,80 +38,54 @@ class TcpEntity : public CommEntity, public std::enable_shared_from_this<TcpEnti
public:
struct Client
{
std::string clientId;
SOCKET sock;
SOCKADDR_IN sock_addr;
SOCKADDR_IN sockaddr;
std::string host;
std::shared_ptr<TcpParser> parser = nullptr;
};
public:
// 初始化服务端
TcpEntity(TcpHandler* handler = nullptr);
TcpEntity();
~TcpEntity();
int start() override;
void close() override;
void runThreadTcp();
void setHost(string host, int port, bool is_client);
std::string host() { return host_; }
int port() { return port_; }
std::string hostport() { return host_ + ":" + std::to_string(port_); }
void setAddr(string host, int port, int commtype);
std::string getAddr() { return addr; }
std::string getAddrPort() { return addr + ":" + std::to_string(port); }
int getPort() { return port; }
void setReconnect(int ms);
bool isClient() { return isClient_; }
void setHandler(TcpHandler* handler);
bool sendData(std::string data, std::string clientId="");
bool isAlive();
bool isConnected();
std::shared_ptr<TcpParser> parser = nullptr;
bool write(std::string data);
private:
void runServerLoop();
void runServerRecvLoop(Client client, std::string client_name);
void runClientLoop();
private:
// 本机的SOCKET对象
SOCKET sock_ = INVALID_SOCKET;
SOCKET sock = INVALID_SOCKET;
// socket addr信息
SOCKADDR_IN sockaddr_;
// TCP类型是否是客户端 true: 客户端, false: 服务端
bool isClient_ = true;
SOCKADDR_IN sockaddr {};
// 通讯地址,作为客户端时有效
std::string host_;
std::string addr;
// 通讯端口
int port_ = 0;
int port = 0;
// 重连间隔时间,单位秒
int tReconnect_ = 0;
// 作为服务端时连接的客户端SOCKET
std::vector<Client> vecClient_;
// 回调处理对象
TcpHandler* handler_ = nullptr;
bool isAlive_ = false;
bool isCloseRequest_ = false;
bool isConnected_ = false;
std::vector<Client> vecClient;
// 状态更新时间戳
int64_t ts_;
int64_t ts_ = 0;
int64_t tsHeartbeat_=0;
int64_t tsHeartbeat_ = 0;
};
class TcpHandler