diff --git a/agent_memory_server/config.py b/agent_memory_server/config.py index edd65f3..ccd826a 100644 --- a/agent_memory_server/config.py +++ b/agent_memory_server/config.py @@ -489,6 +489,7 @@ class Settings(BaseSettings): # Compaction settings compaction_every_minutes: int = 10 + compact_semantic_duplicates: bool = True # Docket task timeout for LLM-dependent tasks (in minutes) # This controls how long tasks like memory compaction, extraction, and summarization diff --git a/agent_memory_server/long_term_memory.py b/agent_memory_server/long_term_memory.py index afcee76..c746626 100644 --- a/agent_memory_server/long_term_memory.py +++ b/agent_memory_server/long_term_memory.py @@ -515,10 +515,20 @@ async def extract_memory_structure( topics_joined = "|".join(merged_topics) if merged_topics else "" entities_joined = "|".join(merged_entities) if merged_entities else "" - await redis.hset( - Keys.memory_key(memory.id), + # Guard: only update if the key still exists. A race between semantic + # deduplication (which deletes merged keys) and this background task + # can recreate deleted keys as orphaned hashes with only topics/entities. + # Use HSETEX with FXX to atomically update only if the fields already + # exist on the hash — a deleted key has no fields, so nothing is written. + key = Keys.memory_key(memory.id) + result = await redis.hsetex( + key, mapping={"topics": topics_joined, "entities": entities_joined}, - ) # type: ignore + data_persist_option="FXX", + keepttl=True, + ) + if result == 0: + logger.info(f"Skipping topic/entity update for deleted memory {memory.id}") async def merge_memories_with_llm( @@ -640,7 +650,7 @@ async def compact_long_term_memories( redis_client: Redis | None = None, vector_distance_threshold: float = 0.2, compact_hash_duplicates: bool = True, - compact_semantic_duplicates: bool = True, + compact_semantic_duplicates: bool | None = None, perpetual: Perpetual = Perpetual( every=timedelta(minutes=settings.compaction_every_minutes), automatic=True ), @@ -655,6 +665,9 @@ async def compact_long_term_memories( Returns the count of remaining memories after compaction. """ + if compact_semantic_duplicates is None: + compact_semantic_duplicates = settings.compact_semantic_duplicates + if not redis_client: redis_client = await get_redis_conn() @@ -1020,8 +1033,8 @@ async def index_long_term_memories( else: current_memory = deduped_memory or current_memory - # Check for semantic duplicates - if not was_deduplicated: + # Check for semantic duplicates (respects compact_semantic_duplicates setting) + if not was_deduplicated and settings.compact_semantic_duplicates: deduped_memory, was_merged = await deduplicate_by_semantic_search( memory=current_memory, redis_client=redis, diff --git a/examples/agent_memory_server_interactive_guide.ipynb b/examples/agent_memory_server_interactive_guide.ipynb index ca0d8b4..0c30836 100644 --- a/examples/agent_memory_server_interactive_guide.ipynb +++ b/examples/agent_memory_server_interactive_guide.ipynb @@ -98,14 +98,14 @@ ], "source": [ "# Import required libraries\n", - "import asyncio\n", "import json\n", "import os\n", - "from datetime import datetime, timezone\n", + "from datetime import datetime\n", "\n", "import httpx\n", "from dotenv import load_dotenv\n", "\n", + "\n", "# Load environment variables from .env file\n", "load_dotenv()\n", "\n", @@ -115,7 +115,9 @@ "\n", "print(f\"Base URL: {BASE_URL}\")\n", "print(f\"Namespace: {NAMESPACE}\")\n", - "print(f\"OpenAI API Key: {'✓ loaded' if os.environ.get('OPENAI_API_KEY') else '✗ not found'}\")" + "print(\n", + " f\"OpenAI API Key: {'✓ loaded' if os.environ.get('OPENAI_API_KEY') else '✗ not found'}\"\n", + ")" ] }, { @@ -145,8 +147,9 @@ " response.raise_for_status()\n", " return response.json()\n", "\n", + "\n", "health = await health_check()\n", - "print(f\"Server is healthy!\")\n", + "print(\"Server is healthy!\")\n", "print(f\"Server timestamp: {datetime.fromtimestamp(health['now'] / 1000)}\")" ] }, @@ -173,14 +176,15 @@ "# Initialize the Python SDK client\n", "from agent_memory_client import MemoryAPIClient, MemoryClientConfig\n", "\n", + "\n", "config = MemoryClientConfig(\n", " base_url=BASE_URL,\n", " timeout=30.0,\n", - " default_namespace=NAMESPACE # \"travel_agent\" - same as the travel agent example\n", + " default_namespace=NAMESPACE, # \"travel_agent\" - same as the travel agent example\n", ")\n", "\n", "client = MemoryAPIClient(config)\n", - "print(f\"Memory client initialized!\")\n", + "print(\"Memory client initialized!\")\n", "print(f\"Default namespace: {config.default_namespace}\")" ] }, @@ -336,12 +340,13 @@ "# Pattern 1 Code driven\n", "\n", "from agent_memory_client.models import (\n", - " MemoryMessage, \n", - " WorkingMemory, \n", - " ClientMemoryRecord, \n", - " MemoryTypeEnum\n", + " ClientMemoryRecord,\n", + " MemoryMessage,\n", + " MemoryTypeEnum,\n", + " WorkingMemory,\n", ")\n", "\n", + "\n", "# Use \"nitin\" as our demo user - matching the travel agent example\n", "SESSION_ID = \"nitin-travel-session\"\n", "USER_ID = \"nitin\"\n", @@ -349,9 +354,7 @@ "\n", "# Step 1: Create/get a working memory session for Nitin\n", "created, working_memory = await client.get_or_create_working_memory(\n", - " session_id=SESSION_ID,\n", - " namespace=NAMESPACE,\n", - " user_id=USER_ID\n", + " session_id=SESSION_ID, namespace=NAMESPACE, user_id=USER_ID\n", ")\n", "\n", "print(f\"Session {'created' if created else 'already existed'}: {SESSION_ID}\")\n", @@ -381,12 +384,6 @@ ], "source": [ "# Diff session\n", - "from agent_memory_client.models import (\n", - " MemoryMessage,\n", - " WorkingMemory,\n", - " ClientMemoryRecord,\n", - " MemoryTypeEnum\n", - ")\n", "\n", "# Use \"nitin\" as our demo user - matching the travel agent example\n", "SESSION_ID = \"nitin-travel-session-2\"\n", @@ -395,9 +392,7 @@ "\n", "# Step 1: Create/get a working memory session for Nitin\n", "created, working_memory = await client.get_or_create_working_memory(\n", - " session_id=SESSION_ID,\n", - " namespace=NAMESPACE,\n", - " user_id=USER_ID\n", + " session_id=SESSION_ID, namespace=NAMESPACE, user_id=USER_ID\n", ")\n", "\n", "print(f\"Session {'created' if created else 'already existed'}: {SESSION_ID}\")\n", @@ -539,7 +534,6 @@ } ], "source": [ - "\n", "# Nitin's travel preferences (from the travel agent demo)\n", "memories_to_store = [\n", " ClientMemoryRecord(\n", @@ -547,36 +541,36 @@ " memory_type=MemoryTypeEnum.SEMANTIC,\n", " topics=[\"travel\", \"preferences\"],\n", " user_id=USER_ID,\n", - " namespace=NAMESPACE\n", + " namespace=NAMESPACE,\n", " ),\n", " ClientMemoryRecord(\n", " text=\"Vegetarian diet - needs vegetarian restaurant options\",\n", " memory_type=MemoryTypeEnum.SEMANTIC,\n", " topics=[\"travel\", \"preferences\"],\n", " user_id=USER_ID,\n", - " namespace=NAMESPACE\n", + " namespace=NAMESPACE,\n", " ),\n", " ClientMemoryRecord(\n", " text=\"Extra leg room on flights (premium economy or exit row and budget allows, anything goes)\",\n", " memory_type=MemoryTypeEnum.SEMANTIC,\n", " topics=[\"travel\", \"preferences\"],\n", " user_id=USER_ID,\n", - " namespace=NAMESPACE\n", + " namespace=NAMESPACE,\n", " ),\n", " ClientMemoryRecord(\n", " text=\"Prefers hotels with good amenities\",\n", " memory_type=MemoryTypeEnum.SEMANTIC,\n", " topics=[\"travel\", \"preferences\"],\n", " user_id=USER_ID,\n", - " namespace=NAMESPACE\n", + " namespace=NAMESPACE,\n", " ),\n", " ClientMemoryRecord(\n", " text=\"Enjoys technology, sports, outdoords, and innovation hubs\",\n", " memory_type=MemoryTypeEnum.SEMANTIC,\n", " topics=[\"travel\", \"preferences\"],\n", " user_id=USER_ID,\n", - " namespace=NAMESPACE\n", - " )\n", + " namespace=NAMESPACE,\n", + " ),\n", "]\n", "\n", "print(f\"Storing {len(memories_to_store)} preferences for user '{USER_ID}'...\")\n", @@ -622,8 +616,9 @@ } ], "source": [ - "\n", "import time\n", + "\n", + "\n", "time.sleep(2) # Wait for indexing (memories need to be embedded)\n", "\n", "user_query = \"I'm planning a trip to Japan. What should I know about my preferences?\"\n", @@ -642,11 +637,9 @@ " long_term_search={\n", " \"limit\": 5,\n", " \"distance_threshold\": 0.7,\n", - " \"user_id\": {\"eq\": USER_ID} # Only search Nitin's memories\n", - " }\n", - ")\n", - "\n", - "\n" + " \"user_id\": {\"eq\": USER_ID}, # Only search Nitin's memories\n", + " },\n", + ")" ] }, { @@ -826,19 +819,24 @@ } ], "source": [ - "\n", "messages = [\n", " MemoryMessage(role=\"user\", content=\"I'm planning a trip to Japan next month!\"),\n", - " MemoryMessage(role=\"assistant\", content=\"Exciting! Based on your preferences, I know you enjoy hiking and vegetarian food. Japan has amazing options for both!\"),\n", - " MemoryMessage(role=\"user\", content=\"Yes! I'd love to hike Mount Fuji and find good vegetarian ramen.\"),\n", - " MemoryMessage(role=\"assistant\", content=\"Perfect! I'll remember your interest in Mount Fuji. For vegetarian ramen, Kyoto has excellent options.\")\n", + " MemoryMessage(\n", + " role=\"assistant\",\n", + " content=\"Exciting! Based on your preferences, I know you enjoy hiking and vegetarian food. Japan has amazing options for both!\",\n", + " ),\n", + " MemoryMessage(\n", + " role=\"user\",\n", + " content=\"Yes! I'd love to hike Mount Fuji and find good vegetarian ramen.\",\n", + " ),\n", + " MemoryMessage(\n", + " role=\"assistant\",\n", + " content=\"Perfect! I'll remember your interest in Mount Fuji. For vegetarian ramen, Kyoto has excellent options.\",\n", + " ),\n", "]\n", "\n", "updated_memory = WorkingMemory(\n", - " session_id=SESSION_ID,\n", - " namespace=NAMESPACE,\n", - " messages=messages,\n", - " user_id=USER_ID\n", + " session_id=SESSION_ID, namespace=NAMESPACE, messages=messages, user_id=USER_ID\n", ")\n", "\n", "response = await client.put_working_memory(SESSION_ID, updated_memory)\n", @@ -949,7 +947,7 @@ " text=\"travel preferences\", # Broad search\n", " namespace={\"eq\": \"travel_agent\"},\n", " user_id={\"eq\": \"nitin\"},\n", - " limit=20\n", + " limit=20,\n", ")\n", "\n", "print(f\"\\nFound {all_memories.total} memories:\\n\")\n", @@ -957,7 +955,7 @@ "for idx, memory in enumerate(all_memories.memories, 1):\n", " # Calculate relevance score (1 - distance)\n", " relevance = (1 - memory.dist) * 100 if memory.dist else 0\n", - " \n", + "\n", " print(f\"{idx}. [{relevance:.0f}% relevant]\")\n", " print(f\" ID: {memory.id}\")\n", " print(f\" Text: {memory.text}\")\n", @@ -1118,9 +1116,11 @@ "# Step 2: Send tools to LLM with the conversation\n", "# The LLM will decide whether to use memory tools\n", "\n", - "import openai\n", "import os\n", "\n", + "import openai\n", + "\n", + "\n", "# Check if OpenAI API key is available\n", "OPENAI_API_KEY = os.environ.get(\"OPENAI_API_KEY\")\n", "if not OPENAI_API_KEY:\n", @@ -1182,9 +1182,9 @@ " model=\"gpt-4o-mini\",\n", " messages=[\n", " {\"role\": \"system\", \"content\": system_prompt},\n", - " {\"role\": \"user\", \"content\": user_message}\n", + " {\"role\": \"user\", \"content\": user_message},\n", " ],\n", - " tools=tools.to_list() # Pass memory tools to LLM\n", + " tools=tools.to_list(), # Pass memory tools to LLM\n", ")\n", "\n", "# Check if LLM decided to use any tools\n", @@ -1227,7 +1227,7 @@ "# Our code executes what the LLM decided\n", "\n", "tool_results = []\n", - " \n", + "\n", "for tool_call in message.tool_calls:\n", " print(f\"\\nExecuting: {tool_call.function.name}\")\n", "\n", @@ -1238,16 +1238,18 @@ " function_arguments=json.loads(tool_call.function.arguments),\n", " session_id=SESSION_ID_LLM,\n", " namespace=NAMESPACE,\n", - " user_id=USER_ID_LLM\n", + " user_id=USER_ID_LLM,\n", " )\n", "\n", " formatted = result.get(\"formatted_response\", str(result))\n", " print(f\" Result: {formatted}\")\n", - " tool_results.append({\n", - " \"tool_call_id\": tool_call.id,\n", - " \"role\": \"tool\",\n", - " \"content\": json.dumps(result.get(\"formatted_response\", result))\n", - " })\n", + " tool_results.append(\n", + " {\n", + " \"tool_call_id\": tool_call.id,\n", + " \"role\": \"tool\",\n", + " \"content\": json.dumps(result.get(\"formatted_response\", result)),\n", + " }\n", + " )\n", "\n", "print(f\"\\nExecuted {len(tool_results)} tool call(s)\")" ] @@ -1320,16 +1322,26 @@ "messages_with_results = [\n", " {\"role\": \"system\", \"content\": system_prompt},\n", " {\"role\": \"user\", \"content\": user_message},\n", - " {\"role\": \"assistant\", \"content\": None, \"tool_calls\": [\n", - " {\"id\": tc.id, \"type\": \"function\", \"function\": {\"name\": tc.function.name, \"arguments\": tc.function.arguments}}\n", - " for tc in message.tool_calls\n", - " ]},\n", + " {\n", + " \"role\": \"assistant\",\n", + " \"content\": None,\n", + " \"tool_calls\": [\n", + " {\n", + " \"id\": tc.id,\n", + " \"type\": \"function\",\n", + " \"function\": {\n", + " \"name\": tc.function.name,\n", + " \"arguments\": tc.function.arguments,\n", + " },\n", + " }\n", + " for tc in message.tool_calls\n", + " ],\n", + " },\n", "] + tool_results\n", "\n", "# Get final response\n", "final_response = await openai_client.chat.completions.create(\n", - " model=\"gpt-4o-mini\",\n", - " messages=messages_with_results\n", + " model=\"gpt-4o-mini\", messages=messages_with_results\n", ")\n", "\n", "print(\"Final LLM Response:\")\n", @@ -1474,6 +1486,7 @@ "\n", "from agent_memory_client.models import MemoryStrategyConfig\n", "\n", + "\n", "SESSION_ID_AUTO = \"nitin-auto-session\"\n", "USER_ID_AUTO = \"nitin\"\n", "\n", @@ -1484,7 +1497,7 @@ " # Configure automatic extraction\n", " long_term_memory_strategy=MemoryStrategyConfig(\n", " strategy=\"discrete\" # Extract individual facts (default)\n", - " )\n", + " ),\n", ")\n", "\n", "print(f\"Session created with automatic extraction: {SESSION_ID_AUTO}\")\n", @@ -1516,10 +1529,22 @@ "source": [ "# Step 2: Just store the conversation - extraction happens automatically!\n", "conversation = [\n", - " MemoryMessage(role=\"user\", content=\"I'm Nitin. I'm planning a hiking trip to Japan and need vegetarian food options.\"),\n", - " MemoryMessage(role=\"assistant\", content=\"Great choice! Japan has amazing hiking trails and excellent vegetarian cuisine.\"),\n", - " MemoryMessage(role=\"user\", content=\"I prefer nice hotels with good amenities, not too fancy but comfortable. All depends on the budget.\"),\n", - " MemoryMessage(role=\"assistant\", content=\"Noted! I'll remember your preference for comfortable mid-tier accommodations.\")\n", + " MemoryMessage(\n", + " role=\"user\",\n", + " content=\"I'm Nitin. I'm planning a hiking trip to Japan and need vegetarian food options.\",\n", + " ),\n", + " MemoryMessage(\n", + " role=\"assistant\",\n", + " content=\"Great choice! Japan has amazing hiking trails and excellent vegetarian cuisine.\",\n", + " ),\n", + " MemoryMessage(\n", + " role=\"user\",\n", + " content=\"I prefer nice hotels with good amenities, not too fancy but comfortable. All depends on the budget.\",\n", + " ),\n", + " MemoryMessage(\n", + " role=\"assistant\",\n", + " content=\"Noted! I'll remember your preference for comfortable mid-tier accommodations.\",\n", + " ),\n", "]\n", "\n", "working_memory_update = WorkingMemory(\n", @@ -1530,8 +1555,7 @@ " # Strategy is already configured on the session\n", ")\n", "\n", - "await client.put_working_memory(SESSION_ID_AUTO, working_memory_update)\n", - "\n" + "await client.put_working_memory(SESSION_ID_AUTO, working_memory_update)" ] }, { @@ -1590,18 +1614,18 @@ "Format each as a clear, standalone statement.\n", "Current datetime: {current_datetime}\n", "Conversation: {message}\"\"\"\n", - " }\n", + " },\n", ")\n", "\n", "# Create a new session with custom extraction\n", "created, custom_memory = await client.get_or_create_working_memory(\n", " session_id=\"custom-extraction-demo\",\n", " namespace=NAMESPACE,\n", - " long_term_memory_strategy=custom_strategy\n", + " long_term_memory_strategy=custom_strategy,\n", ")\n", "\n", "print(f\"Session with custom extraction: {'created' if created else 'exists'}\")\n", - "print(f\"Custom prompt configured for specialized extraction\")" + "print(\"Custom prompt configured for specialized extraction\")" ] }, { @@ -2119,8 +2143,7 @@ "session_to_check = \"nitin-travel-session\"\n", "\n", "_, wm = await client.get_or_create_working_memory(\n", - " session_id=session_to_check, \n", - " namespace=\"travel_agent\"\n", + " session_id=session_to_check, namespace=\"travel_agent\"\n", ")\n", "print(f\"Session: {wm.session_id}\")\n", "print(f\"Messages: {len(wm.messages)}\")\n", @@ -2159,9 +2182,9 @@ " session_id=\"nitin-travel-session\",\n", " data={\n", " \"user_preferences\": {\"theme\": \"dark\", \"language\": \"en\"},\n", - " \"trip_context\": {\"destination\": \"Japan\", \"dates\": \"March 2026\"}\n", + " \"trip_context\": {\"destination\": \"Japan\", \"dates\": \"March 2026\"},\n", " },\n", - " namespace=\"travel_agent\"\n", + " namespace=\"travel_agent\",\n", ")\n", "print(\"Custom data stored in working memory!\")" ] @@ -2219,13 +2242,15 @@ "source": [ "# Search long-term memories with semantic search\n", "import time\n", + "\n", + "\n", "time.sleep(2) # Wait for indexing\n", "\n", "results = await client.search_long_term_memory(\n", " text=\"What are Nitin's travel preferences?\",\n", " namespace={\"eq\": \"travel_agent\"},\n", " user_id={\"eq\": \"nitin\"},\n", - " limit=5\n", + " limit=5,\n", ")\n", "\n", "print(f\"Found {results.total} memories for user 'nitin':\")\n", @@ -2266,12 +2291,13 @@ "# Search with filters\n", "from agent_memory_client.filters import Topics, UserId\n", "\n", + "\n", "results = await client.search_long_term_memory(\n", " text=\"travel preferences\",\n", " topics=Topics(any=[\"travel\", \"preferences\"]),\n", " user_id=UserId(eq=\"nitin\"),\n", " namespace={\"eq\": \"travel_agent\"},\n", - " limit=10\n", + " limit=10,\n", ")\n", "\n", "print(f\"Filtered search found {results.total} memories\")\n", @@ -2330,15 +2356,16 @@ ], "source": [ "# Create an episodic memory with event date\n", - "from datetime import timedelta\n", + "from datetime import UTC, timedelta\n", + "\n", "\n", "episodic_memory = ClientMemoryRecord(\n", " text=\"User completed a 10-mile hike at Rocky Mountain National Park\",\n", " memory_type=MemoryTypeEnum.EPISODIC,\n", " topics=[\"activities\", \"hiking\", \"achievements\"],\n", " entities=[\"Rocky Mountain National Park\"],\n", - " event_date=datetime.now(timezone.utc) - timedelta(days=7),\n", - " namespace=NAMESPACE\n", + " event_date=datetime.now(UTC) - timedelta(days=7),\n", + " namespace=NAMESPACE,\n", ")\n", "\n", "result = await client.create_long_term_memory([episodic_memory])\n", @@ -2407,26 +2434,26 @@ " \"type\": \"semantic\",\n", " \"text\": \"User prefers SUVs for car rentals\",\n", " \"topics\": [\"travel\", \"vehicles\", \"preferences\"],\n", - " \"entities\": [\"SUV\"]\n", + " \"entities\": [\"SUV\"],\n", " },\n", " {\n", " \"type\": \"semantic\",\n", " \"text\": \"User prefers darker vehicle colors\",\n", " \"topics\": [\"preferences\", \"vehicles\"],\n", - " \"entities\": []\n", + " \"entities\": [],\n", " },\n", " {\n", " \"type\": \"episodic\",\n", " \"text\": \"User planning trip to Colorado in March 2026\",\n", " \"topics\": [\"travel\", \"trips\"],\n", - " \"entities\": [\"Colorado\", \"March 2026\"]\n", + " \"entities\": [\"Colorado\", \"March 2026\"],\n", " },\n", " {\n", " \"type\": \"semantic\",\n", " \"text\": \"User's car rental budget is $75-100 per day\",\n", " \"topics\": [\"budget\", \"travel\"],\n", - " \"entities\": []\n", - " }\n", + " \"entities\": [],\n", + " },\n", "]\n", "\n", "print(\"DISCRETE Strategy Results:\")\n", @@ -2452,7 +2479,7 @@ "outputs": [], "source": [ "# Custom prompt for vehicle rental extraction\n", - "vehicle_rental_custom_prompt = '''\n", + "vehicle_rental_custom_prompt = \"\"\"\n", "You are a vehicle rental preference extractor. Extract DETAILED vehicle preferences.\n", "\n", "CURRENT CONTEXT:\n", @@ -2477,7 +2504,7 @@ "\n", "Conversation:\n", "{message}\n", - "'''\n", + "\"\"\"\n", "\n", "print(\"Custom prompt configured for granular vehicle preference extraction\")" ] @@ -2494,56 +2521,56 @@ " \"type\": \"semantic\",\n", " \"text\": \"User prefers Toyota 4Runner or Jeep Grand Cherokee for rentals\",\n", " \"topics\": [\"vehicles\", \"preferences\", \"SUV\"],\n", - " \"entities\": [\"Toyota 4Runner\", \"Jeep Grand Cherokee\"]\n", + " \"entities\": [\"Toyota 4Runner\", \"Jeep Grand Cherokee\"],\n", " },\n", " {\n", " \"type\": \"semantic\",\n", " \"text\": \"User prefers charcoal gray or navy blue vehicle colors\",\n", " \"topics\": [\"preferences\", \"colors\"],\n", - " \"entities\": [\"charcoal gray\", \"navy blue\"]\n", + " \"entities\": [\"charcoal gray\", \"navy blue\"],\n", " },\n", " {\n", " \"type\": \"semantic\",\n", " \"text\": \"User requires 4WD capability for mountain driving\",\n", " \"topics\": [\"requirements\", \"features\"],\n", - " \"entities\": [\"4WD\"]\n", + " \"entities\": [\"4WD\"],\n", " },\n", " {\n", " \"type\": \"semantic\",\n", " \"text\": \"User requires heated seats in rental vehicles\",\n", " \"topics\": [\"requirements\", \"comfort\"],\n", - " \"entities\": [\"heated seats\"]\n", + " \"entities\": [\"heated seats\"],\n", " },\n", " {\n", " \"type\": \"semantic\",\n", " \"text\": \"User's rental budget is $75-100 per day for 5 days ($375-500 total)\",\n", " \"topics\": [\"budget\", \"pricing\"],\n", - " \"entities\": [\"$75-100/day\", \"5 days\"]\n", + " \"entities\": [\"$75-100/day\", \"5 days\"],\n", " },\n", " {\n", " \"type\": \"episodic\",\n", " \"text\": \"User needs rental starting March 15, 2026 for Colorado trip\",\n", " \"topics\": [\"booking\", \"dates\"],\n", - " \"entities\": [\"March 15, 2026\", \"Colorado\"]\n", + " \"entities\": [\"March 15, 2026\", \"Colorado\"],\n", " },\n", " {\n", " \"type\": \"semantic\",\n", " \"text\": \"User requires roof rack for ski equipment\",\n", " \"topics\": [\"equipment\", \"requirements\"],\n", - " \"entities\": [\"roof rack\", \"ski equipment\"]\n", + " \"entities\": [\"roof rack\", \"ski equipment\"],\n", " },\n", " {\n", " \"type\": \"semantic\",\n", " \"text\": \"User requires Apple CarPlay in rental vehicle\",\n", " \"topics\": [\"technology\", \"requirements\"],\n", - " \"entities\": [\"Apple CarPlay\"]\n", + " \"entities\": [\"Apple CarPlay\"],\n", " },\n", " {\n", " \"type\": \"semantic\",\n", " \"text\": \"User prefers Denver airport pickup over downtown locations\",\n", " \"topics\": [\"pickup\", \"preferences\"],\n", - " \"entities\": [\"Denver airport\"]\n", - " }\n", + " \"entities\": [\"Denver airport\"],\n", + " },\n", "]\n", "\n", "print(\"CUSTOM Strategy Results:\")\n", @@ -2672,9 +2699,9 @@ "if cleanup:\n", " sessions_to_delete = [\n", " \"nitin-travel-session\",\n", - " \"nitin-llm-session\", \n", + " \"nitin-llm-session\",\n", " \"nitin-auto-session\",\n", - " \"custom-extraction-demo\"\n", + " \"custom-extraction-demo\",\n", " ]\n", " for sid in sessions_to_delete:\n", " try:\n", diff --git a/tests/test_long_term_memory.py b/tests/test_long_term_memory.py index 1ce2464..8e074d5 100644 --- a/tests/test_long_term_memory.py +++ b/tests/test_long_term_memory.py @@ -326,22 +326,67 @@ async def test_extract_memory_structure(self, mock_async_redis_client): memory_type=MemoryTypeEnum.SEMANTIC, ) + # Simulate key exists (HSETEX FXX returns number of fields set) + mock_redis.hsetex.return_value = 2 + await extract_memory_structure(memory) # Verify extraction was called mock_extract.assert_called_once_with("Test text content") - # Verify Redis was updated with topics and entities - mock_redis.hset.assert_called_once() - args, kwargs = mock_redis.hset.call_args - - # Check the key format - it includes the memory ID in the key structure - assert "memory_idx:" in args[0] and "test-id" in args[0] + # Verify HSETEX was called with FXX for atomic guard + mock_redis.hsetex.assert_called_once() + call_kwargs = mock_redis.hsetex.call_args + key = call_kwargs[0][0] + assert "memory_idx:" in key and "test-id" in key - # Check the mapping - must use pipe separator to match langchain-redis - mapping = kwargs["mapping"] + # Check pipe-separated values (Issue #156) and FXX flag + mapping = call_kwargs[1]["mapping"] assert mapping["topics"] == "topic1|topic2" assert mapping["entities"] == "entity1|entity2" + assert call_kwargs[1]["data_persist_option"] == "FXX" + assert call_kwargs[1]["keepttl"] is True + + @pytest.mark.asyncio + async def test_extract_memory_structure_skips_deleted_key( + self, mock_async_redis_client + ): + """Regression: extract_memory_structure must not recreate deleted keys. + + When semantic deduplication deletes a memory key between scheduling + and execution of the background extraction task, HSETEX with FXX + must detect the missing fields and skip the update to avoid orphaned + hashes. + """ + with ( + patch( + "agent_memory_server.long_term_memory.get_redis_conn" + ) as mock_get_redis, + patch( + "agent_memory_server.long_term_memory.handle_extraction" + ) as mock_extract, + ): + mock_redis = AsyncMock() + mock_get_redis.return_value = mock_redis + mock_extract.return_value = (["topic1"], ["entity1"]) + + # Simulate key does NOT exist (HSETEX FXX returns 0) + mock_redis.hsetex.return_value = 0 + + memory = MemoryRecord( + id="deleted-id", + text="This memory was deleted by dedup", + namespace="test-namespace", + memory_type=MemoryTypeEnum.SEMANTIC, + ) + + # Should complete without error, skipping the update + await extract_memory_structure(memory) + + # HSETEX was called (FXX handles the existence check) + mock_redis.hsetex.assert_called_once() + # hset should NOT have been called directly + mock_redis.hset.assert_not_called() @pytest.mark.asyncio async def test_count_long_term_memories(self, mock_async_redis_client): @@ -588,6 +633,66 @@ def mock_execute_command(command): # Should return count from final search assert remaining_count == 2 # Mocked total + def test_compact_semantic_duplicates_env_var(self, monkeypatch): + """Regression: COMPACT_SEMANTIC_DUPLICATES env var must affect Settings.""" + monkeypatch.setenv("COMPACT_SEMANTIC_DUPLICATES", "false") + from agent_memory_server.config import Settings + + s = Settings() + assert s.compact_semantic_duplicates is False + + @pytest.mark.asyncio + async def test_index_skips_semantic_dedup_when_disabled( + self, mock_async_redis_client + ): + """Regression: indexing-time semantic dedup must respect the setting. + + When compact_semantic_duplicates is False, index_long_term_memories + must NOT call deduplicate_by_semantic_search even when deduplicate=True. + """ + with ( + patch("agent_memory_server.long_term_memory.settings") as mock_settings, + patch( + "agent_memory_server.long_term_memory.get_memory_vector_db" + ) as mock_get_db, + patch( + "agent_memory_server.long_term_memory.deduplicate_by_semantic_search" + ) as mock_semantic_dedup, + patch( + "agent_memory_server.long_term_memory.get_background_tasks" + ) as mock_get_bg, + ): + mock_settings.compact_semantic_duplicates = False + mock_settings.generation_model = "test-model" + mock_settings.long_term_memory_index_name = "memory_records" + mock_settings.llm_task_timeout_minutes = 5 + mock_settings.enable_discrete_memory_extraction = False + + mock_adapter = AsyncMock() + mock_get_db.return_value = mock_adapter + mock_adapter.search_memories.return_value = ([], 0) + + mock_bg = MagicMock() + mock_get_bg.return_value = mock_bg + + memories = [ + MemoryRecord( + id="test-id", + text="test memory", + namespace="test", + memory_type=MemoryTypeEnum.SEMANTIC, + ) + ] + + await index_long_term_memories( + memories, + redis_client=mock_async_redis_client, + deduplicate=True, + ) + + # Semantic dedup should NOT be called when setting is disabled + mock_semantic_dedup.assert_not_called() + @pytest.mark.asyncio async def test_promote_working_memory_to_long_term(self, mock_async_redis_client): """Test promoting memories from working memory to long-term storage"""