From ec38722790dabfcbdd5600b9df841015be17ad5a Mon Sep 17 00:00:00 2001 From: Joon Park Date: Fri, 6 Mar 2020 10:48:37 -0500 Subject: [PATCH] . --- Source/core/core/network/connection.cpp | 38 +- Source/core/core/network/connection.h | 429 +++++++++++++++++++- Source/core/core/network/network_service.h | 23 +- Source/core/core/network/session.cpp | 6 - Source/core/core/network/session.h | 27 -- Source/core/core/network/timer.h | 43 ++ Source/core/core/network/xg_session.cpp | 79 ++++ Source/core/core/network/xg_session.h | 63 +++ Source/core/core/shared.cpp | 8 +- Source/core/core/utils/standard_libraries.h | 4 +- Source/core/core/utils/thread.cpp | 1 + Source/core/core/utils/thread.h | 16 +- Source/xgmsv/test_server/mmo_server.cpp | 72 +++- Source/xgmsv/test_server/xg_crypto.h | 139 +++++++ Source/xgmsv/test_server/xg_packet.h | 91 ++++- 15 files changed, 927 insertions(+), 112 deletions(-) delete mode 100644 Source/core/core/network/session.cpp delete mode 100644 Source/core/core/network/session.h create mode 100644 Source/core/core/network/xg_session.cpp create mode 100644 Source/core/core/network/xg_session.h create mode 100644 Source/xgmsv/test_server/xg_crypto.h diff --git a/Source/core/core/network/connection.cpp b/Source/core/core/network/connection.cpp index b043970..94761d1 100644 --- a/Source/core/core/network/connection.cpp +++ b/Source/core/core/network/connection.cpp @@ -2,8 +2,9 @@ #include "core/shared.h" -#include "connection.h" -#include "timer.h" +#include "core/network/connection.h" +#include "core/network/xg_session.h" +#include "core/network/timer.h" namespace server { @@ -14,10 +15,13 @@ namespace server address(new_socket.remote_endpoint().address()), port(new_socket.remote_endpoint().port()), marked_for_delete(false), - socket(std::move(new_socket)) + parent(nullptr), + socket(std::move(new_socket)), + rbuffer(1024), + wbuffer(1024), + rqueue(this), + wqueue(this) { - rdata.resize(1024); - //rdata.reserve(12); } void tcp_connection::shutdown_and_close() @@ -34,26 +38,7 @@ namespace server marked_for_delete = true; } - std::future server::core::tcp_connection::async_recv_loop() - { - try - { - while (socket.is_open()) - { - size_t bytes_read = co_await async_recv(*this, asio::buffer(rdata, 1024)); - printf("connection %d:%s\n", id,/*this_thread::get_debug_name().c_str(),*/ &rdata[0]); - memset(&rdata[0], 0, bytes_read); - - } - } - catch (const std::exception& /*exception*/) - { - shutdown_and_close(); - //printf("connection %d marked for delete.\n", id); - //printf("exception: %s", exception.what()); - } - } connection_pool::connection_pool(asio::io_context& io_context, std::string ip, uint16 port, uint16 max_connection_count) : ip(ip), port(port), io_context(io_context) @@ -70,6 +55,11 @@ namespace server uint32 id = connection->id; if (connection->marked_for_delete) { + if (connection->parent) + { + connection->parent->remove_connection(connection); + } + delete connection; connection = nullptr; this->conneciton_count--; diff --git a/Source/core/core/network/connection.h b/Source/core/core/network/connection.h index 4d16c4a..97266c9 100644 --- a/Source/core/core/network/connection.h +++ b/Source/core/core/network/connection.h @@ -3,10 +3,214 @@ #include "asio/ip/tcp.hpp" #include "asio/read.hpp" +#include "core/network/timer.h" + namespace server { namespace core { + struct packet + { + struct tcp_connection* connection; + uint8* data; + uint32 length; + }; + + static const uint32 max_frame_count = 2; + + struct packet_queue + { + struct frame_data + { + std::vector data; + uint32 wpos; + uint32 wsize; + + std::vector packets; + uint32 rpos; + uint32 packet_count; + + frame_data() : wpos(0), rpos(0), wsize(0), packet_count(0) + { + data.resize(1024); + packets.resize(128); + } + }; + + frame_data frame[max_frame_count]; + + struct tcp_connection* connection; + + std::atomic state; + uint32 read_state; + uint32 write_state; + + enum packet_queue_state_bit + { + active_frame = 0, + write_busy = 1, + dirty_flag = 2, + total_bits + }; + + + packet_queue(struct tcp_connection* connection) : + connection(connection), state(0), read_state(0), write_state(0), frame{} + {} + + bool enqueue(uint8* packet_data, uint32 packet_size) + { + this->begin_write(); + + bool result = false; + + uint32 write_idx = write_state & (0x1 << packet_queue_state_bit::active_frame); + frame_data& f = frame[write_idx]; + uint32 buffer_space = f.data.size() - f.wpos; + uint32 packet_slots = f.packets.size() - f.packet_count; + + if (buffer_space >= packet_size && packet_slots >= 1) + { + memcpy(&f.data[f.wpos], packet_data, packet_size); + f.packets[f.packet_count] = { connection, &f.data[f.wpos], packet_size }; + f.wpos += packet_size; + f.packet_count++; + result = true; + } + + this->end_write(); + return result; + } + + packet* dequeue() + { + uint32 read_idx = read_state & (0x1 << packet_queue_state_bit::active_frame); + frame_data& f = frame[read_idx]; + + if (has_next()) + { + return &f.packets[f.rpos++]; + } + + return nullptr; + } + + bool requeue(packet* packet) + { + uint32 read_idx = read_state & (0x1 << packet_queue_state_bit::active_frame); + frame_data& f = frame[read_idx]; + f.rpos--; + + return &f.packets[f.rpos] == packet; + } + + bool has_next() + { + uint32 read_idx = read_state & (0x1 << packet_queue_state_bit::active_frame); + frame_data& f = frame[read_idx]; + + return f.rpos < f.packet_count; + } + + void begin_write() + { + write_state = state.fetch_add(0x1 << packet_queue_state_bit::write_busy, std::memory_order_relaxed); + assert((write_state & (0x1 << packet_queue_state_bit::write_busy)) == 0); + state.fetch_or(0x1 << packet_queue_state_bit::dirty_flag, std::memory_order_relaxed); + } + + void end_write() + { + write_state = state.fetch_sub(0x1 << packet_queue_state_bit::write_busy, std::memory_order_relaxed); + assert((write_state & (0x1 << packet_queue_state_bit::write_busy)) != 0); + } + + bool begin_read() + { + if (state.load() & (0x1 << packet_queue_state_bit::dirty_flag)) + { + + uint32 write_state = read_state ^ (0x1 << packet_queue_state_bit::active_frame); + + uint32 test_state; + do + { + test_state = read_state | (0x1 << packet_queue_state_bit::dirty_flag); + } while (!state.compare_exchange_weak(test_state, write_state)); + + return true; + } + + return false; + } + + void end_read() + { + uint32 read_idx = read_state & (0x1 << packet_queue_state_bit::active_frame); + frame_data& f = frame[read_idx]; + + f.rpos = 0; + f.wpos = 0; + f.wsize = 0; + f.packet_count = 0; + memset(&f.data[0], 0, sizeof(uint8) * f.data.size()); + memset(&f.packets[0], 0, sizeof(packet) * f.packets.size()); + + read_state ^= (0x1 << packet_queue_state_bit::active_frame); + } + }; + + struct byte_buffer + { + std::vector buffer; + uint32 capacity; + uint32 wpos; + + byte_buffer(uint32 capacity) : + buffer(capacity), + capacity(capacity), + wpos(0) + {} + + uint32 size() + { + return wpos; + } + + uint32 space() + { + return capacity - wpos; + } + + uint8* data() + { + return buffer.data(); + } + + uint8* write(void* source, uint32 size) + { + uint8* wptr = buffer.data() + wpos; + + if (this->space() > size) + { + memcpy(wptr, source, size); + wpos += size; + return wptr; + } + + return nullptr; + } + + void erase(uint32 num_erase) + { + if (wpos >= num_erase) + { + memmove(buffer.data(), buffer.data() + wpos, wpos - num_erase); + wpos -= num_erase; + } + } + }; + struct tcp_connection { int id; @@ -15,17 +219,41 @@ namespace server bool marked_for_delete; asio::ip::tcp::socket socket; - std::vector rdata; - std::vector wdata; + byte_buffer rbuffer; + byte_buffer wbuffer; + + struct xg_session* parent; + + packet_queue rqueue; + packet_queue wqueue; + async_signal wsignal; + tcp_connection(asio::ip::tcp::socket&& new_socket); void shutdown_and_close(); + bool enqueue_response(uint8* packet_data, uint32 packet_size) + { + if (!wqueue.enqueue(packet_data, packet_size)) + { + printf("enqueue failed.\n"); + return false; + } + + wsignal.fire(); + + return true; + } + //template //auto async_recv(tcp_connection& connection, buffer_t buffer); - std::future async_recv_loop(); + template + std::future async_recv_loop(lambda& receive_handler); + + template + std::future async_send_loop(lambda& send_handler); }; @@ -37,8 +265,179 @@ namespace server uint32 conneciton_count = 0; connection_pool(asio::io_context& io_context, std::string ip, uint16 port, uint16 max_connection_count); + + tcp_connection* spawn_connection(asio::ip::tcp::socket&& socket) + { + for (int id = 0; id < this->size(); id++) + { + auto& connection = this->at(id); + if (!connection) + { + this->conneciton_count++; + connection = new tcp_connection(std::move(socket)); + connection->id = id; + return connection; + } + } + + return nullptr; + } }; + template + std::future tcp_connection::async_send_loop(lambda& send_handler) + { + try + { + while (socket.is_open()) + { + packet_queue& packets = wqueue; + + if (packets.begin_read()) + { + while (packets.has_next()) + { + core::packet* packet = packets.dequeue(); + + if (wbuffer.space() < packet->length * 2) + { + assert(false); + } + uint8* data = wbuffer.write(packet->data, packet->length); + + //crossgate::xg_dispatch_packet(std::move(*packet)); + send_handler(data, packet->length); + + //std::string packet_str((char*)packet->data, packet->length); + + //packet_str = crossgate::decrypt_message(packet_str); + //printf("new packet:%s\n", packet_str.c_str()); + //xg_dispatch_packet() + } + packets.end_read(); + } + + if (wbuffer.size()) + { + uint32 bytes_transferred = co_await async_send(*this, asio::buffer(wbuffer.data(), wbuffer.size())); + + wbuffer.erase(bytes_transferred); + } + else + { + co_await wsignal; + } + } + } + catch (const std::exception& exception) + { + //shutdown_and_close(); + + //printf("connection %d marked for delete.\n", id); + printf("exception: %s", exception.what()); + } + } + + template + struct async_send_frame + { + connection_t& connection; + buffer_t buffer; + asio::error_code error_code; + uint32 return_value; + + async_send_frame(connection_t& connection, buffer_t&& buffer) : + connection(connection), buffer(std::move(buffer)), error_code(), return_value(0) {} + + bool await_ready() + { + return false; + } + + void await_suspend(std::experimental::coroutine_handle<> coro) + { + connection.socket.async_send(buffer, [this, coro](auto error_code, size_t bytes_read) + { + this->error_code = error_code; + printf("%zd bytes read from connection %d\n", bytes_read, connection.id); + return_value = (uint32)bytes_read; + coro.resume(); + }); + } + + uint32 await_resume() + { + if (error_code) + { + throw asio::system_error(error_code); + } + + return return_value; + } + }; + + template + std::future tcp_connection::async_recv_loop(lambda& receive_handler) + { + try + { + while (socket.is_open()) + { + uint8* rdata = rbuffer.data(); + uint32 bytes_read = co_await async_recv(*this, asio::buffer(rdata, 1024)); + + uint32 begin = 0; + uint32 end = 0; + uint32 index = 0; + while (index != bytes_read) + { + if (rdata[index] == '\n') + { + rdata[index] = '\0'; + + end = index; + + uint8* data = rdata + begin; + uint32 size = end - begin; + + if (strlen((char*)data) != size) + { + printf("empty string\n"); + continue; + } + else + { + receive_handler(data, size); + + if (!rqueue.enqueue(data, size)) + { + printf("enqueue failed.\n"); + } + } + begin = index + 1; + } + + index++; + } + + memset(rdata, 0, bytes_read); + } + } + catch (const std::exception& /*exception*/) + { + shutdown_and_close(); + //printf("connection %d marked for delete.\n", id); + //printf("exception: %s", exception.what()); + } + } + + + template + auto async_send(tcp_connection& connection, buffer_t buffer) + { + return async_send_frame(connection, std::forward(buffer)); + } + template struct async_recv_frame @@ -46,7 +445,7 @@ namespace server connection_t& connection; buffer_t buffer; asio::error_code error_code; - size_t return_value; + uint32 return_value; async_recv_frame(connection_t& connection, buffer_t&& buffer) : connection(connection), buffer(std::move(buffer)), error_code(), return_value(0) {} @@ -61,13 +460,13 @@ namespace server connection.socket.async_receive(buffer, [this, coro](auto error_code, size_t bytes_read) { this->error_code = error_code; - printf("%lld bytes read from connection %d\n", bytes_read, connection.id); - return_value = bytes_read; + printf("%zd bytes read from connection %d\n", bytes_read, connection.id); + return_value = (uint32)bytes_read; coro.resume(); }); } - size_t await_resume() + uint32 await_resume() { if (error_code) { @@ -113,21 +512,7 @@ namespace server { this->error_code = error_code; - - return_value = nullptr; - - for (int id = 0; id < this->connection_pool.size(); id++) - { - auto& connection = this->connection_pool[id]; - if (!connection) - { - this->connection_pool.conneciton_count++; - connection = new tcp_connection(std::move(socket)); - connection->id = id; - return_value = connection; - break; - } - } + return_value = this->connection_pool.spawn_connection(std::move(socket)); if (return_value == nullptr) { diff --git a/Source/core/core/network/network_service.h b/Source/core/core/network/network_service.h index cbf7c77..f597d56 100644 --- a/Source/core/core/network/network_service.h +++ b/Source/core/core/network/network_service.h @@ -23,19 +23,19 @@ namespace server asio::ip::tcp::acceptor acceptor; volatile bool stop_signal; - template - async_network_service(std::string ip, uint16 port, uint16 max_connection_count, lambda& accept_handler) : + template + async_network_service(std::string ip, uint16 port, uint16 max_connection_count, lambda& accept_handler, lambda2& receive_handler, lambda3& send_handler) : ip(ip), port(port), connection_pool(io_context, ip, port, max_connection_count), acceptor(io_context, { asio::ip::address::from_string(ip), port }), stop_signal(false) { - async_accept_loop(ip, port, accept_handler, stop_signal); + async_accept_loop(ip, port, accept_handler, receive_handler, send_handler, stop_signal); } - template - std::future async_accept_loop(std::string ip, unsigned short port, lambda& accept_handler, volatile bool& stop_signal); + template + std::future async_accept_loop(std::string ip, unsigned short port, lambda& accept_handler, lambda2& receive_handler, lambda3& send_handler, volatile bool& stop_signal); }; - template - std::future async_network_service::async_accept_loop(std::string ip, unsigned short port, lambda& accept_handler, volatile bool& stop_signal) + template + std::future async_network_service::async_accept_loop(std::string ip, unsigned short port, lambda& accept_handler, lambda2& receive_handler, lambda3& send_handler, volatile bool& stop_signal) { try { @@ -47,7 +47,8 @@ namespace server if (new_connection) { //printf("new connection accepted: %d from %s:%d\n", new_connection->id, new_connection->address.to_string().c_str(), port); - new_connection->async_recv_loop(); + new_connection->async_recv_loop(receive_handler); + new_connection->async_send_loop(send_handler); } else { @@ -63,11 +64,11 @@ namespace server } } - template - void spawn_network_service(std::string ip, uint16 port, uint16 max_connection_count, lambda& accept_handler) + template + void spawn_network_service(std::string ip, uint16 port, uint16 max_connection_count, lambda& accept_handler, lambda2& receive_handler, lambda3& send_handler) { assert(server_status == e_server_status::initializing); - network_services.emplace_back(ip, port, max_connection_count, accept_handler); + network_services.emplace_back(ip, port, max_connection_count, accept_handler, receive_handler, send_handler); } } } diff --git a/Source/core/core/network/session.cpp b/Source/core/core/network/session.cpp deleted file mode 100644 index 2c8ef7e..0000000 --- a/Source/core/core/network/session.cpp +++ /dev/null @@ -1,6 +0,0 @@ -#pragma once - -#include "core/shared.h" - -#include "connection.h" -#include "session.h" diff --git a/Source/core/core/network/session.h b/Source/core/core/network/session.h deleted file mode 100644 index 3b57725..0000000 --- a/Source/core/core/network/session.h +++ /dev/null @@ -1,27 +0,0 @@ -#pragma once -#include "connection.h" - -namespace server -{ - namespace core - { - enum connection_type : uint8 - { - CONNECTION_TYPE_GAME = 0, - CONNECTION_TYPE_MAIL = 1, - MAX_CONNECTION_TYPES, - }; - - struct session - { - tcp_connection* connections[MAX_CONNECTION_TYPES]; - - }; - - struct session_pool : std::vector - { - uint32 session_count; - }; - - } -} \ No newline at end of file diff --git a/Source/core/core/network/timer.h b/Source/core/core/network/timer.h index d5d7a1f..4f5f41b 100644 --- a/Source/core/core/network/timer.h +++ b/Source/core/core/network/timer.h @@ -50,6 +50,49 @@ namespace server } }; + struct async_signal_frame + { + asio::steady_timer timer; + asio::error_code error_code; + + async_signal_frame() : timer(io_context) {} + + ~async_signal_frame() + { + timer.cancel(); + } + + bool await_ready() + { + return false; + } + + void await_suspend(std::experimental::coroutine_handle<> coro) + { + timer.expires_at(std::chrono::steady_clock::time_point::max()); + timer.async_wait([this, coro](auto error_code) { this->error_code = error_code; coro.resume(); }); + } + + void await_resume() + { + switch (error_code.value()) + { + case 995: + // signal fired by calling timer.cancel_one() + break; + default: + throw asio::system_error(error_code); + } + } + + void fire() + { + timer.cancel_one(); + } + }; + + typedef async_signal_frame async_signal; + template auto async_timer(duration delay) { diff --git a/Source/core/core/network/xg_session.cpp b/Source/core/core/network/xg_session.cpp new file mode 100644 index 0000000..6ba56b0 --- /dev/null +++ b/Source/core/core/network/xg_session.cpp @@ -0,0 +1,79 @@ +#pragma once + +#include "core/shared.h" + +#include "core/network/xg_session.h" +#include "core/network/connection.h" + +#include "core/network/timer.h" + +namespace server +{ + namespace core + { + struct xg_session_pool xg_session_pool; + + xg_session::xg_session(tcp_connection* connection, connection_type type) : + id(-1), + marked_for_delete(false), + connections{} + { + connection->parent = this; + connections[(uint32)type] = connection; + } + + xg_session_pool::xg_session_pool() : + session_count(0) + { + async_every(1000ms, [this]() + { + + int sessions_purged = 0; + for (auto& session : *this) + { + if (session) + { + uint32 id = session->id; + + if (!session->has_connection()) + { + + delete session; + session = nullptr; + this->session_count--; + sessions_purged++; + } + } + } + + if (sessions_purged) + { + printf("%d sessions purged.\n", sessions_purged); + } + + }); + } + + xg_session* xg_session_pool::spawn_session(tcp_connection* connection, connection_type type) + { + for (int id = 0; id < this->size(); id++) + { + auto& new_session = this->at(id); + if (!new_session) + { + this->session_count++; + new_session = new xg_session(connection, type); + new_session->id = id; + return new_session; + } + } + + return nullptr; + } + + void init_user_session_pool(uint32 max_session_count) + { + xg_session_pool.resize(max_session_count); + } + } +} diff --git a/Source/core/core/network/xg_session.h b/Source/core/core/network/xg_session.h new file mode 100644 index 0000000..f2e204e --- /dev/null +++ b/Source/core/core/network/xg_session.h @@ -0,0 +1,63 @@ +#pragma once +//#include "core/network/connection.h" + +namespace server +{ + namespace core + { + static const uint32 max_session_count = 300; + + extern struct xg_session_pool xg_session_pool; + + enum class connection_type : uint8 + { + CONNECTION_TYPE_GAME = 0, + CONNECTION_TYPE_MAIL = 1, + MAX_CONNECTION_TYPES, + }; + + struct xg_session + { + uint32 id; + bool marked_for_delete; + + struct tcp_connection* connections[(uint32)connection_type::MAX_CONNECTION_TYPES]; + + xg_session(tcp_connection* connection, connection_type type); + + void remove_connection(tcp_connection* connection) + { + for (uint32 i = 0; i < (uint32)connection_type::MAX_CONNECTION_TYPES; i++) + { + if (connections[i] == connection) + { + connections[i] = nullptr; + } + } + } + + bool has_connection() + { + for (uint32 i = 0; i < (uint32)connection_type::MAX_CONNECTION_TYPES; i++) + { + if (connections[i]) + { + return true; + } + } + + return false; + } + }; + + struct xg_session_pool : std::vector + { + uint32 session_count; + + xg_session_pool(); + xg_session* spawn_session(tcp_connection* connection, connection_type type); + }; + + void init_user_session_pool(uint32 max_session_count); + } +} \ No newline at end of file diff --git a/Source/core/core/shared.cpp b/Source/core/core/shared.cpp index 250172c..75129b5 100644 --- a/Source/core/core/shared.cpp +++ b/Source/core/core/shared.cpp @@ -8,7 +8,6 @@ #include "core/shared.h" - namespace server { @@ -27,12 +26,13 @@ namespace server try { - + /* while (server_status == e_server_status::running) { - Sleep(100); + std::this_thread::sleep_for(1000ms); } - //io_context.run(); + */ + io_context.run(); } catch (std::exception & e) diff --git a/Source/core/core/utils/standard_libraries.h b/Source/core/core/utils/standard_libraries.h index d4e497b..8f35580 100644 --- a/Source/core/core/utils/standard_libraries.h +++ b/Source/core/core/utils/standard_libraries.h @@ -7,7 +7,9 @@ #include #include #include -#include #include #include +#include +#include + using namespace std::chrono; \ No newline at end of file diff --git a/Source/core/core/utils/thread.cpp b/Source/core/core/utils/thread.cpp index f2ad807..adf6b86 100644 --- a/Source/core/core/utils/thread.cpp +++ b/Source/core/core/utils/thread.cpp @@ -12,6 +12,7 @@ namespace server { namespace core { + // public main_thread_info main_thread; std::vector worker_threads; diff --git a/Source/core/core/utils/thread.h b/Source/core/core/utils/thread.h index fc157f1..d5b34aa 100644 --- a/Source/core/core/utils/thread.h +++ b/Source/core/core/utils/thread.h @@ -4,6 +4,14 @@ namespace server { namespace core { + // public + extern struct main_thread_info main_thread; + extern std::vector worker_threads; + + template + void spawn_worker_threads(uint32 worker_count, lambda& worker_main); + void purge_worker_threads(); + namespace this_thread { void assign_main_thread(); @@ -13,6 +21,7 @@ namespace server }; + // detail struct main_thread_info { std::thread::id id; @@ -24,18 +33,13 @@ namespace server std::thread::id id; std::string name; - template worker_thread(std::string&& new_name, lambda function) : id(this->get_id()), name(std::move(new_name)), std::thread(function) { set_thread_name(*this, name.c_str()); } - }; - extern main_thread_info main_thread; - extern std::vector worker_threads; - template void spawn_worker_threads(uint32 worker_count, lambda& worker_main) { @@ -55,8 +59,6 @@ namespace server }); } } - - void purge_worker_threads(); } } diff --git a/Source/xgmsv/test_server/mmo_server.cpp b/Source/xgmsv/test_server/mmo_server.cpp index c35ed32..d4c21ce 100644 --- a/Source/xgmsv/test_server/mmo_server.cpp +++ b/Source/xgmsv/test_server/mmo_server.cpp @@ -4,10 +4,14 @@ #include "core/network/network_service.h" #include "core/network/timer.h" +#include "core/network/xg_session.h" #include "mmo_server.h" +#include "xg_crypto.h" +#include "xg_packet.h" + #pragma warning(push) #pragma warning(disable : 26444) @@ -39,12 +43,36 @@ namespace mmo_server auto accept_handler = [](core::tcp_connection* new_connection) { printf("connection %d accepted\n", new_connection->id); + auto session = core::xg_session_pool.spawn_session(new_connection, core::connection_type::CONNECTION_TYPE_GAME); + + if (!session) + { + printf("session spawn failed.\n"); + } + }; + + auto receive_handler = [](uint8*& data, uint32& bytes_read) + { + crossgate::decrypt_message((char*&)data, bytes_read); + + }; + + auto send_handler = [](uint8*& data, uint32& bytes_to_send) + { + crossgate::decrypt_message((char*&)data, bytes_to_send); + }; unsigned short port = 9030; - core::spawn_network_service("127.0.0.1", port, 300, accept_handler); + uint32 max_session_count = 300; + + core::init_user_session_pool(max_session_count); + + core::spawn_network_service("127.0.0.1", port, 300, accept_handler, receive_handler, send_handler); //core::spawn_network_service("127.0.0.1", port + 1, 300, accept_handler); //core::async_accept("127.0.0.1", port, accept_handler, false); + + /* core::spawn_worker_threads(2, []() { while (core::server_status == core::e_server_status::initializing) @@ -54,15 +82,55 @@ namespace mmo_server core::io_context.run(); }); + */ core::start_network(); - core::async_after(100s, []() + core::async_after(200s, []() { printf("server network stopped.\n"); core::stop_network(); }); + //char packet4[] = "MLVlll6KdKyJmGKKII5pnmuNyMUp31qWQ7QqzcfGII5pnmuNN8X5wcMqzCuyJMAX2"; + //crossgate::decrypt_message(packet4); + + core::async_every(1000ms, []() + { + if (core::server_status == core::e_server_status::running) + { + for (auto& session : core::xg_session_pool) + { + if (session) + { + core::packet_queue& packets = session->connections[0]->rqueue; + + if (!packets.begin_read()) + { + continue; + } + + while (packets.has_next()) + { + core::packet* packet = packets.dequeue(); + crossgate::xg_dispatch_packet(std::move(*packet)); + + //std::string packet_str((char*)packet->data, packet->length); + + + + //packet_str = crossgate::decrypt_message(packet_str); + //printf("new packet:%s\n", packet_str.c_str()); + //xg_dispatch_packet() + } + + + packets.end_read(); + } + } + } + }); + //core::async([]() {printf("async_task\n"); }); /* core::async_after(5s, []() {printf("async_task\n"); }); diff --git a/Source/xgmsv/test_server/xg_crypto.h b/Source/xgmsv/test_server/xg_crypto.h new file mode 100644 index 0000000..f5a459e --- /dev/null +++ b/Source/xgmsv/test_server/xg_crypto.h @@ -0,0 +1,139 @@ +#pragma once + +namespace server +{ + namespace crossgate + { +#define SEED 4595 + char mapping_table[] = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+-"; + + // decrypt stage 1 + int util_64to256(char* dst, const char* src, char* table) + { + unsigned int dw, dwcounter, i; + char* ptr = NULL; + + dw = 0; + dwcounter = 0; + if (!dst || !src || !table) return 0; + for (i = 0; i < strlen(src); i++) { + ptr = (char*)strchr(table, src[i]); + if (!ptr) return 0; + if (i % 4) { + dw = ((unsigned int)(ptr - table) & 0x3f) << (32 - 6 - ((4 - (i % 4)) * 2)) | dw; + dst[dwcounter++] = (dw & 0xff000000) >> (32 - 8); + dw = dw << 8; + } + else { + dw = ((unsigned int)(ptr - table) & 0x3f) << (32 - 6); + } + } + if (dw) dst[dwcounter++] = dw & 0xff; + dst[dwcounter] = '\0'; + return dwcounter; + } + + // decrypt stage 2 + int remove_salt(char* dst, const char* src, size_t length) + { + // remove conditional salt + int seed = SEED % length; //38 when message_size = 49 + int sum = src[seed] & 0x000000ff; + for (int i = 0; i < length; i++) + { + if (i < seed) + { + dst[i] = src[i] - sum * (i * i % 3); + } + + if (i == seed) + { + //packet3[i] = sum; + } + + if (i > seed) + { + dst[i - 1] = src[i] - sum * (i * i % 7); + } + } + + return length - 1; + } + + void remove_conditional_bit_reverse(char* dst, const char* src, size_t length) + { + // remove conditional bit reverse, seed = 4595 + for (int i = 0; i < length; i++) + { + //sum += packet1[i]; + if (SEED % 7 == i % 5 || SEED % 2 == i % 2) + { + dst[i] = ~src[i]; + } + else + { + dst[i] = src[i]; + } + int result = length; + } + } + + std::string decrypt_message_str(std::string packet) + { + std::string stage_1(packet.length(), 0); + + // 6 bit to 8 bit string + int size = util_64to256(stage_1.data(), packet.c_str(), mapping_table); + // important trim right here + stage_1 = stage_1.substr(0, size - 1); + + std::string stage_2(stage_1.length(), 0); + size = remove_salt(stage_2.data(), stage_1.c_str(), stage_1.length()); + stage_2 = stage_2.substr(0, strlen(stage_2.data())); + + std::string stage_3(stage_2.length(), 0); + remove_conditional_bit_reverse(stage_3.data(), stage_2.c_str(), stage_2.length()); + stage_3 = stage_3.substr(1, strlen(stage_3.data()) - 1); + + + return stage_3; + } + + void decrypt_message(char*& data, uint32& size) + { + // 6 bit to 8 bit string + size = util_64to256(data, data, mapping_table); + + data[strlen(data) - 1] = '\0'; + + size = remove_salt(data, data, strlen(data)); + remove_conditional_bit_reverse(data, data, strlen(data)); + + data[strlen(data) - 1] = '\0'; + + data++; + size = strlen(data) + 1; + + + // important trim right here + /* + stage_1 = stage_1.substr(0, size - 1); + + std::string stage_2(stage_1.length(), 0); + stage_2 = stage_2.substr(0, strlen(stage_2.data())); + + std::string stage_3(stage_2.length(), 0); + remove_conditional_bit_reverse(stage_3.data(), stage_2.c_str(), stage_2.length()); + stage_3 = stage_3.substr(1, strlen(stage_3.data()) - 1); + + return stage_3; + */ + + } + + void encrypt_message(char*& data, uint32& size) + { + + } + } +} \ No newline at end of file diff --git a/Source/xgmsv/test_server/xg_packet.h b/Source/xgmsv/test_server/xg_packet.h index a7ccca7..4531213 100644 --- a/Source/xgmsv/test_server/xg_packet.h +++ b/Source/xgmsv/test_server/xg_packet.h @@ -1,13 +1,88 @@ #pragma once -namespace crossgate + +namespace server { - void decrypt_message() + namespace crossgate { + struct xg_packet : public core::packet + { + xg_packet(core::packet&& packet) : + core::packet(packet) + {} + }; + + void handle_echo(xg_packet* packet) + { + auto connection = packet->connection; + + char echo_response[] = "Echo nr "; + + if (!connection->enqueue_response((uint8*)echo_response, sizeof(echo_response))) + { + printf("handle_echo failed.\n"); + } + + printf("handle_echo!\n"); + } + + void handle_fc(xg_packet* packet) + { + printf("handle_fc!\n"); + + } + + void handle_client_login(xg_packet* packet) + { + printf("handle_client_login!\n"); + } + + struct opcode_entry + { + enum xg_opcode opcode; + const char* opstring; + uint32 opstring_size; + void (*handler)(xg_packet* packet); + }; + + enum xg_opcode : uint16 + { + XG_FC, + XG_ECHO, + XG_CLIENT_LOGIN, + XG_OPCODE_COUNT + }; + +#define add_opcode_entry(opcode, opstring, handler) \ + { opcode, opstring, sizeof(opstring) / sizeof(char) - 1, handler } + + opcode_entry xg_opcode_table[XG_OPCODE_COUNT] = + { + add_opcode_entry(XG_FC, "FC", &handle_fc), + add_opcode_entry(XG_ECHO, "Echo", &handle_echo), + add_opcode_entry(XG_CLIENT_LOGIN, "ClientLogin", &handle_client_login) + }; + + template + int mystrncmp(const T* a, const T(&b)[N]) + { + return _tcsnccmp(a, b, N - 1); + } + + void xg_dispatch_packet(core::packet&& packet) + { + xg_packet xg_packet(std::move(packet)); + + for (uint32 i = 0; i < XG_OPCODE_COUNT; i++) + { + auto opstring = xg_opcode_table[i].opstring; + auto opstring_size = xg_opcode_table[i].opstring_size; + if (!strncmp((char*)packet.data, opstring, opstring_size)) + { + auto handler = xg_opcode_table[i].handler; + handler(&xg_packet); + } + } + } } - - void encrypt_message() - { - - } -} \ No newline at end of file +}