PyTorch+LSTM 预测发动机振动值

职场   2024-11-26 19:36   四川  
LSTM是为了解决传统神经网络中长期依赖问题而诞生的一种特殊的循环神经网络(RNN)结构。它就像一个聪明的大脑,最关键的部件是"门控机制",能够选择性记忆并智能地输出。核心是一个循环单元,主要由三个"门"组成:
1、遗忘门(Forget Gate):决定要忘记哪些过去的信息
2、输入门(Input Gate):决定要记住哪些新的信息
3、输出门(Output Gate):决定要输出哪些信息
简单来说,每个"门"本质上是一个神经网络层,通过使用sigmoid激活函数,使得输出在0-1之间:0意味着"完全不重要",1意味着"非常重要"。
一个“门”的基本结构如下:
图片来源:https://www.researchgate.net/publication/335975993
输入 → 权重矩阵 → 偏置项 → sigmoid激活 → 0-1输出
输入接收各种信息,包括上一时刻的隐藏状态和当前时刻的新输入;权重矩阵(W)决定每个输入的重要性,偏置项(b)用于调整激活阈值,来增加模型的表达能力,最终由sigmoid决策输出。数学表达大概可以这么理解:

遗忘门 = sigmoid(W_f * [上一时刻输出, 当前输入] + b_f)

输入门 = sigmoid(W_i * [上一时刻输出, 当前输入] + b_i)

输出门 = sigmoid(W_o * [上一时刻输出, 当前输入] + b_o)

其中,sigmoid函数的数学表达式为:f(x) = 1 / (1 + e^(-x)),e 是自然对数的底数,当x为实数时,输出始终在0和1之间(一个理想的非线性变换函数)。函数曲线如下图所示:

使用sigmoid函数的优点是:它可以将任意实数映射到[0,1]之间,平滑可微,易于理解。缺点也很明显:梯度饱和(当x很大或很小时梯度接近0),且计算量大。
此外,还有一个记忆单元更新,结合遗忘门和输入门来更新记忆单元状态。完整的公示是这样的:

这里不想花太多篇幅展开介绍LSTM的原理了,网上有很多资源可以自学。下面主要是想通过一个实战案例给大家演示一下如何用LSTM模型进行时间序列数据的预测。

对于时间序列数据,我们不能一上来就直接开始模型训练,然后在发现预测效果不佳时才去调整超参数进行优化。这种方法不仅耗时,而且可能无法取得理想的效果。正确的做法是,在开始模型训练之前,应该首先对数据进行深入的预分析,尝试探索其内在规律和特性,再去选择合适的模型。最常用的方法是通过统计分析和可视化来了解数据的分布、趋势、季节性和周期性等特征。打个比方,假如某A330飞机左发N3振动值偏高,我们想通过LSTM来捕捉数据的特性进而预测在未来某个时间点是不是会超限,为后续航班的调整(在合适的维修时机再安排排故)提供数据上的支持。这个案例我在前面的机器学习系列(9)| 梯度下降一文中尝试过,那时候不懂,梯度下降的主要作用其实是用来更新参数,而不是直接用来预测。
首先,我遍历了过去165个航段的QAR数据,提取了每个航段中左发N3振动值的最大值,得到时序趋势曲线如下:

然后,利用autocorrelation_plot 函数绘制出自相关性分析图:

从图中,可以看到该时序数据的自相关系数整体上比较接近于零,且只有少量滞后阶数的值超出显著区间。这说明时间序列的自相关性较弱,换句话说就是随机性较强。其实,我们从时域图中也能看出来振动值并没有呈现周期性或有明显的上升趋势。这种情况,其实是很难通过简单的时间依赖模式进行预测的。LSTM 模型虽然擅长捕捉时间序列的长时依赖性,但对于这种自相关性较弱的序列,建模效果可能并不理想。即便如此,到底行不行?还得实践过了才知道。具体预测步骤如下:
1、首先,创建一个自定义数据集类,将输入序列 X 和目标 y 封装成 PyTorch 数据集对象,以便与DataLoader配合使用,实现批处理、打乱数据等功能。
class TimeSeriesDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.FloatTensor(X)  # 转为 PyTorch FloatTensor
        self.y = torch.FloatTensor(y)

    def __len__(self):
        return len(self.X)  # 数据集的大小

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]  # 根据索引返回样本
2、然后,创建一个类继承nn.Module,定义 LSTM 网络结构:输入维度、隐藏层数和单元数、输出维度、全连接层、向前传播方法。
class LSTMPredictor(nn.Module):
    """
    LSTM 时间序列预测模型
    "
""
    def __init__(self, input_dim, hidden_dim, num_layers, output_dim):
        super(LSTMPredictor, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        
        # 定义 LSTM 层
        self.lstm = nn.LSTM(
            input_dim,  # 输入维度
            hidden_dim,  # 隐藏层维度
            num_layers,  # LSTM 层数
            batch_first=True,  # 输入数据形状为 (batch, seq, feature)
            dropout=0.2  # 添加 Dropout 防止过拟合
        )
        
        # 定义全连接层,将 LSTM 输出映射到目标维度
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        # 初始化隐藏状态和细胞状态
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_dim).to(x.device)
        
        # LSTM 前向传播
        out, _ = self.lstm(x, (h0, c0))
        
        # 只提取最后一个时间步的输出
        out = self.fc(out[:, -1, :])
        return out
3、再定义一个数据预处理函数:用于归一化或标准化,差分处理。
def preprocess_data(data):
    # 1. 差分处理
    diff_data = np.diff(data, axis=0)
    
    # 2. 标准化
    scaler = StandardScaler()  
    scaled_data = scaler.fit_transform(diff_data)
    
    return scaled_data, scaler
4、接着定义一个转换数据格式函数创建输入-输出对),使其适合 LSTM模型训练的格式:
def create_sequences(data, seq_length):
    # 数据预处理:将时间序列数据转换为监督学习问题
    X, y = [], []
    for i in range(len(data) - seq_length):
        X.append(data[i:i + seq_length])  # 提取长度为 seq_length 的子序列
        y.append(data[i + seq_length])  # 子序列后一个值作为目标
    return np.array(X), np.array(y)

5、划分训练集和测试集:对于时间序列数据,为了保持数据的时序性和分布一致性,尽量避免简单地按比例划分。要不然就很可能出现下面这种情况,测试集中出现了训练集中未曾见过的模式或范围外的数据,从而导致模型预测结果较差。

本案例中,165个航段数据看似挺多,其实也就165个样本而已,数据量还是偏少。所以,分割策略我最终选择了随机划分的方式。

# 数据集划分
train_size = int(len(X) * 0.8)
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]
绘制数据分割后的分布图是下面这样,是不是就好多了?

6、创建数据加载器并初始化模型:
# 创建数据加载器
train_loader = DataLoader(TimeSeriesDataset(X_train, y_train), batch_size=32, shuffle=True)  
test_loader = DataLoader(TimeSeriesDataset(X_test, y_test), batch_size=32, shuffle=False)  

# 初始化模型
model = LSTMPredictor(input_dim=1, hidden_dim=32, num_layers=2, output_dim=1).to(device)

7、定义损失函数和优化器,这里我选择了MSE和Adam:

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
8、定义模型训练函数

前馈过程:先算y_pred,再计算损失;

反馈过程:先梯度归零,再反向传播;

最后:更新权重,同时计算每一轮训练内所有批次的平均损失。
def train_model(model, train_loader, criterion, optimizer, device):
    model.train()  
    total_loss = 0
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        
        # 前向传播
        y_pred = model(X_batch)
        
        # 计算损失
        loss = criterion(y_pred, y_batch)
        
        # 反向传播和优化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
    
    return total_loss / len(train_loader)
9、定义一个模型性能验证函数:
def validate_model(model, data_loader, criterion, device, prefix=""):
    model.eval()  # 切换到评估模式
    total_loss = 0
    predictions = []
    actuals = []
    
    with torch.no_grad():  # 关闭梯度计算
        for X_batch, y_batch in data_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)
            total_loss += loss.item()
            
            predictions.extend(y_pred.cpu().numpy())
            actuals.extend(y_batch.cpu().numpy())
    
    predictions = np.array(predictions)
    actuals = np.array(actuals)
    avg_loss = total_loss / len(data_loader)
    
    # 计算R方
    r2 = r2_score(actuals, predictions)
    if prefix:
        print(f"{prefix} Loss: {avg_loss:.4f}, R^2: {r2:.4f}")
    
    return avg_loss, predictions, actual
10、训练模型并每10轮打印一次损失,用于验证模型性能:
# 训练模型
num_epochs = 50
train_losses = []
val_losses = []

for epoch in range(num_epochs):
    train_loss = train_model(model, train_loader, criterion, optimizer, device)
    val_loss, predictions, actuals = validate_model(model, test_loader, criterion, device)

    train_losses.append(train_loss)
    val_losses.append(val_loss)

    if (epoch + 1) % 10 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')
11、绘制训练过程损失曲线:
# 绘制训练过程损失曲线
plt.plot(train_losses, label='训练损失')
plt.plot(val_losses, label='验证损失')
plt.legend()
plt.title('损失曲线')
plt.show()
从图中可以看出,迭代至25轮后,训练集的损失虽然继续在下降,但测试集损失已经基本不变,甚至在40轮以后还有上升趋势,说明随着迭代轮数增多,反而会出现过拟合的情况。
12、可视化验证集的预测结果:
# 反标准化预测结果
predictions_rescaled = scaler.inverse_transform(predictions)
actuals_rescaled = scaler.inverse_transform(actuals)

# 绘制预测结果
plt.plot(actuals_rescaled, label='真实值')
plt.plot(predictions_rescaled, label='预测值', linestyle='dashed')
plt.legend()
plt.title('预测结果')
plt.show()

拟合效果是不是看起来还凑合?

13、可视化未来5天的预测值:
# 可视化预测结果
plt.figure(figsize=(12, 6))
plt.rcParams['font.family'] = 'SimHei'  # 设置中文字体
plt.rcParams['axes.unicode_minus'] = False  # 解决负号显示问题
# 绘制历史数据
plt.plot(pd.to_datetime(time_column), data, label='历史数据', color='blue')
# 绘制未来预测
plt.plot(future_dates, future_values, label='预测值', color='red', linestyle='--')
plt.title('历史数据和未来预测')
plt.xlabel('日期')
plt.ylabel('振动值')
plt.legend()
plt.grid(True)
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

14、打印未来5天的预测值:

# 打印预测结果
print("\n未来5天的预测值:")
for date, value in zip(future_dates, future_values):
    print(f"{date.date()}: {value:.2f}")

未来5天的预测值:

2024-11-11: 2.37

2024-11-12: 2.45

2024-11-13: 2.63

2024-11-14: 2.60

2024-11-15: 2.81


15、打印评估模型:


train_loss, train_predictions, train_actuals = validate_model(model, train_loader, criterion, device, prefix="训练集")
test_loss, test_predictions, test_actuals = validate_model(model, test_loader, criterion, device, prefix="测试集")

# 计算全局指标
all_predictions = np.concatenate([train_predictions, test_predictions])
all_actuals = np.concatenate([train_actuals, test_actuals])
global_r2 = r2_score(all_actuals, all_predictions)
print(f"全局 R^2: {global_r2:.4f}"
训练集 Loss: 0.5140, R^2: 0.5138
测试集 Loss: 0.6171, R^2: 0.4008
全局 R^2: 0.4914

这里R² = 0.4914,表示模型解释了大约49.14%的数据变异性。怎么说呢,不是完全没有效果,但是依然有50%的数据变异性未被模型解释

我们也可以通过绘制预测值与真实值的散点图来评估模型的准确性,理想状态下,所有的数据点都应该落在红色虚线上:

在实际中,预测值不可能与真实值完全一致通过数据点围绕红色虚线布的紧密程度,我们就能大概了解模型的性能了


总结一下,对于小数据集,模型训练前的数据预分析很重要,数据集分割不合理会严重影响模型的预测效果。关于超参数调整,如果不是一些非常复杂的问题,不必增加太多的隐藏层数量或神经元数量,迭代的次数也不是越多越好。大家可以自己去尝试一下,改变学习率、训练轮数、批次大小,选择不同的损失函数和优化器,看看模型最终的预测效果有什么变化。

感谢大家对机务论坛的支持,关注机务论坛,倾听机务心声!航企优秀的方面必定宣传,不足的地方也必须指出,让领导们重视问题,解决问题,营造更好的机务维修环境。

征稿:
所见所闻,个人感悟,个人成长历程及感人故事。
特别征稿:我师傅的故事!
同时,征集劳动仲裁案例,分享案例,让更多的小伙伴能了解劳动纠纷的解决方式,通过劳动仲裁维护自己的合法权益。






评论区留言,同意的点赞
扫码添加小编微信
匿名爆料




民航机务论坛微信公众平台
改名为:机务论坛
发布最新行业动向 深入解读政策法规
开辟维修工程专栏 交流飞机排故经验
分享前沿技术应用 预测职业发展前景
行业大咖讲经布道 业界专家授业解惑
致力打造一流的民航机务朋友圈----机务论坛
关注机务论坛,倾听机务心声!
投稿邮箱:duanwei0615@163.com



机务论坛
民航机务论坛改名为:机务论坛 发布最新行业动向 深入解读政策法规 开辟维修工程专栏 交流飞机排故经验 分享前沿技术应用 预测职业发展前景 行业大咖讲经布道 业界专家授业解惑 致力打造一流的民航机务朋友圈----机务论坛
 最新文章