集合卡尔曼滤波ENKF属于数据同化的一种算法。
什么是数据同化(data assimilation)?
数据同化就是把观测值和模拟值合理地结合起来,取一个最佳值(为什么?因为观测值和模型值都是有误差的,通常假设误差都是正态分布的,通过数据同化能获得最优值)。
其中经典算法就是集合卡尔曼滤波(有趣的是,Evensen把ENKF的文章发在一个SCI四区期刊上,但是引用量超过5000次)。
基本的一些概念
(后面的计算会涉及到一些矩阵的运算,建议用手算理解整个流程,再跑代码)
方差
方差(variance)是描述单个随机变量的离散程度
是方差; 是第i个观测值; 是平均值; 是样本数量;标准差就是方差开方。
# 计算预报的不确定性
def calculate_uncertainty(forecasts):
mean = np.mean(forecasts)
variance = np.mean((forecasts - mean)**2)
return np.sqrt(variance) # 标准差
# 两组不同的预报
forecast1 = np.random.normal(25, 1, 10) # 较确定的预报
forecast2 = np.random.normal(25, 3, 10) # 较不确定的预报
print(f"预报1的不确定性: {calculate_uncertainty(forecast1):.2f}")
print(f"预报2的不确定性: {calculate_uncertainty(forecast2):.2f}")
#输出
#预报1的不确定性: 1.30
#预报2的不确定性: 2.15
协方差
协方差(covariance)是描述两个随机变量之间的关系,计算两个变量与各自平均值的差异乘积.
样本协方差:
总体协方差:
其中:
是第i个观测值
是X和Y的平均值
对于两个随机变量X和Y,协方差矩阵是一个2×2的对称矩阵
[Var(X) Cov(X,Y)]
[Cov(X,Y) Var(Y) ]
X的方差 (Var(X))[1,1]
位置,Y的方差 (Var(Y))[2,2]
位置。X和Y的协方差 (Cov(X,Y))在[1,2]
和[2,1]
位置,这两个值相等。
import numpy as np
# 生成两组原始数据
np.random.seed(42)
X = np.random.normal(0, 1, 1000) # 1000个标准正态分布随机数
Y = X * 0.8 + np.random.normal(0, 0.2, 1000) # 与X正相关的数据
# 手动计算协方差矩阵
def calculate_cov_matrix(x, y):
# 计算样本方差
var_x = np.var(x, ddof=1) # 使用 ddof=1 来计算样本方差
var_y = np.var(y, ddof=1) # 使用 ddof=1 来计算样本方差
# 计算样本协方差
cov_xy = np.mean((x - np.mean(x)) * (y - np.mean(y)))
# 构建协方差矩阵
return np.array([[var_x, cov_xy],
[cov_xy, var_y]])
# 计算并打印协方差矩阵
cov_matrix = calculate_cov_matrix(X, Y)
print("手动计算的协方差矩阵:")
print(cov_matrix)
# 计算相关系数
correlation = cov_matrix[0, 1] / np.sqrt(cov_matrix[0, 0] * cov_matrix[1, 1])
print(f"\n相关系数: {correlation:.3f}")
#输出
#手动计算的协方差矩阵:
#[[0.95886385 0.75843999]
# [0.75843999 0.64084244]]
#相关系数: 0.968
集合平均
集合平均通常被认为是最佳估计,单个成员可能有偏差,但平均能降低随机误差(这也是我们为什么要用集合卡尔曼的原因)。
正离差表示高于平均,负离差表示低于平均,所有离差之和为0。
def explain_deviations():
"""解释离差的性质"""
members = np.array([10, 12, 8, 11, 9])
mean = np.mean(members)
devs = members - mean
print("示例数据分析:")
for m, d in zip(members, devs):
print(f"成员值: {m:2d}, 离差: {d:5.2f}")
print(f"\n离差之和: {np.sum(devs):.2e}") # 应该接近0
explain_deviations()
#示例数据分析:
成员值: 10, 离差: 0.00
成员值: 12, 离差: 2.00
成员值: 8, 离差: -2.00
成员值: 11, 离差: 1.00
成员值: 9, 离差: -1.00
卡尔曼增益
卡尔曼增益K可以理解为一个"权重系数",值范围在0到1之间:接近1:更相信观测;接近0:更相信预报。用一个代码来测试:
def simple_kalman_gain(forecast_error, observation_error):
"""简单的卡尔曼增益计算"""
K = forecast_error / (forecast_error + observation_error)
return K
# 示例
def demonstrate_kalman_gain():
cases = [
(2.0, 1.0), # 预报误差大,观测误差小
(1.0, 2.0), # 预报误差小,观测误差大
(1.0, 1.0) # 误差相等
]
for f_err, o_err in cases:
K = simple_kalman_gain(f_err, o_err)
print(f"\n预报误差: {f_err}, 观测误差: {o_err}")
print(f"卡尔曼增益: {K:.2f}")
print(f"解释: {'更相信观测' if K > 0.5 else '更相信预报'}")
demonstrate_kalman_gain()
卡尔曼增益决定了我们应该相信预报还是观测的程度,公式为
:卡尔曼增益矩阵;:背景误差协方差矩阵,计算公式为;
:观测算子,目的是将模式空间映射到观测空间的矩阵;
:观测误差协方差矩阵;
分子:表示背景场的不确定性;分母表示总的不确定性(背景场 + 观测);
n是集合的样本数量,m是观测点数量,k是背景点的数量,则维度是K*K维度,
是m*k
分子是k*m个维度,
分母是m*m个维度,
所以卡尔曼增益维度为k*m
卡尔曼滤波的状态更新
每个集合成员都会单独更新:
: 第i个集合成员的背景场(所有格点都有值)
: 观测值(只在观测点有值)
观测算子,将模式空间的值映射到观测空间,在观测点为1,其他点为0, 就是提取出观测位置的背景场值
: 卡尔曼增益,H在非观测点是0,但K通过Pb传播了观测信息到所有格点,反映了每个格点受观测的影响程度
: 创新项,只在观测点计算,通过K传播到其他格点,
可以根据我的代码跑一遍,因为每一步都涉及到了公式,要理解这里面的每一个过程,会涉及一些线性代数的知识(矩阵求逆、二次型矩阵等)。
我刚开始使用的草稿纸计算的,根据公式在算的过程中就开始理解它的内在含义。
一般来说,集合数量要大于30,提高维度计算量就太大了,用python来做。这里为了测试,集合成员数量设置为了4。
import numpy as np
n = 4 # 集合成员数量
m = 3 # 观测值的数量
k = 5 # 背景值的数量(看做模型预测的数量)
# 假设观测向量的三个值分别为:
y = np.array([
1.20,
3.10,
5.00,
])
# 创建原始数据数组
data = np.array([
[1.00, 2.00, 3.00, 4.00, 5.00],
[1.10, 1.90, 3.20, 3.80, 4.90],
[0.90, 2.10, 2.80, 4.20, 5.10],
[1.05, 1.95, 3.10, 4.05, 5.05]
])
print("X1到X5在4个样本分别为")
for i, row in enumerate(data):
for val in row:
print(f"{val}", end=" ")
print()
# 计算每列的均值
means = np.mean(data, axis=0)
# 打印结果
for i, mean in enumerate(means):
print(f"x{i+1}的均值: {mean:.4f}")
# 计算偏差 (每个值减去该列的均值)
deviations = data - means
# 打印偏差矩阵
for i, row in enumerate(deviations):
print(f"子集{i+1}的偏差:", end=" ")
for val in row:
print(f"{val:.4f}", end=" ")
print()
# 计算协方差矩阵
Pb = np.zeros((5, 5)) # 创建5x5的零矩阵
# 使用公式计算每个元素
for i in range(5):
for j in range(5):
# 计算 P[i,j] = 1/(n-1) * sum(δk,i * δk,j)
sum_product = 0
for k in range(n):
sum_product += deviations[k,i] * deviations[k,j]
Pb[i,j] = sum_product / (n-1)
# 打印结果矩阵
print("协方差矩阵 Pb:")
for row in Pb:
for val in row:
print(f"{val:.6f}", end=" ")
print()
# 现在给一个观测算子H的矩阵,1代表着在哪个格子有观测值,大小为m*k,3个观测值,5个背景值
H=np.array([
[1, 0, 0, 0, 0],
[0, 0, 1, 0, 0],
[0, 0, 0, 0, 1],
])
# 计算分子:卡尔曼系数$K = \frac{P^b H^T}{HP^b H^T + R}$的分子:P^b H^T
PbHt=Pb@H.T
print("计算分子:卡尔曼系数 P^b*H^T\n",PbHt)
# 计算分母:卡尔曼系数$K = \frac{P^b H^T}{HP^b H^T + R}$的分母:HP^b H^T + R
# 假设观测误差独立且方差相同,因此 𝑅 为对角矩阵,大小为m*m
R=np.array([
[0.05, 0, 0, ],
[0, 0.05, 0, ],
[0, 0, 0.05, ],
])
all=H@Pb@H.T+R
print("计算分母: HP^b H^T + R:\n", all)
#计算卡尔曼的增益K
# 首先计算分母的逆
all_inv = np.linalg.inv(all)
print("计算分母的逆矩阵: all_inv:\n", all_inv)
# 然后进行矩阵乘法
K_cor = PbHt @ all_inv
# 卡尔曼增益的大小为背景值的数量*样本值的数量:k*m
print("卡尔曼增益 K_cor:\n", K_cor)
# 开始进行状态更新$x^a = x^b + K(y - Hx^b)$ ,x^b就是我们前面求到的4个子集的平均值
xb=means
print("H@xb:\n", H@xb)
xa=xb+K_cor@(y-H@xb)
print("卡尔曼的结果 xa:\n", xa)
集合卡尔曼滤波的状态更新
对于集合卡尔曼滤波(Ensemble Kalman Filter, EnKF) ,输出不仅仅是一个分析均值,而是一个经过观测更新后的集合。每个集合成员代表可能的系统状态,并且每个成员都会独立地进行更新。
是从观测误差协方差矩阵 R 中采样得到的扰动, 的均值为零向量; 的协方差矩阵为 ,即 。使用矩阵分解方法从标准正态分布中生成具有所需协方差的随机向量。例如,利用Cholesky分解 ,其中 是下三角矩阵,则:
其中,, 为单位矩阵。
对之前的代码添加扰动以及观测更新后的集合(而非均值)
import numpy as np
# 1. 定义初始参数
n = 4 # 集合成员数量
m = 3 # 观测数量
k = 5 # 状态变量数量
# 2. 观测数据
y = np.array([1.20, 3.10, 5.00])
# 3. 创建集合矩阵 A ∈ R^(k×n) # 注意:这里更改了维度说明
A = np.array([
[1.00, 2.00, 3.00, 4.00, 5.00],
[1.10, 1.90, 3.20, 3.80, 4.90],
[0.90, 2.10, 2.80, 4.20, 5.10],
[1.05, 1.95, 3.10, 4.05, 5.05]
]).T # 转置矩阵使其变为 5x4
print("Ensemble matrix A:")
print(A)
# 4. 计算集合平均 A
A_mean = np.mean(A, axis=1, keepdims=True) # 改为axis=1
print("\nEnsemble mean:")
print(A_mean)
# 5. 计算集合扰动矩阵 A' = A - A
A_prime = A - A_mean
print("\nEnsemble perturbation matrix A':")
print(A_prime)
# 6. 观测算子
H = np.array([
[1, 0, 0, 0, 0],
[0, 0, 1, 0, 0],
[0, 0, 0, 0, 1]
])
# 7. 生成观测扰动
R = np.array([
[0.05, 0, 0],
[0, 0.05, 0],
[0, 0, 0.05]
])
eps = np.random.multivariate_normal(np.zeros(m), R, n).T
print("\nObservation perturbations:")
print(eps)
# 8. 创建扰动观测集合
D = np.tile(y, (n, 1)).T + eps
print("\nPerturbed observations D:")
print(D)
# 9. 计算观测空间的创新向量
D_prime = D - H @ A # 现在维度应该匹配了
print("\nInnovation D':")
print(D_prime)
# 10. 计算分析步骤
# 计算 HA'
HA_prime = H @ A_prime
# 计算 (HA')(HA')^T + εε^T
C = HA_prime @ HA_prime.T + eps @ eps.T
# 计算分析增量
X = A_prime @ HA_prime.T @ np.linalg.inv(C) @ D_prime
# 更新集合
A_a = A + X
print("\nAnalyzed ensemble:")
print(A_a)
print("\nAnalyzed mean:")
print(np.mean(A_a, axis=1))
空间上的ENKF实现代码
import numpy as np
from scipy.interpolate import griddata
import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif'] = ['SimHei'] # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False # 用来正常显示负号
np.random.seed(42)
# 1. 定义观测点坐标和真实温度值
known_points = np.array([
[20, 20], [20, 50], [20, 80],
[50, 20], [50, 50], [50, 80],
[80, 20], [80, 50], [80, 80]
])
true_temperatures = np.array([25, 26, 28, 26, 32, 30, 27, 29, 31])
# 2. 创建网格点
grid_x = np.linspace(0, 100, 100)
grid_y = np.linspace(0, 100, 100)
grid_x, grid_y = np.meshgrid(grid_x, grid_y)
# 3. IDW插值函数
def idw_interpolation(known_points, values, grid_x, grid_y, power=2):
result = np.zeros_like(grid_x)
for i in range(len(grid_x)):
for j in range(len(grid_y)):
distances = np.sqrt((known_points[:, 0] - grid_x[i, j])**2 +
(known_points[:, 1] - grid_y[i, j])**2)
distances[distances == 0] = 1e-10
weights = 1 / (distances ** power)
result[i, j] = np.sum(weights * values) / np.sum(weights)
return result
# 4. 生成真实场
true_field = idw_interpolation(known_points, true_temperatures, grid_x, grid_y, power=2)
# 5. 生成带误差的观测值
R = 0.1 # 观测误差标准差
observed_temperatures = true_temperatures + np.random.normal(0, R, size=true_temperatures.shape)
# 6. 生成背景场(添加随机噪声)
def generate_background_field(true_field, noise_level=0.3):
noise = np.random.normal(0, noise_level, true_field.shape)
return true_field + noise
# 7. 修改后的ENKF数据同化
def enkf_data_assimilation(true_field, known_points, observed_temperatures, ensemble_size=50):
# 状态空间维度
state_dim = grid_x.size
obs_dim = len(known_points)
# 初始化集合矩阵 A (state_dim × ensemble_size)
ensemble = np.zeros((state_dim, ensemble_size))
for i in range(ensemble_size):
ensemble[:, i] = generate_background_field(true_field).ravel()
# 构建观测算子 H (sparse matrix)
H = np.zeros((obs_dim, state_dim))
for i, (x, y) in enumerate(known_points):
H[i, int(y) * grid_x.shape[1] + int(x)] = 1
# 计算集合平均
A_mean = np.mean(ensemble, axis=1, keepdims=True)
# 计算集合扰动矩阵 A'
A_prime = ensemble - A_mean
# 生成观测扰动
R_matrix = np.eye(obs_dim) * R**2
eps = np.random.multivariate_normal(np.zeros(obs_dim), R_matrix, ensemble_size).T
# 创建扰动观测集合
D = np.tile(observed_temperatures, (ensemble_size, 1)).T + eps
# 计算观测空间的模式值
HA = H @ ensemble
# 计算创新向量
D_prime = D - HA
# 计算 HA'
HA_prime = H @ A_prime
# 计算分析步骤
# (HA')(HA')^T + εε^T
C = HA_prime @ HA_prime.T + eps @ eps.T
# 计算分析增量
X = A_prime @ HA_prime.T @ np.linalg.inv(C) @ D_prime
# 更新集合
A_a = ensemble + X
# 计算分析场均值
analysis = np.mean(A_a, axis=1).reshape(grid_x.shape)
return analysis, A_a
# 8. 执行同化
background_field = generate_background_field(true_field)
analysis_field, ensemble = enkf_data_assimilation(true_field, known_points, observed_temperatures)
# 9. 计算RMSE
def calculate_rmse(reference, estimate):
return np.sqrt(np.mean((reference - estimate)**2))
rmse_background = calculate_rmse(true_field, background_field)
rmse_analysis = calculate_rmse(true_field, analysis_field)
improvement = ((rmse_background - rmse_analysis) / rmse_background) * 100
# 10. 绘图
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
titles = ['真 实 场', '背景场', '分析场']
fields = [true_field, background_field, analysis_field]
for ax, field, title in zip(axes, fields, titles):
im = ax.imshow(field, origin='lower', extent=[0, 100, 0, 100],
cmap='RdBu_r', vmin=24, vmax=32)
ax.scatter(known_points[:, 0], known_points[:, 1], c='black',
marker='o', s=50, label='观测站')
ax.set_title(title)
ax.set_xlabel('X')
ax.set_ylabel('Y')
fig.colorbar(im, ax=ax)
plt.tight_layout()
plt.show()
print(f"背景场 RMSE: {rmse_background:.3f}°C")
print(f"分析场 RMSE: {rmse_analysis:.3f}°C")
print(f"改善程度: {improvement:.1f}%")
参考:
Evensen G. The ensemble Kalman filter: Theoretical formulation and practical implementation[J]. Ocean dynamics, 2003, 53: 343-367.
claude sonnet 3.5
chatgpt o1 mini