From 2b48c23c6bfd7600c1fdb2bf34ba725771170b80 Mon Sep 17 00:00:00 2001 From: Mikhail Petrov Date: Wed, 11 Mar 2026 18:14:51 +0300 Subject: [PATCH 1/2] IGNITE-28189 Fixed incorrect status of a Compute Task when it is canceled --- .../processors/job/GridJobWorker.java | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java index 90f32db551fff..e3c20f307836d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java @@ -748,23 +748,21 @@ else if (sysStopping && X.hasCause(e, InterruptedException.class, IgniteInterrup */ public void cancel(boolean sys) { try { - final ComputeJob job0 = job; + if (log.isDebugEnabled()) + log.debug("Cancelling job: " + ses); + + status = CANCELLED; if (sys) sysCancelled = true; - if (job0 != null) { - if (log.isDebugEnabled()) - log.debug("Cancelling job: " + ses); - - status = CANCELLED; + final ComputeJob job0 = job; - U.wrapThreadLoader(dep.classLoader(), (IgniteRunnable)() -> { - try (Scope ignored = ctx.security().withContext(secCtx)) { - job0.cancel(); - } - }); - } + U.wrapThreadLoader(dep.classLoader(), (IgniteRunnable)() -> { + try (Scope ignored = ctx.security().withContext(secCtx)) { + job0.cancel(); + } + }); // Interrupting only when all 'cancelled' flags are set. // This allows the 'job' to determine it's a cancellation. @@ -1091,6 +1089,11 @@ ComputeJobStatusEnum status() { return jobId.hashCode(); } + /** {@inheritDoc} */ + @Override public boolean isCancelled() { + return status == CANCELLED; + } + /** {@inheritDoc} */ @Override protected void onCancel(boolean firstCancelRequest) { if (firstCancelRequest) From 9512a4dd2fef9b4e73a026e4b350d4f9722cdf31 Mon Sep 17 00:00:00 2001 From: Mikhail Petrov Date: Thu, 12 Mar 2026 01:32:55 +0300 Subject: [PATCH 2/2] IGNITE-28189 --- .../processors/job/GridJobProcessor.java | 2 +- .../processors/job/GridJobWorker.java | 51 ++++++++----------- 2 files changed, 22 insertions(+), 31 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java index fd1b0122d6dae..3a5924c968430 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java @@ -485,7 +485,7 @@ public GridJobProcessor(GridKernalContext ctx) { // Cancel only if we force grid to stop if (cancel) { for (GridJobWorker job : activeJobs.values()) { - job.onStopping(); + job.onNodeStopping(); cancelJob(job, false); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java index e3c20f307836d..1aed08599eb06 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java @@ -145,16 +145,19 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject { private final AtomicBoolean masterLeaveGuard = new AtomicBoolean(); /** */ - private volatile boolean timedOut; + private volatile boolean isStarted; /** */ - private volatile boolean sysCancelled; + private volatile boolean isCancelled; /** */ - private volatile boolean sysStopping; + private volatile boolean isCancelledBySystem; /** */ - private volatile boolean isStarted; + private volatile boolean isTimedOut; + + /** */ + private volatile boolean isNodeStopping; /** Deployed job. */ private ComputeJob job; @@ -272,15 +275,6 @@ public GridDeployment getDeployment() { return dep; } - /** - * Returns {@code True} if job was cancelled by the system. - * - * @return {@code True} if job was cancelled by the system. - */ - boolean isSystemCanceled() { - return sysCancelled; - } - /** * @return Create time. */ @@ -402,7 +396,7 @@ long getQueuedTime() { * @return {@code True} if job is timed out. */ public boolean isTimedOut() { - return timedOut; + return isTimedOut; } /** @@ -417,7 +411,7 @@ public boolean isInternal() { if (finishing.get()) return; - timedOut = true; + isTimedOut = true; U.warn(log, "Job has timed out: " + ses); @@ -430,8 +424,8 @@ public boolean isInternal() { /** * Callback for whenever grid is stopping. */ - public void onStopping() { - sysStopping = true; + public void onNodeStopping() { + isNodeStopping = true; } /** @@ -561,10 +555,6 @@ private void execute0(boolean skipNtf) { } } - if (isCancelled()) - // If job was cancelled prior to assigning runner to it? - super.cancel(); - if (!skipNtf) { if (holdLsnr.onUnheld(this)) { if (held.decrementAndGet() == 0) @@ -618,7 +608,7 @@ private void execute0(boolean skipNtf) { } } catch (IgniteException e) { - if (sysStopping && e.hasCause(IgniteInterruptedCheckedException.class, InterruptedException.class)) { + if (isNodeStopping && e.hasCause(IgniteInterruptedCheckedException.class, InterruptedException.class)) { ex = handleThrowable(e); assert ex != null; @@ -700,7 +690,7 @@ private IgniteException handleThrowable(Throwable e) { // Special handling for weird interrupted exception which // happens due to JDk 1.5 bug. - if (e instanceof InterruptedException && !sysStopping) { + if (e instanceof InterruptedException && !isNodeStopping) { msg = "Failed to execute job due to interrupted exception."; // Turn interrupted exception into checked exception. @@ -716,7 +706,7 @@ else if ((e instanceof NoClassDefFoundError || e instanceof ClassNotFoundExcepti ex = new ComputeUserUndeclaredException(msg, e); } - else if (sysStopping && X.hasCause(e, InterruptedException.class, IgniteInterruptedCheckedException.class)) { + else if (isNodeStopping && X.hasCause(e, InterruptedException.class, IgniteInterruptedCheckedException.class)) { msg = "Job got interrupted due to system stop (will attempt failover)."; ex = new ComputeExecutionRejectedException(e); @@ -751,10 +741,11 @@ public void cancel(boolean sys) { if (log.isDebugEnabled()) log.debug("Cancelling job: " + ses); - status = CANCELLED; + isCancelled = true; + + isCancelledBySystem = sys; - if (sys) - sysCancelled = true; + status = CANCELLED; final ComputeJob job0 = job; @@ -842,7 +833,7 @@ void finishJob( // Do not send reply if job has been cancelled from system. if (sndReply) - sndReply = !sysCancelled; + sndReply = !isCancelledBySystem; // We should save message ID here since listener callback will reset sequence. ClusterNode sndNode = ctx.discovery().node(taskNode.id()); @@ -903,7 +894,7 @@ else if (!internal && ctx.event().isRecordable(EVT_JOB_REJECTED)) ex, res, attrs, - isCancelled(), + isCancelled, retry ? ctx.cache().context().exchange().readyAffinityVersion() : null); if (!loc) @@ -1091,7 +1082,7 @@ ComputeJobStatusEnum status() { /** {@inheritDoc} */ @Override public boolean isCancelled() { - return status == CANCELLED; + return isCancelled; } /** {@inheritDoc} */