This commit is contained in:
Joon Park 2020-03-04 05:18:18 -05:00
parent d3deaa4d7e
commit 224369ce59
34 changed files with 610 additions and 1031 deletions

View File

@ -1,4 +1,5 @@
SET( DEFINE
_CRT_SECURE_NO_WARNINGS
)
SET( INCLUDE
)

View File

@ -1,7 +1,13 @@
SET( DEFINE
)
SET( INCLUDE
asio
docopt
imgui
)
SET( LINK
asio
docopt
imgui
)
create_project(STATIC DEFINE INCLUDE LINK)

View File

@ -1,8 +1,6 @@
#include "asio/ip/tcp.hpp"
#include "core/utils/index.h"
#include "common.h"
#include "core.h"
#include "core/shared.h"
#include "connection.h"
#include "timer.h"
@ -36,7 +34,7 @@ namespace server
marked_for_delete = true;
}
std::future<void> server::core::tcp_connection::begin_async_recv()
std::future<void> server::core::tcp_connection::async_recv_loop()
{
try
{

View File

@ -1,9 +1,89 @@
#pragma once
#include "asio/ip/tcp.hpp"
#include "asio/read.hpp"
namespace server
{
namespace core
{
struct tcp_connection
{
int id;
asio::ip::address address;
uint16 port;
bool marked_for_delete;
asio::ip::tcp::socket socket;
std::vector<uint8> rdata;
std::vector<uint8> wdata;
tcp_connection(asio::ip::tcp::socket&& new_socket);
void shutdown_and_close();
//template<typename buffer_t>
//auto async_recv(tcp_connection& connection, buffer_t buffer);
std::future<void> async_recv_loop();
};
struct connection_pool : public std::vector<tcp_connection*>
{
std::string ip;
unsigned short port;
asio::io_context& io_context;
uint32 conneciton_count = 0;
connection_pool(asio::io_context& io_context, std::string ip, uint16 port, uint16 max_connection_count);
};
template<typename connection_t, typename buffer_t>
struct async_recv_frame
{
connection_t& connection;
buffer_t buffer;
asio::error_code error_code;
size_t return_value;
async_recv_frame(connection_t& connection, buffer_t&& buffer) :
connection(connection), buffer(std::move(buffer)), error_code(), return_value(0) {}
bool await_ready()
{
return false;
}
void await_suspend(std::experimental::coroutine_handle<> coro)
{
connection.socket.async_receive(buffer, [this, coro](auto error_code, size_t bytes_read)
{
this->error_code = error_code;
printf("%lld bytes read from connection %d\n", bytes_read, connection.id);
return_value = bytes_read;
coro.resume();
});
}
size_t await_resume()
{
if (error_code)
{
throw asio::system_error(error_code);
}
return return_value;
}
};
template<typename buffer_t>
auto async_recv(tcp_connection& connection, buffer_t buffer)
{
return async_recv_frame<tcp_connection, buffer_t>(connection, std::forward<buffer_t>(buffer));
}
template<typename lambda>
struct async_accept_frame
{

View File

@ -0,0 +1,36 @@
#include "core/utils/index.h"
#include "core/shared.h"
#include "core/network/network_service.h"
#include "core/utils/assertion_macros.h" // fix assert override from asio
namespace server
{
namespace core
{
std::list<async_network_service> network_services;
void start_network()
{
assert(server_status != e_server_status::running);
server_status = e_server_status::running;
}
void stop_network()
{
assert(server_status == e_server_status::running);
for (auto& i : network_services)
{
i.stop_signal = true;
}
io_context.stop();
server_status = e_server_status::stopped;
}
}
}

View File

@ -0,0 +1,73 @@
#pragma once
#include "core/network/connection.h"
namespace server
{
namespace core
{
// public
extern std::list<struct async_network_service> network_services;
template<typename lambda>
void spawn_network_service(std::string ip, uint16 port, uint16 max_connection_count, lambda& accept_handler);
void start_network();
void stop_network();
// detail
struct async_network_service
{
std::string ip;
unsigned short port;
connection_pool connection_pool;
asio::ip::tcp::acceptor acceptor;
volatile bool stop_signal;
template<typename lambda>
async_network_service(std::string ip, uint16 port, uint16 max_connection_count, lambda& accept_handler) :
ip(ip), port(port), connection_pool(io_context, ip, port, max_connection_count), acceptor(io_context, { asio::ip::address::from_string(ip), port }), stop_signal(false)
{
async_accept_loop(ip, port, accept_handler, stop_signal);
}
template<typename lambda>
std::future<void> async_accept_loop(std::string ip, unsigned short port, lambda& accept_handler, volatile bool& stop_signal);
};
template<typename lambda>
std::future<void> async_network_service::async_accept_loop(std::string ip, unsigned short port, lambda& accept_handler, volatile bool& stop_signal)
{
try
{
while (!stop_signal)
{
auto new_connection = co_await async_accept(acceptor, connection_pool, ip, port, accept_handler);
if (new_connection)
{
//printf("new connection accepted: %d from %s:%d\n", new_connection->id, new_connection->address.to_string().c_str(), port);
new_connection->async_recv_loop();
}
else
{
//printf("new connection rejected, connection_count: %d.\n", connection_pool.conneciton_count);
}
//co_await printf("something\n");
}
}
catch (const std::exception & exception)
{
printf("exception: %s", exception.what());
}
}
template<typename lambda>
void spawn_network_service(std::string ip, uint16 port, uint16 max_connection_count, lambda& accept_handler)
{
assert(server_status == e_server_status::initializing);
network_services.emplace_back(ip, port, max_connection_count, accept_handler);
}
}
}

View File

@ -0,0 +1,6 @@
#pragma once
#include "core/shared.h"
#include "connection.h"
#include "session.h"

View File

@ -0,0 +1,27 @@
#pragma once
#include "connection.h"
namespace server
{
namespace core
{
enum connection_type : uint8
{
CONNECTION_TYPE_GAME = 0,
CONNECTION_TYPE_MAIL = 1,
MAX_CONNECTION_TYPES,
};
struct session
{
tcp_connection* connections[MAX_CONNECTION_TYPES];
};
struct session_pool : std::vector<session>
{
uint32 session_count;
};
}
}

View File

@ -1,18 +1,34 @@
#pragma once
#include "asio/steady_timer.hpp"
namespace server
{
namespace core
{
// public
template<typename lambda>
auto async(lambda task);
template<typename duration = std::chrono::milliseconds, typename lambda>
auto async_after(duration delay, lambda task) -> std::future<void>;
template<typename duration = std::chrono::milliseconds, typename lambda>
auto async_every(duration delay, lambda task) -> std::future<void>;
template<typename duration = std::chrono::milliseconds, typename lambda>
auto async_every(duration delay, volatile int& stop_signal, lambda task) -> std::future<void>;
// detail
template<typename duration>
struct async_timer_impl
struct async_timer_frame
{
asio::steady_timer timer;
duration delay;
asio::error_code error_code;
async_timer_impl(/*asio::io_context& io_context, */duration delay) : timer(io_context), delay(delay) {}
async_timer_frame(duration delay) : timer(io_context), delay(delay) {}
bool await_ready()
{
@ -37,42 +53,28 @@ namespace server
template<typename duration>
auto async_timer(duration delay)
{
return async_timer_impl<duration>(delay);
return async_timer_frame<duration>(delay);
}
/*
template<typename lambda, typename duration = std::chrono::milliseconds>
auto async(lambda task, duration delay = 0ms) -> std::future<void>
{
co_await async_timer(delay);
task();
}
*/
template<typename lambda>
auto async(lambda task)
{
asio::co_spawn(io_context,
[&task]() -> asio::awaitable<void>
{
auto executor = co_await asio::this_coro::executor;
task();
}
, asio::detached);
//co_await asio::this_coro::executor;
//task();
asio::co_spawn(io_context, [&task]() -> asio::awaitable<void>
{
auto executor = co_await asio::this_coro::executor;
task();
}, asio::detached);
}
template<typename duration = std::chrono::milliseconds, typename lambda>
template<typename duration, typename lambda>
auto async_after(duration delay, lambda task) -> std::future<void>
{
co_await async_timer(delay);
task();
}
template<typename lambda, typename duration = std::chrono::milliseconds>
auto async_every(duration delay, lambda task)->std::future<void>
template<typename duration, typename lambda>
auto async_every(duration delay, lambda task) -> std::future<void>
{
while (true)
{
@ -81,7 +83,7 @@ namespace server
}
}
template<typename duration = std::chrono::milliseconds, typename lambda>
template<typename duration, typename lambda>
auto async_every(duration delay, volatile int& stop_signal, lambda task) -> std::future<void>
{
while (!stop_signal)

View File

@ -1,37 +1,26 @@
#include <functional>
#include <future>
#include <chrono>
#include <thread>
#include <csignal>
#include "imgui.h"
#include "imgui_impl_glfw_gl3.h"
#include "GL/gl3w.h"
#include "glfw/glfw3.h"
#include "common.h"
#include "core.h"
#include "network.h"
#include "core/utils/index.h"
#include "core/shared.h"
namespace server
{
volatile int signal_status;
namespace core
{
// public
asio::io_context io_context;
volatile e_server_status server_status;
// IO loop
// detail
void main_loop()
{
this_thread::assign_main_thread();
@ -39,7 +28,7 @@ namespace server
try
{
while (network_services_online)
while (server_status == e_server_status::running)
{
Sleep(100);
}
@ -67,10 +56,6 @@ namespace server
std::signal(SIGTERM, signal_handler_wrapper);
}
void init_io_context(int thread_count)
{
}
// GUI
GLFWwindow* glfw_window;

37
Source/core/core/shared.h Normal file
View File

@ -0,0 +1,37 @@
#pragma once
#include "core/utils/standard_libraries.h"
#include "core/utils/defines.h"
#include "core/utils/types.h"
#include "core/utils/assertion_macros.h"
#include "core/utils/thread.h"
#include "asio/io_context.hpp"
#include "core/utils/assertion_macros.h" // fix assert override from asio
namespace server
{
namespace core
{
// public
extern asio::io_context io_context;
extern volatile enum class e_server_status server_status;
void main_loop();
void init_signals(::std::function<void(int)> custom_signal_handler);
void init_gui();
void render_gui();
void stop_gui();
// detail
enum class e_server_status
{
initializing = 0,
running = 1,
stopped = 2,
MAX_SIGNAL_TYPES
};
};
}

View File

@ -0,0 +1,24 @@
#include <stdarg.h>
#include <cstdio>
#include "core/utils/assertion_macros.h"
void assert_impl(char const* file, int line, char const* function, char const* message)
{
fprintf(stderr, "\n%s:%i in %s ASSERTION FAILED:\n %s\n", file, line, function, message);
__debugbreak();
}
void assert_impl(char const* file, int line, char const* function, char const* message, char const* format, ...)
{
va_list args;
va_start(args, format);
fprintf(stderr, "\n%s:%i in %s ASSERTION FAILED:\n %s ", file, line, function, message);
vfprintf(stderr, format, args);
fprintf(stderr, "\n");
fflush(stderr);
va_end(args);
__debugbreak();
}

View File

@ -1,4 +1,5 @@
//#pragma once
#include "core/utils/defines.h"
#if ENABLED(DEBUG_PROGRAM)
void assert_impl(char const* file, int line, char const* function, char const* message);

View File

@ -0,0 +1,21 @@
#pragma once
// From Cod
#ifndef ENABLE
#define ENABLE 1
#endif
#ifndef DISABLED
#define DISABLED (-1)
#endif
#ifndef ENABLE_IF
#define ENABLE_IF(x) ((x) ? 1 : -1)
#endif
#ifndef ENABLED
#define ENABLED(x) (1 / (x) == 1)
#endif
#define DEBUG_PROGRAM ENABLE_IF(_DEBUG)

View File

@ -0,0 +1,13 @@
#pragma once
#include <cstdio>
#include <cassert>
#include <csignal>
#include <stdarg.h>
#include <vector>
#include <thread>
#include <future>
#include <chrono>
#include <string>
#include <functional>
using namespace std::chrono;

View File

@ -0,0 +1,134 @@
#include <string>
#include <vector>
#include <thread>
#include "core/utils/types.h"
#include "core/utils/assertion_macros.h"
#include "core/utils/thread.h"
namespace server
{
namespace core
{
main_thread_info main_thread;
std::vector<worker_thread> worker_threads;
void purge_worker_threads()
{
for (auto& worker : worker_threads)
{
if (worker.joinable())
{
worker.join();
}
}
worker_threads.clear();
}
namespace this_thread
{
void assign_main_thread()
{
main_thread.id = std::this_thread::get_id();
this_thread::set_debug_name(main_thread.name.c_str());
}
std::thread::id this_thread::get_id()
{
return std::this_thread::get_id();
}
void set_debug_name(const std::string& new_name)
{
if (std::this_thread::get_id() == main_thread.id)
{
main_thread.name = new_name;
set_thread_name(new_name.c_str());
return;
}
for (auto& i : worker_threads)
{
if (std::this_thread::get_id() == i.id)
{
i.name = new_name;
set_thread_name(new_name.c_str());
return;
}
}
assert(false);
return;
}
std::string get_debug_name()
{
if (std::this_thread::get_id() == main_thread.id)
{
return main_thread.name;
}
for (auto& i : worker_threads)
{
if (std::this_thread::get_id() == i.id)
{
return i.name;
}
}
assert(false);
return std::string();
}
}
}
}
#ifdef _WIN32
//#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <thread>
const DWORD MS_VC_EXCEPTION = 0x406D1388;
#pragma pack(push,8)
typedef struct tagTHREADNAME_INFO
{
DWORD dwType; // Must be 0x1000.
LPCSTR szName; // Pointer to name (in user addr space).
DWORD dwThreadID; // Thread ID (-1=caller thread).
DWORD dwFlags; // Reserved for future use, must be zero.
} THREADNAME_INFO;
#pragma pack(pop)
void set_thread_name(uint32_t dwThreadID, const char* threadName)
{
THREADNAME_INFO info;
info.dwType = 0x1000;
info.szName = threadName;
info.dwThreadID = dwThreadID;
info.dwFlags = 0;
__try
{
RaiseException(MS_VC_EXCEPTION, 0, sizeof(info) / sizeof(ULONG_PTR), (ULONG_PTR*)&info);
}
__except (EXCEPTION_EXECUTE_HANDLER)
{
}
}
void set_thread_name(const char* threadName)
{
set_thread_name(GetCurrentThreadId(), threadName);
}
void set_thread_name(std::thread& thread, const char* threadName)
{
DWORD threadId = ::GetThreadId(static_cast<HANDLE>(thread.native_handle()));
set_thread_name(threadId, threadName);
}
#endif

View File

@ -0,0 +1,85 @@
#pragma once
namespace server
{
namespace core
{
namespace this_thread
{
void assign_main_thread();
std::thread::id get_id();
void set_debug_name(const std::string& name);
std::string get_debug_name();
};
struct main_thread_info
{
std::thread::id id;
std::string name = "main_thread";
};
struct worker_thread : public std::thread
{
std::thread::id id;
std::string name;
template<typename lambda>
worker_thread(std::string&& new_name, lambda function) : id(this->get_id()), name(std::move(new_name)), std::thread(function)
{
set_thread_name(*this, name.c_str());
}
};
extern main_thread_info main_thread;
extern std::vector<worker_thread> worker_threads;
template<typename lambda>
void spawn_worker_threads(uint32 worker_count, lambda& worker_main)
{
if (worker_count == 0)
{
worker_count = std::thread::hardware_concurrency();
}
worker_threads.reserve(worker_count);
for (uint32 i = 0; i < worker_count; i++)
{
worker_threads.emplace_back(std::string("worker_thread_") + std::to_string(i), [i, &worker_main]()
{
worker_main();
printf("%s exited.\n", worker_threads[i].name.c_str());
});
}
}
void purge_worker_threads();
}
}
#ifdef _WIN32
namespace std { class thread; }
void set_thread_name(uint32_t dwThreadID, const char* threadName);
//*
void set_thread_name(const char* threadName);
void set_thread_name(std::thread& thread, const char* threadName);
//*/
#else
void set_thread_name(std::thread* thread, const char* threadName)
{
auto handle = thread->native_handle();
pthread_setname_np(handle, threadName);
}
#include <sys/prctl.h>
void set_thread_name(const char* threadName)
{
prctl(PR_SET_NAME, threadName, 0, 0, 0);
}
#endif

View File

@ -1,37 +1,5 @@
#pragma once
#include <cstdint>
#include <cstdio>
#include <cassert>
#include <stdarg.h>
#include <vector>
#include <thread>
#include <future>
#include <chrono>
using namespace std::chrono;
// From Cod
#ifndef ENABLE
#define ENABLE 1
#endif
#ifndef DISABLED
#define DISABLED (-1)
#endif
#ifndef ENABLE_IF
#define ENABLE_IF(x) ((x) ? 1 : -1)
#endif
#ifndef ENABLED
#define ENABLED(x) (1 / (x) == 1)
#endif
#define DEBUG_PROGRAM ENABLE_IF(_DEBUG)
#include "assertion_macros.h"
// Signed base types.
typedef std::int8_t int8; // 8-bit signed.
@ -53,26 +21,3 @@ typedef std::uint16_t CHAR16; // A 16-bit character type - In-memory only. 1
typedef std::uint32_t CHAR32; // A 32-bit character type - In-memory only. 32-bit representation. Should really be char32_t but making this the generic option is easier for compilers which don't fully support C++11 yet (i.e. MSVC).
typedef WIDECHAR TCHAR; // A switchable character - In-memory only. Either ANSICHAR or WIDECHAR, depending on a licensee's requirements.
#ifdef _WIN32
namespace std { class thread; }
void set_thread_name(uint32_t dwThreadID, const char* threadName);
//*
void set_thread_name(const char* threadName);
void set_thread_name(std::thread& thread, const char* threadName);
//*/
#else
void set_thread_name(std::thread* thread, const char* threadName)
{
auto handle = thread->native_handle();
pthread_setname_np(handle, threadName);
}
#include <sys/prctl.h>
void set_thread_name(const char* threadName)
{
prctl(PR_SET_NAME, threadName, 0, 0, 0);
}
#endif

View File

@ -1,200 +0,0 @@
#include <string>
#include <memory>
#include <set>
#include <deque>
/*
#include <asio/awaitable.hpp>
#include <asio/ip/tcp.hpp>
#include <asio/co_spawn.hpp>
#include <asio/awaitable.hpp>
#include <asio/detached.hpp>
#include <asio/read_until.hpp>
#include <asio/use_awaitable.hpp>
#include <asio/redirect_error.hpp>
#include <asio/write.hpp>
*/
#include <asio.hpp>
/*
class chat_participant
{
public:
virtual ~chat_participant() {}
virtual void deliver(const std::string& msg) = 0;
};
class chat_room
{
public:
void join(std::shared_ptr<chat_participant> participant)
{
participants_.insert(participant);
for (auto msg : recent_msgs_)
participant->deliver(msg);
}
void leave(std::shared_ptr<chat_participant> participant)
{
participants_.erase(participant);
}
void deliver(const std::string& msg)
{
recent_msgs_.push_back(msg);
while (recent_msgs_.size() > max_recent_msgs)
recent_msgs_.pop_front();
for (auto participant : participants_)
participant->deliver(msg);
}
private:
std::set<std::shared_ptr<chat_participant>> participants_;
enum { max_recent_msgs = 100 };
std::deque<std::string> recent_msgs_;
};
class chat_session
: public chat_participant,
public std::enable_shared_from_this<chat_session>
{
public:
chat_session(asio::ip::tcp::socket socket, chat_room& room)
: socket_(std::move(socket)),
timer_(socket_.get_executor()),
room_(room)
{
timer_.expires_at(std::chrono::steady_clock::time_point::max());
}
void start()
{
room_.join(shared_from_this());
asio::co_spawn(socket_.get_executor(),
[self = shared_from_this()]{ return self->reader(); },
asio::detached);
asio::co_spawn(socket_.get_executor(),
[self = shared_from_this()]{ return self->writer(); },
asio::detached);
}
void deliver(const std::string& msg)
{
write_msgs_.push_back(msg);
timer_.cancel_one();
}
private:
asio::awaitable<void> reader()
{
try
{
for (std::string read_msg;;)
{
std::size_t n = co_await asio::async_read_until(socket_,
asio::dynamic_buffer(read_msg, 1024), "\n", asio::use_awaitable);
room_.deliver(read_msg.substr(0, n));
read_msg.erase(0, n);
}
}
catch (std::exception&)
{
stop();
}
}
asio::awaitable<void> writer()
{
try
{
while (socket_.is_open())
{
if (write_msgs_.empty())
{
asio::error_code ec;
co_await timer_.async_wait(redirect_error(asio::use_awaitable, ec));
}
else
{
co_await asio::async_write(socket_,
asio::buffer(write_msgs_.front()), asio::use_awaitable);
write_msgs_.pop_front();
}
}
}
catch (std::exception&)
{
stop();
}
}
void stop()
{
room_.leave(shared_from_this());
socket_.close();
timer_.cancel();
}
asio::ip::tcp::socket socket_;
asio::steady_timer timer_;
chat_room& room_;
std::deque<std::string> write_msgs_;
};
chat_room room;
asio::awaitable<void> listener(asio::ip::tcp::acceptor acceptor)
{
for (;;)
{
auto session = std::make_shared<chat_session>(
co_await acceptor.async_accept(asio::use_awaitable),
room
);
session->start();
}
}
//*/
#include <asio.hpp>
#include <exception>
#include <iostream>
/*
int main()
{
try
{
asio::io_context io_context;
unsigned short port = 9006;
co_spawn(io_context,
[&io_context, port]
{
return listener(asio::ip::tcp::acceptor(io_context, { asio::ip::tcp::v4(), port }));
},
asio::detached);
asio::signal_set signals(io_context, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto) { io_context.stop(); });
io_context.run();
}
catch (std::exception & e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
}
*/

View File

@ -1,70 +0,0 @@
#include "common.h"
void assert_impl(char const* file, int line, char const* function, char const* message)
{
fprintf(stderr, "\n%s:%i in %s ASSERTION FAILED:\n %s\n", file, line, function, message);
__debugbreak();
}
void assert_impl(char const* file, int line, char const* function, char const* message, char const* format, ...)
{
va_list args;
va_start(args, format);
fprintf(stderr, "\n%s:%i in %s ASSERTION FAILED:\n %s ", file, line, function, message);
vfprintf(stderr, format, args);
fprintf(stderr, "\n");
fflush(stderr);
va_end(args);
__debugbreak();
}
#ifdef _WIN32
//#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#include <thread>
const DWORD MS_VC_EXCEPTION = 0x406D1388;
#pragma pack(push,8)
typedef struct tagTHREADNAME_INFO
{
DWORD dwType; // Must be 0x1000.
LPCSTR szName; // Pointer to name (in user addr space).
DWORD dwThreadID; // Thread ID (-1=caller thread).
DWORD dwFlags; // Reserved for future use, must be zero.
} THREADNAME_INFO;
#pragma pack(pop)
void set_thread_name(uint32_t dwThreadID, const char* threadName)
{
THREADNAME_INFO info;
info.dwType = 0x1000;
info.szName = threadName;
info.dwThreadID = dwThreadID;
info.dwFlags = 0;
__try
{
RaiseException(MS_VC_EXCEPTION, 0, sizeof(info) / sizeof(ULONG_PTR), (ULONG_PTR*)&info);
}
__except (EXCEPTION_EXECUTE_HANDLER)
{
}
}
void set_thread_name(const char* threadName)
{
set_thread_name(GetCurrentThreadId(), threadName);
}
void set_thread_name(std::thread& thread, const char* threadName)
{
DWORD threadId = ::GetThreadId(static_cast<HANDLE>(thread.native_handle()));
set_thread_name(threadId, threadName);
}
#endif

View File

@ -1,108 +0,0 @@
#pragma once
namespace server
{
namespace core
{
struct tcp_connection
{
int id;
asio::ip::address address;
uint16 port;
bool marked_for_delete;
asio::ip::tcp::socket socket;
std::vector<uint8> rdata;
std::vector<uint8> wdata;
tcp_connection(asio::ip::tcp::socket&& new_socket);
void shutdown_and_close();
template<typename buffer_t>
auto async_recv(tcp_connection& connection, buffer_t buffer);
std::future<void> begin_async_recv();
};
struct connection_pool : public std::vector<tcp_connection*>
{
std::string ip;
unsigned short port;
asio::io_context& io_context;
uint32 conneciton_count = 0;
connection_pool(asio::io_context& io_context, std::string ip, uint16 port, uint16 max_connection_count);
};
template<typename connection_t, typename buffer_t>
struct async_recv_frame
{
connection_t& connection;
buffer_t buffer;
asio::error_code error_code;
size_t return_value;
async_recv_frame(connection_t& connection, buffer_t&& buffer) : connection(connection), buffer(std::move(buffer)) {}
bool await_ready()
{
return false;
}
void await_suspend(std::experimental::coroutine_handle<> coro)
{
connection.socket.async_read_some(buffer, [this, coro](auto error_code, size_t bytes_read)
{
this->error_code = error_code;
printf("%lld bytes read from connection %d\n", bytes_read, connection.id);
return_value = bytes_read;
coro.resume();
});
/*
asio::co_spawn(io_context,
[this]()
{
return async_read();
}, asio::detached);
co_await socket.async_receive(buffer, [this, coro](auto error_code, auto bytes_read)
{
coro.resume();
});
asio::async_read_until(socket, buffer, "\n", [this, coro](auto error_code, auto bytes_read) {
this->error_code = error_code;
coro.resume();
});
asio::async_read(socket, buffer, [this, coro](auto error_code, auto bytes_read) {
this->error_code = error_code;
coro.resume();
});
*/
//socket.async_receive(buffer, [this, coro](auto error_code) { this->error_code = error_code; coro.resume(); });
}
size_t await_resume()
{
if (error_code)
{
throw asio::system_error(error_code);
}
return return_value;
}
};
template<typename buffer_t>
auto tcp_connection::async_recv(tcp_connection& connection, buffer_t buffer)
{
return async_recv_frame<tcp_connection, buffer_t>(connection, std::forward<buffer_t>(buffer));
}
}
}

View File

@ -1,41 +0,0 @@
#pragma once
//#include "docopt.h"
//#include "common.h"
//#include "asio.hpp"
//#include "asio/io_context.hpp"
using namespace std::chrono;
namespace asio
{
class io_context;
}
namespace server
{
extern volatile int signal_status;
namespace core
{
extern asio::io_context io_context;
void main_loop();
void init_signals(::std::function<void(int)> custom_signal_handler);
void init_io_context(int thread_count);
void init_gui();
void render_gui();
void stop_gui();
//void (*init)(std::map<std::string, docopt::value>) = [](std::map<std::string, docopt::value>) { assert(false); };
//void (*stop)() = []() { assert(false); };
};
}

View File

@ -1,154 +0,0 @@
// Example code for blog post 'Understanding Awaitables'
//
// Copyright (c) Lewis Baker
#include <experimental/coroutine>
#include <atomic>
class async_manual_reset_event
{
public:
async_manual_reset_event(bool initiallySet = false) noexcept;
// No copying/moving
async_manual_reset_event(const async_manual_reset_event&) = delete;
async_manual_reset_event(async_manual_reset_event&&) = delete;
async_manual_reset_event& operator=(const async_manual_reset_event&) = delete;
async_manual_reset_event& operator=(async_manual_reset_event&&) = delete;
bool is_set() const noexcept;
struct awaiter;
awaiter operator co_await() const noexcept;
void set() noexcept;
void reset() noexcept;
private:
friend struct awaiter;
// - 'this' => set state
// - otherwise => not set, head of linked list of awaiter*.
mutable std::atomic<void*> m_state;
};
struct async_manual_reset_event::awaiter
{
awaiter(const async_manual_reset_event& event) noexcept
: m_event(event)
{}
bool await_ready() const noexcept;
bool await_suspend(std::experimental::coroutine_handle<> awaitingCoroutine) noexcept;
void await_resume() noexcept {}
private:
friend struct async_manual_reset_event;
const async_manual_reset_event& m_event;
std::experimental::coroutine_handle<> m_awaitingCoroutine;
awaiter* m_next;
};
bool async_manual_reset_event::awaiter::await_ready() const noexcept
{
return m_event.is_set();
}
bool async_manual_reset_event::awaiter::await_suspend(
std::experimental::coroutine_handle<> awaitingCoroutine) noexcept
{
// Special m_state value that indicates the event is in the 'set' state.
const void* const setState = &m_event;
// Stash the handle of the awaiting coroutine.
m_awaitingCoroutine = awaitingCoroutine;
// Try to atomically push this awaiter onto the front of the list.
void* oldValue = m_event.m_state.load(std::memory_order_acquire);
do
{
// Resume immediately if already in 'set' state.
if (oldValue == setState) return false;
// Update linked list to point at current head.
m_next = static_cast<awaiter*>(oldValue);
// Finally, try to swap the old list head, inserting this awaiter
// as the new list head.
} while (!m_event.m_state.compare_exchange_weak(
oldValue,
this,
std::memory_order_release,
std::memory_order_acquire));
// Successfully enqueued. Remain suspended.
return true;
}
async_manual_reset_event::async_manual_reset_event(
bool initiallySet) noexcept
: m_state(initiallySet ? this : nullptr)
{}
bool async_manual_reset_event::is_set() const noexcept
{
return m_state.load(std::memory_order_acquire) == this;
}
void async_manual_reset_event::reset() noexcept
{
void* oldValue = this;
m_state.compare_exchange_strong(oldValue, nullptr, std::memory_order_acquire);
}
void async_manual_reset_event::set() noexcept
{
// Needs to be 'release' so that subsequent 'co_await' has
// visibility of our prior writes.
// Needs to be 'acquire' so that we have visibility of prior
// writes by awaiting coroutines.
void* oldValue = m_state.exchange(this, std::memory_order_acq_rel);
if (oldValue != this)
{
// Wasn't already in 'set' state.
// Treat old value as head of a linked-list of waiters
// which we have now acquired and need to resume.
auto* waiters = static_cast<awaiter*>(oldValue);
while (waiters != nullptr)
{
// Read m_next before resuming the coroutine as resuming
// the coroutine will likely destroy the awaiter object.
auto* next = waiters->m_next;
waiters->m_awaitingCoroutine.resume();
waiters = next;
}
}
}
async_manual_reset_event::awaiter
async_manual_reset_event::operator co_await() const noexcept
{
return awaiter{ *this };
}
// A simple task-class for void-returning coroutines.
struct task
{
struct promise_type
{
task get_return_object() { return {}; }
std::experimental::suspend_never initial_suspend() { return {}; }
std::experimental::suspend_never final_suspend() { return {}; }
void return_void() {}
void unhandled_exception() {}
};
};
task example(async_manual_reset_event& event)
{
co_await event;
}

View File

@ -1,169 +0,0 @@
#include <string>
#include "asio/write.hpp"
#include "common.h"
#include "network.h"
namespace server
{
namespace core
{
main_thread_info main_thread;
std::vector<worker_thread> worker_threads;
std::list<async_network_service> network_services;
bool network_services_online;
/*
asio::awaitable<void> echo(asio::ip::tcp::socket socket)
{
try
{
char data[1024];
for (;;)
{
std::size_t n = co_await socket.async_read_some(asio::buffer(data), asio::use_awaitable);
co_await async_write(socket, asio::buffer(data, n), asio::use_awaitable);
}
}
catch (std::exception & e)
{
std::printf("echo Exception: %s\n", e.what());
}
}
asio::awaitable<void> listener()
{
auto executor = co_await asio::this_coro::executor;
asio::ip::tcp::acceptor acceptor(executor, { asio::ip::tcp::v4(), 55555 });
for (;;)
{
asio::ip::tcp::socket socket = co_await acceptor.async_accept(asio::use_awaitable);
asio::co_spawn(executor,
[socket = std::move(socket)]() mutable
{
return echo(std::move(socket));
},
asio::detached);
}
}
*/
void spawn_worker_threads(uint32 worker_count)
{
if (worker_count == 0)
{
worker_count = std::thread::hardware_concurrency();
}
worker_threads.reserve(worker_count);
for (uint32 i = 0; i < worker_count; i++)
{
worker_threads.emplace_back(
std::string("worker_thread_") + std::to_string(i),
[i]()
{
while (!network_services_online)
{
std::this_thread::sleep_for(10ms);
}
io_context.run();
printf("%s exited.\n", worker_threads[i].name.c_str());
});
}
}
void purge_worker_threads()
{
for (auto& worker : worker_threads)
{
if (worker.joinable())
{
worker.join();
}
}
worker_threads.clear();
}
void start_network(uint32 worker_count)
{
assert(!network_services_online);
//spawn_worker_threads(worker_count);
network_services_online = true;
}
void stop_network()
{
assert(network_services_online);
for (auto& i : network_services)
{
i.stop_signal = true;
}
io_context.stop();
network_services_online = false;
}
void this_thread::assign_main_thread()
{
main_thread.id = std::this_thread::get_id();
this_thread::set_debug_name(main_thread.name.c_str());
}
std::thread::id this_thread::get_id()
{
return std::this_thread::get_id();
}
void this_thread::set_debug_name(const std::string& new_name)
{
if (std::this_thread::get_id() == main_thread.id)
{
main_thread.name = new_name;
set_thread_name(new_name.c_str());
return;
}
for (auto& i : worker_threads)
{
if (std::this_thread::get_id() == i.id)
{
i.name = new_name;
set_thread_name(new_name.c_str());
return;
}
}
assert(false);
return;
}
std::string this_thread::get_debug_name()
{
if (std::this_thread::get_id() == main_thread.id)
{
return main_thread.name;
}
for (auto& i : worker_threads)
{
if (std::this_thread::get_id() == i.id)
{
return i.name;
}
}
assert(false);
return std::string();
}
}
}

View File

@ -1,169 +0,0 @@
#pragma once
#include "asio/co_spawn.hpp"
#include "asio/steady_timer.hpp"
#include "asio/detached.hpp"
#include "asio/ip/tcp.hpp"
#include "asio/read_until.hpp"
#include "asio/read.hpp"
#include <mutex>
#include "assertion_macros.h"
#include "timer.h"
#include "connection.h"
#include "acceptor.h"
namespace server
{
namespace core
{
extern asio::io_context io_context;
namespace this_thread
{
void assign_main_thread();
std::thread::id get_id();
void set_debug_name(const std::string& name);
std::string get_debug_name();
};
struct main_thread_info
{
std::thread::id id;
std::string name = "main_thread";
};
struct worker_thread : public std::thread
{
std::thread::id id;
std::string name;
template<typename lambda>
worker_thread(std::string&& new_name, lambda function) : id(this->get_id()), name(std::move(new_name)), std::thread(function)
{
set_thread_name(*this, name.c_str());
}
};
extern main_thread_info main_thread;
extern std::vector<worker_thread> worker_threads;
struct async_network_service
{
std::string ip;
unsigned short port;
connection_pool connection_pool;
asio::ip::tcp::acceptor acceptor;
volatile bool stop_signal;
template<typename lambda>
async_network_service(std::string ip, uint16 port, uint16 max_connection_count, lambda& accept_handler) :
ip(ip), port(port), connection_pool(io_context, ip, port, max_connection_count), acceptor(io_context, { asio::ip::address::from_string(ip), port }),
stop_signal(false)
{
begin_async_accept(ip, port, accept_handler, stop_signal);
}
template<typename lambda>
std::future<void> begin_async_accept(std::string ip, unsigned short port, lambda& accept_handler, volatile bool& stop_signal)
{
try
{
while (!stop_signal)
{
auto new_connection = co_await async_accept(acceptor, connection_pool, ip, port, accept_handler);
if (new_connection)
{
//printf("new connection accepted: %d from %s:%d\n", new_connection->id, new_connection->address.to_string().c_str(), port);
new_connection->begin_async_recv();
}
else
{
//printf("new connection rejected, connection_count: %d.\n", connection_pool.conneciton_count);
}
//co_await printf("something\n");
}
}
catch (const std::exception & exception)
{
printf("exception: %s", exception.what());
}
}
};
extern std::list<async_network_service> network_services;
extern bool network_services_online;
template<typename lambda>
void spawn_network_service(std::string ip, uint16 port, uint16 max_connection_count, lambda& accept_handler)
{
assert(!network_services_online);
network_services.emplace_back(ip, port, max_connection_count, accept_handler);
}
void spawn_worker_threads(uint32 worker_count = 0);
void purge_worker_threads();
void start_network(uint32 thread_count = 0);
void stop_network();
/*
asio::awaitable<void> reader()
{
try
{
for (std::string read_msg;;)
{
std::size_t n = co_await asio::async_read_until(socket_,
asio::dynamic_buffer(read_msg, 1024), "\n", asio::use_awaitable);
room_.deliver(read_msg.substr(0, n));
read_msg.erase(0, n);
}
}
catch (std::exception&)
{
stop();
}
}
asio::awaitable<void> writer()
{
try
{
while (socket_.is_open())
{
if (write_msgs_.empty())
{
asio::error_code ec;
co_await timer_.async_wait(redirect_error(asio::use_awaitable, ec));
}
else
{
co_await asio::async_write(socket_,
asio::buffer(write_msgs_.front()), asio::use_awaitable);
write_msgs_.pop_front();
}
}
}
catch (std::exception&)
{
stop();
}
}
*/
}
}

View File

@ -1 +0,0 @@
#pragma once

View File

@ -1,16 +1,10 @@
SET( DEFINE
)
SET( INCLUDE
asio
core
docopt
imgui
)
SET( LINK
asio
core
docopt
imgui
)
create_project(CONSOLE DEFINE INCLUDE LINK)

View File

@ -1,10 +1,10 @@
#include "docopt.h"
#include "core/shared.h"
#include "common.h"
#include "core/network/network_service.h"
#include "core/network/timer.h"
#include "core.h"
#include "network.h"
#include "mmo_server.h"
@ -29,7 +29,7 @@ namespace mmo_server
auto signal_handler = [&](int signal)
{
printf("exit signal: %d.\n", signal);
mmo_server::signal_status = signal;
core::server_status = core::e_server_status::stopped;
core::io_context.stop();
};
@ -45,8 +45,17 @@ namespace mmo_server
core::spawn_network_service("127.0.0.1", port, 300, accept_handler);
//core::spawn_network_service("127.0.0.1", port + 1, 300, accept_handler);
//core::async_accept("127.0.0.1", port, accept_handler, false);
core::spawn_worker_threads(2);
core::start_network(2);
core::spawn_worker_threads(2, []()
{
while (core::server_status == core::e_server_status::initializing)
{
std::this_thread::sleep_for(10ms);
}
core::io_context.run();
});
core::start_network();
core::async_after(100s, []()
{

View File

@ -0,0 +1,13 @@
#pragma once
namespace crossgate
{
void decrypt_message()
{
}
void encrypt_message()
{
}
}