From 18509899afa356c11cce850a6558c60cac9a6237 Mon Sep 17 00:00:00 2001 From: Adrian Tobiszewski Date: Fri, 22 May 2026 10:32:24 +0200 Subject: [PATCH 1/9] Fix python build for mp_on_py_off config --- src/llm/BUILD | 1 + src/mediapipe_internal/graphqueue.cpp | 2 ++ 2 files changed, 3 insertions(+) diff --git a/src/llm/BUILD b/src/llm/BUILD index d7e13abd55..979a42fdc9 100644 --- a/src/llm/BUILD +++ b/src/llm/BUILD @@ -362,6 +362,7 @@ ovms_cc_library( "//src:libovmsstatus", ], visibility = ["//visibility:public"], + additional_copts = COPTS_PYTHON ) ovms_cc_library( diff --git a/src/mediapipe_internal/graphqueue.cpp b/src/mediapipe_internal/graphqueue.cpp index acbb4e9870..f4688bd546 100644 --- a/src/mediapipe_internal/graphqueue.cpp +++ b/src/mediapipe_internal/graphqueue.cpp @@ -28,7 +28,9 @@ #include #include "../queue.hpp" +#if (PYTHON_DISABLE == 0) #include "src/python/pythonnoderesources.hpp" +#endif #pragma warning(push) #pragma warning(disable : 4324 6001 6385 6386 6326 6011 4309 4005 4456 6246) From 537cc52be57fa148d4d831accc889496e8392630 Mon Sep 17 00:00:00 2001 From: Adrian Tobiszewski Date: Fri, 22 May 2026 12:59:30 +0200 Subject: [PATCH 2/9] Turn off unstable tests --- src/test/pull_hf_model_test.cpp | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/test/pull_hf_model_test.cpp b/src/test/pull_hf_model_test.cpp index 204afa4df6..8f18508fa0 100644 --- a/src/test/pull_hf_model_test.cpp +++ b/src/test/pull_hf_model_test.cpp @@ -420,6 +420,9 @@ TEST_F(HfPullCache, Resume) { // ResumeAfterShutdownRequestAndRerun TEST_F(HfPull, ResumeShutdown) { +#ifdef _WIN32 + SKIP_AND_EXIT_IF_NOT_RUNNING_UNSTABLE(); +#endif std::string basePath = ovms::FileSystem::joinPath({this->directoryPath, "repository", "OpenVINO", "Phi-3-mini-FastDraft-50M-int8-ov"}); std::string modelPath = ovms::FileSystem::appendSlash(basePath) + "openvino_model.bin"; std::string model2Path = ovms::FileSystem::appendSlash(basePath) + "openvino_detokenizer.bin"; @@ -959,6 +962,9 @@ TEST(HfPullWindowsWorker, ResumeCtrlCChildProcess) { // partial-download artifacts remain on disk and that re-running --pull resumes the // download to completion. TEST_F(HfPull, ResumeCtrlC) { +#ifdef _WIN32 + SKIP_AND_EXIT_IF_NOT_RUNNING_UNSTABLE(); +#endif std::string basePath = ovms::FileSystem::joinPath({this->directoryPath, "repository", "OpenVINO", "Phi-3-mini-FastDraft-50M-int8-ov"}); std::string modelPath = ovms::FileSystem::appendSlash(basePath) + "openvino_model.bin"; std::string model2Path = ovms::FileSystem::appendSlash(basePath) + "openvino_detokenizer.bin"; From 271f3370922242daf03a4c83d6767bcfd59a496b Mon Sep 17 00:00:00 2001 From: Adrian Tobiszewski Date: Fri, 22 May 2026 16:10:44 +0200 Subject: [PATCH 3/9] Add graph reinit on failure --- src/mediapipe_internal/graphqueue.cpp | 79 ++++++++++++++++--- src/mediapipe_internal/graphqueue.hpp | 33 ++++++++ .../mediapipegraphexecutor.hpp | 12 +++ 3 files changed, 113 insertions(+), 11 deletions(-) diff --git a/src/mediapipe_internal/graphqueue.cpp b/src/mediapipe_internal/graphqueue.cpp index f4688bd546..cc0d7f7de5 100644 --- a/src/mediapipe_internal/graphqueue.cpp +++ b/src/mediapipe_internal/graphqueue.cpp @@ -85,25 +85,82 @@ GraphQueue::GraphQueue(const ::mediapipe::CalculatorGraphConfig& config, std::sh SPDLOG_ERROR("Graph queue StartRun failed: {}", absStatus.ToString()); throw std::runtime_error(absStatus.ToString()); } - inferRequests.emplace_back(std::move(graphHelper)); + this->inferRequests.emplace_back(std::move(graphHelper)); } } -GraphQueue::~GraphQueue() { - for (auto& graphHelper : inferRequests) { - auto absStatus = graphHelper->graph->WaitUntilIdle(); +GraphHelper::~GraphHelper() { + if (!graph) { + return; + } + auto absStatus = graph->WaitUntilIdle(); + if (!absStatus.ok()) { + SPDLOG_DEBUG("GraphHelper WaitUntilIdle error: {}", absStatus.ToString()); + } + absStatus = graph->CloseAllPacketSources(); + if (!absStatus.ok()) { + SPDLOG_DEBUG("GraphHelper CloseAllPacketSources error: {}", absStatus.ToString()); + } + absStatus = graph->WaitUntilDone(); + if (!absStatus.ok()) { + SPDLOG_DEBUG("GraphHelper WaitUntilDone error: {}", absStatus.ToString()); + } + graph->Cancel(); +} + +void GraphHelper::reinitialize(const ::mediapipe::CalculatorGraphConfig& config, const GraphSidePackets& sidePacketMaps) { + SPDLOG_DEBUG("Reinitializing graph after error"); + // Tear down the old graph (best-effort, errors expected since graph is in bad state) + if (this->graph) { + auto absStatus = this->graph->CloseAllPacketSources(); if (!absStatus.ok()) { - SPDLOG_DEBUG("Graph queue WaitUntilIdle error: {}", absStatus.ToString()); + SPDLOG_DEBUG("reinitialize: CloseAllPacketSources: {}", absStatus.ToString()); } - absStatus = graphHelper->graph->CloseAllPacketSources(); + absStatus = this->graph->WaitUntilDone(); if (!absStatus.ok()) { - SPDLOG_DEBUG("Graph queue CloseAllPacketSources error: {}", absStatus.ToString()); + SPDLOG_DEBUG("reinitialize: WaitUntilDone: {}", absStatus.ToString()); } - absStatus = graphHelper->graph->WaitUntilDone(); + this->graph->Cancel(); + } + // Create fresh graph + graph = std::make_unique<::mediapipe::CalculatorGraph>(); + currentTimestamp = ::mediapipe::Timestamp(0); + + auto absStatus = graph->Initialize(config); + if (!absStatus.ok()) { + SPDLOG_ERROR("Graph reinitialize: Initialize failed: {}", absStatus.ToString()); + graph.reset(); + return; + } + for (const auto& [streamName, holder] : outStreamObservers) { + absStatus = graph->ObserveOutputStream(streamName, [holder](const ::mediapipe::Packet& packet) -> absl::Status { + return holder->current->handlePacket(packet); + }); if (!absStatus.ok()) { - SPDLOG_DEBUG("Graph queue WaitUntilDone error: {}", absStatus.ToString()); + SPDLOG_ERROR("Graph reinitialize: ObserveOutputStream failed: {}", absStatus.ToString()); + graph.reset(); + return; } - graphHelper->graph->Cancel(); - graphHelper->graph.reset(); } + // Reset observers to null sentinel + for (const auto& [streamName, holder] : outStreamObservers) { + holder->current = std::make_shared(); + } + // Reset execution contexts + for (auto& [nodeName, ctx] : genAiExecutionContextMap) { + ctx->reset(); + } + std::map inputSidePackets; + buildInputSidePackets(inputSidePackets, sidePacketMaps); + inputSidePackets[LLM_EXECUTION_CONTEXT_SESSION_SIDE_PACKET_TAG] = + mediapipe::MakePacket(genAiExecutionContextMap) + .At(::mediapipe::Timestamp(STARTING_TIMESTAMP_VALUE)); + absStatus = graph->StartRun(inputSidePackets); + if (!absStatus.ok()) { + SPDLOG_ERROR("Graph reinitialize: StartRun failed: {}", absStatus.ToString()); + graph.reset(); + return; + } + SPDLOG_DEBUG("Graph reinitialized successfully"); } +GraphQueue::~GraphQueue() = default; } // namespace ovms diff --git a/src/mediapipe_internal/graphqueue.hpp b/src/mediapipe_internal/graphqueue.hpp index d97ee5d18f..ab16ac046c 100644 --- a/src/mediapipe_internal/graphqueue.hpp +++ b/src/mediapipe_internal/graphqueue.hpp @@ -65,7 +65,40 @@ struct GraphHelper { genAiExecutionContextMap(std::move(gh.genAiExecutionContextMap)), currentTimestamp(gh.currentTimestamp) {} GraphHelper& operator=(GraphHelper&&) = delete; + ~GraphHelper(); + // Tears down the current (errored) graph and rebuilds a fresh one + // with the same observers and side packets. Called when inference + // encounters a graph error to avoid returning a poisoned graph to the pool. + void reinitialize(const ::mediapipe::CalculatorGraphConfig& config, const GraphSidePackets& sidePacketMaps); }; + +// RAII guard that reinitializes the graph if inference exits with an error. +// Construct before the first graph interaction (packet push). Call dismiss() +// on the success path. If not dismissed, the destructor rebuilds the graph +// so the next request from the pool gets a clean graph. +class GraphReinitGuard { + GraphHelper& helper; + const ::mediapipe::CalculatorGraphConfig& config; + const GraphSidePackets& sidePacketMaps; + bool dismissed = false; + +public: + GraphReinitGuard(GraphHelper& helper, + const ::mediapipe::CalculatorGraphConfig& config, + const GraphSidePackets& sidePacketMaps) : + helper(helper), + config(config), + sidePacketMaps(sidePacketMaps) {} + void dismiss() { dismissed = true; } + ~GraphReinitGuard() { + if (!dismissed) { + helper.reinitialize(config, sidePacketMaps); + } + } + GraphReinitGuard(const GraphReinitGuard&) = delete; + GraphReinitGuard& operator=(const GraphReinitGuard&) = delete; +}; + // we need to keep Graph alive during MP reload hence shared_ptr class GraphQueue : public Queue> { std::shared_ptr sidePacketMaps; diff --git a/src/mediapipe_internal/mediapipegraphexecutor.hpp b/src/mediapipe_internal/mediapipegraphexecutor.hpp index f0f0428740..498c8ecced 100644 --- a/src/mediapipe_internal/mediapipegraphexecutor.hpp +++ b/src/mediapipe_internal/mediapipegraphexecutor.hpp @@ -199,6 +199,10 @@ class MediapipeGraphExecutor { guard->graphHelper->outStreamObservers.at(name)->current = std::make_shared>(name, this->outputTypes.at(name), *this, *request, *response); } + // Guard: if inference fails after this point, reinitialize the graph + // so the next request from the pool gets a clean graph (not poisoned). + GraphReinitGuard reinitGuard(*this->guard->graphHelper, this->config, this->sidePacketMaps); + size_t numberOfPacketsCreated = 0; auto ovms_status = createAndPushPacketsImpl( std::shared_ptr(request, [](const RequestType*) {}), @@ -227,6 +231,8 @@ class MediapipeGraphExecutor { } resetLlmExecutionContexts(this->guard->graphHelper->genAiExecutionContextMap); MP_RETURN_ON_FAIL(status, "graph wait until idle", mediapipeAbslToOvmsStatus(status.code())); + // Success — dismiss the guard, graph is healthy + reinitGuard.dismiss(); // Increment timestamp for next request reusing this graph from the queue this->guard->graphHelper->currentTimestamp = ::mediapipe::Timestamp(this->guard->graphHelper->currentTimestamp.Value() + 1); SPDLOG_DEBUG("Received all output stream packets for graph: {}", this->name); @@ -393,6 +399,10 @@ class MediapipeGraphExecutor { executionContext, this->mediapipeServableMetricReporter); } + // Guard: if streaming inference fails, reinitialize the graph + // so the next request from the pool gets a clean graph (not poisoned). + GraphReinitGuard reinitGuard(*this->guard->graphHelper, this->config, this->sidePacketMaps); + size_t numberOfPacketsCreated = 0; { OVMS_PROFILE_SCOPE("Mediapipe graph deserializing first request"); @@ -450,6 +460,8 @@ class MediapipeGraphExecutor { } resetLlmExecutionContexts(this->guard->graphHelper->genAiExecutionContextMap); MP_RETURN_ON_FAIL(status, "graph wait until idle", mediapipeAbslToOvmsStatus(status.code())); + // Success — dismiss the guard, graph is healthy + reinitGuard.dismiss(); // Increment timestamp for next request reusing this graph from the queue this->guard->graphHelper->currentTimestamp = ::mediapipe::Timestamp(this->guard->graphHelper->currentTimestamp.Value() + 1); SPDLOG_DEBUG("Graph {}: Done streaming execution (queue path)", this->name); From 10477df5ff5f82c4dc9700c5e3d0f67878080d8b Mon Sep 17 00:00:00 2001 From: Adrian Tobiszewski Date: Mon, 25 May 2026 13:34:42 +0200 Subject: [PATCH 4/9] Remove default queue switch off --- src/config.cpp | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/config.cpp b/src/config.cpp index cd37c30a06..6e780c9425 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -94,12 +94,9 @@ bool Config::parse(ServerSettingsImpl* serverSettings, ModelsSettingsImpl* model static EnvGuard envGuard; #if defined(__linux__) || defined(_WIN32) if (this->serverSettings.logLevel == "DEBUG" || this->serverSettings.logLevel == "TRACE") { - envGuard.set("OPENVINO_LOG_LEVEL", "4"); + envGuard.set("OPENVINO_LOG_LEVEL", "4"); } #endif - if (GetEnvVar("OVMS_GRAPH_QUEUE_OFF").empty()) { - envGuard.set("OVMS_GRAPH_QUEUE_OFF", "1"); - } return validate(); } From 6b96b84f3f89637bc5b22f4bdac8d87bc99fa62c Mon Sep 17 00:00:00 2001 From: Adrian Tobiszewski Date: Mon, 25 May 2026 17:01:25 +0200 Subject: [PATCH 5/9] Fix style --- src/config.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/config.cpp b/src/config.cpp index 6e780c9425..21f39fc697 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -94,7 +94,7 @@ bool Config::parse(ServerSettingsImpl* serverSettings, ModelsSettingsImpl* model static EnvGuard envGuard; #if defined(__linux__) || defined(_WIN32) if (this->serverSettings.logLevel == "DEBUG" || this->serverSettings.logLevel == "TRACE") { - envGuard.set("OPENVINO_LOG_LEVEL", "4"); + envGuard.set("OPENVINO_LOG_LEVEL", "4"); } #endif return validate(); From 4806c6b695dfba7bc0270aea295ca7d3f4a20835 Mon Sep 17 00:00:00 2001 From: Adrian Tobiszewski Date: Mon, 25 May 2026 17:22:37 +0200 Subject: [PATCH 6/9] Fix --- src/BUILD | 1 - src/config.cpp | 7 -- src/logging.cpp | 8 +++ .../vlm_cb_regular.pbtxt | 1 + .../calculators/streaming_test_calculator.cpp | 24 +++++++ src/test/mediapipeflow_test.cpp | 70 +++++++++++++++++++ src/utils/env_guard.cpp | 2 +- 7 files changed, 104 insertions(+), 9 deletions(-) diff --git a/src/BUILD b/src/BUILD index c2d9aec3b1..bd7ac8478b 100644 --- a/src/BUILD +++ b/src/BUILD @@ -354,7 +354,6 @@ ovms_cc_library( "libovms_cliparser", "libovms_systeminfo", "ovms_exit_codes", - "//src/utils:env_guard", ], visibility = ["//visibility:public",], additional_copts = COPTS_DROGON, diff --git a/src/config.cpp b/src/config.cpp index 21f39fc697..c6f68d1766 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -36,7 +36,6 @@ #include "modelconfig.hpp" #include "stringutils.hpp" #include "systeminfo.hpp" -#include "utils/env_guard.hpp" namespace ovms { @@ -91,12 +90,6 @@ Config& Config::parse(int argc, char** argv) { bool Config::parse(ServerSettingsImpl* serverSettings, ModelsSettingsImpl* modelsSettings) { this->serverSettings = *serverSettings; this->modelsSettings = *modelsSettings; - static EnvGuard envGuard; -#if defined(__linux__) || defined(_WIN32) - if (this->serverSettings.logLevel == "DEBUG" || this->serverSettings.logLevel == "TRACE") { - envGuard.set("OPENVINO_LOG_LEVEL", "4"); - } -#endif return validate(); } diff --git a/src/logging.cpp b/src/logging.cpp index c0974c3a4e..9353723979 100644 --- a/src/logging.cpp +++ b/src/logging.cpp @@ -21,6 +21,7 @@ #if (MEDIAPIPE_DISABLE == 0) #include #endif +#include #include namespace ovms { @@ -163,6 +164,13 @@ void configure_logger(const std::string& log_level, const std::string& log_path) FLAGS_minloglevel = google::GLOG_ERROR; #endif #endif + if (log_level == "DEBUG" || log_level == "TRACE") { +#ifdef _WIN32 + _putenv_s("OPENVINO_LOG_LEVEL", "4"); +#else + ::setenv("OPENVINO_LOG_LEVEL", "4", 1); +#endif + } } } // namespace ovms diff --git a/src/test/llm/visual_language_model/vlm_cb_regular.pbtxt b/src/test/llm/visual_language_model/vlm_cb_regular.pbtxt index d911975f00..38b139abf6 100644 --- a/src/test/llm/visual_language_model/vlm_cb_regular.pbtxt +++ b/src/test/llm/visual_language_model/vlm_cb_regular.pbtxt @@ -1,3 +1,4 @@ +# OVMS_GRAPH_QUEUE_MAX_SIZE: 2 # Copyright 2024 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/src/test/mediapipe/calculators/streaming_test_calculator.cpp b/src/test/mediapipe/calculators/streaming_test_calculator.cpp index 599fac86a0..12cb35ed45 100644 --- a/src/test/mediapipe/calculators/streaming_test_calculator.cpp +++ b/src/test/mediapipe/calculators/streaming_test_calculator.cpp @@ -13,6 +13,8 @@ // See the License for the specific language governing permissions and // limitations under the License. //***************************************************************************** +#include + #include #pragma GCC diagnostic push @@ -211,9 +213,31 @@ class AddSidePacketToSingleStreamTestCalculator : public CalculatorBase { } }; +class ErrorOnNegativeTestCalculator : public CalculatorBase { +public: + static absl::Status GetContract(CalculatorContract* cc) { + cc->Inputs().Index(0).Set(); + cc->Outputs().Index(0).Set(); + return absl::OkStatus(); + } + absl::Status Open(CalculatorContext* cc) final { return absl::OkStatus(); } + absl::Status Close(CalculatorContext* cc) final { return absl::OkStatus(); } + absl::Status Process(CalculatorContext* cc) final { + ov::Tensor input = cc->Inputs().Index(0).Get(); + if (static_cast(input.data())[0] < 0.0f) { + return absl::InvalidArgumentError("Negative input value"); + } + ov::Tensor output(input.get_element_type(), input.get_shape()); + std::memcpy(output.data(), input.data(), input.get_byte_size()); + cc->Outputs().Index(0).Add(new ov::Tensor(output), cc->InputTimestamp()); + return absl::OkStatus(); + } +}; + REGISTER_CALCULATOR(AddOneSingleStreamTestCalculator); REGISTER_CALCULATOR(AddOne3CycleIterationsTestCalculator); REGISTER_CALCULATOR(AddNumbersMultiInputsOutputsTestCalculator); REGISTER_CALCULATOR(ErrorInProcessTestCalculator); +REGISTER_CALCULATOR(ErrorOnNegativeTestCalculator); REGISTER_CALCULATOR(AddSidePacketToSingleStreamTestCalculator); } // namespace mediapipe diff --git a/src/test/mediapipeflow_test.cpp b/src/test/mediapipeflow_test.cpp index 4be6666009..3410bc31b7 100644 --- a/src/test/mediapipeflow_test.cpp +++ b/src/test/mediapipeflow_test.cpp @@ -4307,3 +4307,73 @@ TEST(MediapipeGraphQueueSizeDirective, EnvVarOVMS_GRAPH_QUEUE_OFF_NotSetDoesNotD ASSERT_EQ(status, ovms::StatusCode::OK); EXPECT_GT(def.getMediapipeGraphConfig().getInitialQueueSize(), 0); } + +// --- Graph queue reinit guard tests --- + +class UnaryQueueReinitTest : public ::testing::Test { +protected: + const std::string name{"reinit_test_graph"}; + const std::string version{"1"}; + ExecutionContext executionContext{ExecutionContext::Interface::GRPC, ExecutionContext::Method::ModelInfer}; + std::unique_ptr reporter; + std::shared_ptr sidePackets; + std::shared_ptr queue; + ::mediapipe::CalculatorGraphConfig config; + + void SetUp() override { + reporter = std::make_unique(nullptr, nullptr, ""); + sidePackets = std::make_shared(); + const std::string pbTxt{R"( +input_stream: "in" +output_stream: "out" +node { + calculator: "ErrorOnNegativeTestCalculator" + input_stream: "in" + output_stream: "out" +} + )"}; + ASSERT_TRUE(::google::protobuf::TextFormat::ParseFromString(pbTxt, &config)); + queue = std::make_shared(config, sidePackets, 1); + } + + void prepareInferRequest(KFSRequest& request, float value) { + request.Clear(); + *request.mutable_model_name() = "my_graph"; + *request.mutable_model_version() = "1"; + prepareKFSInferInputTensor(request, "in", std::tuple{{1}, ovms::Precision::FP32}, std::vector{value}, false); + request.mutable_parameters()->operator[]("OVMS_MP_TIMESTAMP").set_int64_param(0); + } +}; + +TEST_F(UnaryQueueReinitTest, GraphIsReinitializedAfterCalculatorError) { + KFSRequest request; + KFSResponse response; + { + GraphIdGuard guard(queue); + MediapipeGraphExecutor executor{ + name, version, config, + {{"in", mediapipe_packet_type_enum::OVTENSOR}}, + {{"out", mediapipe_packet_type_enum::OVTENSOR}}, + {"in"}, {"out"}, *sidePackets, nullptr, reporter.get(), + std::move(guard)}; + prepareInferRequest(request, -1.0f); + auto status = executor.infer(&request, &response, executionContext); + ASSERT_FALSE(status.ok()); + EXPECT_EQ(status.getCode(), StatusCode::MEDIAPIPE_EXECUTION_ERROR); + } + // Executor destroyed → GraphIdGuard returns graph to pool. + // The reinit guard rebuilt the graph before returning the error. + // Second request with valid (positive) input should succeed. + { + GraphIdGuard guard(queue); + MediapipeGraphExecutor executor{ + name, version, config, + {{"in", mediapipe_packet_type_enum::OVTENSOR}}, + {{"out", mediapipe_packet_type_enum::OVTENSOR}}, + {"in"}, {"out"}, *sidePackets, nullptr, reporter.get(), + std::move(guard)}; + prepareInferRequest(request, 2.0f); + auto status = executor.infer(&request, &response, executionContext); + ASSERT_TRUE(status.ok()); + } +} diff --git a/src/utils/env_guard.cpp b/src/utils/env_guard.cpp index 4a5ee3d114..08dd10d92d 100644 --- a/src/utils/env_guard.cpp +++ b/src/utils/env_guard.cpp @@ -15,7 +15,7 @@ //***************************************************************************** #include "env_guard.hpp" -#include "../logging.hpp" +#include "src/logging.hpp" #include From 8d6009676b043c53696ea79192e7d20745bbcca9 Mon Sep 17 00:00:00 2001 From: Adrian Tobiszewski Date: Tue, 26 May 2026 15:23:11 +0200 Subject: [PATCH 7/9] Fix CI: add ErrorOnNegativeTestCalculator to whitelist, remove queue directive from vlm pbtxt --- src/test/llm/visual_language_model/vlm_cb_regular.pbtxt | 1 - src/test/mediapipeflow_test.cpp | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/llm/visual_language_model/vlm_cb_regular.pbtxt b/src/test/llm/visual_language_model/vlm_cb_regular.pbtxt index 38b139abf6..d911975f00 100644 --- a/src/test/llm/visual_language_model/vlm_cb_regular.pbtxt +++ b/src/test/llm/visual_language_model/vlm_cb_regular.pbtxt @@ -1,4 +1,3 @@ -# OVMS_GRAPH_QUEUE_MAX_SIZE: 2 # Copyright 2024 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/src/test/mediapipeflow_test.cpp b/src/test/mediapipeflow_test.cpp index 3410bc31b7..7cbefc0051 100644 --- a/src/test/mediapipeflow_test.cpp +++ b/src/test/mediapipeflow_test.cpp @@ -3855,6 +3855,7 @@ TEST(WhitelistRegistered, MediapipeCalculatorsList) { "EndLoopTensorCalculator", "EndLoopTfLiteTensorCalculator", "ErrorInProcessTestCalculator", + "ErrorOnNegativeTestCalculator", "ExceptionDuringCloseCalculator", "ExceptionDuringGetContractCalculator", "ExceptionDuringOpenCalculator", From d6c7ff0227f6a0e65aaec19e5c71a31001dc49d0 Mon Sep 17 00:00:00 2001 From: Adrian Tobiszewski Date: Tue, 26 May 2026 16:11:50 +0200 Subject: [PATCH 8/9] Move ErrorOnNegativeTestCalculator to its own file --- src/test/mediapipe/calculators/BUILD | 1 + .../error_on_negative_test_calculator.cpp | 49 +++++++++++++++++++ .../calculators/streaming_test_calculator.cpp | 24 --------- 3 files changed, 50 insertions(+), 24 deletions(-) create mode 100644 src/test/mediapipe/calculators/error_on_negative_test_calculator.cpp diff --git a/src/test/mediapipe/calculators/BUILD b/src/test/mediapipe/calculators/BUILD index a62497082b..bc4d3eed6d 100644 --- a/src/test/mediapipe/calculators/BUILD +++ b/src/test/mediapipe/calculators/BUILD @@ -78,6 +78,7 @@ cc_library( "ovms_calculator.cc", "ovms_image_input_calculator.cc", "ovms_kfs_calculator.cc", + "error_on_negative_test_calculator.cpp", "streaming_test_calculator.cpp", "two_input_calculator.cpp", ], diff --git a/src/test/mediapipe/calculators/error_on_negative_test_calculator.cpp b/src/test/mediapipe/calculators/error_on_negative_test_calculator.cpp new file mode 100644 index 0000000000..422251262e --- /dev/null +++ b/src/test/mediapipe/calculators/error_on_negative_test_calculator.cpp @@ -0,0 +1,49 @@ +//***************************************************************************** +// Copyright 2025 Intel Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//***************************************************************************** +#include + +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" +#include "mediapipe/framework/calculator_framework.h" +#pragma GCC diagnostic pop + +namespace mediapipe { + +class ErrorOnNegativeTestCalculator : public CalculatorBase { +public: + static absl::Status GetContract(CalculatorContract* cc) { + cc->Inputs().Index(0).Set(); + cc->Outputs().Index(0).Set(); + return absl::OkStatus(); + } + absl::Status Open(CalculatorContext* cc) final { return absl::OkStatus(); } + absl::Status Close(CalculatorContext* cc) final { return absl::OkStatus(); } + absl::Status Process(CalculatorContext* cc) final { + ov::Tensor input = cc->Inputs().Index(0).Get(); + if (static_cast(input.data())[0] < 0.0f) { + return absl::InvalidArgumentError("Negative input value"); + } + ov::Tensor output(input.get_element_type(), input.get_shape()); + std::memcpy(output.data(), input.data(), input.get_byte_size()); + cc->Outputs().Index(0).Add(new ov::Tensor(output), cc->InputTimestamp()); + return absl::OkStatus(); + } +}; + +REGISTER_CALCULATOR(ErrorOnNegativeTestCalculator); +} // namespace mediapipe diff --git a/src/test/mediapipe/calculators/streaming_test_calculator.cpp b/src/test/mediapipe/calculators/streaming_test_calculator.cpp index 12cb35ed45..599fac86a0 100644 --- a/src/test/mediapipe/calculators/streaming_test_calculator.cpp +++ b/src/test/mediapipe/calculators/streaming_test_calculator.cpp @@ -13,8 +13,6 @@ // See the License for the specific language governing permissions and // limitations under the License. //***************************************************************************** -#include - #include #pragma GCC diagnostic push @@ -213,31 +211,9 @@ class AddSidePacketToSingleStreamTestCalculator : public CalculatorBase { } }; -class ErrorOnNegativeTestCalculator : public CalculatorBase { -public: - static absl::Status GetContract(CalculatorContract* cc) { - cc->Inputs().Index(0).Set(); - cc->Outputs().Index(0).Set(); - return absl::OkStatus(); - } - absl::Status Open(CalculatorContext* cc) final { return absl::OkStatus(); } - absl::Status Close(CalculatorContext* cc) final { return absl::OkStatus(); } - absl::Status Process(CalculatorContext* cc) final { - ov::Tensor input = cc->Inputs().Index(0).Get(); - if (static_cast(input.data())[0] < 0.0f) { - return absl::InvalidArgumentError("Negative input value"); - } - ov::Tensor output(input.get_element_type(), input.get_shape()); - std::memcpy(output.data(), input.data(), input.get_byte_size()); - cc->Outputs().Index(0).Add(new ov::Tensor(output), cc->InputTimestamp()); - return absl::OkStatus(); - } -}; - REGISTER_CALCULATOR(AddOneSingleStreamTestCalculator); REGISTER_CALCULATOR(AddOne3CycleIterationsTestCalculator); REGISTER_CALCULATOR(AddNumbersMultiInputsOutputsTestCalculator); REGISTER_CALCULATOR(ErrorInProcessTestCalculator); -REGISTER_CALCULATOR(ErrorOnNegativeTestCalculator); REGISTER_CALCULATOR(AddSidePacketToSingleStreamTestCalculator); } // namespace mediapipe From 4aee6d617790cf02a110200378e5e8da36947914 Mon Sep 17 00:00:00 2001 From: Adrian Tobiszewski Date: Fri, 29 May 2026 14:40:07 +0200 Subject: [PATCH 9/9] Review fixes --- src/mediapipe_internal/graphqueue.cpp | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/mediapipe_internal/graphqueue.cpp b/src/mediapipe_internal/graphqueue.cpp index cc0d7f7de5..0a53e90dff 100644 --- a/src/mediapipe_internal/graphqueue.cpp +++ b/src/mediapipe_internal/graphqueue.cpp @@ -128,7 +128,6 @@ void GraphHelper::reinitialize(const ::mediapipe::CalculatorGraphConfig& config, auto absStatus = graph->Initialize(config); if (!absStatus.ok()) { SPDLOG_ERROR("Graph reinitialize: Initialize failed: {}", absStatus.ToString()); - graph.reset(); return; } for (const auto& [streamName, holder] : outStreamObservers) { @@ -137,7 +136,6 @@ void GraphHelper::reinitialize(const ::mediapipe::CalculatorGraphConfig& config, }); if (!absStatus.ok()) { SPDLOG_ERROR("Graph reinitialize: ObserveOutputStream failed: {}", absStatus.ToString()); - graph.reset(); return; } } @@ -157,7 +155,6 @@ void GraphHelper::reinitialize(const ::mediapipe::CalculatorGraphConfig& config, absStatus = graph->StartRun(inputSidePackets); if (!absStatus.ok()) { SPDLOG_ERROR("Graph reinitialize: StartRun failed: {}", absStatus.ToString()); - graph.reset(); return; } SPDLOG_DEBUG("Graph reinitialized successfully");