From 31e0039e8cd6b30f8f4c36e760ff2a67228a6dfe Mon Sep 17 00:00:00 2001 From: Liam Vilhelmsson Date: Thu, 12 Mar 2026 11:08:12 +0100 Subject: [PATCH 1/4] Add OutboxInfoCollector --- .../info/collectors/OutboxInfoCollector.java | 529 ++++++++++++++++++ 1 file changed, 529 insertions(+) create mode 100644 cds-feature-console/src/main/java/com/sap/cds/feature/console/info/collectors/OutboxInfoCollector.java diff --git a/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/collectors/OutboxInfoCollector.java b/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/collectors/OutboxInfoCollector.java new file mode 100644 index 0000000..3e6b71b --- /dev/null +++ b/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/collectors/OutboxInfoCollector.java @@ -0,0 +1,529 @@ +/* + * © 2024 SAP SE or an SAP affiliate company. All rights reserved. + */ +package com.sap.cds.feature.dashboard.info.collectors; + +import static com.sap.cds.services.impl.outbox.persistence.OutboxQueries.messagesQuery; + +import com.sap.cds.Result; +import com.sap.cds.Row; +import com.sap.cds.feature.dashboard.connectivity.InfoEvent; +import com.sap.cds.feature.dashboard.info.InfoCollector; +import com.sap.cds.feature.dashboard.info.Path; +import com.sap.cds.feature.dashboard.service.DashboardCommandEventContext; +import com.sap.cds.feature.dashboard.service.DashboardService; +import com.sap.cds.ql.Delete; +import com.sap.cds.ql.Insert; +import com.sap.cds.ql.Select; +import com.sap.cds.ql.Update; +import com.sap.cds.ql.cqn.CqnAnalyzer; +import com.sap.cds.ql.cqn.CqnSelect; +import com.sap.cds.services.EventContext; +import com.sap.cds.services.cds.CdsDeleteEventContext; +import com.sap.cds.services.cds.CqnService; +import com.sap.cds.services.changeset.ChangeSetListener; +import com.sap.cds.services.handler.EventHandler; +import com.sap.cds.services.handler.annotations.After; +import com.sap.cds.services.handler.annotations.Before; +import com.sap.cds.services.handler.annotations.ServiceName; +import com.sap.cds.services.impl.outbox.Messages; +import com.sap.cds.services.impl.outbox.Messages_; +import com.sap.cds.services.impl.outbox.persistence.PersistentOutbox; +import com.sap.cds.services.impl.outbox.persistence.collectors.PartitionCollector; +import com.sap.cds.services.messaging.utils.CloudEventUtils; +import com.sap.cds.services.mt.DeploymentService; +import com.sap.cds.services.mt.SubscribeEventContext; +import com.sap.cds.services.mt.TenantProviderService; +import com.sap.cds.services.outbox.OutboxMessage; +import com.sap.cds.services.persistence.PersistenceService; +import com.sap.cds.services.runtime.CdsRuntime; +import com.sap.cds.services.utils.outbox.OutboxUtils; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.slf4j.LoggerFactory; + +@ServiceName(DashboardService.DEFAULT_NAME) +public class OutboxInfoCollector extends InfoCollector implements EventHandler { + + private static final org.slf4j.Logger logger = LoggerFactory.getLogger(InfoCollector.class); + + public static final String TYPE = "outbox"; + + public static final String COMMAND_CREATE = TYPE + "/create"; + public static final String COMMAND_RESET = TYPE + "/reset"; + public static final String COMMAND_REMOVE = TYPE + "/remove"; + public static final String COMMAND_REPLAY = TYPE + "/replay"; + public static final String COMMAND_REMOVE_HISTORY = TYPE + "/remove-history"; + public static final String COMMAND_START_COLLECTOR = TYPE + "/start-collector"; + public static final String COMMAND_STOP_COLLECTOR = TYPE + "/stop-collector"; + + private static final int MAX_HISTORY = 25; + + private PersistenceService persistenceService; + private boolean isPersistentOutboxEnabled; + + private Map> lastSeenEntries = new HashMap<>(); + private Map outboxes; + + public OutboxInfoCollector(CdsRuntime runtime, DashboardService dashboardService) { + super(runtime, dashboardService); + persistenceService = + runtime + .getServiceCatalog() + .getService(PersistenceService.class, PersistenceService.DEFAULT_NAME); + isPersistentOutboxEnabled = + runtime.getServiceCatalog().getServices(PersistentOutbox.class).count() > 0; + } + + private TenantProviderService getTenantService() { + return getRuntime() + .getServiceCatalog() + .getService(TenantProviderService.class, TenantProviderService.DEFAULT_NAME); + } + + @After(event = ClientInfoCollector.COMMAND_ATTACHED) + private void dashboardAttached(DashboardCommandEventContext context) { + outboxes = new HashMap<>(); + context + .getServiceCatalog() + .getServices(PersistentOutbox.class) + .forEach( + box -> + outboxes.put( + box.getName(), + context + .getCdsRuntime() + .getEnvironment() + .getCdsProperties() + .getOutbox() + .getService(box.getName()))); + emitInfoDashboardEvent(() -> getStatus()); + emitInfoDashboardEvent(() -> getActiveStatus()); + if (MultitenancyInfoCollector.isMtEnabled(getRuntime())) { + List tenants = getTenantService().readTenants(); + tenants.forEach( + tenant -> { + emitInfoDashboardEvent(() -> getTenantStatus(tenant)); + emitInfoDashboardEvent(() -> getLastSeenEntries(tenant)); + }); + } else { + emitInfoDashboardEvent(() -> getTenantStatus(null)); + emitInfoDashboardEvent(() -> getLastSeenEntries(null)); + } + } + + private String getTenant(DashboardCommandEventContext context) { + if (MultitenancyInfoCollector.isMtEnabled(getRuntime())) { + return (String) context.getData().get("tenant"); + } + return null; + } + + @After(event = COMMAND_START_COLLECTOR) + private void startCollector(DashboardCommandEventContext context) { + String outboxName = (String) context.getData().get("name"); + PersistentOutbox outbox = + getRuntime().getServiceCatalog().getService(PersistentOutbox.class, outboxName); + outbox.start(); + for (int i = 0; i < 10; i++) { + try { + Thread.sleep(200); + } catch (InterruptedException e) { + // + } + if (outbox.isCollectorRunning()) { + emitInfoDashboardEvent(() -> getActiveStatus()); + return; + } + } + } + + @After(event = COMMAND_STOP_COLLECTOR) + private void stopCollector(DashboardCommandEventContext context) { + String outboxName = (String) context.getData().get("name"); + PersistentOutbox outbox = + getRuntime().getServiceCatalog().getService(PersistentOutbox.class, outboxName); + outbox.stop(); + emitInfoDashboardEvent(() -> getActiveStatus()); + } + + @SuppressWarnings("unchecked") + @After(event = COMMAND_CREATE) + private void createEntry(DashboardCommandEventContext context) { + sendInfoNotification("Outbox Entry Create", "Creating the outbox entry..."); + String outbox = (String) context.getData().get("outbox"); + String tenant = getTenant(context); + String event = (String) context.getData().get("event"); + OutboxMessage msg = OutboxMessage.create(); + Map payload = (Map) context.getData().get("payload"); + if (payload != null && payload.containsKey("event")) { + msg.setEvent((String) payload.remove("event")); + msg.setParams((Map) payload.get("params")); + } + + getRuntime() + .requestContext() + .systemUser(tenant) + .run( + r -> { + r.getServiceCatalog().getService(PersistentOutbox.class, outbox).submit(event, msg); + }); + } + + @After(event = COMMAND_RESET) + private void resetEntry(DashboardCommandEventContext context) { + sendInfoNotification("Outbox Entry Reset", "Resetting the outbox entry..."); + String id = (String) context.getData().get("id"); + String tenant = getTenant(context); + String target = (String) context.getData().get("target"); + getRuntime() + .requestContext() + .systemUser(tenant) + .run( + r -> { + getRuntime() + .changeSetContext() + .run( + ch -> { + if (target != null) { + ch.register( + new ChangeSetListener() { + @Override + public void afterClose(boolean completed) { + scheduleOutbox(context.getCdsRuntime(), target); + } + }); + } + try { + InfoCollector.inDashboardContext( + () -> + persistenceService.run( + Update.entity(Messages_.class) + .data(Messages.ATTEMPTS, 0) + .where(m -> m.ID().eq(id)))); + sendSuccessNotification( + "Outbox Entry Reset", "Outbox entry successfully reset!"); + } catch (Throwable th) { + sendErrorNotification( + "Error Outbox Entry Reset", + "Entry could not be reset '%s'", + th.getMessage()); + } + }); + }); + } + + @After(event = COMMAND_REMOVE) + private void removeEntry(DashboardCommandEventContext context) { + sendInfoNotification("Outbox Entry Remove", "Removing the outbox entry..."); + String id = (String) context.getData().get("id"); + String tenant = getTenant(context); + getRuntime() + .requestContext() + .systemUser(tenant) + .run( + r -> { + getRuntime() + .changeSetContext() + .run( + ch -> { + try { + InfoCollector.inDashboardContext( + () -> + persistenceService.run( + Delete.from(Messages_.class).where(e -> e.ID().eq(id)))); + sendSuccessNotification( + "Outbox Entry Remove", "Outbox entry successfully removed!"); + } catch (Throwable th) { + sendErrorNotification( + "Error Outbox Entry Remove", + "Entry could not be removed '%s'", + th.getMessage()); + } + }); + }); + } + + @After(event = COMMAND_REPLAY) + private void replayEntry(DashboardCommandEventContext context) { + sendInfoNotification("Outbox Entry Replay", "Replaying the outbox entry..."); + String id = (String) context.getData().get("id"); + String tenant = getTenant(context); + synchronized (lastSeenEntries) { + if (lastSeenEntries.containsKey(tenant)) { + List removedFromHistory = new ArrayList<>(); + lastSeenEntries.get(tenant).stream() + .map(e -> ((Row) e).as(Messages.class)) + .filter(msg -> msg.getId().equals(id)) + .forEach( + msg -> { + getRuntime() + .requestContext() + .systemUser(tenant) + .run( + req -> { + getRuntime() + .changeSetContext() + .run( + ch -> { + try { + + Messages newMsg = Messages.create(); + newMsg.setMsg(msg.getMsg()); + newMsg.setTarget(msg.getTarget()); + newMsg.setTimestamp(Instant.now()); + String appid = + getRuntime() + .getEnvironment() + .getCdsProperties() + .getEnvironment() + .getDeployment() + .getAppid(); + if (appid != null) { + newMsg.setAppid(appid); + } + + InfoCollector.inDashboardContext( + () -> + persistenceService.run( + Insert.into(Messages_.class).entry(newMsg))); + removedFromHistory.add(msg.getId()); + scheduleOutbox(context.getCdsRuntime(), msg.getTarget()); + sendSuccessNotification( + "Outbox Entry Replayed", + "Outbox entry successfully replayed!"); + + } catch (Throwable th) { + sendErrorNotification( + "Error Outbox Entry Replay", + "Entry could not be replayed '%s'", + th.getMessage()); + } + }); + }); + }); + removedFromHistory.forEach( + msg -> lastSeenEntries.get(tenant).removeIf(entry -> entry.equals(msg))); + emitInfoDashboardEvent(() -> getLastSeenEntries(tenant)); + } + } + } + + @After(event = COMMAND_REMOVE_HISTORY) + private void removeHistoryEntry(DashboardCommandEventContext context) { + sendInfoNotification( + "Outbox Entry Remove History", "Removing the outbox entry from history..."); + String id = (String) context.getData().get("id"); + String tenant = getTenant(context); + synchronized (lastSeenEntries) { + if (lastSeenEntries.containsKey(tenant)) { + lastSeenEntries + .get(tenant) + .removeIf(entry -> ((Row) entry).as(Messages.class).getId().equals(id)); + emitInfoDashboardEvent(() -> getLastSeenEntries(tenant)); + } + } + } + + @Before( + service = PersistenceService.DEFAULT_NAME, + entity = OutboxUtils.OUTBOX_MODEL, + event = CqnService.EVENT_DELETE) + private void outboxEventDelete(CdsDeleteEventContext context) { + String tenant = context.getUserInfo().getTenant(); + getRuntime() + .requestContext() + .systemUser(tenant) + .run( + req -> { + getRuntime() + .changeSetContext() + .run( + ch -> { + CqnAnalyzer analyzer = CqnAnalyzer.create(context.getModel()); + String id = + (String) + analyzer.analyze(context.getCqn()).targetKeys().get(Messages.ID); + CqnSelect select = Select.from(Messages_.class).where(e -> e.ID().eq(id)); + InfoCollector.inDashboardContext( + () -> { + persistenceService + .run(select) + .forEach( + row -> { + synchronized (lastSeenEntries) { + if (lastSeenEntries.get(tenant) == null) { + lastSeenEntries.put(tenant, new ArrayList<>()); + } + List history = lastSeenEntries.get(tenant); + if (!history.stream() + .anyMatch( + o -> + ((Row) o) + .as(Messages.class) + .getMsg() + .equals( + row.as(Messages.class).getMsg()) + && ((Row) o) + .as(Messages.class) + .getTarget() + .equals( + row.as(Messages.class) + .getTarget()))) { + updateOutboxEntry(row); + history.add(row); + if (history.size() > MAX_HISTORY) { + history.remove(0); + } + } + } + }); + }); + }); + }); + } + + @After(service = PersistenceService.DEFAULT_NAME, entity = OutboxUtils.OUTBOX_MODEL, event = "*") + private void outboxEvents(EventContext context) { + String event = context.getEvent(); + if (!event.equals(CqnService.EVENT_READ)) { + String tenant = context.getUserInfo().getTenant(); + context + .getChangeSetContext() + .register( + new ChangeSetListener() { + + @Override + public void afterClose(boolean completed) { + emitInfoDashboardEvent(() -> getTenantStatus(tenant)); + if (event.equals(CqnService.EVENT_DELETE)) { + emitInfoDashboardEvent(() -> getLastSeenEntries(tenant)); + } + } + }); + } + } + + @After(service = DeploymentService.DEFAULT_NAME) + private void tenantSubscribed(SubscribeEventContext context) { + getTenantService() + .readTenants() + .forEach(tenant -> emitInfoDashboardEvent(() -> getTenantStatus(tenant))); + } + + InfoEvent getStatus() { + InfoEvent event = InfoEvent.create(Path.OUTBOX); + if (isPersistentOutboxEnabled) { + event.getData().put("enabled", true); + event.getData().put("outboxes", outboxes); + } else { + event.getData().put("enabled", false); + } + return event; + } + + InfoEvent getActiveStatus() { + InfoEvent event = InfoEvent.create(Path.OUTBOX); + if (isPersistentOutboxEnabled) { + Map active = new HashMap<>(); + getRuntime() + .getServiceCatalog() + .getServices(PersistentOutbox.class) + .filter(s -> s.isCollectorRunning()) + .forEach(s -> active.put(s.getName(), s.isCollectorRunning())); + event.getData().put("active", active); + } else { + event.getData().put("enabled", false); + } + return event; + } + + InfoEvent getTenantStatus(String tenant) { + InfoEvent event = InfoEvent.create(Path.OUTBOX_TENANTS + '.' + tenant); + if (isPersistentOutboxEnabled) { + getRuntime() + .requestContext() + .systemUser(tenant) + .run( + r -> { + getRuntime() + .changeSetContext() + .run( + ch -> { + InfoCollector.inDashboardContext( + () -> { + CqnSelect select = + messagesQuery( + getRuntime(), s -> s.orderBy(e -> e.timestamp().asc())); + Result res = persistenceService.run(select); + res.forEach(entry -> updateOutboxEntry(entry)); + event.getData().put("entries", res.list()); + }); + }); + }); + } else { + event.getData().put("enabled", false); + } + return event; + } + + private void updateOutboxEntry(Row row) { + Map data = row; + data.put("jsonMsg", CloudEventUtils.toMap((String) data.get("msg"))); + } + + InfoEvent getLastSeenEntries(String tenant) { + InfoEvent event = InfoEvent.create(Path.OUTBOX_TENANTS + '.' + tenant); + if (isPersistentOutboxEnabled) { + event.getData().put("enabled", true); + List reversedList; + synchronized (lastSeenEntries) { + if (lastSeenEntries.get(tenant) != null) { + reversedList = lastSeenEntries.get(tenant).subList(0, lastSeenEntries.get(tenant).size()); + } else { + reversedList = new ArrayList<>(); + } + } + Collections.reverse(reversedList); + event.getData().put("history", reversedList); + } else { + event.getData().put("enabled", false); + } + return event; + } + + private void scheduleOutbox(CdsRuntime runtime, String target) { + if (isPersistentOutboxEnabled) { + runtime + .getServiceCatalog() + .getServices(PersistentOutbox.class) + .filter(s -> s.getName().endsWith(target)) + .forEach(this::scheduleCollector); + } + } + + private void scheduleCollector(PersistentOutbox outbox) { + try { + Field collector = outbox.getClass().getDeclaredField("collector"); + collector.setAccessible(true); + Object collectorInstance = collector.get(outbox); + + // Check if it's a PartitionCollector (which has unpause method) + if (collectorInstance instanceof PartitionCollector) { + Method unpause = PartitionCollector.class.getDeclaredMethod("unpause"); + unpause.setAccessible(true); + unpause.invoke(collectorInstance); + } + // For TaskBasedCollector, no manual scheduling is needed as it's task-based + // and will automatically pick up new messages through its scheduled tasks + } catch (Exception e) { + logger.error("Cannot schedule the collector for the outbox {}", outbox.getName(), e); + } + } +} From 79b53829403749093eeb7c5404cf21341f9dbeee Mon Sep 17 00:00:00 2001 From: Liam Vilhelmsson Date: Thu, 12 Mar 2026 11:15:02 +0100 Subject: [PATCH 2/4] Use remoteMonitoringContext and change data model --- .../feature/console/info/InfoCollector.java | 37 +-- .../sap/cds/feature/console/info/Path.java | 6 +- .../info/collectors/OutboxInfoCollector.java | 212 ++++++------------ .../RemoteMonitoringConfiguration.java | 10 +- 4 files changed, 102 insertions(+), 163 deletions(-) diff --git a/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/InfoCollector.java b/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/InfoCollector.java index 3497378..d7f6909 100644 --- a/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/InfoCollector.java +++ b/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/InfoCollector.java @@ -73,13 +73,13 @@ public void sendSuccessNotification(String header, String notification, Object.. public void sendErrorNotification(String header, String notification, Object... args) { RemoteLogData logData = new RemoteLogData.Builder() - .type(header) - .logger("system") - .thread(Thread.currentThread().getName()) - .level("error") - .message(String.format(notification, args)) - .ts(System.currentTimeMillis()) - .build(); + .type(header) + .logger("system") + .thread(Thread.currentThread().getName()) + .level("error") + .message(String.format(notification, args)) + .ts(System.currentTimeMillis()) + .build(); InfoEvent event = InfoEvent.createRemoteLog(Path.CONSOLE_NOTIFICATION, logData); getRemoteMonitoringService().emit(event); @@ -87,18 +87,27 @@ public void sendErrorNotification(String header, String notification, Object... public void sendNotification(NotificationType type, String notification, Object... args) { RemoteLogData logData = new RemoteLogData.Builder() - .type(type.name()) - .logger("system") - .thread(Thread.currentThread().getName()) - .level("info") - .message(String.format(notification, args)) - .ts(System.currentTimeMillis()) - .build(); + .type(type.name()) + .logger("system") + .thread(Thread.currentThread().getName()) + .level("info") + .message(String.format(notification, args)) + .ts(System.currentTimeMillis()) + .build(); InfoEvent event = InfoEvent.createRemoteLog(Path.CONSOLE_NOTIFICATION, logData); getRemoteMonitoringService().emit(event); } + public static void inRemoteMonitoringContext(Runnable action) { + REMOTE_MONITORING_EVENT.set(true); + try { + action.run(); + } finally { + REMOTE_MONITORING_EVENT.set(false); + } + } + public static boolean isInRemoteMonitoringContext() { return REMOTE_MONITORING_EVENT.get(); } diff --git a/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/Path.java b/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/Path.java index 5741469..5da120a 100644 --- a/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/Path.java +++ b/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/Path.java @@ -2,9 +2,7 @@ public final class Path { - private Path() { - - } + private Path() {} public static final String CONSOLE = "console"; public static final String REMOTE_MONITORING = "remote-monitoring"; @@ -20,4 +18,6 @@ private Path() { public static final String TRACES_OUTPUT = TRACES + ".output"; public static final String TRACES_EVENTS = TRACES + ".events"; + public static final String OUTBOX = "outbox"; + public static final String OUTBOX_TENANTS = OUTBOX + ".tenants"; } diff --git a/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/collectors/OutboxInfoCollector.java b/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/collectors/OutboxInfoCollector.java index 3e6b71b..650cf98 100644 --- a/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/collectors/OutboxInfoCollector.java +++ b/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/collectors/OutboxInfoCollector.java @@ -1,17 +1,14 @@ -/* - * © 2024 SAP SE or an SAP affiliate company. All rights reserved. - */ -package com.sap.cds.feature.dashboard.info.collectors; +package com.sap.cds.feature.console.info.collectors; -import static com.sap.cds.services.impl.outbox.persistence.OutboxQueries.messagesQuery; +import static com.sap.cds.feature.console.service.RemoteMonitoringConfiguration.COMMAND_ATTACHED; import com.sap.cds.Result; import com.sap.cds.Row; -import com.sap.cds.feature.dashboard.connectivity.InfoEvent; -import com.sap.cds.feature.dashboard.info.InfoCollector; -import com.sap.cds.feature.dashboard.info.Path; -import com.sap.cds.feature.dashboard.service.DashboardCommandEventContext; -import com.sap.cds.feature.dashboard.service.DashboardService; +import com.sap.cds.feature.console.info.InfoCollector; +import com.sap.cds.feature.console.info.Path; +import com.sap.cds.feature.console.service.CommandEventContext; +import com.sap.cds.feature.console.service.InfoEvent; +import com.sap.cds.feature.console.service.RemoteMonitoringService; import com.sap.cds.ql.Delete; import com.sap.cds.ql.Insert; import com.sap.cds.ql.Select; @@ -22,6 +19,7 @@ import com.sap.cds.services.cds.CdsDeleteEventContext; import com.sap.cds.services.cds.CqnService; import com.sap.cds.services.changeset.ChangeSetListener; +import com.sap.cds.services.environment.CdsProperties.Outbox.OutboxServiceConfig; import com.sap.cds.services.handler.EventHandler; import com.sap.cds.services.handler.annotations.After; import com.sap.cds.services.handler.annotations.Before; @@ -46,9 +44,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.slf4j.LoggerFactory; -@ServiceName(DashboardService.DEFAULT_NAME) +@ServiceName(RemoteMonitoringService.DEFAULT_NAME) public class OutboxInfoCollector extends InfoCollector implements EventHandler { private static final org.slf4j.Logger logger = LoggerFactory.getLogger(InfoCollector.class); @@ -69,10 +68,9 @@ public class OutboxInfoCollector extends InfoCollector implements EventHandler { private boolean isPersistentOutboxEnabled; private Map> lastSeenEntries = new HashMap<>(); - private Map outboxes; - public OutboxInfoCollector(CdsRuntime runtime, DashboardService dashboardService) { - super(runtime, dashboardService); + public OutboxInfoCollector(CdsRuntime runtime, RemoteMonitoringService remoteMonitoringService) { + super(runtime, remoteMonitoringService); persistenceService = runtime .getServiceCatalog() @@ -87,75 +85,54 @@ private TenantProviderService getTenantService() { .getService(TenantProviderService.class, TenantProviderService.DEFAULT_NAME); } - @After(event = ClientInfoCollector.COMMAND_ATTACHED) - private void dashboardAttached(DashboardCommandEventContext context) { - outboxes = new HashMap<>(); - context - .getServiceCatalog() - .getServices(PersistentOutbox.class) - .forEach( - box -> - outboxes.put( - box.getName(), + @After(event = COMMAND_ATTACHED) + private void capConsoleAttached(CommandEventContext context) { + if (!isPersistentOutboxEnabled) { + return; + } + + List outBoxConfigs = + context + .getServiceCatalog() + .getServices(PersistentOutbox.class) + .map( + box -> context .getCdsRuntime() .getEnvironment() .getCdsProperties() .getOutbox() - .getService(box.getName()))); - emitInfoDashboardEvent(() -> getStatus()); - emitInfoDashboardEvent(() -> getActiveStatus()); - if (MultitenancyInfoCollector.isMtEnabled(getRuntime())) { + .getService(box.getName())) + .collect(Collectors.toList()); + + InfoEvent outboxConfigsEvent = InfoEvent.create(Path.OUTBOX, Map.of("outboxes", outBoxConfigs)); + emitInfoEvent(() -> outboxConfigsEvent); + + if (isMultitenancyEnabled()) { List tenants = getTenantService().readTenants(); - tenants.forEach( - tenant -> { - emitInfoDashboardEvent(() -> getTenantStatus(tenant)); - emitInfoDashboardEvent(() -> getLastSeenEntries(tenant)); - }); + tenants.forEach(tenant -> emitInfoEvent(() -> getTenantOutboxes(tenant))); } else { - emitInfoDashboardEvent(() -> getTenantStatus(null)); - emitInfoDashboardEvent(() -> getLastSeenEntries(null)); + emitInfoEvent(() -> getTenantOutboxes(null)); } } - private String getTenant(DashboardCommandEventContext context) { - if (MultitenancyInfoCollector.isMtEnabled(getRuntime())) { - return (String) context.getData().get("tenant"); - } - return null; + private boolean isMultitenancyEnabled() { + return getRuntime() + .getServiceCatalog() + .getService(TenantProviderService.class, TenantProviderService.DEFAULT_NAME) + != null; } - @After(event = COMMAND_START_COLLECTOR) - private void startCollector(DashboardCommandEventContext context) { - String outboxName = (String) context.getData().get("name"); - PersistentOutbox outbox = - getRuntime().getServiceCatalog().getService(PersistentOutbox.class, outboxName); - outbox.start(); - for (int i = 0; i < 10; i++) { - try { - Thread.sleep(200); - } catch (InterruptedException e) { - // - } - if (outbox.isCollectorRunning()) { - emitInfoDashboardEvent(() -> getActiveStatus()); - return; - } + private String getTenant(CommandEventContext context) { + if (isMultitenancyEnabled()) { + return (String) context.getData().get("tenant"); } - } - - @After(event = COMMAND_STOP_COLLECTOR) - private void stopCollector(DashboardCommandEventContext context) { - String outboxName = (String) context.getData().get("name"); - PersistentOutbox outbox = - getRuntime().getServiceCatalog().getService(PersistentOutbox.class, outboxName); - outbox.stop(); - emitInfoDashboardEvent(() -> getActiveStatus()); + return null; } @SuppressWarnings("unchecked") @After(event = COMMAND_CREATE) - private void createEntry(DashboardCommandEventContext context) { + private void createEntry(CommandEventContext context) { sendInfoNotification("Outbox Entry Create", "Creating the outbox entry..."); String outbox = (String) context.getData().get("outbox"); String tenant = getTenant(context); @@ -177,7 +154,7 @@ private void createEntry(DashboardCommandEventContext context) { } @After(event = COMMAND_RESET) - private void resetEntry(DashboardCommandEventContext context) { + private void resetEntry(CommandEventContext context) { sendInfoNotification("Outbox Entry Reset", "Resetting the outbox entry..."); String id = (String) context.getData().get("id"); String tenant = getTenant(context); @@ -201,7 +178,7 @@ public void afterClose(boolean completed) { }); } try { - InfoCollector.inDashboardContext( + InfoCollector.inRemoteMonitoringContext( () -> persistenceService.run( Update.entity(Messages_.class) @@ -220,7 +197,7 @@ public void afterClose(boolean completed) { } @After(event = COMMAND_REMOVE) - private void removeEntry(DashboardCommandEventContext context) { + private void removeEntry(CommandEventContext context) { sendInfoNotification("Outbox Entry Remove", "Removing the outbox entry..."); String id = (String) context.getData().get("id"); String tenant = getTenant(context); @@ -234,7 +211,7 @@ private void removeEntry(DashboardCommandEventContext context) { .run( ch -> { try { - InfoCollector.inDashboardContext( + InfoCollector.inRemoteMonitoringContext( () -> persistenceService.run( Delete.from(Messages_.class).where(e -> e.ID().eq(id)))); @@ -251,7 +228,7 @@ private void removeEntry(DashboardCommandEventContext context) { } @After(event = COMMAND_REPLAY) - private void replayEntry(DashboardCommandEventContext context) { + private void replayEntry(CommandEventContext context) { sendInfoNotification("Outbox Entry Replay", "Replaying the outbox entry..."); String id = (String) context.getData().get("id"); String tenant = getTenant(context); @@ -278,18 +255,8 @@ private void replayEntry(DashboardCommandEventContext context) { newMsg.setMsg(msg.getMsg()); newMsg.setTarget(msg.getTarget()); newMsg.setTimestamp(Instant.now()); - String appid = - getRuntime() - .getEnvironment() - .getCdsProperties() - .getEnvironment() - .getDeployment() - .getAppid(); - if (appid != null) { - newMsg.setAppid(appid); - } - InfoCollector.inDashboardContext( + InfoCollector.inRemoteMonitoringContext( () -> persistenceService.run( Insert.into(Messages_.class).entry(newMsg))); @@ -310,13 +277,13 @@ private void replayEntry(DashboardCommandEventContext context) { }); removedFromHistory.forEach( msg -> lastSeenEntries.get(tenant).removeIf(entry -> entry.equals(msg))); - emitInfoDashboardEvent(() -> getLastSeenEntries(tenant)); + emitInfoEvent(() -> getTenantOutboxes(tenant)); } } } @After(event = COMMAND_REMOVE_HISTORY) - private void removeHistoryEntry(DashboardCommandEventContext context) { + private void removeHistoryEntry(CommandEventContext context) { sendInfoNotification( "Outbox Entry Remove History", "Removing the outbox entry from history..."); String id = (String) context.getData().get("id"); @@ -326,7 +293,7 @@ private void removeHistoryEntry(DashboardCommandEventContext context) { lastSeenEntries .get(tenant) .removeIf(entry -> ((Row) entry).as(Messages.class).getId().equals(id)); - emitInfoDashboardEvent(() -> getLastSeenEntries(tenant)); + emitInfoEvent(() -> getTenantOutboxes(tenant)); } } } @@ -351,7 +318,7 @@ private void outboxEventDelete(CdsDeleteEventContext context) { (String) analyzer.analyze(context.getCqn()).targetKeys().get(Messages.ID); CqnSelect select = Select.from(Messages_.class).where(e -> e.ID().eq(id)); - InfoCollector.inDashboardContext( + InfoCollector.inRemoteMonitoringContext( () -> { persistenceService .run(select) @@ -401,10 +368,7 @@ private void outboxEvents(EventContext context) { @Override public void afterClose(boolean completed) { - emitInfoDashboardEvent(() -> getTenantStatus(tenant)); - if (event.equals(CqnService.EVENT_DELETE)) { - emitInfoDashboardEvent(() -> getLastSeenEntries(tenant)); - } + emitInfoEvent(() -> getTenantOutboxes(tenant)); } }); } @@ -414,37 +378,10 @@ public void afterClose(boolean completed) { private void tenantSubscribed(SubscribeEventContext context) { getTenantService() .readTenants() - .forEach(tenant -> emitInfoDashboardEvent(() -> getTenantStatus(tenant))); + .forEach(tenant -> emitInfoEvent(() -> getTenantOutboxes(tenant))); } - InfoEvent getStatus() { - InfoEvent event = InfoEvent.create(Path.OUTBOX); - if (isPersistentOutboxEnabled) { - event.getData().put("enabled", true); - event.getData().put("outboxes", outboxes); - } else { - event.getData().put("enabled", false); - } - return event; - } - - InfoEvent getActiveStatus() { - InfoEvent event = InfoEvent.create(Path.OUTBOX); - if (isPersistentOutboxEnabled) { - Map active = new HashMap<>(); - getRuntime() - .getServiceCatalog() - .getServices(PersistentOutbox.class) - .filter(s -> s.isCollectorRunning()) - .forEach(s -> active.put(s.getName(), s.isCollectorRunning())); - event.getData().put("active", active); - } else { - event.getData().put("enabled", false); - } - return event; - } - - InfoEvent getTenantStatus(String tenant) { + private InfoEvent getTenantOutboxes(String tenant) { InfoEvent event = InfoEvent.create(Path.OUTBOX_TENANTS + '.' + tenant); if (isPersistentOutboxEnabled) { getRuntime() @@ -456,48 +393,37 @@ InfoEvent getTenantStatus(String tenant) { .changeSetContext() .run( ch -> { - InfoCollector.inDashboardContext( + InfoCollector.inRemoteMonitoringContext( () -> { CqnSelect select = - messagesQuery( - getRuntime(), s -> s.orderBy(e -> e.timestamp().asc())); + Select.from(Messages_.class).orderBy(e -> e.timestamp().asc()); Result res = persistenceService.run(select); res.forEach(entry -> updateOutboxEntry(entry)); event.getData().put("entries", res.list()); }); }); }); - } else { - event.getData().put("enabled", false); - } - return event; - } - private void updateOutboxEntry(Row row) { - Map data = row; - data.put("jsonMsg", CloudEventUtils.toMap((String) data.get("msg"))); - } - - InfoEvent getLastSeenEntries(String tenant) { - InfoEvent event = InfoEvent.create(Path.OUTBOX_TENANTS + '.' + tenant); - if (isPersistentOutboxEnabled) { - event.getData().put("enabled", true); + // Add history List reversedList; synchronized (lastSeenEntries) { - if (lastSeenEntries.get(tenant) != null) { - reversedList = lastSeenEntries.get(tenant).subList(0, lastSeenEntries.get(tenant).size()); - } else { - reversedList = new ArrayList<>(); - } + reversedList = + lastSeenEntries.get(tenant) != null + ? new ArrayList<>(lastSeenEntries.get(tenant)) + : new ArrayList<>(); } Collections.reverse(reversedList); event.getData().put("history", reversedList); - } else { - event.getData().put("enabled", false); } + return event; } + private void updateOutboxEntry(Row row) { + Map data = row; + data.put("jsonMsg", CloudEventUtils.toMap((String) data.get("msg"))); + } + private void scheduleOutbox(CdsRuntime runtime, String target) { if (isPersistentOutboxEnabled) { runtime diff --git a/cds-feature-console/src/main/java/com/sap/cds/feature/console/service/RemoteMonitoringConfiguration.java b/cds-feature-console/src/main/java/com/sap/cds/feature/console/service/RemoteMonitoringConfiguration.java index dfcfa6d..342ccd6 100644 --- a/cds-feature-console/src/main/java/com/sap/cds/feature/console/service/RemoteMonitoringConfiguration.java +++ b/cds-feature-console/src/main/java/com/sap/cds/feature/console/service/RemoteMonitoringConfiguration.java @@ -2,6 +2,7 @@ import com.sap.cds.feature.console.connectivity.RemoteMonitoringHandler; import com.sap.cds.feature.console.info.collectors.LogCollector; +import com.sap.cds.feature.console.info.collectors.OutboxInfoCollector; import com.sap.cds.services.runtime.CdsRuntimeConfiguration; import com.sap.cds.services.runtime.CdsRuntimeConfigurer; @@ -26,9 +27,12 @@ public void services(CdsRuntimeConfigurer configurer) { @Override public void eventHandlers(CdsRuntimeConfigurer configurer) { if (remoteMonitoringService != null) { - configurer.eventHandler(new RemoteMonitoringHandler(remoteMonitoringService.getRemoteMonitoringServer())); - configurer.eventHandler(new LogCollector(configurer.getCdsRuntime(), remoteMonitoringService)); + configurer.eventHandler( + new RemoteMonitoringHandler(remoteMonitoringService.getRemoteMonitoringServer())); + configurer.eventHandler( + new LogCollector(configurer.getCdsRuntime(), remoteMonitoringService)); + configurer.eventHandler( + new OutboxInfoCollector(configurer.getCdsRuntime(), remoteMonitoringService)); } } - } From 470106449630b08c8ce483c0ca45106bcd322348 Mon Sep 17 00:00:00 2001 From: Liam Vilhelmsson Date: Thu, 12 Mar 2026 13:51:29 +0100 Subject: [PATCH 3/4] Broadcast to tasks --- .../connectivity/RemoteMonitoringHandler.java | 14 +++++++++++--- .../connectivity/RemoteMonitoringServer.java | 16 +++++++++------- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/cds-feature-console/src/main/java/com/sap/cds/feature/console/connectivity/RemoteMonitoringHandler.java b/cds-feature-console/src/main/java/com/sap/cds/feature/console/connectivity/RemoteMonitoringHandler.java index 40aaefc..8bbff2d 100644 --- a/cds-feature-console/src/main/java/com/sap/cds/feature/console/connectivity/RemoteMonitoringHandler.java +++ b/cds-feature-console/src/main/java/com/sap/cds/feature/console/connectivity/RemoteMonitoringHandler.java @@ -1,5 +1,6 @@ package com.sap.cds.feature.console.connectivity; +import com.sap.cds.feature.console.info.Path; import com.sap.cds.feature.console.service.CommandEventContext; import com.sap.cds.feature.console.service.InfoEventContext; import com.sap.cds.feature.console.service.RemoteMonitoringService; @@ -24,8 +25,16 @@ public RemoteMonitoringHandler(RemoteMonitoringServer server) { @On private void handleInfoEvent(InfoEventContext context) { logger.debug("Handling info '{}'", context.getEvent()); - this.remoteMonitoringServer.broadcastToPath( - context.getInfoEvent().toJson(), RemoteMonitoringServer.PATH_LOGS); + + String path = context.getInfoEvent().getPath(); + + if (path.startsWith(Path.OUTBOX)) { + this.remoteMonitoringServer.broadcastToPath( + context.getInfoEvent().toJson(), RemoteMonitoringServer.PATH_TASKS); + } else { + this.remoteMonitoringServer.broadcastToPath( + context.getInfoEvent().toJson(), RemoteMonitoringServer.PATH_LOGS); + } context.setCompleted(); } @@ -36,5 +45,4 @@ private void handleDashboardCommandEvent(CommandEventContext context) { logger.debug("Handling command '{}'", context.getEvent()); context.setCompleted(); } - } diff --git a/cds-feature-console/src/main/java/com/sap/cds/feature/console/connectivity/RemoteMonitoringServer.java b/cds-feature-console/src/main/java/com/sap/cds/feature/console/connectivity/RemoteMonitoringServer.java index e8d1be5..3a2edee 100644 --- a/cds-feature-console/src/main/java/com/sap/cds/feature/console/connectivity/RemoteMonitoringServer.java +++ b/cds-feature-console/src/main/java/com/sap/cds/feature/console/connectivity/RemoteMonitoringServer.java @@ -32,6 +32,7 @@ public class RemoteMonitoringServer extends WebSocketServer { private static final Logger logger = LoggerFactory.getLogger(RemoteMonitoringServer.class); public static final String PATH_CAP_CONSOLE = "/cap-console"; public static final String PATH_LOGS = PATH_CAP_CONSOLE + "/logs"; + public static final String PATH_TASKS = PATH_CAP_CONSOLE + "/tasks"; private final Map> clientsByPaths = new ConcurrentHashMap<>(); private static final ObjectMapper objectMapper = new ObjectMapper(); @@ -132,13 +133,14 @@ public void onStart() { } private void welcomeClient(WebSocket conn, String path) { - RemoteLogData welcomeMsg = new RemoteLogData.Builder() - .level("INFO") - .logger("system") - .thread(Thread.currentThread().getName()) - .type("welcome") - .message("Welcome to CAP console Remote Monitoring.") - .build(); + RemoteLogData welcomeMsg = + new RemoteLogData.Builder() + .level("INFO") + .logger("system") + .thread(Thread.currentThread().getName()) + .type("welcome") + .message("Welcome to CAP console Remote Monitoring.") + .build(); InfoEvent infoEvent = InfoEvent.createRemoteLog(path, welcomeMsg); conn.send(infoEvent.toJson()); From 983328b37ae6ad3cd659c67e76aa16dc2b34ad05 Mon Sep 17 00:00:00 2001 From: Liam Vilhelmsson Date: Thu, 12 Mar 2026 16:53:39 +0100 Subject: [PATCH 4/4] Only include necessary fields in config --- .../feature/console/info/InfoCollector.java | 9 +++-- .../info/collectors/OutboxInfoCollector.java | 20 ++++++---- .../feature/console/service/OutboxConfig.java | 37 +++++++++++++++++++ 3 files changed, 55 insertions(+), 11 deletions(-) create mode 100644 cds-feature-console/src/main/java/com/sap/cds/feature/console/service/OutboxConfig.java diff --git a/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/InfoCollector.java b/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/InfoCollector.java index d7f6909..6367c90 100644 --- a/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/InfoCollector.java +++ b/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/InfoCollector.java @@ -52,7 +52,8 @@ protected void emitInfoEvent(Supplier infoProducer) { } getRemoteMonitoringService().emit(event); } catch (Exception e) { - sendErrorNotification("Data Access Error", e.getMessage()); + String errorMessage = e.getMessage() != null ? e.getMessage() : e.getClass().getName(); + sendErrorNotification("Data Access Error", errorMessage); logger.error("Could not emit remote-monitoring info event!", e); } } @@ -72,7 +73,8 @@ public void sendSuccessNotification(String header, String notification, Object.. } public void sendErrorNotification(String header, String notification, Object... args) { - RemoteLogData logData = new RemoteLogData.Builder() + RemoteLogData logData = + new RemoteLogData.Builder() .type(header) .logger("system") .thread(Thread.currentThread().getName()) @@ -86,7 +88,8 @@ public void sendErrorNotification(String header, String notification, Object... } public void sendNotification(NotificationType type, String notification, Object... args) { - RemoteLogData logData = new RemoteLogData.Builder() + RemoteLogData logData = + new RemoteLogData.Builder() .type(type.name()) .logger("system") .thread(Thread.currentThread().getName()) diff --git a/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/collectors/OutboxInfoCollector.java b/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/collectors/OutboxInfoCollector.java index 650cf98..53e81eb 100644 --- a/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/collectors/OutboxInfoCollector.java +++ b/cds-feature-console/src/main/java/com/sap/cds/feature/console/info/collectors/OutboxInfoCollector.java @@ -8,6 +8,7 @@ import com.sap.cds.feature.console.info.Path; import com.sap.cds.feature.console.service.CommandEventContext; import com.sap.cds.feature.console.service.InfoEvent; +import com.sap.cds.feature.console.service.OutboxConfig; import com.sap.cds.feature.console.service.RemoteMonitoringService; import com.sap.cds.ql.Delete; import com.sap.cds.ql.Insert; @@ -91,18 +92,21 @@ private void capConsoleAttached(CommandEventContext context) { return; } - List outBoxConfigs = + List outBoxConfigs = context .getServiceCatalog() .getServices(PersistentOutbox.class) .map( - box -> - context - .getCdsRuntime() - .getEnvironment() - .getCdsProperties() - .getOutbox() - .getService(box.getName())) + box -> { + OutboxServiceConfig config = + context + .getCdsRuntime() + .getEnvironment() + .getCdsProperties() + .getOutbox() + .getService(box.getName()); + return OutboxConfig.fromServiceConfig(config, box.getName()); + }) .collect(Collectors.toList()); InfoEvent outboxConfigsEvent = InfoEvent.create(Path.OUTBOX, Map.of("outboxes", outBoxConfigs)); diff --git a/cds-feature-console/src/main/java/com/sap/cds/feature/console/service/OutboxConfig.java b/cds-feature-console/src/main/java/com/sap/cds/feature/console/service/OutboxConfig.java new file mode 100644 index 0000000..991c4c0 --- /dev/null +++ b/cds-feature-console/src/main/java/com/sap/cds/feature/console/service/OutboxConfig.java @@ -0,0 +1,37 @@ +package com.sap.cds.feature.console.service; + +import com.sap.cds.services.environment.CdsProperties.Outbox.OutboxServiceConfig; +import java.util.Map; + +/** + * Class representing outbox configuration data for serialization to the client. Extracts only + * serializable properties from OutboxServiceConfig. + */ +public final class OutboxConfig { + private final String name; + private final int maxAttempts; + private final boolean ordered; + private final boolean enabled; + + private OutboxConfig(String name, int maxAttempts, boolean ordered, boolean enabled) { + this.name = name; + this.maxAttempts = maxAttempts; + this.ordered = ordered; + this.enabled = enabled; + } + + /** + * Creates an OutboxConfig from a CAP OutboxServiceConfig and outbox name. + * + * @param config the OutboxServiceConfig from CAP framework + * @param name the name of the outbox service + * @return OutboxConfig instance + */ + public static OutboxConfig fromServiceConfig(OutboxServiceConfig config, String name) { + return new OutboxConfig(name, config.getMaxAttempts(), config.isOrdered(), config.isEnabled()); + } + + public Map toMap() { + return Map.of("name", name, "maxAttempts", maxAttempts, "ordered", ordered, "enabled", enabled); + } +}