Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 168 additions & 3 deletions agent/src/config/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::{
collections::{HashMap, HashSet},
fmt,
hash::{Hash, Hasher},
iter,
io, iter,
net::{IpAddr, Ipv4Addr, Ipv6Addr},
path::PathBuf,
str,
Expand All @@ -44,6 +44,8 @@ use nix::{
sched::{sched_setaffinity, CpuSet},
unistd::Pid,
};
#[cfg(any(target_os = "linux", target_os = "android"))]
use procfs::{process::Process, ProcError};
use sysinfo::SystemExt;
#[cfg(any(target_os = "linux", target_os = "android"))]
use sysinfo::{CpuRefreshKind, RefreshKind, System};
Expand Down Expand Up @@ -2674,6 +2676,89 @@ impl ConfigHandler {
}
}

/// Converts a `procfs` error into an `io::Error`.
///
/// The `io::ErrorKind` is chosen from the `ProcError` variant (an `Io` variant
/// keeps the kind of the I/O error it wraps), and the original `ProcError` is
/// passed to `io::Error::new` as the error payload.
#[cfg(any(target_os = "linux", target_os = "android"))]
fn procfs_error_to_io(error: ProcError) -> io::Error {
    // Borrow for the match so `error` can still be moved into the result below.
    let error_kind = match &error {
        ProcError::Io(source, _) => source.kind(),
        ProcError::NotFound(_) => io::ErrorKind::NotFound,
        ProcError::PermissionDenied(_) => io::ErrorKind::PermissionDenied,
        ProcError::Incomplete(_) => io::ErrorKind::UnexpectedEof,
        ProcError::InternalError(_) | ProcError::Other(_) => io::ErrorKind::Other,
    };

    io::Error::new(error_kind, error)
}

/// Returns `true` when a thread with the given name must be left out of CPU
/// affinity binding.
///
/// Only names beginning with `kick-kern.` are skipped: those threads are
/// self-managed eBPF per-CPU kickers.
#[cfg(any(target_os = "linux", target_os = "android"))]
fn should_skip_cpu_affinity(thread_name: &str) -> bool {
    const SELF_MANAGED_PREFIX: &str = "kick-kern.";

    thread_name.strip_prefix(SELF_MANAGED_PREFIX).is_some()
}

#[cfg(any(target_os = "linux", target_os = "android"))]
fn set_cpu_affinity_for_all_threads(cpu_set: &CpuSet) -> io::Result<()> {
const MAX_AFFINITY_SYNC_PASSES: usize = 3;

let mut processed_thread_ids = HashSet::new();
let mut first_error: Option<io::Error> = None;

for _ in 0..MAX_AFFINITY_SYNC_PASSES {
let mut saw_new_thread = false;
let process = Process::myself().map_err(Self::procfs_error_to_io)?;
let tasks = process.tasks().map_err(Self::procfs_error_to_io)?;

for task in tasks {
Comment thread
kylewanginchina marked this conversation as resolved.
let task = match task {
Ok(task) => task,
Err(ProcError::NotFound(_)) => continue,
Err(e) => {
if first_error.is_none() {
first_error = Some(Self::procfs_error_to_io(e));
}
continue;
}
};
let thread_id = task.tid;
saw_new_thread |= processed_thread_ids.insert(thread_id);

let thread_name = match task.status() {
Ok(status) => status.name,
Err(ProcError::NotFound(_)) => continue,
Err(e) => {
if first_error.is_none() {
first_error = Some(Self::procfs_error_to_io(e));
}
continue;
}
};

if Self::should_skip_cpu_affinity(&thread_name) {
continue;
}

if let Err(e) = sched_setaffinity(Pid::from_raw(thread_id), cpu_set) {
if e == nix::errno::Errno::ESRCH {
continue;
}
if first_error.is_none() {
first_error = Some(io::Error::new(
io::ErrorKind::Other,
format!(
"CPU Affinity({:?}) bind error for thread {} ({}): {:?}",
cpu_set, thread_id, thread_name, e
),
));
}
}
}

if !saw_new_thread {
break;
}
}

first_error.map_or(Ok(()), Err)
}

#[cfg(any(target_os = "linux", target_os = "android"))]
fn set_swap_disabled() {
unsafe {
Expand Down Expand Up @@ -2717,8 +2802,7 @@ impl ConfigHandler {
if invalid_config {
warn!("Invalid CPU Affinity config {:?}.", cpu_affinity);
} else {
let pid = std::process::id() as i32;
if let Err(e) = sched_setaffinity(Pid::from_raw(pid), &cpu_set) {
if let Err(e) = Self::set_cpu_affinity_for_all_threads(cpu_set) {
warn!("CPU Affinity({:?}) bind error: {:?}.", &cpu_set, e);
}
}
Expand Down Expand Up @@ -5893,6 +5977,16 @@ impl ModuleConfig {
#[cfg(test)]
mod tests {
use super::*;
#[cfg(any(target_os = "linux", target_os = "android"))]
use nix::{
sched::{sched_getaffinity, sched_setaffinity},
unistd::Pid,
};
#[cfg(any(target_os = "linux", target_os = "android"))]
use std::{
sync::{mpsc, Arc, Barrier},
thread,
};

#[test]
fn test_new_trie() {
Expand Down Expand Up @@ -6010,4 +6104,75 @@ mod tests {
assert!(!trie.is_unconcerned("xxx.yyy.zzz.aaa"));
assert!(!trie.is_unconcerned("yyy.zzz"));
}

/// Lists, in ascending order, every CPU id below `libc::CPU_SETSIZE` that is
/// set in `cpu_set`. Ids for which the lookup fails are treated as not set.
#[cfg(any(target_os = "linux", target_os = "android"))]
fn available_cpu_ids(cpu_set: &CpuSet) -> Vec<usize> {
    let mut ids = Vec::new();
    for cpu_id in 0..libc::CPU_SETSIZE as usize {
        if matches!(cpu_set.is_set(cpu_id), Ok(true)) {
            ids.push(cpu_id);
        }
    }
    ids
}

/// Builds a `CpuSet` in which `cpu_id` is the only CPU set.
///
/// Panics if `cpu_id` cannot be stored in a `CpuSet`.
#[cfg(any(target_os = "linux", target_os = "android"))]
fn single_cpu_set(cpu_id: usize) -> CpuSet {
    let mut only_this_cpu = CpuSet::new();
    only_this_cpu.set(cpu_id).unwrap();
    only_this_cpu
}

#[cfg(any(target_os = "linux", target_os = "android"))]
struct AffinityRestore(CpuSet);

#[cfg(any(target_os = "linux", target_os = "android"))]
impl Drop for AffinityRestore {
fn drop(&mut self) {
let _ = sched_setaffinity(Pid::from_raw(0), &self.0);
}
}

/// Checks that `ConfigHandler::set_cpu_affinity` re-pins a worker thread that
/// already exists (and was pinned to another CPU) as well as the calling
/// thread. Returns without asserting when fewer than two CPUs are available.
#[cfg(any(target_os = "linux", target_os = "android"))]
#[test]
fn test_set_cpu_affinity_updates_existing_worker_threads() {
    // Dropped at the end of the test, also on panic.
    let affinity_guard = AffinityRestore(sched_getaffinity(Pid::from_raw(0)).unwrap());
    let cpu_ids = available_cpu_ids(&affinity_guard.0);
    let (target_cpu_id, worker_initial_cpu_id) = match cpu_ids.as_slice() {
        [first, second, ..] => (*first, *second),
        _ => return,
    };

    let worker_initial_cpu_set = single_cpu_set(worker_initial_cpu_id);
    // `worker_ready`: the worker has pinned itself to its initial CPU.
    // `worker_check`: the affinity update is done, the worker may report.
    let worker_ready = Arc::new(Barrier::new(2));
    let worker_check = Arc::new(Barrier::new(2));
    let (sender, receiver) = mpsc::channel();

    let worker = {
        let ready = Arc::clone(&worker_ready);
        let check = Arc::clone(&worker_check);
        thread::spawn(move || {
            sched_setaffinity(Pid::from_raw(0), &worker_initial_cpu_set).unwrap();
            ready.wait();
            check.wait();
            let observed = sched_getaffinity(Pid::from_raw(0)).unwrap();
            sender.send(observed).unwrap();
        })
    };

    worker_ready.wait();

    let mut cpu_set = CpuSet::new();
    ConfigHandler::set_cpu_affinity(&vec![target_cpu_id], &mut cpu_set);

    worker_check.wait();

    let worker_cpu_set = receiver.recv().unwrap();
    let main_thread_cpu_set = sched_getaffinity(Pid::from_raw(0)).unwrap();
    worker.join().unwrap();

    // Both threads must now be on the target CPU and off the worker's old one.
    for observed in [&main_thread_cpu_set, &worker_cpu_set] {
        assert!(observed.is_set(target_cpu_id).unwrap());
        assert!(!observed.is_set(worker_initial_cpu_id).unwrap());
    }
}
}
5 changes: 2 additions & 3 deletions server/agent_config/README-CH.md
Original file line number Diff line number Diff line change
Expand Up @@ -700,8 +700,8 @@ global:

**详细描述**:

操作系统尽可能使用指定 ID 的 CPU 核运行 deepflow-agent 进程。无效的 ID 将被忽略。当前仅对
dispatcher 线程生效。举例:
操作系统尽可能使用指定 ID 的 CPU 核运行 deepflow-agent 进程。无效的 ID 将被忽略。该配置
会作用于已存在的 deepflow-agent 线程,但 self-managed 的 kick-kern.* eBPF 线程除外。举例:
```yaml
global:
tunning:
Expand Down Expand Up @@ -11471,4 +11471,3 @@ dev:
**详细描述**:

未发布的采集器特性可以通过该选项开启。

6 changes: 3 additions & 3 deletions server/agent_config/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -709,8 +709,9 @@ global:
**Description**:

CPU affinity is the tendency of a process to run on a given CPU for as long as possible
without being migrated to other processors. Invalid ID will be ignored. Currently only
works for dispatcher threads. Example:
without being migrated to other processors. Invalid IDs will be ignored. The setting
applies to existing deepflow-agent threads as well, except self-managed kick-kern.*
eBPF threads. Example:
```yaml
global:
tunning:
Expand Down Expand Up @@ -11740,4 +11741,3 @@ dev:
**Description**:

Unreleased deepflow-agent features can be turned on by setting this switch.

4 changes: 2 additions & 2 deletions server/agent_config/example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1866,7 +1866,8 @@ static_config:
##################
# Note:
# CPU affinity is the tendency of a process to run on a given CPU for as long as possible without being migrated
# to other processors
# to other processors. Invalid IDs will be ignored. The setting applies to existing
# deepflow-agent threads as well, except self-managed kick-kern.* eBPF threads.
# Example:
# cpu-affinity: 1,3,5,7,9,11,13,15
cpu-affinity:
Expand Down Expand Up @@ -1919,4 +1920,3 @@ static_config:
#################
# Note: Unreleased deepflow-agent features can be turned on by setting this switch.
feature-flags:

9 changes: 5 additions & 4 deletions server/agent_config/template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -489,16 +489,17 @@ global:
# description:
# en: |-
# CPU affinity is the tendency of a process to run on a given CPU for as long as possible
# without being migrated to other processors. Invalid ID will be ignored. Currently only
# works for dispatcher threads. Example:
# without being migrated to other processors. Invalid IDs will be ignored. The setting
# applies to existing deepflow-agent threads as well, except self-managed kick-kern.*
# eBPF threads. Example:
# ```yaml
# global:
# tunning:
# cpu_affinity: [1, 3, 5, 7, 9]
# ```
# ch: |-
# 操作系统尽可能使用指定 ID 的 CPU 核运行 deepflow-agent 进程。无效的 ID 将被忽略。当前仅对
# dispatcher 线程生效。举例:
# 操作系统尽可能使用指定 ID 的 CPU 核运行 deepflow-agent 进程。无效的 ID 将被忽略。该配置
# 会作用于已存在的 deepflow-agent 线程,但 self-managed 的 kick-kern.* eBPF 线程除外。举例:
# ```yaml
# global:
# tunning:
Expand Down
Loading