From fbc7a11b85845c995cae409615db790b680af6c4 Mon Sep 17 00:00:00 2001 From: Joon Park Date: Tue, 3 Mar 2020 06:16:26 -0500 Subject: [PATCH] . --- Source/Server_net/connection.h | 146 ++++++++----------------------- Source/Server_net/core.cpp | 6 +- Source/Server_net/mmo_server.cpp | 19 ++-- Source/Server_net/network.cpp | 57 +++++++----- Source/Server_net/network.h | 121 ++++++++++++++++--------- 5 files changed, 168 insertions(+), 181 deletions(-) diff --git a/Source/Server_net/connection.h b/Source/Server_net/connection.h index 966e6a1..13a2f30 100644 --- a/Source/Server_net/connection.h +++ b/Source/Server_net/connection.h @@ -4,19 +4,19 @@ namespace server { namespace core { - template - struct async_read_frame + template + struct async_recv_frame_impl { - socket_type& socket; - buffer_type buffer; + socket_t& socket; + buffer_t buffer; asio::error_code error_code; - async_read_frame(socket_type& socket, buffer_type&& buffer) : socket(socket), buffer(std::move(buffer)) + async_recv_frame_impl(socket_t& socket, buffer_t&& buffer) : socket(socket), buffer(std::move(buffer)) { this->buffer; } - ~async_read_frame() + ~async_recv_frame_impl() { this->buffer; } @@ -52,6 +52,12 @@ namespace server void await_suspend(std::experimental::coroutine_handle<> coro) { + socket.async_read_some(buffer, [this, coro](auto error_code, auto bytes_read) + { + this->error_code = error_code; + coro.resume(); + }); + /* asio::co_spawn(io_context, [this]() @@ -59,27 +65,23 @@ namespace server return async_read(); }, asio::detached); + co_await socket.async_receive(buffer, [this, coro](auto error_code, auto bytes_read) { coro.resume(); }); - */ - /* + asio::async_read_until(socket, buffer, "\n", [this, coro](auto error_code, auto bytes_read) { this->error_code = error_code; coro.resume(); }); - */ - /* - asio::async_read(socket, buffer, [this, coro](auto error_code, auto bytes_read) { - this->error_code = error_code; - coro.resume(); - }); - */ - socket.async_read_some(buffer, [this, coro](auto error_code, auto bytes_read) { + + asio::async_read(socket, buffer, [this, coro](auto error_code, auto bytes_read) { this->error_code = error_code; coro.resume(); }); + */ + //socket.async_receive(buffer, [this, coro](auto error_code) { this->error_code = error_code; coro.resume(); }); } @@ -95,28 +97,29 @@ namespace server struct tcp_connection { - asio::ip::tcp::socket socket; - std::vector rdata; - std::vector wdata; - int id; asio::ip::address address; uint16 port; bool marked_for_delete; + asio::ip::tcp::socket socket; + std::vector rdata; + std::vector wdata; + tcp_connection(asio::ip::tcp::socket&& new_socket) : - socket(std::move(new_socket)), id(-1), - address(socket.remote_endpoint().address()), - port(socket.remote_endpoint().port()) + address(new_socket.remote_endpoint().address()), + port(new_socket.remote_endpoint().port()), + marked_for_delete(false), + socket(std::move(new_socket)) { rdata.resize(12); //rdata.reserve(12); } - void close() + void shutdown_and_close() { asio::error_code error_code; socket.shutdown(asio::ip::tcp::socket::shutdown_both, error_code); @@ -130,108 +133,29 @@ namespace server marked_for_delete = true; } - //* - template - auto async_recv_helper2(socket_type& socket, buffer_type&& buffer) + template + auto async_recv_frame(socket_t& socket, buffer_t buffer) { - return async_read_frame(socket, std::forward(buffer)); + return async_recv_frame_impl(socket, std::forward(buffer)); } - template - auto async_recv_helper(socket_type& socket, buffer_type buffer) -> std::future + std::future begin_async_recv() { try { while (socket.is_open()) { - co_await async_recv_helper2(socket, std::forward(buffer)); + co_await async_recv_frame(socket, asio::buffer(rdata, 5)); } } - catch (const std::exception & /*exception*/) + catch (const std::exception& /*exception*/) { - printf("connection %d closed.", id); + shutdown_and_close(); + //printf("connection %d marked for delete.\n", id); //printf("exception: %s", exception.what()); } } - - auto async_recv() - { - auto buffer = asio::buffer(rdata, 5); - async_recv_helper(socket, asio::buffer(rdata, 5)); - } -#if 0 - //*/ - asio::awaitable async_read() - { - try - { - char data[1024]; - //for (rdata;;) - for (;;) - { - //std::size_t n; - //socket.async_receive(asio::buffer(rdata, 12), [this, &n](auto error_code, auto bytes_read) { n = bytes_read; }); - std::size_t n = co_await socket.async_read_some(asio::buffer(rdata, 12), asio::use_awaitable); - - //std::size_t n = co_await asio::async_read_until(socket, asio::dynamic_buffer(rdata, 12), "\n", asio::use_awaitable); - - printf("%s\n", /*this_thread::get_debug_name().c_str(),*/ rdata.data()); - - rdata.erase(rdata.begin(), rdata.begin() + n); - } - } - catch (std::exception & e) - { - printf("exception: %s", e.what()); - } - } -#endif - }; - - static_assert(std::is_nothrow_move_constructible::value); - - struct connection_pool - { - std::vector> connections; - bool allow_overflow; - - void init(uint32 max_connection_count, bool allow_overflow = false) - { - //connections.reserve(max_connection_count); - this->allow_overflow = allow_overflow; - } - - tcp_connection* allocate(asio::ip::tcp::socket&& socket) - { - tcp_connection* test = nullptr; - tcp_connection* taken = test + 1; - - int id = 0; - for (auto& i : connections) - { - bool exchanged = i.compare_exchange_weak(test, taken); - - if (exchanged) - { - //auto new_connection = new tcp_connection(std::move(socket)); - //new_connection->id = id; - //i = new_connection; - //return new_connection; - } - - id++; - } - - return nullptr; - } - - void release(int id) - { - //auto connection = connections[id].load(); - //delete connection; - //connections[id].store(nullptr); - } }; } } \ No newline at end of file diff --git a/Source/Server_net/core.cpp b/Source/Server_net/core.cpp index 72d2c0a..a372e7e 100644 --- a/Source/Server_net/core.cpp +++ b/Source/Server_net/core.cpp @@ -39,11 +39,11 @@ namespace server try { - while (true) + while (network_services_online) { Sleep(100); } - io_context.run(); + //io_context.run(); } catch (std::exception & e) @@ -51,6 +51,8 @@ namespace server printf("exception: %s", e.what()); //std::cerr << "Exception: " << e.what() << "\n"; } + + printf("server main_loop complete.\n"); } diff --git a/Source/Server_net/mmo_server.cpp b/Source/Server_net/mmo_server.cpp index 160fb96..3d49fd6 100644 --- a/Source/Server_net/mmo_server.cpp +++ b/Source/Server_net/mmo_server.cpp @@ -36,19 +36,25 @@ namespace mmo_server core::init_signals(signal_handler); //core::init_gui(); - unsigned short port = 9006; - auto accept_handler = [&port]() + auto accept_handler = [](core::tcp_connection* new_connection) { - auto newport = port + 1; - printf("accepted\n"); + printf("connection %d accepted\n", new_connection->id); }; + unsigned short port = 9006; core::spawn_network_service("127.0.0.1", port, 300, accept_handler); core::spawn_network_service("127.0.0.1", port + 1, 300, accept_handler); //core::async_accept("127.0.0.1", port, accept_handler, false); core::spawn_worker_threads(2); + core::start_network(2); - core::async([]() {printf("async_task\n"); }); + core::async_after(5s, []() + { + printf("server network stopped.\n"); + core::stop_network(); + }); + + //core::async([]() {printf("async_task\n"); }); /* core::async_after(5s, []() {printf("async_task\n"); }); core::async_every(1000ms, []() @@ -57,7 +63,7 @@ namespace mmo_server }); */ - core::async_every(5s, stop_signal, []() {printf("stoppable periodic_async_task\n"); }); + //core::async_every(5s, stop_signal, []() {printf("stoppable periodic_async_task\n"); }); } @@ -68,6 +74,7 @@ namespace mmo_server void stop() { + core::purge_worker_threads(); //core::stop_gui(); } } diff --git a/Source/Server_net/network.cpp b/Source/Server_net/network.cpp index 1a82d81..d616677 100644 --- a/Source/Server_net/network.cpp +++ b/Source/Server_net/network.cpp @@ -11,11 +11,8 @@ namespace server { main_thread_info main_thread; std::vector worker_threads; - std::vector connection_pool2; - connection_pool connection_pool3; - std::list network_services; - bool network_services_running; + bool network_services_online; @@ -56,45 +53,63 @@ namespace server } } */ - void spawn_worker_threads(/*asio::io_context& io_context, */int thread_count) - { - worker_threads.reserve(thread_count); - for (int i = 0; i < thread_count; i++) + void spawn_worker_threads(uint32 worker_count) + { + if (worker_count == 0) + { + worker_count = std::thread::hardware_concurrency(); + } + + worker_threads.reserve(worker_count); + for (int i = 0; i < worker_count; i++) { worker_threads.emplace_back( std::string("worker_thread_") + std::to_string(i), [i]() + { + while (!network_services_online) { - io_context.run(); - printf("%s exited.\n", worker_threads[i].name.c_str()); - }); + std::this_thread::sleep_for(10ms); + } + + io_context.run(); + printf("%s exited.\n", worker_threads[i].name.c_str()); + }); } - - auto id = worker_threads.back().get_id(); } - void stop_worker_threads() + void purge_worker_threads() { - + for (auto& worker : worker_threads) + { + if (worker.joinable()) + { + worker.join(); + } + } + worker_threads.clear(); } - - void start_network(int thread_count) + void start_network(uint32 worker_count) { - assert(!network_services_running); + assert(!network_services_online); - spawn_worker_threads(thread_count); + //spawn_worker_threads(worker_count); + + network_services_online = true; } void stop_network() { - assert(network_services_running); + assert(network_services_online); for (auto& i : network_services) { i.stop_signal = true; } - stop_worker_threads(); + + io_context.stop(); + network_services_online = false; } void this_thread::assign_main_thread() diff --git a/Source/Server_net/network.h b/Source/Server_net/network.h index 7a1c337..dfda241 100644 --- a/Source/Server_net/network.h +++ b/Source/Server_net/network.h @@ -32,8 +32,6 @@ namespace server extern std::mutex connection_lock; - extern std::vector connection_pool2; - extern connection_pool connection_pool3; struct main_thread_info @@ -60,23 +58,28 @@ namespace server extern std::vector worker_threads; - template - struct async_accept_frame + struct connection_pool : public std::vector { + uint32 conneciton_count = 0; + }; + + template + struct async_accept_frame_impl + { + async_accept_frame_impl(asio::ip::tcp::acceptor& acceptor, connection_pool& connections, std::string ip, unsigned short port, lambda& accept_handler) : + ip(ip), port(port), accept_handler(accept_handler), acceptor(acceptor), connection_pool(connections), connection_count(connection_count) {} + std::string ip; unsigned short port; lambda accept_handler; asio::error_code error_code; asio::ip::tcp::acceptor& acceptor; - std::vector& connection_pool; + uint32& connection_count; tcp_connection* return_value; //asio::ip::tcp::socket socket; - - async_accept_frame(asio::ip::tcp::acceptor& acceptor, std::vector& connection_pool, std::string ip, unsigned short port, lambda& accept_handler) : - ip(ip), port(port), accept_handler(accept_handler), acceptor(acceptor), connection_pool(connection_pool) - {} + connection_pool& connection_pool; bool await_ready() { @@ -89,23 +92,30 @@ namespace server { this->error_code = error_code; - auto new_connection = new tcp_connection(std::move(socket)); return_value = nullptr; - uint32 id = 0; - for (auto& i : this->connection_pool) + for (int id = 0; id < this->connection_pool.size(); id++) { - if (!i) + auto& connection = this->connection_pool[id]; + if (!connection) { - i = new_connection; - i->id = id; - return_value = i; + this->connection_pool.conneciton_count++; + connection = new tcp_connection(std::move(socket)); + connection->id = id; + return_value = connection; break; } - id++; } + if (return_value == nullptr) + { + socket.shutdown(asio::socket_base::shutdown_both); + socket.close(); + } + + accept_handler(return_value); + coro.resume(); }); } @@ -122,43 +132,77 @@ namespace server }; + template + auto async_accept_frame(asio::ip::tcp::acceptor& acceptor, connection_pool& connections, std::string ip, unsigned short port, lambda& accept_handler) + { + return async_accept_frame_impl(acceptor, connections, ip, port, accept_handler); + } + + struct async_network_service { - std::string ip; - unsigned short port; - std::vector connection_pool; - uint32 connection_count; - asio::ip::tcp::acceptor acceptor; - volatile bool stop_signal; - template async_network_service(std::string ip, uint16 port, uint16 max_connection_count, lambda& accept_handler) : - ip(ip), port(port), connection_count(0), acceptor(io_context, { asio::ip::address::from_string(ip), port }), + ip(ip), port(port), acceptor(io_context, { asio::ip::address::from_string(ip), port }), stop_signal(false) { connection_pool.resize(max_connection_count); + + async_every(1000ms, [this]() + { + int connections_purged = 0; + for (auto& connection : this->connection_pool) + { + if (connection) + { + uint32 id = connection->id; + if (connection->marked_for_delete) + { + delete connection; + connection = nullptr; + this->connection_pool.conneciton_count--; + connections_purged++; + } + } + } + + if (connections_purged) + { + printf("%d connections purged for %s:%d.\n", connections_purged, this->ip.c_str(), this->port); + } + + }); + begin_async_accept(ip, port, accept_handler, stop_signal); } + std::string ip; + unsigned short port; + connection_pool connection_pool; + asio::ip::tcp::acceptor acceptor; + volatile bool stop_signal; + + + template - auto begin_async_accept(std::string ip, unsigned short port, lambda& accept_handler, volatile bool& stop_signal) -> std::future + std::future begin_async_accept(std::string ip, unsigned short port, lambda& accept_handler, volatile bool& stop_signal) { try { while (!stop_signal) { - auto new_connection = co_await async_accept_frame(acceptor, connection_pool, ip, port, accept_handler); + auto new_connection = co_await async_accept_frame(acceptor, connection_pool, ip, port, accept_handler); - connection_count++; if (new_connection) { - printf("new connection accepted: %d from %s:%d\n", new_connection->id, new_connection->address, port); - new_connection->async_recv(); + //printf("new connection accepted: %d from %s:%d\n", new_connection->id, new_connection->address.to_string().c_str(), port); + new_connection->begin_async_recv(); } else { - printf("new connection rejected, connection_count: %d.\n", connection_count); + + //printf("new connection rejected, connection_count: %d.\n", connection_pool.conneciton_count); } //co_await printf("something\n"); } @@ -172,25 +216,20 @@ namespace server }; extern std::list network_services; - extern bool network_services_running; + extern bool network_services_online; template void spawn_network_service(std::string ip, uint16 port, uint16 max_connection_count, lambda& accept_handler) { - assert(!network_services_running); + assert(!network_services_online); network_services.emplace_back(ip, port, max_connection_count, accept_handler); } - void spawn_worker_threads(int thread_count); - void stop_worker_threads(); - - void start_network(int thread_count); + void spawn_worker_threads(uint32 worker_count = 0); + void purge_worker_threads(); + void start_network(uint32 thread_count = 0); void stop_network(); - - - - /* asio::awaitable reader() {