Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 39 additions & 23 deletions api/eventlog/store/barrel.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package store

import (
"encoding/json"
"errors"
"github.com/goodrain/rainbond/api/eventlog/db"
"sync"
Expand Down Expand Up @@ -110,8 +111,23 @@ type readEventBarrel struct {
subLock sync.Mutex
updateTime time.Time
// 新增字段
eventID string // 事件ID
fileStore FileStore // 文件存储
eventID string // 事件ID
fileStore FileStore // 文件存储
}

// rebuildMessageContent 从结构化字段重建Content(用于从文件读取的消息)
func rebuildMessageContent(m *db.EventLogMessage) {
data := map[string]string{
"event_id": m.EventID,
"step": m.Step,
"status": m.Status,
"message": m.Message,
"level": m.Level,
"time": m.Time,
}
if content, err := json.Marshal(data); err == nil {
m.Content = content
}
}

func (r *readEventBarrel) empty() {
Expand Down Expand Up @@ -155,31 +171,30 @@ func (r *readEventBarrel) insertMessage(message *db.EventLogMessage) {
}

func (r *readEventBarrel) pushCashMessage(ch chan *db.EventLogMessage, subID string) {
// 从文件读取历史消息并推送
// 先注册订阅通道,确保实时消息不会丢失
r.subLock.Lock()
r.subSocketChan[subID] = ch
r.subLock.Unlock()

// 再从文件读取历史消息并推送
if r.fileStore != nil && r.eventID != "" {
// 只推送最近1000条,避免推送过多
messages, err := r.fileStore.ReadLast(r.eventID, 1000)
if err != nil {
logrus.Errorf("Failed to read history for event %s: %v", r.eventID, err)
} else {
// 推送历史消息
for _, m := range messages {
if m.Content == nil {
rebuildMessageContent(m)
}
select {
case ch <- m:
case <-time.After(5 * time.Second):
// 超时保护,避免阻塞
logrus.Warnf("Timeout pushing history for event %s", r.eventID)
goto done
return
}
}
}
}

done:
// 注册订阅通道
r.subLock.Lock()
defer r.subLock.Unlock()
r.subSocketChan[subID] = ch
}

// 增加socket订阅
Expand All @@ -191,7 +206,7 @@ func (r *readEventBarrel) addSubChan(subID string) chan *db.EventLogMessage {
}
r.subLock.Unlock()

ch := make(chan *db.EventLogMessage, 10)
ch := make(chan *db.EventLogMessage, 1024)
// 异步推送历史消息(从文件读取)
go r.pushCashMessage(ch, subID)
return ch
Expand Down Expand Up @@ -268,29 +283,30 @@ func (r *dockerLogEventBarrel) insertMessage(message *db.EventLogMessage) {
}

func (r *dockerLogEventBarrel) pushCashMessage(ch chan *db.EventLogMessage, subID string) {
// 从文件读取历史消息并推送
// 先注册订阅通道,确保实时消息不会丢失
r.subLock.Lock()
r.subSocketChan[subID] = ch
r.subLock.Unlock()

// 再从文件读取历史消息并推送
if r.fileStore != nil && r.name != "" {
// Docker日志可能很多,只推送最近1000条
messages, err := r.fileStore.ReadLast(r.name, 1000)
if err != nil {
logrus.Errorf("Failed to read docker log history for %s: %v", r.name, err)
} else {
for _, m := range messages {
if m.Content == nil {
rebuildMessageContent(m)
}
select {
case ch <- m:
case <-time.After(5 * time.Second):
logrus.Warnf("Timeout pushing docker log history for %s", r.name)
goto done
return
}
}
}
}

done:
// 注册订阅
r.subLock.Lock()
defer r.subLock.Unlock()
r.subSocketChan[subID] = ch
}

// 增加socket订阅
Expand Down
4 changes: 1 addition & 3 deletions api/eventlog/store/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,9 +337,7 @@ func (s *storeManager) parsingMessage(msg []byte, messageType string) (*db2.Even
}
//message := s.pool.Get().(*db.EventLogMessage)不能使用对象池,会阻塞进程
var message db2.EventLogMessage
// 不再保存原始Content字节数组,避免内存冗余占用
// 反序列化后,字段已经被解析,不需要再保留原始数据
// message.Content = msg
message.Content = msg
if messageType == "json" {
err := ffjson.Unmarshal(msg, &message)
if err != nil {
Expand Down