Python时空序列分析
趋势分析可用于各种应用,例如水文学、气候学、城市测绘、农作物监测等。从全球或区域层面的粗到细尺度,各种网格数据(包括卫星观测、再分析数据和气候情景观测等)都可以免费用于时空监测和建模。除此之外,Mann-Kendall (MK) 趋势检验和 Sen 的斜率估计器等非参数方法能够提供时间和空间尺度数据的趋势及其显著性水平。
本教程在时间和网格数据中实现 Mann-Kendall 检验。此外,还根据这两个数据集(表格+图像)计算 Sen 的斜率值。本教程我们利用 2003 年至 2021 年基于卫星的重力恢复和气候实验 (GRACE) 陆地水储量异常 (TWSA) 数据(https://grace.jpl.nasa.gov/data/get-data/jpl_global_mascons/)进行趋势分析。我们使用三种类型的数据集概述了简单的步骤:
表格数据的时间趋势测试, 使用栅格图像平均值进行时间趋势测试, 利用栅格数据集的时间序列进行空间趋势检验
MK方法介绍
Mann-Kendall检验是一种用于检测时间序列中趋势变化的非参数统计方法,它不需要对数据分布做出任何假设,因此适用于各种类型的数据。
Mann-Kendall检验的假设是原假设:时间序列中不存在趋势变化,备择假设:时间序列中存在趋势变化。其基本思想是通过比较每对数据点之间的大小关系来检查序列中的趋势,然后根据秩和的正负性来确定趋势的方向。
Mann-Kendall检验的计算过程包括以下步骤:
1.对于长度为n的时间序列X,计算n(n-1)/2个变量关系差(d):
2.将所有d[i,j]的符号相乘,得到Sgn。即:
3.计算所有d[i,j]的绝对值的秩和,即:
4.计算标准正态分布的z统计量,即:
5.根据显著性水平选择拒绝原假设的临界值,如果z值小于临界值,则接受原假设,否则拒绝原假设。
在Mann-Kendall检验确定存在趋势后,可以使用Sen's slope来计算趋势的斜率。Sen's slope是一种计算趋势斜率的方法,它利用中位数差来估计趋势的斜率,与线性回归方法不同,Sen's slope不受异常值的影响,因此更适用于含有异常值的数据。
Sen's slope的计算方法如下:
1.对于长度为n的时间序列X,计算n(n-1)/2个变量关系差(d):
2.计算所有d[i,j]的绝对值的中位数,即:
3.计算Sen's slope,即:
其中,k = (n-1)/2
Sen's slope表示时间序列中单位时间的变化量。当Sen's slope为正数时,表示时间序列呈现上升趋势,当Sen's slope为负数时,表示时间序列呈现下降趋势。
简而言之,Sen's slope是所有点连线的中位数。
表格数据
# Import necessary libraries
import os # Operating system functions
import re # Regular expressions
import numpy as np # Numerical computing
import pandas as pd # Data manipulation and analysis
import rasterio # Raster data manipulation
import matplotlib.pyplot as plt # Plotting library
import matplotlib.colors as mcolors # Custom color maps
from pymannkendall import original_test # Mann-Kendall trend test
创建必要的函数(主要是排序函数)
# Function for natural key sorting
def atoi(text):
"""
Convert a string to an integer if possible.
Args:
text (str): The input string.
Returns:
int or str: The converted integer or the original string.
"""
return int(text) if text.isdigit() else text
def natural_keys(text):
"""
Convert a string to a list of integers and strings for natural sorting.
Args:
text (str): The input string.
Returns:
list: A list of integers and strings.
"""
return [atoi(c) for c in re.split(r'(\d+)', text)]
也可以使用pyMannKendall
提供了 13 种不同版本的 MK 测试,包括修改后的 Mann-Kendall 测试、多元 MK 测试、区域 MK 测试、相关 MK 测试、部分 MK 测试等。
这里我们使用原始 Mann-Kendall 检验(original_test):原始 Mann-Kendall 检验是一种非参数检验,不考虑序列相关性或季节性效应。
测试将返回一个包含命名参数的元组:
trend: 趋势 h: 如果存在趋势就是True p: 显著性检验的p-value, z: 归一化检验统计数据, Tau: Kendall Tau, s: Mann-Kendall’s 得分, var_s: 方差S, slope: Theil-Sen估计量/斜率, intercept: Kendall-Theil稳健线截距
def mk_test(data):
"""
Perform the Mann-Kendall test and obtain the slope, intercept, and p-value.
Args:
data (array_like): The input data for the test.
Returns:
tuple: A tuple containing the slope, intercept, and p-value.
"""
# Perform the test and obtain the results
result = original_test(data)
# Extract slope, intercept, and p-value from the result
slope, intercept, p = result.slope, result.intercept, result.p
return slope, intercept, p
下面的代码对存储在 Excel 文件中的数据执行 Mann-Kendall 趋势检验。它自动执行对多列数据进行 Mann-Kendall 趋势检验的过程,然后将结果保存在 Excel 文件中以供进一步分析
# Define the file path
file_path = '/home/mw/input/TWSA9197/'
input_filename = 'MK data.xlsx'
# Combine the file path and input filename
input_file_path = os.path.join(file_path, input_filename)
# Read data from Excel file
data_frame = pd.read_excel(input_file_path, sheet_name='Sheet1', engine='openpyxl')
# Set index starting from 1
data_frame.index = range(1, len(data_frame) + 1)
# Initialize lists to hold results
mk_results = {
'Column': [], # List to store column names
'Slope': [], # List to store slope values
'P-value': [] # List to store p-values
}
# Perform the Mann-Kendall test on each column in the data frame
for column_name in data_frame.columns[1:]:
# Calculate the slope, intercept, and p-value for the current column
slope, intercept, p_value = mk_test(data_frame[column_name])
# Store the results in the lists
mk_results['Column'].append(column_name)
mk_results['Slope'].append(slope)
mk_results['P-value'].append(p_value)
# Create a DataFrame from the dictionary
results_df = pd.DataFrame(mk_results)
# Define the save path for the results
save_path = os.path.join(file_path, "MK Test")
# Create the output directory if it doesn't exist
if not os.path.exists(save_path):
os.makedirs(save_path)
# Define the output file path
output_filename = 'MK result.xlsx'
output_file_path = os.path.join(save_path, output_filename)
# Save the results to an Excel file
results_df.to_excel(output_file_path, index=False)
print("Mann-Kendall test results saved to", output_file_path)
data_frame
使用 matplotlib 绘制趋势
# Plot the trend using matplotlib
fig, ax = plt.subplots(figsize=(12, 4)) # Set the figure size
colors = ['red', 'black'] # Define colors for different columns
x = np.arange(len(data_frame))
index = data_frame.iloc[:, 0]
# Iterate over each column in the data frame
for i, column_name in enumerate(data_frame.columns[1:]):
slope, intercept, p_value = mk_test(data_frame[column_name]) # Perform the Mann-Kendall test
# Plot the data with corresponding label and color
ax.plot(x, data_frame[column_name], marker='o', label=f'{column_name} (Slope: {slope:.2f}, Intercept: {intercept:.2f}, P-value: {p_value:.2f})', color=colors[i % len(colors)]) # Use modulus to cycle through colors
if p_value < 0.05:
# Plot the trend line for significant trends
ax.plot(x, slope * data_frame.index + intercept, linestyle='--', color=colors[i % len(colors)])
# Set the x-axis ticks and labels
interval = 1
ax.set_xticks(x[::interval])
ax.set_xticklabels(index[::interval], rotation=45)
# Set the y-limits to start and end on the x-axis
ax.set_xlim(-1, len(index))
ax.set_ylabel('GRACE TWSA (cm)', fontsize=12)
ax.set_xlabel('Year', fontsize=12)
ax.set_title('Trend Analysis')
ax.legend() # Display legend
ax.grid(True) # Show grid
# Define the output file path
output_figure_name = 'MK_figure.png'
output_figure_path = os.path.join(file_path, output_figure_name)
# Save the graph as TIFF with 300 DPI
plt.show()
使用栅格图像平均值进行 MK 趋势测试
此代码计算 GeoTIFF 文件中存储的数据的平均值。它从文件中读取数据(不包括无数据值),然后计算平均值。结果与相应的年份一起存储在 DataFrame 中。
# Define a function to calculate the mean of a GeoTIFF file
def calculate_mean(tif_file):
try:
with rasterio.open(tif_file) as src:
data = src.read(1, masked=True) # Read data from GeoTIFF
nodata_value = src.nodatavals[0] # Get nodata value
# Remove nodata values
data_no_nodata = data.compressed() # Equivalent to data[~data.mask]
# Calculate mean value
mean_value = data_no_nodata.mean()
# Return the mean value
return mean_value
except Exception as e:
print(f"Error processing {tif_file}: {e}") # Print error message if an exception occurs
return np.nan # Return NaN if there is an error
# Base directory path
folder_path = '/home/mw/input/TWSA9197/'
# Initialize a dictionary to store results
results_dict = {'Year': []}
# Get list of GeoTIFF files in the current folder
tif_files = [f for f in os.listdir(folder_path) if f.endswith('.tif')]
# Sort files naturally
tif_files.sort(key=natural_keys)
# Initialize lists to store mean values
mean_list = []
# Iterate through files and calculate mean values
for tif_file in tif_files:
tif_path = os.path.join(folder_path, tif_file)
mean_value = calculate_mean(tif_path)
# Append mean value to the list
mean_list.append(mean_value)
results_dict['Year'].append(int(tif_file.rstrip('.tif')))
# Add mean values to results dictionary
results_dict['TWSA'] = mean_list
# Convert results dictionary to DataFrame
results_df = pd.DataFrame(results_dict)
dst_path = os.path.join(folder_path, 'result')
if not os.path.exists(dst_path):
os.makedirs(dst_path)
# Save DataFrame to Excel file
results_df.to_excel(os.path.join(dst_path, 'Raster mean value.xlsx'), index=False)
slope, intercept, p_value = mk_test(results_df.iloc[:, 1])
print(f'Slope: {round(slope, 3)}, intercept: {round(intercept, 3)}, p-value: {round(p_value, 3)}')
使用 matplotlib 绘制趋势
# Plot the trend using matplotlib
fig, ax = plt.subplots(figsize=(12, 4)) # Set the figure size
x = np.arange(len(results_df))
index = results_df.iloc[:, 0]
# Iterate over each column in the data frame
for column_name in results_df.columns[1:]:
slope, intercept, p_value = mk_test(results_df[column_name]) # Perform the Mann-Kendall test
# Plot the data with corresponding label and color
ax.plot(x, results_df[column_name], marker='^', label=f'{column_name} (Slope: {slope:.2f}, Intercept: {intercept:.2f}, P-value: {p_value:.2f})', color= 'blue')
if p_value < 0.05:
# Plot the trend line for significant trends
ax.plot(x, slope * results_df.index + intercept, linestyle='--', color= 'red')
# Set the x-axis ticks and labels
interval = 1
ax.set_xticks(x[::interval])
ax.set_xticklabels(index[::interval], rotation=45)
# Set the y-limits to start and end on the x-axis
ax.set_xlim(-1, len(index))
ax.set_ylabel('GRACE TWSA (cm)', fontsize=12)
ax.set_xlabel('Year', fontsize=12)
ax.set_title('Trend Analysis')
ax.legend() # Display legend
ax.grid(True) # Show grid
# Define the output file path
output_figure_name = 'MK_Spatial_figure.png'
output_figure_path = os.path.join(dst_path, output_figure_name)
# Save the graph as TIFF with 300 DPI
fig.savefig(output_figure_path, dpi=300, format='png', bbox_inches='tight')
利用栅格数据集的时间序列进行空间趋势检验
下面代码处理存储 GeoTIFF 文件的栅格数据。它从每个 GeoTIFF 文件中读取数据,对每个像素执行 Mann-Kendall 检验,计算斜率和 p 值,并将结果存储在单独的数组中。然后,代码将斜率和 p 值数组保存为新的 GeoTIFF 文件,并保留原始文件的元数据。
# Define the data path and list of input files
path_data = '/home/mw/input/TWSA9197/'
files_list = [f for f in os.listdir(path_data) if f.endswith(".tif")]
# Open a sample file to get metadata and nodata value
with rasterio.open(os.path.join(path_data, files_list[0])) as src:
profile = src.profile
nodata = src.nodatavals[0]
# Sort files naturally
files_list.sort(key=natural_keys)
array_path_list = [os.path.join(path_data, f) for f in files_list]
# Initialize an array to store raster data
raster_data = []
# Read each file and append its data to the raster_data list
for file in array_path_list:
with rasterio.open(file) as src:
data = src.read(1, masked=True)
raster_data.append(data)
# Convert list of rasters to a numpy array
raster_data = np.array(raster_data)
# Get the dimensions of the data
rows, cols = raster_data.shape[1], raster_data.shape[2]
# Define the save path for the results
save_path = os.path.join(path_data, "MK Test")
# Create the output directory if it doesn't exist
if not os.path.exists(save_path):
os.makedirs(save_path)
# Initialize arrays for slope and p-value
slope_array = np.full((rows, cols), nodata, dtype=np.float32)
p_array = np.full((rows, cols), nodata, dtype=np.float32)
# Perform the Mann-Kendall test on each pixel
for r in range(rows):
for c in range(cols):
# Exclude nodata values
if raster_data[0, r, c] != nodata:
slope,__, p = mk_test(raster_data[:, r, c])
slope_array[r, c] = slope
p_array[r, c] = p
# Update profile for the slope and p-value rasters
profile.update(
dtype=rasterio.float32,
count=1,
nodata=nodata
)
# Save the slope raster
slope_filename = os.path.join(save_path, "slope.tif")
with rasterio.open(slope_filename, "w", **profile) as dst:
dst.write(slope_array, 1)
# Save the p-value raster
p_filename = os.path.join(save_path, "p.tif")
with rasterio.open(p_filename, "w", **profile) as dst:
dst.write(p_array, 1)
print(f"Saved slope and p-value rasters to {save_path}")
使用 matplotlib 绘制趋势
# Define longitude and latitude arrays
lon = np.linspace(profile["transform"][2], profile["transform"][2] + cols * profile["transform"][0], cols)
lat = np.linspace(profile["transform"][5], profile["transform"][5] + rows * profile["transform"][4], rows)
# Create mesh grid for longitude and latitude
lon_grid, lat_grid = np.meshgrid(lon, lat)
# Set figure size
figsize = (10, 6)
# Plot slope
plt.figure(figsize=figsize)
plt.imshow(slope_array, extent=[lon.min(), lon.max(), lat.min(), lat.max()], cmap='jet', aspect='auto')
plt.colorbar(label='Slope (cm/yr)')
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.title("Sen's Slope")
plt.savefig(os.path.join(save_path, 'slope.png'), dpi=300, format='png', bbox_inches='tight')
plt.show()
# Plot p-value
plt.figure(figsize=figsize)
plt.imshow(p_array, extent=[lon.min(), lon.max(), lat.min(), lat.max()], cmap='jet', aspect='auto')
plt.colorbar(label='p-value')
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.title('p-value')
plt.savefig(os.path.join(save_path, 'p-value.png'), dpi=300, format='png', bbox_inches='tight')
plt.show()
根据 p 值绘制显著像素和非显著像素
# Plot significant p-values
plt.figure(figsize=figsize) # Set figure size here
# Set colorbar range
divnorm = mcolors.TwoSlopeNorm(vmin=0, vcenter=0.05, vmax=1)
# Colormap
colors = ['red', 'blue']
cmap = mcolors.ListedColormap(colors)
# Plot the p-values
plt.imshow(p_array, norm=divnorm, extent=[lon.min(), lon.max(), lat.min(), lat.max()], cmap=cmap)
plt.colorbar(label='p-values', ticks=[0, 0.05, 1])
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.title('Significant P-values')
plt.savefig(os.path.join(save_path, 'significant_p-values.png'), dpi=300, format='png', bbox_inches='tight')
plt.show()
后台回复【MK】获取代码和示例数据