#include "TcpEntity.h" #include "common/Utils.h" #include #include #include static std::string ToHexText(std::string s) { std::stringstream ss; for (unsigned int i = 0; i < s.size(); ++i) { unsigned char ch = s[i]; ss << std::setw(2) << std::setfill('0') << std::hex << std::uppercase << (int)ch << ((i < s.size() - 1) ? " " : ""); } return ss.str(); } TcpEntity::TcpEntity(TcpHandler* handler) : handler_(handler), isClient_(true) { } TcpEntity::~TcpEntity() { } void TcpEntity::setHandler(TcpHandler* handler) { handler_ = handler; } void TcpEntity::setHost(string host, int port, bool isClient) { host_ = host; port_ = port; isClient_ = isClient; } void TcpEntity::setReconnect(int ms) { tReconnect_ = ms; } int TcpEntity::start() { if (isAlive_) { return 1; } isAlive_ = true; WSADATA wsaData; if (WSAStartup(MAKEWORD(1, 1), &wsaData) != 0) { return -1; } sockaddr_.sin_family = AF_INET; sockaddr_.sin_port = htons(port_); sockaddr_.sin_addr.S_un.S_addr = (isClient_ ? inet_addr(host_.c_str()) : htonl(INADDR_ANY)); std::thread([=]() { this->runThreadTcp(); }).detach(); return 0; } void TcpEntity::runThreadTcp() { //if (isRequestClose_) { break; } //std::this_thread::sleep_for(std::chrono::milliseconds(1000)); std::cout << "TCP thread start ..." << std::endl; if (isClient_) { this->runClientLoop(); } else { this->runServerLoop(); } isAlive_ = false; } void TcpEntity::close() { isCloseRequest_ = true; } void TcpEntity::runServerLoop() { sock_ = ::socket(AF_INET, SOCK_STREAM, 0); // 绑定套接字 【注意】functional中定义了bind与winsock2的定义发生重载导致异常,这里需要使用::bind(加::) if (::bind(sock_, (SOCKADDR*)&sockaddr_, sizeof(SOCKADDR)) == SOCKET_ERROR) { std::cout << "TCP server bind [" << hostport() << "] failed." << std::endl; return; } // 启动监听,准备接收客户请求 if (::listen(sock_, 5) == SOCKET_ERROR) { std::cout << "TCP server listen [" << hostport() << "] failed." << std::endl; return; } int addrlen = sizeof(SOCKADDR); while (1) { if (isCloseRequest_) { break; } Client client; // 等待客户请求到来 client.sock = ::accept(sock_, (SOCKADDR*)&client.sock_addr, &addrlen); if (client.sock == INVALID_SOCKET) { break; } client.host = inet_ntoa(client.sock_addr.sin_addr); // 存储客户端的连接信息 vecClient_.push_back(client); // 创建线程处理 std::thread th([=]() { this->runServerRecvLoop(client, client.host); }); th.detach(); } ::closesocket(sock_); // 连接关闭 for (auto iter = vecClient_.begin(); iter != vecClient_.end(); ++iter) { ::closesocket(iter->sock); vecClient_.erase(iter); } isCloseRequest_ = false; } void TcpEntity::runServerRecvLoop(Client client, std::string client_name) { std::vector buf(1024000, 0); while (1) { if (isCloseRequest_ || !isAlive_) { break; } memset(buf.data(), 0, buf.size()); // 接收数据 int n = ::recv(client.sock, &buf[0], buf.size(), 0); // 需要判断 errno是否等于 EINTR 。如果errno == EINTR 则说明recv函数是由于程序接收到信号后返回的,socket连接还是正常的 if (n <= 0 && GetLastError() != EINTR) { // 客户端连接异常(关闭) break; } else { } } // 连接关闭 for (auto iter = vecClient_.begin(); iter != vecClient_.end(); ++iter) { if (iter->sock == client.sock) { vecClient_.erase(iter); break; } } } void TcpEntity::runClientLoop() { // 数据缓存 std::vector buf(1024000, 0); while (1) { if (isCloseRequest_) { break; } //创建套接字,向服务器发出连接请求 sock_ = ::socket(AF_INET, SOCK_STREAM, 0); if (::connect(sock_, (SOCKADDR*)&sockaddr_, sizeof(SOCKADDR)) != SOCKET_ERROR) { isConnected_ = true; std::cout << "TCP client connect to [" << hostport() << "] success." << std::endl; // 连接服务器成功,循环等待接受消息 while (1) { if (isCloseRequest_) { break; } memset(buf.data(), 0, buf.size()); int n = ::recv(sock_, buf.data(), buf.size(), 0); if (n <= 0 && GetLastError() != EINTR) { // TCP通讯异常, 关闭连接 ::closesocket(sock_); isConnected_ = false; break; } else { // 处理消息 } } } else { isConnected_ = false; std::cout << "TCP client connect to [" << hostport() << "] failed." << std::endl; } // 连接异常 if (tReconnect_ > 0) { // 重新连接 std::cout << "TCP client [" << hostport() << "] reconnect (" << tReconnect_ << ")." << std::endl; //std::this_thread::sleep_for(std::chrono::microseconds(tReconnect_)); Sleep(tReconnect_); } else { // 关闭线程 std::cout << "TCP client [" << hostport() << "] close." << std::endl; break; } } if (sock_ != INVALID_SOCKET) { ::closesocket(sock_); sock_ = INVALID_SOCKET; isConnected_ = false; } if (isCloseRequest_) { } isCloseRequest_ = false; } bool TcpEntity::sendData(std::string data, std::string clientId) { if (isClient_) { // #客户端 if (sock_ == INVALID_SOCKET) { //Spdlogger::error("TCP client send data failed, connect error, invalid socket, device: {}:{}.", this->type_, client_code); return false; } int len = ::send(sock_, data.c_str(), data.size(), 0); //Spdlogger::info("TCP client send data success, data length={}, device: {}:{}.", len, this->type_, client_code); return (len > 0); } else { if (vecClient_.size() <= 0) { return false; } for (auto& client : vecClient_) { std::string client_addr = inet_ntoa(client.sock_addr.sin_addr); ::send(client.sock, data.c_str(), data.size(), 0); } return true; } } bool TcpEntity::isAlive() { return isAlive_; } bool TcpEntity::isConnected() { return isConnected_; }