Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,8 @@ func TestProcessMessage_BtwCommandRunsWithoutPersistingHistory(t *testing.T) {
defaultAgent.Sessions.SetHistory(sessionKey, initialHistory)
defaultAgent.Sessions.SetSummary(sessionKey, "The team decided to keep state request-scoped.")

initialHistory = defaultAgent.Sessions.GetHistory(sessionKey)

response, err := al.processMessage(context.Background(), msg)
if err != nil {
t.Fatalf("processMessage() error = %v", err)
Expand Down Expand Up @@ -488,6 +490,8 @@ func TestProcessMessage_BtwCommandUsesIsolatedProvider(t *testing.T) {
}
defaultAgent.Sessions.SetHistory(mainSessionKey, initialHistory)

initialHistory = defaultAgent.Sessions.GetHistory(mainSessionKey)

// Process a /btw command
response, err := al.processMessage(context.Background(), bus.InboundMessage{
Channel: "telegram",
Expand Down
2 changes: 2 additions & 0 deletions pkg/agent/steering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1306,6 +1306,8 @@ func TestAgentLoop_InterruptHard_RestoresSession(t *testing.T) {
}
defaultAgent.Sessions.SetHistory(sessionKey, originalHistory)

originalHistory = defaultAgent.Sessions.GetHistory(sessionKey)

runtimeCh, closeRuntimeEvents := subscribeRuntimeEventsForTest(
t,
al,
Expand Down
17 changes: 16 additions & 1 deletion pkg/agent/turn_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,10 +511,25 @@ func (ts *turnState) restoreSession(agent *AgentInstance) error {
return agent.Sessions.Save(ts.sessionKey)
}

// messagesContentEqual compares two message slices by content only, ignoring CreatedAt.
// JSON roundtrip loses the monotonic clock portion of time.Time, so direct
// reflect.DeepEqual would always differ on messages that roundtripped through
// the JSONL store.
func messagesContentEqual(a, b []providers.Message) bool {
for i := range a {
aCopy, bCopy := a[i], b[i]
aCopy.CreatedAt, bCopy.CreatedAt = nil, nil
if !reflect.DeepEqual(aCopy, bCopy) {
return false
}
}
return true
}

func matchingTurnMessageTail(history, persisted []providers.Message) int {
maxMatch := min(len(history), len(persisted))
for size := maxMatch; size > 0; size-- {
if reflect.DeepEqual(history[len(history)-size:], persisted[len(persisted)-size:]) {
if messagesContentEqual(history[len(history)-size:], persisted[len(persisted)-size:]) {
return size
}
}
Expand Down
13 changes: 12 additions & 1 deletion pkg/memory/jsonl.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,12 @@ func (s *JSONLStore) addMsg(sessionKey string, msg providers.Message) error {
l.Lock()
defer l.Unlock()

now := time.Now()

if msg.CreatedAt == nil {
msg.CreatedAt = &now
}

// Append the message as a single JSON line.
line, err := json.Marshal(msg)
if err != nil {
Expand Down Expand Up @@ -598,7 +604,6 @@ func (s *JSONLStore) addMsg(sessionKey string, msg providers.Message) error {
if err != nil {
return err
}
now := time.Now()
if meta.Count == 0 && meta.CreatedAt.IsZero() {
meta.CreatedAt = now
}
Expand Down Expand Up @@ -726,6 +731,12 @@ func (s *JSONLStore) SetHistory(
meta.Count = len(history)
meta.UpdatedAt = now

for i := range history {
if history[i].CreatedAt == nil {
history[i].CreatedAt = &now
}
}

// Write meta BEFORE rewriting the JSONL file. If we crash between
// the two writes, meta has Skip=0 and the old file is still intact,
// so GetHistory reads from line 1 — returning "too many" messages
Expand Down
137 changes: 137 additions & 0 deletions pkg/memory/jsonl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,143 @@ func TestMultipleSessions_Isolation(t *testing.T) {
}
}

func TestStore_SetsCreatedAtWhenNil(t *testing.T) {
type writeOp struct {
name string
fn func(store *JSONLStore, key string) (expectedCount int)
}

ops := []writeOp{
{
name: "AddMessage",
fn: func(store *JSONLStore, key string) int {
if err := store.AddMessage(context.Background(), key, "user", "hello"); err != nil {
t.Fatalf("AddMessage: %v", err)
}
return 1
},
},
{
name: "AddFullMessage",
fn: func(store *JSONLStore, key string) int {
if err := store.AddFullMessage(context.Background(), key, providers.Message{
Role: "user",
Content: "hello from full",
}); err != nil {
t.Fatalf("AddFullMessage: %v", err)
}
return 1
},
},
{
name: "SetHistory",
fn: func(store *JSONLStore, key string) int {
if err := store.SetHistory(context.Background(), key, []providers.Message{
{Role: "user", Content: "msg1"},
{Role: "assistant", Content: "msg2"},
}); err != nil {
t.Fatalf("SetHistory: %v", err)
}
return 2
},
},
}

for _, op := range ops {
t.Run(op.name, func(t *testing.T) {
store := newTestStore(t)
key := "s1"

before := time.Now().Add(-time.Second)
expectedCount := op.fn(store, key)
after := time.Now().Add(time.Second)

history, err := store.GetHistory(context.Background(), key)
if err != nil {
t.Fatalf("GetHistory: %v", err)
}
if len(history) != expectedCount {
t.Fatalf("expected %d messages, got %d", expectedCount, len(history))
}
for i := range history {
if history[i].CreatedAt == nil || history[i].CreatedAt.IsZero() {
t.Errorf("message %d CreatedAt is zero — not set by %s", i, op.name)
}
if history[i].CreatedAt.Before(before) || history[i].CreatedAt.After(after) {
t.Errorf(
"message %d CreatedAt %v outside expected window [%v, %v]",
i, history[i].CreatedAt, before, after,
)
}
}
})
}
}

func TestStore_PreservesExistingCreatedAt(t *testing.T) {
t1 := time.Date(2026, 1, 1, 10, 0, 0, 0, time.UTC)
t2 := time.Date(2026, 1, 1, 11, 0, 0, 0, time.UTC)

type writeOp struct {
name string
fn func(store *JSONLStore, key string)
wantTimes []time.Time
}

ops := []writeOp{
{
name: "AddFullMessage",
fn: func(store *JSONLStore, key string) {
if err := store.AddFullMessage(context.Background(), key, providers.Message{
Role: "user",
Content: "custom time",
CreatedAt: &t1,
}); err != nil {
t.Fatalf("AddFullMessage: %v", err)
}
},
wantTimes: []time.Time{t1},
},
{
name: "SetHistory",
fn: func(store *JSONLStore, key string) {
if err := store.SetHistory(context.Background(), key, []providers.Message{
{Role: "user", Content: "msg1", CreatedAt: &t1},
{Role: "assistant", Content: "msg2", CreatedAt: &t2},
}); err != nil {
t.Fatalf("SetHistory: %v", err)
}
},
wantTimes: []time.Time{t1, t2},
},
}

for _, op := range ops {
t.Run(op.name, func(t *testing.T) {
store := newTestStore(t)
key := "s1"

op.fn(store, key)

history, err := store.GetHistory(context.Background(), key)
if err != nil {
t.Fatalf("GetHistory: %v", err)
}
if len(history) != len(op.wantTimes) {
t.Fatalf("expected %d messages, got %d", len(op.wantTimes), len(history))
}
for i, want := range op.wantTimes {
if history[i].CreatedAt == nil || !history[i].CreatedAt.Equal(want) {
t.Errorf(
"message %d CreatedAt = %v, want %v (should preserve caller-provided time)",
i, history[i].CreatedAt, want,
)
}
}
})
}
}

func BenchmarkAddMessage(b *testing.B) {
dir := b.TempDir()
store, err := NewJSONLStore(dir)
Expand Down
3 changes: 3 additions & 0 deletions pkg/providers/protocoltypes/types.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package protocoltypes

import "time"

type ToolCall struct {
ID string `json:"id"`
Type string `json:"type,omitempty"`
Expand Down Expand Up @@ -81,6 +83,7 @@ type Attachment struct {
type Message struct {
Role string `json:"role"`
Content string `json:"content"`
CreatedAt *time.Time `json:"created_at,omitempty"`
Media []string `json:"media,omitempty"`
Attachments []Attachment `json:"attachments,omitempty"`
ReasoningContent string `json:"reasoning_content,omitempty"`
Expand Down
15 changes: 13 additions & 2 deletions pkg/session/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,13 @@ func (sm *SessionManager) AddFullMessage(sessionKey string, msg providers.Messag
sm.sessions[sessionKey] = session
}

now := time.Now()
if msg.CreatedAt == nil {
msg.CreatedAt = &now
}

session.Messages = append(session.Messages, msg)
session.Updated = time.Now()
session.Updated = now
}

func (sm *SessionManager) GetHistory(key string) []providers.Message {
Expand Down Expand Up @@ -300,7 +305,13 @@ func (sm *SessionManager) SetHistory(key string, history []providers.Message) {
// from the caller's slice.
msgs := make([]providers.Message, len(history))
copy(msgs, history)
now := time.Now()
for i := range msgs {
if msgs[i].CreatedAt == nil {
msgs[i].CreatedAt = &now
}
}
session.Messages = msgs
session.Updated = time.Now()
session.Updated = now
}
}
27 changes: 20 additions & 7 deletions web/backend/api/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type sessionChatMessage struct {
Role string `json:"role"`
Content string `json:"content"`
Kind string `json:"kind,omitempty"`
CreatedAt *time.Time `json:"created_at,omitempty"`
Media []string `json:"media,omitempty"`
Attachments []sessionChatAttachment `json:"attachments,omitempty"`
ToolCalls []utils.VisibleToolCall `json:"tool_calls,omitempty"`
Expand Down Expand Up @@ -510,6 +511,7 @@ func sessionTranscriptMessages(
chatMsg := sessionChatMessage{
Role: "user",
Content: msg.Content,
CreatedAt: msg.CreatedAt,
Media: append([]string(nil), msg.Media...),
Attachments: attachments,
}
Expand All @@ -530,8 +532,9 @@ func sessionTranscriptMessages(
toolCallsMsg, hasToolCallsMsg := assistantToolCallsMessage(
msg.ToolCalls,
toolFeedbackMaxArgsLength,
msg.CreatedAt,
)
visibleToolMessages := visibleAssistantToolMessages(msg.ToolCalls)
visibleToolMessages := visibleAssistantToolMessages(msg.ToolCalls, msg.CreatedAt)

// Pico web chat can persist both visible `message` tool output and a
// later plain assistant reply in the same turn. Hide only the fixed
Expand All @@ -556,6 +559,7 @@ func sessionTranscriptMessages(
chatMsg := sessionChatMessage{
Role: "assistant",
Content: content,
CreatedAt: msg.CreatedAt,
Media: append([]string(nil), msg.Media...),
Attachments: attachments,
}
Expand Down Expand Up @@ -682,15 +686,17 @@ func assistantThoughtMessage(msg providers.Message) (sessionChatMessage, bool) {
return sessionChatMessage{}, false
}
return sessionChatMessage{
Role: "assistant",
Content: reasoning,
Kind: "thought",
Role: "assistant",
Content: reasoning,
Kind: "thought",
CreatedAt: msg.CreatedAt,
}, true
}

func assistantToolCallsMessage(
toolCalls []providers.ToolCall,
toolFeedbackMaxArgsLength int,
createdAt *time.Time,
) (sessionChatMessage, bool) {
if len(toolCalls) == 0 {
return sessionChatMessage{}, false
Expand All @@ -707,6 +713,7 @@ func assistantToolCallsMessage(
return sessionChatMessage{
Role: "assistant",
Kind: "tool_calls",
CreatedAt: createdAt,
ToolCalls: visibleToolCalls,
}, true
}
Expand All @@ -718,7 +725,7 @@ func visibleAssistantToolArgsPreview(
return utils.VisibleToolCallArgumentsPreview(tc, toolFeedbackMaxArgsLength)
}

func visibleAssistantToolMessages(toolCalls []providers.ToolCall) []sessionChatMessage {
func visibleAssistantToolMessages(toolCalls []providers.ToolCall, createdAt *time.Time) []sessionChatMessage {
if len(toolCalls) == 0 {
return nil
}
Expand All @@ -734,8 +741,9 @@ func visibleAssistantToolMessages(toolCalls []providers.ToolCall) []sessionChatM
continue
}
messages = append(messages, sessionChatMessage{
Role: "assistant",
Content: content,
Role: "assistant",
Content: content,
CreatedAt: createdAt,
})
}

Expand Down Expand Up @@ -918,6 +926,11 @@ func (h *Handler) handleGetSession(w http.ResponseWriter, r *http.Request) {
}
}

for i := range sess.Messages {
if sess.Messages[i].CreatedAt == nil {
sess.Messages[i].CreatedAt = &sess.Updated
}
}
messages := detailSessionMessages(sess.Messages, toolFeedbackMaxArgsLength)

w.Header().Set("Content-Type", "application/json")
Expand Down
1 change: 1 addition & 0 deletions web/frontend/src/api/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export interface SessionDetail {
messages: {
role: "user" | "assistant"
content: string
created_at?: string
kind?: "normal" | "thought" | "tool_calls"
media?: string[]
attachments?: {
Expand Down
Loading