diff --git a/examples/dream/full_pipeline_example.py b/examples/dream/full_pipeline_example.py new file mode 100644 index 000000000..793e2d511 --- /dev/null +++ b/examples/dream/full_pipeline_example.py @@ -0,0 +1,187 @@ +""" +Dream pipeline example — from a day's conversations to a dream. + +Initializes the full MemOS stack, adds multi-session conversations +as memories, then runs the Dream pipeline with real LLM and graph_db: + motive formation → recall → reasoning → diary → persistence + +Usage: + python examples/dream/full_pipeline_example.py +""" + +from __future__ import annotations + +import json + +from memos.api import handlers +from memos.api.handlers.add_handler import AddHandler +from memos.api.handlers.base_handler import HandlerDependencies +from memos.api.product_models import APIADDRequest +from memos.plugins.manager import plugin_manager + + +def _print_json(title: str, value) -> None: + print(f"\n{'=' * 80}\n {title}\n{'=' * 80}") + + def _convert(v): + if hasattr(v, "model_dump"): + return v.model_dump(mode="json") + if isinstance(v, list): + return [_convert(i) for i in v] + if isinstance(v, dict): + return {k: _convert(i) for k, i in v.items()} + return v + + print(json.dumps(_convert(value), indent=2, ensure_ascii=False, default=str)) + + +# --------------------------------------------------------------------------- +# A day's conversations — the raw material for dreaming +# --------------------------------------------------------------------------- + +CONVERSATIONS = [ + { + "session_id": "weekly_report_start", + "messages": [ + { + "role": "user", + "content": "下周要开周会,帮我总结一下这周 Dream 和记忆系统相关的进展。", + }, + { + "role": "assistant", + "content": ( + "这周完成了检索实验、filter 接口调整、Dream pipeline 初版、" + "用户访谈和一些评估准备。下周可以继续优化召回质量、完善指标," + "并推进产品化验证。" + ), + }, + ], + }, + { + "session_id": "weekly_report_retry", + "messages": [ + {"role": "user", "content": "太像流水账了。我想讲得更有方向感,不只是列出做了什么。"}, + { + "role": "assistant", + "content": ( + "可以改成 OKR:目标是提升长期记忆系统质量,关键结果包括" + "召回效果提升、接口稳定、Dream 流程跑通、完成初步用户验证。" + ), + }, + ], + }, + { + "session_id": "future_planning_start", + "messages": [ + { + "role": "user", + "content": ( + "还是不对。下午我们换个问题:这个 AI 记忆系统未来到底应该往哪走?" + "是做更强的搜索,还是做能主动理解人的智能体?" + ), + }, + { + "role": "assistant", + "content": "短期建议提升搜索和召回,中期增加个性化画像,长期发展为主动式智能助理。", + }, + ], + }, + { + "session_id": "future_planning_retry", + "messages": [ + {"role": "user", "content": "这太泛了,谁都可以这么说。没有解释为什么非做这个不可。"}, + { + "role": "assistant", + "content": "可以表述为:我们要从被动检索走向主动记忆,让系统能在合适时间提醒用户重要信息。", + }, + ], + }, + { + "session_id": "filter_design", + "messages": [ + { + "role": "user", + "content": ( + "晚上再讨论一个具体方案:现在项目里召回出来很多记忆," + "但真正有用的很少。我想做一个新的 filter 模块,你觉得怎么设计?" + ), + }, + { + "role": "assistant", + "content": ( + "可以设计 relevance、importance、recency 三个分数," + "再接一个 reranker。接口上支持 metadata constraints," + "并加入用户反馈闭环。" + ), + }, + { + "role": "user", + "content": "这些都像零件。方案本身没有灵魂,我不知道怎么说服别人它重要。", + }, + ], + }, +] + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +USER_ID = "dream_demo_user_0002" +CUBE_ID = "dream_demo_cube_0002" + + +def main() -> None: + # 1. Initialize all components (LLM, graph_db, scheduler, dream plugin, ...) + print("Initializing server components...") + components = handlers.init_server() + dependencies = HandlerDependencies.from_init_server(components) + add_handler = AddHandler(dependencies) + + # 2. 
Add conversations as memories — hooks automatically accumulate Dream signals + _print_json("Step 1: Adding memories", {"conversations": len(CONVERSATIONS)}) + for i, conv in enumerate(CONVERSATIONS, 1): + req = APIADDRequest.model_validate( + { + "user_id": USER_ID, + "writable_cube_ids": [CUBE_ID], + "session_id": conv["session_id"], + "messages": conv["messages"], + "async_mode": "async", + } + ) + res = add_handler.handle_add_memories(req) + print(f" add #{i} (session={conv['session_id']}): {res.message}, data={len(res.data)}") + + # 3. Get the dream plugin and inspect signal state + dream_plugin = plugin_manager.plugins.get("dream") + if dream_plugin is None: + print("Dream plugin not loaded — exiting.") + return + + snapshot = dream_plugin.signal_store.snapshot( + mem_cube_id=CUBE_ID, + user_id=USER_ID, + user_name="DreamDemoUser", + ) + _print_json("Step 2: Dream signal snapshot", snapshot) + print(f" should_trigger: {dream_plugin.signal_store.should_trigger(mem_cube_id=CUBE_ID)}") + + # 4. Run the Dream pipeline directly (bypass scheduler, use real LLM + graph_db) + _print_json("Step 3: Running Dream pipeline", {"cube": CUBE_ID}) + results = dream_plugin.pipeline.run( + mem_cube_id=CUBE_ID, + user_id=USER_ID, + cube_id=CUBE_ID, + signal_snapshot=snapshot, + text_mem=None, + ) + _print_json("Pipeline results", results) + + # 5. Reset signals after Dream completes + dream_plugin.signal_store.reset(mem_cube_id=CUBE_ID) + print("\nSignal store reset. Done.") + + +if __name__ == "__main__": + main() diff --git a/src/memos/api/handlers/component_init.py b/src/memos/api/handlers/component_init.py index 3536cee09..03dcc8412 100644 --- a/src/memos/api/handlers/component_init.py +++ b/src/memos/api/handlers/component_init.py @@ -179,6 +179,7 @@ def init_server() -> dict[str, Any]: plugin_context = build_plugin_context( graph_db=graph_db, embedder=embedder, + llm=llm, default_cube_config=default_cube_config, nli_client_config=nli_client_config, mem_reader_config=mem_reader_config, @@ -285,6 +286,12 @@ def init_server() -> dict[str, Any]: # Initialize SchedulerAPIModule api_module = mem_scheduler.api_module + # Plugins keep a reference to the original context dict, so updating the shared + # section here makes the runtime handles visible without adding a second init step. + plugin_context["shared"]["mem_scheduler"] = mem_scheduler + plugin_context["shared"]["submit_scheduler_messages"] = mem_scheduler.submit_messages + plugin_context["shared"]["api_module"] = api_module + # Start scheduler if enabled if os.getenv("API_SCHEDULER_ON", "true").lower() == "true": mem_scheduler.start() diff --git a/src/memos/dream/README.md b/src/memos/dream/README.md new file mode 100644 index 000000000..ef28fdbb8 --- /dev/null +++ b/src/memos/dream/README.md @@ -0,0 +1,246 @@ +# Dream Plugin + +**MemOS Dream** is a motive-driven, offline memory consolidation system — it forms internal motives first, recalls heterogeneous memories around those motives, and persists the results as both **dream diaries** (explainable traces) and **dream memories** (affecting future behavior). + +> Without motives, it's batch summarization, not dreaming. +> Real dreams are driven by *"what's worth dreaming about"*, not *"what memories exist"*. + +## Design Philosophy + +Human dreams don't randomly compress the day's material. 
They revolve around: + +- **Repeatedly activated** things +- **Unfinished** goals +- **Emotionally intense** events +- **Conflicting** information +- **Anticipated future needs** + +Dream mirrors this: it starts from *unresolved internal motives*, not from a raw memory list. + +## Narrative Scenario: Dreaming One Day Into One Strategy + +This scenario explains why Dream is more than a nightly summary. During the day, the AI does not necessarily fail because each answer is wrong. It can fail because it treats one continuous user struggle as several unrelated tasks. + +In this example, the user first asks for help preparing next week's weekly meeting, then discusses the future direction of an AI memory system, and finally asks for a concrete design for a filter module. The AI gives locally reasonable answers, but the user remains dissatisfied because the three topics need one shared strategic narrative. + +Daytime chat input follows the codebase `MessageList` shape: each message is a dict with `role` and `content`. + +```json +[ + { + "role": "user", + "content": "I need to present in next week's weekly meeting. Help me summarize this week's progress on Dream and the memory system." + }, + { + "role": "assistant", + "content": "This week you completed retrieval experiments, filter interface changes, an initial Dream pipeline, user interviews, and evaluation preparation. Next week you can continue improving recall quality, metrics, and product validation." + }, + { + "role": "user", + "content": "This sounds like a task list. I need it to have direction, not just a list of what happened." + }, + { + "role": "assistant", + "content": "You can frame it as an OKR: the objective is improving long-term memory quality, with key results around retrieval quality, interface stability, Dream flow validation, and early user feedback." + }, + { + "role": "user", + "content": "Still not right. Let's switch topics: where should this AI memory system go in the future? Stronger search, or an agent that actively understands people?" + }, + { + "role": "assistant", + "content": "A practical roadmap is to improve search and recall in the short term, add personalization in the middle term, and evolve into a proactive assistant in the long term." + }, + { + "role": "user", + "content": "That's too generic. Anyone could say that. It doesn't explain why this direction is necessary." + }, + { + "role": "assistant", + "content": "You can describe the direction as moving from passive retrieval to proactive memory, where the system reminds users of important information at the right time." + }, + { + "role": "user", + "content": "Now let's discuss a concrete design. The current project recalls many memories, but only a few are actually useful. I want to build a new filter module. How should it work?" + }, + { + "role": "assistant", + "content": "You can combine relevance, importance, and recency scores, then add a reranker. The interface can support metadata constraints and a user feedback loop." + }, + { + "role": "user", + "content": "These are all components. The design has no soul. I don't know how to convince people that it matters." + } +] +``` + +The daytime failure is not caused by missing facts. It is caused by over-fragmented problem boundaries. The AI treats the weekly report as writing, the future plan as a roadmap, and the filter as an engineering module. It produces correct but shallow local answers. 
The real user need is different: how to turn this week's progress, the long-term direction, and the current module design into one coherent story. + +At night, Dream does not need to replay every message. It focuses on repeated failure signals: "too scattered", "too generic", "like components", and "no direction". Dream can cluster those memories into one motive: + +```json +{ + "motive_id": "motive:dream_memory_strategy_alignment", + "description": "Several conversations failed for the same hidden reason: weekly reporting, future planning, and filter design were treated as separate tasks, while the user needed a shared strategic narrative.", + "memory_ids": ["weekly_report_thread", "future_planning_thread", "filter_design_thread"] +} +``` + +In the dream, the AI sees three tables. On the first table is the weekly report, full of completed tasks but without a title. On the second is the future roadmap, long and ambitious but with no starting point. On the third is the filter architecture, full of valves, scores, and rerankers, but it is unclear whose pain this machine is meant to solve. + +The AI first tries to patch each table separately: add a better title to the report, add a vision to the roadmap, add formulas to the filter. Each patch collapses because it still does not answer the same hidden question. Then one sentence appears in the dream: + +> The user does not want an AI that searches better. The user wants an AI that knows what is worth remembering, when it should be recalled, and why it matters now. + +The global conclusion after waking is: + +> This week's work can be unified around the "memory selection layer". In the short term, it is a filter that selects truly useful memories from many candidates. In the middle term, it becomes a reflection mechanism that turns daytime failures, conflicts, and fragments into insights. In the long term, it is the starting point for moving AI from passive search toward active cognition. + +With that conclusion, the AI can rewrite all three answers the next morning. + +The weekly report becomes: + +> The core finding this week is that the bottleneck of long-term memory is not only whether the system can recall more content, but whether it can judge which memories are truly important in the current context. Around this finding, we ran retrieval experiments, user interviews, filter interface updates, and the first Dream pipeline validation. Together, these efforts point to a new middle layer: the memory selection layer. + +The future plan becomes: + +> The future system should not only be searchable memory. It should become reflective memory. It should not only store the past, but continuously judge which experiences are becoming patterns, which failures deserve reflection, and which information should proactively surface later. + +The filter design becomes: + +> The filter is the smallest implementation of the memory selection layer. Version one uses relevance, importance, recency, and the current user goal for explainable selection. Version two adds user feedback to learn which memories were actually used. Version three connects to Dream, so fragmented daytime memories that were not understood in the moment can be reorganized into new insights at night. + +In this example, Dream is not mystical inspiration. It is offline problem reframing. It treats the user's three dissatisfied reactions as one system signal: the user did not need isolated advice, but a cognitive throughline connecting reporting, strategy, and engineering design. 
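+
+In code, this motive maps directly onto the `DreamMotive` / `DreamCluster` models in `types.py`. The sketch below shows roughly what `MotiveFormation` builds for this scenario once the LLM has clustered the failure signals; the thread IDs are the illustrative ones from the JSON above, not real memory IDs:
+
+```python
+from memos.dream.types import DreamCluster, DreamMotive, MotiveType
+
+# One motive covering the three dissatisfied threads from the scenario.
+motive = DreamMotive(
+    motive_id="motive:dream_memory_strategy_alignment",
+    motive_type=MotiveType.NEWNESS,
+    description=(
+        "Weekly reporting, future planning, and filter design were treated as "
+        "separate tasks, while the user needed a shared strategic narrative."
+    ),
+    memory_ids=[
+        "weekly_report_thread",
+        "future_planning_thread",
+        "filter_design_thread",
+    ],
+)
+cluster = DreamCluster(cluster_id=f"cluster:{motive.motive_id}", motive=motive)
+```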
+ +## Pipeline + +``` + STEP 1 STEP 2 STEP 3 STEP 4 + Form Motives ──► Recall Around ──► Directed Dream ──► Persist + (why dream?) Motives (consolidate) (diary + memory) + (cross-type) +``` + +| Step | Stage Class | What It Does | +|------|-------------|--------------| +| 1 | `MotiveFormation` | LLM-powered: cluster pending memories into dream motives by identifying cross-conversation patterns, unresolved tensions, and repeated themes. Falls back to single-cluster heuristic without LLM. | +| 2 | `DirectRecall` | Use source-memory embeddings to recall related memories from `UserMemory` and `LongTermMemory` scopes. Results are deduplicated and ranked by similarity. | +| 3 | `ConsolidationReasoning` | LLM-powered deep dreaming: combine source and recalled memories, ask the LLM to reframe problems and produce concrete insights. Output: `DreamAction` (CREATE → `InsightMemory`) with hypothetical-deduction rationale. Falls back to placeholder without LLM. | +| 4a | `StructuredDiarySummary` | Package reasoning output into a human-readable diary entry (title, summary, dream content, motive context). Deterministic — no additional LLM call. | +| 4b | `DreamPersistence` | Execute DreamActions against `graph_db` (create/update/merge/archive across memory types) + persist diary. Fires `dream.before_persist` / `dream.after_persist` hooks. | + +All four stages are fully implemented. Steps 1 and 3 are LLM-powered (each with dedicated prompts in `prompts/`); step 2 uses embedding-based similarity search; steps 4a and 4b are deterministic. When the LLM is unavailable, steps 1 and 3 fall back to simple heuristics. Each stage can be independently replaced — see [Contributing](#contributing). + +## Triggering + +``` +add memory ──► on_add_signal ──► DreamSignalStore ──► threshold reached? + (accumulate IDs) ├─ yes → submit scheduler task → pipeline + └─ no → keep accumulating +``` + +Manual trigger is also available via `POST /dream/trigger/cube`. + +## Directory Structure + +``` +dream/ +├── plugin.py # Plugin entry point, wiring & registration +├── hooks.py # Hook handlers (signal capture + execution orchestration) +├── hook_defs.py # Plugin-scoped hooks (before/after persist) +├── maintenance.py # Lifecycle maintenance (contribution entry point) +├── signal_store.py # In-memory signal accumulator +├── types.py # Data models (DreamAction, DreamResult, DreamMemoryLifecycle, etc.) 
+├── pipeline/ +│ ├── base.py # Pipeline orchestrator +│ ├── motive.py # Stage 1 — motive formation +│ ├── recall.py # Stage 2 — cross-type recall +│ ├── reasoning.py # Stage 3 — consolidation reasoning (produces DreamActions) +│ ├── diary.py # Stage 4a — diary generation +│ └── persistence.py # Stage 4b — memory write-back + diary persistence +├── prompts/ +│ ├── motive_prompt.py # Motive formation prompt +│ └── reasoning_prompt.py # Consolidation reasoning prompt +└── routers/ + ├── trigger_router.py # POST /dream/trigger/cube + └── diary_router.py # POST /dream/diary +``` + +## API + +### Dream plugin endpoints + +| Method | Endpoint | Description | +|--------|----------|-------------| +| POST | `/dream/trigger/cube` | Manually trigger a Dream run for a cube | +| POST | `/dream/diary` | Query diary entries with filter | +| GET | `/dream/diary/health` | Plugin status & scheduler connectivity | + +### External endpoints that feed Dream signals + +| Method | Endpoint | How it connects | +|--------|----------|-----------------| +| POST | `/add` | `@hookable("add")` → `add.after` hook → `on_add_signal` accumulates memory IDs | +| POST | `/chat/complete`, `/chat/stream` | Chat internally calls `handle_add_memories` → same `add.after` hook chain | + +### Internal + +| Component | How it connects | +|-----------|-----------------| +| `MemDreamMessageHandler` (scheduler) | Consumes dream tasks → `trigger_single_hook(H.DREAM_EXECUTE)` → pipeline | + +### Query example + +```json +POST /dream/diary +{ + "cube_id": "user_123", + "filter": { "created_after": "2026-05-06", "limit": 5 } +} +``` + +## Persistence Design + +### Two-track write + +Dream persistence produces **two kinds of output**: + +1. **Memory store write-back** — `DreamAction` mutations applied to the heterogeneous memory system: + - `LongTermMemory` / `UserMemory` + - `SkillMemory` + - `ProfileMemory` + - `PreferenceMemory` + - `InsightMemory` + +2. **Dream Diary** — an explainable trace stored in `graph_db` and queryable via the diary API. + +### Persistence conditions + +A `DreamAction` is only persisted when: + +- **Hypothetical deduction passes**: the `rationale` field must demonstrate that a concrete question can be answered better with this memory. Empty rationale → action is skipped. +- **Confidence > 0**: the reasoning stage must assign non-zero confidence. + +### Lifecycle maintenance + +Each Dream-produced memory carries `DreamMemoryLifecycle` metadata (defined in `types.py`) and is designed for periodic maintenance. **The data model is in place but the maintenance logic is not yet implemented** — see `maintenance.py` for the contribution guide. 
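+
+As a contribution sketch, a maintenance pass could evaluate each memory's `DreamMemoryLifecycle` against the rules summarized in the table below. Only the field names come from `types.py`; the thresholds, the `should_archive` helper, and the assumption that `last_hit_at` is a `datetime` are hypothetical:
+
+```python
+from datetime import datetime, timedelta
+
+STALE_TTL = timedelta(days=7)  # hypothetical TTL for "long time not hit"
+USEFULNESS_FLOOR = 0.2         # hypothetical usefulness threshold
+
+
+def should_archive(lifecycle) -> bool:
+    """Decide whether one Dream-produced memory fails the lifecycle checks."""
+    if lifecycle.invalidated_by_feedback:
+        return True  # overturned by feedback: archive immediately
+    if lifecycle.last_hit_at and datetime.utcnow() - lifecycle.last_hit_at > STALE_TTL:
+        return True  # stale: not hit for too long
+    # hit, but consistently unhelpful for queries
+    return lifecycle.hit_count > 0 and lifecycle.usefulness_score < USEFULNESS_FLOOR
+```
+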
+ +| Condition | Action | +|-----------|--------| +| Long time not hit (`last_hit_at` stale) | Decay / archive | +| Hit but low usefulness (`usefulness_score` below threshold) | Archive | +| Overturned by feedback (`invalidated_by_feedback = true`) | Immediate archive | + +## Contributing + +Each stage can be replaced independently: + +| Want to improve… | Start here | Implement | +|------------------|------------|-----------| +| Motive detection — add signal sources beyond newness (conflict, frequency, feedback) | `motive.py`, `prompts/motive_prompt.py` | `form()` | +| Recall scope — extend beyond UserMemory / LongTermMemory | `recall.py` | `gather()` | +| Reasoning depth — multi-strategy or multi-action output | `reasoning.py`, `prompts/reasoning_prompt.py` | `reason()` | +| Diary narrative — LLM-generated prose instead of structured packaging | `diary.py` | `generate()` | +| Persistence logic — validation, conflict detection before write | `persistence.py` | `persist()` | +| Lifecycle maintenance (not yet implemented) | `maintenance.py` | `run_maintenance()` | +| Signal policies (dedup, decay, cooldown) | `signal_store.py` | `record_add()` / `should_trigger()` | diff --git a/src/memos/dream/README_ZH.md b/src/memos/dream/README_ZH.md new file mode 100644 index 000000000..a83beab42 --- /dev/null +++ b/src/memos/dream/README_ZH.md @@ -0,0 +1,243 @@ +# Dream 插件 + +**MemOS Dream** 是一个由动机驱动的离线记忆重组系统——先形成内在动机,再围绕动机对异构记忆做定向召回与整合,最终以**梦境日记**(可解释的追溯)和**梦境记忆**(影响未来行为)两条线分别持久化。 + +> 没有"梦因"的 Dream 只是批处理总结,不是真正的梦。 +> 真正的梦从"什么值得梦"出发,而不是从"有哪些记忆"出发。 + +## 设计理念 + +人脑的梦不是把白天的材料随机压缩一遍,而是围绕几类东西打转: + +- **反复激活**过的东西 +- **没解决完**的目标 +- **情绪强烈**的事件 +- **冲突很大**的信息 +- **预期未来还会用到**的东西 + +Dream 模拟这一点:从*未完成的内在动机*出发,而不是从原始记忆列表出发。 + +## 叙事场景:把一天里分散的问题梦成一条主线 + +这个场景用于说明 Dream 为什么不是简单的夜间总结。白天的 AI 不一定是因为每个答案都错了才失败,它也可能是因为把用户连续的困扰拆成了几个互不相关的任务。 + +在这个例子里,用户先让 AI 帮忙准备下周周会,又讨论 AI 记忆系统的未来方向,最后讨论当前项目里的 filter 模块方案。AI 给出的每个回答单独看都有道理,但用户始终不满意,因为三个话题真正需要的是同一条战略主线。 + +```json +[ + { + "role": "user", + "content": "下周要开周会,帮我总结一下这周 Dream 和记忆系统相关的进展。" + }, + { + "role": "assistant", + "content": "这周完成了检索实验、filter 接口调整、Dream pipeline 初版、用户访谈和一些评估准备。下周可以继续优化召回质量、完善指标,并推进产品化验证。" + }, + { + "role": "user", + "content": "太像流水账了。我想讲得更有方向感,不只是列出做了什么。" + }, + { + "role": "assistant", + "content": "可以改成 OKR:目标是提升长期记忆系统质量,关键结果包括召回效果提升、接口稳定、Dream 流程跑通、完成初步用户验证。" + }, + { + "role": "user", + "content": "还是不对。下午我们换个问题:这个 AI 记忆系统未来到底应该往哪走?是做更强的搜索,还是做能主动理解人的智能体?" + }, + { + "role": "assistant", + "content": "短期建议提升搜索和召回,中期增加个性化画像,长期发展为主动式智能助理。" + }, + { + "role": "user", + "content": "这太泛了,谁都可以这么说。没有解释为什么非做这个不可。" + }, + { + "role": "assistant", + "content": "可以表述为:我们要从被动检索走向主动记忆,让系统能在合适时间提醒用户重要信息。" + }, + { + "role": "user", + "content": "晚上再讨论一个具体方案:现在项目里召回出来很多记忆,但真正有用的很少。我想做一个新的 filter 模块,你觉得怎么设计?" 
+ }, + { + "role": "assistant", + "content": "可以设计 relevance、importance、recency 三个分数,再接一个 reranker。接口上支持 metadata constraints,并加入用户反馈闭环。" + }, + { + "role": "user", + "content": "这些都像零件。方案本身没有灵魂,我不知道怎么说服别人它重要。" + } +] +``` + +白天失败的原因不是信息不足,而是问题边界被切得太碎。AI 把周报当成文案,把未来规划当成路线图,把 filter 当成工程模块,于是分别给出正确但空泛的局部建议。用户真正想解决的是另一个问题:如何把本周进展、长期方向和当前方案讲成同一个故事。 + +夜间 Dream 触发时,不需要复盘所有消息,而是聚焦白天反复出现的失败信号:用户多次反馈“太散”“太泛”“像零件”“没有方向感”。Dream 可以将这些信号和对应记忆聚成一个 motive: + +```json +{ + "motive_id": "motive:dream_memory_strategy_alignment", + "description": "Several conversations failed for the same hidden reason: weekly reporting, future planning, and filter design were treated as separate tasks, while the user needed a shared strategic narrative.", + "memory_ids": ["weekly_report_thread", "future_planning_thread", "filter_design_thread"] +} +``` + +在梦里,AI 看到三张桌子。第一张桌子上是周报,写满完成事项,却没有标题;第二张桌子上是未来规划,路线很长,但起点是空的;第三张桌子上是 filter 架构图,阀门、分数、reranker 都在,却看不出这台机器要解决什么人的痛苦。 + +AI 一开始继续补局部答案:给周报加标题,给规划加愿景,给 filter 加评分公式。它们都很快塌掉,因为这些补丁仍然没有回答同一个核心问题。直到梦里出现一句话: + +> 用户不是要一个更会搜索的 AI,而是要一个知道什么事情值得记住、什么时候该想起来、为什么此刻重要的 AI。 + +梦醒后的全局结论是: + +> 这周所有工作都可以收束到“记忆选择层”。它短期是 filter,用来从大量候选记忆中选出当前真正有用的内容;中期是反思机制,用来把白天失败、冲突和碎片化记忆聚合成 insight;长期则是 AI 从被动搜索走向主动认知的起点。 + +因此,第二天 AI 可以同时改写三个答案。 + +周会汇报不再列流水账,而是说: + +> 本周的核心发现是,长期记忆系统的瓶颈不只是能否召回更多内容,而是能否判断哪些记忆在当前语境下真正重要。围绕这个发现,我们做了检索实验、用户访谈、filter 接口调整和 Dream pipeline 初版验证。这些工作共同指向一个新的中间层:记忆选择层。 + +未来规划不再是泛泛地说“短期搜索、中期画像、长期智能体”,而是说: + +> 未来的系统不应该只是 searchable memory,而应该成为 reflective memory。它不只是保存过去,还要持续判断哪些经历正在形成模式,哪些失败值得反思,哪些信息应该在未来某个时刻主动浮现。 + +filter 方案也不再只是零件列表,而是被重新定义为: + +> filter 是记忆选择层的最小实现。第一版用 relevance、importance、recency 和当前目标做可解释筛选;第二版接入用户反馈,学习哪些记忆真的被采用;第三版接入 Dream,把白天没能解释清楚的分散记忆在夜间重新组织成新的 insight。 + +这个例子里的 Dream 不是神秘灵感,而是一次离线的问题重构。它把“白天三次不满意”视为同一个系统信号:用户不是缺少单点建议,而是缺少一条跨越周报、战略和工程方案的认知主线。 + +## Pipeline + +``` + STEP 1 STEP 2 STEP 3 STEP 4 + 形成梦因 ──────► 围绕动机召回 ──► 定向做梦 ──────────► 持久化 + (为什么值得梦) (跨记忆类型) (重组/整合) (日记 + 记忆) +``` + +| 步骤 | 阶段类 | 做什么 | +|------|--------|--------| +| 1 | `MotiveFormation` | 由 LLM 分析待处理记忆,识别跨对话模式、未解决的张力和重复主题,分组为梦因簇。LLM 不可用时回退为单簇启发式策略。 | +| 2 | `DirectRecall` | 利用源记忆的 embedding 在 `UserMemory` 和 `LongTermMemory` 范围内做语义召回,结果去重并按相似度排序。 | +| 3 | `ConsolidationReasoning` | 由 LLM 驱动的深度做梦:组合源记忆与召回上下文,要求 LLM 重构问题并产出具体洞察。输出 `DreamAction`(CREATE → `InsightMemory`),附带假设演绎论证。LLM 不可用时回退为占位符。 | +| 4a | `StructuredDiarySummary` | 将推理输出包装为人类可读的日记条目(标题、摘要、梦境内容、动机上下文)。确定性逻辑,不额外调用 LLM。 | +| 4b | `DreamPersistence` | 对 `graph_db` 执行 DreamActions(create/update/merge/archive 跨记忆类型)+ 持久化日记。触发 `dream.before_persist` / `dream.after_persist` 钩子。 | + +四个阶段均已完整实现。步骤 1 和 3 由 LLM 驱动(各有专属 prompt,位于 `prompts/`);步骤 2 基于 embedding 语义召回;步骤 4a 和 4b 为确定性逻辑。当 LLM 不可用时,步骤 1 和 3 回退为简单启发式策略。每个阶段均可独立替换——见[参与贡献](#参与贡献)。 + +## 触发机制 + +``` +add memory ──► on_add_signal ──► DreamSignalStore ──► 达到阈值? 
+ (累积 memory ID) ├─ 是 → 提交调度器任务 → pipeline + └─ 否 → 继续累积 +``` + +也可通过 `POST /dream/trigger/cube` 手动触发。 + +## 目录结构 + +``` +dream/ +├── plugin.py # 插件入口,组装与注册 +├── hooks.py # Hook 处理(信号捕获 + 执行编排) +├── hook_defs.py # 插件级 hook(持久化前/后) +├── maintenance.py # 生命周期维护(贡献入口) +├── signal_store.py # 内存信号累积器 +├── types.py # 数据模型(DreamAction、DreamResult、DreamMemoryLifecycle 等) +├── pipeline/ +│ ├── base.py # Pipeline 编排器 +│ ├── motive.py # 阶段 1 — 动机形成 +│ ├── recall.py # 阶段 2 — 跨类型召回 +│ ├── reasoning.py # 阶段 3 — 整合推理(产出 DreamActions) +│ ├── diary.py # 阶段 4a — 日记生成 +│ └── persistence.py # 阶段 4b — 记忆回写 + 日记持久化 +├── prompts/ +│ ├── motive_prompt.py # 动机形成 prompt +│ └── reasoning_prompt.py # 整合推理 prompt +└── routers/ + ├── trigger_router.py # POST /dream/trigger/cube + └── diary_router.py # POST /dream/diary +``` + +## API + +### Dream 插件自身端点 + +| 方法 | 端点 | 说明 | +|------|------|------| +| POST | `/dream/trigger/cube` | 手动触发指定 cube 的梦境任务 | +| POST | `/dream/diary` | 按条件查询梦境日记 | +| GET | `/dream/diary/health` | 插件状态与调度器连接情况 | + +### 外部 API(通过 hook 向 Dream 输送信号) + +| 方法 | 端点 | 关联方式 | +|------|------|---------| +| POST | `/add` | `@hookable("add")` → `add.after` hook → `on_add_signal` 累积 memory ID | +| POST | `/chat/complete`、`/chat/stream` | Chat 内部调用 `handle_add_memories` → 同样触发 `add.after` hook 链路 | + +### 内部组件 + +| 组件 | 关联方式 | +|------|---------| +| `MemDreamMessageHandler`(调度器) | 消费 dream 任务 → `trigger_single_hook(H.DREAM_EXECUTE)` → pipeline | + +### 查询示例 + +```json +POST /dream/diary +{ + "cube_id": "user_123", + "filter": { "created_after": "2026-05-06", "limit": 5 } +} +``` + +## 持久化设计 + +### 双轨写入 + +Dream 持久化产出**两种结果**: + +1. **记忆库记忆更新** — `DreamAction` 操作应用到异构记忆系统: + - `LongTermMemory` / `UserMemory` + - `SkillMemory` + - `ProfileMemory` + - `PreferenceMemory` + - `InsightMemory` + +2. **梦境日记更新** — 可解释的追溯记录,存入 `graph_db`,通过 diary API 查询。 + +### 持久化条件 + +一条 `DreamAction` 必须满足以下条件才会被持久化: + +- **假设演绎通过**:`rationale` 字段必须论证"某个具体问题有了这条记忆可以回答得更好"。空 rationale → 跳过。 +- **置信度 > 0**:推理阶段必须给出非零 confidence。 + +### 生命周期维护 + +每条 Dream 产出的记忆都附带 `DreamMemoryLifecycle` 元数据(定义在 `types.py`),设计用于定期维护清理。**数据模型已就位,但维护逻辑尚未实现**——见 `maintenance.py` 中的贡献指南。 + +| 条件 | 动作 | +|------|------| +| 长时间未被命中(`last_hit_at` 过期) | 衰减 / 归档 | +| 虽然被命中,但对 query 用处不大(`usefulness_score` 低于阈值) | 归档 | +| 被 feedback 等接口直接推翻(`invalidated_by_feedback = true`) | 立即归档 | + +## 参与贡献 + +每个模块都可以独立替换: + +| 想改进… | 起点文件 | 实现方法 | +|---------|---------|---------| +| 动机检测——增加 newness 之外的信号源(冲突、频率、反馈) | `motive.py`、`prompts/motive_prompt.py` | `form()` | +| 召回范围——扩展到 UserMemory / LongTermMemory 之外 | `recall.py` | `gather()` | +| 推理深度——多策略或多 action 输出 | `reasoning.py`、`prompts/reasoning_prompt.py` | `reason()` | +| 日记叙事——用 LLM 生成散文式叙事替代结构化包装 | `diary.py` | `generate()` | +| 持久化逻辑——写入前校验、冲突检测 | `persistence.py` | `persist()` | +| 定时维护清理(尚未实现) | `maintenance.py` | `run_maintenance()` | +| 信号策略(去重、衰减、冷却) | `signal_store.py` | `record_add()` / `should_trigger()` | diff --git a/src/memos/dream/__init__.py b/src/memos/dream/__init__.py new file mode 100644 index 000000000..b58c2a10c --- /dev/null +++ b/src/memos/dream/__init__.py @@ -0,0 +1,11 @@ +"""Built-in Dream plugin package. + +The default Dream implementation lives inside the core repository so it can be +discovered like any other MemOS plugin. Enterprise builds can swap the entry +point to an external plugin without changing the scheduler contract. 
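+
+A usage sketch (hedged; mirrors examples/dream/full_pipeline_example.py and
+assumes the default plugin manager wiring)::
+
+    from memos.plugins.manager import plugin_manager
+
+    dream = plugin_manager.plugins.get("dream")  # CommunityDreamPlugin instance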
+""" + +from memos.dream.plugin import CommunityDreamPlugin + + +__all__ = ["CommunityDreamPlugin"] diff --git a/src/memos/dream/hook_defs.py b/src/memos/dream/hook_defs.py new file mode 100644 index 000000000..eea0a4819 --- /dev/null +++ b/src/memos/dream/hook_defs.py @@ -0,0 +1,25 @@ +"""Dream plugin-owned hooks. + +These hooks are plugin-scoped and allow extensions to observe or modify Dream +persistence behavior without changing the core pipeline. +""" + +from memos.plugins.hook_defs import define_hook + + +class DreamH: + DREAM_BEFORE_PERSIST = "dream.before_persist" + DREAM_AFTER_PERSIST = "dream.after_persist" + + +define_hook( + DreamH.DREAM_BEFORE_PERSIST, + description="Allow plugins to inspect/modify Dream results before persistence", + params=["mem_cube_id", "results"], +) + +define_hook( + DreamH.DREAM_AFTER_PERSIST, + description="Allow plugins to react after Dream persistence completes", + params=["mem_cube_id", "results"], +) diff --git a/src/memos/dream/hooks.py b/src/memos/dream/hooks.py new file mode 100644 index 000000000..2f582c689 --- /dev/null +++ b/src/memos/dream/hooks.py @@ -0,0 +1,88 @@ +from __future__ import annotations + +from typing import Any + + +def on_dream_execute( + plugin, + *, + mem_cube_id: str, + user_id: str, + user_name: str, + signal_snapshot, + text_mem, + scheduler_context, + **kwargs, +): + """Single-provider Dream execution hook. + + The scheduler handler always calls this hook. The active Dream plugin decides + which pipeline implementation to run. After the pipeline finishes, the hook + resets the signal store so the next accumulation cycle starts fresh. + """ + + results = plugin.pipeline.run( + mem_cube_id=mem_cube_id, + user_id=user_id, + cube_id=mem_cube_id, + signal_snapshot=signal_snapshot, + text_mem=text_mem, + ) + plugin.signal_store.reset(mem_cube_id=mem_cube_id) + return results + + +def on_add_signal(plugin, *, request, result, **kwargs) -> None: + """Built-in Dream signal capture hook. + + The current implementation only extracts enough information to build a scheduler payload. + If a runtime scheduler handle is not available yet, the signal is still kept + locally so manual triggering and future auto-trigger logic use the same store. 
+ """ + + mem_cube_id = _extract_mem_cube_id(request=request, result=result) + if not mem_cube_id: + return + + memory_ids = _extract_memory_ids(result) + snapshot = plugin.signal_store.record_add( + mem_cube_id=mem_cube_id, + user_id=getattr(request, "user_id", "") or "", + user_name=getattr(request, "user_name", "") or "", + session_id=getattr(request, "session_id", "") or "", + memory_ids=memory_ids, + ) + if plugin.signal_store.should_trigger(mem_cube_id=mem_cube_id): + plugin.submit_dream_task( + mem_cube_id=mem_cube_id, + user_id=snapshot.user_id, + user_name=snapshot.user_name, + signal_snapshot=snapshot, + ) + + +def _extract_mem_cube_id(*, request, result) -> str: + writable_cube_ids = getattr(request, "writable_cube_ids", None) or [] + if writable_cube_ids: + return writable_cube_ids[0] + + data = getattr(result, "data", None) or [] + for item in data: + if isinstance(item, dict) and item.get("mem_cube_id"): + return item["mem_cube_id"] + mem_cube_id = getattr(item, "mem_cube_id", None) + if mem_cube_id: + return mem_cube_id + return "" + + +def _extract_memory_ids(result) -> list[str]: + ids: list[str] = [] + data = getattr(result, "data", None) or [] + for item in data: + candidate: Any = ( + item.get("memory_id") if isinstance(item, dict) else getattr(item, "memory_id", None) + ) + if candidate: + ids.append(str(candidate)) + return ids diff --git a/src/memos/dream/maintenance.py b/src/memos/dream/maintenance.py new file mode 100644 index 000000000..db859c011 --- /dev/null +++ b/src/memos/dream/maintenance.py @@ -0,0 +1,56 @@ +"""Dream memory lifecycle maintenance. + +This module is responsible for periodic cleanup of Dream-produced memories. +It is intentionally left unimplemented as a contribution entry point. + +──────────────────────────────────────────────────────────────────────────── +CONTRIBUTION GUIDE — Dream Memory Maintenance +──────────────────────────────────────────────────────────────────────────── + +Dream memories are not permanent. Each Dream-produced memory carries +`DreamMemoryLifecycle` metadata (see types.py) and should be periodically +evaluated for continued relevance. + +Cleanup rules to implement: + +1. STALE — Long time not hit + - If `last_hit_at` is older than a configurable TTL (e.g. 7 days), + the memory should be decayed or archived. + - Suggested approach: query graph_db for nodes where + metadata.source == "dream" and lifecycle.last_hit_at < threshold. + +2. LOW USEFULNESS — Hit but not helpful + - If the memory is retrieved during queries but `usefulness_score` + remains below a threshold, it is noise rather than signal. + - Suggested approach: integrate with the retrieval layer to update + usefulness_score when a Dream memory is included in a response + but receives low relevance feedback. + +3. INVALIDATED — Overturned by feedback + - If `invalidated_by_feedback` is set to True (e.g. by the feedback + API or a contradicting new memory), archive immediately. + - Suggested approach: register a hook on the feedback endpoint to + mark conflicting Dream memories. + +Implementation hints: + +- Create a `DreamMaintenanceTask` that runs on a scheduler interval + (e.g. daily) or is triggered by a hook. +- Use `graph_db.update_node(id, None, {"status": "archived", ...})` to + archive memories that fail the above checks. +- Consider a gradual decay: lower confidence over time rather than + hard-deleting on first miss. 
+- The `DreamMemoryLifecycle` model in types.py already provides the + fields you need: `last_hit_at`, `hit_count`, `usefulness_score`, + `invalidated_by_feedback`, `status`. + +Entry point suggestion: + + class DreamMaintenanceStrategy: + def run_maintenance(self, *, user_name: str, mem_cube_id: str) -> MaintenanceReport: + ... + +Register it in plugin.py and wire it to a scheduler interval or a +dedicated API endpoint (e.g. POST /dream/maintenance/run). +──────────────────────────────────────────────────────────────────────────── +""" diff --git a/src/memos/dream/pipeline/__init__.py b/src/memos/dream/pipeline/__init__.py new file mode 100644 index 000000000..23736fddd --- /dev/null +++ b/src/memos/dream/pipeline/__init__.py @@ -0,0 +1,16 @@ +from memos.dream.pipeline.base import AbstractDreamPipeline +from memos.dream.pipeline.diary import StructuredDiarySummary +from memos.dream.pipeline.motive import MotiveFormation +from memos.dream.pipeline.persistence import DreamPersistence +from memos.dream.pipeline.reasoning import ConsolidationReasoning +from memos.dream.pipeline.recall import DirectRecall + + +__all__ = [ + "AbstractDreamPipeline", + "ConsolidationReasoning", + "DirectRecall", + "DreamPersistence", + "MotiveFormation", + "StructuredDiarySummary", +] diff --git a/src/memos/dream/pipeline/base.py b/src/memos/dream/pipeline/base.py new file mode 100644 index 000000000..b00015781 --- /dev/null +++ b/src/memos/dream/pipeline/base.py @@ -0,0 +1,88 @@ +from __future__ import annotations + +from typing import Any + + +class AbstractDreamPipeline: + """Minimal Dream pipeline orchestrator. + + Community implementations can swap any strategy object to add richer motive formation, + recall, reasoning, diary generation, or persistence without changing the scheduler contract. + """ + + def __init__( + self, + *, + motive_strategy, + recall_strategy, + reasoning_strategy, + diary_strategy, + persistence_strategy, + ) -> None: + self.motive_strategy = motive_strategy + self.recall_strategy = recall_strategy + self.reasoning_strategy = reasoning_strategy + self.diary_strategy = diary_strategy + self.persistence_strategy = persistence_strategy + self.context: dict[str, Any] = {} + + def bind_context(self, context: dict[str, Any]) -> None: + self.context = context + for component in ( + self.motive_strategy, + self.recall_strategy, + self.reasoning_strategy, + self.diary_strategy, + self.persistence_strategy, + ): + bind_context = getattr(component, "bind_context", None) + if callable(bind_context): + bind_context(context) + + def run( + self, + *, + mem_cube_id: str, + user_id: str, + cube_id: str, + signal_snapshot, + text_mem, + ): + # Step 1: build Dream clusters from the scheduler payload. + clusters = self.motive_strategy.form( + signal_snapshot=signal_snapshot, + text_mem=text_mem, + cube_id=cube_id, + ) + + # Step 2: attach recall material to each cluster. + clusters = self.recall_strategy.gather( + clusters=clusters, + text_mem=text_mem, + cube_id=cube_id, + ) + + # Step 3a: convert recalled clusters into write intents. + results = self.reasoning_strategy.reason( + clusters=clusters, + text_mem=text_mem, + cube_id=cube_id, + ) + + # Step 3b: attach explainable diary artifacts. + results = self.diary_strategy.generate( + clusters=clusters, + results=results, + mem_cube_id=mem_cube_id, + ) + + # Step 4: hand persistence over to the final strategy. 
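+        # The default DreamPersistence gates each action on a non-empty
+        # rationale and confidence > 0, then fires the dream.before_persist /
+        # dream.after_persist hooks around the write-back (see persistence.py).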
+ self.persistence_strategy.persist( + results=results, + text_mem=text_mem, + cube_id=cube_id, + mem_cube_id=mem_cube_id, + user_id=user_id, + signal_snapshot=signal_snapshot, + ) + return results diff --git a/src/memos/dream/pipeline/diary.py b/src/memos/dream/pipeline/diary.py new file mode 100644 index 000000000..15f0a5d5d --- /dev/null +++ b/src/memos/dream/pipeline/diary.py @@ -0,0 +1,148 @@ +from __future__ import annotations + +import logging + +from typing import TYPE_CHECKING + +from memos.dream.types import DreamDiaryEntry + + +if TYPE_CHECKING: + from memos.dream.types import DreamAction, DreamCluster, DreamResult + + +logger = logging.getLogger(__name__) + +_TITLE_MAX_LEN = 30 +_TITLE_SENTENCE_TERMINATORS = ("。", "!", "?", ".", "!", "?", "\n") + + +class StructuredDiarySummary: + """Dream diary generation stage. + + Builds the human-readable diary entry directly from the reasoning + output, without an additional LLM call. The reasoning stage already + produces the most carefully crafted artifact of the pipeline — the + first-person dream itself; the diary's job is to package that content + with motive context and lightweight metadata so it can be retrieved + later as a coherent nightly reflection. + + The diary is a **user-facing explainability artifact**, not functional + memory. It is excluded from AI recall paths (InsightMemory handles + that). Think of it as the "dream journal" the user can browse. + + Per-cluster output: + - ``title`` — short label derived from the motive description. + - ``summary`` — the motive description (one or two sentences). + - ``dream_entry`` — the dream content produced by reasoning, if any. + - ``motive`` — type, why_now, and source/related memory counts. + - ``themes`` — empty by default; community plugins can replace + this strategy to add keyword/topic extraction. + - ``status`` — ``"completed"`` when a real dream was produced, + ``"skipped"`` when reasoning judged the material + too thin or only emitted a placeholder. + + Community Extension Ideas + ------------------------- + Contributors looking to enhance the diary stage may consider (but are + not limited to) the following directions: + + 1. **Style separation** — The reasoning stage produces dense, precise + InsightMemory for AI recall. The diary could add an LLM rewrite + pass that converts the insight into a lighter, more conversational + tone for human consumption (e.g. "your AI's nightly journal"). + On failure, fall back to the raw insight text (current behavior). + + 2. **Proactive surfacing** — Instead of passively waiting for the user + to query the diary API, the assistant can proactively bring up dream + content at the start of the next conversation ("I had a dream last + night..."). This turns the diary into an active dialogue trigger + rather than a static artifact. Implementation considerations: + + - Add a ``surfaced`` status field to ``DreamDiaryEntry`` tracking + whether the diary has been presented to the user. + - Hook into the chat entry point: on new session, check for unsurfaced + diary entries and inject dream context into the conversation opener. + - Design a frequency/relevance policy so the assistant does not + overwhelm the user (e.g. surface at most one dream per session, + prefer higher-confidence dreams, respect user opt-out signals). 
+ """ + + def bind_context(self, context: dict) -> None: + self.context = context + + def generate( + self, + *, + clusters: list[DreamCluster], + results: list[DreamResult], + mem_cube_id: str, + ) -> list[DreamResult]: + cluster_map = {c.cluster_id: c for c in clusters} + for result in results: + cluster = cluster_map.get(result.cluster_id) + result.diary_entry = self._build_entry(cluster, result) + return results + + def _build_entry(self, cluster: DreamCluster | None, result: DreamResult) -> DreamDiaryEntry: + motive = cluster.motive if cluster else None + motive_description = motive.description if motive else "" + source_count = len(motive.memory_ids) if motive else 0 + related_count = len(cluster.recalled_items) if cluster else 0 + + dream_action = self._first_real_dream(result.actions) + if dream_action is not None: + dream_entry = dream_action.new_content + status = "completed" + else: + dream_entry = "" + status = "skipped" + + return DreamDiaryEntry( + title=self._make_title(motive_description), + summary=motive_description or "(no motive description)", + dream_entry=dream_entry, + motive={ + "type": motive.motive_type.value if motive else "newness", + "why_now": motive_description, + "source_memory_count": source_count, + "related_memory_count": related_count, + }, + themes=[], + status=status, + ) + + @staticmethod + def _first_real_dream(actions: list[DreamAction]) -> DreamAction | None: + """Return the first action that carries genuine dream content. + + Fallback actions (empty content, zero confidence) are skipped so + that the diary correctly reflects them as ``skipped``. + """ + for action in actions: + if action.new_content and action.new_content.strip() and action.confidence > 0: + return action + return None + + @staticmethod + def _make_title(motive_description: str) -> str: + """Derive a short title from the motive description. + + Prefer the first sentence; if it exceeds ``_TITLE_MAX_LEN``, + truncate with an ellipsis. Language-neutral — works for both + Chinese and English motives. + """ + text = motive_description.strip() + if not text: + return "Dream" + + first_break = len(text) + for sep in _TITLE_SENTENCE_TERMINATORS: + idx = text.find(sep) + if 0 <= idx < first_break: + first_break = idx + 1 + + first_sentence = text[:first_break].rstrip() + if len(first_sentence) <= _TITLE_MAX_LEN: + return first_sentence + return first_sentence[:_TITLE_MAX_LEN].rstrip() + "…" diff --git a/src/memos/dream/pipeline/motive.py b/src/memos/dream/pipeline/motive.py new file mode 100644 index 000000000..7044ba20d --- /dev/null +++ b/src/memos/dream/pipeline/motive.py @@ -0,0 +1,119 @@ +from __future__ import annotations + +import json +import logging + +from memos.dream.prompts import MOTIVE_FORMATION_PROMPT +from memos.dream.types import DreamCluster, DreamMotive, MotiveType + + +logger = logging.getLogger(__name__) + + +class MotiveFormation: + """LLM-powered Dream motive formation stage. + + Reads pending memory content, asks the LLM to identify which groups are + worth consolidating, and allows the LLM to decline (return empty) if + nothing merits a Dream run. + + Falls back to a simple single-cluster heuristic if LLM is unavailable. 
+ """ + + def __init__(self, *, max_motives: int = 3) -> None: + self.max_motives = max_motives + + def bind_context(self, context: dict) -> None: + self.context = context + + def form(self, *, signal_snapshot, text_mem, cube_id: str) -> list[DreamCluster]: + if not signal_snapshot.pending_memory_ids: + return [] + + memories = self._fetch_memory_content(signal_snapshot.pending_memory_ids, text_mem, cube_id) + if not memories: + return self._fallback(signal_snapshot) + + llm = self.context.get("shared", {}).get("llm") + if llm is None: + logger.info("[Dream] LLM unavailable; using fallback motive formation.") + return self._fallback(signal_snapshot) + + return self._llm_form(llm, memories, signal_snapshot) + + def _llm_form(self, llm, memories: list[dict], signal_snapshot) -> list[DreamCluster]: + memories_block = "\n".join(f"- [{m['id']}] {m['content'][:8000]}" for m in memories) + prompt = MOTIVE_FORMATION_PROMPT.format( + memories_block=memories_block, + max_motives=self.max_motives, + ) + + try: + response = llm.generate([{"role": "user", "content": prompt}]) + motives_raw = json.loads(response.strip().removeprefix("```json").removesuffix("```")) + except Exception: + logger.exception("[Dream] LLM motive formation failed; using fallback.") + return self._fallback(signal_snapshot) + + if not isinstance(motives_raw, list) or len(motives_raw) == 0: + logger.info("[Dream] LLM decided nothing is worth dreaming about.") + return [] + + clusters = [] + for raw in motives_raw[: self.max_motives]: + motive = DreamMotive( + motive_id=raw.get("motive_id", f"motive:{signal_snapshot.mem_cube_id}"), + motive_type=MotiveType.NEWNESS, + description=raw.get("description", ""), + memory_ids=raw.get("memory_ids", []), + ) + clusters.append( + DreamCluster( + cluster_id=f"cluster:{motive.motive_id}", + motive=motive, + ) + ) + return clusters + + def _fetch_memory_content(self, memory_ids: list[str], text_mem, cube_id: str) -> list[dict]: + """Retrieve actual memory text for motive analysis. + + Results are re-ordered to match the input ``memory_ids`` sequence + because graph DB ``get_nodes`` does not guarantee return order. 
+ """ + graph_db = self.context.get("shared", {}).get("graph_db") + if graph_db is None: + return [] + + try: + nodes = graph_db.get_nodes(memory_ids, user_name=cube_id) + except Exception: + logger.warning("[Dream] failed to fetch memory content for motive analysis.") + return [] + + node_by_id: dict[str, dict] = {} + for node in nodes: + if isinstance(node, dict): + node_id = node.get("id", "") + if node_id: + node_by_id[node_id] = { + "id": node_id, + "content": node.get("memory", "") or node.get("content", ""), + } + + return [node_by_id[mid] for mid in memory_ids if mid in node_by_id] + + def _fallback(self, signal_snapshot) -> list[DreamCluster]: + """Simple single-cluster fallback when LLM is unavailable.""" + motive = DreamMotive( + motive_id=f"motive:{signal_snapshot.mem_cube_id}", + motive_type=MotiveType.NEWNESS, + description="Fallback: pending memories accumulated without LLM analysis.", + memory_ids=list(signal_snapshot.pending_memory_ids), + ) + return [ + DreamCluster( + cluster_id=f"cluster:{signal_snapshot.mem_cube_id}", + motive=motive, + ) + ] diff --git a/src/memos/dream/pipeline/persistence.py b/src/memos/dream/pipeline/persistence.py new file mode 100644 index 000000000..a190fb4ed --- /dev/null +++ b/src/memos/dream/pipeline/persistence.py @@ -0,0 +1,350 @@ +from __future__ import annotations + +import logging + +from datetime import datetime +from typing import Any + +from memos.dream.hook_defs import DreamH +from memos.dream.types import DreamActionType, TargetMemoryType +from memos.plugins.hooks import trigger_hook + + +logger = logging.getLogger(__name__) + + +class DreamPersistence: + """Dream persistence strategy. + + Two-track write: + 1. Memory store write-back — execute each DreamAction against the + appropriate memory type (LongTermMemory, SkillMemory, etc.) + 2. Dream Diary write — store the diary entry in graph_db for retrieval. + + Persistence conditions (enforced before writing): + - Hypothetical deduction: the action must carry a non-empty `rationale` + demonstrating that a question can be answered better with this memory. + - Zero-confidence actions are skipped. + """ + + def __init__(self) -> None: + self.context: dict[str, Any] = {} + + def bind_context(self, context: dict) -> None: + self.context = context + + def persist( + self, + *, + results, + text_mem, + cube_id: str, + mem_cube_id: str, + user_id: str, + signal_snapshot, + ) -> None: + trigger_hook(DreamH.DREAM_BEFORE_PERSIST, mem_cube_id=mem_cube_id, results=results) + + for result in results: + self._execute_actions( + actions=result.actions, + text_mem=text_mem, + cube_id=cube_id, + mem_cube_id=mem_cube_id, + user_id=user_id, + ) + self._persist_diary( + result=result, + cube_id=cube_id, + mem_cube_id=mem_cube_id, + user_id=user_id, + signal_snapshot=signal_snapshot, + ) + + trigger_hook(DreamH.DREAM_AFTER_PERSIST, mem_cube_id=mem_cube_id, results=results) + + def _execute_actions( + self, + *, + actions, + text_mem, + cube_id: str, + mem_cube_id: str, + user_id: str, + ) -> None: + """Execute DreamActions against the memory store. 
+ + Only actions that pass persistence conditions are written: + - Must have a non-empty rationale (hypothetical deduction) + - Must have confidence > 0 + """ + if text_mem is None: + logger.info("[Dream] text_mem is unavailable; skip memory write-back.") + return + + for action in actions: + if not self._passes_persistence_condition(action): + logger.debug( + "[Dream] action skipped (condition not met): type=%s, target=%s", + action.action_type.value, + action.target_memory_type.value, + ) + continue + + try: + self._dispatch_action( + action=action, + text_mem=text_mem, + cube_id=cube_id, + mem_cube_id=mem_cube_id, + user_id=user_id, + ) + except Exception: + logger.exception( + "[Dream] failed to execute action: type=%s, target_memory_type=%s", + action.action_type.value, + action.target_memory_type.value, + ) + + @staticmethod + def _passes_persistence_condition(action) -> bool: + """Enforce persistence conditions. + + A Dream action must justify its existence: the rationale field must + explain how this memory helps answer a concrete question better. + """ + if not action.rationale: + return False + return not action.confidence <= 0 + + def _dispatch_action( + self, + *, + action, + text_mem, + cube_id: str, + mem_cube_id: str, + user_id: str, + ) -> None: + """Route a DreamAction to the appropriate memory write operation. + + Community implementations should flesh out each branch with the real + memory-type-specific write logic. + """ + dream_metadata = { + "source": "dream", + "dream_action_type": action.action_type.value, + "target_memory_type": action.target_memory_type.value, + "rationale": action.rationale, + "confidence": action.confidence, + "source_memory_ids": action.source_memory_ids, + } + + if action.target_memory_type == TargetMemoryType.DREAM_DIARY: + return + + if action.action_type == DreamActionType.CREATE: + self._write_memory_create( + action=action, + text_mem=text_mem, + cube_id=cube_id, + dream_metadata=dream_metadata, + ) + elif action.action_type == DreamActionType.UPDATE: + self._write_memory_update( + action=action, + text_mem=text_mem, + cube_id=cube_id, + dream_metadata=dream_metadata, + ) + elif action.action_type == DreamActionType.MERGE: + self._write_memory_merge( + action=action, + text_mem=text_mem, + cube_id=cube_id, + dream_metadata=dream_metadata, + ) + elif action.action_type == DreamActionType.ARCHIVE: + self._write_memory_archive( + action=action, + text_mem=text_mem, + cube_id=cube_id, + dream_metadata=dream_metadata, + ) + + def _write_memory_create(self, *, action, text_mem, cube_id, dream_metadata) -> None: + """Create a new memory node from Dream reasoning output.""" + graph_db = self.context.get("shared", {}).get("graph_db") + if graph_db is None: + logger.info("[Dream] graph_db unavailable for CREATE action.") + return + + from uuid import uuid4 + + node_id = f"dream_mem_{uuid4().hex}" + metadata = { + "type": action.target_memory_type.value, + "memory_type": action.target_memory_type.value, + "status": "activated", + "source": "dream", + "created_at": datetime.utcnow().isoformat(), + **dream_metadata, + **action.metadata, + } + graph_db.add_node(node_id, action.new_content, metadata, user_name=cube_id) + logger.info( + "[Dream] created memory node: id=%s, type=%s", + node_id, + action.target_memory_type.value, + ) + + def _write_memory_update(self, *, action, text_mem, cube_id, dream_metadata) -> None: + """Update an existing memory node with Dream-consolidated content.""" + graph_db = self.context.get("shared", {}).get("graph_db") + if 
graph_db is None or not action.target_memory_id: + return + + metadata = { + "updated_at": datetime.utcnow().isoformat(), + "last_dream_update": datetime.utcnow().isoformat(), + **dream_metadata, + **action.metadata, + } + try: + graph_db.update_node( + action.target_memory_id, + action.new_content, + metadata, + user_name=cube_id, + ) + logger.info("[Dream] updated memory: id=%s", action.target_memory_id) + except Exception: + logger.exception("[Dream] update_node failed: id=%s", action.target_memory_id) + + def _write_memory_merge(self, *, action, text_mem, cube_id, dream_metadata) -> None: + """Merge multiple source memories into a new consolidated node.""" + graph_db = self.context.get("shared", {}).get("graph_db") + if graph_db is None: + return + + from uuid import uuid4 + + merged_id = f"dream_merged_{uuid4().hex}" + metadata = { + "type": action.target_memory_type.value, + "memory_type": action.target_memory_type.value, + "status": "activated", + "source": "dream", + "merged_from": action.source_memory_ids, + "created_at": datetime.utcnow().isoformat(), + **dream_metadata, + **action.metadata, + } + graph_db.add_node(merged_id, action.new_content, metadata, user_name=cube_id) + + for src_id in action.source_memory_ids: + try: + graph_db.update_node( + src_id, + None, + {"status": "merged", "merged_into": merged_id}, + user_name=cube_id, + ) + except Exception: + logger.warning("[Dream] could not mark source %s as merged", src_id) + + logger.info("[Dream] merged %d memories → %s", len(action.source_memory_ids), merged_id) + + def _write_memory_archive(self, *, action, text_mem, cube_id, dream_metadata) -> None: + """Archive a memory that Dream deems no longer valuable.""" + graph_db = self.context.get("shared", {}).get("graph_db") + if graph_db is None or not action.target_memory_id: + return + + try: + graph_db.update_node( + action.target_memory_id, + None, + { + "status": "archived", + "archived_at": datetime.utcnow().isoformat(), + "archive_reason": action.rationale, + **dream_metadata, + }, + user_name=cube_id, + ) + logger.info("[Dream] archived memory: id=%s", action.target_memory_id) + except Exception: + logger.exception("[Dream] archive failed: id=%s", action.target_memory_id) + + def _persist_diary( + self, + *, + result, + cube_id: str, + mem_cube_id: str, + user_id: str, + signal_snapshot, + ) -> None: + """Persist the Dream diary entry to graph_db.""" + entry = result.diary_entry + if entry is None: + return + + graph_db = self.context.get("shared", {}).get("graph_db") + if graph_db is None: + logger.info("[Dream] graph_db unavailable; skip diary persistence.") + return + + metadata = self._build_diary_metadata( + entry=entry, + result=result, + user_id=user_id, + mem_cube_id=mem_cube_id, + signal_snapshot=signal_snapshot, + ) + try: + graph_db.add_node( + entry.diary_id, + entry.format_content(), + metadata, + user_name=cube_id, + ) + except Exception: + logger.exception( + "[Dream] failed to persist diary: mem_cube_id=%s, cluster=%s", + mem_cube_id, + result.cluster_id, + ) + + @staticmethod + def _build_diary_metadata(*, entry, result, user_id, mem_cube_id, signal_snapshot) -> dict: + return { + "type": "dream_diary", + "memory_type": "DreamDiary", + "status": entry.status, + "source": "system", + "title": entry.title, + "summary": entry.summary, + "dream_entry": entry.dream_entry, + "motive": entry.motive, + "themes": entry.themes, + "tags": ["dream", "diary"], + "created_at": entry.created_at.isoformat(), + "updated_at": datetime.utcnow().isoformat(), + "user_id": 
user_id, + "session_id": getattr(signal_snapshot, "session_id", ""), + "actions_summary": [ + { + "type": a.action_type.value, + "target": a.target_memory_type.value, + "rationale": a.rationale, + } + for a in result.actions + ], + "info": { + "cluster_id": result.cluster_id, + "mem_cube_id": mem_cube_id, + "pending_memory_ids": list(getattr(signal_snapshot, "pending_memory_ids", [])), + }, + } diff --git a/src/memos/dream/pipeline/reasoning.py b/src/memos/dream/pipeline/reasoning.py new file mode 100644 index 000000000..ac7e9fbbb --- /dev/null +++ b/src/memos/dream/pipeline/reasoning.py @@ -0,0 +1,189 @@ +from __future__ import annotations + +import json +import logging + +from typing import TYPE_CHECKING, Any + +from memos.dream.prompts.reasoning_prompt import CONSOLIDATION_REASONING_PROMPT +from memos.dream.types import ( + DreamAction, + DreamActionType, + DreamResult, + TargetMemoryType, +) + + +if TYPE_CHECKING: + from memos.dream.types import DreamCluster + + +logger = logging.getLogger(__name__) + +_LLM_DREAM_CONFIDENCE = 0.9 + + +class ConsolidationReasoning: + """Dream reasoning stage — LLM-powered deep dreaming. + + For each cluster the reasoning stage produces at most ONE dream: + 1. Fetches the **source memories** (that triggered the motive) from graph_db + so the LLM can see the original experiences, not just the motive summary. + 2. Combines them with **related memories** (from semantic recall) to build + a prompt that asks the LLM to dream deeply: reframe problems, connect + dots across conversations, and produce one concrete insight. + 3. Parses the LLM response into a single ``DreamAction`` (CREATE / + InsightMemory) for downstream persistence; if the LLM judges the + material too thin, no action is produced. + + Falls back to a placeholder CREATE action when the LLM is unavailable + or returns an unparseable response. 
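+
+    Expected LLM output shape, as parsed by ``_parse_dream`` (an empty
+    ``dream_content`` means the LLM judged the material too thin, so no
+    action is produced)::
+
+        {
+          "dream_content": "the consolidated first-person insight",
+          "hypothetical_question": "the concrete question this insight answers better"
+        }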
+ """ + + def __init__(self) -> None: + self.context: dict[str, Any] = {} + + def bind_context(self, context: dict) -> None: + self.context = context + + def reason(self, *, clusters: list[DreamCluster], text_mem, cube_id: str) -> list[DreamResult]: + llm = self.context.get("shared", {}).get("llm") + + results: list[DreamResult] = [] + for cluster in clusters: + if llm is not None: + action = self._llm_reason(llm, cluster, cube_id) + else: + action = self._fallback_action(cluster) + + actions = [action] if action is not None else [] + results.append(DreamResult(cluster_id=cluster.cluster_id, actions=actions)) + return results + + # ------------------------------------------------------------------ + # LLM reasoning + # ------------------------------------------------------------------ + + def _llm_reason(self, llm, cluster: DreamCluster, cube_id: str) -> DreamAction | None: + source_block = self._fetch_source_memories(cluster.motive.memory_ids, cube_id) + + if cluster.recalled_items: + chronological = sorted( + cluster.recalled_items, + key=lambda x: x.get("created_at", ""), + ) + related_block = self._format_memory_block(chronological) + else: + related_block = "(none)" + + prompt = CONSOLIDATION_REASONING_PROMPT.format( + motive_description=cluster.motive.description, + source_memories_block=source_block, + related_memories_block=related_block, + ) + + try: + response = llm.generate([{"role": "user", "content": prompt}]) + raw = json.loads(response.strip().removeprefix("```json").removesuffix("```")) + except Exception: + logger.exception( + "[Dream Reasoning] LLM call or JSON parse failed for cluster=%s; using fallback.", + cluster.cluster_id, + ) + return self._fallback_action(cluster) + + if not isinstance(raw, dict): + return self._fallback_action(cluster) + + return self._parse_dream(raw, cluster) + + def _parse_dream(self, raw_dream: dict[str, Any], cluster: DreamCluster) -> DreamAction | None: + content = raw_dream.get("dream_content", "").strip() + question = raw_dream.get("hypothetical_question", "").strip() + if not content: + logger.info( + "[Dream Reasoning] LLM produced no usable dream for cluster=%s", + cluster.cluster_id, + ) + return None + + return DreamAction( + action_type=DreamActionType.CREATE, + target_memory_type=TargetMemoryType.INSIGHT, + source_memory_ids=list(cluster.motive.memory_ids), + new_content=content, + rationale=question, + confidence=_LLM_DREAM_CONFIDENCE, + ) + + # ------------------------------------------------------------------ + # Source memory fetching + # ------------------------------------------------------------------ + + def _fetch_source_memories(self, memory_ids: list[str], cube_id: str) -> str: + """Retrieve source memory content from graph_db. + + The recall stage intentionally excludes source memories from its + embedding search results (to avoid trivial self-matches), so we + fetch them here directly so the LLM can see the full picture. 
+ """ + if not memory_ids: + return "(none)" + + graph_db = self.context.get("shared", {}).get("graph_db") + if graph_db is None: + return "(none)" + + try: + nodes = graph_db.get_nodes(memory_ids, user_name=cube_id) + except Exception: + logger.warning("[Dream Reasoning] failed to fetch source memories from graph_db.") + return "(none)" + + items: list[dict[str, Any]] = [] + for node in nodes or []: + if not isinstance(node, dict): + continue + metadata = node.get("metadata") if isinstance(node.get("metadata"), dict) else {} + items.append( + { + "id": node.get("id", "unknown"), + "memory": node.get("memory", "") or node.get("content", ""), + "created_at": metadata.get("created_at", "") if metadata else "", + } + ) + + items.sort(key=lambda x: x.get("created_at", "")) + return self._format_memory_block(items) if items else "(none)" + + # ------------------------------------------------------------------ + # Formatting + # ------------------------------------------------------------------ + + @staticmethod + def _format_memory_block(items: list[dict[str, Any]]) -> str: + lines: list[str] = [] + for item in items: + mid = item.get("id", "unknown") + content = item.get("memory", "") or item.get("content", "") + score = item.get("score") + suffix = f" (relevance: {score:.3f})" if score is not None else "" + lines.append(f"- [{mid}]{suffix} {content[:1200]}") + return "\n".join(lines) if lines else "(none)" + + # ------------------------------------------------------------------ + # Fallback + # ------------------------------------------------------------------ + + @staticmethod + def _fallback_action(cluster: DreamCluster) -> DreamAction | None: + if not cluster.motive.memory_ids: + return None + return DreamAction( + action_type=DreamActionType.CREATE, + target_memory_type=TargetMemoryType.INSIGHT, + source_memory_ids=list(cluster.motive.memory_ids), + new_content="", + rationale="Fallback: LLM unavailable, placeholder for manual review.", + confidence=0.0, + ) diff --git a/src/memos/dream/pipeline/recall.py b/src/memos/dream/pipeline/recall.py new file mode 100644 index 000000000..fee9cfcd0 --- /dev/null +++ b/src/memos/dream/pipeline/recall.py @@ -0,0 +1,112 @@ +from __future__ import annotations + +import logging + +from typing import TYPE_CHECKING, Any + + +if TYPE_CHECKING: + from memos.dream.types import DreamCluster + + +logger = logging.getLogger(__name__) + +_RECALL_TOP_K = 10 +_RECALL_SCOPES = ("UserMemory", "LongTermMemory") + + +class DirectRecall: + """Dream recall stage — use source memory embeddings to recall related memories. + + For each memory_id in the cluster, retrieves its embedding and runs + ``search_by_embedding`` against the graph DB once per allowed scope + (``UserMemory`` and ``LongTermMemory``). Results across all source + memories and scopes are deduplicated and sorted by score, then the top-k + are kept as ``cluster.recalled_items``. + + Scope is restricted on purpose: Dream-produced nodes (DreamDiary, + InsightMemory, …) and short-lived WorkingMemory are excluded so that + each Dream run reflects on the user's real daytime experiences rather + than its own previous outputs. 
+ """ + + def __init__(self, *, recall_top_k: int = _RECALL_TOP_K) -> None: + self.recall_top_k = recall_top_k + + def bind_context(self, context: dict) -> None: + self.context = context + + def gather(self, *, clusters: list[DreamCluster], text_mem, cube_id: str) -> list[DreamCluster]: + if not clusters: + return clusters + + graph_db = self.context.get("shared", {}).get("graph_db") + + for cluster in clusters: + cluster.recalled_items = self._recall_by_embeddings( + cluster.motive.memory_ids, graph_db, cube_id + ) + logger.info( + "[Dream Recall] cluster=%s recalled=%d", + cluster.cluster_id, + len(cluster.recalled_items), + ) + + return clusters + + def _recall_by_embeddings( + self, memory_ids: list[str], graph_db, cube_id: str + ) -> list[dict[str, Any]]: + if not memory_ids or graph_db is None: + return [] + + try: + nodes = graph_db.get_nodes(memory_ids, include_embedding=True, user_name=cube_id) + except Exception: + logger.warning("[Dream Recall] failed to fetch source memory embeddings.") + return [] + + source_id_set = set(memory_ids) + seen: dict[str, dict[str, Any]] = {} + + for node in nodes or []: + if not isinstance(node, dict): + continue + metadata = node.get("metadata") if isinstance(node.get("metadata"), dict) else {} + embedding = metadata.get("embedding") if metadata else None + if not embedding: + continue + + for scope in _RECALL_SCOPES: + try: + hits = graph_db.search_by_embedding( + embedding, + top_k=self.recall_top_k, + scope=scope, + status="activated", + return_fields=["memory", "created_at"], + user_name=cube_id, + ) + except Exception: + logger.warning( + "[Dream Recall] embedding search failed for node=%s scope=%s", + node.get("id"), + scope, + ) + continue + + for hit in hits: + hit_id = hit.get("id", "") + if not hit_id or hit_id in source_id_set: + continue + score = float(hit.get("score", 0.0)) + if hit_id not in seen or score > seen[hit_id]["score"]: + seen[hit_id] = { + "id": hit_id, + "memory": hit.get("memory", ""), + "score": score, + "created_at": hit.get("created_at", ""), + } + + ranked = sorted(seen.values(), key=lambda x: x["score"], reverse=True) + return ranked[: self.recall_top_k] diff --git a/src/memos/dream/plugin.py b/src/memos/dream/plugin.py new file mode 100644 index 000000000..e44a68df1 --- /dev/null +++ b/src/memos/dream/plugin.py @@ -0,0 +1,99 @@ +from __future__ import annotations + +import json +import logging + +from functools import partial +from typing import Any + +from memos.dream.hooks import on_add_signal, on_dream_execute +from memos.dream.pipeline import ( + AbstractDreamPipeline, + ConsolidationReasoning, + DirectRecall, + DreamPersistence, + MotiveFormation, + StructuredDiarySummary, +) +from memos.dream.routers.diary_router import create_diary_router +from memos.dream.routers.trigger_router import create_trigger_router +from memos.dream.signal_store import DreamSignalStore +from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem +from memos.mem_scheduler.schemas.task_schemas import MEM_DREAM_TASK_LABEL +from memos.plugins.base import MemOSPlugin +from memos.plugins.hook_defs import H + + +logger = logging.getLogger(__name__) + + +class CommunityDreamPlugin(MemOSPlugin): + """Minimal built-in Dream plugin shipped with the core repository. + + The current Dream plugin provides scheduler wiring and replaceable pipeline + stage boundaries only. Community contributors can implement richer signal + sources, recall, reasoning, diary persistence, and trigger policies behind + the same hooks. 
+ """ + + name = "dream" + version = "0.1.0" + description = "Built-in Dream plugin" + priority = 10 + + def on_load(self) -> None: + self.context: dict[str, Any] = {"shared": {}, "configs": {}} + self.signal_store = DreamSignalStore() + self.pipeline = AbstractDreamPipeline( + motive_strategy=MotiveFormation(), + recall_strategy=DirectRecall(), + reasoning_strategy=ConsolidationReasoning(), + diary_strategy=StructuredDiarySummary(), + persistence_strategy=DreamPersistence(), + ) + + # Hook registration happens at load time because scheduler-triggered Dream + # execution does not depend on FastAPI route binding. + self.register_hook(H.DREAM_EXECUTE, partial(on_dream_execute, self)) + self.register_hook(H.ADD_AFTER, partial(on_add_signal, self)) + logger.info("[Dream] plugin loaded") + + def init_components(self, context: dict) -> None: + # Keep the mutable context reference directly. The server bootstrap updates + # scheduler handles later, and the plugin should see those changes in-place. + self.context = context + self.pipeline.bind_context(context) + + def init_app(self) -> None: + self.register_router(create_trigger_router(self)) + self.register_router(create_diary_router(self)) + logger.info("[Dream] plugin initialized") + + def on_shutdown(self) -> None: + self.context = {"shared": {}, "configs": {}} + logger.info("[Dream] plugin shutdown") + + def submit_dream_task( + self, + *, + mem_cube_id: str, + user_id: str, + user_name: str, + signal_snapshot, + ) -> bool: + submit_messages = self.context.get("shared", {}).get("submit_scheduler_messages") + if submit_messages is None: + logger.info( + "[Dream] scheduler submit handle is unavailable; keep framework signal only." + ) + return False + + message = ScheduleMessageItem( + user_id=user_id or "", + mem_cube_id=mem_cube_id, + label=MEM_DREAM_TASK_LABEL, + content=json.dumps(signal_snapshot.model_dump(mode="json")), + user_name=user_name or "", + ) + submit_messages([message]) + return True diff --git a/src/memos/dream/prompts/__init__.py b/src/memos/dream/prompts/__init__.py new file mode 100644 index 000000000..07bd1415d --- /dev/null +++ b/src/memos/dream/prompts/__init__.py @@ -0,0 +1,8 @@ +from memos.dream.prompts.motive_prompt import MOTIVE_FORMATION_PROMPT +from memos.dream.prompts.reasoning_prompt import CONSOLIDATION_REASONING_PROMPT + + +__all__ = [ + "CONSOLIDATION_REASONING_PROMPT", + "MOTIVE_FORMATION_PROMPT", +] diff --git a/src/memos/dream/prompts/motive_prompt.py b/src/memos/dream/prompts/motive_prompt.py new file mode 100644 index 000000000..44092a1f7 --- /dev/null +++ b/src/memos/dream/prompts/motive_prompt.py @@ -0,0 +1,74 @@ +MOTIVE_FORMATION_PROMPT = """You are the Dream module of a personal AI assistant. + +During the day, this assistant continuously talks with the user — answering questions, giving advice, brainstorming together, helping with tasks. The memories below were captured from those daytime conversations, listed in chronological order. + +Now the user is away. This is your chance to step back and reflect on the day as a whole, offline and without time pressure. + +## Memories (in chronological order) + +{memories_block} + +## What to look for + +Not every memory is equally important. Some conversations leave a sense of cognitive incompleteness — things worth revisiting when the user is not waiting for an immediate answer. + +Pay special attention to CROSS-CONVERSATION patterns. The most valuable Dream motive is often one that CONNECTS conversations the daytime AI treated as separate topics. 
Ask yourself:
+- Did the user express the same type of dissatisfaction, emotion, or unresolved feeling across multiple different topics? If so, those topics may actually be about the same deeper problem.
+- Did several seemingly unrelated conversations share a hidden structural similarity — for example, the user kept asking for "direction" or "meaning" rather than "more details"?
+- Could multiple fragmented discussions be reframed as one coherent question that the daytime AI never recognized?
+
+When you find such a cross-conversation pattern, prefer grouping those memories into ONE motive rather than splitting them into separate per-topic motives. Splitting them would repeat the same mistake the daytime AI already made.
+
+Other strong Dream motives include:
+- A user problem that was discussed but never truly resolved
+- A topic that came up repeatedly, suggesting it matters more than any single mention shows
+- Emotionally charged exchanges — frustration, excitement, anxiety, or vulnerability
+- Contradictions or tensions between different pieces of information
+- Signals about the user's deeper goals, personality, habits, or preferences
+- Information that is very likely to matter again in the future
+
+Weak or invalid motives include:
+- Routine, fully resolved exchanges
+- Isolated trivia with no connection to anything else
+- Memories that are already well-organized and need no further consolidation
+
+## What Dream is
+
+Dream is NOT a summary of the day.
+
+Dream is an offline reflection process. While the user is away, the assistant thinks about its memories in order to:
+- Understand the user more deeply than the daytime conversations allowed
+- Reorganize fragmented information into coherent insights
+- Discover hidden patterns the user has not explicitly stated
+- Reframe problems — the user's real question may be different from what they literally asked
+- Consolidate knowledge for long-term retention
+- Identify open questions worth tracking in future conversations
+
+## Instructions
+
+Analyze the memories above and produce dream motives. Each motive represents a reason to consolidate a group of memories.
+
+CRITICAL RULES:
+- Fewer motives are better. If all the memories revolve around the same underlying theme, frustration, or unresolved need, output exactly ONE motive that covers all of them. Do NOT split one theme into multiple motives just because the surface topics differ.
+- Only create a separate motive when two groups of memories are genuinely about DIFFERENT underlying issues with no meaningful connection to each other.
+- Maximum {max_motives} motives, but 1 is perfectly fine and often correct.
+
+For each motive, explain WHY it is worth dreaming about — what cognitive gap, hidden connection, or unresolved tension does it address?
+
+If NONE of the memories are worth dreaming about, return an empty list.
+
+## Output Format
+
+IMPORTANT: Your output language (especially the "description" field) MUST match the primary language of the conversations above. If the user spoke Chinese, write in Chinese. If the user spoke English, write in English.
+
+Return ONLY a JSON array (no markdown fencing). Each element:
+```
+{{
+  "motive_id": "<unique id for this motive>",
+  "description": "<1-2 sentence reason why this group is worth dreaming about>",
+  "memory_ids": ["<memory id>", "<memory id>", ...]
+}} +``` + +If nothing is worth dreaming about, return: [] +""" diff --git a/src/memos/dream/prompts/reasoning_prompt.py b/src/memos/dream/prompts/reasoning_prompt.py new file mode 100644 index 000000000..4f0d683d1 --- /dev/null +++ b/src/memos/dream/prompts/reasoning_prompt.py @@ -0,0 +1,80 @@ +CONSOLIDATION_REASONING_PROMPT = """You are the Dream module of a personal AI assistant, now in the **dreaming** stage. + +## What Dream Is For + +Dream exists to do what the daytime AI cannot: **solve the user's unresolved problems**, or at least deeply explore solution paths and produce genuine insights. + +During daytime conversations, the AI responds in real time — it gives quick, locally reasonable answers but often misses the bigger picture. Now the user is away. You have the time and space to think without pressure. Your job is to engage with the user's real problems and produce thinking deep enough that, when the user returns, the assistant is meaningfully smarter. + +This is NOT summarization. If your output merely restates what the memories already say, you have failed. + +## Dream Motive + +{motive_description} + +## Source Memories (chronological — the experiences that triggered this dream) + +{source_memories_block} + +## Related Memories (chronological — other experiences that may connect) + +{related_memories_block} + +## How to Dream + +### First: find the thread + +Read the memories chronologically. Before writing anything, ask: are these separate problems, or the same struggle appearing in different contexts? If the daytime AI gave locally reasonable answers but the user remained unsatisfied across multiple conversations, there is likely one deeper problem underneath. Find it. + +### Then: produce the thing, not a description of the thing + +If your output could be prefixed with "The user needs..." or "The user wants...", you are describing the problem from the outside, not solving it. The user already knows what they need — they said it. Your job is to produce the thing itself: the insight, the framework, the reframing, the connection, the answer — concrete enough that the assistant could use it directly in the next conversation. + +### Use everything you know + +You are NOT limited to the memories above. Bring in your domain knowledge — design patterns, first principles, frameworks, analogies from other fields, research, industry experience. The memories tell you WHAT problem to think about; your knowledge helps you think about it WELL. + +The best dreams combine the user's specific context (from memories) with broader understanding (from your training) to produce something neither could produce alone. + +### When material is thin + +If the recalled memories are sparse or repetitive, do NOT just rephrase them. The thinner the material, the more YOUR thinking matters: + +- Identify what's MISSING — what question should the user be asking but isn't? +- Use your domain knowledge to explore the problem from angles the user hasn't considered. +- Propose concrete frameworks, approaches, or reframings that go beyond the available material. + +## Example + +Below is a GOOD dream to show the quality bar. 
The motive was "用户多次提议带妈妈出游但都被婉拒,感到困惑和失落。" Source memories included:
+
+- 用户兴奋地给妈妈发暑假旅游攻略,妈妈回复"看看再说"
+- 周末下午三点妈妈在沙发上睡着了
+- 妈妈说最近加班多,腰酸背痛
+- 用户提议去海边,妈妈说"你们年轻人去吧,我在家歇着挺好"
+- 妈妈在家浇花、看电视时显得很放松
+
+Dream output:
+
+{{
+  "dream_content": "用户每次提旅游,妈妈都婉拒。用户的失落很真切——精心准备的攻略石沉大海。但我把记忆放在一起时,一个画面浮现:周末下午三点,妈妈在沙发上睡着了。腰酸背痛、加班回来只想浇花看电视的人,收到三亚攻略时涌起的也许不是拒绝,而是光想想就累的疲惫。用户用自己的方式表达爱——走出去、创造回忆;但妈妈此刻能接收到的爱,也许恰恰是'不用走'。当然也可能有经济顾虑或对陌生环境的不安,但身体疲惫是最有证据的解读。下次用户问暑假安排,我不该默认搜机票,而该帮用户看到:问题也许不是'去哪里',而是'怎么在一起'。",
+  "hypothetical_question": "妈妈总是拒绝出门旅游,怎么和她度过有质量的时间?"
+}}
+
+Notice what makes this dream GOOD: it does not say "the user should understand mom better." It reframes the question itself (from "why won't she go" to "what does togetherness mean to her"), grounds the reframing in specific memory details (falling asleep at 3pm, back pain, watering flowers), and ends with a concrete, actionable shift in thinking.
+
+## Output
+
+IMPORTANT: Your output language MUST match the primary language of the memories.
+
+Return ONLY a JSON object (no markdown fencing):
+
+{{
+  "dream_content": "<your full dream: the insight, reframing, or framework itself>",
+  "hypothetical_question": "<a concrete future user question this dream prepares the assistant to answer well>"
+}}
+
+Produce your whole dream.
+
+If the memories are too thin to produce any dream, return: {{}}
+"""
diff --git a/src/memos/dream/routers/__init__.py b/src/memos/dream/routers/__init__.py
new file mode 100644
index 000000000..b114f2394
--- /dev/null
+++ b/src/memos/dream/routers/__init__.py
@@ -0,0 +1,5 @@
+from memos.dream.routers.diary_router import create_diary_router
+from memos.dream.routers.trigger_router import create_trigger_router
+
+
+__all__ = ["create_diary_router", "create_trigger_router"]
diff --git a/src/memos/dream/routers/diary_router.py b/src/memos/dream/routers/diary_router.py
new file mode 100644
index 000000000..e4b301953
--- /dev/null
+++ b/src/memos/dream/routers/diary_router.py
@@ -0,0 +1,92 @@
+from __future__ import annotations
+
+from fastapi import APIRouter
+from pydantic import BaseModel
+
+
+class DiaryFilter(BaseModel):
+    task_id: str | None = None
+    created_after: str | None = None
+    created_before: str | None = None
+    limit: int = 20
+
+
+class DiaryQueryRequest(BaseModel):
+    cube_id: str
+    filter: DiaryFilter | None = None
+
+
+def create_diary_router(plugin) -> APIRouter:
+    """Dream diary router.
+ + Single query endpoint with a structured filter: + POST /dream/diary {"cube_id": "xxx"} + POST /dream/diary {"cube_id": "xxx", "filter": {"limit": 5}} + POST /dream/diary {"cube_id": "xxx", "filter": {"task_id": "dream_diary_xxx"}} + POST /dream/diary {"cube_id": "xxx", "filter": {"created_after": "2026-05-06", "created_before": "2026-05-07"}} + """ + + router = APIRouter(prefix="/dream/diary", tags=["dream-diary"]) + + def _get_graph_db(): + return getattr(plugin, "context", {}).get("shared", {}).get("graph_db") + + @router.get("/health") + def health() -> dict[str, object]: + shared = getattr(plugin, "context", {}).get("shared", {}) + return { + "plugin": plugin.name, + "version": plugin.version, + "scheduler_connected": shared.get("submit_scheduler_messages") is not None, + "trigger_threshold": plugin.signal_store.trigger_threshold, + } + + @router.post("") + def query_diaries(req: DiaryQueryRequest) -> dict[str, object]: + graph_db = _get_graph_db() + if graph_db is None: + return {"code": 503, "message": "graph_db is unavailable.", "data": []} + + f = req.filter or DiaryFilter() + + if f.task_id: + node = graph_db.get_node(f.task_id, user_name=req.cube_id) + items = [_format_item(node)] if node else [] + return {"code": 200, "message": "Dream diary retrieved successfully", "data": items} + + filters = [{"field": "memory_type", "op": "=", "value": "DreamDiary"}] + ids = graph_db.get_by_metadata(filters, user_name=req.cube_id, status="activated") + if not ids: + return {"code": 200, "message": "Dream diary retrieved successfully", "data": []} + + nodes = graph_db.get_nodes(ids, user_name=req.cube_id) + + if f.created_after: + nodes = [n for n in nodes if _created_at(n) >= f.created_after] + if f.created_before: + nodes = [n for n in nodes if _created_at(n) < f.created_before] + + nodes.sort(key=_created_at, reverse=True) + items = [_format_item(n) for n in nodes[: f.limit]] + return {"code": 200, "message": "Dream diary retrieved successfully", "data": items} + + return router + + +def _created_at(node: dict) -> str: + return (node.get("metadata") or {}).get("created_at", "") + + +def _format_item(node: dict) -> dict: + """Transform a graph_db node into the PRD response shape.""" + meta = node.get("metadata") or {} + return { + "task_id": node.get("id", ""), + "status": meta.get("status", "completed"), + "created_at": meta.get("created_at", ""), + "title": meta.get("title", ""), + "summary": meta.get("summary", ""), + "dream_entry": meta.get("dream_entry", ""), + "motive": meta.get("motive"), + "themes": meta.get("themes", []), + } diff --git a/src/memos/dream/routers/trigger_router.py b/src/memos/dream/routers/trigger_router.py new file mode 100644 index 000000000..a07e6f112 --- /dev/null +++ b/src/memos/dream/routers/trigger_router.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +from fastapi import APIRouter + + +def create_trigger_router(plugin) -> APIRouter: + """Dream trigger router. + + The current plugin exposes only cube-level triggering. Session/topic + triggers are useful community extension directions, but are not implemented + in the current plugin. 
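+
+    The trigger endpoint takes bare ``str`` arguments, which FastAPI binds as
+    query parameters, so a manual trigger looks like (values illustrative)::
+
+        POST /dream/trigger/cube?cube_id=my_cube&user_id=u1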
+ """ + + router = APIRouter(prefix="/dream/trigger", tags=["dream-trigger"]) + + @router.post("/cube") + def trigger_cube( + cube_id: str, + user_id: str = "", + user_name: str = "", + session_id: str = "", + ) -> dict[str, object]: + snapshot = plugin.signal_store.snapshot( + mem_cube_id=cube_id, + user_id=user_id, + user_name=user_name, + session_id=session_id, + ) + accepted = plugin.submit_dream_task( + mem_cube_id=cube_id, + user_id=user_id, + user_name=user_name, + signal_snapshot=snapshot, + ) + return { + "accepted": accepted, + "plugin": plugin.name, + "scope": "cube", + "mem_cube_id": cube_id, + } + + return router diff --git a/src/memos/dream/routes.py b/src/memos/dream/routes.py new file mode 100644 index 000000000..2d3a44cb6 --- /dev/null +++ b/src/memos/dream/routes.py @@ -0,0 +1,21 @@ +"""Backward-compatible Dream route exports. + +New code should import from `memos.dream.routers.*`. This module remains only so +in-progress branches that import `create_router` do not break during refactors. +""" + +from memos.dream.routers.diary_router import create_diary_router +from memos.dream.routers.trigger_router import create_trigger_router + + +def create_router(plugin): + """Deprecated helper retained for compatibility. + + It returns the trigger router because older callers expected a single router. + Updated plugins should register trigger and diary routers separately. + """ + + return create_trigger_router(plugin) + + +__all__ = ["create_diary_router", "create_router", "create_trigger_router"] diff --git a/src/memos/dream/signal_store.py b/src/memos/dream/signal_store.py new file mode 100644 index 000000000..3a6b08ccc --- /dev/null +++ b/src/memos/dream/signal_store.py @@ -0,0 +1,85 @@ +from __future__ import annotations + +from copy import deepcopy + +from memos.dream.types import DreamSignalSnapshot + + +class DreamSignalStore: + """In-memory Dream signal accumulator. + + We accumulates new memory ids. Can add more + signal channels, ranking policy, persistence, or decay windows around this + minimal store. 
+ """ + + def __init__(self, trigger_threshold: int = 100) -> None: + self.trigger_threshold = trigger_threshold + self._snapshots: dict[str, DreamSignalSnapshot] = {} + + def record_add( + self, + *, + mem_cube_id: str, + user_id: str = "", + user_name: str = "", + session_id: str = "", + memory_ids: list[str] | None = None, + ) -> DreamSignalSnapshot: + snapshot = self._ensure_snapshot( + mem_cube_id=mem_cube_id, + user_id=user_id, + user_name=user_name, + session_id=session_id, + ) + if memory_ids: + snapshot.pending_memory_ids.extend(memory_ids) + return deepcopy(snapshot) + + def snapshot( + self, + *, + mem_cube_id: str, + user_id: str = "", + user_name: str = "", + session_id: str = "", + ) -> DreamSignalSnapshot: + snapshot = self._ensure_snapshot( + mem_cube_id=mem_cube_id, + user_id=user_id, + user_name=user_name, + session_id=session_id, + ) + return deepcopy(snapshot) + + def reset(self, *, mem_cube_id: str) -> None: + self._snapshots.pop(mem_cube_id, None) + + def should_trigger(self, *, mem_cube_id: str) -> bool: + snapshot = self._snapshots.get(mem_cube_id) + if snapshot is None: + return False + return len(snapshot.pending_memory_ids) >= self.trigger_threshold + + def _ensure_snapshot( + self, + *, + mem_cube_id: str, + user_id: str = "", + user_name: str = "", + session_id: str = "", + ) -> DreamSignalSnapshot: + snapshot = self._snapshots.get(mem_cube_id) + if snapshot is None: + snapshot = DreamSignalSnapshot( + mem_cube_id=mem_cube_id, + user_id=user_id, + user_name=user_name, + session_id=session_id, + ) + self._snapshots[mem_cube_id] = snapshot + else: + snapshot.user_id = user_id or snapshot.user_id + snapshot.user_name = user_name or snapshot.user_name + snapshot.session_id = session_id or snapshot.session_id + return snapshot diff --git a/src/memos/dream/types.py b/src/memos/dream/types.py new file mode 100644 index 000000000..40587b70b --- /dev/null +++ b/src/memos/dream/types.py @@ -0,0 +1,152 @@ +from __future__ import annotations + +from datetime import datetime +from enum import Enum +from typing import Any +from uuid import uuid4 + +from pydantic import BaseModel, Field + + +class MotiveType(str, Enum): + """Dream motive taxonomy. + + Each type corresponds to a distinct reason for triggering a Dream run. + Community implementations can add signal producers for any of these. + """ + + NEWNESS = "newness" + FREQUENCY = "frequency" + CONFLICT = "conflict" + FEEDBACK = "feedback" + FRAGMENTATION = "fragmentation" + + +class TargetMemoryType(str, Enum): + """Memory types that Dream can write back to. + + Dream consolidation may produce updates across heterogeneous memory stores. + """ + + LONG_TERM = "LongTermMemory" + SKILL = "SkillMemory" + PROFILE = "ProfileMemory" + PREFERENCE = "PreferenceMemory" + INSIGHT = "InsightMemory" + DREAM_DIARY = "DreamDiary" + + +class DreamActionType(str, Enum): + """Write intents emitted by Dream reasoning.""" + + CREATE = "create" + UPDATE = "update" + MERGE = "merge" + ARCHIVE = "archive" + + +class DreamAction(BaseModel): + """A single write-back instruction produced by Dream reasoning. + + Each action represents one memory mutation (create/update/merge/archive). + The `rationale` field carries the hypothetical-deduction justification: + the Dream must demonstrate that *a concrete question can be answered better* + with this memory present before it is persisted. 
+ """ + + action_type: DreamActionType + target_memory_type: TargetMemoryType + target_memory_id: str | None = None + source_memory_ids: list[str] = Field(default_factory=list) + new_content: str = "" + rationale: str = "" + confidence: float = 0.0 + metadata: dict[str, Any] = Field(default_factory=dict) + + +class DreamMemoryLifecycle(BaseModel): + """Lifecycle tracking metadata attached to Dream-produced memories. + + Used by the periodic maintenance process to decide whether a Dream memory + should be retained, decayed, or archived: + - Long time not hit → decay / archive + - Hit but low usefulness → archive + - Overturned by feedback → immediate archive + """ + + memory_id: str + source_dream_id: str = "" + created_at: datetime = Field(default_factory=datetime.utcnow) + last_hit_at: datetime | None = None + hit_count: int = 0 + usefulness_score: float = 0.0 + invalidated_by_feedback: bool = False + status: str = "active" + + +class DreamSignalSnapshot(BaseModel): + """Normalized Dream trigger payload passed through the scheduler. + + The current plugin ships the newness signal only. Projects that need + recall/conflict/feedback signals should add those fields together with + corresponding producers. + """ + + mem_cube_id: str + user_id: str = "" + user_name: str = "" + session_id: str = "" + pending_memory_ids: list[str] = Field(default_factory=list) + + +class DreamMotive(BaseModel): + """Reason why a Dream run should happen for a cluster of memories.""" + + motive_id: str + motive_type: MotiveType + description: str + memory_ids: list[str] = Field(default_factory=list) + + +class DreamCluster(BaseModel): + """A Dream work unit created from one motive.""" + + cluster_id: str + motive: DreamMotive + recalled_items: list[Any] = Field(default_factory=list) + + +class DreamDiaryEntry(BaseModel): + """Human-readable dream diary entry, aligned with the Dream diary PRD. + + Each entry carries its own identity (`diary_id`) from the moment it is + created, so downstream persistence can store it without generating IDs. + """ + + diary_id: str = Field(default_factory=lambda: f"dream_diary_{uuid4().hex}") + title: str + summary: str + dream_entry: str = "" + motive: dict | None = None + themes: list[str] = Field(default_factory=list) + created_at: datetime = Field(default_factory=datetime.utcnow) + status: str = "completed" + + def format_content(self) -> str: + """Serialize the entry into a single text block for storage.""" + parts = [self.title, self.summary] + if self.dream_entry: + parts.append(self.dream_entry) + return "\n\n".join(parts) + + +class DreamResult(BaseModel): + """Per-cluster Dream output produced by the pipeline. + + Contains both the write-back actions (memory mutations) and the + explainable diary entry for the Dream run. 
+ """ + + cluster_id: str + actions: list[DreamAction] = Field(default_factory=list) + diary_entry: DreamDiaryEntry | None = None diff --git a/src/memos/mem_reader/read_skill_memory/process_skill_memory.py b/src/memos/mem_reader/read_skill_memory/process_skill_memory.py index 0b0c04252..4c93a30c5 100644 --- a/src/memos/mem_reader/read_skill_memory/process_skill_memory.py +++ b/src/memos/mem_reader/read_skill_memory/process_skill_memory.py @@ -332,7 +332,7 @@ def _preprocess_extract_messages( history = history[-20:] if (len(history) + len(messages)) < 10: # TODO: maybe directly return [] - logger.warning("[PROCESS_SKILLS] Not enough messages to extract skill memory") + logger.info("[PROCESS_SKILLS] Not enough messages to extract skill memory") return history, messages @@ -1034,7 +1034,6 @@ def process_skill_memory_fine( chat_history = kwargs.get("chat_history") if not chat_history or not isinstance(chat_history, list): chat_history = [] - logger.warning("[PROCESS_SKILLS] History is None in Skills") messages = _reconstruct_messages_from_memory_items(fast_memory_items) diff --git a/src/memos/mem_scheduler/general_modules/init_components_for_scheduler.py b/src/memos/mem_scheduler/general_modules/init_components_for_scheduler.py index 56614737d..05a50d49c 100644 --- a/src/memos/mem_scheduler/general_modules/init_components_for_scheduler.py +++ b/src/memos/mem_scheduler/general_modules/init_components_for_scheduler.py @@ -285,10 +285,7 @@ def init_components() -> dict[str, Any]: plugin_manager.init_components(plugin_context) # Pass graph_db to mem_reader for recall operations (deduplication, conflict detection) - mem_reader = MemReaderFactory.from_config( - mem_reader_config, - graph_db=graph_db, - ) + mem_reader = MemReaderFactory.from_config(mem_reader_config, graph_db=graph_db) reranker = RerankerFactory.from_config(reranker_config) feedback_reranker = RerankerFactory.from_config(feedback_reranker_config) internet_retriever = InternetRetrieverFactory.from_config( diff --git a/src/memos/mem_scheduler/schemas/task_schemas.py b/src/memos/mem_scheduler/schemas/task_schemas.py index 5b52b6c22..3b5e2f023 100644 --- a/src/memos/mem_scheduler/schemas/task_schemas.py +++ b/src/memos/mem_scheduler/schemas/task_schemas.py @@ -33,6 +33,7 @@ class TaskPriorityLevel(Enum): ADD_TASK_LABEL = "add" MEM_READ_TASK_LABEL = "mem_read" MEM_ORGANIZE_TASK_LABEL = "mem_organize" +MEM_DREAM_TASK_LABEL = "mem_dream" MEM_UPDATE_TASK_LABEL = "mem_update" MEM_ARCHIVE_TASK_LABEL = "mem_archive" API_MIX_SEARCH_TASK_LABEL = "api_mix_search" diff --git a/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_dream_handler.py b/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_dream_handler.py new file mode 100644 index 000000000..901aa6a14 --- /dev/null +++ b/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_dream_handler.py @@ -0,0 +1,156 @@ +from __future__ import annotations + +import json +import threading + +from typing import TYPE_CHECKING, ClassVar + +from memos.log import get_logger +from memos.mem_scheduler.schemas.task_schemas import MEM_DREAM_TASK_LABEL +from memos.mem_scheduler.task_schedule_modules.base_handler import BaseSchedulerHandler +from memos.plugins.hook_defs import H +from memos.plugins.hooks import trigger_single_hook + + +if TYPE_CHECKING: + from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem + + +logger = get_logger(__name__) + + +class MemDreamMessageHandler(BaseSchedulerHandler): + """Thin scheduler entrypoint for Dream tasks. 
+ + The handler intentionally contains no Dream business logic. Its job is only to: + 1. collect a scheduler batch into a single snapshot payload, + 2. guard concurrent execution with a per-cube single-flight lock, and + 3. delegate the real work to the currently active Dream plugin. + """ + + _dream_locks: ClassVar[dict[str, threading.Lock]] = {} + _class_lock: ClassVar[threading.Lock] = threading.Lock() + + @property + def expected_task_label(self) -> str: + return MEM_DREAM_TASK_LABEL + + def batch_handler( + self, user_id: str, mem_cube_id: str, batch: list[ScheduleMessageItem] + ) -> None: + mem_cube = self.scheduler_context.get_mem_cube() + if mem_cube is None: + logger.warning( + "mem_cube is None for user_id=%s, mem_cube_id=%s, skipping dream", + user_id, + mem_cube_id, + ) + return + + text_mem = getattr(mem_cube, "text_mem", None) + if text_mem is None: + logger.warning( + "text_mem is unavailable for user_id=%s, mem_cube_id=%s, skipping dream", + user_id, + mem_cube_id, + ) + return + + user_name = "" + for msg in batch: + user_name = msg.user_name or user_name + + signal_snapshot = self._build_signal_snapshot( + user_id=user_id, + mem_cube_id=mem_cube_id, + user_name=user_name, + batch=batch, + ) + self._run_dream( + user_id=user_id, + mem_cube_id=mem_cube_id, + user_name=user_name, + signal_snapshot=signal_snapshot, + text_mem=text_mem, + ) + + @classmethod + def _get_dream_lock(cls, mem_cube_id: str) -> threading.Lock: + with cls._class_lock: + lock = cls._dream_locks.get(mem_cube_id) + if lock is None: + lock = threading.Lock() + cls._dream_locks[mem_cube_id] = lock + return lock + + def _build_signal_snapshot( + self, + *, + user_id: str, + mem_cube_id: str, + user_name: str, + batch: list[ScheduleMessageItem], + ): + from memos.dream.types import DreamSignalSnapshot + + # The framework accepts batched Dream tasks, but the scaffold only needs + # a single normalized snapshot object. The most recent non-empty payload wins. + snapshot = DreamSignalSnapshot( + mem_cube_id=mem_cube_id, + user_id=user_id, + user_name=user_name, + ) + for msg in batch: + if not msg.content: + continue + try: + payload = json.loads(msg.content) + except json.JSONDecodeError: + logger.warning( + "Invalid dream payload for mem_cube_id=%s, item_id=%s; ignore batch item", + mem_cube_id, + msg.item_id, + ) + continue + try: + snapshot = DreamSignalSnapshot.model_validate(payload) + except Exception: + logger.warning( + "Dream payload schema mismatch for mem_cube_id=%s, item_id=%s; ignore batch item", + mem_cube_id, + msg.item_id, + exc_info=True, + ) + return snapshot + + def _run_dream( + self, + *, + user_id: str, + mem_cube_id: str, + user_name: str, + signal_snapshot, + text_mem, + ) -> None: + lock = self._get_dream_lock(mem_cube_id) + if not lock.acquire(blocking=False): + logger.info( + "[Dream] Already running for mem_cube_id=%s; skip this trigger.", + mem_cube_id, + ) + return + + try: + # The active Dream plugin owns the pipeline. The scheduler only forwards + # the normalized execution context through a single-provider hook. 
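+            # trigger_single_hook dispatches to exactly one registered provider,
+            # i.e. the active Dream plugin's on_dream_execute handler.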
+ trigger_single_hook( + H.DREAM_EXECUTE, + mem_cube_id=mem_cube_id, + user_id=user_id, + user_name=user_name, + signal_snapshot=signal_snapshot, + text_mem=text_mem, + scheduler_context=self.scheduler_context, + ) + finally: + lock.release() diff --git a/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py b/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py index e0baf63ff..b85e4ea71 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py +++ b/src/memos/mem_scheduler/task_schedule_modules/handlers/mem_read_handler.py @@ -5,12 +5,15 @@ import json import traceback +from datetime import datetime, timezone from typing import TYPE_CHECKING from memos.context.context import ContextThreadPoolExecutor from memos.log import get_logger +from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem from memos.mem_scheduler.schemas.task_schemas import ( LONG_TERM_MEMORY_TYPE, + MEM_ORGANIZE_TASK_LABEL, MEM_READ_TASK_LABEL, USER_INPUT_TYPE, ) @@ -23,7 +26,6 @@ logger = get_logger(__name__) if TYPE_CHECKING: - from memos.mem_scheduler.schemas.message_schemas import ScheduleMessageItem from memos.types.general_types import UserContext @@ -211,6 +213,14 @@ def _process_memories_with_reader( ) logger.info("Added %s Rawfile memories.", len(raw_file_mem_group)) + self._try_submit_organize_task( + enhanced_mem_ids=enhanced_mem_ids, + text_mem=text_mem, + user_id=user_id, + mem_cube_id=mem_cube_id, + user_name=user_name, + ) + # fallback to simple deduplication logic when mem version switch is off if getattr(mem_reader, "memory_version_switch", "off") != "on": # Mark merged_from memories as archived when provided in memory metadata @@ -468,3 +478,35 @@ def _process_memories_with_reader( event.task_id = task_id event.status = "failed" self.scheduler_context.services.submit_web_logs([event]) + + def _try_submit_organize_task( + self, + enhanced_mem_ids: list[str], + text_mem: TreeTextMemory, + user_id: str, + mem_cube_id: str, + user_name: str, + ) -> None: + """Submit a MEM_ORGANIZE task so the reorganizer can run optimize_structure.""" + try: + reorganizer = getattr(text_mem.memory_manager, "reorganizer", None) + if not reorganizer or not getattr(reorganizer, "is_reorganize", False): + return + + message_item = ScheduleMessageItem( + user_id=user_id, + mem_cube_id=mem_cube_id, + label=MEM_ORGANIZE_TASK_LABEL, + content=json.dumps(enhanced_mem_ids), + timestamp=datetime.now(tz=timezone.utc), + user_name=user_name, + ) + self.scheduler_context.services.submit_messages([message_item]) + logger.info( + "[mem_read_handler] Submitted MEM_ORGANIZE task for user_id=%s, mem_cube_id=%s, mem_ids=%s", + user_id, + mem_cube_id, + enhanced_mem_ids, + ) + except Exception as e: + logger.error("Failed to enqueue MEM_ORGANIZE task: %s", e, exc_info=True) diff --git a/src/memos/mem_scheduler/task_schedule_modules/registry.py b/src/memos/mem_scheduler/task_schedule_modules/registry.py index f47be933e..538c29fcb 100644 --- a/src/memos/mem_scheduler/task_schedule_modules/registry.py +++ b/src/memos/mem_scheduler/task_schedule_modules/registry.py @@ -11,6 +11,7 @@ from memos.mem_scheduler.schemas.task_schemas import ( ADD_TASK_LABEL, ANSWER_TASK_LABEL, + MEM_DREAM_TASK_LABEL, MEM_FEEDBACK_TASK_LABEL, MEM_ORGANIZE_TASK_LABEL, MEM_READ_TASK_LABEL, @@ -23,6 +24,7 @@ from .handlers.add_handler import AddMessageHandler from .handlers.answer_handler import AnswerMessageHandler from .handlers.feedback_handler import FeedbackMessageHandler +from 
.handlers.mem_dream_handler import MemDreamMessageHandler from .handlers.mem_read_handler import MemReadMessageHandler from .handlers.mem_reorganize_handler import MemReorganizeMessageHandler from .handlers.memory_update_handler import MemoryUpdateHandler @@ -39,6 +41,7 @@ def __init__(self, scheduler_context: SchedulerHandlerContext) -> None: self.mem_feedback = FeedbackMessageHandler(scheduler_context) self.mem_read = MemReadMessageHandler(scheduler_context) self.mem_reorganize = MemReorganizeMessageHandler(scheduler_context) + self.mem_dream = MemDreamMessageHandler(scheduler_context) self.pref_add = PrefAddMessageHandler(scheduler_context) def build_dispatch_map(self) -> dict[str, Callable | tuple]: @@ -49,6 +52,7 @@ def build_dispatch_map(self) -> dict[str, Callable | tuple]: ADD_TASK_LABEL: (self.add, TaskPriorityLevel.LEVEL_1, None), MEM_READ_TASK_LABEL: self.mem_read, MEM_ORGANIZE_TASK_LABEL: self.mem_reorganize, + MEM_DREAM_TASK_LABEL: self.mem_dream, PREF_ADD_TASK_LABEL: (self.pref_add, None, 600_000), MEM_FEEDBACK_TASK_LABEL: self.mem_feedback, } diff --git a/src/memos/plugins/base.py b/src/memos/plugins/base.py index e486cfc72..b10b7c824 100644 --- a/src/memos/plugins/base.py +++ b/src/memos/plugins/base.py @@ -18,11 +18,16 @@ class MemOSPlugin: Provides three unified registration methods. Plugin developers need only inherit from this class and register capabilities via self.register_* in init_app. + + `priority` is used only when multiple installed distributions expose the + same logical plugin name. In that case the PluginManager keeps the highest + priority implementation and skips the rest. """ name: str = "unnamed" version: str = "0.0.0" description: str = "" + priority: int = 0 _app: FastAPI | None = None diff --git a/src/memos/plugins/component_bootstrap.py b/src/memos/plugins/component_bootstrap.py index 151e37215..5a4e42e95 100644 --- a/src/memos/plugins/component_bootstrap.py +++ b/src/memos/plugins/component_bootstrap.py @@ -7,6 +7,7 @@ def build_plugin_context( *, graph_db: Any, embedder: Any, + llm: Any | None = None, default_cube_config: Any, nli_client_config: dict[str, Any], mem_reader_config: Any, @@ -18,6 +19,10 @@ def build_plugin_context( "shared": { "graph_db": graph_db, "embedder": embedder, + "llm": llm, + "mem_scheduler": None, + "submit_scheduler_messages": None, + "api_module": None, }, "configs": { "default_cube_config": default_cube_config, diff --git a/src/memos/plugins/hook_defs.py b/src/memos/plugins/hook_defs.py index 030d5292f..536068c8d 100644 --- a/src/memos/plugins/hook_defs.py +++ b/src/memos/plugins/hook_defs.py @@ -83,6 +83,9 @@ class H: MEMORY_VERSION_APPLY_UPDATES = "memory_version.apply_updates" MEMORY_VERSION_APPLY_FEEDBACK_UPDATE = "memory_version.apply_feedback_update" + # dream — single-provider business hook + DREAM_EXECUTE = "dream.execute" + # ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ # CE custom Hook declarations (@hookable-generated ones need not be declared here) @@ -130,3 +133,16 @@ class H: description="Apply memory-version update semantics during feedback update", params=["old_item", "new_item", "user_name"], ) + +define_hook( + H.DREAM_EXECUTE, + description=("Execute the active Dream plugin pipeline for a scheduler-triggered dream task"), + params=[ + "mem_cube_id", + "user_id", + "user_name", + "signal_snapshot", + "text_mem", + "scheduler_context", + ], +) diff --git a/src/memos/plugins/manager.py b/src/memos/plugins/manager.py index 5f0397dc1..fb473e0d9 100644 --- a/src/memos/plugins/manager.py 
+++ b/src/memos/plugins/manager.py @@ -29,6 +29,55 @@ def __init__(self): def plugins(self) -> dict[str, MemOSPlugin]: return dict(self._plugins) + @staticmethod + def _select_plugin_winners( + candidates: list[tuple[str, MemOSPlugin]], + ) -> dict[str, MemOSPlugin]: + """Resolve duplicate logical plugin names by priority. + + Multiple installed distributions may expose the same plugin capability + (for example CE and EE variants of the Dream plugin). In that case we + keep only the highest-priority implementation and skip the rest. + + If the highest priority is shared by more than one plugin implementation, + startup should fail loudly because plugin activation would be ambiguous. + """ + + grouped: dict[str, list[tuple[str, MemOSPlugin]]] = {} + for entry_point_name, plugin in candidates: + grouped.setdefault(plugin.name, []).append((entry_point_name, plugin)) + + winners: dict[str, MemOSPlugin] = {} + for plugin_name, group in grouped.items(): + group.sort(key=lambda item: item[1].priority, reverse=True) + winner_ep_name, winner = group[0] + tied = [item for item in group if item[1].priority == winner.priority] + if len(tied) > 1: + tied_names = ", ".join( + f"{entry_point_name}({plugin.__class__.__name__})" + for entry_point_name, plugin in tied + ) + raise RuntimeError( + "Multiple plugins share the same logical name and highest priority: " + f"name='{plugin_name}', priority={winner.priority}, providers=[{tied_names}]" + ) + + for loser_ep_name, loser in group[1:]: + logger.info( + "Plugin implementation skipped due to lower priority: name=%s, " + "winner=%s(%s, priority=%s), skipped=%s(%s, priority=%s)", + plugin_name, + winner_ep_name, + winner.__class__.__name__, + winner.priority, + loser_ep_name, + loser.__class__.__name__, + loser.priority, + ) + + winners[plugin_name] = winner + return winners + def discover(self) -> None: """Discover and load all installed plugins via entry_points.""" if self._discovered: @@ -44,6 +93,7 @@ def discover(self) -> None: logger.exception("Failed to query entry_points") return + candidates: list[tuple[str, MemOSPlugin]] = [] for ep in plugin_eps: try: plugin_cls = ep.load() @@ -51,12 +101,21 @@ def discover(self) -> None: if not isinstance(plugin, MemOSPlugin): logger.warning("Plugin %s does not extend MemOSPlugin, skipped", ep.name) continue - plugin.on_load() - self._plugins[plugin.name] = plugin - logger.info("Plugin discovered: %s v%s", plugin.name, plugin.version) + candidates.append((ep.name, plugin)) except Exception: logger.exception("Failed to load plugin: %s", ep.name) + winners = self._select_plugin_winners(candidates) + for plugin_name, plugin in winners.items(): + plugin.on_load() + self._plugins[plugin_name] = plugin + logger.info( + "Plugin discovered: %s v%s (priority=%s)", + plugin.name, + plugin.version, + plugin.priority, + ) + self._discovered = True def init_components(self, context: dict) -> None: