A real-time fraud detection demo built with Spring Boot, Apache Kafka, MongoDB Atlas, and AI-generated embeddings.
It simulates financial transactions, stores them in MongoDB, and uses Atlas Vector Search + Change Streams to flag anomalies as potential fraud.
This app is the finished code for the tutorial: “Building a Real-Time AI Fraud Detection System with Spring Kafka and MongoDB.”
End-to-end pipeline:
- Seed customers with predictable spending habits into MongoDB (`customers` collection).
- Seed historical transactions for each customer into MongoDB (`transactions` collection), including embeddings.
- Produce live transactions every ~100ms:
  - generate a synthetic transaction
  - compute an embedding (OpenAI via Spring AI)
  - publish to the Kafka topic `transactions`
- Consume transactions from Kafka and insert into MongoDB.
- Listen to MongoDB Change Streams for new transaction inserts.
- Run Atlas Vector Search to compare the new transaction embedding to the user’s prior transactions.
- Mark as fraud when:
- no similar transactions exist for that user, or
- any similar transaction returned is already marked as fraud.
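The producer half of this loop can be sketched in plain Java, with an in-memory queue standing in for the Kafka topic and a stub in place of the OpenAI embedding call. All names here are illustrative, not the tutorial's actual classes:

```java
import java.util.List;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;

public class ProducerSketch {
    // Minimal stand-in for the tutorial's Transaction model.
    record Transaction(String id, String customerId, double amount, List<Double> embedding) {}

    // Stub: the real app computes a 1536-dim embedding with OpenAI via Spring AI.
    static List<Double> fakeEmbedding(String text) {
        return List.of((double) text.hashCode(), (double) text.length());
    }

    public static void main(String[] args) throws InterruptedException {
        // Stands in for the Kafka "transactions" topic.
        BlockingQueue<Transaction> topic = new LinkedBlockingQueue<>();

        // One iteration of the ~100ms producer loop: generate, embed, publish.
        String customerId = "customer-1";
        double amount = ThreadLocalRandom.current().nextDouble(5.0, 500.0);
        Transaction txn = new Transaction(UUID.randomUUID().toString(), customerId, amount,
                fakeEmbedding(customerId + ":" + amount));
        topic.put(txn);

        System.out.println("published=" + topic.size()); // published=1
    }
}
```

In the real app this loop runs on a schedule and publishes through `KafkaTemplate`; the queue here only illustrates the generate → embed → publish ordering.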
- Spring Boot app
  - Kafka producer (generates transactions)
  - Kafka consumer (stores transactions)
  - Change stream listener (reacts to inserts)
  - Vector search service (evaluates fraud)
- Kafka
  - Topic: `transactions`
- MongoDB Atlas
  - Database: `fraud`
  - Collections: `customers`, `transactions`
  - Atlas Vector Search index on `transactions.embedding`
- Java 21
- Maven 3.9+
- MongoDB Atlas cluster (M0 is fine)
- Kafka 3.5+ (local install or Docker)
- OpenAI API key (for embeddings via Spring AI)
In Atlas:
- Database: `fraud`
- Collections: `customers`, `transactions`
Create a Vector Search index on the `transactions` collection:
- Path: `embedding`
- Dimensions: `1536` (for `text-embedding-3-small`)
- Similarity: `dotProduct`
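A note on the `dotProduct` choice: OpenAI embeddings come back unit-normalized, and for unit-length vectors the dot product equals cosine similarity while being cheaper to compute. A quick illustration (not tutorial code):

```java
public class SimilaritySketch {
    static double dot(double[] a, double[] b) {
        double s = 0;
        for (int i = 0; i < a.length; i++) s += a[i] * b[i];
        return s;
    }

    static double cosine(double[] a, double[] b) {
        return dot(a, b) / (Math.sqrt(dot(a, a)) * Math.sqrt(dot(b, b)));
    }

    public static void main(String[] args) {
        double[] a = {0.6, 0.8};  // unit length
        double[] b = {0.8, 0.6};  // unit length
        // For unit vectors, dot product and cosine similarity agree.
        System.out.println(Math.abs(dot(a, b) - cosine(a, b)) < 1e-9); // true
    }
}
```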
Use this index definition:

```json
{
  "fields": [
    {
      "type": "vector",
      "path": "embedding",
      "numDimensions": 1536,
      "similarity": "dotProduct"
    }
  ]
}
```

Make sure the index name matches what the code expects (the default in the tutorial code is `vector_index`).
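The query the service runs against this index is a `$vectorSearch` aggregation stage. Its shape is sketched below with plain Java maps (the real service builds it with the MongoDB driver; the `numCandidates` and `limit` values are illustrative, and pre-filtering on `customerId` additionally requires a `"type": "filter"` entry for that field in the index definition):

```java
import java.util.List;
import java.util.Map;

public class VectorSearchStageSketch {
    // Shape of the $vectorSearch stage issued per new transaction (values illustrative).
    static Map<String, Object> vectorSearchStage(List<Double> queryVector, String customerId) {
        return Map.of("$vectorSearch", Map.of(
                "index", "vector_index",        // must match the Atlas Search index name
                "path", "embedding",            // field holding the stored embeddings
                "queryVector", queryVector,     // embedding of the incoming transaction (1536 dims in the real app)
                "numCandidates", 100,           // candidates scanned before ranking (illustrative)
                "limit", 5,                     // top-k results returned (illustrative)
                "filter", Map.of("customerId", customerId) // restrict to this user's history
        ));
    }

    public static void main(String[] args) {
        Map<String, Object> stage = vectorSearchStage(List.of(0.1, 0.2, 0.3), "customer-1");
        System.out.println(stage.containsKey("$vectorSearch")); // true
    }
}
```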
Edit `src/main/resources/application.properties`:

```properties
spring.application.name=frauddetector

# MongoDB
spring.data.mongodb.uri=<YOUR_MONGODB_ATLAS_CONNECTION_STRING>
spring.data.mongodb.database=fraud

# Spring AI (OpenAI embeddings)
spring.ai.openai.api-key=<YOUR_OPENAI_API_KEY>
spring.ai.openai.embedding.options.model=text-embedding-3-small

# Kafka
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=fraud-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.mongodb.frauddetector.model
```

If you’re starting Kafka for the first time in standalone mode:
```shell
export KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties
```

Start the broker:

```shell
bin/kafka-server-start.sh config/server.properties
```

Create the `transactions` topic:

```shell
bin/kafka-topics.sh --create \
  --topic transactions \
  --bootstrap-server localhost:9092 \
  --partitions 1 \
  --replication-factor 1
```

From the project root:
```shell
mvn clean spring-boot:run
```

On startup, the app will:
- seed customers (if `customers` is empty)
- seed baseline transactions (if `transactions` is empty)
- start producing live transactions to Kafka
- consume them into MongoDB
- start the change stream listener
- perform vector search per insert and mark suspicious transactions as fraud
Example logs when everything is wired correctly:
- customers seeded
- transactions seeded
- producer sending to Kafka
- consumer persisting to MongoDB
- change stream detecting inserts
- vector search running
- some transactions marked as fraud
You’ll see lines like:

```
Transaction sent to topic transactions
New transaction detected: <uuid>
Performing vector search
Transaction marked as fraud: <uuid>
```
- `config/`: `MongoDBConfig` (exposes `MongoDatabase` + `MongoCollection<Document>`), `OpenAIConfig` (wires the Spring AI embedding model)
- `enums/`: `Category`, `Currency`, `Merchant`
- `model/`: `Customer` (spending profile + helpers), `Transaction` (transaction data + embedding payload)
- `repository/`: `CustomerRepository`, `TransactionRepository`
- `service/`: `CustomerSeeder` (seeds customers on startup), `TransactionSeeder` (seeds baseline transactions + starts change streams), `TransactionProducer` (generate + embed + publish every 100ms), `TransactionConsumer` (Kafka listener → MongoDB), `TransactionChangeStreamListener` (listens for inserts → evaluates), `TransactionVectorSearchService` (vector search + fraud decision), `EmbeddingGenerator` (OpenAI embedding call)
A new transaction is marked as fraud if:
- no similar transactions are returned for the user, or
- any similar transaction returned is already flagged as fraud
This is intentionally simple to keep the demo focused on the plumbing: Kafka → MongoDB → Change Streams → Vector Search → update doc.
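The decision rule itself reduces to a few lines. A pure-Java sketch (the real check lives in `TransactionVectorSearchService`; the `Hit` shape here is illustrative):

```java
import java.util.List;

public class FraudRuleSketch {
    // Minimal view of one vector-search hit: was that prior transaction flagged?
    record Hit(boolean fraud) {}

    // Fraud if the user has no similar history, or any similar hit was itself fraud.
    static boolean isFraud(List<Hit> similar) {
        return similar.isEmpty() || similar.stream().anyMatch(Hit::fraud);
    }

    public static void main(String[] args) {
        System.out.println(isFraud(List.of()));                              // no similar history -> true
        System.out.println(isFraud(List.of(new Hit(false))));                // clean neighbors -> false
        System.out.println(isFraud(List.of(new Hit(false), new Hit(true)))); // tainted neighbor -> true
    }
}
```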
This demo is optimized for clarity, not maximum throughput.
Common extensions:
- Move evaluation into Atlas Triggers to reduce app-side change stream overhead.
- Add an in-memory cache of recent embeddings per user to reduce vector queries.
- Introduce a fraud “risk score” (amount spikes, unusual currency, new merchant, etc.) and only run vector search for high-risk events.
- Adjust the vector search strategy:
  - don’t filter by user (if fraud patterns generalize across users)
  - increase `SEARCH_LIMIT` / tune `NUM_CANDIDATES`
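The risk-score extension can be prototyped without touching the vector search at all: cheap heuristics decide whether the expensive query runs. A sketch with made-up weights and thresholds:

```java
public class RiskScoreSketch {
    // Cheap heuristics gate the expensive vector search.
    // Weights and thresholds here are made up for illustration.
    static int riskScore(double amount, double userAvgAmount, boolean newMerchant, boolean unusualCurrency) {
        int score = 0;
        if (amount > 3 * userAvgAmount) score += 2; // amount spike
        if (newMerchant) score += 1;                // merchant never seen for this user
        if (unusualCurrency) score += 2;            // currency outside the user's profile
        return score;
    }

    static boolean shouldRunVectorSearch(int score) {
        return score >= 2; // only high-risk events pay for a vector query
    }

    public static void main(String[] args) {
        System.out.println(shouldRunVectorSearch(riskScore(500, 50, false, false))); // spike -> true
        System.out.println(shouldRunVectorSearch(riskScore(40, 50, true, false)));   // minor signal -> false
    }
}
```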