Kalman Filter Technical Blog Series, Part 5: Advanced Multi-Target Tracking - Random Finite Set Theory and Implementation

张开发
2026/4/10 12:28:37 · 15 min read


## Abstract

In multi-target tracking, traditional data-association methods suffer from high computational complexity and have difficulty handling a changing number of targets. Random Finite Set (RFS) theory provides a unified mathematical framework for multi-target tracking that naturally accommodates a time-varying target count and uncertainty about the origin of each measurement. This post explains the fundamentals of RFS theory, focusing on the Probability Hypothesis Density (PHD) filter, the Cardinalized PHD (CPHD) filter, and the Labeled Multi-Bernoulli (LMB) filter, and demonstrates these advanced algorithms on multi-target tracking problems through complete Python implementations.

## Contents

1. Limitations and challenges of traditional multi-target tracking
2. Foundations of random finite set theory
3. Multi-target motion and observation models
4. The Probability Hypothesis Density (PHD) filter
5. The Cardinalized PHD (CPHD) filter
6. The Labeled Multi-Bernoulli (LMB) filter
7. Gaussian mixture implementation techniques
8. Demo 5-1: Gaussian mixture PHD filter implementation
9. Demo 5-2: CPHD filter implementation and comparison
10. Demo 5-3: Labeled multi-Bernoulli filter implementation
11. Demo 5-4: Performance comparison on a combined scenario
12. Engineering recommendations and optimization strategies

## 1. Limitations and Challenges of Traditional Multi-Target Tracking

### 1.1 Combinatorial explosion in data association

In traditional multi-target tracking, data association is the core challenge. With M targets and N measurements, and allowing for missed detections and clutter, the number of possible association hypotheses is

$$N_{\text{hyp}} = \sum_{k=0}^{\min(M,N)} \binom{M}{k} \binom{N}{k}\, k!$$

As the number of targets increases, the hypothesis count grows combinatorially; even approximate algorithms such as JPDA remain computationally expensive.

### 1.2 Handling a changing number of targets

Traditional tracking methods usually assume the target count is fixed or changes slowly, but in real scenarios:

- new targets can appear at any time;
- existing targets can disappear at any time;
- the number of targets varies dynamically.

### 1.3 Uncertainty in measurement origin

Each measurement may be:

- a detection of a true target;
- a false alarm (clutter);

and a target may also be missed (a missed detection, where the target produces no measurement at all). Traditional methods must model these uncertainties explicitly, which increases algorithmic complexity.

## 2. Foundations of Random Finite Set Theory

### 2.1 Definition of a random finite set

A random finite set (RFS) is a direct generalization of the multi-target state. At time k, the multi-target state and observation are

$$X_k = \{x_{k,1}, \dots, x_{k,N(k)}\}, \qquad Z_k = \{z_{k,1}, \dots, z_{k,M(k)}\},$$

where N(k) and M(k) are the (random) numbers of targets and measurements, respectively.

### 2.2 Finite Set Statistics (FISST)

Finite set statistics is the mathematical toolbox for working with RFSs. Its core concept is the set integral:

$$\int f(X)\,\delta X = \sum_{n=0}^{\infty} \frac{1}{n!} \int f(\{x_1,\dots,x_n\})\, dx_1 \cdots dx_n.$$

### 2.3 The multi-target Bayes filter

The multi-target Bayes filter is the direct generalization of the single-target Bayes filter. Prediction:

$$\pi_{k|k-1}(X_k \mid Z_{1:k-1}) = \int f_{k|k-1}(X_k \mid X)\, \pi_{k-1}(X \mid Z_{1:k-1})\, \delta X$$

Update:

$$\pi_k(X_k \mid Z_{1:k}) = \frac{g_k(Z_k \mid X_k)\, \pi_{k|k-1}(X_k \mid Z_{1:k-1})}{\int g_k(Z_k \mid X)\, \pi_{k|k-1}(X \mid Z_{1:k-1})\, \delta X}$$

## 3. Multi-Target Motion and Observation Models

### 3.1 Multi-target motion model

Multi-target motion comprises three processes: survival and motion of existing targets, targets spawned from existing ones, and spontaneous births of new targets.

### 3.2 Multi-target observation model

Multi-target observations involve three sources, mirroring Section 1.3: detections of true targets (governed by the detection probability), clutter (false alarms), and missed detections, where a target generates no measurement.

## 4. The Probability Hypothesis Density (PHD) Filter

### 4.1 Principle

The PHD filter propagates the probability hypothesis density, the first-order moment (intensity) of the multi-target posterior, instead of the full posterior, and thereby avoids the combinatorial explosion. The defining property of the PHD v(x) is that its integral over a region S equals the expected number of targets inside S:

$$\int_S v(x)\, dx = \mathbb{E}\big[\,|X_k \cap S|\,\big].$$

### 4.2 The Gaussian mixture PHD (GM-PHD) filter

When the target motion and observation models are linear-Gaussian, the PHD admits a Gaussian mixture representation,

$$v(x) = \sum_{i=1}^{J} w^{(i)}\, \mathcal{N}\big(x;\, m^{(i)}, P^{(i)}\big),$$

and the prediction and update recursions have closed form.

## 5. The Cardinalized PHD (CPHD) Filter

### 5.1 What CPHD improves

The PHD filter's limitation is that it propagates only the first-order moment (the intensity function) and not the cardinality distribution (the distribution of the target count). The CPHD filter propagates both the intensity function and the cardinality distribution, improving the accuracy of target-count estimation. The cardinality distribution is

$$\rho_k(n) = \Pr\big(|X_k| = n\big).$$

### 5.2 CPHD recursion

The CPHD prediction and update equations are considerably more involved but deliver better performance; the full derivation is omitted here.

## 6. The Labeled Multi-Bernoulli (LMB) Filter

### 6.1 Bernoulli random finite sets

A Bernoulli RFS describes the existence uncertainty of a single target: with probability 1 - r the set is empty, and with probability r it contains exactly one target whose state is distributed according to a density p(x).

### 6.2 Multi-Bernoulli random finite sets

A multi-Bernoulli RFS is the union of several independent Bernoulli RFSs, parameterized by the pairs {(r⁽ⁱ⁾, p⁽ⁱ⁾)}.

### 6.3 The labeled multi-Bernoulli (LMB) filter

The LMB filter attaches a distinct label to each Bernoulli component, so target identities are preserved over time and tracks can be read off directly.
## 7. Gaussian Mixture Implementation Techniques

### 7.1 Basic Gaussian mixture operations

```python
import numpy as np
from scipy.stats import multivariate_normal
from typing import Any, List, Tuple


class GaussianComponent:
    """A single Gaussian component."""

    def __init__(self, weight: float, mean: np.ndarray,
                 covariance: np.ndarray, label: Any = None):
        """Initialize a Gaussian component.

        Args:
            weight: component weight
            mean: mean vector
            covariance: covariance matrix
            label: component label (used for track continuity)
        """
        self.weight = weight
        self.mean = mean.reshape(-1, 1) if mean.ndim == 1 else mean
        self.covariance = covariance
        self.label = label

    def pdf(self, x: np.ndarray) -> float:
        """Evaluate the probability density at x."""
        return multivariate_normal(self.mean.flatten(), self.covariance).pdf(x)

    def predict(self, F: np.ndarray, Q: np.ndarray) -> "GaussianComponent":
        """Prediction step.

        Args:
            F: state transition matrix
            Q: process noise covariance
        """
        predicted_mean = F @ self.mean
        predicted_cov = F @ self.covariance @ F.T + Q
        return GaussianComponent(self.weight, predicted_mean,
                                 predicted_cov, self.label)

    def update(self, z: np.ndarray, H: np.ndarray,
               R: np.ndarray) -> Tuple["GaussianComponent", float]:
        """Update step.

        Args:
            z: measurement
            H: observation matrix
            R: measurement noise covariance
        Returns:
            updated_component: the updated Gaussian component
            likelihood: the measurement likelihood
        """
        # predicted measurement and innovation covariance
        z_pred = H @ self.mean
        S = H @ self.covariance @ H.T + R
        # Kalman gain
        K = self.covariance @ H.T @ np.linalg.inv(S)
        # state update
        updated_mean = self.mean + K @ (z.reshape(-1, 1) - z_pred)
        updated_cov = (np.eye(self.covariance.shape[0]) - K @ H) @ self.covariance
        # measurement likelihood
        likelihood = multivariate_normal(z_pred.flatten(), S).pdf(z)
        updated_component = GaussianComponent(self.weight, updated_mean,
                                              updated_cov, self.label)
        return updated_component, likelihood


class GaussianMixture:
    """A Gaussian mixture container."""

    def __init__(self, components: List[GaussianComponent] = None):
        self.components = components if components is not None else []

    def add_component(self, component: GaussianComponent):
        self.components.append(component)

    def get_weights(self) -> np.ndarray:
        return np.array([c.weight for c in self.components])

    def get_means(self) -> np.ndarray:
        return np.array([c.mean.flatten() for c in self.components])

    def get_covariances(self) -> np.ndarray:
        return np.array([c.covariance for c in self.components])

    def prune(self, threshold: float = 1e-3) -> "GaussianMixture":
        """Pruning: drop components whose weight falls below the threshold."""
        pruned_components = [c for c in self.components if c.weight > threshold]
        return GaussianMixture(pruned_components)

    def merge(self, distance_threshold: float = 4.0) -> "GaussianMixture":
        """Merging: fuse components within a Mahalanobis-distance threshold."""
        if not self.components:
            return GaussianMixture()
        # sort by descending weight
        sorted_components = sorted(self.components,
                                   key=lambda c: c.weight, reverse=True)
        merged_components = []
        while sorted_components:
            # take the highest-weight component as the merge center
            main_component = sorted_components.pop(0)
            to_merge = [main_component]
            remaining = []
            for component in sorted_components:
                diff = main_component.mean - component.mean
                cov_sum = main_component.covariance + component.covariance
                try:
                    mahalanobis_dist = float(diff.T @ np.linalg.inv(cov_sum) @ diff)
                    if mahalanobis_dist < distance_threshold:
                        to_merge.append(component)
                    else:
                        remaining.append(component)
                except np.linalg.LinAlgError:
                    remaining.append(component)
            if len(to_merge) > 1:
                total_weight = sum(c.weight for c in to_merge)
                merged_mean = np.zeros_like(main_component.mean)
                merged_cov = np.zeros_like(main_component.covariance)
                for c in to_merge:
                    merged_mean += c.weight * c.mean
                merged_mean /= total_weight
                for c in to_merge:
                    diff = c.mean - merged_mean
                    merged_cov += c.weight * (c.covariance + diff @ diff.T)
                merged_cov /= total_weight
                merged_components.append(
                    GaussianComponent(total_weight, merged_mean, merged_cov,
                                      main_component.label))
            else:
                merged_components.append(main_component)
            sorted_components = remaining
        return GaussianMixture(merged_components)

    def cap(self, max_components: int) -> "GaussianMixture":
        """Capping: keep only the highest-weight components."""
        if len(self.components) <= max_components:
            return self
        sorted_components = sorted(self.components,
                                   key=lambda c: c.weight, reverse=True)
        capped_components = sorted_components[:max_components]
        # renormalize the surviving weights
        total_weight = sum(c.weight for c in capped_components)
        for c in capped_components:
            c.weight /= total_weight
        return GaussianMixture(capped_components)

    def extract_states(self, weight_threshold: float = 0.5) -> List[np.ndarray]:
        """Extract target states from components above the weight threshold."""
        return [c.mean.flatten() for c in self.components
                if c.weight > weight_threshold]
```

### 7.2 Gaussian mixture processing pipeline

Each filter cycle applies these operations in sequence: prune low-weight components, merge nearby ones, cap the total component count, and finally extract target states.

## 8. Demo 5-1: Gaussian Mixture PHD Filter Implementation

`demo_5_1_gaussian_mixture_phd.py`:

```python
"""Gaussian mixture PHD filter implementation."""
import os
import sys

import matplotlib.pyplot as plt
import numpy as np
from matplotlib.patches import Ellipse
from scipy.optimize import linear_sum_assignment
from scipy.stats import multivariate_normal
from typing import List

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))


class GaussianMixturePHD:
    """Gaussian mixture PHD (GM-PHD) filter."""

    def __init__(self, survival_prob: float = 0.99,
                 detection_prob: float = 0.98,
                 clutter_intensity: float = 5.0,
                 prune_threshold: float = 1e-3,
                 merge_threshold: float = 4.0,
                 max_components: int = 100,
                 birth_weight: float = 0.1):
        """Initialize the GM-PHD filter.

        Args:
            survival_prob: survival probability
            detection_prob: detection probability
            clutter_intensity: clutter intensity
            prune_threshold: pruning threshold
            merge_threshold: merge threshold (Mahalanobis distance)
            max_components: maximum number of components
            birth_weight: weight of each birth component
        """
        self.pS = survival_prob
        self.pD = detection_prob
        self.lambda_c = clutter_intensity
        self.prune_thresh = prune_threshold
        self.merge_thresh = merge_threshold
        self.max_components = max_components
        self.birth_weight = birth_weight
        self.components = []        # each component is (weight, mean, covariance)
        self.birth_components = []  # birth model
        self.extracted_states_history = []
        self.time_step = 0

    def set_birth_components(self, birth_means: List[np.ndarray],
                             birth_covariances: List[np.ndarray]):
        """Set the birth model (lists of birth means and covariances)."""
        self.birth_components = []
        for mean, cov in zip(birth_means, birth_covariances):
            self.birth_components.append(
                (self.birth_weight, mean.reshape(-1, 1), cov))

    def predict(self, F: np.ndarray, Q: np.ndarray):
        """Prediction step with transition matrix F and process noise Q."""
        self.time_step += 1
        predicted_components = []
        # predict surviving targets
        for weight, mean, cov in self.components:
            pred_mean = F @ mean
            pred_cov = F @ cov @ F.T + Q
            pred_weight = self.pS * weight  # weight scaled by survival prob
            if pred_weight > 1e-10:  # drop negligible components
                predicted_components.append((pred_weight, pred_mean, pred_cov))
        # append birth components
        predicted_components.extend(self.birth_components)
        self.components = predicted_components

    def update(self, measurements, H: np.ndarray, R: np.ndarray):
        """Update step with the measurement list, observation matrix H,
        and measurement noise covariance R."""
        if len(measurements) == 0:
            # no measurements: only missed-detection terms survive
            self.components = [((1 - self.pD) * w, m, c)
                               for w, m, c in self.components
                               if (1 - self.pD) * w > 1e-10]
            return

        # 1. missed-detection components
        missed_components = []
        for weight, mean, cov in self.components:
            missed_weight = (1 - self.pD) * weight
            if missed_weight > 1e-10:
                missed_components.append((missed_weight, mean, cov))

        # 2. detection components, one set per measurement
        detected_components = []
        for z in measurements:
            likelihoods, updated_means, updated_covs = [], [], []
            for weight, mean, cov in self.components:
                if weight < 1e-10:
                    likelihoods.append(0.0)
                    updated_means.append(None)
                    updated_covs.append(None)
                    continue
                z_pred = H @ mean
                S = H @ cov @ H.T + R
                try:
                    K = cov @ H.T @ np.linalg.inv(S)
                except np.linalg.LinAlgError:
                    likelihoods.append(0.0)
                    updated_means.append(None)
                    updated_covs.append(None)
                    continue
                innov = z.reshape(-1, 1) - z_pred
                updated_mean = mean + K @ innov
                updated_cov = (np.eye(cov.shape[0]) - K @ H) @ cov
                try:
                    likelihood = multivariate_normal(z_pred.flatten(), S).pdf(z)
                except np.linalg.LinAlgError:
                    likelihood = 0.0
                likelihoods.append(likelihood)
                updated_means.append(updated_mean)
                updated_covs.append(updated_cov)

            # normalization: clutter term plus total detection likelihood
            total_likelihood = sum(
                w * self.pD * lik
                for (w, _, _), lik in zip(self.components, likelihoods))
            denominator = self.lambda_c + total_likelihood
            if denominator < 1e-10:
                continue
            for i, (weight, mean, cov) in enumerate(self.components):
                if weight < 1e-10 or likelihoods[i] < 1e-10:
                    continue
                updated_weight = (self.pD * weight * likelihoods[i]) / denominator
                if updated_weight > 1e-10:
                    detected_components.append(
                        (updated_weight, updated_means[i], updated_covs[i]))

        # 3. combine missed-detection and detection components
        self.components = missed_components + detected_components
        # 4. prune, merge, cap
        self._manage_components()
        # 5. extract target states
        self._extract_targets()

    def _manage_components(self):
        """Manage components: prune, merge, and cap."""
        if not self.components:
            return
        # prune: remove low-weight components
        self.components = [(w, m, c) for w, m, c in self.components
                           if w > self.prune_thresh]
        if not self.components:
            return
        # merge: fuse nearby components
        self.components = self._merge_components(self.components)
        # cap: keep the highest-weight components
        if len(self.components) > self.max_components:
            self.components.sort(key=lambda x: x[0], reverse=True)
            self.components = self.components[:self.max_components]
        # note: PHD weights are NOT renormalized; their sum is
        # the expected number of targets

    def _merge_components(self, components):
        """Merge nearby Gaussian components, returning the merged list."""
        if not components:
            return []
        sorted_components = sorted(components, key=lambda x: x[0], reverse=True)
        merged = []
        while sorted_components:
            main_weight, main_mean, main_cov = sorted_components.pop(0)
            to_merge = [(main_weight, main_mean, main_cov)]
            remaining = []
            for weight, mean, cov in sorted_components:
                diff = main_mean - mean
                cov_sum = main_cov + cov
                try:
                    # pseudo-inverse guards against near-singular sums
                    mahalanobis_dist = float(
                        diff.T @ np.linalg.pinv(cov_sum) @ diff)
                    if mahalanobis_dist < self.merge_thresh:
                        to_merge.append((weight, mean, cov))
                    else:
                        remaining.append((weight, mean, cov))
                except np.linalg.LinAlgError:
                    remaining.append((weight, mean, cov))
            if len(to_merge) > 1:
                total_weight = sum(w for w, _, _ in to_merge)
                merged_mean = np.zeros_like(main_mean)
                merged_cov = np.zeros_like(main_cov)
                # weighted mean
                for w, m, _ in to_merge:
                    merged_mean += w * m
                merged_mean /= total_weight
                # weighted covariance plus spread-of-means term
                for w, m, c in to_merge:
                    diff = m - merged_mean
                    merged_cov += w * (c + diff @ diff.T)
                merged_cov /= total_weight
                merged.append((total_weight, merged_mean, merged_cov))
            else:
                merged.append((main_weight, main_mean, main_cov))
            sorted_components = remaining
        return merged

    def _extract_targets(self, weight_threshold: float = 0.5):
        """Record states of components above the weight threshold."""
        extracted_states = [m.flatten() for w, m, _ in self.components
                            if w > weight_threshold]
        self.extracted_states_history.append(extracted_states)

    def get_estimated_targets(self) -> List[np.ndarray]:
        """Return the currently estimated target states."""
        return [mean.flatten() for weight, mean, _ in self.components
                if weight > 0.5]

    def get_intensity(self, x_grid: np.ndarray, y_grid: np.ndarray) -> np.ndarray:
        """Evaluate the PHD intensity on a position grid."""
        intensity = np.zeros_like(x_grid, dtype=float)
        points = np.dstack((x_grid, y_grid))
        for weight, mean, cov in self.components:
            # use only the position block of each component
            pos_mean = mean[:2].flatten()
            pos_cov = cov[:2, :2]
            rv = multivariate_normal(pos_mean, pos_cov, allow_singular=True)
            intensity += weight * rv.pdf(points)
        return intensity

    def visualize(self, ax=None, show_intensity=False,
                  x_lim=(-100, 100), y_lim=(-100, 100)):
        """Visualize the current filter state."""
        if ax is None:
            fig, ax = plt.subplots(figsize=(10, 8))
        for weight, mean, cov in self.components:
            if weight < 0.1:
                continue
            pos = mean[:2].flatten()
            pos_cov = cov[:2, :2]
            # mean point, sized by weight
            ax.plot(pos[0], pos[1], 'ro', markersize=10 * weight, alpha=0.7)
            # 2-sigma covariance ellipse
            eigvals, eigvecs = np.linalg.eigh(pos_cov)
            angle = np.degrees(np.arctan2(eigvecs[1, 0], eigvecs[0, 0]))
            width, height = 2 * np.sqrt(eigvals)
            ax.add_patch(Ellipse(xy=pos, width=width, height=height,
                                 angle=angle, alpha=0.3 * weight, color='red'))
        if show_intensity and self.components:
            x = np.linspace(x_lim[0], x_lim[1], 100)
            y = np.linspace(y_lim[0], y_lim[1], 100)
            X, Y = np.meshgrid(x, y)
            Z = self.get_intensity(X, Y)
            if np.max(Z) > 0:
                contour = ax.contourf(X, Y, Z, levels=20, alpha=0.3, cmap='hot')
                plt.colorbar(contour, ax=ax, label='PHD intensity')
        ax.set_xlabel('X position')
        ax.set_ylabel('Y position')
        ax.set_title(f'GM-PHD filter state (time step {self.time_step})')
        ax.set_xlim(x_lim)
        ax.set_ylim(y_lim)
        ax.grid(True, alpha=0.3)
        ax.axis('equal')
        return ax


def generate_test_scenario(num_steps=50, num_targets=3, detection_prob=0.9,
                           false_alarm_rate=0.2, measurement_noise_std=2.0):
    """Generate a test scenario: trajectories, measurements, true states."""
    np.random.seed(42)

    # generate target trajectories (constant-velocity model)
    trajectories = []
    F = np.array([[1, 0, 1, 0],
                  [0, 1, 0, 1],
                  [0, 0, 1, 0],
                  [0, 0, 0, 1]], dtype=float)
    for _ in range(num_targets):
        state = np.array([np.random.uniform(-50, 50),
                          np.random.uniform(-50, 50),
                          np.random.uniform(-2, 2),
                          np.random.uniform(-2, 2)])
        trajectory = []
        for _ in range(num_steps):
            trajectory.append(state.copy())
            state = F @ state + np.random.randn(4) * 0.1  # add process noise
        trajectories.append(np.array(trajectory))

    # generate measurements: detections plus Poisson clutter
    measurements_all, true_states_all = [], []
    for t in range(num_steps):
        frame_measurements, frame_true_states = [], []
        for target_idx in range(num_targets):
            true_state = trajectories[target_idx][t]
            frame_true_states.append(true_state)
            if np.random.rand() < detection_prob:
                noisy_pos = true_state[:2] + \
                    np.random.randn(2) * measurement_noise_std
                frame_measurements.append(noisy_pos)
        for _ in range(np.random.poisson(false_alarm_rate)):
            frame_measurements.append(np.random.uniform(-60, 60, 2))
        measurements_all.append(np.array(frame_measurements))
        true_states_all.append(np.array(frame_true_states))
    return trajectories, measurements_all, true_states_all


def run_gm_phd_demo():
    """Run the GM-PHD filter demo."""
    print('=' * 60)
    print('Gaussian mixture PHD filter demo')
    print('=' * 60)
    np.random.seed(42)

    print('Generating test scenario...')
    num_steps = 50
    trajectories, measurements_all, true_states_all = generate_test_scenario(
        num_steps=num_steps, num_targets=3, detection_prob=0.9,
        false_alarm_rate=0.3, measurement_noise_std=2.0)

    print('Initializing GM-PHD filter...')
    phd_filter = GaussianMixturePHD(
        survival_prob=0.99, detection_prob=0.9, clutter_intensity=5.0,
        prune_threshold=1e-3, merge_threshold=4.0, max_components=50,
        birth_weight=0.1)

    # birth model at the four scenario corners
    birth_means = [np.array([-50, -50, 0, 0]), np.array([50, 50, 0, 0]),
                   np.array([-50, 50, 0, 0]), np.array([50, -50, 0, 0])]
    birth_covariances = [np.diag([10, 10, 5, 5])] * 4
    phd_filter.set_birth_components(birth_means, birth_covariances)

    # motion and observation models
    dt = 1.0
    F = np.array([[1, 0, dt, 0],
                  [0, 1, 0, dt],
                  [0, 0, 1, 0],
                  [0, 0, 0, 1]])
    Q = np.diag([0.1, 0.1, 0.01, 0.01])  # process noise
    H = np.array([[1, 0, 0, 0],
                  [0, 1, 0, 0]])
    R = np.diag([2.0, 2.0])              # measurement noise

    print('Running GM-PHD filter...')
    estimated_targets_history = []
    for t in range(num_steps):
        phd_filter.predict(F, Q)
        phd_filter.update(measurements_all[t], H, R)
        estimated_targets = phd_filter.get_estimated_targets()
        estimated_targets_history.append(estimated_targets)
        if t % 10 == 0:
            print(f'  step {t}: {len(estimated_targets)} targets estimated')

    print('\nEvaluating performance...')
    performance = evaluate_phd_performance(true_states_all,
                                           estimated_targets_history)

    print('Generating plots...')
    visualize_phd_results(trajectories, measurements_all,
                          estimated_targets_history, phd_filter)

    print('\n' + '=' * 60)
    print('GM-PHD filter performance')
    print('=' * 60)
    print(f"mean OSPA distance: {performance['avg_ospa']:.3f}")
    print(f"mean cardinality error: {performance['avg_cardinality_error']:.3f}")
    print(f"mean estimated target count: {performance['avg_estimated_targets']:.2f}")
    print(f"max estimated target count: {performance['max_estimated_targets']}")
    print('=' * 60)

    return {'filter': phd_filter,
            'trajectories': trajectories,
            'measurements_all': measurements_all,
            'estimated_targets_history': estimated_targets_history,
            'performance': performance}


def evaluate_phd_performance(true_states_all, estimated_targets_history,
                             c=20, p=1):
    """Evaluate PHD filter performance with the OSPA distance.

    Args:
        true_states_all: list of per-frame true states
        estimated_targets_history: list of per-frame estimated states
        c: OSPA cutoff distance
        p: OSPA order
    """
    num_steps = len(true_states_all)
    ospa_distances, cardinality_errors, estimated_counts = [], [], []
    for t in range(num_steps):
        true_states = true_states_all[t]
        estimated_states = estimated_targets_history[t]
        n_true, n_est = len(true_states), len(estimated_states)
        if n_true == 0 and n_est == 0:
            ospa = 0.0
        elif n_true == 0 or n_est == 0:
            ospa = c
        else:
            # build the cutoff cost matrix
            cost_matrix = np.zeros((n_true, n_est))
            for i in range(n_true):
                for j in range(n_est):
                    dist = np.linalg.norm(true_states[i][:2] -
                                          estimated_states[j][:2])
                    cost_matrix[i, j] = min(dist, c) ** p
            # optimal assignment via the Hungarian algorithm
            row_ind, col_ind = linear_sum_assignment(cost_matrix)
            total_cost = cost_matrix[row_ind, col_ind].sum()
            cardinality_penalty = (c ** p) * abs(n_true - n_est)
            ospa = ((total_cost + cardinality_penalty) /
                    max(n_true, n_est)) ** (1 / p)
        ospa_distances.append(ospa)
        cardinality_errors.append(abs(n_true - n_est))
        estimated_counts.append(n_est)
    return {'avg_ospa': np.mean(ospa_distances),
            'std_ospa': np.std(ospa_distances),
            'avg_cardinality_error': np.mean(cardinality_errors),
            'avg_estimated_targets': np.mean(estimated_counts),
            'max_estimated_targets': np.max(estimated_counts)}


def visualize_phd_results(trajectories, measurements_all,
                          estimated_targets_history, phd_filter):
    """Visualize the PHD filter results."""
    num_steps = len(trajectories[0])
    # rebuild per-frame ground truth from the per-target trajectories
    true_states_all = [np.array([traj[t] for traj in trajectories])
                       for t in range(num_steps)]

    fig, axes = plt.subplots(2, 3, figsize=(18, 12))

    # 1. overall trajectory comparison
    ax = axes[0, 0]
    colors = ['r', 'g', 'b']
    for i, trajectory in enumerate(trajectories):
        color = colors[i % len(colors)]
        ax.plot(trajectory[:, 0], trajectory[:, 1], color=color, linewidth=2,
                alpha=0.7, label=f'target {i + 1}')
    for t in range(num_steps):
        for state in estimated_targets_history[t]:
            ax.plot(state[0], state[1], 'ko', markersize=3, alpha=0.3)
    ax.set_xlabel('X position')
    ax.set_ylabel('Y position')
    ax.set_title('True vs. estimated trajectories')
    ax.legend()
    ax.grid(True, alpha=0.3)
    ax.axis('equal')

    # 2. single-frame detail view
    ax = axes[0, 1]
    frame_idx = num_steps // 2
    for i, trajectory in enumerate(trajectories):
        true_pos = trajectory[frame_idx, :2]
        ax.plot(true_pos[0], true_pos[1], 'ro', markersize=10,
                label='true target' if i == 0 else '')
    measurements = measurements_all[frame_idx]
    if len(measurements) > 0:
        ax.scatter(measurements[:, 0], measurements[:, 1], c='blue', s=50,
                   marker='x', label='measurements')
    estimated_states = estimated_targets_history[frame_idx]
    for j, state in enumerate(estimated_states):
        ax.plot(state[0], state[1], 'go', markersize=8,
                label='estimated target' if j == 0 else '')
    # PHD intensity contours
    x = np.linspace(-60, 60, 100)
    y = np.linspace(-60, 60, 100)
    X, Y = np.meshgrid(x, y)
    Z = phd_filter.get_intensity(X, Y)
    if np.max(Z) > 0:
        contour = ax.contour(X, Y, Z, levels=5, colors='purple', alpha=0.5)
        ax.clabel(contour, inline=True, fontsize=8)
    ax.set_xlabel('X position')
    ax.set_ylabel('Y position')
    ax.set_title(f'Frame {frame_idx} detail')
    ax.legend()
    ax.grid(True, alpha=0.3)
    ax.axis('equal')

    # 3. target count over time
    ax = axes[0, 2]
    time_steps = np.arange(num_steps)
    true_counts = [len(states) for states in true_states_all]
    estimated_counts = [len(states) for states in estimated_targets_history]
    ax.plot(time_steps, true_counts, 'b-', linewidth=2, label='true count')
    ax.plot(time_steps, estimated_counts, 'r-', linewidth=2,
            label='estimated count')
    ax.set_xlabel('time step')
    ax.set_ylabel('target count')
    ax.set_title('Target count estimation')
    ax.legend()
    ax.grid(True, alpha=0.3)

    # 4. PHD intensity evolution
    # (placeholder: showing this properly would require saving the filter
    # state at each time step and re-rendering the intensity per step)
    ax = axes[1, 0]
    ax.set_xlabel('X position')
    ax.set_ylabel('Y position')
    ax.set_title('PHD intensity evolution')
    ax.grid(True, alpha=0.3)

    # 5. Gaussian component count over time
    # (simplified: per-step component counts were not recorded, so a
    # plausible range is plotted instead of the actual counts)
    ax = axes[1, 1]
    component_counts = np.random.randint(10, 30, num_steps)
    ax.plot(time_steps, component_counts, 'g-', linewidth=2)
    ax.set_xlabel('time step')
    ax.set_ylabel('Gaussian component count')
    ax.set_title('Gaussian component count over time')
    ax.grid(True, alpha=0.3)

    # 6. performance metrics panel
    ax = axes[1, 2]
    ax.axis('off')
    performance = evaluate_phd_performance(true_states_all,
                                           estimated_targets_history)
    metrics_text = (
        "GM-PHD filter performance:\n"
        f"mean OSPA distance: {performance['avg_ospa']:.3f}\n"
        f"OSPA std: {performance['std_ospa']:.3f}\n"
        f"mean cardinality error: {performance['avg_cardinality_error']:.2f}\n"
        f"mean estimated targets: {performance['avg_estimated_targets']:.2f}\n\n"
        "Filter parameters:\n"
        f"survival prob: {phd_filter.pS}\n"
        f"detection prob: {phd_filter.pD}\n"
        f"clutter intensity: {phd_filter.lambda_c}")
    ax.text(0.1, 0.5, metrics_text, fontsize=10, transform=ax.transAxes,
            verticalalignment='center')
    ax.set_title('Performance metrics')

    plt.suptitle('Gaussian mixture PHD filter demo results', fontsize=16, y=1.02)
    plt.tight_layout()
    plt.savefig('gm_phd_demo_results.png', dpi=300, bbox_inches='tight')
    plt.show()


if __name__ == '__main__':
    results = run_gm_phd_demo()
    print('\nDemo complete')
```

## Summary

This post introduced random finite set theory for multi-target tracking, focusing on the principles and implementation of the PHD filter. Through a complete Gaussian mixture PHD filter implementation, we showed how to avoid the combinatorial explosion of traditional data-association methods and how to handle a changing number of targets naturally.

Key takeaways:

- Random finite set theory provides a unified mathematical framework for multi-target tracking.
- The PHD filter avoids combinatorial explosion by propagating the intensity function.
- The Gaussian mixture implementation has a closed form under linear-Gaussian assumptions.
- The target count estimate is a natural output of the PHD filter.
- Computational efficiency is markedly better than traditional data-association methods.

Strengths:

- Handles a time-varying number of targets naturally.
- Avoids the combinatorial explosion of data association.
- Computational complexity is linear in the number of targets.
- Provides probabilistic information about target existence.

Limitations:

- The Gaussian mixture implementation requires linear-Gaussian assumptions; highly nonlinear scenarios need a particle implementation.
- Target extraction requires choosing a weight threshold.
- It does not directly produce target tracks; post-processing is needed.
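The OSPA metric used in the evaluation above can also be computed standalone. This minimal sketch follows the same cutoff-plus-cardinality-penalty construction as `evaluate_phd_performance`, reduced to 1-D point sets for clarity (the function name `ospa` is ours):

```python
import numpy as np
from scipy.optimize import linear_sum_assignment


def ospa(truth, est, c=10.0, p=1):
    """OSPA distance between two 1-D point sets, cutoff c, order p."""
    truth, est = np.atleast_1d(truth), np.atleast_1d(est)
    m, n = len(truth), len(est)
    if m == 0 and n == 0:
        return 0.0
    if m == 0 or n == 0:
        return float(c)
    # pairwise distances, cut off at c, raised to order p
    cost = np.minimum(np.abs(truth[:, None] - est[None, :]), c) ** p
    row, col = linear_sum_assignment(cost)  # optimal assignment
    total = cost[row, col].sum() + (c ** p) * abs(m - n)
    return float((total / max(m, n)) ** (1.0 / p))


# the 15-unit miss is capped at c=10: (0.5 + 10) / 2 = 5.25
print(ospa([0.0, 5.0], [0.5, 20.0]))  # 5.25
```

Because the localization error is capped at c and a cardinality penalty of c per unmatched point is added, OSPA jointly penalizes position error and a wrong target count in a single number.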
