This commit is contained in:
Joon Park 2020-03-03 22:43:43 -05:00
parent fbc7a11b85
commit d3deaa4d7e
21 changed files with 303 additions and 287 deletions

View File

@ -1,161 +0,0 @@
#pragma once
namespace server
{
namespace core
{
template<typename socket_t, typename buffer_t>
struct async_recv_frame_impl
{
socket_t& socket;
buffer_t buffer;
asio::error_code error_code;
async_recv_frame_impl(socket_t& socket, buffer_t&& buffer) : socket(socket), buffer(std::move(buffer))
{
this->buffer;
}
~async_recv_frame_impl()
{
this->buffer;
}
asio::awaitable<void> async_read()
{
try
{
std::size_t n = co_await socket.async_read_some(buffer, asio::use_awaitable);
//printf("%s\n", /*this_thread::get_debug_name().c_str(),*/ buffer);
//rdata.erase(rdata.begin(), rdata.begin() + n);
//for (;;)
{
//std::size_t n;
//socket.async_receive(asio::buffer(rdata, 12), [this, &n](auto error_code, auto bytes_read) { n = bytes_read; });
//std::size_t n = co_await asio::async_read_until(socket, asio::dynamic_buffer(rdata, 12), "\n", asio::use_awaitable);
}
}
catch (std::exception & e)
{
printf("exception: %s", e.what());
}
}
bool await_ready()
{
return false;
}
void await_suspend(std::experimental::coroutine_handle<> coro)
{
socket.async_read_some(buffer, [this, coro](auto error_code, auto bytes_read)
{
this->error_code = error_code;
coro.resume();
});
/*
asio::co_spawn(io_context,
[this]()
{
return async_read();
}, asio::detached);
co_await socket.async_receive(buffer, [this, coro](auto error_code, auto bytes_read)
{
coro.resume();
});
asio::async_read_until(socket, buffer, "\n", [this, coro](auto error_code, auto bytes_read) {
this->error_code = error_code;
coro.resume();
});
asio::async_read(socket, buffer, [this, coro](auto error_code, auto bytes_read) {
this->error_code = error_code;
coro.resume();
});
*/
//socket.async_receive(buffer, [this, coro](auto error_code) { this->error_code = error_code; coro.resume(); });
}
void await_resume()
{
if (error_code)
{
throw asio::system_error(error_code);
}
}
};
struct tcp_connection
{
int id;
asio::ip::address address;
uint16 port;
bool marked_for_delete;
asio::ip::tcp::socket socket;
std::vector<uint8> rdata;
std::vector<uint8> wdata;
tcp_connection(asio::ip::tcp::socket&& new_socket) :
id(-1),
address(new_socket.remote_endpoint().address()),
port(new_socket.remote_endpoint().port()),
marked_for_delete(false),
socket(std::move(new_socket))
{
rdata.resize(12);
//rdata.reserve(12);
}
void shutdown_and_close()
{
asio::error_code error_code;
socket.shutdown(asio::ip::tcp::socket::shutdown_both, error_code);
if (error_code)
printf("network: tcp_connection::close: %s errored when shutting down socket: %i (%s)", address.to_string().c_str(),
error_code.value(), error_code.message().c_str());
socket.close();
marked_for_delete = true;
}
template<typename socket_t, typename buffer_t>
auto async_recv_frame(socket_t& socket, buffer_t buffer)
{
return async_recv_frame_impl<socket_t, buffer_t>(socket, std::forward<buffer_t>(buffer));
}
std::future<void> begin_async_recv()
{
try
{
while (socket.is_open())
{
co_await async_recv_frame(socket, asio::buffer(rdata, 5));
}
}
catch (const std::exception& /*exception*/)
{
shutdown_and_close();
//printf("connection %d marked for delete.\n", id);
//printf("exception: %s", exception.what());
}
}
};
}
}

View File

@ -0,0 +1,83 @@
#pragma once
namespace server
{
namespace core
{
template<typename lambda>
struct async_accept_frame
{
async_accept_frame(asio::ip::tcp::acceptor& acceptor, connection_pool& connections, std::string ip, unsigned short port, lambda& accept_handler) :
ip(ip), port(port), accept_handler(accept_handler), acceptor(acceptor), connection_pool(connections), connection_count(connection_pool.conneciton_count), return_value(nullptr) {}
std::string ip;
unsigned short port;
lambda accept_handler;
asio::error_code error_code;
asio::ip::tcp::acceptor& acceptor;
uint32& connection_count;
tcp_connection* return_value;
//asio::ip::tcp::socket socket;
connection_pool& connection_pool;
bool await_ready()
{
return false;
}
void await_suspend(std::experimental::coroutine_handle<> coro)
{
acceptor.async_accept([this, coro](auto error_code, auto socket)
{
this->error_code = error_code;
return_value = nullptr;
for (int id = 0; id < this->connection_pool.size(); id++)
{
auto& connection = this->connection_pool[id];
if (!connection)
{
this->connection_pool.conneciton_count++;
connection = new tcp_connection(std::move(socket));
connection->id = id;
return_value = connection;
break;
}
}
if (return_value == nullptr)
{
socket.shutdown(asio::socket_base::shutdown_both);
socket.close();
}
accept_handler(return_value);
coro.resume();
});
}
tcp_connection* await_resume()
{
if (error_code)
{
throw asio::system_error(error_code);
}
return return_value;
}
};
template<typename lambda>
auto async_accept(asio::ip::tcp::acceptor& acceptor, connection_pool& connections, std::string ip, unsigned short port, lambda& accept_handler)
{
return async_accept_frame<lambda>(acceptor, connections, ip, port, accept_handler);
}
}
}

View File

@ -7,6 +7,8 @@
#include <vector>
#include <thread>
#include <future>
#include <chrono>
using namespace std::chrono;
// From Cod

View File

@ -0,0 +1,92 @@
#include "asio/ip/tcp.hpp"
#include "common.h"
#include "core.h"
#include "connection.h"
#include "timer.h"
namespace server
{
namespace core
{
tcp_connection::tcp_connection(asio::ip::tcp::socket&& new_socket) :
id(-1),
address(new_socket.remote_endpoint().address()),
port(new_socket.remote_endpoint().port()),
marked_for_delete(false),
socket(std::move(new_socket))
{
rdata.resize(1024);
//rdata.reserve(12);
}
void tcp_connection::shutdown_and_close()
{
asio::error_code error_code;
socket.shutdown(asio::ip::tcp::socket::shutdown_both, error_code);
if (error_code)
printf("network: tcp_connection::close: %s errored when shutting down socket: %i (%s)", address.to_string().c_str(),
error_code.value(), error_code.message().c_str());
socket.close();
marked_for_delete = true;
}
std::future<void> server::core::tcp_connection::begin_async_recv()
{
try
{
while (socket.is_open())
{
size_t bytes_read = co_await async_recv(*this, asio::buffer(rdata, 1024));
printf("connection %d:%s\n", id,/*this_thread::get_debug_name().c_str(),*/ &rdata[0]);
memset(&rdata[0], 0, bytes_read);
}
}
catch (const std::exception& /*exception*/)
{
shutdown_and_close();
//printf("connection %d marked for delete.\n", id);
//printf("exception: %s", exception.what());
}
}
connection_pool::connection_pool(asio::io_context& io_context, std::string ip, uint16 port, uint16 max_connection_count) :
ip(ip), port(port), io_context(io_context)
{
this->resize(max_connection_count);
async_every(1000ms, [this]()
{
int connections_purged = 0;
for (auto& connection : *this)
{
if (connection)
{
uint32 id = connection->id;
if (connection->marked_for_delete)
{
delete connection;
connection = nullptr;
this->conneciton_count--;
connections_purged++;
}
}
}
if (connections_purged)
{
printf("%d connections purged for %s:%d.\n", connections_purged, this->ip.c_str(), this->port);
}
});
}
}
}

View File

@ -0,0 +1,108 @@
#pragma once
namespace server
{
namespace core
{
struct tcp_connection
{
int id;
asio::ip::address address;
uint16 port;
bool marked_for_delete;
asio::ip::tcp::socket socket;
std::vector<uint8> rdata;
std::vector<uint8> wdata;
tcp_connection(asio::ip::tcp::socket&& new_socket);
void shutdown_and_close();
template<typename buffer_t>
auto async_recv(tcp_connection& connection, buffer_t buffer);
std::future<void> begin_async_recv();
};
struct connection_pool : public std::vector<tcp_connection*>
{
std::string ip;
unsigned short port;
asio::io_context& io_context;
uint32 conneciton_count = 0;
connection_pool(asio::io_context& io_context, std::string ip, uint16 port, uint16 max_connection_count);
};
template<typename connection_t, typename buffer_t>
struct async_recv_frame
{
connection_t& connection;
buffer_t buffer;
asio::error_code error_code;
size_t return_value;
async_recv_frame(connection_t& connection, buffer_t&& buffer) : connection(connection), buffer(std::move(buffer)) {}
bool await_ready()
{
return false;
}
void await_suspend(std::experimental::coroutine_handle<> coro)
{
connection.socket.async_read_some(buffer, [this, coro](auto error_code, size_t bytes_read)
{
this->error_code = error_code;
printf("%lld bytes read from connection %d\n", bytes_read, connection.id);
return_value = bytes_read;
coro.resume();
});
/*
asio::co_spawn(io_context,
[this]()
{
return async_read();
}, asio::detached);
co_await socket.async_receive(buffer, [this, coro](auto error_code, auto bytes_read)
{
coro.resume();
});
asio::async_read_until(socket, buffer, "\n", [this, coro](auto error_code, auto bytes_read) {
this->error_code = error_code;
coro.resume();
});
asio::async_read(socket, buffer, [this, coro](auto error_code, auto bytes_read) {
this->error_code = error_code;
coro.resume();
});
*/
//socket.async_receive(buffer, [this, coro](auto error_code) { this->error_code = error_code; coro.resume(); });
}
size_t await_resume()
{
if (error_code)
{
throw asio::system_error(error_code);
}
return return_value;
}
};
template<typename buffer_t>
auto tcp_connection::async_recv(tcp_connection& connection, buffer_t buffer)
{
return async_recv_frame<tcp_connection, buffer_t>(connection, std::forward<buffer_t>(buffer));
}
}
}

View File

@ -41,14 +41,14 @@ namespace mmo_server
printf("connection %d accepted\n", new_connection->id);
};
unsigned short port = 9006;
unsigned short port = 9030;
core::spawn_network_service("127.0.0.1", port, 300, accept_handler);
core::spawn_network_service("127.0.0.1", port + 1, 300, accept_handler);
//core::spawn_network_service("127.0.0.1", port + 1, 300, accept_handler);
//core::async_accept("127.0.0.1", port, accept_handler, false);
core::spawn_worker_threads(2);
core::start_network(2);
core::async_after(5s, []()
core::async_after(100s, []()
{
printf("server network stopped.\n");
core::stop_network();

View File

@ -62,7 +62,7 @@ namespace server
}
worker_threads.reserve(worker_count);
for (int i = 0; i < worker_count; i++)
for (uint32 i = 0; i < worker_count; i++)
{
worker_threads.emplace_back(
std::string("worker_thread_") + std::to_string(i),

View File

@ -13,12 +13,12 @@
#include "timer.h"
#include "connection.h"
#include "acceptor.h"
namespace server
{
namespace core
{
using namespace std::chrono;
extern asio::io_context io_context;
@ -31,9 +31,6 @@ namespace server
};
extern std::mutex connection_lock;
struct main_thread_info
{
std::thread::id id;
@ -57,131 +54,24 @@ namespace server
extern main_thread_info main_thread;
extern std::vector<worker_thread> worker_threads;
struct connection_pool : public std::vector<tcp_connection*>
{
uint32 conneciton_count = 0;
};
template<typename lambda>
struct async_accept_frame_impl
{
async_accept_frame_impl(asio::ip::tcp::acceptor& acceptor, connection_pool& connections, std::string ip, unsigned short port, lambda& accept_handler) :
ip(ip), port(port), accept_handler(accept_handler), acceptor(acceptor), connection_pool(connections), connection_count(connection_count) {}
std::string ip;
unsigned short port;
lambda accept_handler;
asio::error_code error_code;
asio::ip::tcp::acceptor& acceptor;
uint32& connection_count;
tcp_connection* return_value;
//asio::ip::tcp::socket socket;
connection_pool& connection_pool;
bool await_ready()
{
return false;
}
void await_suspend(std::experimental::coroutine_handle<> coro)
{
acceptor.async_accept([this, coro](auto error_code, auto socket)
{
this->error_code = error_code;
return_value = nullptr;
for (int id = 0; id < this->connection_pool.size(); id++)
{
auto& connection = this->connection_pool[id];
if (!connection)
{
this->connection_pool.conneciton_count++;
connection = new tcp_connection(std::move(socket));
connection->id = id;
return_value = connection;
break;
}
}
if (return_value == nullptr)
{
socket.shutdown(asio::socket_base::shutdown_both);
socket.close();
}
accept_handler(return_value);
coro.resume();
});
}
tcp_connection* await_resume()
{
if (error_code)
{
throw asio::system_error(error_code);
}
return return_value;
}
};
template<typename lambda>
auto async_accept_frame(asio::ip::tcp::acceptor& acceptor, connection_pool& connections, std::string ip, unsigned short port, lambda& accept_handler)
{
return async_accept_frame_impl<lambda>(acceptor, connections, ip, port, accept_handler);
}
struct async_network_service
{
template<typename lambda>
async_network_service(std::string ip, uint16 port, uint16 max_connection_count, lambda& accept_handler) :
ip(ip), port(port), acceptor(io_context, { asio::ip::address::from_string(ip), port }),
stop_signal(false)
{
connection_pool.resize(max_connection_count);
async_every(1000ms, [this]()
{
int connections_purged = 0;
for (auto& connection : this->connection_pool)
{
if (connection)
{
uint32 id = connection->id;
if (connection->marked_for_delete)
{
delete connection;
connection = nullptr;
this->connection_pool.conneciton_count--;
connections_purged++;
}
}
}
if (connections_purged)
{
printf("%d connections purged for %s:%d.\n", connections_purged, this->ip.c_str(), this->port);
}
});
begin_async_accept(ip, port, accept_handler, stop_signal);
}
std::string ip;
unsigned short port;
connection_pool connection_pool;
asio::ip::tcp::acceptor acceptor;
volatile bool stop_signal;
template<typename lambda>
async_network_service(std::string ip, uint16 port, uint16 max_connection_count, lambda& accept_handler) :
ip(ip), port(port), connection_pool(io_context, ip, port, max_connection_count), acceptor(io_context, { asio::ip::address::from_string(ip), port }),
stop_signal(false)
{
begin_async_accept(ip, port, accept_handler, stop_signal);
}
template<typename lambda>
@ -191,7 +81,7 @@ namespace server
{
while (!stop_signal)
{
auto new_connection = co_await async_accept_frame(acceptor, connection_pool, ip, port, accept_handler);
auto new_connection = co_await async_accept(acceptor, connection_pool, ip, port, accept_handler);
if (new_connection)

View File

@ -0,0 +1 @@
#pragma once

View File

@ -0,0 +1 @@
#pragma once