Working TCP proxy

proxy
anulax1225 ago%!(EXTRA string=2 months)
parent ccec345bcb
commit 63c0f8b464
  1. 340
      app/src/app.cpp
  2. 18
      app/src/config.h

@ -1,90 +1,310 @@
#include "bakanet/core/ip_address.h"
#include "bakanet/core/ip_protocol.h"
#include "bakanet/core/socket.h"
#include "bakatools/container/types.h"
#include "bakatools/logging/assert.h"
#include "bakatools/logging/log.h"
#include <bakatools.h>
#include <bakanet.h>
#include <cstddef>
#include <cerrno>
#include <cstdarg>
#include <functional>
#include <sys/epoll.h>
#include <thread>
#include <vector>
#define MIN_PORT 10000
#define MAX_PORT 10005
#define MAX_EVENTS 10
#define RANGE MAX_PORT - MIN_PORT
#define TARGET "127.0.0.1"
#include "config.h"
using namespace Bk;
using namespace Bk::Net;
int setnonblocking(int sock)
{
int result;
int flags;
flags = ::fcntl(sock, F_GETFL, 0);
if (flags == -1)
{
return -1; // error
}
flags |= O_NONBLOCK;
result = fcntl(sock , F_SETFL , flags);
return result;
}
struct Binder
{
Scope<Socket> src;
int port = 0;
u16 listener = 0;
u16 target = 0;
Binder(Socket* src, int port)
:src(src), port(port) { }
Binder(Socket* src, u16 listener, u16 target)
:src(src), listener(listener), target(target) { }
};
int main()
struct Bridge
{
// Log::Init("Bakanet");
// //hostent* h = gethostbyname("localhost");
// //BK_INFO(std::string(h->h_addr_list[0], h->h_length));
// IpAddress ip("127.0.0.1");
// HttpServer server(ip, 8080);
// server.get("/", [](HttpRequest& req)
// {
// HttpReponse res(HTTP_RES_200, req.version);
// res.body = "<h1>Bakanet</h1>";
// res.body += "<p>Working http server</p>";
// return res;
// });
// server.start();
// return 0;
Scope<Socket> downstream, upstream;
Bridge(Socket* upstream, Socket* downstream)
:upstream(upstream), downstream(downstream) { }
};
Log::Init("Bakanet");
IpAddress src;
ThreadPool pool(5);
using PollCallback = std::function<void(int)>;
std::vector<Binder> sockets;
sockets.reserve(RANGE + 1);
enum class PollEvent : i64
{
None = 0,
Accept = EPOLLIN | EPOLLET,
Readable = EPOLLIN | EPOLLET,
Writable = EPOLLOUT,
Closed = EPOLLRDHUP,
Error = EPOLLERR
};
struct epoll_event events[MAX_EVENTS];
int mtpxInstance = epoll_create(1);
BK_ASSERT(mtpxInstance != -1);
class EventPoll
{
public:
EventPoll(u16 maxEvents = 10)
{
instance = epoll_create1(0);
BK_ASSERT(instance != -1);
events.resize(maxEvents);
}
BK_INFO("Starting binds min {0} max {1} RANGE {2}", MIN_PORT, MAX_PORT, RANGE);
for (size_t i = 0; i <= RANGE; i++)
{
int port = MIN_PORT + i;
BK_INFO("Buiding binder on port {0}", port);
sockets.insert(
sockets.begin() + i,
Binder(Socket::create(src, port, IpProtocol::TCP),port)
);
BK_ASSERT(sockets[i].src->init());
BK_ASSERT(sockets[i].src->start(10));;
struct epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = sockets[i].src->get_raw_socket();
if (epoll_ctl(mtpxInstance, EPOLL_CTL_ADD, sockets[i].src->get_raw_socket(), &ev) == -1) {
BK_MSG_ASSERT(false, "epoll_ctl: listen_sock");
exit(EXIT_FAILURE);
void add(int fd, int nbEvents = 0, ...)
{
va_list args;
va_start(args, nbEvents);
struct epoll_event ev;
ev.events = 0;
for(int i = 0; i < nbEvents; i++) ev.events = ev.events | va_arg(args, i64);
ev.data.fd = fd;
BK_STATIC_MSG_ASSERT(epoll_ctl(instance, EPOLL_CTL_ADD, fd, &ev) != -1, "epoll_ctl: listen_sock");
}
void on(PollEvent event, PollCallback callback)
{
callbacks[event] = callback;
}
void wait(int timeout = -1)
{
int waiters = epoll_wait(instance, events.data(), events.size(), timeout);
BK_STATIC_MSG_ASSERT(waiters != -1, "epoll_ctl: listen_sock");
for(int i = 0; i < waiters; i++)
{
if((events[i].events & (i64)PollEvent::Closed) != 0 && callbacks[PollEvent::Closed]) callbacks[PollEvent::Closed](events[i].data.fd);
else if((events[i].events & (i64)PollEvent::Readable) != 0 && callbacks[PollEvent::Readable]) callbacks[PollEvent::Readable](events[i].data.fd);
else if((events[i].events & (i64)PollEvent::Accept) != 0 && callbacks[PollEvent::Accept]) callbacks[PollEvent::Accept](events[i].data.fd);
else if((events[i].events & (i64)PollEvent::Writable) != 0 && callbacks[PollEvent::Writable]) callbacks[PollEvent::Writable](events[i].data.fd);
else if((events[i].events & (i64)PollEvent::Error) != 0 && callbacks[PollEvent::Error]) callbacks[PollEvent::Error](events[i].data.fd);
}
}
private:
std::vector<struct epoll_event> events;
int instance = -1;
std::map<PollEvent, PollCallback> callbacks;
};
class Connector
{
public:
Connector()
{
bridges.reserve(10);
ePoll.on(PollEvent::Accept, [this](int fd)
{
int index = findBridge(fd);
if(index > -1)
{
auto& bridge = bridges[index];
if (bridge.upstream->get_raw_socket() == fd)
{
std::vector<char> packet = bridge.upstream->obtain(MAX_PACKET_SIZE);
if(packet.size() > 0) bridge.downstream->emit(packet);
return;
}
std::vector<char> packet = bridge.downstream->obtain(MAX_PACKET_SIZE);
if(packet.size() > 0) bridge.upstream->emit(packet);
}
});
ePoll.on(PollEvent::Closed, [this](int fd)
{
int i = findBridge(fd);
BK_INFO("Deleting bridge N° {0} - Remaining bridges {1}", i, bridges.size() -1);
if(i > -1) bridges.erase(bridges.begin() + i);
});
}
~Connector()
{
if(running) {
stop();
worker.join();
}
}
}
while (true) {
int waiters = epoll_wait(mtpxInstance, events, MAX_EVENTS, -1);
BK_MSG_ASSERT(waiters != -1, "epoll_ctl: listen_sock");
for(int n = 0; n < waiters; n++)
int findBridge(int fd)
{
for(auto& bind : sockets)
for(int i = 0; i < bridges.size(); i++)
{
if (bind.src->get_raw_socket() == events[n].data.fd) {
BK_INFO("New connection on port : {0}", bind.port);
Socket* dest = bind.src->ack();
BK_INFO("IP Address : ", dest->get_ip());
delete dest;
if (bridges[i].upstream->get_raw_socket() == fd || bridges[i].downstream->get_raw_socket() == fd) {
return i;
}
}
return -1;
}
void start()
{
if(!running)
{
running = true;
worker = std::thread([this]
{
while(running)
{
std::unique_lock<std::mutex> lock(resource_mutex);
condition.wait(lock, [this] {
return !bridges.empty() || !running;
});
if(!running || bridges.empty()) return;
ePoll.wait();
}
});
}
}
void stop()
{
std::unique_lock<std::mutex> lock(resource_mutex);
running = false;
condition.notify_one();
}
void add(Bridge bridge)
{
{
std::unique_lock<std::mutex> lock(resource_mutex);
BK_STATIC_ASSERT(bridge.upstream->conn());
setnonblocking(bridge.upstream->get_raw_socket());
ePoll.add(bridge.upstream->get_raw_socket(), 2, PollEvent::Readable, PollEvent::Closed);
ePoll.add(bridge.downstream->get_raw_socket(), 2, PollEvent::Readable, PollEvent::Closed);
bridges.push_back(std::move(bridge));
}
condition.notify_one();
}
private:
std::thread worker;
std::condition_variable condition;
std::mutex resource_mutex;
bool running = false;
int instance;
EventPoll ePoll;
std::vector<Bridge> bridges;
};
class Initiator
{
public:
Initiator(const std::map<Bk::u16, Bk::u16>& portBinds)
{
IpAddress src;
binds.reserve(portBinds.size());
BK_INFO("Starting binds (count: {0})", portBinds.size());
int i = 0;
for (auto& portBind : portBinds)
{
BK_INFO("{2}. Buiding binder on port {0}:{1}", portBind.first, portBind.second, i + 1);
binds.insert(binds.begin() + i, Binder(
Socket::create(src, portBind.first, IpProtocol::TCP),
portBind.first,
portBind.second)
);
BK_ASSERT(binds[i].src->init());
BK_ASSERT(binds[i].src->start(10));
ePoll.add(binds[i].src->get_raw_socket(), 1, PollEvent::Accept);
i++;
}
ePoll.on(PollEvent::Accept, [this] (int fd)
{
auto& bind = binds[findBinder(fd)];
Socket* conn = bind.src->ack();
Socket* target = Socket::create(IpAddress(TARGET), bind.target, IpProtocol::TCP);
BK_INFO("New connection");
BK_INFO("Connection info :");
BK_INFO("PORT BIND [{0}:{1}]", bind.listener, bind.target);
BK_INFO("TARGET [{0} ({1})]", target->get_ip(), target->get_raw_socket() > 0 ? "Alive" : "Dead");
BK_INFO("CLIENT [{0} ({1})] ", conn->get_ip(), conn->get_raw_socket() > 0 ? "Alive" : "Dead");
setnonblocking(conn->get_raw_socket());
if(connectorPtr >= connectors.size()) connectorPtr = 0;
BK_INFO("Forwarding to connector N°{0}", connectorPtr);
connectors[connectorPtr].add(Bridge(target, conn));
connectorPtr++;
});
}
int findBinder(int fd)
{
for(int i = 0; i < binds.size(); i++)
{
if (binds[i].src->get_raw_socket() == fd) {
return i;
}
}
return -1;
}
void start()
{
running = true;
for(auto& connector : connectors)
{
connectorPtr++;
BK_INFO("Starting connector N°{0}", connectorPtr);
connector.start();
}
while (running) {
ePoll.wait();
}
}
private:
bool running = false;
std::vector<Binder> binds;
EventPoll ePoll;
std::array<Connector, MAX_CONNECTIOR_WORKERS> connectors;
int connectorPtr = 0;
};
int main()
{
Log::Init("Bakanet");
// BK_INFO("Epoll events types");
// BK_INFO("EPOLLIN {0} {0:b}", (u32)EPOLLIN);
// BK_INFO("EPOLLOUT {0} {0:b}", (u32)EPOLLOUT);
// BK_INFO("EPOLLET {0} {0:b}", (u32)EPOLLET);
// BK_INFO("EPOLLHUP {0} {0:b}", (u32)EPOLLHUP);
// BK_INFO("EPOLLERR {0} {0:b}", (u32)EPOLLERR);
// BK_INFO("EPOLLRDHUP {0} {0:b}", (u32)EPOLLRDHUP);
Initiator initiator(portBinds);
initiator.start();
return 0;
}

@ -0,0 +1,18 @@
#pragma once
#include <map>
#include "bakatools/container/types.h"
#define CREATE_MAPPING(ports) const static std::map<Bk::u16, Bk::u16> portBinds = { ports }
#define BIND_PORTS(listener, target) { listener, target },
#define MAX_EVENTS 10
#define MAX_CONNECTIOR_WORKERS 3
#define MAX_PACKET_SIZE 64000
#define TARGET "192.168.1.159"
CREATE_MAPPING(
BIND_PORTS(25080, 80)
BIND_PORTS(25443, 443)
);
Loading…
Cancel
Save