diff --git a/api/eventlog/store/barrel.go b/api/eventlog/store/barrel.go index 616ef7b6c..1c81c4747 100644 --- a/api/eventlog/store/barrel.go +++ b/api/eventlog/store/barrel.go @@ -19,6 +19,7 @@ package store import ( + "encoding/json" "errors" "github.com/goodrain/rainbond/api/eventlog/db" "sync" @@ -110,8 +111,23 @@ type readEventBarrel struct { subLock sync.Mutex updateTime time.Time // 新增字段 - eventID string // 事件ID - fileStore FileStore // 文件存储 + eventID string // 事件ID + fileStore FileStore // 文件存储 +} + +// rebuildMessageContent 从结构化字段重建Content(用于从文件读取的消息) +func rebuildMessageContent(m *db.EventLogMessage) { + data := map[string]string{ + "event_id": m.EventID, + "step": m.Step, + "status": m.Status, + "message": m.Message, + "level": m.Level, + "time": m.Time, + } + if content, err := json.Marshal(data); err == nil { + m.Content = content + } } func (r *readEventBarrel) empty() { @@ -155,31 +171,30 @@ func (r *readEventBarrel) insertMessage(message *db.EventLogMessage) { } func (r *readEventBarrel) pushCashMessage(ch chan *db.EventLogMessage, subID string) { - // 从文件读取历史消息并推送 + // 先注册订阅通道,确保实时消息不会丢失 + r.subLock.Lock() + r.subSocketChan[subID] = ch + r.subLock.Unlock() + + // 再从文件读取历史消息并推送 if r.fileStore != nil && r.eventID != "" { - // 只推送最近1000条,避免推送过多 messages, err := r.fileStore.ReadLast(r.eventID, 1000) if err != nil { logrus.Errorf("Failed to read history for event %s: %v", r.eventID, err) } else { - // 推送历史消息 for _, m := range messages { + if m.Content == nil { + rebuildMessageContent(m) + } select { case ch <- m: case <-time.After(5 * time.Second): - // 超时保护,避免阻塞 logrus.Warnf("Timeout pushing history for event %s", r.eventID) - goto done + return } } } } - -done: - // 注册订阅通道 - r.subLock.Lock() - defer r.subLock.Unlock() - r.subSocketChan[subID] = ch } // 增加socket订阅 @@ -191,7 +206,7 @@ func (r *readEventBarrel) addSubChan(subID string) chan *db.EventLogMessage { } r.subLock.Unlock() - ch := make(chan *db.EventLogMessage, 10) + ch := make(chan *db.EventLogMessage, 1024) // 异步推送历史消息(从文件读取) go r.pushCashMessage(ch, subID) return ch @@ -268,29 +283,30 @@ func (r *dockerLogEventBarrel) insertMessage(message *db.EventLogMessage) { } func (r *dockerLogEventBarrel) pushCashMessage(ch chan *db.EventLogMessage, subID string) { - // 从文件读取历史消息并推送 + // 先注册订阅通道,确保实时消息不会丢失 + r.subLock.Lock() + r.subSocketChan[subID] = ch + r.subLock.Unlock() + + // 再从文件读取历史消息并推送 if r.fileStore != nil && r.name != "" { - // Docker日志可能很多,只推送最近1000条 messages, err := r.fileStore.ReadLast(r.name, 1000) if err != nil { logrus.Errorf("Failed to read docker log history for %s: %v", r.name, err) } else { for _, m := range messages { + if m.Content == nil { + rebuildMessageContent(m) + } select { case ch <- m: case <-time.After(5 * time.Second): logrus.Warnf("Timeout pushing docker log history for %s", r.name) - goto done + return } } } } - -done: - // 注册订阅 - r.subLock.Lock() - defer r.subLock.Unlock() - r.subSocketChan[subID] = ch } // 增加socket订阅 diff --git a/api/eventlog/store/manager.go b/api/eventlog/store/manager.go index 7692e45c9..c51ae7b6a 100644 --- a/api/eventlog/store/manager.go +++ b/api/eventlog/store/manager.go @@ -337,9 +337,7 @@ func (s *storeManager) parsingMessage(msg []byte, messageType string) (*db2.Even } //message := s.pool.Get().(*db.EventLogMessage)不能使用对象池,会阻塞进程 var message db2.EventLogMessage - // 不再保存原始Content字节数组,避免内存冗余占用 - // 反序列化后,字段已经被解析,不需要再保留原始数据 - // message.Content = msg + message.Content = msg if messageType == "json" { err := ffjson.Unmarshal(msg, &message) if err != nil {