diff --git a/src/python/BUILD b/src/python/BUILD index 8b5cb2e70d..a151d78643 100644 --- a/src/python/BUILD +++ b/src/python/BUILD @@ -86,7 +86,10 @@ ovms_cc_library( deps = PYBIND_DEPS + [ "//third_party:openvino", "@mediapipe//mediapipe/framework:calculator_framework", + "@com_github_tencent_rapidjson//:rapidjson", "//src:libovmsprecision", + "//src:httppayload", + "//src:libmultipart_parser", "ovmspytensor", "pytensorovtensorconvertercalculator_cc_proto", "pythonbackend", diff --git a/src/python/pytensor_ovtensor_converter_calculator.cc b/src/python/pytensor_ovtensor_converter_calculator.cc index 0ebeeadbb9..315a5a9099 100644 --- a/src/python/pytensor_ovtensor_converter_calculator.cc +++ b/src/python/pytensor_ovtensor_converter_calculator.cc @@ -13,7 +13,9 @@ // See the License for the specific language governing permissions and // limitations under the License. //***************************************************************************** +#include #include +#include #include #include @@ -31,6 +33,15 @@ #include #pragma warning(pop) +#pragma warning(push) +#pragma warning(disable : 6313) +#include +#include +#include +#pragma warning(pop) + +#include "../http_payload.hpp" +#include "../multi_part_parser.hpp" #include "../precision.hpp" #include "python_backend.hpp" #include "src/python/ovms_py_tensor.hpp" @@ -95,24 +106,44 @@ class PyTensorOvTensorConverterCalculator : public CalculatorBase { mediapipe::Timestamp outputTimestamp; static const std::string OV_TENSOR_TAG_NAME; static const std::string OVMS_PY_TENSOR_TAG_NAME; + static const std::string HTTP_REQUEST_TAG_NAME; + static const std::string HTTP_RESPONSE_TAG_NAME; public: static absl::Status GetContract(CalculatorContract* cc) { LOG(INFO) << "PyTensorOvTensorConverterCalculator [Node: " << cc->GetNodeName() << "] GetContract start"; RET_CHECK(cc->Inputs().GetTags().size() == 1); RET_CHECK(cc->Outputs().GetTags().size() == 1); - RET_CHECK((*(cc->Inputs().GetTags().begin()) == OV_TENSOR_TAG_NAME && *(cc->Outputs().GetTags().begin()) == OVMS_PY_TENSOR_TAG_NAME) || (*(cc->Inputs().GetTags().begin()) == OVMS_PY_TENSOR_TAG_NAME && *(cc->Outputs().GetTags().begin()) == OV_TENSOR_TAG_NAME)); - if (*(cc->Inputs().GetTags().begin()) == OV_TENSOR_TAG_NAME) { + const std::string inputTag = *(cc->Inputs().GetTags().begin()); + const std::string outputTag = *(cc->Outputs().GetTags().begin()); + const bool ovToPy = (inputTag == OV_TENSOR_TAG_NAME && outputTag == OVMS_PY_TENSOR_TAG_NAME); + const bool pyToOv = (inputTag == OVMS_PY_TENSOR_TAG_NAME && outputTag == OV_TENSOR_TAG_NAME); + const bool httpToPy = (inputTag == HTTP_REQUEST_TAG_NAME && outputTag == OVMS_PY_TENSOR_TAG_NAME); + const bool pyToHttp = (inputTag == OVMS_PY_TENSOR_TAG_NAME && outputTag == HTTP_RESPONSE_TAG_NAME); + RET_CHECK(ovToPy || pyToOv || httpToPy || pyToHttp) + << "PyTensorOvTensorConverterCalculator supports only the following input/output tag pairings: " + << OV_TENSOR_TAG_NAME << "->" << OVMS_PY_TENSOR_TAG_NAME << ", " + << OVMS_PY_TENSOR_TAG_NAME << "->" << OV_TENSOR_TAG_NAME << ", " + << HTTP_REQUEST_TAG_NAME << "->" << OVMS_PY_TENSOR_TAG_NAME << ", " + << OVMS_PY_TENSOR_TAG_NAME << "->" << HTTP_RESPONSE_TAG_NAME + << ". Received: " << inputTag << "->" << outputTag; + if (ovToPy) { RET_CHECK(cc->Options().tag_to_output_tensor_names().count(OVMS_PY_TENSOR_TAG_NAME) > 0); if (cc->Options().tag_to_output_tensor_names().count(OVMS_PY_TENSOR_TAG_NAME) > 1) LOG(INFO) << "PyTensorOvTensorConverterCalculator [Node: " << cc->GetNodeName() << "] tag_to_output_tensor_names map contains some keys that will be ignored"; cc->Inputs().Tag(OV_TENSOR_TAG_NAME).Set(); cc->Outputs().Tag(OVMS_PY_TENSOR_TAG_NAME).Set>(); - } else { + } else if (pyToOv) { if (cc->Options().tag_to_output_tensor_names().count(OVMS_PY_TENSOR_TAG_NAME) > 0) LOG(INFO) << "PyTensorOvTensorConverterCalculator [Node: " << cc->GetNodeName() << "] tag_to_output_tensor_names map contains some keys that will be ignored"; cc->Inputs().Tag(OVMS_PY_TENSOR_TAG_NAME).Set>(); cc->Outputs().Tag(OV_TENSOR_TAG_NAME).Set(); + } else if (httpToPy) { + cc->Inputs().Tag(HTTP_REQUEST_TAG_NAME).Set(); + cc->Outputs().Tag(OVMS_PY_TENSOR_TAG_NAME).Set>(); + } else { // pyToHttp + cc->Inputs().Tag(OVMS_PY_TENSOR_TAG_NAME).Set>(); + cc->Outputs().Tag(HTTP_RESPONSE_TAG_NAME).Set(); } LOG(INFO) << "PyTensorOvTensorConverterCalculator [Node: " << cc->GetNodeName() << "] GetContract end"; @@ -135,8 +166,6 @@ class PyTensorOvTensorConverterCalculator : public CalculatorBase { LOG(INFO) << "PyTensorOvTensorConverterCalculator [Node: " << cc->NodeName() << "] Process start"; py::gil_scoped_acquire acquire; try { - PythonBackend pythonBackend; - for (const std::string& tag : cc->Inputs().GetTags()) { if (cc->Inputs().Tag(tag).IsEmpty()) { LOG(INFO) << "PyTensorOvTensorConverterCalculator [Node: " << cc->NodeName() << "] Error occurred during reading inputs. Unexpected empty packet received on input: " << tag; @@ -144,7 +173,58 @@ class PyTensorOvTensorConverterCalculator : public CalculatorBase { } } - if (*(cc->Inputs().GetTags().begin()) == OV_TENSOR_TAG_NAME) { + const std::string inputTag = *(cc->Inputs().GetTags().begin()); + const std::string outputTag = *(cc->Outputs().GetTags().begin()); + + if (inputTag == HTTP_REQUEST_TAG_NAME) { + const ovms::HttpPayload& payload = cc->Inputs().Tag(HTTP_REQUEST_TAG_NAME).Get(); + py::object pyPayload; + if (payload.multipartParser && !payload.multipartParser->hasParseError() && !payload.multipartParser->getAllFieldNames().empty()) { + py::module_ numpy = py::module_::import("numpy"); + py::object uint8 = numpy.attr("uint8"); + py::dict result; + const std::set fieldNames = payload.multipartParser->getAllFieldNames(); + for (const std::string& fieldName : fieldNames) { + const std::vector files = payload.multipartParser->getFilesArrayByFieldName(fieldName); + if (!files.empty()) { + const std::string_view fileContent = files.front(); + py::bytes raw(fileContent.data(), fileContent.size()); + result[py::str(fieldName)] = numpy.attr("frombuffer")(raw, "dtype"_a = uint8); + } else { + result[py::str(fieldName)] = py::str(payload.multipartParser->getFieldByName(fieldName)); + } + } + pyPayload = std::move(result); + } else if (payload.parsedJson && !payload.parsedJson->HasParseError()) { + rapidjson::StringBuffer buffer; + rapidjson::Writer writer(buffer); + payload.parsedJson->Accept(writer); + py::module_ jsonModule = py::module_::import("json"); + pyPayload = jsonModule.attr("loads")(py::str(buffer.GetString(), buffer.GetSize())); + } else { + py::module_ jsonModule = py::module_::import("json"); + try { + pyPayload = jsonModule.attr("loads")(py::str(payload.body)); + } catch (const pybind11::error_already_set&) { + pyPayload = py::str(payload.body); + } + } + std::unique_ptr> outputPtr = std::make_unique>(pyPayload); + cc->Outputs().Tag(OVMS_PY_TENSOR_TAG_NAME).Add(outputPtr.release(), cc->InputTimestamp()); + } else if (outputTag == HTTP_RESPONSE_TAG_NAME) { + const py::object& pyInput = cc->Inputs().Tag(OVMS_PY_TENSOR_TAG_NAME).Get>().getObject(); + std::unique_ptr response; + if (py::isinstance(pyInput)) { + response = std::make_unique(pyInput.cast()); + } else if (py::isinstance(pyInput)) { + response = std::make_unique(pyInput.cast()); + } else { + py::module_ jsonModule = py::module_::import("json"); + py::object dumped = jsonModule.attr("dumps")(pyInput); + response = std::make_unique(dumped.cast()); + } + cc->Outputs().Tag(HTTP_RESPONSE_TAG_NAME).Add(response.release(), cc->InputTimestamp()); + } else if (inputTag == OV_TENSOR_TAG_NAME) { auto& inputTensor = cc->Inputs().Tag(OV_TENSOR_TAG_NAME).Get(); std::unique_ptr> outputPyTensor; @@ -168,6 +248,7 @@ class PyTensorOvTensorConverterCalculator : public CalculatorBase { << "Undefined precision in input tensor: " << inputTensor.get_element_type(); } + PythonBackend pythonBackend; pythonBackend.createOvmsPyTensor( outputName, const_cast((const void*)inputTensor.data()), @@ -180,6 +261,7 @@ class PyTensorOvTensorConverterCalculator : public CalculatorBase { } else { if (*(cc->Inputs().GetTags().begin()) == OVMS_PY_TENSOR_TAG_NAME) { auto& inputTensor = cc->Inputs().Tag(OVMS_PY_TENSOR_TAG_NAME).Get>(); + PythonBackend pythonBackend; pythonBackend.validateOvmsPyTensor(inputTensor.getObject()); const auto precision = ovmsPrecisionToIE2Precision(fromKfsString(inputTensor.getProperty("datatype"))); if (precision == ov::element::Type_t::dynamic) { @@ -223,6 +305,8 @@ class PyTensorOvTensorConverterCalculator : public CalculatorBase { const std::string PyTensorOvTensorConverterCalculator::OV_TENSOR_TAG_NAME{"OVTENSOR"}; const std::string PyTensorOvTensorConverterCalculator::OVMS_PY_TENSOR_TAG_NAME{"OVMS_PY_TENSOR"}; +const std::string PyTensorOvTensorConverterCalculator::HTTP_REQUEST_TAG_NAME{"HTTP_REQUEST_PAYLOAD"}; +const std::string PyTensorOvTensorConverterCalculator::HTTP_RESPONSE_TAG_NAME{"HTTP_RESPONSE_PAYLOAD"}; REGISTER_CALCULATOR(PyTensorOvTensorConverterCalculator); } // namespace mediapipe diff --git a/src/test/pythonnode_test.cpp b/src/test/pythonnode_test.cpp index 5b31146d19..afb6a17fbf 100644 --- a/src/test/pythonnode_test.cpp +++ b/src/test/pythonnode_test.cpp @@ -54,12 +54,20 @@ #pragma GCC diagnostic pop #include "opencv2/opencv.hpp" +#include "../http_payload.hpp" +#include "../http_rest_api_handler.hpp" #include "../python/python_backend.hpp" #include "c_api_test_utils.hpp" #include "constructor_enabled_model_manager.hpp" #include "platform_utils.hpp" +#include "test_http_utils.hpp" #include "test_utils.hpp" +#pragma warning(push) +#pragma warning(disable : 6313) +#include +#pragma warning(pop) + namespace py = pybind11; using namespace ovms; using namespace py::literals; @@ -2317,6 +2325,305 @@ TEST_F(PythonFlowTest, ConverterCalculator_PyTensorBufferMismatch) { } } +// ---- HTTP_REQUEST_PAYLOAD / HTTP_RESPONSE_PAYLOAD conversion tests -------- + +TEST_F(PythonFlowTest, ConverterCalculator_HttpJsonRequestToPyDict) { + std::string testPbtxt = R"( + calculator: "PyTensorOvTensorConverterCalculator" + name: "conversionNode" + input_stream: "HTTP_REQUEST_PAYLOAD:input" + output_stream: "OVMS_PY_TENSOR:output" + )"; + mediapipe::CalculatorRunner runner(testPbtxt); + + auto payload = std::make_unique(); + payload->body = R"({"prompt":"hello","temperature":0.5,"n":3})"; + payload->parsedJson = std::make_shared(); + payload->parsedJson->Parse(payload->body.c_str()); + ASSERT_FALSE(payload->parsedJson->HasParseError()); + + runner.MutableInputs()->Tag("HTTP_REQUEST_PAYLOAD").packets.push_back(mediapipe::Adopt(payload.release()).At(mediapipe::Timestamp(0))); + + py::gil_scoped_acquire acquire; + { + py::gil_scoped_release release; + auto status = runner.Run(); + ASSERT_TRUE(status.ok()) << status.code() << " " << status.message(); + } + + const PyObjectWrapper& out = + runner.Outputs().Tag("OVMS_PY_TENSOR").packets[0].Get>(); + const py::object& obj = out.getObject(); + ASSERT_TRUE(py::isinstance(obj)); + py::dict d = obj.cast(); + EXPECT_EQ(d["prompt"].cast(), "hello"); + EXPECT_DOUBLE_EQ(d["temperature"].cast(), 0.5); + EXPECT_EQ(d["n"].cast(), 3); +} + +TEST_F(PythonFlowTest, ConverterCalculator_HttpRawJsonBodyToPyDict) { + std::string testPbtxt = R"( + calculator: "PyTensorOvTensorConverterCalculator" + name: "conversionNode" + input_stream: "HTTP_REQUEST_PAYLOAD:input" + output_stream: "OVMS_PY_TENSOR:output" + )"; + mediapipe::CalculatorRunner runner(testPbtxt); + + auto payload = std::make_unique(); + payload->body = R"({"a":1,"b":[2,3]})"; + // parsedJson left null - exercise the json.loads(body) fallback. + runner.MutableInputs()->Tag("HTTP_REQUEST_PAYLOAD").packets.push_back(mediapipe::Adopt(payload.release()).At(mediapipe::Timestamp(0))); + + py::gil_scoped_acquire acquire; + { + py::gil_scoped_release release; + auto status = runner.Run(); + ASSERT_TRUE(status.ok()) << status.code() << " " << status.message(); + } + + const PyObjectWrapper& out = + runner.Outputs().Tag("OVMS_PY_TENSOR").packets[0].Get>(); + const py::object& obj = out.getObject(); + ASSERT_TRUE(py::isinstance(obj)); + py::dict d = obj.cast(); + EXPECT_EQ(d["a"].cast(), 1); + py::list l = d["b"].cast(); + ASSERT_EQ(l.size(), 2u); + EXPECT_EQ(l[0].cast(), 2); + EXPECT_EQ(l[1].cast(), 3); +} + +TEST_F(PythonFlowTest, ConverterCalculator_HttpRawNonJsonBodyToPyString) { + std::string testPbtxt = R"( + calculator: "PyTensorOvTensorConverterCalculator" + name: "conversionNode" + input_stream: "HTTP_REQUEST_PAYLOAD:input" + output_stream: "OVMS_PY_TENSOR:output" + )"; + mediapipe::CalculatorRunner runner(testPbtxt); + + auto payload = std::make_unique(); + payload->body = "not json at all"; + runner.MutableInputs()->Tag("HTTP_REQUEST_PAYLOAD").packets.push_back(mediapipe::Adopt(payload.release()).At(mediapipe::Timestamp(0))); + + py::gil_scoped_acquire acquire; + { + py::gil_scoped_release release; + auto status = runner.Run(); + ASSERT_TRUE(status.ok()) << status.code() << " " << status.message(); + } + + const PyObjectWrapper& out = + runner.Outputs().Tag("OVMS_PY_TENSOR").packets[0].Get>(); + const py::object& obj = out.getObject(); + ASSERT_TRUE(py::isinstance(obj)); + EXPECT_EQ(obj.cast(), "not json at all"); +} + +TEST_F(PythonFlowTest, ConverterCalculator_HttpMultipartToPyDict) { + std::string testPbtxt = R"( + calculator: "PyTensorOvTensorConverterCalculator" + name: "conversionNode" + input_stream: "HTTP_REQUEST_PAYLOAD:input" + output_stream: "OVMS_PY_TENSOR:output" + )"; + mediapipe::CalculatorRunner runner(testPbtxt); + + // Lifetime of file content must outlive runner.Run() because parser returns string_view. + static const std::string fileBytes{"\x01\x02\x03\x04\x05", 5}; + auto parser = std::make_shared(); + EXPECT_CALL(*parser, hasParseError()).WillRepeatedly(::testing::Return(false)); + EXPECT_CALL(*parser, getAllFieldNames()) + .WillRepeatedly(::testing::Return(std::set{"file", "model"})); + EXPECT_CALL(*parser, getFilesArrayByFieldName(::testing::Eq("file"))) + .WillRepeatedly(::testing::Return(std::vector{std::string_view{fileBytes}})); + EXPECT_CALL(*parser, getFilesArrayByFieldName(::testing::Eq("model"))) + .WillRepeatedly(::testing::Return(std::vector{})); + EXPECT_CALL(*parser, getFieldByName(::testing::Eq("model"))) + .WillRepeatedly(::testing::Return(std::string{"my-model"})); + + auto payload = std::make_unique(); + payload->multipartParser = parser; + runner.MutableInputs()->Tag("HTTP_REQUEST_PAYLOAD").packets.push_back(mediapipe::Adopt(payload.release()).At(mediapipe::Timestamp(0))); + + py::gil_scoped_acquire acquire; + { + py::gil_scoped_release release; + auto status = runner.Run(); + ASSERT_TRUE(status.ok()) << status.code() << " " << status.message(); + } + + const PyObjectWrapper& out = + runner.Outputs().Tag("OVMS_PY_TENSOR").packets[0].Get>(); + const py::object& obj = out.getObject(); + ASSERT_TRUE(py::isinstance(obj)); + py::dict d = obj.cast(); + ASSERT_TRUE(d.contains("file")); + ASSERT_TRUE(d.contains("model")); + + py::module_ numpy = py::module_::import("numpy"); + py::object ndarray = d["file"]; + ASSERT_TRUE(py::isinstance(ndarray, numpy.attr("ndarray"))); + EXPECT_EQ(ndarray.attr("dtype").attr("name").cast(), "uint8"); + EXPECT_EQ(ndarray.attr("size").cast(), fileBytes.size()); + auto bytesOut = ndarray.attr("tobytes")().cast(); + EXPECT_EQ(bytesOut, fileBytes); + + EXPECT_EQ(d["model"].cast(), "my-model"); +} + +TEST_F(PythonFlowTest, ConverterCalculator_HttpMultipartEmptyFileToPyEmptyArray) { + std::string testPbtxt = R"( + calculator: "PyTensorOvTensorConverterCalculator" + name: "conversionNode" + input_stream: "HTTP_REQUEST_PAYLOAD:input" + output_stream: "OVMS_PY_TENSOR:output" + )"; + mediapipe::CalculatorRunner runner(testPbtxt); + + auto parser = std::make_shared(); + EXPECT_CALL(*parser, hasParseError()).WillRepeatedly(::testing::Return(false)); + EXPECT_CALL(*parser, getAllFieldNames()) + .WillRepeatedly(::testing::Return(std::set{"file"})); + // File is present in the multipart upload but its content is empty. + EXPECT_CALL(*parser, getFilesArrayByFieldName(::testing::Eq("file"))) + .WillRepeatedly(::testing::Return(std::vector{std::string_view{}})); + + auto payload = std::make_unique(); + payload->multipartParser = parser; + runner.MutableInputs()->Tag("HTTP_REQUEST_PAYLOAD").packets.push_back(mediapipe::Adopt(payload.release()).At(mediapipe::Timestamp(0))); + + py::gil_scoped_acquire acquire; + { + py::gil_scoped_release release; + auto status = runner.Run(); + ASSERT_TRUE(status.ok()) << status.code() << " " << status.message(); + } + + const PyObjectWrapper& out = + runner.Outputs().Tag("OVMS_PY_TENSOR").packets[0].Get>(); + const py::object& obj = out.getObject(); + ASSERT_TRUE(py::isinstance(obj)); + py::dict d = obj.cast(); + ASSERT_TRUE(d.contains("file")); + py::module_ numpy = py::module_::import("numpy"); + py::object ndarray = d["file"]; + ASSERT_TRUE(py::isinstance(ndarray, numpy.attr("ndarray"))); + EXPECT_EQ(ndarray.attr("dtype").attr("name").cast(), "uint8"); + EXPECT_EQ(ndarray.attr("size").cast(), 0u); +} + +TEST_F(PythonFlowTest, ConverterCalculator_PyDictToHttpResponse) { + std::string testPbtxt = R"( + calculator: "PyTensorOvTensorConverterCalculator" + name: "conversionNode" + input_stream: "OVMS_PY_TENSOR:input" + output_stream: "HTTP_RESPONSE_PAYLOAD:output" + )"; + mediapipe::CalculatorRunner runner(testPbtxt); + + py::gil_scoped_acquire acquire; + { + py::dict d; + d["status"] = py::str("ok"); + d["count"] = py::int_(7); + runner.MutableInputs()->Tag("OVMS_PY_TENSOR").packets.push_back(mediapipe::Adopt>(new PyObjectWrapper(static_cast(d))).At(mediapipe::Timestamp(0))); + } + + { + py::gil_scoped_release release; + auto status = runner.Run(); + ASSERT_TRUE(status.ok()) << status.code() << " " << status.message(); + } + + const std::string& response = + runner.Outputs().Tag("HTTP_RESPONSE_PAYLOAD").packets[0].Get(); + // Validate by re-parsing instead of relying on key ordering. + rapidjson::Document doc; + doc.Parse(response.c_str()); + ASSERT_FALSE(doc.HasParseError()) << response; + ASSERT_TRUE(doc.IsObject()); + ASSERT_TRUE(doc.HasMember("status")); + ASSERT_TRUE(doc.HasMember("count")); + EXPECT_STREQ(doc["status"].GetString(), "ok"); + EXPECT_EQ(doc["count"].GetInt(), 7); +} + +TEST_F(PythonFlowTest, ConverterCalculator_PyStringToHttpResponse) { + std::string testPbtxt = R"( + calculator: "PyTensorOvTensorConverterCalculator" + name: "conversionNode" + input_stream: "OVMS_PY_TENSOR:input" + output_stream: "HTTP_RESPONSE_PAYLOAD:output" + )"; + mediapipe::CalculatorRunner runner(testPbtxt); + + py::gil_scoped_acquire acquire; + { + py::object s = py::str("raw response body"); + runner.MutableInputs()->Tag("OVMS_PY_TENSOR").packets.push_back(mediapipe::Adopt>(new PyObjectWrapper(s)).At(mediapipe::Timestamp(0))); + } + + { + py::gil_scoped_release release; + auto status = runner.Run(); + ASSERT_TRUE(status.ok()) << status.code() << " " << status.message(); + } + + const std::string& response = + runner.Outputs().Tag("HTTP_RESPONSE_PAYLOAD").packets[0].Get(); + EXPECT_EQ(response, "raw response body"); +} + +TEST_F(PythonFlowTest, ConverterCalculator_PyBytesToHttpResponse) { + std::string testPbtxt = R"( + calculator: "PyTensorOvTensorConverterCalculator" + name: "conversionNode" + input_stream: "OVMS_PY_TENSOR:input" + output_stream: "HTTP_RESPONSE_PAYLOAD:output" + )"; + mediapipe::CalculatorRunner runner(testPbtxt); + + // Binary payload: embedded NUL plus high (non-UTF-8) bytes. Must round-trip byte-for-byte. + const std::string expected{"\x00\x01\xff\x80\x7f\x00" + "ABC\xfe", + 10}; + + py::gil_scoped_acquire acquire; + { + py::object b = py::bytes(expected.data(), expected.size()); + runner.MutableInputs()->Tag("OVMS_PY_TENSOR").packets.push_back(mediapipe::Adopt>(new PyObjectWrapper(b)).At(mediapipe::Timestamp(0))); + } + + { + py::gil_scoped_release release; + auto status = runner.Run(); + ASSERT_TRUE(status.ok()) << status.code() << " " << status.message(); + } + + const std::string& response = + runner.Outputs().Tag("HTTP_RESPONSE_PAYLOAD").packets[0].Get(); + ASSERT_EQ(response.size(), expected.size()); + EXPECT_EQ(response, expected); +} + +TEST_F(PythonFlowTest, ConverterCalculator_InvalidTagPairRejected) { + // HTTP_REQUEST_PAYLOAD as input must pair with OVMS_PY_TENSOR output. + // Pairing with OVTENSOR is unsupported and GetContract should reject it. + std::string testPbtxt = R"( + calculator: "PyTensorOvTensorConverterCalculator" + name: "conversionNode" + input_stream: "HTTP_REQUEST_PAYLOAD:input" + output_stream: "OVTENSOR:output" + )"; + mediapipe::CalculatorRunner runner(testPbtxt); + py::gil_scoped_acquire acquire; + py::gil_scoped_release release; + auto status = runner.Run(); + EXPECT_FALSE(status.ok()); +} + TEST_F(PythonFlowTest, PythonCalculatorTestSingleInSingleOutMultiRunWithErrors) { ConstructorEnabledModelManager manager; std::string firstTestPbtxt = R"(