-
Notifications
You must be signed in to change notification settings - Fork 224
perf: Resurrection Kael #1675
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
perf: Resurrection Kael #1675
Changes from 7 commits
a7daa31
2799eb5
45bc9cf
570e922
505035d
73c055a
e047b48
89d109e
fca0685
cb0fa17
b2183d1
0825d08
d61b7ca
3eba711
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,163 +1,259 @@ | ||
| package httpd | ||
|
|
||
| import ( | ||
| "context" | ||
| "encoding/json" | ||
| "fmt" | ||
| "github.com/jumpserver/koko/pkg/common" | ||
| "github.com/jumpserver/koko/pkg/i18n" | ||
| "github.com/jumpserver/koko/pkg/logger" | ||
| "github.com/jumpserver/koko/pkg/proxy" | ||
| "github.com/jumpserver/koko/pkg/session" | ||
| "github.com/sashabaranov/go-openai" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "github.com/jumpserver/koko/pkg/jms-sdk-go/model" | ||
| "github.com/jumpserver/koko/pkg/logger" | ||
| "github.com/jumpserver/koko/pkg/srvconn" | ||
| ) | ||
|
|
||
| var _ Handler = (*chat)(nil) | ||
|
|
||
| type chat struct { | ||
| ws *UserWebsocket | ||
| ws *UserWebsocket | ||
| term *model.TerminalConfig | ||
|
|
||
| conversationMap sync.Map | ||
|
|
||
| termConf *model.TerminalConfig | ||
| // conversationMap: map[conversationID]*AIConversation | ||
| conversations sync.Map | ||
| } | ||
|
|
||
| func (h *chat) Name() string { | ||
| return ChatName | ||
| } | ||
|
|
||
| func (h *chat) CleanUp() { | ||
| h.CleanConversationMap() | ||
| } | ||
| func (h *chat) CleanUp() { h.cleanupAll() } | ||
|
|
||
| func (h *chat) CheckValidation() error { | ||
| return nil | ||
| } | ||
|
|
||
| func (h *chat) HandleMessage(msg *Message) { | ||
| conversationID := msg.Id | ||
| conversation := &AIConversation{} | ||
|
|
||
| if conversationID == "" { | ||
| id := common.UUID() | ||
| conversation = &AIConversation{ | ||
| Id: id, | ||
| Prompt: msg.Prompt, | ||
| HistoryRecords: make([]string, 0), | ||
| InterruptCurrentChat: false, | ||
| } | ||
| if msg.Interrupt { | ||
| h.interrupt(msg.Id) | ||
| return | ||
| } | ||
|
|
||
| // T000 Currently a websocket connection only retains one conversation | ||
| h.CleanConversationMap() | ||
| h.conversationMap.Store(id, conversation) | ||
| } else { | ||
| c, ok := h.conversationMap.Load(conversationID) | ||
| if !ok { | ||
| logger.Errorf("Ws[%s] conversation %s not found", h.ws.Uuid, conversationID) | ||
| h.sendErrorMessage(conversationID, "conversation not found") | ||
| return | ||
| conv, err := h.getOrCreateConversation(msg) | ||
| if err != nil { | ||
| h.sendError(msg.Id, err.Error()) | ||
| return | ||
| } | ||
| conv.Question = msg.Data | ||
| conv.NewDialogue = true | ||
|
|
||
| go h.runChat(conv) | ||
| } | ||
|
|
||
| func (h *chat) getOrCreateConversation(msg *Message) (*AIConversation, error) { | ||
| if msg.Id != "" { | ||
| if v, ok := h.conversations.Load(msg.Id); ok { | ||
| return v.(*AIConversation), nil | ||
| } | ||
| conversation = c.(*AIConversation) | ||
| return nil, fmt.Errorf("conversation %s not found", msg.Id) | ||
| } | ||
|
|
||
| if msg.Interrupt { | ||
| conversation.InterruptCurrentChat = true | ||
| return | ||
| jmsSrv, err := proxy.NewChatJMSServer( | ||
| h.ws.user.String(), h.ws.ClientIP(), | ||
| h.ws.user.ID, h.ws.langCode, h.ws.apiClient, h.term, | ||
| ) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("create JMS server: %w", err) | ||
| } | ||
|
|
||
| openAIParam := &OpenAIParam{ | ||
| AuthToken: h.termConf.GptApiKey, | ||
| BaseURL: h.termConf.GptBaseUrl, | ||
| Proxy: h.termConf.GptProxy, | ||
| Model: h.termConf.GptModel, | ||
| Prompt: conversation.Prompt, | ||
| sess := session.NewSession(jmsSrv.Session, h.sessionCallback) | ||
| session.AddSession(sess) | ||
|
|
||
| conv := &AIConversation{ | ||
| Id: jmsSrv.Session.ID, | ||
| Prompt: msg.Prompt, | ||
| Model: msg.ChatModel, | ||
| Context: make([]QARecord, 0), | ||
| JMSServer: jmsSrv, | ||
| } | ||
| h.conversations.Store(jmsSrv.Session.ID, conv) | ||
| go h.Monitor(conv) | ||
| return conv, nil | ||
| } | ||
|
|
||
| func (h *chat) sessionCallback(task *model.TerminalTask) error { | ||
| if task.Name == model.TaskKillSession { | ||
| h.endConversation(task.Args, "close", "kill session") | ||
| return nil | ||
| } | ||
| conversation.HistoryRecords = append(conversation.HistoryRecords, msg.Data) | ||
| go h.chat(openAIParam, conversation) | ||
| return fmt.Errorf("unknown session task %s", task.Name) | ||
| } | ||
|
|
||
| func (h *chat) chat( | ||
| chatGPTParam *OpenAIParam, conversation *AIConversation, | ||
| ) string { | ||
| doneCh := make(chan string) | ||
| answerCh := make(chan string) | ||
| defer close(doneCh) | ||
| defer close(answerCh) | ||
| func (h *chat) runChat(conv *AIConversation) { | ||
| ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) | ||
| defer cancel() | ||
|
|
||
| c := srvconn.NewOpenAIClient( | ||
| chatGPTParam.AuthToken, | ||
| chatGPTParam.BaseURL, | ||
| chatGPTParam.Proxy, | ||
| client := srvconn.NewOpenAIClient( | ||
| h.term.GptApiKey, h.term.GptBaseUrl, h.term.GptProxy, | ||
| ) | ||
|
|
||
| startIndex := len(conversation.HistoryRecords) - 15 | ||
| if startIndex < 0 { | ||
| startIndex = 0 | ||
| // Keep the last 8 contexts | ||
| if len(conv.Context) > 8 { | ||
| conv.Context = conv.Context[len(conv.Context)-8:] | ||
| } | ||
| contents := conversation.HistoryRecords[startIndex:] | ||
| messages := buildChatMessages(conv) | ||
|
|
||
| openAIConn := &srvconn.OpenAIConn{ | ||
| Id: conversation.Id, | ||
| Client: c, | ||
| Prompt: chatGPTParam.Prompt, | ||
| Model: chatGPTParam.Model, | ||
| Contents: contents, | ||
| chatModel := conv.Model | ||
| if conv.Model == "" { | ||
| chatModel = h.term.GptModel | ||
| } | ||
|
|
||
| conn := &srvconn.OpenAIConn{ | ||
| Id: conv.Id, | ||
| Client: client, | ||
| Prompt: conv.Prompt, | ||
| Model: chatModel, | ||
| Question: conv.Question, | ||
| Context: messages, | ||
| AnswerCh: make(chan string), | ||
| DoneCh: make(chan string), | ||
| IsReasoning: false, | ||
| AnswerCh: answerCh, | ||
| DoneCh: doneCh, | ||
| Type: h.termConf.ChatAIType, | ||
| Type: h.term.ChatAIType, | ||
| } | ||
|
|
||
| go openAIConn.Chat(&conversation.InterruptCurrentChat) | ||
| return h.processChatMessages(openAIConn) | ||
| // 启动 streaming | ||
| go conn.Chat(&conv.InterruptCurrentChat) | ||
|
|
||
| conv.JMSServer.Replay.WriteInput(conv.Question) | ||
|
|
||
| h.streamResponses(ctx, conv, conn) | ||
| } | ||
|
|
||
| func buildChatMessages(conv *AIConversation) []openai.ChatCompletionMessage { | ||
| msgs := make([]openai.ChatCompletionMessage, 0, len(conv.Context)*2) | ||
| for _, r := range conv.Context { | ||
| msgs = append(msgs, | ||
| openai.ChatCompletionMessage{Role: openai.ChatMessageRoleUser, Content: r.Question}, | ||
| openai.ChatCompletionMessage{Role: openai.ChatMessageRoleAssistant, Content: r.Answer}, | ||
| ) | ||
| } | ||
| return msgs | ||
| } | ||
|
|
||
| func (h *chat) processChatMessages( | ||
| openAIConn *srvconn.OpenAIConn, | ||
| ) string { | ||
| messageID := common.UUID() | ||
| id := openAIConn.Id | ||
| func (h *chat) streamResponses( | ||
| ctx context.Context, conv *AIConversation, conn *srvconn.OpenAIConn, | ||
| ) { | ||
| msgID := common.UUID() | ||
| for { | ||
| select { | ||
| case answer := <-openAIConn.AnswerCh: | ||
| h.sendSessionMessage(id, answer, messageID, "message", openAIConn.IsReasoning) | ||
| case answer := <-openAIConn.DoneCh: | ||
| h.sendSessionMessage(id, answer, messageID, "finish", false) | ||
| return answer | ||
| case <-ctx.Done(): | ||
| h.sendError(conv.Id, "chat timeout") | ||
| return | ||
| case ans := <-conn.AnswerCh: | ||
| h.sendMessage(conv.Id, msgID, ans, "message", conn.IsReasoning) | ||
| case ans := <-conn.DoneCh: | ||
| h.sendMessage(conv.Id, msgID, ans, "finish", false) | ||
| h.finalizeConversation(conv, ans) | ||
| return | ||
| } | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are no significant coding changes detected. The function /// Handles incoming messages from GPT APIs.
func (h *chat) HandleMessage(msg *Message) {
return
}
...Remember to review these changes based on your team's guidelines. Please note that you should have an understanding of how functions handle parameters ( |
||
| } | ||
|
|
||
| func (h *chat) sendSessionMessage(id, answer, messageID, messageType string, isReasoning bool) { | ||
| message := ChatGPTMessage{ | ||
| Content: answer, | ||
| ID: messageID, | ||
| func (h *chat) finalizeConversation(conv *AIConversation, fullAnswer string) { | ||
| runes := []rune(fullAnswer) | ||
| snippet := fullAnswer | ||
| if len(runes) > 100 { | ||
| snippet = string(runes[:100]) | ||
| } | ||
| conv.Context = append(conv.Context, QARecord{Question: conv.Question, Answer: snippet}) | ||
|
|
||
| cmd := conv.JMSServer.GenerateCommandItem(h.ws.user.String(), conv.Question, fullAnswer) | ||
| go conv.JMSServer.CmdR.Record(cmd) | ||
| go conv.JMSServer.Replay.WriteOutput(fullAnswer) | ||
| } | ||
|
|
||
| func (h *chat) sendMessage( | ||
| convID, msgID, content, typ string, reasoning bool, | ||
| ) { | ||
| msg := ChatGPTMessage{ | ||
| Content: content, | ||
| ID: msgID, | ||
| CreateTime: time.Now(), | ||
| Type: messageType, | ||
| Type: typ, | ||
| Role: openai.ChatMessageRoleAssistant, | ||
| IsReasoning: isReasoning, | ||
| IsReasoning: reasoning, | ||
| } | ||
| data, _ := json.Marshal(message) | ||
| msg := Message{ | ||
| Id: id, | ||
| Type: "message", | ||
| Data: string(data), | ||
| data, _ := json.Marshal(msg) | ||
| h.ws.SendMessage(&Message{Id: convID, Type: "message", Data: string(data)}) | ||
| } | ||
|
|
||
| func (h *chat) sendError(convID, errMsg string) { | ||
| h.endConversation(convID, "error", errMsg) | ||
| } | ||
|
|
||
| func (h *chat) endConversation(convID, typ, msg string) { | ||
|
|
||
| defer func() { | ||
| if r := recover(); r != nil { | ||
| logger.Errorf("panic while sending message to session %s: %v", convID, r) | ||
| } | ||
| }() | ||
|
|
||
| if v, ok := h.conversations.Load(convID); ok { | ||
| if conv, ok2 := v.(*AIConversation); ok2 && conv.JMSServer != nil { | ||
| conv.JMSServer.Close(msg) | ||
| } | ||
| } | ||
| h.ws.SendMessage(&msg) | ||
| h.conversations.Delete(convID) | ||
| h.ws.SendMessage(&Message{Id: convID, Type: typ, Data: msg}) | ||
| } | ||
|
|
||
| func (h *chat) sendErrorMessage(id, message string) { | ||
| msg := Message{ | ||
| Id: id, | ||
| Type: "error", | ||
| Data: message, | ||
| func (h *chat) interrupt(convID string) { | ||
| if v, ok := h.conversations.Load(convID); ok { | ||
| v.(*AIConversation).InterruptCurrentChat = true | ||
| } | ||
| h.ws.SendMessage(&msg) | ||
| } | ||
|
|
||
| func (h *chat) CleanConversationMap() { | ||
| h.conversationMap.Range(func(key, value interface{}) bool { | ||
| h.conversationMap.Delete(key) | ||
| func (h *chat) cleanupAll() { | ||
| h.conversations.Range(func(key, _ interface{}) bool { | ||
| h.endConversation(key.(string), "close", "") | ||
| return true | ||
| }) | ||
| } | ||
|
|
||
| func (h *chat) Monitor(conv *AIConversation) { | ||
| lang := i18n.NewLang(h.ws.langCode) | ||
|
|
||
| lastActiveTime := time.Now() | ||
| maxIdleTime := time.Duration(h.term.MaxIdleTime) * time.Minute | ||
| MaxSessionTime := time.Now().Add(time.Duration(h.term.MaxSessionTime) * time.Hour) | ||
|
|
||
| for { | ||
| now := time.Now() | ||
| if MaxSessionTime.Before(now) { | ||
| msg := lang.T("Session max time reached, disconnect") | ||
| logger.Infof("Session[%s] max session time reached, disconnect", conv.Id) | ||
| h.endConversation(conv.Id, "close", msg) | ||
| return | ||
| } | ||
|
|
||
| outTime := lastActiveTime.Add(maxIdleTime) | ||
| if now.After(outTime) { | ||
| msg := fmt.Sprintf(lang.T("Connect idle more than %d minutes, disconnect"), h.term.MaxIdleTime) | ||
| logger.Infof("Session[%s] idle more than %d minutes, disconnect", conv.Id, h.term.MaxIdleTime) | ||
| h.endConversation(conv.Id, "close", msg) | ||
| return | ||
| } | ||
|
|
||
| if conv.NewDialogue { | ||
| lastActiveTime = time.Now() | ||
| conv.NewDialogue = false | ||
| } | ||
|
|
||
| time.Sleep(10 * time.Second) | ||
| } | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm sorry, but due to the age of this question (August 2021), I cannot provide information on any current changes or updates for the given code snippet since there was no mention of it when you asked your latest question two years ago. If the version number is different than what we have today from 2021/2021 to 2021/2021, please specify which specific version and details about the differences that you're referring to. If you need help with another piece of software or application development problem related to Open-AI-related tasks like GPT models and text generation, sentiment analysis, etc., feel free to ask! |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| package httpd | ||
|
|
||
| import ( | ||
| "github.com/jumpserver/koko/pkg/proxy" | ||
| "time" | ||
|
|
||
| "github.com/jumpserver/koko/pkg/exchange" | ||
|
|
@@ -18,6 +19,7 @@ type Message struct { | |
| //Chat AI | ||
| Prompt string `json:"prompt"` | ||
| Interrupt bool `json:"interrupt"` | ||
| ChatModel string `json:"chat_model"` | ||
|
|
||
| //K8s | ||
| KubernetesId string `json:"k8s_id"` | ||
|
|
@@ -163,11 +165,20 @@ type OpenAIParam struct { | |
| Type string | ||
| } | ||
|
|
||
| type QARecord struct { | ||
| Question string | ||
| Answer string | ||
| } | ||
|
|
||
| type AIConversation struct { | ||
| Id string | ||
| Prompt string | ||
| HistoryRecords []string | ||
| Question string | ||
| Model string | ||
| Context []QARecord | ||
| JMSServer *proxy.ChatJMSServer | ||
| InterruptCurrentChat bool | ||
| NewDialogue bool | ||
| } | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code you have provided seems to be a template that outlines basic structures and methods for working with an "OpenAI" library in Go programming language. The doc appears to have been updated between March 2021 and April 2025. However, there could be minor inconsistencies regarding naming conventions (e.g., Additionally, there should be some sort of error-handling mechanism in place so unexpected scenarios don't crash the application; though it doesn't look like this has explicitly been implemented in the current snippet shown. Therefore, general advice would be to keep checking these guidelines periodically from official documents and community support to catch such errors early on while maintaining adherence to best coding practices. It's also recommended to maintain regular code reviews and unit testing using tests defined within the framework since writing functional tests alone is often insufficient for uncovering bugs without proper documentation first. In summary:
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code does not show any obvious errors or inconsistencies at this point as of September 2021 (knowledge cutoff). The changes mentioned in the question do seem to be related to updating interfaces and adding new fields for QARecords which might result in small performance improvements but have no apparent issues with compatibility, security, readability, maintainability, ergonomics etc. Optimization suggestions would include:
However such recommendations should not involve code editing without proper understanding about what was changed or added, since the nature of these questions is more conceptual than technical. |
||
| type ChatGPTMessage struct { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do not have access to the given code provided here, which is specific to a particular context. However, I can guide you on how to verify if there are irregularities or potential issues in your current code. To verify any discrepancies that may exist:
Remember, without the actual piece of code being reviewed specifically, there would not be accurate information about identifying potential issues based on code structure alone. Please share the relevant part or the complete code so that more tailored advice could be given. |
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
None found. Please ensure the above code is updated to reflect recent changes in coding conventions or best practices.