frauddetector

A real-time fraud detection demo built with Spring Boot, Apache Kafka, MongoDB Atlas, and AI-generated embeddings.
It simulates financial transactions, stores them in MongoDB, and uses Atlas Vector Search + Change Streams to flag anomalies as potential fraud.

This app is the finished code for the tutorial: “Building a Real-Time AI Fraud Detection System with Spring Kafka and MongoDB.”


What it does

End-to-end pipeline:

  1. Seed customers with predictable spending habits into MongoDB (customers collection).
  2. Seed historical transactions for each customer into MongoDB (transactions collection), including embeddings.
  3. Produce live transactions every ~100ms (sketched in Java after this list):
    • generate a synthetic transaction
    • compute an embedding (OpenAI via Spring AI)
    • publish to Kafka topic transactions
  4. Consume transactions from Kafka and insert into MongoDB.
  5. Listen to MongoDB Change Streams for new transaction inserts.
  6. Run Atlas Vector Search to compare the new transaction embedding to the user’s prior transactions.
  7. Mark as fraud when:
    • no similar transactions exist for that user, or
    • any similar transaction returned is already marked as fraud.
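
As a concrete sketch of step 3, the producer loop can be as small as the following. Transaction.random(), embed(...), setEmbedding(...), and getUserId() are hypothetical stand-ins for the repo's generation and embedding helpers; the real classes are listed in the code map below.

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

// Sketch of the producer loop. Transaction.random(), embed(...), and
// getUserId() are hypothetical stand-ins for the real helpers.
// Requires @EnableScheduling on a configuration class.
@Service
public class TransactionProducerSketch {

    private final KafkaTemplate<String, Transaction> kafkaTemplate;
    private final EmbeddingGenerator embeddingGenerator;

    public TransactionProducerSketch(KafkaTemplate<String, Transaction> kafkaTemplate,
                                     EmbeddingGenerator embeddingGenerator) {
        this.kafkaTemplate = kafkaTemplate;
        this.embeddingGenerator = embeddingGenerator;
    }

    @Scheduled(fixedRate = 100) // roughly every 100ms
    public void produce() {
        Transaction tx = Transaction.random();                   // 1. generate a synthetic transaction
        tx.setEmbedding(embeddingGenerator.embed(tx));           // 2. compute the embedding (OpenAI via Spring AI)
        kafkaTemplate.send("transactions", tx.getUserId(), tx);  // 3. publish to the transactions topic
    }
}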

Architecture (high level)

  • Spring Boot app
    • Kafka Producer (generates transactions)
    • Kafka Consumer (stores transactions)
    • Change Stream listener (reacts to inserts)
    • Vector search service (evaluates fraud)
  • Kafka
    • Topic: transactions
  • MongoDB Atlas
    • Database: fraud
    • Collections: customers, transactions
    • Atlas Vector Search index on transactions.embedding

Prerequisites

  • Java 21
  • Maven 3.9+
  • MongoDB Atlas cluster (M0 is fine)
  • Kafka 4.0+ (local install or Docker; the KRaft commands below follow the 4.0 quickstart layout)
  • OpenAI API key (for embeddings via Spring AI)

MongoDB setup

1) Create the database + collections

In Atlas:

  • Database: fraud
  • Collections:
    • customers
    • transactions

2) Create the Atlas Vector Search index

Create a Vector Search index on the transactions collection.

  • Path: embedding
  • Dimensions: 1536 (for text-embedding-3-small)
  • Similarity: dotProduct

Use this index definition:

{
  "fields": [
    {
      "type": "vector",
      "path": "embedding",
      "numDimensions": 1536,
      "similarity": "dotProduct"
    }
  ]
}

Make sure the index name matches what the code expects (default in the tutorial code is vector_index).
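
If you prefer creating the index from code instead of the Atlas UI, recent Java drivers can submit the same definition. A minimal sketch, assuming driver 5.2+ (for SearchIndexType.vectorSearch()) and an already-wired MongoCollection<Document>:

import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.SearchIndexModel;
import com.mongodb.client.model.SearchIndexType;
import org.bson.Document;
import java.util.List;

// Sketch: submit the same vector index definition programmatically
// (requires Java driver 5.2+ for SearchIndexType.vectorSearch()).
void createVectorIndex(MongoCollection<Document> transactions) {
    Document definition = new Document("fields", List.of(
            new Document("type", "vector")
                    .append("path", "embedding")
                    .append("numDimensions", 1536)
                    .append("similarity", "dotProduct")));

    transactions.createSearchIndexes(List.of(
            new SearchIndexModel("vector_index", definition, SearchIndexType.vectorSearch())));
}

Atlas builds search indexes asynchronously, so give the index a moment to become queryable before running vector searches.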


Configuration

Edit src/main/resources/application.properties:

spring.application.name=frauddetector

# MongoDB
spring.data.mongodb.uri=<YOUR_MONGODB_ATLAS_CONNECTION_STRING>
spring.data.mongodb.database=fraud

# Spring AI (OpenAI embeddings)
spring.ai.openai.api-key=<YOUR_OPENAI_API_KEY>
spring.ai.openai.embedding.options.model=text-embedding-3-small

# Kafka
spring.kafka.bootstrap-servers=localhost:9092

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=fraud-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.mongodb.frauddetector.model
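
With the JSON serializer settings above, the consuming side reduces to a typed @KafkaListener. A minimal sketch, assuming Transaction lives in the trusted package and a TransactionRepository handles the insert:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

// Sketch of the Kafka consumer: Spring deserializes the JSON payload into a
// Transaction (trusted.packages above must cover its package) and persists it.
@Service
public class TransactionConsumerSketch {

    private final TransactionRepository repository;

    public TransactionConsumerSketch(TransactionRepository repository) {
        this.repository = repository;
    }

    @KafkaListener(topics = "transactions", groupId = "fraud-group")
    public void consume(Transaction transaction) {
        repository.save(transaction); // insert into MongoDB; the change stream reacts next
    }
}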

Running Kafka (standalone / KRaft)

If you’re starting Kafka for the first time, format the storage directory for standalone KRaft mode:

export KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties

Start the broker:

bin/kafka-server-start.sh config/server.properties

Create the transactions topic:

bin/kafka-topics.sh --create \
  --topic transactions \
  --bootstrap-server localhost:9092 \
  --partitions 1 \
  --replication-factor 1

Run the app

From the project root:

mvn clean spring-boot:run

On startup, the app will:

  • seed customers (if customers is empty)
  • seed baseline transactions (if transactions is empty)
  • start producing live transactions to Kafka
  • consume them into MongoDB
  • start the change stream listener
  • perform vector search per insert and mark suspicious transactions as fraud

What you should see

Example logs when everything is wired correctly:

  • customers seeded
  • transactions seeded
  • producer sending to Kafka
  • consumer persisting to MongoDB
  • change stream detecting inserts
  • vector search running
  • some transactions marked as fraud

You’ll see lines like:

Transaction sent to topic transactions
New transaction detected: <uuid>
Performing vector search
Transaction marked as fraud: <uuid>

Key modules (code map)

  • config/
    • MongoDBConfig (exposes MongoDatabase + MongoCollection<Document>)
    • OpenAIConfig (wires the Spring AI embedding model)
  • enums/
    • Category, Currency, Merchant
  • model/
    • Customer (spending profile + helpers)
    • Transaction (transaction data + embedding payload)
  • repository/
    • CustomerRepository, TransactionRepository
  • service/
    • CustomerSeeder (seeds customers on startup)
    • TransactionSeeder (seeds baseline transactions + starts change streams)
    • TransactionProducer (generate + embed + publish every ~100ms)
    • TransactionConsumer (Kafka listener -> MongoDB)
    • TransactionChangeStreamListener (listens for inserts -> evaluates; sketched below)
    • TransactionVectorSearchService (vector search + fraud decision)
    • EmbeddingGenerator (OpenAI embedding call)
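
The change stream piece, sketched against the plain Java driver. Here evaluateForFraud is a hypothetical stand-in for the call into the vector search service; note that watch(...).forEach(...) blocks its thread, so in practice it runs off the main thread.

import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Aggregates;
import com.mongodb.client.model.Filters;
import org.bson.Document;
import org.bson.conversions.Bson;
import java.util.List;

// Sketch: watch the transactions collection for inserts and evaluate each new
// document. Insert events always carry the full inserted document.
void listenForInserts(MongoCollection<Document> transactions) {
    List<Bson> pipeline = List.of(Aggregates.match(Filters.eq("operationType", "insert")));
    transactions.watch(pipeline).forEach(change ->
            evaluateForFraud(change.getFullDocument())); // hypothetical hook into the fraud check
}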

Fraud decision rule (demo logic)

A new transaction is marked as fraud if:

  • no similar transactions are returned for the user, or
  • any similar transaction returned is already flagged as fraud

This is intentionally simple to keep the demo focused on the plumbing: Kafka → MongoDB → Change Streams → Vector Search → update doc.
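
In pipeline terms, the rule is one $vectorSearch stage plus a scan of its results. A sketch with raw aggregation stages: userId, embedding, and fraud are assumed field names, and pre-filtering by userId inside $vectorSearch also requires userId to be declared as a "filter" field in the vector index (the minimal definition above indexes only the embedding).

import com.mongodb.client.MongoCollection;
import org.bson.Document;
import java.util.ArrayList;
import java.util.List;

// Sketch of the demo fraud rule. Field names and constants are assumptions;
// the real logic lives in TransactionVectorSearchService.
boolean isFraud(MongoCollection<Document> transactions, String userId, List<Double> embedding) {
    Document vectorSearch = new Document("$vectorSearch", new Document()
            .append("index", "vector_index")
            .append("path", "embedding")
            .append("queryVector", embedding)
            .append("numCandidates", 100) // NUM_CANDIDATES
            .append("limit", 5)           // SEARCH_LIMIT
            // pre-filter needs userId indexed as a "filter" field in the vector index
            .append("filter", new Document("userId", new Document("$eq", userId))));

    List<Document> similar = transactions.aggregate(List.of(vectorSearch)).into(new ArrayList<>());

    if (similar.isEmpty()) {
        return true; // no similar history for this user
    }
    // any similar transaction already flagged as fraud
    return similar.stream().anyMatch(d -> Boolean.TRUE.equals(d.getBoolean("fraud")));
}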


Limitations and next steps

This demo is optimized for clarity, not maximum throughput.

Common extensions:

  • Move evaluation into Atlas Triggers to reduce app-side change stream overhead.
  • Add an in-memory cache of recent embeddings per user to reduce vector queries.
  • Introduce a fraud “risk score” (amount spikes, unusual currency, new merchant, etc.) and only run vector search for high-risk events.
  • Adjust the vector search strategy:
    • don’t filter by user (if fraud patterns generalize across users)
    • increase SEARCH_LIMIT / tune NUM_CANDIDATES
