用 Rust 实现进程监控工具:信号处理与系统调用实战
一、进程监控的底层需求:为什么 ps 不够用
Linux 的ps和top命令可以查看进程状态,但它们是"快照式"的工具——只能看到当前时刻的状态,无法持续追踪进程的生命周期变化。在实际运维中,经常需要这样的能力:当某个进程异常退出时立即收到通知、当进程的内存使用超过阈值时自动重启、当进程产生核心转储时收集现场信息。
现有的进程监控工具(如 supervisord、systemd)功能强大,但配置复杂、依赖较重。用 Rust 从零实现一个轻量级进程监控工具,既能深入理解 Linux 信号处理和系统调用机制,又能得到一个可定制的运维工具。
Rust 在系统编程领域有三个关键优势:零成本抽象保证性能、所有权系统保证内存安全、强大的错误处理保证可靠性。对于进程监控这种需要直接与操作系统内核交互的场景,Rust 的这些特性尤为重要。
二、进程监控的底层机制
2.1 Linux 信号机制
信号(Signal)是 Linux 内核向进程发送异步通知的机制。进程监控工具需要处理以下信号:
| 信号 | 编号 | 含义 | 监控用途 |
|---|---|---|---|
| SIGCHLD | 17 | 子进程状态变化 | 检测被监控进程退出 |
| SIGTERM | 15 | 优雅终止 | 通知监控工具自身退出 |
| SIGINT | 2 | 中断(Ctrl+C) | 通知监控工具自身退出 |
| SIGHUP | 1 | 挂起 | 重新加载配置 |
| SIGUSR1 | 10 | 用户自定义 | 触发状态报告 |
flowchart TD A[被监控进程] -->|SIGCHLD| B[监控进程] B -->|waitpid| C[获取退出状态] C --> D{退出原因} D -->|正常退出| E[记录日志,按策略重启] D -->|信号终止| F[记录信号,按策略重启] D -->|核心转储| G[收集现场,告警通知] H[用户终端] -->|SIGTERM/SIGINT| B B -->|优雅关闭| I[停止监控,清理资源] subgraph 监控循环 J[启动被监控进程] K[waitpid 阻塞等待] L[处理退出事件] M[按策略重启或退出] J --> K --> L --> M M -->|重启| J end2.2 进程状态追踪
Linux 进程有以下状态:运行(R)、睡眠(S)、磁盘睡眠(D)、停止(T)、僵尸(Z)。监控工具通过/proc/[pid]/stat文件读取进程状态,通过/proc/[pid]/status读取内存使用等详细信息。
/proc/[pid]/stat的关键字段:
- 字段 3:进程状态(R/S/D/T/Z)
- 字段 14:用户态时间(utime)
- 字段 15:内核态时间(stime)
- 字段 24:RSS(驻留内存,页数)
2.3 waitpid 与子进程管理
当被监控进程是监控进程的子进程时,监控进程通过waitpid()系统调用获取子进程的退出状态。waitpid()有三种模式:
- 阻塞模式:
waitpid(pid, &status, 0)阻塞直到指定子进程退出。 - 非阻塞模式:
waitpid(pid, &status, WNOHANG)立即返回,如果没有子进程退出则返回 0。 - 等待任何子进程:
waitpid(-1, &status, WNOHANG)检查是否有任何子进程退出。
三、Rust 生产级代码实现
3.1 进程管理器
use std::process::{Command, Child}; use std::time::{Duration, Instant}; use serde::{Deserialize, Serialize}; /// 进程重启策略 #[derive(Debug, Clone, Serialize, Deserialize)] pub enum RestartPolicy { /// 不重启 Never, /// 总是重启 Always { delay_secs: u64 }, /// 失败时重启(退出码非 0) OnFailure { delay_secs: u64, max_retries: u32 }, } /// 进程配置 #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ProcessConfig { pub name: String, pub command: String, pub args: Vec<String>, pub env: Vec<(String, String)>, pub restart_policy: RestartPolicy, pub memory_limit_mb: Option<u64>, } /// 进程运行时状态 pub struct ProcessState { child: Option<Child>, pid: Option<u32>, config: ProcessConfig, restart_count: u32, last_start_time: Option<Instant>, last_exit_code: Option<i32>, } impl ProcessState { pub fn new(config: ProcessConfig) -> Self { Self { child: None, pid: None, config, restart_count: 0, last_start_time: None, last_exit_code: None, } } /// 启动进程 pub fn start(&mut self) -> Result<(), Box<dyn std::error::Error>> { let mut cmd = Command::new(&self.config.command); cmd.args(&self.config.args); for (key, value) in &self.config.env { cmd.env(key, value); } let child = cmd.spawn()?; self.pid = Some(child.id()); self.child = Some(child); self.last_start_time = Some(Instant::now()); self.restart_count += 1; println!( "[{}] 进程已启动, PID: {}, 第 {} 次启动", self.config.name, self.pid.unwrap(), self.restart_count, ); Ok(()) } /// 检查进程是否仍在运行 pub fn check_alive(&mut self) -> bool { if let Some(child) = self.child.as_mut() { match child.try_wait() { Ok(Some(status)) => { self.last_exit_code = status.code(); self.pid = None; println!( "[{}] 进程已退出, 状态: {}", self.config.name, status, ); false } Ok(None) => true, // 仍在运行 Err(e) => { eprintln!("[{}] 检查进程状态失败: {}", self.config.name, e); false } } } else { false } } /// 根据重启策略决定是否重启 pub fn should_restart(&self) -> bool { match &self.config.restart_policy { RestartPolicy::Never => false, RestartPolicy::Always { .. } => true, RestartPolicy::OnFailure { max_retries, .. } => { if self.restart_count >= *max_retries { println!( "[{}] 已达最大重启次数 {},不再重启", self.config.name, max_retries, ); return false; } // 退出码非 0 视为失败 self.last_exit_code.map_or(true, |code| code != 0) } } } /// 获取重启延迟 pub fn restart_delay(&self) -> Duration { match &self.config.restart_policy { RestartPolicy::Never => Duration::ZERO, RestartPolicy::Always { delay_secs } => Duration::from_secs(*delay_secs), RestartPolicy::OnFailure { delay_secs, .. } => Duration::from_secs(*delay_secs), } } /// 检查内存使用是否超限 pub fn check_memory(&self) -> Result<bool, Box<dyn std::error::Error>> { let limit = match self.config.memory_limit_mb { Some(l) => l, None => return Ok(true), }; let pid = match self.pid { Some(p) => p, None => return Ok(true), }; let stat_path = format!("/proc/{}/status", pid); let content = std::fs::read_to_string(&stat_path)?; let rss_mb = self.parse_rss_from_status(&content)?; if rss_mb > limit { println!( "[{}] 内存超限: {} MB > {} MB", self.config.name, rss_mb, limit, ); return Ok(false); } Ok(true) } fn parse_rss_from_status(&self, content: &str) -> Result<u64, Box<dyn std::error::Error>> { for line in content.lines() { if line.starts_with("VmRSS:") { let parts: Vec<&str> = line.split_whitespace().collect(); if parts.len() >= 2 { let kb: u64 = parts[1].parse()?; return Ok(kb / 1024); // KB → MB } } } Ok(0) } }3.2 监控主循环
use tokio::signal::unix::{signal, SignalKind}; use tokio::time::sleep; /// 监控器 pub struct ProcessMonitor { processes: Vec<ProcessState>, running: bool, } impl ProcessMonitor { pub fn new(configs: Vec<ProcessConfig>) -> Self { let processes = configs.into_iter() .map(ProcessState::new) .collect(); Self { processes, running: false, } } /// 启动监控主循环 pub async fn run(&mut self) -> Result<(), Box<dyn std::error::Error>> { self.running = true; // 启动所有被监控进程 for proc in &mut self.processes { proc.start()?; } // 注册信号处理 let mut sigterm = signal(SignalKind::terminate())?; let mut sigint = signal(SignalKind::interrupt())?; println!("监控器已启动,正在监控 {} 个进程", self.processes.len()); while self.running { // 检查信号 tokio::select! { _ = sigterm.recv() => { println!("收到 SIGTERM,正在优雅关闭..."); self.running = false; break; } _ = sigint.recv() => { println!("收到 SIGINT,正在优雅关闭..."); self.running = false; break; } _ = sleep(Duration::from_secs(1)) => { // 定期检查进程状态 self.check_processes().await?; } } } // 优雅关闭:向所有子进程发送 SIGTERM self.shutdown().await?; Ok(()) } async fn check_processes(&mut self) -> Result<(), Box<dyn std::error::Error>> { for proc in &mut self.processes { if !proc.check_alive() { if proc.should_restart() { let delay = proc.restart_delay(); if !delay.is_zero() { println!( "[{}] {} 秒后重启...", proc.config.name, delay.as_secs(), ); sleep(delay).await; } proc.start()?; } } else { // 检查内存是否超限 if !proc.check_memory()? { // 内存超限,终止并重启 if let Some(child) = proc.child.as_mut() { let _ = child.kill(); } } } } Ok(()) } async fn shutdown(&mut self) -> Result<(), Box<dyn std::error::Error>> { for proc in &mut self.processes { if let Some(child) = proc.child.as_mut() { // 发送 SIGTERM let _ = child.kill(); // 等待进程退出(最多 5 秒) match tokio::time::timeout( Duration::from_secs(5), tokio::task::spawn_blocking(move || child.wait()), ).await { Ok(Ok(Ok(status))) => { println!("[{}] 已退出: {}", proc.config.name, status); } _ => { // 超时,强制 SIGKILL if let Some(child) = proc.child.as_mut() { let _ = child.kill(); } println!("[{}] 强制终止", proc.config.name); } } } } Ok(()) } }3.3 配置文件与启动入口
use clap::Parser; /// 进程监控工具 #[derive(Parser)] #[command(name = "procwatch", about = "轻量级进程监控工具")] struct Cli { /// 配置文件路径 #[arg(short, long, default_value = "procwatch.toml")] config: String, } #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { let cli = Cli::parse(); // 读取配置文件 let config_content = std::fs::read_to_string(&cli.config)?; let configs: Vec<ProcessConfig> = toml::from_str(&config_content)?; // 启动监控器 let mut monitor = ProcessMonitor::new(configs); monitor.run().await?; Ok(()) }配置文件示例(procwatch.toml):
[[process]] name = "web-server" command = "/usr/bin/python3" args = ["-m", "http.server", "8080"] restart_policy = { OnFailure = { delay_secs = 5, max_retries = 3 } } memory_limit_mb = 512 [[process]] name = "worker" command = "/usr/local/bin/worker" args = ["--queue", "default"] env = [["RUST_LOG", "info"]] restart_policy = { Always = { delay_secs = 3 } }四、Trade-offs:自建监控工具的代价
4.1 功能覆盖度
自建监控工具的功能远不如 systemd 或 supervisord 完善——没有依赖管理、没有进程组控制、没有日志轮转、没有 Web UI。如果业务需要这些功能,应该直接使用成熟的工具。自建工具的价值在于"轻量"和"可定制"——二进制只有几 MB,没有运行时依赖,可以根据具体需求定制监控逻辑。
4.2 信号处理的复杂性
Rust 的信号处理比 C 更复杂——Rust 的信号处理器必须是异步安全的(async-signal-safe),不能在信号处理器中调用大多数标准库函数。tokio::signal通过将信号处理委托给独立线程解决了这个问题,但增加了运行时依赖。对于不需要异步运行时的场景,可以使用nixcrate 的信号处理接口。
4.3 适用边界
自建进程监控工具适用于以下场景:嵌入式或容器环境(资源受限)、需要定制监控逻辑(如内存超限自动重启)、学习 Linux 系统编程。不适用于:生产环境的核心服务管理(用 systemd)、需要完整功能(日志、依赖、进程组)的场景、团队没有系统编程经验。
五、总结
用 Rust 实现进程监控工具,是学习 Linux 信号处理和系统调用的实战项目。核心落地步骤如下:
- 理解信号机制:SIGCHLD 检测子进程退出,SIGTERM/SIGINT 处理自身关闭。
- 使用 waitpid:非阻塞模式检查子进程状态,获取退出码和退出原因。
- 实现重启策略:Never/Always/OnFailure 三种策略,支持延迟重启和最大重试次数。
- 读取 /proc 文件:通过
/proc/[pid]/status获取进程的内存使用等信息。 - 优雅关闭:先 SIGTERM,等待 5 秒,超时则 SIGKILL 强制终止。
进程监控是系统编程的入门项目,但"入门"不等于"简单"——信号处理、僵尸进程、竞态条件,每一个都是需要认真对待的工程问题。