Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pyx_library(
linkstatic = 1,
),
deps = [
"//cpp/fory/python:_pyfory",
"//cpp/fory/util:fory_util",
],
)
Expand Down
146 changes: 146 additions & 0 deletions cpp/fory/python/pyfory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@

#include "fory/python/pyfory.h"

#include <algorithm>
#include <cstring>
#include <exception>
#include <istream>
#include <streambuf>

static PyObject **py_sequence_get_items(PyObject *collection) {
if (PyList_CheckExact(collection)) {
return ((PyListObject *)collection)->ob_item;
Expand All @@ -29,6 +35,118 @@ static PyObject **py_sequence_get_items(PyObject *collection) {
}

namespace fory {

static std::string fetch_python_error_message() {
PyObject *type = nullptr;
PyObject *value = nullptr;
PyObject *traceback = nullptr;
PyErr_Fetch(&type, &value, &traceback);
PyErr_NormalizeException(&type, &value, &traceback);
std::string message = "python stream read failed";
if (value != nullptr) {
PyObject *value_str = PyObject_Str(value);
if (value_str != nullptr) {
const char *c_str = PyUnicode_AsUTF8(value_str);
if (c_str != nullptr) {
message = c_str;
}
Py_DECREF(value_str);
} else {
PyErr_Clear();
}
}
Py_XDECREF(type);
Py_XDECREF(value);
Py_XDECREF(traceback);
return message;
}

class PythonInputStreamBuf final : public std::streambuf {
public:
explicit PythonInputStreamBuf(PyObject *stream) : stream_(stream) {
Py_INCREF(stream_);
}

~PythonInputStreamBuf() override {
if (stream_ != nullptr) {
PyGILState_STATE gil_state = PyGILState_Ensure();
Py_DECREF(stream_);
PyGILState_Release(gil_state);
stream_ = nullptr;
}
}

bool has_error() const { return has_error_; }

const std::string &error_message() const { return error_message_; }

protected:
std::streamsize xsgetn(char *s, std::streamsize count) override {
if (count <= 0 || has_error_) {
return 0;
}
PyGILState_STATE gil_state = PyGILState_Ensure();
const Py_ssize_t requested = static_cast<Py_ssize_t>(
std::min<std::streamsize>(count, PY_SSIZE_T_MAX));
PyObject *chunk = PyObject_CallMethod(stream_, "read", "n", requested);
if (chunk == nullptr) {
has_error_ = true;
error_message_ = fetch_python_error_message();
PyGILState_Release(gil_state);
return 0;
}
Py_buffer view;
if (PyObject_GetBuffer(chunk, &view, PyBUF_CONTIG_RO) != 0) {
has_error_ = true;
error_message_ = fetch_python_error_message();
Py_DECREF(chunk);
PyGILState_Release(gil_state);
return 0;
}
const std::streamsize read_bytes = std::min<std::streamsize>(
count, static_cast<std::streamsize>(view.len));
if (read_bytes > 0) {
std::memcpy(s, view.buf, static_cast<size_t>(read_bytes));
}
PyBuffer_Release(&view);
Py_DECREF(chunk);
PyGILState_Release(gil_state);
return read_bytes;
}

int_type underflow() override {
if (gptr() < egptr()) {
return traits_type::to_int_type(*gptr());
}
if (xsgetn(&current_, 1) != 1) {
return traits_type::eof();
}
setg(&current_, &current_, &current_ + 1);
return traits_type::to_int_type(current_);
}

private:
PyObject *stream_ = nullptr;
bool has_error_ = false;
std::string error_message_;
char current_ = 0;
};

class PythonInputStream final : public std::istream {
public:
explicit PythonInputStream(PyObject *stream)
: std::istream(nullptr), buf_(stream) {
rdbuf(&buf_);
}

bool has_error() const { return buf_.has_error(); }

const std::string &error_message() const { return buf_.error_message(); }

private:
PythonInputStreamBuf buf_;
};

int Fory_PyBooleanSequenceWriteToBuffer(PyObject *collection, Buffer *buffer,
Py_ssize_t start_index) {
PyObject **items = py_sequence_get_items(collection);
Expand Down Expand Up @@ -58,4 +176,32 @@ int Fory_PyFloatSequenceWriteToBuffer(PyObject *collection, Buffer *buffer,
}
return 0;
}

int Fory_PyCreateBufferFromStream(PyObject *stream, uint32_t buffer_size,
Buffer **out, std::string *error_message) {
if (stream == nullptr) {
*error_message = "stream must not be null";
return -1;
}
const int has_read = PyObject_HasAttrString(stream, "read");
if (has_read < 0) {
*error_message = fetch_python_error_message();
return -1;
}
if (has_read == 0) {
*error_message = "stream object must provide read(size) method";
return -1;
}
try {
auto py_stream = std::make_shared<PythonInputStream>(stream);
auto source_stream = std::static_pointer_cast<std::istream>(py_stream);
auto fory_stream = std::make_shared<ForyInputStream>(
std::move(source_stream), buffer_size);
*out = new Buffer(std::move(fory_stream));
return 0;
} catch (const std::exception &e) {
*error_message = e.what();
return -1;
}
}
} // namespace fory
6 changes: 5 additions & 1 deletion cpp/fory/python/pyfory.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/

#pragma once
#include <string>

#include "Python.h"
#include "fory/util/buffer.h"

Expand All @@ -26,4 +28,6 @@ int Fory_PyBooleanSequenceWriteToBuffer(PyObject *collection, Buffer *buffer,
Py_ssize_t start_index);
int Fory_PyFloatSequenceWriteToBuffer(PyObject *collection, Buffer *buffer,
Py_ssize_t start_index);
} // namespace fory
int Fory_PyCreateBufferFromStream(PyObject *stream, uint32_t buffer_size,
Buffer **out, std::string *error_message);
} // namespace fory
10 changes: 10 additions & 0 deletions cpp/fory/serialization/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,16 @@ cc_test(
],
)

cc_test(
name = "stream_test",
srcs = ["stream_test.cc"],
deps = [
":fory_serialization",
"@googletest//:gtest",
"@googletest//:gtest_main",
],
)

cc_binary(
name = "xlang_test_main",
srcs = ["xlang_test_main.cc"],
Expand Down
4 changes: 4 additions & 0 deletions cpp/fory/serialization/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ if(FORY_BUILD_TESTS)
add_executable(fory_serialization_any_test any_serializer_test.cc)
target_link_libraries(fory_serialization_any_test fory_serialization GTest::gtest GTest::gtest_main)
gtest_discover_tests(fory_serialization_any_test)

add_executable(fory_serialization_stream_test stream_test.cc)
target_link_libraries(fory_serialization_stream_test fory_serialization GTest::gtest GTest::gtest_main)
gtest_discover_tests(fory_serialization_stream_test)
endif()

# xlang test binary
Expand Down
14 changes: 10 additions & 4 deletions cpp/fory/serialization/serializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "fory/util/error.h"
#include "fory/util/result.h"
#include <cstdint>
#include <limits>
#include <string>

namespace fory {
Expand Down Expand Up @@ -83,10 +84,15 @@ struct HeaderInfo {
/// @param buffer Input buffer
/// @return Header information or error
inline Result<HeaderInfo, Error> read_header(Buffer &buffer) {
// Check minimum header size (1 byte: flags)
if (buffer.reader_index() + 1 > buffer.size()) {
return Unexpected(
Error::buffer_out_of_bound(buffer.reader_index(), 1, buffer.size()));
// Ensure minimum header size (1 byte: flags), including stream-backed
// buffers.
Error error;
const uint64_t target = static_cast<uint64_t>(buffer.reader_index()) + 1;
if (target > std::numeric_limits<uint32_t>::max()) {
return Unexpected(Error::out_of_bound("header reader index overflow"));
}
if (!buffer.ensure_size(static_cast<uint32_t>(target), error)) {
return Unexpected(std::move(error));
}

HeaderInfo info;
Expand Down
Loading
Loading