#include "ofxTCPServer.h"
#include "ofxTCPClient.h"
#include "ofUtils.h"
#include "ofLog.h"
ofxTCPServer::ofxTCPServer(){
connected = false;
idCount = 0;
port = 0;
str = "";
messageDelimiter = "[/TCP]";
bClientBlocking = false;
}
ofxTCPServer::~ofxTCPServer(){
close();
}
void ofxTCPServer::setVerbose(bool _verbose){
ofLogWarning("ofxTCPServer") << "setVerbose(): is deprecated, replaced by ofLogWarning and ofLogError";
}
bool ofxTCPServer::setup(int _port, bool blocking){
ofxTCPSettings settings(_port);
settings.blocking = blocking;
return setup(settings);
}
bool ofxTCPServer::setup(const ofxTCPSettings &settings){
if( !TCPServer.Create() ){
ofLogError("ofxTCPServer") << "setup(): couldn't create server";
return false;
}
if( !TCPServer.Bind(settings.port, settings.reuse) ){
ofLogError("ofxTCPServer") << "setup(): couldn't bind to port " << settings.port;
return false;
}
connected = true;
port = settings.port;
bClientBlocking = settings.blocking;
setMessageDelimiter(settings.messageDelimiter);
std::unique_lock<std::mutex> lck(mConnectionsLock);
startThread();
serverReady.wait(lck);
return true;
}
void ofxTCPServer::setMessageDelimiter(std::string delim){
if(delim != ""){
messageDelimiter = delim;
}
}
bool ofxTCPServer::close(){
stopThread();
if( !TCPServer.Close() ){
ofLogWarning("ofxTCPServer") << "close(): couldn't close connections";
waitForThread(false);
return false;
}else{
ofLogVerbose("ofxTCPServer") << "Closing server";
waitForThread(false);
return true;
}
}
ofxTCPClient & ofxTCPServer::getClient(int clientID){
return *TCPConnections.find(clientID)->second;
}
bool ofxTCPServer::disconnectClient(int clientID){
std::unique_lock<std::mutex> lck( mConnectionsLock );
if( !isClientSetup(clientID) ){
ofLogWarning("ofxTCPServer") << "disconnectClient(): client " << clientID << " doesn't exist";
return false;
}else if(getClient(clientID).close()){
TCPConnections.erase(clientID);
return true;
}
return false;
}
bool ofxTCPServer::disconnectAllClients(){
std::unique_lock<std::mutex> lck( mConnectionsLock );
TCPConnections.clear();
return true;
}
bool ofxTCPServer::send(int clientID, std::string message){
std::unique_lock<std::mutex> lck( mConnectionsLock );
if( !isClientSetup(clientID) ){
ofLogWarning("ofxTCPServer") << "send(): client " << clientID << " doesn't exist";
return false;
}else{
auto ret = getClient(clientID).send(message);
if(!getClient(clientID).isConnected()) TCPConnections.erase(clientID);
return ret;
}
}
bool ofxTCPServer::sendToAll(std::string message){
std::unique_lock<std::mutex> lck( mConnectionsLock );
if(TCPConnections.size() == 0) return false;
std::vector<int> disconnect;
for(auto & conn: TCPConnections){
if(conn.second->isConnected()) conn.second->send(message);
else disconnect.push_back(conn.first);
}
for(int i=0; i<(int)disconnect.size(); i++){
TCPConnections.erase(disconnect[i]);
}
return true;
}
std::string ofxTCPServer::receive(int clientID){
std::unique_lock<std::mutex> lck( mConnectionsLock );
if( !isClientSetup(clientID) ){
ofLogWarning("ofxTCPServer") << "receive(): client " << clientID << " doesn't exist";
return "client " + ofToString(clientID) + "doesn't exist";
}
if( !getClient(clientID).isConnected() ){
TCPConnections.erase(clientID);
return "";
}
return getClient(clientID).receive();
}
bool ofxTCPServer::sendRawBytes(int clientID, const char * rawBytes, const int numBytes){
std::unique_lock<std::mutex> lck( mConnectionsLock );
if( !isClientSetup(clientID) ){
ofLogWarning("ofxTCPServer") << "sendRawBytes(): client " << clientID << " doesn't exist";
return false;
}
else{
return getClient(clientID).sendRawBytes(rawBytes, numBytes);
}
}
bool ofxTCPServer::sendRawBytesToAll(const char * rawBytes, const int numBytes){
std::unique_lock<std::mutex> lck( mConnectionsLock );
if(TCPConnections.size() == 0 || numBytes <= 0) return false;
for(auto & conn: TCPConnections){
if(conn.second->isConnected()){
conn.second->sendRawBytes(rawBytes, numBytes);
}
}
return true;
}
bool ofxTCPServer::sendRawMsg(int clientID, const char * rawBytes, const int numBytes){
std::unique_lock<std::mutex> lck( mConnectionsLock );
if( !isClientSetup(clientID) ){
ofLogWarning("ofxTCPServer") << "sendRawMsg(): client " << clientID << " doesn't exist";
return false;
}
else{
return getClient(clientID).sendRawMsg(rawBytes, numBytes);
}
}
bool ofxTCPServer::sendRawMsgToAll(const char * rawBytes, const int numBytes){
std::unique_lock<std::mutex> lck( mConnectionsLock );
if(TCPConnections.empty() || numBytes <= 0) return false;
for(auto & conn: TCPConnections){
if(conn.second->isConnected()){
conn.second->sendRawMsg(rawBytes, numBytes);
}
}
return true;
}
int ofxTCPServer::getNumReceivedBytes(int clientID){
std::unique_lock<std::mutex> lck( mConnectionsLock );
if( !isClientSetup(clientID) ){
ofLogWarning("ofxTCPServer") << "getNumReceivedBytes(): client " << clientID << " doesn't exist";
return 0;
}
return getClient(clientID).getNumReceivedBytes();
}
int ofxTCPServer::receiveRawBytes(int clientID, char * receiveBytes, int numBytes){
std::unique_lock<std::mutex> lck( mConnectionsLock );
if( !isClientSetup(clientID) ){
ofLogWarning("ofxTCPServer") << "receiveRawBytes(): client " << clientID << " doesn't exist";
return 0;
}
return getClient(clientID).receiveRawBytes(receiveBytes, numBytes);
}
int ofxTCPServer::peekReceiveRawBytes(int clientID, char * receiveBytes, int numBytes){
std::unique_lock<std::mutex> lck( mConnectionsLock );
if( !isClientSetup(clientID) ){
ofLog(OF_LOG_WARNING, "ofxTCPServer: client " + ofToString(clientID) + " doesn't exist");
return 0;
}
return getClient(clientID).peekReceiveRawBytes(receiveBytes, numBytes);
}
int ofxTCPServer::receiveRawMsg(int clientID, char * receiveBytes, int numBytes){
std::unique_lock<std::mutex> lck( mConnectionsLock );
if( !isClientSetup(clientID) ){
ofLogWarning("ofxTCPServer") << "receiveRawMsg(): client " << clientID << " doesn't exist";
return 0;
}
return getClient(clientID).receiveRawMsg(receiveBytes, numBytes);
}
int ofxTCPServer::getClientPort(int clientID){
std::unique_lock<std::mutex> lck( mConnectionsLock );
if( !isClientSetup(clientID) ){
ofLogWarning("ofxTCPServer") << "getClientPort(): client " << clientID << " doesn't exist";
return 0;
}
else return getClient(clientID).getPort();
}
std::string ofxTCPServer::getClientIP(int clientID){
std::unique_lock<std::mutex> lck( mConnectionsLock );
if( !isClientSetup(clientID) ){
ofLogWarning("ofxTCPServer") << "getClientIP(): client " << clientID << " doesn't exist";
return "000.000.000.000";
}
else return getClient(clientID).getIP();
}
int ofxTCPServer::getNumClients(){
return TCPConnections.size();
}
int ofxTCPServer::getLastID(){
return idCount;
}
int ofxTCPServer::getPort(){
return port;
}
bool ofxTCPServer::isConnected(){
return connected;
}
bool ofxTCPServer::isClientSetup(int clientID){
return TCPConnections.find(clientID)!=TCPConnections.end();
}
bool ofxTCPServer::isClientConnected(int clientID){
std::unique_lock<std::mutex> lck( mConnectionsLock );
return isClientSetup(clientID) && getClient(clientID).isConnected();
}
void ofxTCPServer::waitConnectedClient(){
std::unique_lock<std::mutex> lck( mConnectionsLock );
if(TCPConnections.empty()){
serverReady.wait(lck);
}
}
void ofxTCPServer::waitConnectedClient(int ms){
std::unique_lock<std::mutex> lck( mConnectionsLock );
if(TCPConnections.empty()){
serverReady.wait_for(lck, std::chrono::milliseconds(ms));
}
}
void ofxTCPServer::threadedFunction(){
ofLogVerbose("ofxTCPServer") << "listening thread started";
while( isThreadRunning() ){
int acceptId;
for(acceptId = 0; acceptId <= idCount; acceptId++){
if(!isClientConnected(acceptId)) break;
}
if(acceptId == TCP_MAX_CLIENTS){
ofLogWarning("ofxTCPServer") << "no longer accepting connections, maximum number of clients reached: " << TCP_MAX_CLIENTS;
break;
}
if( !TCPServer.Listen(TCP_MAX_CLIENTS) ){
if(isThreadRunning()) ofLogError("ofxTCPServer") << "listening failed";
}
{
std::unique_lock<std::mutex> lck( mConnectionsLock );
serverReady.notify_one();
}
std::shared_ptr<ofxTCPClient> client(new ofxTCPClient);
if( !TCPServer.Accept( client->TCPClient ) ){
if(isThreadRunning()) ofLogError("ofxTCPServer") << "couldn't accept client " << acceptId;
}else{
std::unique_lock<std::mutex> lck( mConnectionsLock );
TCPConnections[acceptId] = client;
TCPConnections[acceptId]->setupConnectionIdx(acceptId, bClientBlocking);
TCPConnections[acceptId]->setMessageDelimiter(messageDelimiter);
ofLogVerbose("ofxTCPServer") << "client " << acceptId << " connected on port " << TCPConnections[acceptId]->getPort();
if(acceptId == idCount) idCount++;
serverReady.notify_all();
}
}
idCount = 0;
std::unique_lock<std::mutex> lck( mConnectionsLock );
TCPConnections.clear();
connected = false;
ofLogVerbose("ofxTCPServer") << "listening thread stopped";
}
Comments