Skip to content

Commit d1be598

Browse files
authored
Merge pull request #1 from constantine2nd/main
Make Adapter works with OBP-API
2 parents d36328e + 861e86b commit d1be598

6 files changed

Lines changed: 113 additions & 76 deletions

File tree

src/main/scala/com/tesobe/obp/adapter/AdapterMain.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ object AdapterMain extends IOApp {
7474
healthResult <- localAdapter.checkHealth(
7575
com.tesobe.obp.adapter.models.CallContext(
7676
correlationId = "startup-health-check",
77-
sessionId = "startup",
77+
sessionId = Some("startup"),
7878
userId = None,
7979
username = None,
8080
consumerId = None,

src/main/scala/com/tesobe/obp/adapter/config/Config.scala

Lines changed: 29 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -75,50 +75,54 @@ case class AdapterConfig(
7575

7676
object Config {
7777

78+
/** Read a config value: env var takes priority, then system property, then default */
79+
private def env(key: String, default: String): String =
80+
sys.env.getOrElse(key, sys.props.getOrElse(key, default))
81+
7882
/** Load configuration from environment variables */
7983
def load: IO[AdapterConfig] = IO {
8084
val httpConfig = HttpConfig(
81-
host = sys.env.getOrElse("HTTP_HOST", "0.0.0.0"),
82-
port = sys.env.getOrElse("HTTP_PORT", "52345").toInt,
83-
enabled = sys.env.getOrElse("HTTP_ENABLED", "true").toBoolean,
85+
host = env("HTTP_HOST", "0.0.0.0"),
86+
port = env("HTTP_PORT", "52345").toInt,
87+
enabled = env("HTTP_ENABLED", "true").toBoolean,
8488
apiExplorerUrl =
85-
sys.env.getOrElse("API_EXPLORER_URL", "http://localhost:5173"),
86-
obpApiUrl = sys.env.getOrElse("OBP_API_URL", "http://localhost:8080")
89+
env("API_EXPLORER_URL", "http://localhost:5173"),
90+
obpApiUrl = env("OBP_API_URL", "http://localhost:8080")
8791
)
8892

8993
val rabbitmqConfig = RabbitMQConfig(
90-
host = sys.env.getOrElse("RABBITMQ_HOST", "localhost"),
91-
port = sys.env.getOrElse("RABBITMQ_PORT", "5672").toInt,
92-
virtualHost = sys.env.getOrElse("RABBITMQ_VIRTUAL_HOST", "/"),
93-
username = sys.env.getOrElse("RABBITMQ_USERNAME", "guest"),
94-
password = sys.env.getOrElse("RABBITMQ_PASSWORD", "guest"),
94+
host = env("RABBITMQ_HOST", "localhost"),
95+
port = env("RABBITMQ_PORT", "5672").toInt,
96+
virtualHost = env("RABBITMQ_VIRTUAL_HOST", "/"),
97+
username = env("RABBITMQ_USERNAME", "guest"),
98+
password = env("RABBITMQ_PASSWORD", "guest"),
9599
connectionTimeout =
96-
sys.env.getOrElse("RABBITMQ_CONNECTION_TIMEOUT", "30").toInt.seconds,
100+
env("RABBITMQ_CONNECTION_TIMEOUT", "30").toInt.seconds,
97101
requestedHeartbeat =
98-
sys.env.getOrElse("RABBITMQ_HEARTBEAT", "60").toInt.seconds,
102+
env("RABBITMQ_HEARTBEAT", "60").toInt.seconds,
99103
automaticRecovery =
100-
sys.env.getOrElse("RABBITMQ_AUTOMATIC_RECOVERY", "true").toBoolean
104+
env("RABBITMQ_AUTOMATIC_RECOVERY", "true").toBoolean
101105
)
102106

103107
val queueConfig = QueueConfig(
104-
requestQueue = sys.env.getOrElse("RABBITMQ_REQUEST_QUEUE", "obp.request"),
108+
requestQueue = env("RABBITMQ_REQUEST_QUEUE", "obp.request"),
105109
responseQueue =
106-
sys.env.getOrElse("RABBITMQ_RESPONSE_QUEUE", "obp.response"),
107-
prefetchCount = sys.env.getOrElse("RABBITMQ_PREFETCH_COUNT", "10").toInt,
108-
durable = sys.env.getOrElse("RABBITMQ_QUEUE_DURABLE", "true").toBoolean,
110+
env("RABBITMQ_RESPONSE_QUEUE", "obp.response"),
111+
prefetchCount = env("RABBITMQ_PREFETCH_COUNT", "10").toInt,
112+
durable = env("RABBITMQ_QUEUE_DURABLE", "true").toBoolean,
109113
autoDelete =
110-
sys.env.getOrElse("RABBITMQ_QUEUE_AUTO_DELETE", "false").toBoolean
114+
env("RABBITMQ_QUEUE_AUTO_DELETE", "false").toBoolean
111115
)
112116

113117
val redisConfig = RedisConfig(
114-
host = sys.env.getOrElse("REDIS_HOST", "localhost"),
115-
port = sys.env.getOrElse("REDIS_PORT", "6379").toInt,
116-
enabled = sys.env.getOrElse("REDIS_ENABLED", "true").toBoolean
118+
host = env("REDIS_HOST", "localhost"),
119+
port = env("REDIS_PORT", "6379").toInt,
120+
enabled = env("REDIS_ENABLED", "true").toBoolean
117121
)
118122

119123
val grpcConfig = GrpcConfig(
120-
port = sys.env.getOrElse("GRPC_PORT", "50051").toInt,
121-
enabled = sys.env.getOrElse("GRPC_ENABLED", "false").toBoolean
124+
port = env("GRPC_PORT", "50051").toInt,
125+
enabled = env("GRPC_ENABLED", "false").toBoolean
122126
)
123127

124128
AdapterConfig(
@@ -127,8 +131,8 @@ object Config {
127131
queue = queueConfig,
128132
redis = redisConfig,
129133
grpc = grpcConfig,
130-
logLevel = sys.env.getOrElse("LOG_LEVEL", "INFO"),
131-
enableMetrics = sys.env.getOrElse("ENABLE_METRICS", "true").toBoolean
134+
logLevel = env("LOG_LEVEL", "INFO"),
135+
enableMetrics = env("ENABLE_METRICS", "true").toBoolean
132136
)
133137
}
134138

src/main/scala/com/tesobe/obp/adapter/interfaces/LocalAdapter.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package com.tesobe.obp.adapter.interfaces
1919

2020
import cats.effect.IO
2121
import com.tesobe.obp.adapter.models._
22-
import io.circe.JsonObject
22+
import io.circe.{Json, JsonObject}
2323

2424
/**
2525
* Core interface for Core Banking System (CBS) adapters.
@@ -94,7 +94,7 @@ object LocalAdapterResult {
9494
* Successful response with JSON data matching OBP message docs format
9595
*/
9696
final case class Success(
97-
data: JsonObject,
97+
data: Json,
9898
backendMessages: List[BackendMessage] = Nil
9999
) extends LocalAdapterResult
100100

@@ -109,6 +109,9 @@ object LocalAdapterResult {
109109

110110
/** Convenience constructors */
111111
def success(data: JsonObject, messages: List[BackendMessage] = Nil): LocalAdapterResult =
112+
Success(Json.fromJsonObject(data), messages)
113+
114+
def success(data: Json, messages: List[BackendMessage]): LocalAdapterResult =
112115
Success(data, messages)
113116

114117
def error(code: String, message: String, messages: List[BackendMessage] = Nil): LocalAdapterResult =

src/main/scala/com/tesobe/obp/adapter/messaging/RabbitMQClient.scala

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,15 @@ import com.rabbitmq.client.{
1919
import com.tesobe.obp.adapter.config.AdapterConfig
2020
import java.nio.charset.StandardCharsets
2121

22+
/** Message envelope containing body and RabbitMQ properties
23+
*/
24+
case class MessageEnvelope(
25+
body: String,
26+
messageId: String,
27+
correlationId: Option[String],
28+
replyTo: Option[String]
29+
)
30+
2231
/** Simple RabbitMQ client wrapper using the Java client library
2332
*
2433
* This provides basic publish/consume functionality without the complexity of
@@ -71,25 +80,26 @@ class RabbitMQClient(config: AdapterConfig) {
7180
}.void
7281
}
7382

74-
/** Publish a message to a queue with process as messageId
83+
/** Publish a message to a queue with optional correlationId for RPC responses
7584
*/
7685
def publishMessage(
7786
channel: Channel,
7887
queueName: String,
7988
message: String,
80-
process: Option[String] = None
89+
process: Option[String] = None,
90+
correlationId: Option[String] = None
8191
): IO[Unit] = {
8292
IO {
83-
val propsBuilder = new com.rabbitmq.client.AMQP.BasicProperties.Builder()
93+
var propsBuilder = new com.rabbitmq.client.AMQP.BasicProperties.Builder()
8494
.contentType("application/json")
8595

8696
// Add process as messageId property (matching OBP-API behavior)
87-
val props = process match {
88-
case Some(p) =>
89-
propsBuilder.messageId(p).build()
90-
case None =>
91-
propsBuilder.build()
92-
}
97+
process.foreach(p => propsBuilder = propsBuilder.messageId(p))
98+
99+
// Add correlationId for RPC response matching
100+
correlationId.foreach(cid => propsBuilder = propsBuilder.correlationId(cid))
101+
102+
val props = propsBuilder.build()
93103

94104
channel.basicPublish(
95105
"", // exchange (empty = default)
@@ -101,12 +111,12 @@ class RabbitMQClient(config: AdapterConfig) {
101111
}
102112

103113
/** Consume messages from a queue with a callback Returns an IO that will run
104-
* forever, processing messages Handler receives: (message, routingKey)
114+
* forever, processing messages Handler receives: MessageEnvelope with body, messageId, correlationId, and replyTo
105115
*/
106116
def consumeMessages(
107117
channel: Channel,
108118
queueName: String,
109-
handler: (String, String) => IO[Unit]
119+
handler: MessageEnvelope => IO[Unit]
110120
): IO[Unit] = {
111121
IO {
112122
// Set prefetch count
@@ -117,13 +127,18 @@ class RabbitMQClient(config: AdapterConfig) {
117127

118128
val deliverCallback: DeliverCallback = (consumerTag, delivery) => {
119129
val message = new String(delivery.getBody, StandardCharsets.UTF_8)
120-
121-
// Extract process from messageId property (matching OBP-API behavior)
122-
val process = Option(delivery.getProperties.getMessageId)
123-
.getOrElse("unknown")
130+
val props = delivery.getProperties
131+
132+
// Extract properties from message
133+
val envelope = MessageEnvelope(
134+
body = message,
135+
messageId = Option(props.getMessageId).getOrElse("unknown"),
136+
correlationId = Option(props.getCorrelationId),
137+
replyTo = Option(props.getReplyTo)
138+
)
124139

125140
val processAndAck = for {
126-
_ <- handler(message, process)
141+
_ <- handler(envelope)
127142
_ <- IO(channel.basicAck(delivery.getEnvelope.getDeliveryTag, false))
128143
} yield ()
129144

src/main/scala/com/tesobe/obp/adapter/messaging/RabbitMQConsumer.scala

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,15 @@ package com.tesobe.obp.adapter.messaging
1212
import cats.effect.IO
1313
import com.tesobe.obp.adapter.config.AdapterConfig
1414
import com.tesobe.obp.adapter.interfaces.LocalAdapter
15+
import com.tesobe.obp.adapter.models._
1516
import com.tesobe.obp.adapter.telemetry.Telemetry
1617
import com.tesobe.obp.adapter.http.DiscoveryServer
1718
import io.circe.syntax._
1819

1920
/** RabbitMQ consumer for OBP messages
2021
*
2122
* Consumes messages from the request queue, processes them via local adapter,
22-
* and sends responses to the response queue.
23+
* and sends responses to the replyTo queue (RPC pattern) or fallback response queue.
2324
*/
2425
object RabbitMQConsumer {
2526

@@ -40,6 +41,7 @@ object RabbitMQConsumer {
4041
_ <- IO.println(
4142
s"[RabbitMQ] Connected to ${config.rabbitmq.host}:${config.rabbitmq.port}"
4243
)
44+
_ <- IO.println(s"[INFO] RabbitMQ connected: ${config.rabbitmq.host}:${config.rabbitmq.port}")
4345
_ <- telemetry.recordRabbitMQConnected(
4446
config.rabbitmq.host,
4547
config.rabbitmq.port
@@ -54,21 +56,21 @@ object RabbitMQConsumer {
5456
_ <- telemetry.recordQueueConsumptionStarted(
5557
config.queue.requestQueue
5658
)
59+
_ <- IO.println(s"[INFO] Queue consumption started: ${config.queue.requestQueue}")
5760
_ <- IO.println(
5861
s"[OK] Consuming from queue: ${config.queue.requestQueue}"
5962
)
6063
_ <- IO.println("")
6164

62-
// Start consuming messages
65+
// Start consuming messages with MessageEnvelope
6366
_ <- client.consumeMessages(
6467
channel,
6568
config.queue.requestQueue,
66-
(message, routingKey) =>
69+
envelope =>
6770
processMessage(
6871
client,
6972
channel,
70-
message,
71-
routingKey,
73+
envelope,
7274
config,
7375
localAdapter,
7476
telemetry,
@@ -92,15 +94,23 @@ object RabbitMQConsumer {
9294
private def processMessage(
9395
client: RabbitMQClient,
9496
channel: com.rabbitmq.client.Channel,
95-
messageJson: String,
96-
process: String,
97+
envelope: MessageEnvelope,
9798
config: AdapterConfig,
9899
localAdapter: LocalAdapter,
99100
telemetry: Telemetry,
100101
redis: Option[dev.profunktor.redis4cats.RedisCommands[IO, String, String]]
101102
): IO[Unit] = {
103+
val messageJson = envelope.body
104+
val process = envelope.messageId
105+
106+
// Use replyTo from message properties if present, otherwise fall back to configured response queue
107+
val responseQueue = envelope.replyTo.getOrElse(config.queue.responseQueue)
108+
val rabbitCorrelationId = envelope.correlationId
102109

103110
(for {
111+
// Log incoming message details
112+
_ <- IO.println(s"[DEBUG] Received message - process: $process, replyTo: ${envelope.replyTo}, correlationId: ${envelope.correlationId}")
113+
104114
// Increment outbound counter
105115
_ <- redis match {
106116
case Some(r) => RedisCounter.incrementOutbound(r, process)
@@ -116,8 +126,9 @@ object RabbitMQConsumer {
116126
)
117127
(inboundMsg, _) = result
118128

119-
// Send response via RabbitMQ
120-
_ <- sendResponse(client, channel, config.queue.responseQueue, inboundMsg)
129+
// Send response to replyTo queue with correlationId
130+
_ <- IO.println(s"[DEBUG] Sending response to queue: $responseQueue with correlationId: $rabbitCorrelationId")
131+
_ <- sendResponse(client, channel, responseQueue, inboundMsg, rabbitCorrelationId)
121132

122133
// Increment inbound counter
123134
_ <- redis match {
@@ -129,11 +140,11 @@ object RabbitMQConsumer {
129140
// Handle errors
130141
for {
131142
_ <- telemetry.recordMessageFailed(
132-
process = "unknown",
133-
correlationId = "unknown",
134-
errorCode = "ADAPTER_ERROR",
135-
errorMessage = error.getMessage,
136-
duration = scala.concurrent.duration.Duration.Zero
143+
process = process,
144+
correlationId = envelope.correlationId.getOrElse("unknown"),
145+
errorCode = "ADAPTER_ERROR",
146+
errorMessage = error.getMessage,
147+
duration = scala.concurrent.duration.Duration.Zero
137148
)
138149
_ <- IO.println(
139150
s"[ERROR] Error processing message: ${error.getMessage}"
@@ -143,13 +154,14 @@ object RabbitMQConsumer {
143154
}
144155
}
145156

146-
/** Send response message to response queue
157+
/** Send response message to response queue with correlationId for RPC pattern
147158
*/
148159
private def sendResponse(
149160
client: RabbitMQClient,
150161
channel: com.rabbitmq.client.Channel,
151162
responseQueue: String,
152-
message: com.tesobe.obp.adapter.models.InboundMessage
163+
message: InboundMessage,
164+
correlationId: Option[String]
153165
): IO[Unit] = {
154166
for {
155167
// Convert to JSON
@@ -163,8 +175,9 @@ object RabbitMQConsumer {
163175
)
164176
)
165177

166-
// Publish message
167-
_ <- client.publishMessage(channel, responseQueue, json)
178+
// Publish message with correlationId for RPC response matching
179+
_ <- client.publishMessage(channel, responseQueue, json, correlationId = correlationId)
180+
_ <- IO.println(s"[DEBUG] Response published to $responseQueue")
168181

169182
} yield ()
170183
}

0 commit comments

Comments
 (0)