
Real-Time Media Recommendations with Atlas Stream Processing and MongoDB Vector Search

Use cases: Personalization, Gen AI

Industries: Media, Telecommunications

Products and Tools: MongoDB Atlas, MongoDB Vector Search, MongoDB Atlas Stream Processing

Partners: Voyage AI, Azure OpenAI

When a new user lands on your news site, you have seconds to understand what they are looking for before they lose interest and leave. This is the cold start problem: how do you recommend content to users with no prior data about them?

Consider an anonymous user who lands on your site and clicks three articles:

  • "NVIDIA creates new AI chip"

  • "TSMC expands production in Arizona"

  • "Intel stock drops amid delays"

A simple keyword search would only recommend other articles about Intel or Arizona. However, an intelligent system recognizes that all three clicks relate to semiconductor supply chains. This allows it to recommend relevant articles like "The Future of Silicon Valley," even when they don't share keywords with the user's browsing history.
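To make the keyword-versus-semantic distinction concrete, the following toy sketch uses invented three-dimensional vectors standing in for real embeddings (production models such as voyage-4-large produce far higher-dimensional vectors). It shows how cosine similarity scores a related headline highly even when the headlines share no keywords:

```python
def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    na = sum(x * x for x in a) ** 0.5
    nb = sum(x * x for x in b) ** 0.5
    return dot / (na * nb)

# Hypothetical embeddings; the comments name the headlines they stand in for.
clicked = [0.9, 0.1, 0.2]       # "NVIDIA creates new AI chip"
candidate = [0.85, 0.15, 0.25]  # "The Future of Silicon Valley"
unrelated = [0.05, 0.9, 0.3]    # an unrelated article, e.g. sports

# The clicked and candidate headlines share zero keywords, but their
# embeddings are close, so the semantic score is much higher.
assert cosine(clicked, candidate) > cosine(clicked, unrelated)
```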

The solution in this article builds an intelligent media-personalization system that ingests and processes clickstream data to generate a natural-language summary of the user's interests and recommend relevant articles that the user is more likely to engage with.

This solution combines:

  • Atlas Stream Processing for real-time data ingestion, enrichment, and LLM integration to summarize user intent from clickstream data

  • Atlas Vector Search with automated Voyage AI embeddings for semantic retrieval and recommendations

This architecture shows how to build an AI-driven media recommendation engine that ingests, processes, and reacts to user behavior in real time using MongoDB Atlas.


Figure 1. AI-driven media personalization architecture with Atlas Stream Processing and MongoDB Vector Search

This architecture operates in three phases, which are described in detail in the next sections:

  1. Ingest and Enrich: Capture raw clickstream events from the user-facing application and join them with article metadata in real time using Atlas Stream Processing.

  2. Sessionize and Summarize: Group related clicks into sessions and use an LLM to generate natural-language summaries of user interests.

  3. Search and Serve: Use generated summaries to drive semantic vector search and return personalized recommendations.

In the first phase, this solution architecture uses Atlas Stream Processing to ingest raw clickstream data from your media platform and enrich it with metadata from the articles in your database.

1. Review the source collections

This solution uses the following two collections in the news database of the same MongoDB cluster:

  • user_events: Raw clickstream events captured from the user-facing application

  • articles: Article metadata, including the description, keywords, and title fields
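A hypothetical pair of documents illustrates the shape of these collections. The field names match the pipeline stages defined later in this solution; the values are invented for illustration:

```python
# Hypothetical sample document from the user_events collection.
user_event = {
    "userId": "user-123",
    "article_id": "art-001",
    "eventTime": "2025-05-07T10:15:00Z",
    "event_type": "click",
    "device": "mobile",
    "session_id": "sess-abc",
}

# Hypothetical sample document from the articles collection.
article = {
    "_id": "art-001",
    "title": "NVIDIA creates new AI chip",
    "description": "NVIDIA announces a new GPU aimed at AI workloads.",
    "keywords": ["NVIDIA", "AI", "semiconductors"],
}

# The stream processor joins these collections on article_id -> _id.
assert user_event["article_id"] == article["_id"]
```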

2. Create a stream processing workspace
  1. In Atlas, go to the Stream Processing page for your project.

    1. If it's not already displayed, select the organization that contains your project from the Organizations menu in the navigation bar.

    2. If it's not already displayed, select your project from the Projects menu in the navigation bar.

    3. In the sidebar, click Stream Processing under the Streaming Data heading.

      The Stream Processing page displays.

  2. Click Create a workspace.

  3. On the Create a stream processing workspace page, configure your workspace as follows:

    • Tier: SP30

    • Provider: AWS

    • Region: us-east-1

    • Workspace Name: article-personalization

3. Connect to your Atlas cluster

The stream processor needs connections to both your clickstream data and your article metadata to perform the enrichment. Both data sources should be in the same Atlas cluster.

  1. In the pane for your stream processing workspace, click Manage.

  2. In the Connection Registry tab, click + Add Connection in the upper right.

  3. On the Edit Connection page, configure your connection as follows:

    • Connection Type: Atlas Database

    • Connection Name: userevents

    • Atlas Cluster: The name of the cluster where your clickstream and article data lives (for example, ClickstreamCluster)

    • Execute As: Read and write to any database

  4. Click Save changes to create the connection.

4. Create the stream processor

Create a stream processor named userIntentSummarizer with stages that read raw clickstream events from the user_events collection and enrich the events with article metadata.

  1. On the Stream Processing page for your Atlas project, click Manage in the pane for your stream processing workspace.

  2. In the JSON editor, copy and paste the following JSON definition to define a stream processor named userIntentSummarizer with these stages:

    • $source: Reads raw clickstream events from the user_events collection in the news database of your connected clickstream cluster (userevents connection).

    • $lookup: Joins the raw clickstream events with the articles collection based on the article_id field, bringing in relevant article metadata from the description, keywords, and title fields.

    • $addFields: Projects the description, keywords, and title fields from the article_details field to the top level of the event stream to make them easily accessible for downstream stages.

    • $project: Projects relevant fields for downstream processing.

    {
      "$source": {
        "connectionName": "userevents",
        "db": "news",
        "collection": "user_events"
      }
    },
    {
      "$lookup": {
        "from": {
          "connectionName": "userevents",
          "db": "news",
          "coll": "articles"
        },
        "localField": "article_id",
        "foreignField": "_id",
        "as": "article_details",
        "pipeline": [
          {
            "$project": {
              "_id": 0,
              "description": 1,
              "keywords": 1,
              "title": 1
            }
          }
        ]
      }
    },
    {
      "$addFields": {
        "description": { "$arrayElemAt": [ "$article_details.description", 0 ] },
        "keywords": { "$arrayElemAt": [ "$article_details.keywords", 0 ] },
        "title": { "$arrayElemAt": [ "$article_details.title", 0 ] }
      }
    },
    {
      "$project": {
        "userId": 1,
        "article_id": 1,
        "eventTime": 1,
        "event_type": 1,
        "device": 1,
        "session_id": 1,
        "description": 1,
        "keywords": 1,
        "title": 1
      }
    }
  3. Click Update stream processor to save your changes.
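The enrichment that these stages perform can be sketched in plain Python. This is an illustrative simulation only; articles_by_id stands in for the articles collection, and the event and article values are hypothetical:

```python
# Stand-in for the articles collection, keyed by _id.
articles_by_id = {
    "art-001": {
        "description": "NVIDIA announces a new GPU aimed at AI workloads.",
        "keywords": ["NVIDIA", "AI", "semiconductors"],
        "title": "NVIDIA creates new AI chip",
    }
}

# A hypothetical raw clickstream event.
event = {"userId": "user-123", "article_id": "art-001",
         "session_id": "sess-abc", "event_type": "click"}

# $lookup: join the event with its article metadata as an array field.
event["article_details"] = [articles_by_id[event["article_id"]]]

# $addFields with $arrayElemAt 0: hoist the joined fields to the top level.
for field in ("description", "keywords", "title"):
    event[field] = event["article_details"][0][field]

# $project: keep only the fields needed for downstream processing.
enriched = {k: event[k] for k in
            ("userId", "article_id", "session_id", "event_type",
             "description", "keywords", "title")}
```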

In this phase, we extend the stream processor pipeline to group related clicks into sessions and use an LLM to generate a natural-language summary of each session that describes the user's interests.

1. Connect to your LLM provider

Add an external HTTPS connection to your LLM provider (for example, Azure OpenAI) to enable the stream processor to call the LLM directly from the pipeline to enrich your data:

  1. In the pane for your stream processing workspace, click Manage.

  2. In the Connection Registry tab, click + Add Connection in the upper right.

  3. Configure the connection as follows:

    • Connection Type: HTTPS

    • Connection Name: azureopenai

    • URL: The endpoint URL for your Azure OpenAI instance

    • Headers: Add these key-value pairs to the headers:

      • Key: Content-Type, Value: application/json

      • Key: api-key, Value: Your Azure OpenAI API key

2. Sessionize the event stream

Add a $sessionWindow stage to your stream processor pipeline to group related events into sessions based on a specified session gap. This solution defines a session as a sequence of events from the same session_id with no inactivity gap longer than 60 seconds.

Add this stage to your userIntentSummarizer pipeline after the enrichment stages:

{
  "$sessionWindow": {
    "partitionBy": "$session_id",
    "gap": { "unit": "second", "size": 60 },
    "pipeline": [
      {
        "$group": {
          "_id": "$session_id",
          "titles": { "$push": "$title" }
        }
      }
    ]
  }
}
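The gap-based windowing semantics can be sketched in plain Python. $sessionWindow additionally partitions by session_id; this illustrative simulation uses a single hypothetical session with epoch-second timestamps for simplicity:

```python
GAP_SECONDS = 60

def sessionize(events):
    """Group (timestamp, title) pairs into windows, closing a window
    whenever the gap between consecutive events exceeds GAP_SECONDS."""
    windows, current, last_ts = [], [], None
    for ts, title in sorted(events):
        if current and ts - last_ts > GAP_SECONDS:
            windows.append(current)
            current = []
        current.append(title)
        last_ts = ts
    if current:
        windows.append(current)
    return windows

events = [
    (0, "NVIDIA creates new AI chip"),
    (30, "TSMC expands production in Arizona"),
    (200, "Intel stock drops amid delays"),  # 170s gap: starts a new window
]
windows = sessionize(events)
```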
3. Summarize sessions with the LLM

Add an $https stage to call your LLM provider directly from your stream processing pipeline. This solution calls Azure OpenAI to generate a natural-language summary of each session that describes the user's interests based on the article titles in the session.

Add this stage to your pipeline after the $sessionWindow stage:

{
  "$https": {
    "connectionName": "azureopenai",
    "method": "POST",
    "as": "apiResults",
    "config": { "parseJsonStrings": true },
    "payload": [
      {
        "$project": {
          "_id": 0,
          "model": "gpt-4o-mini",
          "response_format": { "type": "json_object" },
          "messages": [
            {
              "role": "system",
              "content": "You are an analytical assistant that specializes in behavioral summarization. You analyze short-term reading activity and infer user interests without making personal or sensitive assumptions, then create a special field called summary. Summary must be a special field in the response. Respond only in JSON format. Return a JSON object with the following keys in this order: \n reasoning: (Internal scratchpad, briefly explain your analysis) \n user_interests: (The list of inferred interests) \n summary: (A concise summary based on the interests above)"
            },
            {
              "role": "user",
              "content": { "$toString": "$titles" }
            }
          ]
        }
      }
    ]
  }
}
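For each session window, the payload pipeline above produces a Chat Completions request body. This illustrative sketch assembles the same body in Python; the titles are hypothetical and the system prompt is abridged:

```python
# Hypothetical titles gathered by the $sessionWindow stage for one session.
titles = ["NVIDIA creates new AI chip", "TSMC expands production in Arizona"]

request_body = {
    "model": "gpt-4o-mini",
    # Forces the model to return a JSON object, matching the system prompt.
    "response_format": {"type": "json_object"},
    "messages": [
        {"role": "system",
         "content": "You are an analytical assistant ... Respond only in JSON."},
        # $toString: "$titles" serializes the titles array into a string.
        {"role": "user", "content": str(titles)},
    ],
}
```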

Note

The stream processor itself is "intelligent." It transforms a list of titles into a semantic summary (for example, "User is researching semiconductor manufacturing logistics") before the data is ever written to disk. This differs fundamentally from traditional batch processing pipelines, which typically write raw session data to a database and then call an external API from an application server.

4. Extract the summary and persist the results

Add the following stages to the pipeline to extract the summary from the LLM output and write it to a new collection:

  • $match: Filters out any sessions where the LLM call failed and returned an error to avoid writing incomplete data to the database.

  • $addFields: Extracts the summary field from the LLM output and adds it to the top level of the document.

  • $project: Removes the raw LLM output from the document to reduce noise and storage costs.

  • $merge: Writes the resulting documents to a new collection called user_intent in the news database of your clickstream cluster (userevents connection). Each document in this collection represents a user session and contains a summary of the user's interests.

{
  "$match": {
    "titles": { "$exists": true },
    "apiResults": { "$exists": true }
  }
},
{
  "$addFields": {
    "summary": {
      "$arrayElemAt": [ "$apiResults.choices.message.content.summary", 0 ]
    }
  }
},
{
  "$project": {
    "apiResults": 0
  }
},
{
  "$merge": {
    "into": {
      "coll": "user_intent",
      "connectionName": "userevents",
      "db": "news"
    }
  }
}
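The extraction performed by the $addFields stage can be sketched in plain Python. The apiResults document below is a hypothetical parsed LLM response; parseJsonStrings turns the JSON string in message.content into a nested document:

```python
# Hypothetical parsed response from the $https stage.
apiResults = {
    "choices": [
        {"message": {"content": {
            "reasoning": "Titles cluster around semiconductor supply chains.",
            "user_interests": ["semiconductors", "chip manufacturing"],
            "summary": "The user is researching semiconductor supply chains.",
        }}}
    ]
}

# Equivalent of $arrayElemAt on "$apiResults.choices.message.content.summary":
# walk into the first element of the choices array and read the summary field.
summary = apiResults["choices"][0]["message"]["content"]["summary"]
```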
5. Start the stream processor

When you're ready to start summarizing your clickstream data, click the Start icon for the userIntentSummarizer processor in the list of stream processors for your stream processing workspace.

The pipeline writes documents to the user_intent collection, each containing a session summary that captures the user's interests. For example:

{
  "_id": "sess-6a0d8837-a5f9-4ef5-8b00-78e9bf7a825c",
  "summary": "The user seems interested in geopolitical developments, especially in the Middle East, US political strategies involving Trump, and legal aspects of government operations.",
  "titles": [
    "Israel and Hamas agree to part of Trump's Gaza peace plan, will free hostages and prisoners",
    "Top officials from US and Qatar join talks aimed at brokering peace in Gaza",
    "How Trump secured a Gaza breakthrough",
    "Ontario's anti-tariff ad is clever, effective and legally sound, experts say",
    "Shutdown? Trump's been dismantling the government all year",
    "AP News Summary at 7:58 p.m. EDT"
  ]
}

Finally, this solution uses MongoDB Vector Search to perform semantic search over the article catalog, driven by the session summaries generated in the previous phase, to return personalized content recommendations.

1. Create the vector search index

Before you can perform semantic search, you need to generate vector embeddings for your article data. To do this, create a MongoDB Vector Search index named vector_index that indexes the description field of your articles collection as the autoEmbed type. This instructs MongoDB Vector Search to use Automated Embedding to automatically generate vector embeddings for the description field whenever documents are inserted or updated in the collection.

Important

Automated embedding is available as a Preview feature only for MongoDB Community Edition v8.2 and later. The feature and the corresponding documentation might change at any time during the Preview period. To learn more, see Preview Features.

Atlas supports manual embedding in all editions of MongoDB.

Use this JSON definition to create a vector index on these fields:

  • The description field as the autoEmbed type to instruct MongoDB Vector Search to automatically generate vector embeddings for the description field using the voyage-4-large embedding model whenever documents are inserted or updated in the collection.

  • The title field as the filter type to prefilter the data for the semantic search using the string value in the field. This allows you to exclude articles the user has already read from search results.

{
  "fields": [
    {
      "type": "autoEmbed",
      "modalitytype": "text",
      "path": "description",
      "model": "voyage-4-large"
    },
    {
      "type": "filter",
      "path": "title"
    }
  ]
}
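The same definition can be assembled programmatically. This sketch only builds the definition as a Python dict; the commented pymongo call shows roughly how it could be applied, but it requires a live Atlas cluster and is not executed here:

```python
# Mirror of the JSON index definition above.
index_definition = {
    "fields": [
        {
            "type": "autoEmbed",       # automated embedding (Preview feature)
            "modalitytype": "text",
            "path": "description",
            "model": "voyage-4-large",
        },
        {"type": "filter", "path": "title"},
    ]
}

# With a live cluster, the index could be created roughly like this:
# from pymongo.operations import SearchIndexModel
# db.articles.create_search_index(
#     SearchIndexModel(definition=index_definition,
#                      name="vector_index", type="vectorSearch"))
```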
2. Run the semantic search query

When a user visits your site, take the summary of their current session and use it as the query for a vector search against your article catalog. Since you enabled Automated Embedding on the index, MongoDB Vector Search automatically generates the embedding for the summary at query time and uses it as the effective query vector.

This example shows a simplified vector search query that uses the session summary as the query vector and excludes articles the user has already read based on the titles field:

[{
  "$vectorSearch": {
    "index": "vector_index", // Vector index with autoEmbed on article descriptions
    "path": "description",
    "query": {
      "text": "<session-summary>" // Session summary from the user_intent document
    },
    "numCandidates": 100,
    "limit": 10,
    "filter": {
      "title": { "$nin": [<read-titles>] } // Exclude titles already read, taken from the user_intent document's titles array
    }
  }
}]
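The same query can be assembled programmatically. In this illustrative sketch, the session summary and read titles are hypothetical values standing in for the placeholders, and a limit field caps the number of results returned:

```python
# Hypothetical values taken from a user_intent document.
session_summary = ("The user seems interested in geopolitical developments, "
                   "especially in the Middle East.")
read_titles = ["How Trump secured a Gaza breakthrough"]

pipeline = [{
    "$vectorSearch": {
        "index": "vector_index",
        "path": "description",
        # With autoEmbed, the text query is embedded automatically at query time.
        "query": {"text": session_summary},
        "numCandidates": 100,
        "limit": 10,
        # Prefilter on the indexed title field to skip already-read articles.
        "filter": {"title": {"$nin": read_titles}},
    }
}]
# With a live cluster: results = db.articles.aggregate(pipeline)
```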

This architecture demonstrates several important advancements in building modern data products:

  • Reduced latency: Embedding LLM calls directly inside the stream processor eliminates multiple network hops and intermediate persistence layers. The system transforms raw clicks into actionable intent in near real time.

  • Enhanced developer experience: Define pipelines with JSON-based MQL, enabling teams that already know MongoDB queries to build advanced streaming and AI-powered workloads without learning new DSLs or provisioning additional infrastructure.

  • Semantic personalization: Move beyond keyword matching and overnight batch jobs to build systems that listen, think, and respond instantly to user behavior.

Author: Vinod Krishnan, Solutions Architect, MongoDB
