Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions include/expresso/core/io/connection.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#pragma once

#include <atomic>
#include <optional>
#include <string>
#include <sys/types.h>
#include <vector>

namespace expresso {

namespace core {

namespace io {

struct PendingFile {
int fd;
off_t offset;
off_t remaining;
};

enum class ConnectionPhase {
READING = 0,
PROCESSING = 1,
WRITING = 2,
DONE = 3,
};

struct Connection {
int fd;

std::vector<char> readBuffer;

std::string writeBuffer;
size_t writeOffset;

std::optional<PendingFile> pendingFile;

std::atomic<int> phase;

explicit Connection(int fd)
: fd(fd),
writeOffset(0),
phase(static_cast<int>(ConnectionPhase::READING)) {}

Connection(const Connection&) = delete;
Connection& operator=(const Connection&) = delete;
};

} // namespace io

} // namespace core

} // namespace expresso
35 changes: 35 additions & 0 deletions include/expresso/core/io/ipoller.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#pragma once

#include <vector>

namespace expresso {

namespace core {

namespace io {

enum class EventType {
READ = 1,
WRITE = 2,
};

struct Event {
int fd;
EventType type;
};

class IPoller {
public:
virtual ~IPoller() = default;

virtual void add(int fd, EventType type) noexcept(false) = 0;
virtual void modify(int fd, EventType type) noexcept(false) = 0;
virtual void remove(int fd) noexcept(false) = 0;
virtual std::vector<Event> wait(int timeoutMs = -1) noexcept(false) = 0;
};

} // namespace io

} // namespace core

} // namespace expresso
19 changes: 19 additions & 0 deletions include/expresso/core/io/poller.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#pragma once

#include <memory>

#include <expresso/core/io/ipoller.h>

namespace expresso {

namespace core {

namespace io {

std::unique_ptr<IPoller> makePoller() noexcept(false);

} // namespace io

} // namespace core

} // namespace expresso
36 changes: 30 additions & 6 deletions include/expresso/core/server.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
#pragma once

#include <csignal>
#include <map>
#include <memory>
#include <mutex>
#include <queue>
#include <unistd.h>

#include <json/parse.h>
#include <nexus/pool.h>

#include <expresso/core/io/connection.h>
#include <expresso/core/io/ipoller.h>
#include <expresso/core/io/poller.h>
#include <expresso/core/router.h>
#include <expresso/middleware/date.h>
#include <expresso/middleware/version.h>
Expand All @@ -17,19 +24,36 @@ namespace core {

class Server : public Router {
private:
int socket;
int serverSocket;
size_t maxConnections;
struct sockaddr_in address;

std::unique_ptr<expresso::core::io::IPoller> poller;
std::map<int, std::shared_ptr<expresso::core::io::Connection>> connections;

int wakeupPipe[2];
std::queue<int> pendingWrites;
std::mutex pendingWritesMutex;

nexus::pool threadPool;

mochios::enums::method getMethodFromString(
const std::string& method) noexcept(false);

void setupMiddlewares();
void acceptConnections();
void handleConnection(int clientSocket) noexcept(false);
void runEventLoop();
void acceptNewConnections();
void readFromConnection(
std::shared_ptr<expresso::core::io::Connection> conn);
void writeToConnection(
std::shared_ptr<expresso::core::io::Connection> conn);
void processConnection(
std::shared_ptr<expresso::core::io::Connection> conn);
void closeConnection(
std::shared_ptr<expresso::core::io::Connection> conn);

expresso::messages::Request makeRequest(std::string& request) noexcept(false);
nexus::pool threadPool;
expresso::messages::Request makeRequest(
const std::string& request) noexcept(false);

public:
Server(size_t maxConnections = SOMAXCONN,
Expand All @@ -41,4 +65,4 @@ class Server : public Router {

} // namespace core

} // namespace expresso
} // namespace expresso
9 changes: 5 additions & 4 deletions include/expresso/helpers/response.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
#pragma once

#include <fstream>
#include <iomanip>
#include <sstream>
#include <string>

#include <brewtils/os.h>
#include <brewtils/sys.h>
#include <zippuccino/crc.h>

namespace expresso {
Expand All @@ -17,10 +18,10 @@ std::string getAvailableFile(const std::string& path);

const std::string generateETag(const std::string& data);

bool sendChunkedData(const int& socket, const std::string& data);
std::string makeChunkedData(const std::string& data);

bool sendFileInChunks(const int& socket, const std::string& path);
std::string readFileAsChunked(const std::string& path);

} // namespace helpers

} // namespace expresso
} // namespace expresso
23 changes: 19 additions & 4 deletions include/expresso/messages/response.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#pragma once

#include <fstream>
#include <netinet/in.h>
#include <set>
#include <sstream>
#include <sys/types.h>

#include <brewtils/os.h>
#include <brewtils/string.h>
#include <brewtils/sys.h>
#include <json/object.h>
#include <mochios/enums/method.h>
#include <mochios/messages/response.h>
Expand All @@ -19,16 +21,25 @@ namespace expresso {
namespace messages {

class Response : public mochios::messages::Response {
public:
struct FileTransfer {
int fd = -1;
off_t offset = 0;
off_t length = 0;
};

private:
bool hasEnded;

int socket;

std::string message;
std::string writeBuffer;
std::vector<Cookie*> cookies;

void sendToClient();
void sendHeaders();
FileTransfer fileTransfer;

void buildHeaders();

public:
Response(int clientSocket);
Expand All @@ -50,9 +61,13 @@ class Response : public mochios::messages::Response {
void sendInvalidRange();

void end();
const std::string& getWriteBuffer() const;
std::string takeWriteBuffer();
bool hasPendingFile() const;
FileTransfer takeFileTransfer();
const void print() const override;
};

} // namespace messages

} // namespace expresso
} // namespace expresso
91 changes: 91 additions & 0 deletions src/core/io/epoll_poller.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#ifdef __linux__

#include <sys/epoll.h>
#include <unistd.h>

#include <logger/log.h>

#include <expresso/core/io/ipoller.h>
#include <expresso/core/io/poller.h>

namespace expresso {

namespace core {

namespace io {

class EpollPoller : public IPoller {
private:
int epollFd;
static constexpr int MAX_EVENTS = 64;

public:
EpollPoller() {
this->epollFd = epoll_create1(0);
if (this->epollFd < 0) {
logger::error(
"Failed to create epoll fd",
"expresso::core::io::EpollPoller::EpollPoller()");
}
}

~EpollPoller() {
if (this->epollFd >= 0) {
close(this->epollFd);
}
}

void add(int fd, EventType type) noexcept(false) override {
struct epoll_event ev;
ev.data.fd = fd;
ev.events = (type == EventType::READ ? EPOLLIN : EPOLLOUT) | EPOLLET;
if (epoll_ctl(this->epollFd, EPOLL_CTL_ADD, fd, &ev) < 0) {
logger::error(
"epoll_ctl ADD failed for fd " + std::to_string(fd),
"expresso::core::io::EpollPoller::add()");
}
}

void modify(int fd, EventType type) noexcept(false) override {
struct epoll_event ev;
ev.data.fd = fd;
ev.events = (type == EventType::READ ? EPOLLIN : EPOLLOUT) | EPOLLET;
if (epoll_ctl(this->epollFd, EPOLL_CTL_MOD, fd, &ev) < 0) {
logger::error(
"epoll_ctl MOD failed for fd " + std::to_string(fd),
"expresso::core::io::EpollPoller::modify()");
}
}

void remove(int fd) noexcept(false) override {
epoll_ctl(this->epollFd, EPOLL_CTL_DEL, fd, nullptr);
}

std::vector<Event> wait(int timeoutMs = -1) noexcept(false) override {
struct epoll_event events[MAX_EVENTS];
int n = epoll_wait(this->epollFd, events, MAX_EVENTS, timeoutMs);
if (n < 0) {
return {};
}
std::vector<Event> result;
result.reserve(n);
for (int i = 0; i < n; i++) {
EventType type =
(events[i].events & EPOLLIN) ? EventType::READ : EventType::WRITE;
result.push_back({events[i].data.fd, type});
}
return result;
}
};

std::unique_ptr<IPoller> makePoller() noexcept(false) {
return std::make_unique<EpollPoller>();
}

} // namespace io

} // namespace core

} // namespace expresso

#endif // __linux__
Loading
Loading