The synchronous server is also quite simple. It needs only two threads: one that listens for new client connections, and another that serves the already-connected clients. It cannot be done on a single thread, because waiting for a new connection is a blocking operation (accept() blocks), so a second thread is needed to serve the existing clients.
A TCP-based synchronous server
1. Flow diagram
2. Implementation
#ifdef WIN32
#define _WIN32_WINNT 0x0501
#include <winsock2.h>
#endif
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <iostream>
#include <sstream>
#include <algorithm>
#include <vector>
#include <string>
using namespace boost::asio;
using namespace boost::posix_time;
io_service service;
struct talk_to_client;
typedef boost::shared_ptr<talk_to_client> client_ptr;
typedef std::vector<client_ptr> array;
array clients;
// thread-safe access to clients array
boost::recursive_mutex cs;
void update_clients_changed();
/** simple connection to server:
- logs in just with username (no password)
- all connections are initiated by the client: client asks, server answers
- server disconnects any client that hasn't pinged for 5 seconds
possible requests:
- gets a list of all connected clients
- ping: the server answers either with "ping ok" or "ping client_list_changed"
*/
struct talk_to_client : boost::enable_shared_from_this<talk_to_client> {
    talk_to_client()
        : sock_(service), started_(false), already_read_(0), clients_changed_(false) {
        last_ping = microsec_clock::local_time();
    }
    std::string username() const { return username_; }
    void answer_to_client() {
        try {
            read_request();
            process_request();
        }
        catch (boost::system::system_error&) {
            stop();
        }
        if (timed_out()) {
            stop();
            std::cout << "stopping " << username_ << " - no ping in time" << std::endl;
        }
    }
    void set_clients_changed() { clients_changed_ = true; }
    ip::tcp::socket & sock() { return sock_; }
    bool timed_out() const {
        ptime now = microsec_clock::local_time();
        long long ms = (now - last_ping).total_milliseconds();
        return ms > 5000;
    }
    void stop() {
        // close client connection
        boost::system::error_code err;
        sock_.close(err);
    }
private:
    void read_request() {
        if (sock_.available())
            already_read_ += sock_.read_some(
                buffer(buff_ + already_read_, max_msg - already_read_));
    }
    void process_request() {
        bool found_enter = std::find(buff_, buff_ + already_read_, '\n')
            < buff_ + already_read_;
        if (!found_enter)
            return; // message is not full
        // process the msg
        last_ping = microsec_clock::local_time();
        size_t pos = std::find(buff_, buff_ + already_read_, '\n') - buff_;
        std::string msg(buff_, pos);
        // shift whatever follows the '\n' to the front of the buffer
        std::copy(buff_ + pos + 1, buff_ + already_read_, buff_);
        already_read_ -= pos + 1;
        if (msg.find("login ") == 0) on_login(msg);
        else if (msg.find("ping") == 0) on_ping();
        else if (msg.find("ask_clients") == 0) on_clients();
        else std::cerr << "invalid msg " << msg << std::endl;
    }
    void on_login(const std::string & msg) {
        std::istringstream in(msg);
        // the first extraction consumes the "login" keyword,
        // the second reads the actual username
        in >> username_ >> username_;
        std::cout << username_ << " logged in" << std::endl;
        write("login ok\n");
        update_clients_changed();
    }
    void on_ping() {
        write(clients_changed_ ? "ping client_list_changed\n" : "ping ok\n");
        clients_changed_ = false;
    }
    void on_clients() {
        std::string msg;
        { boost::recursive_mutex::scoped_lock lk(cs);
          for (array::const_iterator b = clients.begin(), e = clients.end(); b != e; ++b)
              msg += (*b)->username() + " ";
        }
        write("clients " + msg + "\n");
    }
    void write(const std::string & msg) {
        sock_.write_some(buffer(msg));
    }
private:
    ip::tcp::socket sock_;
    enum { max_msg = 1024 };
    int already_read_;
    char buff_[max_msg];
    bool started_;
    std::string username_;
    bool clients_changed_;
    ptime last_ping;
};
void update_clients_changed() {
    boost::recursive_mutex::scoped_lock lk(cs);
    for (array::iterator b = clients.begin(), e = clients.end(); b != e; ++b)
        (*b)->set_clients_changed();
}
void accept_thread() {
    ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(), 8001));
    while (true) {
        client_ptr new_(new talk_to_client);
        acceptor.accept(new_->sock());
        boost::recursive_mutex::scoped_lock lk(cs);
        clients.push_back(new_);
    }
}
void handle_clients_thread() {
    while (true) {
        boost::this_thread::sleep(millisec(1));
        boost::recursive_mutex::scoped_lock lk(cs);
        for (array::iterator b = clients.begin(), e = clients.end(); b != e; ++b)
            (*b)->answer_to_client();
        // erase clients that timed out
        clients.erase(std::remove_if(clients.begin(), clients.end(),
            boost::bind(&talk_to_client::timed_out, _1)), clients.end());
    }
}
int main(int argc, char* argv[]) {
    boost::thread_group threads;
    threads.create_thread(accept_thread);
    threads.create_thread(handle_clients_thread);
    threads.join_all();
}
accept_thread() loops forever accepting client connections. Since the elements of the clients container are accessed from both threads, every access to it must be synchronized by locking a mutex.
handle_clients_thread() carries out the message exchange with each client, and removes the clients that have timed out from the clients container. It uses std::remove_if, which is usually paired with std::vector::erase. std::remove_if is declared in the header <algorithm>:
template <class ForwardIterator, class UnaryPredicate>
ForwardIterator remove_if(ForwardIterator first, ForwardIterator last, UnaryPredicate pred);
The function remove_if() removes from the range [first, last) every element for which the predicate pred returns true.
remove_if() does not physically erase elements from the sequence; if you apply it to a container, the container's size does not change (working only through iterators, remove_if() cannot modify the container itself). What it actually does is move the elements that are to be kept to the front of the range and return an iterator marking the new logical end; the elements between that iterator and the old end are left behind with unspecified values. To actually get rid of them, you must call the container's own erase() on that tail, which is exactly what the server code does with clients.
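Applied to a std::vector, the two steps combine into the erase-remove idiom used above. A minimal self-contained sketch (the helper isodd and the sample values are just for illustration):
#include <algorithm>
#include <iostream>
#include <vector>
bool isodd(int i) { return (i % 2) == 1; }
int main() {
    std::vector<int> v;
    for (int i = 1; i <= 9; ++i)
        v.push_back(i);
    // remove_if() only partitions the elements and returns the new
    // logical end; erase() actually shrinks the vector
    v.erase(std::remove_if(v.begin(), v.end(), isodd), v.end());
    std::cout << "size = " << v.size() << ':'; // prints: size = 4: 2 4 6 8
    for (std::vector<int>::iterator it = v.begin(); it != v.end(); ++it)
        std::cout << ' ' << *it;
    std::cout << '\n';
    return 0;
}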
And here is a standalone example of std::remove_if on a plain array:
#include <iostream>
#include <algorithm>
bool isodd(int i) { return ((i % 2) == 1); }
int main() {
    int myints[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
    int* pbegin = myints;
    int* pend = myints + sizeof(myints) / sizeof(int);
    // shift the elements to keep (the even ones) to the front;
    // pend now marks the new logical end of the range
    pend = std::remove_if(pbegin, pend, isodd);
    std::cout << "the range contains:"; // prints: the range contains: 2 4 6 8
    for (int* p = pbegin; p != pend; ++p)
        std::cout << ' ' << *p;
    std::cout << '\n';
    return 0;
}
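To try the server out, a client just follows the protocol described in the comment at the top of the listing: log in, then ping at least once every 5 seconds. Below is a minimal synchronous client sketch, not part of the original program; the username "john", the 1-second ping interval, and the assumption that each read_some() returns one complete reply are all illustrative simplifications:
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <iostream>
#include <string>
using namespace boost::asio;
io_service service;
int main() {
    // connect to the server built above (localhost:8001)
    ip::tcp::endpoint ep(ip::address::from_string("127.0.0.1"), 8001);
    ip::tcp::socket sock(service);
    sock.connect(ep);
    char buff[1024];
    // log in first; the server replies "login ok\n"
    std::string login = "login john\n";
    sock.write_some(buffer(login));
    std::cout << std::string(buff, sock.read_some(buffer(buff)));
    // ping a few times, staying well under the 5-second timeout;
    // the reply is "ping ok\n" or "ping client_list_changed\n"
    std::string ping = "ping\n";
    for (int i = 0; i < 3; ++i) {
        sock.write_some(buffer(ping));
        std::cout << std::string(buff, sock.read_some(buffer(buff)));
        boost::this_thread::sleep(boost::posix_time::millisec(1000));
    }
    sock.close();
}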