遗忘门 = sigmoid(W_f * [上一时刻输出, 当前输入] + b_f)
输入门 = sigmoid(W_i * [上一时刻输出, 当前输入] + b_i)
输出门 = sigmoid(W_o * [上一时刻输出, 当前输入] + b_o)
class TimeSeriesDataset(Dataset):
def __init__(self, X, y):
self.X = torch.FloatTensor(X) # 转为 PyTorch FloatTensor
self.y = torch.FloatTensor(y)
def __len__(self):
return len(self.X) # 数据集的大小
def __getitem__(self, idx):
return self.X[idx], self.y[idx] # 根据索引返回样本
class LSTMPredictor(nn.Module):
"""
LSTM 时间序列预测模型
"""
def __init__(self, input_dim, hidden_dim, num_layers, output_dim):
super(LSTMPredictor, self).__init__()
self.hidden_dim = hidden_dim
self.num_layers = num_layers
# 定义 LSTM 层
self.lstm = nn.LSTM(
input_dim, # 输入维度
hidden_dim, # 隐藏层维度
num_layers, # LSTM 层数
batch_first=True, # 输入数据形状为 (batch, seq, feature)
dropout=0.2 # 添加 Dropout 防止过拟合
)
# 定义全连接层,将 LSTM 输出映射到目标维度
self.fc = nn.Linear(hidden_dim, output_dim)
def forward(self, x):
# 初始化隐藏状态和细胞状态
h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).to(x.device)
c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).to(x.device)
# LSTM 前向传播
out, _ = self.lstm(x, (h0, c0))
# 只提取最后一个时间步的输出
out = self.fc(out[:, -1, :])
return out
def preprocess_data(data):
# 1. 差分处理
diff_data = np.diff(data, axis=0)
# 2. 标准化
scaler = StandardScaler()
scaled_data = scaler.fit_transform(diff_data)
return scaled_data, scaler
def create_sequences(data, seq_length):
# 数据预处理:将时间序列数据转换为监督学习问题
X, y = [], []
for i in range(len(data) - seq_length):
X.append(data[i:i + seq_length]) # 提取长度为 seq_length 的子序列
y.append(data[i + seq_length]) # 子序列后一个值作为目标
return np.array(X), np.array(y)
5、划分训练集和测试集:对于时间序列数据,为了保持数据的时序性和分布一致性,尽量避免简单地按比例划分。要不然就很可能出现下面这种情况,测试集中出现了训练集中未曾见过的模式或范围外的数据,从而导致模型预测结果较差。
本案例中,165个航段数据看似挺多,其实也就165个样本而已,数据量还是偏少。所以,分割策略我最终选择了随机划分的方式。
# 数据集划分
train_size = int(len(X) * 0.8)
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]
# 创建数据加载器
train_loader = DataLoader(TimeSeriesDataset(X_train, y_train), batch_size=32, shuffle=True)
test_loader = DataLoader(TimeSeriesDataset(X_test, y_test), batch_size=32, shuffle=False)
# 初始化模型
model = LSTMPredictor(input_dim=1, hidden_dim=32, num_layers=2, output_dim=1).to(device)
7、定义损失函数和优化器,这里我选择了MSE和Adam:
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
前馈过程:先算y_pred,再计算损失;
反馈过程:先梯度归零,再反向传播;
def train_model(model, train_loader, criterion, optimizer, device):
model.train()
total_loss = 0
for X_batch, y_batch in train_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
# 前向传播
y_pred = model(X_batch)
# 计算损失
loss = criterion(y_pred, y_batch)
# 反向传播和优化
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
return total_loss / len(train_loader)
def validate_model(model, data_loader, criterion, device, prefix=""):
model.eval() # 切换到评估模式
total_loss = 0
predictions = []
actuals = []
with torch.no_grad(): # 关闭梯度计算
for X_batch, y_batch in data_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
y_pred = model(X_batch)
loss = criterion(y_pred, y_batch)
total_loss += loss.item()
predictions.extend(y_pred.cpu().numpy())
actuals.extend(y_batch.cpu().numpy())
predictions = np.array(predictions)
actuals = np.array(actuals)
avg_loss = total_loss / len(data_loader)
# 计算R方
r2 = r2_score(actuals, predictions)
if prefix:
print(f"{prefix} Loss: {avg_loss:.4f}, R^2: {r2:.4f}")
return avg_loss, predictions, actual
# 训练模型
num_epochs = 50
train_losses = []
val_losses = []
for epoch in range(num_epochs):
train_loss = train_model(model, train_loader, criterion, optimizer, device)
val_loss, predictions, actuals = validate_model(model, test_loader, criterion, device)
train_losses.append(train_loss)
val_losses.append(val_loss)
if (epoch + 1) % 10 == 0:
print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')
# 绘制训练过程损失曲线
plt.plot(train_losses, label='训练损失')
plt.plot(val_losses, label='验证损失')
plt.legend()
plt.title('损失曲线')
plt.show()
# 反标准化预测结果
predictions_rescaled = scaler.inverse_transform(predictions)
actuals_rescaled = scaler.inverse_transform(actuals)
# 绘制预测结果
plt.plot(actuals_rescaled, label='真实值')
plt.plot(predictions_rescaled, label='预测值', linestyle='dashed')
plt.legend()
plt.title('预测结果')
plt.show()
拟合效果是不是看起来还凑合?
# 可视化预测结果
plt.figure(figsize=(12, 6))
plt.rcParams['font.family'] = 'SimHei' # 设置中文字体
plt.rcParams['axes.unicode_minus'] = False # 解决负号显示问题
# 绘制历史数据
plt.plot(pd.to_datetime(time_column), data, label='历史数据', color='blue')
# 绘制未来预测
plt.plot(future_dates, future_values, label='预测值', color='red', linestyle='--')
plt.title('历史数据和未来预测')
plt.xlabel('日期')
plt.ylabel('振动值')
plt.legend()
plt.grid(True)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
14、打印未来5天的预测值:
# 打印预测结果
print("\n未来5天的预测值:")
for date, value in zip(future_dates, future_values):
print(f"{date.date()}: {value:.2f}")未来5天的预测值:
2024-11-11: 2.37
2024-11-12: 2.45
2024-11-13: 2.63
2024-11-14: 2.60
2024-11-15: 2.81
15、打印R²评估模型:
train_loss, train_predictions, train_actuals = validate_model(model, train_loader, criterion, device, prefix="训练集")
test_loss, test_predictions, test_actuals = validate_model(model, test_loader, criterion, device, prefix="测试集")
# 计算全局指标
all_predictions = np.concatenate([train_predictions, test_predictions])
all_actuals = np.concatenate([train_actuals, test_actuals])
global_r2 = r2_score(all_actuals, all_predictions)
print(f"全局 R^2: {global_r2:.4f}"
训练集 Loss: 0.5140, R^2: 0.5138 测试集 Loss: 0.6171, R^2: 0.4008
这里R² = 0.4914,表示模型解释了大约49.14%的数据变异性。怎么说呢,不是完全没有效果,但是依然有50%的数据变异性未被模型解释
我们也可以通过绘制预测值与真实值的散点图来评估模型的准确性,理想状态下,所有的数据点都应该落在红色虚线上:
在实际中,预测值不可能与真实值完全一致。通过数据点围绕红色虚线分布的紧密程度,我们就能大概了解模型的性能了。
总结一下,对于小数据集,模型训练前的数据预分析很重要,数据集分割不合理会严重影响模型的预测效果。关于超参数调整,如果不是一些非常复杂的问题,不必增加太多的隐藏层数量或神经元数量,迭代的次数也不是越多越好。大家可以自己去尝试一下,改变学习率、训练轮数、批次大小,选择不同的损失函数和优化器,看看模型最终的预测效果有什么变化。