diff --git a/agent/src/ebpf/kernel/include/socket_trace_common.h b/agent/src/ebpf/kernel/include/socket_trace_common.h index 4e4e84a6060..accfb2ceea4 100644 --- a/agent/src/ebpf/kernel/include/socket_trace_common.h +++ b/agent/src/ebpf/kernel/include/socket_trace_common.h @@ -369,7 +369,8 @@ enum { */ EVENT_TYPE_MIN = 1 << 9, EVENT_TYPE_PROC_EXEC = 1 << 9, - EVENT_TYPE_PROC_EXIT = 1 << 10 + EVENT_TYPE_PROC_EXIT = 1 << 10, + EVENT_TYPE_LIB_LOAD = 1 << 11 // Add new event type here. }; @@ -378,7 +379,7 @@ struct event_meta { __u32 event_type; }; -// Process execution or exit event data +// Process execution or exit event data struct process_event_t { struct event_meta meta; __u32 pid:31; // process ID @@ -386,6 +387,12 @@ struct process_event_t { __u8 name[TASK_COMM_LEN]; // process name }; +// Library load event data (triggered by _dl_open uretprobe) +struct library_load_event_t { + struct event_meta meta; + __u32 pid; // process ID +}; + struct debug_data { __u16 magic; __u8 fun; diff --git a/agent/src/ebpf/kernel/uprobe_base.bpf.c b/agent/src/ebpf/kernel/uprobe_base.bpf.c index 102c2117dc4..489e4946e85 100644 --- a/agent/src/ebpf/kernel/uprobe_base.bpf.c +++ b/agent/src/ebpf/kernel/uprobe_base.bpf.c @@ -774,3 +774,22 @@ TP_SCHED_PROG(process_exec) (struct sched_comm_exec_ctx *ctx) { return __process_exec((void *)ctx); } + +// uretprobe: fires when _dl_open() returns after dynamically loading a library. +// Attached from user-space via program__attach_uprobe() to ld-linux's _dl_open. 
+URETPROG(dl_open_uretprobe) (struct pt_regs *ctx) +{ + struct member_fields_offset *offset = retrieve_ready_kern_offset(); + if (offset == NULL) + return 0; + + __u64 id = bpf_get_current_pid_tgid(); + pid_t pid = id >> 32; + + struct library_load_event_t data = {}; + data.meta.event_type = EVENT_TYPE_LIB_LOAD; + data.pid = pid; + bpf_perf_event_output(ctx, &NAME(socket_data), + BPF_F_CURRENT_CPU, &data, sizeof(data)); + return 0; +} diff --git a/agent/src/ebpf/mod.rs b/agent/src/ebpf/mod.rs index e47bb80995c..8565dc80043 100644 --- a/agent/src/ebpf/mod.rs +++ b/agent/src/ebpf/mod.rs @@ -212,6 +212,8 @@ pub const MSG_CLOSE: u8 = 10; pub const EVENT_TYPE_PROC_EXEC: u32 = 1 << 9; #[allow(dead_code)] pub const EVENT_TYPE_PROC_EXIT: u32 = 1 << 10; +#[allow(dead_code)] +pub const EVENT_TYPE_LIB_LOAD: u32 = 1 << 11; // Profiler types #[allow(dead_code)] diff --git a/agent/src/ebpf/user/proc.c b/agent/src/ebpf/user/proc.c index 4648c921a58..c7976e6fd4a 100644 --- a/agent/src/ebpf/user/proc.c +++ b/agent/src/ebpf/user/proc.c @@ -1087,7 +1087,7 @@ void process_event_free(struct process_create_event *event) void add_event_to_proc_list(proc_event_list_t * list, struct bpf_tracer *tracer, int pid, char *path) { - static const uint32_t PROC_EVENT_HANDLE_DELAY = 120; + static const uint32_t PROC_EVENT_HANDLE_DELAY = 0; struct process_create_event *event = NULL; event = calloc(1, sizeof(struct process_create_event)); diff --git a/agent/src/ebpf/user/socket.c b/agent/src/ebpf/user/socket.c index 3ba3d766fb3..b84a51439e4 100644 --- a/agent/src/ebpf/user/socket.c +++ b/agent/src/ebpf/user/socket.c @@ -943,6 +943,16 @@ static inline int process_exists(pid_t pid) return 0; // does not exist } +static void library_load_event(struct library_load_event_t *e) +{ + /* + * A shared library was loaded via dlopen(). Re-trigger the process + * initialization pipeline so that maps are rescanned and uprobes + * are attached for the newly loaded library. 
+ */ + extended_process_exec(e->pid); +} + static void process_event(struct process_event_t *e) { if (e->meta.event_type == EVENT_TYPE_PROC_EXEC) { @@ -992,6 +1002,11 @@ static int register_events_handle(struct reader_forward_info *fwd_info, process_event((struct process_event_t *)meta); } + // Handle library load events (dlopen detection). + if (meta->event_type == EVENT_TYPE_LIB_LOAD) { + library_load_event((struct library_load_event_t *)meta); + } + struct extra_event *e; void (*fn) (void *) = NULL; list_for_each_entry(e, &events_list, list) { diff --git a/agent/src/ebpf/user/tracer.c b/agent/src/ebpf/user/tracer.c index 798a20f5cce..d224f47a0cd 100644 --- a/agent/src/ebpf/user/tracer.c +++ b/agent/src/ebpf/user/tracer.c @@ -2095,6 +2095,13 @@ bool is_pid_match(int feature, int pid) return kv.value & (1UL << feature); } +int add_feature_pid(int feature, int pid) +{ + if (feature < 0 || feature >= FEATURE_MAX) + return -1; + return add_pid_to_match_hash(feature, pid); +} + static int clear_pid_from_match_hash(int feature, int pid) { int ret = 0; diff --git a/agent/src/ebpf/user/tracer.h b/agent/src/ebpf/user/tracer.h index fc6634d254d..73df8f07e7d 100644 --- a/agent/src/ebpf/user/tracer.h +++ b/agent/src/ebpf/user/tracer.h @@ -723,6 +723,7 @@ int exec_set_feature_pids(int feature, const int *pids, int num); int set_feature_pids(int feature, const int *pids, int num); int init_match_pids_hash(void); bool is_pid_match(int feature, int pid); +int add_feature_pid(int feature, int pid); struct probe *create_probe(struct bpf_tracer *tracer, const char *func_name, bool isret, enum probe_type type, void *private, diff --git a/agent/src/ebpf_dispatcher.rs b/agent/src/ebpf_dispatcher.rs index 8c471fd1c84..921fd8f7474 100644 --- a/agent/src/ebpf_dispatcher.rs +++ b/agent/src/ebpf_dispatcher.rs @@ -1153,6 +1153,20 @@ impl EbpfCollector { warn!("ebpf start_continuous_profiler error."); } + // Register the process event callback to receive real-time + // exec/exit 
notifications from the eBPF layer. This enables + // near-instant PID filter map updates instead of waiting for + // the 10-second /proc scan interval. + if ebpf::register_event_handle( + ebpf::EVENT_TYPE_PROC_EXEC | ebpf::EVENT_TYPE_PROC_EXIT, + crate::utils::process::process_event_callback, + ) != 0 + { + warn!("ebpf register_event_handle for process events failed"); + } else { + info!("ebpf register_event_handle for process events succeeded"); + } + if !is_uprobe_meltdown && !on_cpu.disabled { let feature = "ebpf.profile.on_cpu"; process_listener.register(feature, set_feature_on_cpu); diff --git a/agent/src/utils/process/linux.rs b/agent/src/utils/process/linux.rs index 1778ceb89b1..3e83a55aff2 100644 --- a/agent/src/utils/process/linux.rs +++ b/agent/src/utils/process/linux.rs @@ -24,7 +24,8 @@ use std::{ process, sync::{ atomic::{AtomicBool, Ordering::Relaxed}, - Arc, Mutex, RwLock, + mpsc::{self, Receiver, Sender, TryRecvError}, + Arc, Mutex, OnceLock, RwLock, }, thread::{self, JoinHandle}, time::Duration, @@ -32,11 +33,37 @@ use std::{ use log::{debug, error, info, trace}; use nix::sys::utsname::uname; -use procfs::process::all_processes_with_root; +use procfs::process::{all_processes_with_root, Process}; use crate::config::ProcessMatcher; +use crate::ebpf; use crate::platform::{get_os_app_tag_by_exec, ProcessData, ProcessDataOp}; +// Global sender for process exec/exit events from the eBPF C callback. +// The C callback (running on the sk-reader thread) sends (pid, event_type) +// tuples through this channel. The ProcessListener drains them in its loop. +static PROCESS_EVENT_SENDER: OnceLock<Mutex<Sender<(u32, u32)>>> = OnceLock::new(); + +/// Callback invoked from the C eBPF layer on process exec/exit events. +/// This runs on the sk-reader C thread, so it must be non-blocking. +/// The function pointer signature matches `void (*fn)(void *)` in the C API; +/// the Rust FFI declaration in mod.rs declares it as +/// `extern "C" fn(data: *mut PROCESS_EVENT)`. 
+pub extern "C" fn process_event_callback(data: *mut ebpf::PROCESS_EVENT) { + if data.is_null() { + return; + } + let (pid, event_type) = unsafe { ((*data).pid, (*data).event_type) }; + + if let Some(sender) = PROCESS_EVENT_SENDER.get() { + if let Ok(sender) = sender.lock() { + // Non-blocking send: if the channel is disconnected, we silently + // drop the event. The 10-second full scan will catch it. + let _ = sender.send((pid, event_type)); + } + } +} + //返回当前进程占用内存RSS单位(字节) pub fn get_memory_rss() -> Result<u64> { let pid = process::id(); @@ -304,6 +331,9 @@ pub struct ProcessListener { config: Arc<RwLock<Config>>, thread_handle: Mutex<Option<JoinHandle<()>>>, + // Receiver end of the channel for eBPF process exec/exit events. + // Created in new(), the sender is stored in the global PROCESS_EVENT_SENDER. + event_receiver: Arc<Mutex<Receiver<(u32, u32)>>>, } impl ProcessListener { @@ -316,11 +346,17 @@ impl ProcessListener { user: String, command: Vec<String>, ) -> Self { + // Create a channel for eBPF process events. + // The sender is stored in a global static so the C callback can reach it. + let (tx, rx) = mpsc::channel(); + let _ = PROCESS_EVENT_SENDER.set(Mutex::new(tx)); + let listener = Self { features: Default::default(), running: Arc::new(AtomicBool::new(false)), thread_handle: Mutex::new(None), config: Arc::new(RwLock::new(Config::new(proc_root, user, command))), + event_receiver: Arc::new(Mutex::new(rx)), }; listener.set(process_blacklist, process_matcher); @@ -516,14 +552,167 @@ impl ProcessListener { } } + /// Handle a single PID event (exec or exit) from the eBPF layer. + /// + /// For EXEC events: reads /proc/<pid> to build ProcessData, matches against + /// each feature's process_matcher list, and if a feature matches and the PID + /// is not already in its list, adds it and invokes the callback. + /// + /// For EXIT events: removes the PID from all feature lists and invokes + /// callbacks for any feature whose PID list changed. 
+ fn process_single_pid( + pid: u32, + event_type: u32, + process_data_cache: &mut HashMap<i32, ProcessData>, + features: &mut Features, + user: &str, + command: &[String], + ) { + let (blacklist, features) = (&mut features.blacklist, &mut features.features); + if features.is_empty() { + return; + } + + if event_type & ebpf::EVENT_TYPE_PROC_EXEC != 0 { + // EXEC event: read process info from /proc and match against features + let proc_pid = pid as i32; + let process = match Process::new(proc_pid) { + Ok(p) => p, + Err(e) => { + debug!("process_single_pid: failed to read /proc/{}: {}", pid, e); + return; + } + }; + + // Check blacklist + match process.status().map(|s| s.name) { + Ok(name) if blacklist.binary_search(&name).is_ok() => { + trace!("process_single_pid: process {name} (pid#{pid}) in blacklist, skipping"); + return; + } + Ok(_) => (), + Err(e) => { + debug!( + "process_single_pid: failed to get status for pid {}: {}", + pid, e + ); + return; + } + } + + // Build ProcessData for this PID + let pdata = match ProcessData::try_from(&process) { + Ok(d) => d, + Err(e) => { + debug!( + "process_single_pid: failed to build ProcessData for pid {}: {}", + pid, e + ); + return; + } + }; + process_data_cache.insert(proc_pid, pdata); + + // Get tags (may be empty if command is not configured) + let tags_map = match get_os_app_tag_by_exec(user, command) { + Ok(tags) => tags, + Err(_) => HashMap::new(), + }; + + // Match the new PID against each feature + for (key, node) in features.iter_mut() { + if node.process_matcher.is_empty() || node.callback.is_none() { + continue; + } + + let pdata = match process_data_cache.get(&proc_pid) { + Some(d) => d, + None => continue, + }; + + let mut matched = false; + let mut is_ignored = false; + let mut matched_process_data = None; + + for matcher in &node.process_matcher { + if let Some(process_data) = matcher.get_process_data(pdata, &tags_map) { + if matcher.ignore { + is_ignored = true; + break; + } + matched = true; + matched_process_data = 
Some(process_data); + break; + } + } + + if is_ignored || !matched { + continue; + } + + let pid_u32 = pid; + // Check if PID is already in the list (binary search since list is sorted) + if node.pids.binary_search(&pid_u32).is_ok() { + continue; + } + + // Add PID and re-sort + node.pids.push(pid_u32); + node.pids.sort(); + node.pids.dedup(); + + if let Some(pd) = matched_process_data { + node.process_datas.push(pd); + node.process_datas.sort_by_key(|x| x.pid); + node.process_datas.merge_and_dedup(); + } + + debug!( + "process_single_pid: Feature {} added pid {}, total {} pids.", + key, + pid, + node.pids.len() + ); + node.callback.as_ref().unwrap()(&node.pids, &node.process_datas); + } + } else if event_type & ebpf::EVENT_TYPE_PROC_EXIT != 0 { + // EXIT event: remove PID from all feature lists + let pid_u32 = pid; + let proc_pid = pid as i32; + + // Remove from process_data_cache + process_data_cache.remove(&proc_pid); + + for (key, node) in features.iter_mut() { + if node.callback.is_none() { + continue; + } + + if let Ok(idx) = node.pids.binary_search(&pid_u32) { + node.pids.remove(idx); + node.process_datas.retain(|pd| pd.pid != pid as u64); + + debug!( + "process_single_pid: Feature {} removed pid {}, remaining {} pids.", + key, + pid, + node.pids.len() + ); + node.callback.as_ref().unwrap()(&node.pids, &node.process_datas); + } + } + } + } + pub fn start(&self) { if self.running.swap(true, Relaxed) { return; } - info!("Startting process listener ..."); + info!("Starting process listener ..."); let features = self.features.clone(); let running = self.running.clone(); let config = self.config.clone(); + let event_receiver = self.event_receiver.clone(); running.store(true, Relaxed); *self.thread_handle.lock().unwrap() = Some( @@ -534,6 +723,47 @@ impl ProcessListener { let mut process_data = HashMap::new(); while running.load(Relaxed) { thread::sleep(Duration::from_secs(1)); + + // Drain all pending eBPF process events (non-blocking). 
+ // Each event triggers an incremental PID update. + if let Ok(receiver) = event_receiver.lock() { + let mut event_count = 0u32; + loop { + match receiver.try_recv() { + Ok((pid, event_type)) => { + let current_config = config.read().unwrap(); + let mut features = features.write().unwrap(); + Self::process_single_pid( + pid, + event_type, + &mut process_data, + &mut features, + ¤t_config.user, + ¤t_config.command, + ); + drop(features); + drop(current_config); + event_count += 1; + } + Err(TryRecvError::Empty) => break, + Err(TryRecvError::Disconnected) => { + debug!( + "process event channel disconnected, \ + falling back to polling only" + ); + break; + } + } + } + if event_count > 0 { + debug!( + "process listener drained {} eBPF events in this cycle", + event_count + ); + } + } + + // Full /proc scan every INTERVAL seconds as fallback count += 1; if count < Self::INTERVAL { continue;