Developing memory-rich AI systems with Valkey™, OpenSearch® and RAG
Adding long-term memory to agent communication with OpenSearch®, Amazon Bedrock, RAG and LangChain
In this step-by-step tutorial, we'll explore how to build long-term memory into agent-based systems. Essentially, we'll create an agent system that can remember and learn from every interaction. Previously, we established a communication process using Valkey™ for pub/sub messaging to keep agents independent. Our initial architecture looked like this:
In this tutorial, we'll expand on that by introducing several new components:
- Persistent Short-Term Memory: we'll use Apache Kafka to store short-term memory.
- Reflection Mode: at the end of each conversation, fresh data will be processed, vectorized, and stored in OpenSearch® for future use as long-term memory.
- Memory Integration: we'll combine short-term and long-term memories using the Retrieval-Augmented Generation (RAG) pattern for future interactions.
This setup is particularly useful for scenarios like automating customer support, managing IoT devices, or any situation where systems need to operate independently while maintaining a shared state in real-time.
By the end of this tutorial, we'll achieve the following target architecture:
While it might seem complex, don't worry: we'll break it down module by module.
The complete code is available in this GitHub repository. However, you can follow the steps below to build the project from scratch.
Prerequisites
This tutorial builds on concepts from Integrating Valkey Pub/Sub with Amazon Bedrock for AI Agent Communication, which you can also find in this GitHub repository.
To follow along, you'll need:
- AWS access with sufficient permissions to use Amazon Bedrock foundation models.
- NodeJS and npm installed on your machine, or alternatively, you can use GitHub Codespaces or a similar environment with pre-installed libraries.
Step 1. Set up services with Terraform
We'll use Terraform to set up the Aiven services. This lets us automate the deployment of all the services in one go and makes it easier to create and manage the necessary credentials.
If you don't have Terraform installed, follow the instructions on the Terraform documentation page.
The Terraform files for this project are located in the `./terraform` folder.
Get Aiven token
We'll need an Aiven token so that the Terraform script can access your Aiven account.
To get the Aiven token:
- In the Aiven console, go to the User Information menu and select Tokens:
- Click Generate token, add a description, and generate the token. A popup with the token will appear—copy the token value.
- Rename or create a new file named `./terraform/terraform.tfvars`.
- Add your Aiven token to `aiven_api_token` and the project name to `project_name` (see the example below).
- Navigate to the `terraform` folder.
- Set `export PROVIDER_AIVEN_ENABLE_BETA=true` in your terminal (the Terraform Valkey resource is still in beta).
- Run `terraform init`.
- Run `terraform plan`.
- Run `terraform apply`.
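For reference, a minimal `terraform.tfvars` could look like the sketch below; the variable names match the ones mentioned above, and the values are placeholders you replace with your own token and project name:

```
aiven_api_token = "<your-aiven-token>"
project_name    = "<your-aiven-project-name>"
```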
Terraform will create four resources:
- Aiven for Apache Kafka
- Apache Kafka Topic
- Aiven for OpenSearch
- Aiven for caching (Valkey)
Once deployment is complete, Terraform will generate a `.env` file with the necessary credentials and a `certificates` folder with the Apache Kafka certificates.
Add dependency to dotenv
To manage environment variables and credentials, we'll use `dotenv`. Install it with:
npm install dotenv
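If you want to double-check the setup before moving on, here is an optional, hypothetical sketch (not part of the tutorial code) that loads the generated `.env` and reports whether the variables used throughout this tutorial are present:

```javascript
// check-env.js -- optional sketch: load the Terraform-generated .env and
// verify the credentials the rest of the tutorial relies on are present.
import dotenv from 'dotenv';
dotenv.config();

const required = [
  'KAFKA_SERVICE_URI',
  'OPENSEARCH_SERVICE_URI',
  'ssl.key.location',
  'ssl.certificate.location',
  'ssl.ca.location',
];

for (const key of required) {
  console.log(`${key}: ${process.env[key] ? 'set' : 'MISSING'}`);
}
```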
Step 2. Prepare Amazon Bedrock
To work with agents and their memories, we'll need two models:
- LLM: We'll use Claude 3 Haiku on Amazon Bedrock.
- Vectorization Model: To translate text into vectors, we'll use Amazon Titan Text Embeddings.
Make sure you have enabled access to both models by following the steps in the AWS documentation for model access.
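For context, the agents call an `invokeModel` helper that was built in the previous tutorial (see the linked repository for the exact code). As a hypothetical sketch of what such a helper can look like with the AWS SDK for JavaScript, assuming the `@aws-sdk/client-bedrock-runtime` package is installed and using the same Claude 3 Haiku model ID that appears later in this tutorial:

```javascript
// bedrock.js -- a minimal sketch of an invokeModel helper for Claude 3 Haiku.
import { BedrockRuntimeClient, InvokeModelCommand } from '@aws-sdk/client-bedrock-runtime';

const client = new BedrockRuntimeClient({ region: 'us-east-1' });

export const invokeModel = async (prompt) => {
  const command = new InvokeModelCommand({
    modelId: 'anthropic.claude-3-haiku-20240307-v1:0',
    contentType: 'application/json',
    accept: 'application/json',
    body: JSON.stringify({
      anthropic_version: 'bedrock-2023-05-31',
      max_tokens: 512,
      messages: [{ role: 'user', content: [{ type: 'text', text: prompt }] }],
    }),
  });
  const response = await client.send(command);
  const payload = JSON.parse(new TextDecoder().decode(response.body));
  return payload.content[0].text; // Claude 3 returns a list of content blocks
};
```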
Step 3. Store current conversation in Apache Kafka
We'll store the ongoing conversation stream in an Apache Kafka topic. This approach has several benefits—it allows replaying the conversation later, which is useful for processing short-term memories and potentially for recovering from failures if we decide to implement such functionality in the future.
To interact with the Apache Kafka service from a NodeJS project, we'll use `node-rdkafka`. Install it with:

npm install node-rdkafka

To send data to an Apache Kafka topic, we first need to set up a producer. Create a file named `producer.js` and add this code for a simple Kafka producer:
```javascript
import Kafka from 'node-rdkafka';
import dotenv from 'dotenv';
dotenv.config();

// Create a producer
export const producer = new Kafka.Producer({
  'metadata.broker.list': process.env["KAFKA_SERVICE_URI"],
  'security.protocol': 'ssl',
  'ssl.key.location': process.env["ssl.key.location"],
  'ssl.certificate.location': process.env["ssl.certificate.location"],
  'ssl.ca.location': process.env["ssl.ca.location"],
  'dr_cb': true
});

producer.on('event.log', function (log) {
  console.log(log);
});

// Logging all errors
producer.on('event.error', function (err) {
  console.error(err);
});

producer.on('connection.failure', function (err) {
  console.error(err);
});

producer.on('delivery-report', function (err, report) {
  console.log('Message was delivered' + JSON.stringify(report));
});

producer.on('disconnected', function (arg) {
  console.log('producer disconnected. ' + JSON.stringify(arg));
});

producer.connect({}, (err) => {
  if (err) {
    console.error(err);
  }
});
```
Next, we integrate this producer into the `Agent` class by adding a new function, `storeInKafka`:
```javascript
storeInKafka(topic, message) {
  producer.produce(
    topic,
    null,
    Buffer.from(message),
    null,
    Date.now()
  );
  producer.flush();
}
```
With this new function, every time an agent receives a message, it will send it to Apache Kafka. Update the `startToListenToOthers` function to invoke `storeInKafka`:
```javascript
startToListenToOthers() {
  const subscriber = subscribe(this.agentName);

  subscriber.on('message', async (channel, message) => {
    const parsedMessage = JSON.parse(message);
    this.storeInKafka(this.conversationTopic, message);
    await delay(1000);
    await this.replyToMessage(parsedMessage.message, parsedMessage.agent);
  });
}
```
Since agents need to share a common topic for ongoing conversations, it makes sense to pass the topic name when creating agents. Update the `Agent` constructor to include a `conversationTopic`:
```javascript
constructor(agentName, anotherAgent, starts, conversationTopic) {
  this.conversationTopic = conversationTopic;
  ....
```
To ensure each conversation has a unique topic, we'll use the timestamp of the conversation's start. This approach guarantees that all independent conversations have distinct topic names. Modify `run.js` to calculate the topic name and provide it to the agents.
```javascript
const kafkaTopic = Date.now().toString();

const nick = new Agent('Nick', 'Judy', false, kafkaTopic);
nick.start();

const judy = new Agent('Judy', 'Nick', true, kafkaTopic);
judy.start();
```
In the next step, we'll generate reflections based on the data stored in each unique conversation topic.
Step 4. Generate reflections
To generate conversation summaries (reflections) for each agent, we'll follow these steps:
- Create a consumer to read complete data from the latest conversation topic.
- Add a new prompt that instructs the model to summarize the conversation from the agent's perspective.
- Call the model with the conversation body.
- Store the model-generated summary in a different Kafka topic.
To consume all records from a given topic, we first query the latest offset so the consumer knows when to stop reading. Create a new file `consumer.js` and add the code to export the `consumeAll` function:
```javascript
import Kafka from 'node-rdkafka';
import dotenv from 'dotenv';
dotenv.config();

export const consumeAll = async (topic, groupId) => {
  return new Promise((resolve, reject) => {
    console.log('Initializing Kafka Consumer...');

    const consumer = new Kafka.KafkaConsumer({
      'group.id': groupId,
      'metadata.broker.list': process.env["KAFKA_SERVICE_URI"],
      'security.protocol': 'ssl',
      'ssl.key.location': process.env["ssl.key.location"],
      'ssl.certificate.location': process.env["ssl.certificate.location"],
      'ssl.ca.location': process.env["ssl.ca.location"],
      'enable.auto.commit': false
    }, {
      'auto.offset.reset': 'earliest'
    });

    const messages = [];
    let latestOffset;

    consumer.on('ready', () => {
      console.log('Consumer is ready, querying watermark offsets...');
      consumer.queryWatermarkOffsets(topic, 0, 1000, (err, offsets) => {
        if (err) {
          console.error('Error querying watermark offsets:', err);
          return reject(err);
        }
        latestOffset = offsets.highOffset;
        console.log(`Latest offset for topic ${topic} is ${latestOffset}`);
        consumer.subscribe([topic]);
        console.log(`Subscribed to topic ${topic}, starting consumption...`);
        consumer.consume();
      });
    });

    consumer.on('data', (data) => {
      console.log('Received data:', data);
      const messageOffset = data.offset;
      console.log(`Message offset: ${messageOffset}, Latest offset: ${latestOffset}`);
      messages.push(data.value.toString());
      console.log('Message added to the list.', data.value.toString());
      if (messageOffset === latestOffset - 1) {
        console.log('Reached the latest offset, disconnecting...');
        consumer.disconnect();
      }
    });

    consumer.on('disconnected', () => {
      console.log('Consumer disconnected');
      resolve(messages);
    });

    consumer.on('event.error', (err) => {
      console.error('Error event:', err);
      reject(err);
    });

    consumer.on('event.log', (log) => {
      console.log('Log event:', log);
    });

    consumer.on('connection.failure', (err) => {
      console.error('Connection failure:', err);
    });

    console.log('Connecting to Kafka...');
    consumer.connect();
  });
};
```
Update `prompt.js` with a new prompt for generating a conversation summary:
```javascript
export const getConversationSummaryPrompt = (agentName, content) => `You're an inhabitant of a planet Hipola, a very small and cosy planet. Your name is ${agentName}. You met another citizen and had this conversation: ${content}. Reflect on this conversation and summarize in one most important thought that is worth remembering about the person you met. Output only the thought. Remember, you're ${agentName}.`;
```
In the `Agent` class, add a `reflect` method to read all messages from the conversation topic, request a summary from the model, and store the summary in a new topic. Each agent will have its own topic for reflections:
```javascript
async reflect() {
  const messages = await consumeAll(this.conversationTopic, `${this.conversationTopic}-${this.agentName}`);
  const summary = await this.getConversationSummary(messages.join("; "));
  this.storeInKafka(`${this.agentName}-reflections`, summary);
}

async getConversationSummary(content) {
  const prompt = getConversationSummaryPrompt(this.agentName, content);
  return await invokeModel(prompt);
}
```
Finally, we'll connect everything using an additional Valkey channel that triggers the conversation reflection once a conversation ends:
```javascript
async triggerReflection(recipient) {
  await sendToChannel(`${recipient}-internalize`, "Reflect on the conversation");
  await sendToChannel(`${this.agentName}-internalize`, "Reflect on the conversation");
}

async replyToMessage(message, recipient) {
  // the agent indicated that it no longer wants to continue the conversation
  if (message && message.includes("END")) {
    return await this.triggerReflection(recipient);
  }

  const prompt = await this.getPrompt(message);
  console.log(`### ${this.agentName.toUpperCase()} PROMPT: ###`)
  console.log("prompt: " + this.agentName, prompt)

  const response = await invokeModel(prompt);
  console.log(`=== ${this.agentName.toUpperCase()} SAYS: ===`)
  console.log(`${response}`);

  if (message) {
    this.shortMemory.push(`${recipient} said: ${message}`)
  }
  this.shortMemory.push(`You replied: ${response}`);

  sendToChannel(recipient, JSON.stringify({agent: this.agentName, message: response}));
}

waitToConversationEnd() {
  const subscriber = subscribe(`${this.agentName}-internalize`);

  subscriber.on('message', async (channel) => {
    if (channel !== `${this.agentName}-internalize`) return;
    await this.reflect();
  });
}

async start() {
  // listen to what the other agent tells you
  this.startToListenToOthers();

  // get ready to process the conversation
  this.waitToConversationEnd();

  if (this.starts) {
    await this.replyToMessage(null, this.anotherAgent);
  }
}
```
Step 5. Vectorize reflections and store in OpenSearch
Now that reflections are sent to Apache Kafka whenever agents converse, we need to add these reflections as long-term memories for each agent. We'll use the Retrieval Augmented Generation (RAG) pattern and the LangChain framework for this purpose:
- Vectorize long-term memories and store them in OpenSearch.
- Combine short-term memory with reflections by searching for relevant memories using vector search.
To install the LangChain libraries that work with OpenSearch and Amazon Bedrock, run:
npm install @langchain/aws @langchain/community @langchain/core langchain
We also need to install the OpenSearch NodeJS client:
npm install @opensearch-project/opensearch
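As an optional, hypothetical sanity check (not part of the tutorial code), you can verify that the client reaches your Aiven for OpenSearch service before indexing anything; it only relies on the `OPENSEARCH_SERVICE_URI` value from the generated `.env`:

```javascript
// opensearch-check.js -- optional sketch: confirm the client can reach the
// Aiven for OpenSearch service before we start indexing reflections.
import { Client } from '@opensearch-project/opensearch';
import dotenv from 'dotenv';
dotenv.config();

const client = new Client({ nodes: [process.env.OPENSEARCH_SERVICE_URI] });

const info = await client.info();
console.log(`Connected to OpenSearch ${info.body.version.number}`);
```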
Once you have the necessary libraries, create a new file named `vectorize.js`. This file will handle data consumption from the reflection topic and send it to the OpenSearch index:
```javascript
import Kafka from 'node-rdkafka';
import { Client } from "@opensearch-project/opensearch";
import { Document } from "langchain/document";
import { BedrockEmbeddings } from "@langchain/aws";
import { OpenSearchVectorStore } from "@langchain/community/vectorstores/opensearch";
import dotenv from 'dotenv';
dotenv.config();

const client = new Client({
  nodes: [process.env.OPENSEARCH_SERVICE_URI],
});

const consumeAndIndex = (topicName) => {
  // Kafka consumer setup
  const consumer = new Kafka.KafkaConsumer({
    'group.id': 'kafka-group',
    'metadata.broker.list': process.env["KAFKA_SERVICE_URI"],
    'security.protocol': 'ssl',
    'ssl.key.location': process.env["ssl.key.location"],
    'ssl.certificate.location': process.env["ssl.certificate.location"],
    'ssl.ca.location': process.env["ssl.ca.location"],
  }, {});

  consumer.connect();

  consumer.on('ready', () => {
    console.log('Consumer ready');
    consumer.subscribe([topicName]);
    consumer.consume();
  }).on('data', async (data) => {
    const messageValue = data.value.toString();

    // Process the message and create a Document
    const doc = new Document({
      metadata: { source: 'kafka' },
      pageContent: messageValue,
    });

    // Create embeddings and send to OpenSearch
    try {
      const embeddings = new BedrockEmbeddings({
        region: 'us-east-1',
        credentials: {
          secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
          accessKeyId: process.env.AWS_ACCESS_KEY_ID
        },
        model: "amazon.titan-embed-text-v1",
      });

      await OpenSearchVectorStore.fromDocuments([doc], embeddings, {
        client,
        indexName: topicName.toLowerCase(),
      });

      console.log('Document indexed successfully:', doc);
    } catch (error) {
      console.error('Error indexing document:', error);
    }
  });

  consumer.on('event.error', (err) => {
    console.error('Error from consumer:', err);
  });
};

export default consumeAndIndex;
```
Although the `consumeAndIndex` method could be run as a separate process (since it's independent of the agent), we'll keep all calls within `run.js` for simplicity:
```javascript
import Agent from './src/agent.js';
import consumeAndIndex from "./src/vectorize.js";

const kafkaTopic = Date.now().toString();

const nick = new Agent('Nick', 'Judy', false, kafkaTopic);
consumeAndIndex("Nick-reflections");
nick.start();

const judy = new Agent('Judy', 'Nick', true, kafkaTopic);
judy.start();
consumeAndIndex("Judy-reflections");
```
Step 6. Integrate short-term and long-term memory
To enhance the agents' responses, we'll integrate short-term memory with the long-term memories stored in OpenSearch. To do this, we'll:
- Add `LongMemoryService`: this service will provide a method to retrieve relevant long-term memories based on the current conversation.
- Update prompts: modify the prompts used for agent communication to include older memories.
Create a file called `longTermMemory.js` that utilizes LangChain libraries to interface with both the Claude model from Bedrock and the OpenSearch vector store:
```javascript
import { BedrockChat } from "@langchain/community/chat_models/bedrock";
import dotenv from 'dotenv';
import { Client } from "@opensearch-project/opensearch";
import { OpenSearchVectorStore } from "@langchain/community/vectorstores/opensearch";
import { BedrockEmbeddings } from "@langchain/aws";
import { VectorDBQAChain } from "langchain/chains";
dotenv.config();

export class LongMemoryService {
  constructor(indexName) {
    this.indexName = indexName;

    this.model = new BedrockChat({
      model: "anthropic.claude-3-haiku-20240307-v1:0",
      region: "us-east-1",
      credentials: {
        secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
        accessKeyId: process.env.AWS_ACCESS_KEY_ID
      },
    });

    this.client = new Client({
      nodes: [process.env.OPENSEARCH_SERVICE_URI],
    });

    this.vectorStore = new OpenSearchVectorStore(new BedrockEmbeddings({
      region: 'us-east-1',
      credentials: {
        secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
        accessKeyId: process.env.AWS_ACCESS_KEY_ID
      },
      model: "amazon.titan-embed-text-v1"
    }), {
      client: this.client,
      indexName: indexName,
    });

    this.chain = VectorDBQAChain.fromLLM(this.model, this.vectorStore, {
      k: 1,
      returnSourceDocuments: true,
    });
  }

  async indexExists() {
    try {
      const response = await this.client.indices.exists({ index: this.indexName });
      return response.body;
    } catch (error) {
      console.error('Error checking if index exists:', error);
      return false;
    }
  }

  async getLongMemory(query) {
    const indexExists = await this.indexExists();
    if (!indexExists) {
      return '';
    }
    const response = await this.chain.call({ query });
    return response.text;
  }
}
```
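Before wiring the service into the agents, here is a short, hypothetical standalone usage sketch; the index name follows the lowercase `<agent name>-reflections` convention used by the agent constructor in the next snippet, and the query uses the same Human/Assistant framing:

```javascript
// Hypothetical standalone check of LongMemoryService, outside the Agent class.
// It returns an empty string until at least one reflection has been indexed.
import { LongMemoryService } from './longTermMemory.js';

const memory = new LongMemoryService('nick-reflections');
const recollection = await memory.getLongMemory('\n\nHuman: What do you remember about Judy? \n\nAssistant:');
console.log(recollection || 'No long-term memories about Judy yet.');
```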
Next, update the `Agent` class to incorporate the long-memory service and a method to query it:
```javascript
constructor(agentName, anotherAgent, starts, conversationTopic) {
  console.log({ conversationTopic })
  this.agentName = agentName;
  this.anotherAgent = anotherAgent;
  this.shortMemory = [];
  this.starts = starts;
  this.conversationTopic = conversationTopic;
  this.longMemoryService = new LongMemoryService(`${this.agentName.toLowerCase()}-reflections`);
}

async queryLongTermMemory(message) {
  const longmemory = await this.longMemoryService.getLongMemory(`\n\nHuman: ${message} \n\nAssistant:`);
  console.log("******* " + this.agentName.toUpperCase() + " LONG MEMORY: " + longmemory);
  console.log("************************************************************************************");
  return longmemory;
}
```
Revise the prompts in `prompts.js` to factor in long-term memories:
```javascript
export const getPromptStart = (agentName) => `You're an inhabitant of a planet Hipola, a very small and cosy planet. Your name is ${agentName}.`;

export const instructions = `Always follow these instructions:
- if it is the first time you meet this inhabitant, introduce yourself and learn their name;
- if you met this person before or already know something about them - do not introduce yourself, but relate to the previous conversation
- if it's ongoing conversation, don't introduce yourself, just continue the conversation, reply or ask question, be natural;
- after a couple of exchanged messages politely say goodbye
- answer the questions of the other inhabitant;
- try to finish the topic and when you're done with the conversation for today respond with "[END]";
`;

export const getMemoryPrompt = (agentName, anotherAgent) => `The context are memories of ${agentName}. Are there any memories or thoughts about ${anotherAgent}? If yes, respond with "You remember meeting ${anotherAgent}, what you remember is that .... [continue based on the additional context]". If there is no info about ${anotherAgent} in the context respond with "You haven't met ${anotherAgent} before". Don't provide any other judgement or additional information.`;

export const getContinuationMemoryPrompt = (agentName, anotherAgent, message) => `The context are memories of ${agentName}. Are there any memories or thoughts about ${anotherAgent} relevant to the message "${message}"? If yes return "Something that I remember from past conversations with ${anotherAgent} is that .... [continue with a concise list of notes]". Otherwise, if there is no relevant context return "nothing relevant that I remember" and be very very very short and don't provide any other judgement or additional information!`;

export const getStartConversationPrompt = (agentName, memoriesOfOtherAgent) => `${getPromptStart(agentName)} ${memoriesOfOtherAgent}.\n\n${instructions}`;

export const getContinueConversationPrompt = (agentName, memoryString, longTermMemory, message) => `
${getPromptStart(agentName)}
You're meeting another inhabitant. This is the conversation so far:\n${memoryString}\n\n\n\n
This is what you remember about them from previous interactions that is relevant to their phrase:\n${longTermMemory}
Reply to this message from another inhabitant from the planet Hipola: "${message}" and ask a relevant question to continue the conversation.
If you already had several messages exchanged, politely say goodbye and end conversation.
Be concise. Remember, you're ${agentName}.
${instructions}`;
```
Finally, adjust the `getPrompt` method in the `Agent` class to integrate long-term memories into the prompt:
```javascript
async getPrompt(message) {
  // start of the conversation:
  if (!message) {
    const memoriesOfOtherAgent = await this.queryLongTermMemory(getMemoryPrompt(this.agentName, this.anotherAgent));
    return getStartConversationPrompt(this.agentName, memoriesOfOtherAgent);
  }

  // continuation of the conversation:
  let memoryString = this.shortMemory.join('\n');
  let longTermMemory = await this.queryLongTermMemory(getContinuationMemoryPrompt(this.agentName, this.anotherAgent, message));
  return getContinueConversationPrompt(this.agentName, memoryString, longTermMemory, message);
}
```
Step 7. Run
It's time to run our agents!
node run
Observe that, at the end of a conversation, the system enters reflection mode. You can also monitor the data stored in the Apache Kafka topics:
- for the ongoing conversation, in the topic named with the conversation's start timestamp;
- for each agent's conversation summaries, in the Nick-reflections and Judy-reflections topics (see the sketch below).
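If you'd rather inspect topics from code than from the console, below is a hypothetical helper built on the `consumeAll` function from Step 4. It assumes `consumer.js` sits in the `src` folder next to the other modules, and it takes the topic name as a command-line argument:

```javascript
// inspect-topic.js -- hypothetical helper: print everything stored in a topic,
// e.g. the conversation topic (the timestamp printed at startup) or
// "Nick-reflections" / "Judy-reflections".
import { consumeAll } from './src/consumer.js';

const topic = process.argv[2];
if (!topic) {
  console.error('Usage: node inspect-topic <topic-name>');
  process.exit(1);
}

const messages = await consumeAll(topic, `inspect-${Date.now()}`);
messages.forEach((message, i) => console.log(`${i}: ${message}`));
```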
Run multiple conversations to verify that the agents recognize each other across runs.
Conclusion and next steps
In this tutorial, we’ve built a system that enables agents to retain and learn from interactions through long-term memory.
If you're curious about more things you can do with Aiven and AI, take a look at:
- Building a real-time AI pipeline for data analysis with Apache Flink® and OpenAI
- Applying RAG pattern to navigate your knowledge store
- When text meets image: a guide to OpenSearch® for multimodal search
- Find your perfect movie with ClickHouse®, vector search, Hugging Face API, and Next.js
- TensorFlow, PostgreSQL®, PGVector & Next.js: building a movie recommender