-
Notifications
You must be signed in to change notification settings - Fork 254
Add graph reinit on failure #4237
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
1850989
537cc52
271f337
271ee8e
28aadd6
10477df
6b96b84
4806c6b
8d60096
d6c7ff0
4aee6d6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -85,25 +85,79 @@ GraphQueue::GraphQueue(const ::mediapipe::CalculatorGraphConfig& config, std::sh | |
| SPDLOG_ERROR("Graph queue StartRun failed: {}", absStatus.ToString()); | ||
| throw std::runtime_error(absStatus.ToString()); | ||
| } | ||
| inferRequests.emplace_back(std::move(graphHelper)); | ||
| this->inferRequests.emplace_back(std::move(graphHelper)); | ||
| } | ||
| } | ||
| GraphQueue::~GraphQueue() { | ||
| for (auto& graphHelper : inferRequests) { | ||
| auto absStatus = graphHelper->graph->WaitUntilIdle(); | ||
| // Shuts down the owned graph in four steps: WaitUntilIdle, CloseAllPacketSources, | ||
| // WaitUntilDone, Cancel. A failing step is only logged at debug level and the | ||
| // remaining steps are still executed. | ||
| GraphHelper::~GraphHelper() { | ||
|     // Nothing to shut down when no graph is held. | ||
|     if (graph == nullptr) { | ||
|         return; | ||
|     } | ||
|     if (const auto idleStatus = graph->WaitUntilIdle(); !idleStatus.ok()) { | ||
|         SPDLOG_DEBUG("GraphHelper WaitUntilIdle error: {}", idleStatus.ToString()); | ||
|     } | ||
|     if (const auto closeStatus = graph->CloseAllPacketSources(); !closeStatus.ok()) { | ||
|         SPDLOG_DEBUG("GraphHelper CloseAllPacketSources error: {}", closeStatus.ToString()); | ||
|     } | ||
|     if (const auto doneStatus = graph->WaitUntilDone(); !doneStatus.ok()) { | ||
|         SPDLOG_DEBUG("GraphHelper WaitUntilDone error: {}", doneStatus.ToString()); | ||
|     } | ||
|     graph->Cancel(); | ||
| } | ||
|
|
||
| void GraphHelper::reinitialize(const ::mediapipe::CalculatorGraphConfig& config, const GraphSidePackets& sidePacketMaps) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. A unit test that ensures failed graphs are reusable is missing.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Added a test. |
||
| SPDLOG_DEBUG("Reinitializing graph after error"); | ||
| // Tear down the old graph (best-effort, errors expected since graph is in bad state) | ||
| if (this->graph) { | ||
| auto absStatus = this->graph->CloseAllPacketSources(); | ||
|
Comment on lines
+110
to
+114
|
||
| if (!absStatus.ok()) { | ||
| SPDLOG_DEBUG("Graph queue WaitUntilIdle error: {}", absStatus.ToString()); | ||
| SPDLOG_DEBUG("reinitialize: CloseAllPacketSources: {}", absStatus.ToString()); | ||
| } | ||
| absStatus = graphHelper->graph->CloseAllPacketSources(); | ||
| absStatus = this->graph->WaitUntilDone(); | ||
| if (!absStatus.ok()) { | ||
| SPDLOG_DEBUG("Graph queue CloseAllPacketSources error: {}", absStatus.ToString()); | ||
| SPDLOG_DEBUG("reinitialize: WaitUntilDone: {}", absStatus.ToString()); | ||
| } | ||
| absStatus = graphHelper->graph->WaitUntilDone(); | ||
| this->graph->Cancel(); | ||
| } | ||
| // Create fresh graph | ||
| graph = std::make_unique<::mediapipe::CalculatorGraph>(); | ||
| currentTimestamp = ::mediapipe::Timestamp(0); | ||
|
|
||
| auto absStatus = graph->Initialize(config); | ||
| if (!absStatus.ok()) { | ||
| SPDLOG_ERROR("Graph reinitialize: Initialize failed: {}", absStatus.ToString()); | ||
| return; | ||
|
Comment on lines
+128
to
+131
|
||
| } | ||
|
Comment on lines
+128
to
+132
|
||
| for (const auto& [streamName, holder] : outStreamObservers) { | ||
| absStatus = graph->ObserveOutputStream(streamName, [holder](const ::mediapipe::Packet& packet) -> absl::Status { | ||
| return holder->current->handlePacket(packet); | ||
| }); | ||
| if (!absStatus.ok()) { | ||
| SPDLOG_DEBUG("Graph queue WaitUntilDone error: {}", absStatus.ToString()); | ||
| SPDLOG_ERROR("Graph reinitialize: ObserveOutputStream failed: {}", absStatus.ToString()); | ||
| return; | ||
| } | ||
|
Comment on lines
137
to
140
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Will remove the graph.reset() call. |
||
| graphHelper->graph->Cancel(); | ||
| graphHelper->graph.reset(); | ||
| } | ||
| // Reset observers to null sentinel | ||
| for (const auto& [streamName, holder] : outStreamObservers) { | ||
| holder->current = std::make_shared<NullOutputStreamObserver>(); | ||
| } | ||
| // Reset execution contexts | ||
| for (auto& [nodeName, ctx] : genAiExecutionContextMap) { | ||
| ctx->reset(); | ||
| } | ||
| std::map<std::string, mediapipe::Packet> inputSidePackets; | ||
| buildInputSidePackets(inputSidePackets, sidePacketMaps); | ||
| inputSidePackets[LLM_EXECUTION_CONTEXT_SESSION_SIDE_PACKET_TAG] = | ||
| mediapipe::MakePacket<GenAiExecutionContextMap>(genAiExecutionContextMap) | ||
| .At(::mediapipe::Timestamp(STARTING_TIMESTAMP_VALUE)); | ||
| absStatus = graph->StartRun(inputSidePackets); | ||
| if (!absStatus.ok()) { | ||
| SPDLOG_ERROR("Graph reinitialize: StartRun failed: {}", absStatus.ToString()); | ||
| return; | ||
| } | ||
|
Comment on lines
+155
to
+159
|
||
| SPDLOG_DEBUG("Graph reinitialized successfully"); | ||
| } | ||
| GraphQueue::~GraphQueue() = default; | ||
| } // namespace ovms | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -65,7 +65,40 @@ struct GraphHelper { | |
| genAiExecutionContextMap(std::move(gh.genAiExecutionContextMap)), | ||
| currentTimestamp(gh.currentTimestamp) {} | ||
| GraphHelper& operator=(GraphHelper&&) = delete; | ||
| ~GraphHelper(); | ||
| // Tears down the current (errored) graph and rebuilds a fresh one | ||
| // with the same observers and side packets. Called when inference | ||
| // encounters a graph error to avoid returning a poisoned graph to the pool. | ||
| void reinitialize(const ::mediapipe::CalculatorGraphConfig& config, const GraphSidePackets& sidePacketMaps); | ||
| }; | ||
|
|
||
| // RAII guard that reinitializes the graph if inference exits with an error. | ||
| // Construct before the first graph interaction (packet push). Call dismiss() | ||
| // on the success path. If not dismissed, the destructor rebuilds the graph | ||
| // so the next request from the pool gets a clean graph. | ||
| class GraphReinitGuard { | ||
| GraphHelper& helper; | ||
| const ::mediapipe::CalculatorGraphConfig& config; | ||
| const GraphSidePackets& sidePacketMaps; | ||
| bool dismissed = false; | ||
|
|
||
| public: | ||
| GraphReinitGuard(GraphHelper& helper, | ||
| const ::mediapipe::CalculatorGraphConfig& config, | ||
| const GraphSidePackets& sidePacketMaps) : | ||
| helper(helper), | ||
| config(config), | ||
| sidePacketMaps(sidePacketMaps) {} | ||
| // Disarms the guard: once called, the destructor no longer calls reinitialize(). | ||
| void dismiss() { dismissed = true; } | ||
| // Rebuilds the graph through helper.reinitialize() unless dismiss() was called. | ||
| // Destructors are implicitly noexcept, so an exception escaping reinitialize() | ||
| // would end in std::terminate; it is contained here instead. | ||
| // NOTE(review): the failure is not reported because no logging facility is | ||
| // visible from this header - confirm and add an error log in the handler. | ||
| ~GraphReinitGuard() { | ||
|     if (dismissed) { | ||
|         return; | ||
|     } | ||
|     try { | ||
|         helper.reinitialize(config, sidePacketMaps); | ||
|     } catch (...) { | ||
|         // Best effort: a destructor has no way to propagate the failure. | ||
|     } | ||
| } | ||
|
Comment on lines
+93
to
+97
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In that case we could only log the error anyway. Will add that. |
||
| GraphReinitGuard(const GraphReinitGuard&) = delete; | ||
| GraphReinitGuard& operator=(const GraphReinitGuard&) = delete; | ||
| }; | ||
|
|
||
| // we need to keep Graph alive during MP reload hence shared_ptr | ||
| class GraphQueue : public Queue<std::shared_ptr<GraphHelper>> { | ||
| std::shared_ptr<GraphSidePackets> sidePacketMaps; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| //***************************************************************************** | ||
| // Copyright 2025 Intel Corporation | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
| //***************************************************************************** | ||
| #include <cstring> | ||
|
|
||
| #include <openvino/openvino.hpp> | ||
|
|
||
| #pragma GCC diagnostic push | ||
| #pragma GCC diagnostic ignored "-Wdeprecated-declarations" | ||
| #include "mediapipe/framework/calculator_framework.h" | ||
| #pragma GCC diagnostic pop | ||
|
|
||
| namespace mediapipe { | ||
|
|
||
| // Test-only calculator. Copies the input tensor to the output stream at the | ||
| // input timestamp, or returns an InvalidArgument status when the first element | ||
| // of the input is negative. | ||
| class ErrorOnNegativeTestCalculator : public CalculatorBase { | ||
| public: | ||
|     // Declares one ov::Tensor input stream and one ov::Tensor output stream. | ||
|     static absl::Status GetContract(CalculatorContract* cc) { | ||
|         cc->Inputs().Index(0).Set<ov::Tensor>(); | ||
|         cc->Outputs().Index(0).Set<ov::Tensor>(); | ||
|         return absl::OkStatus(); | ||
|     } | ||
|     absl::Status Open(CalculatorContext* cc) final { return absl::OkStatus(); } | ||
|     absl::Status Close(CalculatorContext* cc) final { return absl::OkStatus(); } | ||
|     absl::Status Process(CalculatorContext* cc) final { | ||
|         ov::Tensor input = cc->Inputs().Index(0).Get<ov::Tensor>(); | ||
|         // The sign check reads element 0 of the buffer as float, so anything that | ||
|         // is not a non-empty f32 tensor must be rejected before touching data(). | ||
|         if (input.get_element_type() != ov::element::f32 || input.get_size() == 0) { | ||
|             return absl::InvalidArgumentError("Expected non-empty f32 input tensor"); | ||
|         } | ||
|         const float firstValue = static_cast<const float*>(input.data())[0]; | ||
|         if (firstValue < 0.0f) { | ||
|             return absl::InvalidArgumentError("Negative input value"); | ||
|         } | ||
|         // Emit a copy in a freshly allocated tensor instead of forwarding the input buffer. | ||
|         ov::Tensor output(input.get_element_type(), input.get_shape()); | ||
|         std::memcpy(output.data(), input.data(), input.get_byte_size()); | ||
|         cc->Outputs().Index(0).Add(new ov::Tensor(output), cc->InputTimestamp()); | ||
|         return absl::OkStatus(); | ||
|     } | ||
| }; | ||
|
|
||
| REGISTER_CALCULATOR(ErrorOnNegativeTestCalculator); | ||
| } // namespace mediapipe |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not using the guard utils since we don't want a cyclic dependency on logging.