From 63c0f8b46496fba2bd116a06a93cb19f685bee85 Mon Sep 17 00:00:00 2001 From: anulax1225 Date: Mon, 24 Feb 2025 02:20:47 +0100 Subject: [PATCH] Working TCP proxy --- app/src/app.cpp | 344 ++++++++++++++++++++++++++++++++++++++--------- app/src/config.h | 18 +++ 2 files changed, 300 insertions(+), 62 deletions(-) create mode 100644 app/src/config.h diff --git a/app/src/app.cpp b/app/src/app.cpp index 66bc54d..f3f52c6 100644 --- a/app/src/app.cpp +++ b/app/src/app.cpp @@ -1,90 +1,310 @@ +#include "bakanet/core/ip_address.h" +#include "bakanet/core/ip_protocol.h" +#include "bakanet/core/socket.h" +#include "bakatools/container/types.h" +#include "bakatools/logging/assert.h" +#include "bakatools/logging/log.h" #include #include -#include +#include +#include +#include #include +#include +#include -#define MIN_PORT 10000 -#define MAX_PORT 10005 -#define MAX_EVENTS 10 -#define RANGE MAX_PORT - MIN_PORT -#define TARGET "127.0.0.1" +#include "config.h" using namespace Bk; using namespace Bk::Net; +int setnonblocking(int sock) +{ + int result; + int flags; + + flags = ::fcntl(sock, F_GETFL, 0); + + if (flags == -1) + { + return -1; // error + } + + flags |= O_NONBLOCK; + + result = fcntl(sock , F_SETFL , flags); + return result; +} + struct Binder { Scope src; - int port = 0; + u16 listener = 0; + u16 target = 0; - Binder(Socket* src, int port) - :src(src), port(port) { } + Binder(Socket* src, u16 listener, u16 target) + :src(src), listener(listener), target(target) { } }; -int main() + +struct Bridge { - // Log::Init("Bakanet"); - // //hostent* h = gethostbyname("localhost"); - // //BK_INFO(std::string(h->h_addr_list[0], h->h_length)); - // IpAddress ip("127.0.0.1"); - // HttpServer server(ip, 8080); - // server.get("/", [](HttpRequest& req) - // { - // HttpReponse res(HTTP_RES_200, req.version); - // res.body = "

Bakanet

"; - // res.body += "

Working http server

"; - // return res; - // }); - // server.start(); - // return 0; + Scope downstream, upstream; - - Log::Init("Bakanet"); - IpAddress src; - ThreadPool pool(5); + Bridge(Socket* upstream, Socket* downstream) + :upstream(upstream), downstream(downstream) { } +}; - std::vector sockets; - sockets.reserve(RANGE + 1); +using PollCallback = std::function; - struct epoll_event events[MAX_EVENTS]; - int mtpxInstance = epoll_create(1); - BK_ASSERT(mtpxInstance != -1); +enum class PollEvent : i64 +{ + None = 0, + Accept = EPOLLIN | EPOLLET, + Readable = EPOLLIN | EPOLLET, + Writable = EPOLLOUT, + Closed = EPOLLRDHUP, + Error = EPOLLERR +}; - BK_INFO("Starting binds min {0} max {1} RANGE {2}", MIN_PORT, MAX_PORT, RANGE); - for (size_t i = 0; i <= RANGE; i++) - { - int port = MIN_PORT + i; - BK_INFO("Buiding binder on port {0}", port); - sockets.insert( - sockets.begin() + i, - Binder(Socket::create(src, port, IpProtocol::TCP),port) - ); - BK_ASSERT(sockets[i].src->init()); - BK_ASSERT(sockets[i].src->start(10));; - struct epoll_event ev; - ev.events = EPOLLIN; - ev.data.fd = sockets[i].src->get_raw_socket(); - if (epoll_ctl(mtpxInstance, EPOLL_CTL_ADD, sockets[i].src->get_raw_socket(), &ev) == -1) { - BK_MSG_ASSERT(false, "epoll_ctl: listen_sock"); - exit(EXIT_FAILURE); +class EventPoll +{ + public: + EventPoll(u16 maxEvents = 10) + { + instance = epoll_create1(0); + BK_ASSERT(instance != -1); + events.resize(maxEvents); + } + + void add(int fd, int nbEvents = 0, ...) + { + va_list args; + va_start(args, nbEvents); + struct epoll_event ev; + ev.events = 0; + for(int i = 0; i < nbEvents; i++) ev.events = ev.events | va_arg(args, i64); + ev.data.fd = fd; + BK_STATIC_MSG_ASSERT(epoll_ctl(instance, EPOLL_CTL_ADD, fd, &ev) != -1, "epoll_ctl: listen_sock"); } - } - while (true) { - int waiters = epoll_wait(mtpxInstance, events, MAX_EVENTS, -1); - BK_MSG_ASSERT(waiters != -1, "epoll_ctl: listen_sock"); - for(int n = 0; n < waiters; n++) + void on(PollEvent event, PollCallback callback) { - for(auto& bind : sockets) + callbacks[event] = callback; + } + + void wait(int timeout = -1) + { + int waiters = epoll_wait(instance, events.data(), events.size(), timeout); + BK_STATIC_MSG_ASSERT(waiters != -1, "epoll_ctl: listen_sock"); + for(int i = 0; i < waiters; i++) + { + if((events[i].events & (i64)PollEvent::Closed) != 0 && callbacks[PollEvent::Closed]) callbacks[PollEvent::Closed](events[i].data.fd); + else if((events[i].events & (i64)PollEvent::Readable) != 0 && callbacks[PollEvent::Readable]) callbacks[PollEvent::Readable](events[i].data.fd); + else if((events[i].events & (i64)PollEvent::Accept) != 0 && callbacks[PollEvent::Accept]) callbacks[PollEvent::Accept](events[i].data.fd); + else if((events[i].events & (i64)PollEvent::Writable) != 0 && callbacks[PollEvent::Writable]) callbacks[PollEvent::Writable](events[i].data.fd); + else if((events[i].events & (i64)PollEvent::Error) != 0 && callbacks[PollEvent::Error]) callbacks[PollEvent::Error](events[i].data.fd); + } + } + + private: + std::vector events; + int instance = -1; + std::map callbacks; + +}; + +class Connector +{ + public: + Connector() + { + bridges.reserve(10); + ePoll.on(PollEvent::Accept, [this](int fd) + { + int index = findBridge(fd); + if(index > -1) + { + auto& bridge = bridges[index]; + if (bridge.upstream->get_raw_socket() == fd) + { + std::vector packet = bridge.upstream->obtain(MAX_PACKET_SIZE); + + if(packet.size() > 0) bridge.downstream->emit(packet); + return; + } + std::vector packet = bridge.downstream->obtain(MAX_PACKET_SIZE); + if(packet.size() > 0) bridge.upstream->emit(packet); + } + }); + + ePoll.on(PollEvent::Closed, [this](int fd) { - if (bind.src->get_raw_socket() == events[n].data.fd) { - BK_INFO("New connection on port : {0}", bind.port); - Socket* dest = bind.src->ack(); - BK_INFO("IP Address : ", dest->get_ip()); - delete dest; + int i = findBridge(fd); + BK_INFO("Deleting bridge N° {0} - Remaining bridges {1}", i, bridges.size() -1); + if(i > -1) bridges.erase(bridges.begin() + i); + }); + } + + ~Connector() + { + if(running) { + stop(); + worker.join(); + } + } + + int findBridge(int fd) + { + for(int i = 0; i < bridges.size(); i++) + { + if (bridges[i].upstream->get_raw_socket() == fd || bridges[i].downstream->get_raw_socket() == fd) { + return i; } } + return -1; + } + + void start() + { + if(!running) + { + running = true; + worker = std::thread([this] + { + while(running) + { + std::unique_lock lock(resource_mutex); + condition.wait(lock, [this] { + return !bridges.empty() || !running; + }); + + if(!running || bridges.empty()) return; + + ePoll.wait(); + } + }); + } + } + + void stop() + { + std::unique_lock lock(resource_mutex); + running = false; + condition.notify_one(); + } + + void add(Bridge bridge) + { + { + std::unique_lock lock(resource_mutex); + BK_STATIC_ASSERT(bridge.upstream->conn()); + setnonblocking(bridge.upstream->get_raw_socket()); + ePoll.add(bridge.upstream->get_raw_socket(), 2, PollEvent::Readable, PollEvent::Closed); + ePoll.add(bridge.downstream->get_raw_socket(), 2, PollEvent::Readable, PollEvent::Closed); + bridges.push_back(std::move(bridge)); + } + condition.notify_one(); } - } + + private: + std::thread worker; + + std::condition_variable condition; + std::mutex resource_mutex; + + bool running = false; + int instance; + EventPoll ePoll; + std::vector bridges; +}; + +class Initiator +{ + public: + Initiator(const std::map& portBinds) + { + IpAddress src; + binds.reserve(portBinds.size()); + + BK_INFO("Starting binds (count: {0})", portBinds.size()); + int i = 0; + for (auto& portBind : portBinds) + { + BK_INFO("{2}. Buiding binder on port {0}:{1}", portBind.first, portBind.second, i + 1); + binds.insert(binds.begin() + i, Binder( + Socket::create(src, portBind.first, IpProtocol::TCP), + portBind.first, + portBind.second) + ); + BK_ASSERT(binds[i].src->init()); + BK_ASSERT(binds[i].src->start(10)); + ePoll.add(binds[i].src->get_raw_socket(), 1, PollEvent::Accept); + i++; + } + + ePoll.on(PollEvent::Accept, [this] (int fd) + { + auto& bind = binds[findBinder(fd)]; + Socket* conn = bind.src->ack(); + Socket* target = Socket::create(IpAddress(TARGET), bind.target, IpProtocol::TCP); + BK_INFO("New connection"); + BK_INFO("Connection info :"); + BK_INFO("PORT BIND [{0}:{1}]", bind.listener, bind.target); + BK_INFO("TARGET [{0} ({1})]", target->get_ip(), target->get_raw_socket() > 0 ? "Alive" : "Dead"); + BK_INFO("CLIENT [{0} ({1})] ", conn->get_ip(), conn->get_raw_socket() > 0 ? "Alive" : "Dead"); + setnonblocking(conn->get_raw_socket()); + if(connectorPtr >= connectors.size()) connectorPtr = 0; + BK_INFO("Forwarding to connector N°{0}", connectorPtr); + connectors[connectorPtr].add(Bridge(target, conn)); + connectorPtr++; + }); + } + + int findBinder(int fd) + { + for(int i = 0; i < binds.size(); i++) + { + if (binds[i].src->get_raw_socket() == fd) { + return i; + } + } + return -1; + } + + void start() + { + running = true; + for(auto& connector : connectors) + { + connectorPtr++; + BK_INFO("Starting connector N°{0}", connectorPtr); + connector.start(); + } + while (running) { + ePoll.wait(); + } + } + + private: + bool running = false; + std::vector binds; + EventPoll ePoll; + std::array connectors; + int connectorPtr = 0; +}; + +int main() +{ + Log::Init("Bakanet"); + // BK_INFO("Epoll events types"); + // BK_INFO("EPOLLIN {0} {0:b}", (u32)EPOLLIN); + // BK_INFO("EPOLLOUT {0} {0:b}", (u32)EPOLLOUT); + // BK_INFO("EPOLLET {0} {0:b}", (u32)EPOLLET); + // BK_INFO("EPOLLHUP {0} {0:b}", (u32)EPOLLHUP); + // BK_INFO("EPOLLERR {0} {0:b}", (u32)EPOLLERR); + // BK_INFO("EPOLLRDHUP {0} {0:b}", (u32)EPOLLRDHUP); + Initiator initiator(portBinds); + initiator.start(); return 0; } \ No newline at end of file diff --git a/app/src/config.h b/app/src/config.h new file mode 100644 index 0000000..a9f4d17 --- /dev/null +++ b/app/src/config.h @@ -0,0 +1,18 @@ +#pragma once + +#include +#include "bakatools/container/types.h" + +#define CREATE_MAPPING(ports) const static std::map portBinds = { ports } +#define BIND_PORTS(listener, target) { listener, target }, + +#define MAX_EVENTS 10 +#define MAX_CONNECTIOR_WORKERS 3 +#define MAX_PACKET_SIZE 64000 +#define TARGET "192.168.1.159" + +CREATE_MAPPING( + BIND_PORTS(25080, 80) + BIND_PORTS(25443, 443) +); +