Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions agent_memory_server/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
migrate_add_discrete_memory_extracted_2,
migrate_add_memory_hashes_1,
migrate_add_memory_type_3,
migrate_redis_key_naming_4,
)
from agent_memory_server.utils.redis import get_redis_conn

Expand Down Expand Up @@ -263,6 +264,82 @@ async def run_migration():
asyncio.run(run_migration())


@cli.command()
@click.option(
    "--batch-size",
    default=50,
    type=click.IntRange(min=1),
    help="Number of keys to rename per pipeline batch",
)
@click.option(
    "--dry-run",
    is_flag=True,
    help="Show what would change without executing",
)
def migrate_redis_naming(batch_size: int, dry_run: bool):
    """
    Migrate Redis key and index names from underscore to dash convention.

    Renames keys:
    memory_idx:* → memory-idx:*
    working_memory:* → working-memory:*
    auth_token:* → auth-token:*
    auth_tokens:list → auth-tokens:list

    Drops old indexes and rebuilds with new names.

    Use --dry-run to see what would change without making modifications.
    """
    import asyncio

    from agent_memory_server.memory_vector_db import RedisVLMemoryVectorDatabase
    from agent_memory_server.memory_vector_db_factory import get_memory_vector_db
    from agent_memory_server.working_memory_index import rebuild_working_memory_index

    configure_logging()

    # Fixed print order for the per-category rename summary:
    # (display label, key into the counts dict returned by the migration).
    summary_rows = (
        ("memory_idx keys", "memory_idx"),
        ("working_memory keys", "working_memory"),
        ("auth_token keys", "auth_token"),
        ("auth_tokens:list", "auth_tokens_list"),
        ("indexes dropped", "indexes_dropped"),
    )

    async def _run() -> None:
        redis = await get_redis_conn()

        if dry_run:
            click.echo("Dry run — no changes will be made.\n")

        counts = await migrate_redis_key_naming_4(
            redis=redis, batch_size=batch_size, dry_run=dry_run
        )

        click.echo("\nKey rename summary:")
        for label, count_key in summary_rows:
            click.echo(f"  {label}: {counts[count_key]}")

        if dry_run:
            click.echo("\nNo changes were made (dry run).")
            return

        # Renames are done; recreate the search indexes under their new names.
        click.echo("\nRebuilding indexes with new names...")

        # Long-term memory index — only rebuildable in place when the backing
        # store is the RedisVL implementation.
        db = await get_memory_vector_db()
        if isinstance(db, RedisVLMemoryVectorDatabase):
            ltm_index = db.index
            await ltm_index.create(overwrite=True)
            click.echo(f"  Rebuilt long-term memory index: {ltm_index.name}")

        # Working memory index.
        await rebuild_working_memory_index(redis)
        click.echo(
            f"  Rebuilt working memory index: {settings.working_memory_index_name}"
        )

        click.echo("\nMigration completed successfully.")

    asyncio.run(_run())


@cli.command()
@click.option("--port", default=settings.port, help="Port to run the server on")
@click.option("--host", default="0.0.0.0", help="Host to run the server on")
Expand Down
8 changes: 4 additions & 4 deletions agent_memory_server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ class Settings(BaseSettings):
)

# RedisVL configuration (used by default Redis factory)
redisvl_index_name: str = "memory_records"
redisvl_index_name: str = "memory-records"

# The server indexes messages in long-term memory by default. If this
# setting is enabled, we also extract discrete memories from message text
Expand All @@ -400,13 +400,13 @@ class Settings(BaseSettings):
# TODO: Adapt to memory database settings
redisvl_distance_metric: str = "COSINE"
redisvl_vector_dimensions: str = "1536"
redisvl_index_prefix: str = "memory_idx"
redisvl_index_prefix: str = "memory-idx"
redisvl_indexing_algorithm: str = "HNSW"

# Working Memory Index Settings
# Used for listing sessions via Redis Search instead of sorted sets
working_memory_index_name: str = "working_memory_idx"
working_memory_index_prefix: str = "working_memory:"
working_memory_index_name: str = "working-memory-idx"
working_memory_index_prefix: str = "working-memory:"

# Deduplication Settings (Store-Time)
# Distance threshold for semantic similarity when deduplicating at store time
Expand Down
2 changes: 1 addition & 1 deletion agent_memory_server/memory_vector_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ async def add_memories(self, memories: list[MemoryRecord]) -> list[str]:
memory_ids.append(memory.id)

# Load into Redis via RedisVL -- use id_field so keys are
# auto-generated with the index prefix (e.g. "memory_idx:<id>").
# auto-generated with the index prefix (e.g. "memory-idx:<id>").
# Do NOT pass explicit keys, as that bypasses the prefix.
await self._index.load(data_list, id_field="id_")
return memory_ids
Expand Down
165 changes: 165 additions & 0 deletions agent_memory_server/migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,168 @@ async def migrate_add_memory_type_3(redis: Redis | None = None) -> None:
migrated_count += 1

logger.info(f"Migration completed. Added memory_type to {migrated_count} memories")


async def migrate_redis_key_naming_4(
    redis: Redis | None = None,
    batch_size: int = 50,
    dry_run: bool = False,
) -> dict[str, int]:
    """
    Migration 4: Rename Redis keys and drop old indexes to use dash convention.

    Renames:
    - memory_idx:* → memory-idx:*
    - working_memory:* → working-memory:*
    - auth_token:* → auth-token:*
    - auth_tokens:list → auth-tokens:list

    Drops old indexes (without deleting data):
    - memory_records
    - working_memory_idx

    Args:
        redis: Optional Redis client
        batch_size: Number of keys to rename per pipeline batch
        dry_run: If True, only count keys without renaming

    Returns:
        Dict with counts per category

    Raises:
        ValueError: If batch_size is less than 1.
    """
    # Validate before connecting or doing any work.
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    logger.info("Starting Redis key naming migration (underscore → dash)")
    redis = redis or await get_redis_conn()

    counts: dict[str, int] = {
        "memory_idx": 0,
        "working_memory": 0,
        "auth_token": 0,
        "auth_tokens_list": 0,
        "indexes_dropped": 0,
    }

    def _key_str(key: bytes | str) -> str:
        """Normalize a scanned key to str — SCAN returns bytes unless the
        client was created with decode_responses=True."""
        return key.decode("utf-8") if isinstance(key, bytes) else key

    # Migration status keys to skip (not renamed — the new-prefix keys are
    # created fresh by the application after migration). Held as str and
    # compared against normalized keys so the filter works whether the client
    # returns bytes or decoded str (a bytes-only set silently fails to match
    # when decode_responses=True, and the status keys would get renamed).
    migration_status_keys = {
        "working_memory:migration:complete",
        "working-memory:migration:complete",
        "working_memory:migration:remaining",
        "working-memory:migration:remaining",
    }

    async def _scan_and_rename(
        pattern: str, old_prefix: str, new_prefix: str, category: str
    ) -> int:
        """Scan for keys matching pattern and rename old_prefix to new_prefix."""
        renamed = 0
        cursor = 0
        while True:
            cursor, keys = await redis.scan(cursor=cursor, match=pattern, count=1000)

            # Filter out migration status keys for working_memory category
            # (they are plain strings under the same prefix).
            if category == "working_memory":
                keys = [k for k in keys if _key_str(k) not in migration_status_keys]

            if keys:
                if dry_run:
                    renamed += len(keys)
                else:
                    # Batch rename using pipeline with RENAMENX for
                    # idempotency: a key whose new name already exists is
                    # left untouched (and not counted).
                    for i in range(0, len(keys), batch_size):
                        batch = keys[i : i + batch_size]
                        pipe = redis.pipeline()
                        for key in batch:
                            key_str = _key_str(key)
                            # count=1 replaces only the leading prefix; any
                            # later occurrence of the substring is preserved.
                            new_key = key_str.replace(old_prefix, new_prefix, 1)
                            pipe.renamenx(key_str, new_key)
                        results = await pipe.execute()
                        renamed += sum(1 for r in results if r)

            # A cursor of 0 means the SCAN iteration has completed.
            if cursor == 0:
                break

        return renamed

    # 1. Rename memory_idx:* → memory-idx:*
    counts["memory_idx"] = await _scan_and_rename(
        "memory_idx:*", "memory_idx:", "memory-idx:", "memory_idx"
    )
    logger.info(
        f"{'Would rename' if dry_run else 'Renamed'} {counts['memory_idx']} memory_idx keys"
    )

    # 2. Rename working_memory:* → working-memory:*
    counts["working_memory"] = await _scan_and_rename(
        "working_memory:*", "working_memory:", "working-memory:", "working_memory"
    )
    logger.info(
        f"{'Would rename' if dry_run else 'Renamed'} {counts['working_memory']} working_memory keys"
    )

    # 3. Rename auth_token:* → auth-token:*
    counts["auth_token"] = await _scan_and_rename(
        "auth_token:*", "auth_token:", "auth-token:", "auth_token"
    )
    logger.info(
        f"{'Would rename' if dry_run else 'Renamed'} {counts['auth_token']} auth_token keys"
    )

    # 4. Rename auth_tokens:list → auth-tokens:list (single key, idempotent)
    if await redis.exists("auth_tokens:list"):
        if dry_run:
            counts["auth_tokens_list"] = 1
            logger.info("Would rename auth_tokens:list → auth-tokens:list")
        elif await redis.renamenx("auth_tokens:list", "auth-tokens:list"):
            counts["auth_tokens_list"] = 1
            logger.info("Renamed auth_tokens:list → auth-tokens:list")
        else:
            logger.info(
                "auth-tokens:list already exists, skipping auth_tokens:list rename"
            )

    # 5. Drop old indexes (FT.DROPINDEX without DD flag preserves data)
    for old_index_name in ("memory_records", "working_memory_idx"):
        if dry_run:
            try:
                # FT.INFO succeeds only if the index exists.
                await redis.execute_command("FT.INFO", old_index_name)
                counts["indexes_dropped"] += 1
                logger.info(f"Would drop old index '{old_index_name}'")
            except Exception:
                logger.info(f"Old index '{old_index_name}' does not exist, skipping")
        else:
            try:
                await redis.execute_command("FT.DROPINDEX", old_index_name)
                counts["indexes_dropped"] += 1
                logger.info(f"Dropped old index '{old_index_name}'")
            except Exception as e:
                # Index may not exist, which is fine. Error-message casing
                # varies across Redis versions, so compare case-insensitively.
                if "unknown index name" in str(e).lower():
                    logger.info(
                        f"Old index '{old_index_name}' does not exist, skipping"
                    )
                else:
                    logger.warning(f"Failed to drop index '{old_index_name}': {e}")

    total = sum(counts.values())
    logger.info(
        f"Redis key naming migration {'(dry run) ' if dry_run else ''}complete. "
        f"Total: {total} operations ({counts})"
    )

    return counts
6 changes: 3 additions & 3 deletions agent_memory_server/utils/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def working_memory_key(
) -> str:
"""Get the working memory key for a session."""
# Build key components, filtering out None values
key_parts = ["working_memory"]
key_parts = ["working-memory"]

if namespace:
key_parts.append(namespace)
Expand All @@ -98,9 +98,9 @@ def search_index_name() -> str:
@staticmethod
def auth_token_key(token_hash: str) -> str:
"""Get the auth token key for a hashed token."""
return f"auth_token:{token_hash}"
return f"auth-token:{token_hash}"

@staticmethod
def auth_tokens_list_key() -> str:
"""Get the key for the list of all auth tokens."""
return "auth_tokens:list"
return "auth-tokens:list"
63 changes: 42 additions & 21 deletions agent_memory_server/working_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
logger = logging.getLogger(__name__)

# Redis keys for migration status (shared across workers, persists across restarts)
MIGRATION_STATUS_KEY = "working_memory:migration:complete"
MIGRATION_REMAINING_KEY = "working_memory:migration:remaining"
MIGRATION_STATUS_KEY = "working-memory:migration:complete"
MIGRATION_REMAINING_KEY = "working-memory:migration:remaining"


async def check_and_set_migration_status(redis_client: Redis | None = None) -> bool:
Expand Down Expand Up @@ -62,32 +62,53 @@ async def check_and_set_migration_status(redis_client: Redis | None = None) -> b
)
return True

# Scan for working_memory:* keys of type STRING only
# This is much faster than scanning all keys and calling TYPE on each
# Scan for working-memory:* AND old working_memory:* keys of type STRING.
# If old-prefix keys still exist the key-naming migration hasn't run yet;
# we treat them the same as new-prefix string keys that need lazy migration.
cursor = 0
string_keys_found = 0
old_prefix_status_keys = {
"working_memory:migration:complete",
"working_memory:migration:remaining",
}

try:
while True:
# Use _type="string" to only get string keys directly
cursor, keys = await redis_client.scan(
cursor=cursor, match="working_memory:*", count=1000, _type="string"
)
for pattern in ("working-memory:*", "working_memory:*"):
cursor = 0
while True:
cursor, keys = await redis_client.scan(
cursor=cursor, match=pattern, count=1000, _type="string"
)

if keys:
# Filter out migration status keys (they're also strings)
keys = [
k
for k in keys
if (k.decode("utf-8") if isinstance(k, bytes) else k)
not in (MIGRATION_STATUS_KEY, MIGRATION_REMAINING_KEY)
]
string_keys_found += len(keys)

if cursor == 0:
break
if keys:
# Filter out migration status keys (they're also strings)
keys = [
k
for k in keys
if (k.decode("utf-8") if isinstance(k, bytes) else k)
not in (
MIGRATION_STATUS_KEY,
MIGRATION_REMAINING_KEY,
*old_prefix_status_keys,
)
]
string_keys_found += len(keys)

if cursor == 0:
break

if string_keys_found > 0:
# Check if any old-prefix keys exist and log a hint
old_cursor = 0
old_cursor, old_keys = await redis_client.scan(
cursor=old_cursor, match="working_memory:*", count=1
)
if old_keys:
logger.warning(
"Found working_memory:* keys with old underscore prefix. "
"Run 'agent-memory migrate-redis-naming' to rename them."
)

# Store the count in Redis for atomic decrement during lazy migration
await redis_client.set(MIGRATION_REMAINING_KEY, str(string_keys_found))
logger.info(
Expand Down
Loading
Loading