Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
1ca9e6f
feat(agent): add AiAgentConfig for AI agent governance
kylewanginchina Mar 9, 2026
60bddd8
feat(agent): add BIZ_TYPE_AI_AGENT constant
kylewanginchina Mar 9, 2026
245b0ca
feat(agent): add ai_agent stub in enterprise-utils
kylewanginchina Mar 9, 2026
f4f1130
feat(agent): add AI agent URL detection hook in HTTP parser
kylewanginchina Mar 9, 2026
3330804
feat(agent): add biz_type to ProcessData and ProcessInfo proto
kylewanginchina Mar 9, 2026
d98f869
feat(agent): bypass flow reassembly limits for AI agent traffic
kylewanginchina Mar 9, 2026
2f703fe
feat(ebpf): add access_permission field and AI agent hook point
kylewanginchina Mar 9, 2026
0776f2c
feat(agent): wire AiAgentRegistry into agent lifecycle
kylewanginchina Mar 9, 2026
5c864b5
fix(agent): add missing L7ProtocolInfoInterface import and fix unused…
kylewanginchina Mar 9, 2026
c2f2e65
fix(ebpf): add null statement after skip_latency_filter label
kylewanginchina Mar 9, 2026
4bc1a03
feat(agent): AI Agent governance - fix blockers, unlimited reassembly…
kylewanginchina Mar 10, 2026
a6a0fef
fix(agent): fix blocking issues in AI Agent governance event pipeline
kylewanginchina Mar 10, 2026
cc6bbd8
agent/controller: ai agent reassembly and process biz_type
kylewanginchina Mar 10, 2026
2d70889
server: add ai_agent config and bump db version
kylewanginchina Mar 10, 2026
4ea530f
agent: extend endpoints for agent judgement
kylewanginchina Mar 11, 2026
e45c6b0
agent: fix ai-agent tracepoint hooks
kylewanginchina Mar 11, 2026
c0d3329
agent: add ai-agent chmod/chown/unlink tracepoints
kylewanginchina Mar 11, 2026
2aea671
Fix AI Agent cleanup using full proc scan
kylewanginchina Mar 11, 2026
321af1c
Fix AI agent pid_tgid usage in socket trace
kylewanginchina Mar 11, 2026
ba6a9bb
Reduce AI agent stack usage in data submit
kylewanginchina Mar 11, 2026
c6b50f2
Sync biz_type for gprocess in multi-controller
kylewanginchina Mar 11, 2026
95fe692
Fix missing is_ai_agent in socket_trace
kylewanginchina Mar 11, 2026
5b78c97
Avoid BPF stack usage for ai_agent flag
kylewanginchina Mar 11, 2026
d37714d
Fix AI agent governance review issues
kylewanginchina Mar 11, 2026
ac18359
Fix proc scan hook warning and HTTP endpoint borrow
kylewanginchina Mar 11, 2026
0ebe436
agent: auto sync AI agent gprocess_info
kylewanginchina Mar 11, 2026
b45fa06
agent: mark ai agent biz_type in gprocess
kylewanginchina Mar 11, 2026
aa92135
server: support gprocess.biz_type tag query
kylewanginchina Mar 12, 2026
a63adf3
agent: log ai agent pids for gprocess sync
kylewanginchina Mar 12, 2026
5df64cc
ai-agent: inherit child proc lifecycle
kylewanginchina Mar 12, 2026
b1be1b4
ai-agent: expose proc event start time
kylewanginchina Mar 12, 2026
3a4ef94
Fix proc lifecycle gprocess fallback and captured bytes
kylewanginchina Mar 12, 2026
a8c70bb
fix: guard ai reasm bytes on invalid socket info
kylewanginchina Mar 12, 2026
6d8c0ce
Fix proc.gprocess_info refresh on process change
kylewanginchina Mar 12, 2026
aaf3a3b
fix: enable reassembly after protocol inference
kylewanginchina Mar 12, 2026
9bde4c6
fix: enable reassembly on inferred protocol for existing sockets
kylewanginchina Mar 12, 2026
1957515
fix: ai-agent 子孙进程继承 gprocess_id
kylewanginchina Mar 13, 2026
c128384
fix: proc exec 保持 gprocess 继承与单测
kylewanginchina Mar 13, 2026
96fa37f
Ensure AI agent pids included in socket list sync
kylewanginchina Mar 13, 2026
d0ab245
fix: propagate reasm_bytes on ebpf merge
kylewanginchina Mar 13, 2026
932234d
fix: remove record_endpoint_hit stub from AI Agent registry
kylewanginchina Mar 16, 2026
94e4ea5
feat: add ai_agent_root_pid to resolve gprocess_id for child processes
kylewanginchina Mar 17, 2026
647d2d2
fix: normalize file_op event output to match IoEvent format
kylewanginchina Mar 18, 2026
abff528
fix: AI agent子进程文件读写事件被collect_mode过滤
kylewanginchina Mar 18, 2026
6ddee0a
chore: support deepflow-ctl for ai-agent process show
kylewanginchina Mar 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions agent/crates/enterprise-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,3 +434,88 @@ pub mod rpc {
}
}
}

pub mod ai_agent {
    use std::sync::Arc;
    use std::time::Duration;

    /// Metadata record describing a tracked AI agent process.
    ///
    /// Nothing in this stub module constructs or returns this type.
    #[derive(Debug, Clone, Default)]
    pub struct AgentMeta {
        // NOTE(review): presumably the time the process was first identified — confirm.
        pub first_seen: Duration,
        // NOTE(review): presumably the time of the most recent match — confirm.
        pub last_seen: Duration,
        // NOTE(review): presumably the endpoint pattern that matched — confirm.
        pub matched_endpoint: String,
        pub root_pid: u32,
    }

    /// No-op registry of AI agent processes.
    ///
    /// Every query reports "nothing registered" and every mutation is ignored.
    #[derive(Debug, Clone, Default)]
    pub struct AiAgentRegistry;

    impl AiAgentRegistry {
        /// Creates the (stateless) stub registry.
        pub fn new() -> Self {
            Self
        }

        /// Number of registered pids. Stub: always `0`.
        pub fn len(&self) -> usize {
            0
        }

        /// Whether the registry holds no pids. Stub: always `true`.
        pub fn is_empty(&self) -> bool {
            true
        }

        /// Registers `_pid` for `_endpoint`. Stub: registers nothing and returns `false`.
        pub fn register(&self, _pid: u32, _endpoint: &str, _now: Duration) -> bool {
            false
        }

        /// Registers `_child_pid` under `_parent_pid`. Stub: registers nothing and
        /// returns `false`.
        pub fn register_child(&self, _parent_pid: u32, _child_pid: u32, _now: Duration) -> bool {
            false
        }

        /// Whether `_pid` is a registered AI agent. Stub: always `false`.
        pub fn is_ai_agent(&self, _pid: u32) -> bool {
            false
        }

        /// Root pid associated with `_pid`. Stub: always `0`.
        pub fn get_root_pid(&self, _pid: u32) -> u32 {
            0
        }

        /// All registered pids. Stub: always empty.
        pub fn get_all_pids(&self) -> Vec<u32> {
            Vec::new()
        }

        /// Cleans up entries given the list of live pids and returns a list of pids.
        /// Stub: always returns an empty list.
        pub fn cleanup_dead_pids(&self, _alive_pids: &[u32]) -> Vec<u32> {
            Vec::new()
        }

        /// Stub: does nothing.
        pub fn sync_bpf_map_add(&self, _pid: u32) {}

        /// Stub: does nothing.
        pub fn sync_bpf_map_remove(&self, _pid: u32) {}

        /// Stub: does nothing. Only compiled on Linux.
        #[cfg(target_os = "linux")]
        pub fn set_bpf_map_fd(&self, _fd: i32) {}

        /// Stub: does nothing.
        pub fn set_file_io_enabled(&self, _enabled: bool) {}
    }

    /// Check whether a URL path matches one of the AI Agent endpoint patterns.
    /// Stub: never matches, always returns `None`.
    pub fn match_ai_agent_endpoint(
        _endpoints: &[String],
        _path: &str,
        _pid: u32,
        _now: Duration,
    ) -> Option<String> {
        None
    }

    /// Initialize the global AI Agent registry and return it wrapped in an `Arc`.
    /// Stub: hands back a fresh no-op registry on every call.
    pub fn init_global_registry() -> Arc<AiAgentRegistry> {
        let registry = AiAgentRegistry::default();
        Arc::new(registry)
    }

    /// Get a reference to the global AI Agent registry.
    /// Stub: always returns `None`.
    pub fn global_registry() -> Option<&'static Arc<AiAgentRegistry>> {
        None
    }
}
4 changes: 4 additions & 0 deletions agent/src/common/ebpf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ pub const GO_HTTP2_UPROBE_DATA: u8 = 5;
pub const SOCKET_CLOSE_EVENT: u8 = 6;
// unix socket
pub const UNIX_SOCKET: u8 = 8;
// AI Agent governance event types
pub const FILE_OP_EVENT: u8 = 9;
pub const PERM_OP_EVENT: u8 = 10;
pub const PROC_LIFECYCLE_EVENT: u8 = 11;

const EBPF_TYPE_TRACEPOINT: u8 = 0;
const EBPF_TYPE_TLS_UPROBE: u8 = 1;
Expand Down
4 changes: 4 additions & 0 deletions agent/src/common/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,10 @@ impl From<FlowPerfStats> for flow_log::FlowPerfStats {
}
}

// Business type constants for process classification
pub const BIZ_TYPE_DEFAULT: u8 = 0;
pub const BIZ_TYPE_AI_AGENT: u8 = 1;

#[derive(Clone, Debug, Default)]
pub struct L7Stats {
pub stats: L7PerfStats,
Expand Down
53 changes: 50 additions & 3 deletions agent/src/common/l7_protocol_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use std::sync::{
};
use std::time::Duration;

use crate::common::l7_protocol_info::L7ProtocolInfoInterface;

use enum_dispatch::enum_dispatch;
use log::debug;
use lru::LruCache;
Expand Down Expand Up @@ -248,6 +250,16 @@ impl L7ParseResult {
L7ParseResult::None => panic!("parse result is none but unwrap multi"),
}
}

/// Report whether at least one parsed info carries `biz_type`.
///
/// A `Single` result is checked directly, a `Multi` result matches if any of
/// its entries matches, and `None` never matches.
/// Used to detect AI Agent flows after parsing.
pub fn has_biz_type(&self, biz_type: u8) -> bool {
    match self {
        L7ParseResult::None => false,
        L7ParseResult::Single(parsed) => parsed.get_biz_type() == biz_type,
        L7ParseResult::Multi(parsed_list) => parsed_list
            .iter()
            .map(|parsed| parsed.get_biz_type())
            .any(|found| found == biz_type),
    }
}
}

#[enum_dispatch]
Expand Down Expand Up @@ -685,11 +697,13 @@ pub struct ParseParam<'a> {

// the config of `l7_log_packet_size`, must set in parse_payload and check_payload
pub buf_size: u16,
pub captured_byte: u16,
pub captured_byte: u32,

pub oracle_parse_conf: OracleConfig,
pub iso8583_parse_conf: Iso8583ParseConfig,
pub web_sphere_mq_parse_conf: WebSphereMqParseConfig,

pub process_id: u32,
}

impl<'a> fmt::Debug for ParseParam<'a> {
Expand Down Expand Up @@ -793,6 +807,8 @@ impl<'a> ParseParam<'a> {
oracle_parse_conf: OracleConfig::default(),
iso8583_parse_conf: Iso8583ParseConfig::default(),
web_sphere_mq_parse_conf: WebSphereMqParseConfig::default(),

process_id: packet.process_id,
}
}
}
Expand All @@ -811,11 +827,17 @@ impl<'a> ParseParam<'a> {
}

/// Store the configured payload buffer size, clamped to the `u16` field.
///
/// Values larger than `u16::MAX` are saturated to `u16::MAX` instead of being
/// wrapped by a truncating cast.
pub fn set_buf_size(&mut self, buf_size: usize) {
    // Saturate to u16::MAX to avoid overflow when AI Agent flows use larger payload sizes.
    // buf_size is informational for plugins; actual payload truncation uses the usize value directly.
    // The former `buf_size as u16` assignment is dropped: it wrapped large values and was
    // immediately overwritten by the saturating store below.
    self.buf_size = u16::try_from(buf_size).unwrap_or(u16::MAX);
}

/// Store the number of captured bytes, saturating at `u32::MAX`.
///
/// The value is converted with a checked conversion so that large payloads
/// are clamped rather than wrapped.
pub fn set_captured_byte(&mut self, captured_byte: usize) {
    // The stale `captured_byte as u16` assignment is removed: it truncated values above
    // u16::MAX and no longer matches the `u32` field.
    self.captured_byte = u32::try_from(captured_byte).unwrap_or(u32::MAX);
}

pub fn set_rrt_timeout(&mut self, t: usize) {
Expand Down Expand Up @@ -939,3 +961,28 @@ impl fmt::Debug for L7ProtocolBitmap {
f.write_str(format!("{:#?}", p).as_str())
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::cell::RefCell;
    use std::rc::Rc;

    /// A captured size above `u16::MAX` must be read back unchanged from
    /// `ParseParam::captured_byte`.
    #[test]
    fn captured_byte_should_not_truncate_large_payloads() {
        // Deliberately larger than u16::MAX (65_535).
        const LARGE_CAPTURE: u32 = 200_000;

        let meta_packet = MetaPacket::default();
        let mut parse_param = ParseParam::new(
            &meta_packet,
            None,
            Rc::new(RefCell::new(None)),
            #[cfg(any(target_os = "linux", target_os = "android"))]
            Rc::new(RefCell::new(None)),
            false,
            false,
        );

        parse_param.set_captured_byte(LARGE_CAPTURE as usize);

        let stored = parse_param.captured_byte as u32;
        assert_eq!(stored, LARGE_CAPTURE);
    }
}
22 changes: 22 additions & 0 deletions agent/src/common/meta_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ pub struct MetaPacket<'a> {
pub socket_id: u64,
pub cap_start_seq: u64,
pub cap_end_seq: u64,
pub reasm_bytes: u32,
pub l7_protocol_from_ebpf: L7Protocol,
// 流结束标识, 目前只有 go http2 uprobe 用到
pub is_request_end: bool,
Expand Down Expand Up @@ -1035,6 +1036,9 @@ impl<'a> MetaPacket<'a> {
#[inline]
pub fn get_captured_byte(&self) -> usize {
if self.tap_port.is_from(TapPort::FROM_EBPF) {
if self.reasm_bytes > 0 {
return self.reasm_bytes as usize;
}
return self.packet_len as usize - 54;
}

Expand Down Expand Up @@ -1083,6 +1087,13 @@ impl<'a> MetaPacket<'a> {
self.payload_len += packet.payload_len;
self.l4_payload_len += packet.l4_payload_len;
self.cap_end_seq = packet.cap_start_seq;
// eBPF reassembly: propagate the latest cumulative reassembly bytes.
// `reasm_bytes` reflects the total bytes reassembled in the kernel for
// this flow. When we merge multiple MSG_REASM_* segments, we must keep
// the newest cumulative value, otherwise `get_captured_byte()` will
// stay at the first segment's size (e.g., HTTP headers only) and
// `captured_request_byte` will be incorrect for large bodies.
self.reasm_bytes = self.reasm_bytes.max(packet.reasm_bytes);
}

#[cfg(all(unix, feature = "libtrace"))]
Expand Down Expand Up @@ -1147,6 +1158,7 @@ impl<'a> MetaPacket<'a> {
packet.signal_source = SignalSource::EBPF;
packet.cap_start_seq = data.cap_seq;
packet.cap_end_seq = data.cap_seq;
packet.reasm_bytes = data.reasm_bytes;
packet.process_id = data.process_id;
packet.thread_id = data.thread_id;
packet.coroutine_id = data.coroutine_id;
Expand Down Expand Up @@ -1597,4 +1609,14 @@ mod tests {
pkt
);
}

/// For an eBPF-sourced packet with a non-zero `reasm_bytes`, `get_captured_byte`
/// must return `reasm_bytes` rather than the `packet_len - 54` fallback.
#[test]
fn get_captured_byte_prefers_reasm_bytes_for_ebpf() {
    let reassembled: u32 = 200_000;

    let mut packet = MetaPacket::default();
    packet.reasm_bytes = reassembled;
    // The fallback path would yield 16 here, so the two results are distinguishable.
    packet.packet_len = 54 + 16;
    packet.tap_port = TapPort::from_ebpf(1, 0);

    let captured = packet.get_captured_byte();
    assert_eq!(captured, reassembled as usize);
}
}
Loading
Loading