This commit is contained in:
Joon Park 2020-03-03 06:16:26 -05:00
parent eb5bcc1f1c
commit fbc7a11b85
5 changed files with 168 additions and 181 deletions

View File

@ -4,19 +4,19 @@ namespace server
{ {
namespace core namespace core
{ {
template<typename socket_type, typename buffer_type> template<typename socket_t, typename buffer_t>
struct async_read_frame struct async_recv_frame_impl
{ {
socket_type& socket; socket_t& socket;
buffer_type buffer; buffer_t buffer;
asio::error_code error_code; asio::error_code error_code;
async_read_frame(socket_type& socket, buffer_type&& buffer) : socket(socket), buffer(std::move(buffer)) async_recv_frame_impl(socket_t& socket, buffer_t&& buffer) : socket(socket), buffer(std::move(buffer))
{ {
this->buffer; this->buffer;
} }
~async_read_frame() ~async_recv_frame_impl()
{ {
this->buffer; this->buffer;
} }
@ -52,6 +52,12 @@ namespace server
void await_suspend(std::experimental::coroutine_handle<> coro) void await_suspend(std::experimental::coroutine_handle<> coro)
{ {
socket.async_read_some(buffer, [this, coro](auto error_code, auto bytes_read)
{
this->error_code = error_code;
coro.resume();
});
/* /*
asio::co_spawn(io_context, asio::co_spawn(io_context,
[this]() [this]()
@ -59,27 +65,23 @@ namespace server
return async_read(); return async_read();
}, asio::detached); }, asio::detached);
co_await socket.async_receive(buffer, [this, coro](auto error_code, auto bytes_read) co_await socket.async_receive(buffer, [this, coro](auto error_code, auto bytes_read)
{ {
coro.resume(); coro.resume();
}); });
*/
/*
asio::async_read_until(socket, buffer, "\n", [this, coro](auto error_code, auto bytes_read) { asio::async_read_until(socket, buffer, "\n", [this, coro](auto error_code, auto bytes_read) {
this->error_code = error_code; this->error_code = error_code;
coro.resume(); coro.resume();
}); });
*/
/*
asio::async_read(socket, buffer, [this, coro](auto error_code, auto bytes_read) { asio::async_read(socket, buffer, [this, coro](auto error_code, auto bytes_read) {
this->error_code = error_code; this->error_code = error_code;
coro.resume(); coro.resume();
}); });
*/ */
socket.async_read_some(buffer, [this, coro](auto error_code, auto bytes_read) {
this->error_code = error_code;
coro.resume();
});
//socket.async_receive(buffer, [this, coro](auto error_code) { this->error_code = error_code; coro.resume(); }); //socket.async_receive(buffer, [this, coro](auto error_code) { this->error_code = error_code; coro.resume(); });
} }
@ -95,28 +97,29 @@ namespace server
struct tcp_connection struct tcp_connection
{ {
asio::ip::tcp::socket socket;
std::vector<uint8> rdata;
std::vector<uint8> wdata;
int id; int id;
asio::ip::address address; asio::ip::address address;
uint16 port; uint16 port;
bool marked_for_delete; bool marked_for_delete;
asio::ip::tcp::socket socket;
std::vector<uint8> rdata;
std::vector<uint8> wdata;
tcp_connection(asio::ip::tcp::socket&& new_socket) : tcp_connection(asio::ip::tcp::socket&& new_socket) :
socket(std::move(new_socket)),
id(-1), id(-1),
address(socket.remote_endpoint().address()), address(new_socket.remote_endpoint().address()),
port(socket.remote_endpoint().port()) port(new_socket.remote_endpoint().port()),
marked_for_delete(false),
socket(std::move(new_socket))
{ {
rdata.resize(12); rdata.resize(12);
//rdata.reserve(12); //rdata.reserve(12);
} }
void close() void shutdown_and_close()
{ {
asio::error_code error_code; asio::error_code error_code;
socket.shutdown(asio::ip::tcp::socket::shutdown_both, error_code); socket.shutdown(asio::ip::tcp::socket::shutdown_both, error_code);
@ -130,108 +133,29 @@ namespace server
marked_for_delete = true; marked_for_delete = true;
} }
//* template<typename socket_t, typename buffer_t>
template<typename socket_type, typename buffer_type> auto async_recv_frame(socket_t& socket, buffer_t buffer)
auto async_recv_helper2(socket_type& socket, buffer_type&& buffer)
{ {
return async_read_frame<socket_type, buffer_type>(socket, std::forward<buffer_type>(buffer)); return async_recv_frame_impl<socket_t, buffer_t>(socket, std::forward<buffer_t>(buffer));
} }
template<typename socket_type, typename buffer_type> std::future<void> begin_async_recv()
auto async_recv_helper(socket_type& socket, buffer_type buffer) -> std::future<void>
{ {
try try
{ {
while (socket.is_open()) while (socket.is_open())
{ {
co_await async_recv_helper2(socket, std::forward<buffer_type>(buffer)); co_await async_recv_frame(socket, asio::buffer(rdata, 5));
} }
} }
catch (const std::exception& /*exception*/) catch (const std::exception& /*exception*/)
{ {
printf("connection %d closed.", id); shutdown_and_close();
//printf("connection %d marked for delete.\n", id);
//printf("exception: %s", exception.what()); //printf("exception: %s", exception.what());
} }
} }
auto async_recv()
{
auto buffer = asio::buffer(rdata, 5);
async_recv_helper(socket, asio::buffer(rdata, 5));
}
#if 0
//*/
asio::awaitable<void> async_read()
{
try
{
char data[1024];
//for (rdata;;)
for (;;)
{
//std::size_t n;
//socket.async_receive(asio::buffer(rdata, 12), [this, &n](auto error_code, auto bytes_read) { n = bytes_read; });
std::size_t n = co_await socket.async_read_some(asio::buffer(rdata, 12), asio::use_awaitable);
//std::size_t n = co_await asio::async_read_until(socket, asio::dynamic_buffer(rdata, 12), "\n", asio::use_awaitable);
printf("%s\n", /*this_thread::get_debug_name().c_str(),*/ rdata.data());
rdata.erase(rdata.begin(), rdata.begin() + n);
}
}
catch (std::exception & e)
{
printf("exception: %s", e.what());
}
}
#endif
};
static_assert(std::is_nothrow_move_constructible<tcp_connection>::value);
struct connection_pool
{
std::vector<std::atomic<tcp_connection*>> connections;
bool allow_overflow;
void init(uint32 max_connection_count, bool allow_overflow = false)
{
//connections.reserve(max_connection_count);
this->allow_overflow = allow_overflow;
}
tcp_connection* allocate(asio::ip::tcp::socket&& socket)
{
tcp_connection* test = nullptr;
tcp_connection* taken = test + 1;
int id = 0;
for (auto& i : connections)
{
bool exchanged = i.compare_exchange_weak(test, taken);
if (exchanged)
{
//auto new_connection = new tcp_connection(std::move(socket));
//new_connection->id = id;
//i = new_connection;
//return new_connection;
}
id++;
}
return nullptr;
}
void release(int id)
{
//auto connection = connections[id].load();
//delete connection;
//connections[id].store(nullptr);
}
}; };
} }
} }

View File

@ -39,11 +39,11 @@ namespace server
try try
{ {
while (true) while (network_services_online)
{ {
Sleep(100); Sleep(100);
} }
io_context.run(); //io_context.run();
} }
catch (std::exception & e) catch (std::exception & e)
@ -51,6 +51,8 @@ namespace server
printf("exception: %s", e.what()); printf("exception: %s", e.what());
//std::cerr << "Exception: " << e.what() << "\n"; //std::cerr << "Exception: " << e.what() << "\n";
} }
printf("server main_loop complete.\n");
} }

View File

@ -36,19 +36,25 @@ namespace mmo_server
core::init_signals(signal_handler); core::init_signals(signal_handler);
//core::init_gui(); //core::init_gui();
unsigned short port = 9006; auto accept_handler = [](core::tcp_connection* new_connection)
auto accept_handler = [&port]()
{ {
auto newport = port + 1; printf("connection %d accepted\n", new_connection->id);
printf("accepted\n");
}; };
unsigned short port = 9006;
core::spawn_network_service("127.0.0.1", port, 300, accept_handler); core::spawn_network_service("127.0.0.1", port, 300, accept_handler);
core::spawn_network_service("127.0.0.1", port + 1, 300, accept_handler); core::spawn_network_service("127.0.0.1", port + 1, 300, accept_handler);
//core::async_accept("127.0.0.1", port, accept_handler, false); //core::async_accept("127.0.0.1", port, accept_handler, false);
core::spawn_worker_threads(2); core::spawn_worker_threads(2);
core::start_network(2);
core::async([]() {printf("async_task\n"); }); core::async_after(5s, []()
{
printf("server network stopped.\n");
core::stop_network();
});
//core::async([]() {printf("async_task\n"); });
/* /*
core::async_after(5s, []() {printf("async_task\n"); }); core::async_after(5s, []() {printf("async_task\n"); });
core::async_every(1000ms, []() core::async_every(1000ms, []()
@ -57,7 +63,7 @@ namespace mmo_server
}); });
*/ */
core::async_every(5s, stop_signal, []() {printf("stoppable periodic_async_task\n"); }); //core::async_every(5s, stop_signal, []() {printf("stoppable periodic_async_task\n"); });
} }
@ -68,6 +74,7 @@ namespace mmo_server
void stop() void stop()
{ {
core::purge_worker_threads();
//core::stop_gui(); //core::stop_gui();
} }
} }

View File

@ -11,11 +11,8 @@ namespace server
{ {
main_thread_info main_thread; main_thread_info main_thread;
std::vector<worker_thread> worker_threads; std::vector<worker_thread> worker_threads;
std::vector<tcp_connection> connection_pool2;
connection_pool connection_pool3;
std::list<async_network_service> network_services; std::list<async_network_service> network_services;
bool network_services_running; bool network_services_online;
@ -56,45 +53,63 @@ namespace server
} }
} }
*/ */
void spawn_worker_threads(/*asio::io_context& io_context, */int thread_count)
{
worker_threads.reserve(thread_count); void spawn_worker_threads(uint32 worker_count)
for (int i = 0; i < thread_count; i++) {
if (worker_count == 0)
{
worker_count = std::thread::hardware_concurrency();
}
worker_threads.reserve(worker_count);
for (int i = 0; i < worker_count; i++)
{ {
worker_threads.emplace_back( worker_threads.emplace_back(
std::string("worker_thread_") + std::to_string(i), std::string("worker_thread_") + std::to_string(i),
[i]() [i]()
{ {
while (!network_services_online)
{
std::this_thread::sleep_for(10ms);
}
io_context.run(); io_context.run();
printf("%s exited.\n", worker_threads[i].name.c_str()); printf("%s exited.\n", worker_threads[i].name.c_str());
}); });
} }
auto id = worker_threads.back().get_id();
} }
void stop_worker_threads() void purge_worker_threads()
{ {
for (auto& worker : worker_threads)
{
if (worker.joinable())
{
worker.join();
}
}
worker_threads.clear();
} }
void start_network(uint32 worker_count)
void start_network(int thread_count)
{ {
assert(!network_services_running); assert(!network_services_online);
spawn_worker_threads(thread_count); //spawn_worker_threads(worker_count);
network_services_online = true;
} }
void stop_network() void stop_network()
{ {
assert(network_services_running); assert(network_services_online);
for (auto& i : network_services) for (auto& i : network_services)
{ {
i.stop_signal = true; i.stop_signal = true;
} }
stop_worker_threads();
io_context.stop();
network_services_online = false;
} }
void this_thread::assign_main_thread() void this_thread::assign_main_thread()

View File

@ -32,8 +32,6 @@ namespace server
extern std::mutex connection_lock; extern std::mutex connection_lock;
extern std::vector<tcp_connection> connection_pool2;
extern connection_pool connection_pool3;
struct main_thread_info struct main_thread_info
@ -60,23 +58,28 @@ namespace server
extern std::vector<worker_thread> worker_threads; extern std::vector<worker_thread> worker_threads;
template<typename lambda> struct connection_pool : public std::vector<tcp_connection*>
struct async_accept_frame
{ {
uint32 conneciton_count = 0;
};
template<typename lambda>
struct async_accept_frame_impl
{
async_accept_frame_impl(asio::ip::tcp::acceptor& acceptor, connection_pool& connections, std::string ip, unsigned short port, lambda& accept_handler) :
ip(ip), port(port), accept_handler(accept_handler), acceptor(acceptor), connection_pool(connections), connection_count(connection_count) {}
std::string ip; std::string ip;
unsigned short port; unsigned short port;
lambda accept_handler; lambda accept_handler;
asio::error_code error_code; asio::error_code error_code;
asio::ip::tcp::acceptor& acceptor; asio::ip::tcp::acceptor& acceptor;
std::vector<tcp_connection*>& connection_pool; uint32& connection_count;
tcp_connection* return_value; tcp_connection* return_value;
//asio::ip::tcp::socket socket; //asio::ip::tcp::socket socket;
connection_pool& connection_pool;
async_accept_frame(asio::ip::tcp::acceptor& acceptor, std::vector<tcp_connection*>& connection_pool, std::string ip, unsigned short port, lambda& accept_handler) :
ip(ip), port(port), accept_handler(accept_handler), acceptor(acceptor), connection_pool(connection_pool)
{}
bool await_ready() bool await_ready()
{ {
@ -89,23 +92,30 @@ namespace server
{ {
this->error_code = error_code; this->error_code = error_code;
auto new_connection = new tcp_connection(std::move(socket));
return_value = nullptr; return_value = nullptr;
uint32 id = 0;
for (auto& i : this->connection_pool) for (int id = 0; id < this->connection_pool.size(); id++)
{ {
if (!i) auto& connection = this->connection_pool[id];
if (!connection)
{ {
i = new_connection; this->connection_pool.conneciton_count++;
i->id = id; connection = new tcp_connection(std::move(socket));
return_value = i; connection->id = id;
return_value = connection;
break; break;
} }
id++;
} }
if (return_value == nullptr)
{
socket.shutdown(asio::socket_base::shutdown_both);
socket.close();
}
accept_handler(return_value);
coro.resume(); coro.resume();
}); });
} }
@ -122,43 +132,77 @@ namespace server
}; };
template<typename lambda>
auto async_accept_frame(asio::ip::tcp::acceptor& acceptor, connection_pool& connections, std::string ip, unsigned short port, lambda& accept_handler)
{
return async_accept_frame_impl<lambda>(acceptor, connections, ip, port, accept_handler);
}
struct async_network_service struct async_network_service
{ {
std::string ip;
unsigned short port;
std::vector<tcp_connection*> connection_pool;
uint32 connection_count;
asio::ip::tcp::acceptor acceptor;
volatile bool stop_signal;
template<typename lambda> template<typename lambda>
async_network_service(std::string ip, uint16 port, uint16 max_connection_count, lambda& accept_handler) : async_network_service(std::string ip, uint16 port, uint16 max_connection_count, lambda& accept_handler) :
ip(ip), port(port), connection_count(0), acceptor(io_context, { asio::ip::address::from_string(ip), port }), ip(ip), port(port), acceptor(io_context, { asio::ip::address::from_string(ip), port }),
stop_signal(false) stop_signal(false)
{ {
connection_pool.resize(max_connection_count); connection_pool.resize(max_connection_count);
async_every(1000ms, [this]()
{
int connections_purged = 0;
for (auto& connection : this->connection_pool)
{
if (connection)
{
uint32 id = connection->id;
if (connection->marked_for_delete)
{
delete connection;
connection = nullptr;
this->connection_pool.conneciton_count--;
connections_purged++;
}
}
}
if (connections_purged)
{
printf("%d connections purged for %s:%d.\n", connections_purged, this->ip.c_str(), this->port);
}
});
begin_async_accept(ip, port, accept_handler, stop_signal); begin_async_accept(ip, port, accept_handler, stop_signal);
} }
std::string ip;
unsigned short port;
connection_pool connection_pool;
asio::ip::tcp::acceptor acceptor;
volatile bool stop_signal;
template<typename lambda> template<typename lambda>
auto begin_async_accept(std::string ip, unsigned short port, lambda& accept_handler, volatile bool& stop_signal) -> std::future<void> std::future<void> begin_async_accept(std::string ip, unsigned short port, lambda& accept_handler, volatile bool& stop_signal)
{ {
try try
{ {
while (!stop_signal) while (!stop_signal)
{ {
auto new_connection = co_await async_accept_frame<lambda>(acceptor, connection_pool, ip, port, accept_handler); auto new_connection = co_await async_accept_frame(acceptor, connection_pool, ip, port, accept_handler);
connection_count++;
if (new_connection) if (new_connection)
{ {
printf("new connection accepted: %d from %s:%d\n", new_connection->id, new_connection->address, port); //printf("new connection accepted: %d from %s:%d\n", new_connection->id, new_connection->address.to_string().c_str(), port);
new_connection->async_recv(); new_connection->begin_async_recv();
} }
else else
{ {
printf("new connection rejected, connection_count: %d.\n", connection_count);
//printf("new connection rejected, connection_count: %d.\n", connection_pool.conneciton_count);
} }
//co_await printf("something\n"); //co_await printf("something\n");
} }
@ -172,25 +216,20 @@ namespace server
}; };
extern std::list<async_network_service> network_services; extern std::list<async_network_service> network_services;
extern bool network_services_running; extern bool network_services_online;
template<typename lambda> template<typename lambda>
void spawn_network_service(std::string ip, uint16 port, uint16 max_connection_count, lambda& accept_handler) void spawn_network_service(std::string ip, uint16 port, uint16 max_connection_count, lambda& accept_handler)
{ {
assert(!network_services_running); assert(!network_services_online);
network_services.emplace_back(ip, port, max_connection_count, accept_handler); network_services.emplace_back(ip, port, max_connection_count, accept_handler);
} }
void spawn_worker_threads(int thread_count); void spawn_worker_threads(uint32 worker_count = 0);
void stop_worker_threads(); void purge_worker_threads();
void start_network(uint32 thread_count = 0);
void start_network(int thread_count);
void stop_network(); void stop_network();
/* /*
asio::awaitable<void> reader() asio::awaitable<void> reader()
{ {