Skip to content
1 change: 0 additions & 1 deletion src/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,6 @@ ovms_cc_library(
"libovms_cliparser",
"libovms_systeminfo",
"ovms_exit_codes",
"//src/utils:env_guard",
],
visibility = ["//visibility:public",],
additional_copts = COPTS_DROGON,
Expand Down
10 changes: 0 additions & 10 deletions src/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
#include "modelconfig.hpp"
#include "stringutils.hpp"
#include "systeminfo.hpp"
#include "utils/env_guard.hpp"

namespace ovms {

Expand Down Expand Up @@ -91,15 +90,6 @@ Config& Config::parse(int argc, char** argv) {
// Copies the already-parsed server and model settings into this Config,
// applies process environment defaults, and returns the result of validate().
//
// serverSettings / modelsSettings are dereferenced unconditionally, so both
// pointers must be non-null.
bool Config::parse(ServerSettingsImpl* serverSettings, ModelsSettingsImpl* modelsSettings) {
this->serverSettings = *serverSettings;
this->modelsSettings = *modelsSettings;
// Function-local static: one EnvGuard instance is shared by every call.
// NOTE(review): presumably the guard restores the variables it set when it is
// destroyed - confirm against utils/env_guard.hpp.
static EnvGuard envGuard;
#if defined(__linux__) || defined(_WIN32)
// With DEBUG or TRACE server logging, OPENVINO_LOG_LEVEL is set to "4" as well.
if (this->serverSettings.logLevel == "DEBUG" || this->serverSettings.logLevel == "TRACE") {
envGuard.set("OPENVINO_LOG_LEVEL", "4");
}
#endif
// Only supply a value for OVMS_GRAPH_QUEUE_OFF when GetEnvVar reports it as
// empty, so a value already present in the environment is left untouched.
if (GetEnvVar("OVMS_GRAPH_QUEUE_OFF").empty()) {
envGuard.set("OVMS_GRAPH_QUEUE_OFF", "1");
}
return validate();
}

Expand Down
8 changes: 8 additions & 0 deletions src/logging.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#if (MEDIAPIPE_DISABLE == 0)
#include <glog/logging.h>
#endif
#include <cstdlib>
#include <vector>

namespace ovms {
Expand Down Expand Up @@ -163,6 +164,13 @@ void configure_logger(const std::string& log_level, const std::string& log_path)
FLAGS_minloglevel = google::GLOG_ERROR;
#endif
#endif
if (log_level == "DEBUG" || log_level == "TRACE") {
#ifdef _WIN32
_putenv_s("OPENVINO_LOG_LEVEL", "4");
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not using the env guard utils here since we don't want a cyclic dependency on logging.

#else
::setenv("OPENVINO_LOG_LEVEL", "4", 1);
#endif
}
}

} // namespace ovms
76 changes: 65 additions & 11 deletions src/mediapipe_internal/graphqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,25 +85,79 @@ GraphQueue::GraphQueue(const ::mediapipe::CalculatorGraphConfig& config, std::sh
SPDLOG_ERROR("Graph queue StartRun failed: {}", absStatus.ToString());
throw std::runtime_error(absStatus.ToString());
}
inferRequests.emplace_back(std::move(graphHelper));
this->inferRequests.emplace_back(std::move(graphHelper));
}
}
GraphQueue::~GraphQueue() {
for (auto& graphHelper : inferRequests) {
auto absStatus = graphHelper->graph->WaitUntilIdle();
GraphHelper::~GraphHelper() {
if (!graph) {
return;
}
auto absStatus = graph->WaitUntilIdle();
if (!absStatus.ok()) {
SPDLOG_DEBUG("GraphHelper WaitUntilIdle error: {}", absStatus.ToString());
}
absStatus = graph->CloseAllPacketSources();
if (!absStatus.ok()) {
SPDLOG_DEBUG("GraphHelper CloseAllPacketSources error: {}", absStatus.ToString());
}
absStatus = graph->WaitUntilDone();
if (!absStatus.ok()) {
SPDLOG_DEBUG("GraphHelper WaitUntilDone error: {}", absStatus.ToString());
}
graph->Cancel();
}

void GraphHelper::reinitialize(const ::mediapipe::CalculatorGraphConfig& config, const GraphSidePackets& sidePacketMaps) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A unit test that ensures failed graphs are reusable is missing.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added test

SPDLOG_DEBUG("Reinitializing graph after error");
// Tear down the old graph (best-effort, errors expected since graph is in bad state)
if (this->graph) {
auto absStatus = this->graph->CloseAllPacketSources();
Comment on lines +110 to +114
if (!absStatus.ok()) {
SPDLOG_DEBUG("Graph queue WaitUntilIdle error: {}", absStatus.ToString());
SPDLOG_DEBUG("reinitialize: CloseAllPacketSources: {}", absStatus.ToString());
}
absStatus = graphHelper->graph->CloseAllPacketSources();
absStatus = this->graph->WaitUntilDone();
if (!absStatus.ok()) {
SPDLOG_DEBUG("Graph queue CloseAllPacketSources error: {}", absStatus.ToString());
SPDLOG_DEBUG("reinitialize: WaitUntilDone: {}", absStatus.ToString());
}
absStatus = graphHelper->graph->WaitUntilDone();
this->graph->Cancel();
}
// Create fresh graph
graph = std::make_unique<::mediapipe::CalculatorGraph>();
currentTimestamp = ::mediapipe::Timestamp(0);

auto absStatus = graph->Initialize(config);
if (!absStatus.ok()) {
SPDLOG_ERROR("Graph reinitialize: Initialize failed: {}", absStatus.ToString());
return;
Comment on lines +128 to +131
}
Comment on lines +128 to +132
for (const auto& [streamName, holder] : outStreamObservers) {
absStatus = graph->ObserveOutputStream(streamName, [holder](const ::mediapipe::Packet& packet) -> absl::Status {
return holder->current->handlePacket(packet);
});
if (!absStatus.ok()) {
SPDLOG_DEBUG("Graph queue WaitUntilDone error: {}", absStatus.ToString());
SPDLOG_ERROR("Graph reinitialize: ObserveOutputStream failed: {}", absStatus.ToString());
return;
}
Comment on lines 137 to 140
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will remove graph.reset.

graphHelper->graph->Cancel();
graphHelper->graph.reset();
}
// Reset observers to null sentinel
for (const auto& [streamName, holder] : outStreamObservers) {
holder->current = std::make_shared<NullOutputStreamObserver>();
}
// Reset execution contexts
for (auto& [nodeName, ctx] : genAiExecutionContextMap) {
ctx->reset();
}
std::map<std::string, mediapipe::Packet> inputSidePackets;
buildInputSidePackets(inputSidePackets, sidePacketMaps);
inputSidePackets[LLM_EXECUTION_CONTEXT_SESSION_SIDE_PACKET_TAG] =
mediapipe::MakePacket<GenAiExecutionContextMap>(genAiExecutionContextMap)
.At(::mediapipe::Timestamp(STARTING_TIMESTAMP_VALUE));
absStatus = graph->StartRun(inputSidePackets);
if (!absStatus.ok()) {
SPDLOG_ERROR("Graph reinitialize: StartRun failed: {}", absStatus.ToString());
return;
}
Comment on lines +155 to +159
SPDLOG_DEBUG("Graph reinitialized successfully");
}
GraphQueue::~GraphQueue() = default;
} // namespace ovms
33 changes: 33 additions & 0 deletions src/mediapipe_internal/graphqueue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,40 @@ struct GraphHelper {
genAiExecutionContextMap(std::move(gh.genAiExecutionContextMap)),
currentTimestamp(gh.currentTimestamp) {}
GraphHelper& operator=(GraphHelper&&) = delete;
~GraphHelper();
// Tears down the current (errored) graph and rebuilds a fresh one
// with the same observers and side packets. Called when inference
// encounters a graph error to avoid returning a poisoned graph to the pool.
void reinitialize(const ::mediapipe::CalculatorGraphConfig& config, const GraphSidePackets& sidePacketMaps);
};

// RAII guard that reinitializes the graph if inference exits with an error.
// Construct before the first graph interaction (packet push). Call dismiss()
// on the success path. If not dismissed, the destructor rebuilds the graph
// so the next request from the pool gets a clean graph.
class GraphReinitGuard {
GraphHelper& helper;
const ::mediapipe::CalculatorGraphConfig& config;
const GraphSidePackets& sidePacketMaps;
bool dismissed = false;

public:
GraphReinitGuard(GraphHelper& helper,
const ::mediapipe::CalculatorGraphConfig& config,
const GraphSidePackets& sidePacketMaps) :
helper(helper),
config(config),
sidePacketMaps(sidePacketMaps) {}
void dismiss() { dismissed = true; }
~GraphReinitGuard() {
if (!dismissed) {
helper.reinitialize(config, sidePacketMaps);
Comment on lines +93 to +95
}
}
Comment on lines +93 to +97
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case we could only log an error anyway. Will add that.

GraphReinitGuard(const GraphReinitGuard&) = delete;
GraphReinitGuard& operator=(const GraphReinitGuard&) = delete;
};

// we need to keep Graph alive during MP reload hence shared_ptr
class GraphQueue : public Queue<std::shared_ptr<GraphHelper>> {
std::shared_ptr<GraphSidePackets> sidePacketMaps;
Expand Down
12 changes: 12 additions & 0 deletions src/mediapipe_internal/mediapipegraphexecutor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ class MediapipeGraphExecutor {
guard->graphHelper->outStreamObservers.at(name)->current = std::make_shared<MyFunctor<RequestType, ResponseType>>(name, this->outputTypes.at(name), *this, *request, *response);
}

// Guard: if inference fails after this point, reinitialize the graph
// so the next request from the pool gets a clean graph (not poisoned).
GraphReinitGuard reinitGuard(*this->guard->graphHelper, this->config, this->sidePacketMaps);

size_t numberOfPacketsCreated = 0;
auto ovms_status = createAndPushPacketsImpl(
std::shared_ptr<const RequestType>(request, [](const RequestType*) {}),
Expand Down Expand Up @@ -227,6 +231,8 @@ class MediapipeGraphExecutor {
}
resetLlmExecutionContexts(this->guard->graphHelper->genAiExecutionContextMap);
MP_RETURN_ON_FAIL(status, "graph wait until idle", mediapipeAbslToOvmsStatus(status.code()));
// Success — dismiss the guard, graph is healthy
reinitGuard.dismiss();
// Increment timestamp for next request reusing this graph from the queue
this->guard->graphHelper->currentTimestamp = ::mediapipe::Timestamp(this->guard->graphHelper->currentTimestamp.Value() + 1);
SPDLOG_DEBUG("Received all output stream packets for graph: {}", this->name);
Expand Down Expand Up @@ -393,6 +399,10 @@ class MediapipeGraphExecutor {
executionContext, this->mediapipeServableMetricReporter);
}

// Guard: if streaming inference fails, reinitialize the graph
// so the next request from the pool gets a clean graph (not poisoned).
GraphReinitGuard reinitGuard(*this->guard->graphHelper, this->config, this->sidePacketMaps);

size_t numberOfPacketsCreated = 0;
{
OVMS_PROFILE_SCOPE("Mediapipe graph deserializing first request");
Expand Down Expand Up @@ -450,6 +460,8 @@ class MediapipeGraphExecutor {
}
resetLlmExecutionContexts(this->guard->graphHelper->genAiExecutionContextMap);
MP_RETURN_ON_FAIL(status, "graph wait until idle", mediapipeAbslToOvmsStatus(status.code()));
// Success — dismiss the guard, graph is healthy
reinitGuard.dismiss();
// Increment timestamp for next request reusing this graph from the queue
this->guard->graphHelper->currentTimestamp = ::mediapipe::Timestamp(this->guard->graphHelper->currentTimestamp.Value() + 1);
SPDLOG_DEBUG("Graph {}: Done streaming execution (queue path)", this->name);
Expand Down
1 change: 1 addition & 0 deletions src/test/mediapipe/calculators/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ cc_library(
"ovms_calculator.cc",
"ovms_image_input_calculator.cc",
"ovms_kfs_calculator.cc",
"error_on_negative_test_calculator.cpp",
"streaming_test_calculator.cpp",
"two_input_calculator.cpp",
],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
//*****************************************************************************
// Copyright 2025 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//*****************************************************************************
#include <cstring>

#include <openvino/openvino.hpp>

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
#include "mediapipe/framework/calculator_framework.h"
#pragma GCC diagnostic pop

namespace mediapipe {

// Test-only calculator that fails on demand.
//
// Contract: one ov::Tensor input stream (index 0) and one ov::Tensor output
// stream (index 0).
//
// Process() returns absl::InvalidArgumentError when the first element of the
// input tensor is negative; otherwise it emits a copy of the input tensor on
// the output stream at the input timestamp. Tests use the negative-input path
// to drive a graph into an error state.
class ErrorOnNegativeTestCalculator : public CalculatorBase {
public:
    static absl::Status GetContract(CalculatorContract* cc) {
        cc->Inputs().Index(0).Set<ov::Tensor>();
        cc->Outputs().Index(0).Set<ov::Tensor>();
        return absl::OkStatus();
    }
    absl::Status Open(CalculatorContext* cc) final { return absl::OkStatus(); }
    absl::Status Close(CalculatorContext* cc) final { return absl::OkStatus(); }
    absl::Status Process(CalculatorContext* cc) final {
        const ov::Tensor& input = cc->Inputs().Index(0).Get<ov::Tensor>();
        // The sign check below reads element 0 as a float, so reject tensors
        // for which that read would be out of bounds or a reinterpretation of
        // some other element type.
        if (input.get_element_type() != ov::element::f32 || input.get_size() == 0) {
            return absl::InvalidArgumentError("Expected non-empty FP32 input tensor");
        }
        if (static_cast<const float*>(input.data())[0] < 0.0f) {
            return absl::InvalidArgumentError("Negative input value");
        }
        // Copy the payload into a freshly allocated tensor so the output does
        // not alias the input buffer.
        ov::Tensor output(input.get_element_type(), input.get_shape());
        std::memcpy(output.data(), input.data(), input.get_byte_size());
        // The heap-allocated tensor is handed to the output stream via Add().
        cc->Outputs().Index(0).Add(new ov::Tensor(output), cc->InputTimestamp());
        return absl::OkStatus();
    }
};

REGISTER_CALCULATOR(ErrorOnNegativeTestCalculator);
} // namespace mediapipe
71 changes: 71 additions & 0 deletions src/test/mediapipeflow_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3855,6 +3855,7 @@ TEST(WhitelistRegistered, MediapipeCalculatorsList) {
"EndLoopTensorCalculator",
"EndLoopTfLiteTensorCalculator",
"ErrorInProcessTestCalculator",
"ErrorOnNegativeTestCalculator",
"ExceptionDuringCloseCalculator",
"ExceptionDuringGetContractCalculator",
"ExceptionDuringOpenCalculator",
Expand Down Expand Up @@ -4307,3 +4308,73 @@ TEST(MediapipeGraphQueueSizeDirective, EnvVarOVMS_GRAPH_QUEUE_OFF_NotSetDoesNotD
ASSERT_EQ(status, ovms::StatusCode::OK);
EXPECT_GT(def.getMediapipeGraphConfig().getInitialQueueSize(), 0);
}

// --- Graph queue reinit guard tests ---

// Fixture for the graph-queue reinitialization tests.
//
// SetUp() parses a one-node graph config that routes stream "in" through
// ErrorOnNegativeTestCalculator to stream "out", then builds a GraphQueue from
// that config and the shared side packets (last constructor argument: 1).
class UnaryQueueReinitTest : public ::testing::Test {
protected:
    const std::string name{"reinit_test_graph"};
    const std::string version{"1"};
    ExecutionContext executionContext{ExecutionContext::Interface::GRPC, ExecutionContext::Method::ModelInfer};
    std::unique_ptr<MediapipeServableMetricReporter> reporter;
    std::shared_ptr<GraphSidePackets> sidePackets;
    std::shared_ptr<GraphQueue> queue;
    ::mediapipe::CalculatorGraphConfig config;

    void SetUp() override {
        sidePackets = std::make_shared<GraphSidePackets>();
        reporter = std::make_unique<MediapipeServableMetricReporter>(nullptr, nullptr, "");
        const std::string graphDefinition{R"(
input_stream: "in"
output_stream: "out"
node {
calculator: "ErrorOnNegativeTestCalculator"
input_stream: "in"
output_stream: "out"
}
)"};
        const bool parsed = ::google::protobuf::TextFormat::ParseFromString(graphDefinition, &config);
        ASSERT_TRUE(parsed);
        queue = std::make_shared<GraphQueue>(config, sidePackets, 1);
    }

    // Resets `request` and fills it with a single FP32 input named "in" of
    // shape {1} holding `value`, plus an OVMS_MP_TIMESTAMP parameter of 0.
    void prepareInferRequest(KFSRequest& request, float value) {
        request.Clear();
        request.set_model_name("my_graph");
        request.set_model_version("1");
        prepareKFSInferInputTensor(request, "in", std::tuple<ovms::signed_shape_t, const ovms::Precision>{{1}, ovms::Precision::FP32}, std::vector<float>{value}, false);
        auto& requestParameters = *request.mutable_parameters();
        requestParameters["OVMS_MP_TIMESTAMP"].set_int64_param(0);
    }
};

// A request that fails inside the calculator must not leave the pooled graph
// unusable: a following request with valid input on the same queue succeeds.
TEST_F(UnaryQueueReinitTest, GraphIsReinitializedAfterCalculatorError) {
    KFSRequest request;
    KFSResponse response;

    // Takes the graph from the queue, runs one unary inference with the given
    // input value and hands the graph back when the executor (which owns the
    // GraphIdGuard) goes out of scope.
    auto inferOnce = [&](float inputValue) {
        GraphIdGuard guard(queue);
        MediapipeGraphExecutor executor{
            name, version, config,
            {{"in", mediapipe_packet_type_enum::OVTENSOR}},
            {{"out", mediapipe_packet_type_enum::OVTENSOR}},
            {"in"}, {"out"}, *sidePackets, nullptr, reporter.get(),
            std::move(guard)};
        prepareInferRequest(request, inputValue);
        return executor.infer<KFSRequest, KFSResponse>(&request, &response, executionContext);
    };

    // Negative input makes the calculator return an error.
    const auto failedStatus = inferOnce(-1.0f);
    ASSERT_FALSE(failedStatus.ok());
    EXPECT_EQ(failedStatus.getCode(), StatusCode::MEDIAPIPE_EXECUTION_ERROR);

    // The reinit guard rebuilt the graph before the error was returned, so a
    // second request with valid (positive) input should succeed.
    const auto recoveredStatus = inferOnce(2.0f);
    ASSERT_TRUE(recoveredStatus.ok());
}
2 changes: 1 addition & 1 deletion src/utils/env_guard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
//*****************************************************************************
#include "env_guard.hpp"

#include "../logging.hpp"
#include "src/logging.hpp"

#include <stdlib.h>

Expand Down