发动机剩余使用寿命预测(1)

职场   2024-12-17 15:26   四川  
C-MAPSS是由NASA开发并公开可用的一款仿真软件,能够模拟发动机在不同飞行条件下的运行情况,包括各种操作设置、环境条件和潜在的故障模式。在官方数据共享平台上:https://data.nasa.gov,有一个大型公开可用的数据集,包含了发动机从开始运行到故障发生的所有模拟数据。该数据集是一个多变量的时间序列,通过多个传感器通道记录的数值来表征故障演变。今天我们要做的就是利用该数据集来预测发动机的剩余使用寿命。
关于数据集的说明:
数据集四个部分:FD001、FD002、FD003和FD004。每个数据集包含不同数量的训练测试轨迹(时间序列)。并且已经划分好了测试集和训练集以及每台发动机真实的RUL,用于评估模型预测性能。
FD001:100个训练轨迹,100个测试轨迹
FD002260个训练迹,259个测试轨迹。
FD003:100个训练轨迹,100个测试轨迹
FD004:248个训练轨,249个测试轨迹。
FD001FD003是海平面条件下运行,而FD002FD004六种不同的条件下运行。所有数据都包含HPC退化的故障模式,而FD003FD004还包含风扇退化的故障模式
以FD001为例,训练轨迹指每台发动机在其运行周期内的时间序列数据。每个训练迹对应于一台发动运行数据。100 条训练轨迹就是 100 台发动机, 每台发动机的初始磨损制造工艺不同的,这些差异被视为正常情况,而不是故障条件
数据集中包含21个传感器记录的数值和不同operational setting,这些setting发动机性能有显著影响。在训练集中,模拟发动机从健康状态到故障状态的完整退化过程;而试集中,提供部分发动机的衰退轨迹,数据在故障发生之前随机截断(即未到真正发生故障的时间点)。测试模型根据当前的传感器值推断剩余使用寿命。

在github和kaggle中我看到很多大神用不同的模型实现预测的方法,这里借鉴他们的思路,自己重新走了一遍,加深一下对代码的理解,权当学习笔记,如有错误,还望大家指正。实现的思路如下:

1、提取时间序列特征:
退化信号(各个传感器数据),检查各传感器的相关性,删除冗余特征。
# 读取训练和测试数据
train_path = r"H:\剩余寿命预测\CMAPSSData\train_FD001.txt"
test_path = r"H:\剩余寿命预测\CMAPSSData\test_FD001.txt"
rul_path = r"H:\剩余寿命预测\CMAPSSData\RUL_FD001.txt"

fd_001_train = pd.read_csv(train_path, sep=" ", header=None)
fd_001_test = pd.read_csv(test_path, sep=" ", header=None)
truth_df = pd.read_csv(rul_path, header=None)

# 删除不需要的列
fd_001_train.drop(columns=[26, 27], inplace=True)
fd_001_test.drop(columns=[26, 27], inplace=True)

# 定义列名
columns = [
    'unit_number''time_in_cycles''setting_1''setting_2''TRA''T2''T24''T30''T50'
    'P2''P15''P30''Nf''Nc''epr''Ps30''phi''NRf''NRc''BPR'
    'farB''htBleed''Nf_dmd''PCNfR_dmd''W31''W32'
]
fd_001_train.columns = columns
fd_001_test.columns = columns

# 删除与发动机健康不相关的参数
fd_001_train.drop(columns=['Nf_dmd','PCNfR_dmd','P2','T2','TRA','farB','epr'],inplace=True)
fd_001_test.drop(columns=['Nf_dmd','PCNfR_dmd','P2','T2','TRA','farB','epr'],inplace=True)

# 绘制相关性热图,找出弱相关或冗余的参数
sns.heatmap(fd_001_train.corr(), annot=True, cmap='RdYlGn', linewidths=0.2)
fig=plt.gcf()
fig.set_size_inches(20,20)
plt.show()

# 删除与弱相关或冗余的参数
columns_to_drop = ['setting_1''setting_2''P15''NRc']
fd_001_train = fd_001_train.drop(columns=columns_to_drop)
fd_001_test = fd_001_test.drop(columns=columns_to_drop)
生成的相关性热图如下:
可以看出,Nc和NRc的相关性为0.96,为冗余的特征,任意删除其中一个即可。而setting_1、setting_2、P15的相关系数很小,因此也可以从特征变量中删除。

2、计算RUL:
在训练阶段,模型需要学习如何输入特中预测 RUL。但在训练集中并未有剩余使用寿命这一列,可我们已知训练集中包含了发动机从健康状态到故障状态的完整退化过程。所以在训练集中,为了构造RUL这一列,我们以发动机ID分组,先找出每个发动机time_in_cycles的最大值max,再用这个最大循环数减去当前的循环数,就可以得到RUL,即:RUL=max_life - 当前寿命周期
# 按发动机分组,计算训练集每个发动机time_in_cycles的最大值,并创建一个新的DataFrame: fd_RUL
fd_RUL = fd_001_train.groupby('unit_number')['time_in_cycles'].max().reset_index()
fd_RUL.columns = ['unit_number''max']

# 合并数据并计算RUL
train_df = fd_001_train.merge(fd_RUL, on=['unit_number'], how='left')
train_df['RUL'] = train_df['max'] - train_df['time_in_cycles']
train_df.drop(columns=['max'], inplace=True)

而在测试集中,我们使用外部数据(RUL_FD001.txt)提供的真实RUL值来评估模型的性能。

test_df = fd_001_test
rul = pd.DataFrame(test_df.groupby('unit_number')['time_in_cycles'].max()).reset_index()
rul.columns = ['unit_number','max']
# 将truth_df的列名设置为more
truth_df.columns = ['more']
truth_df['unit_number'] = truth_df.index + 1   # 为truth_df添加了一个新列unit_number,其值基于truth_df的索引加1
truth_df['max'] = rul['max'] + truth_df['more']    # 计算每台发动机的总寿命(max):测试集中已用的循环数+真实的RUL
truth_df.drop('more', axis=1, inplace=True)

# 将包含RUL信息的truth_df与测试数据集test_df进行左连接合并,基于unit_number列。这样,test_df中的每一行都会与truth_df中对应的RUL信息匹配
test_df = test_df.merge(truth_df, on=['unit_number'], how='left')
test_df['RUL'] = test_df['max'] - test_df['time_in_cycles']
test_df.drop('max', axis=1, inplace=True)
基于材料科学和可靠性工程的研究,部件寿命阶段的退化路径一般都符合先慢后快的趋势。部件的使用寿命初期,其性能退化通常是缓慢且相对线性的,性能下降与时间成正比。在这个阶段,部件可能因为正常的磨损和老化而逐渐失去效能,但这种退化通常不会立即导致故障。可当部件接近其设计寿命的末期或由于某些故障条件的影响,退化速率可能会突然增加,表现为非线性的加速退化。这个阶段的退化速度加快,部件的性能下降得更快,直到最终故障。因此,在训练集的RUL计算中,我这种用线性下降方式的建模思路,肯定是不严谨的。在网上还看到了有人用分段线性模型来计算RUL(用不同的线性段来近似部件退化的多个阶段,捕捉部件状态随时间变化的不同特征),我还没有去尝试,大家感兴趣的不妨自己试试啊。
此外,在论文《Damage Propagation Modeling for Aircraft Engine Run-to-Failure Simulation》中还提到另一种思路是:将21个传感器的数据结合起来(实际上只有14个有用的传感器值),形成一个健康指标。在开始时,健康状态被假定为1(即完全健康),而在发动机故障时,健康状态被假定为0(即完全不健康)。计算健康指标h(t)函数的公式为
不同的建模方式,最终的预测结果肯定是不同的。不用想,我这种肯定是预测精度最差的。

3、准备LSTM数据:
3.1、窗口化数据:
对于时间序列,数据通常是连续的,并且其未来值受到过去值的影响。如果时间序列很长,直接使用整个序列作为模型输入会使模型计算复杂度过高,内存需求也会激增。而且长序列可能包含许多局部模式,这些局部模式往往更能体现特征之间的关系。因此,我们要将数据分割为固定长度的时间序列窗口。这样既可以简化计算,又保留了时间依赖性。窗口化后,每个时间窗口都可以作为一个独立的训练样本,还能扩大训练集。
这里设置两个滑动的预测窗口:短期窗口可以关注更精确的短时间趋势,而长期窗口可以关注更平滑的长期变化。通过分开预测,模型可以更好地理解数据的短期和长期模式。
w1 = 30  # 预测窗口1
w0 = 15  # 预测窗口2
sequence_length = 50  # 设置序列长度
如果我们有这么一段时间序列数据:x1,x2,x3,…,x200

那么,窗口化时,以固定长度划分数据:

第一个w0窗口:x1,x2,…,x30

第二个w0窗口:x2,x3,…,x51

……

第一个w1窗口:x1,x2,…,x15

第二个w1窗口:x2,x3,…,x16

……

这样形成了多个重叠窗口,每个窗口都可以作为模型的输入。

3.2、生成标签:

将数据分割为固定长度的时间序列窗口后,还需要输入(传感器数据)和输出(RUL 标签)来建立输入与输出之间的映射关系,模型才能学习传感器数据变化与部件寿命退化之间的关系,然后通过比较预测值和标签值之间的误差来更新参数。

# 添加标签label1 和 label2。label1 用于判断在 w1(30周期内)内是否会故障,label2 进一步细分在 w0(15周期内)是否会故障。
train_df['label1'] = np.where(train_df['RUL'] <= w1, 1, 0)
train_df['label2'] = train_df['label1']
train_df.loc[train_df['RUL'] <= w0, 'label2'] = 2

test_df['label1'] = np.where(test_df['RUL'] <= w1, 1, 0 )
test_df['label2'] = test_df['label1']
test_df.loc[test_df['RUL'] <= w0, 'label2'] = 2

3.3、归一化:

将特征归一化,确保各传感器数据的数值范围一致。
# 使用MinMaxScaler对数据特征进行标准化
scaler = MinMaxScaler()
# 训练集
train_df['cycle_norm'] = train_df['time_in_cycles']
cols_normalize = train_df.columns.difference(['unit_number''time_in_cycles''RUL''label1''label2'])
norm_data = pd.DataFrame(scaler.fit_transform(train_df[cols_normalize]), columns=cols_normalize, index=train_df.index)
join_df = train_df[train_df.columns.difference(cols_normalize)].join(norm_data)
train_df = join_df.reindex(columns = train_df.columns)
# 测试集
test_df['cycle_norm'] = test_df['time_in_cycles'
norm_test_df = pd.DataFrame(scaler.transform(test_df[cols_normalize]), columns=cols_normalize, index=test_df.index)
test_join_df = test_df[test_df.columns.difference(cols_normalize)].join(norm_test_df)
test_df = test_join_df.reindex(columns = test_df.columns)
test_df = test_df.reset_index(drop=True)
3.4、生成序列数据:
由于LSTM输入需要3D的数据格式,这里定义了一个辅助函数 gen_sequence ,通过滑动窗口的方式从单个发动机的数据中生成固定长度的序列,将数据重塑为LSTM所需的格式,即:样本数、时间步长、特征维度
def gen_sequence(id_df, seq_length, seq_cols):
    data_array = id_df[seq_cols].values
    num_elements = data_array.shape[0]
    sequences = []
    for start, stop in zip(range(0, num_elements-seq_length), range(seq_length, num_elements)):
        sequences.append(data_array[start:stop, :])
    return np.array(sequences)
通过遍历训练集和测试机中每个发动机,生成序列数据和对应标签:
sequence_cols = list(norm_data.columns)
sequences = []
for engine_id in train_df['unit_number'].unique():
    engine_data = norm_data[train_df['unit_number'] == engine_id]
    if len(engine_data) >= sequence_length:
        seq = gen_sequence(engine_data, sequence_length, sequence_cols)
        if seq.ndim == 3:  # 确保生成的序列是三维的
            sequences.append(seq)
# 合并所有序列
seq_array = np.concatenate(sequences).astype(np.float32)

# 通过遍历每个发动机,生成训练标签
labels = []
for engine_id in train_df['unit_number'].unique():
    engine_data = train_df[train_df['unit_number'] == engine_id]
    if len(engine_data) >= sequence_length:
        labels.append(engine_data['RUL'].values[sequence_length:])
# 合并所有标签
label_array = np.concatenate(labels).reshape(-1, 1).astype(np.float32)

# 通过遍历每个发动机,生成测试序列
test_sequences = []
for engine_id in test_df['unit_number'].unique():
    engine_data = norm_test_df[test_df['unit_number'] == engine_id]
    if len(engine_data) >= sequence_length:
        seq = gen_sequence(engine_data, sequence_length, sequence_cols)
        if seq.ndim == 3:  # 确保生成的序列是三维的
            test_sequences.append(seq)
# 合并所有测试序列
if test_sequences:
    test_seq_array = np.concatenate(test_sequences).astype(np.float32)
else:
    test_seq_array = np.empty((0, sequence_length, len(sequence_cols)))

# 通过遍历每个发动机,生成测试标签
test_labels = []
for engine_id in test_df['unit_number'].unique():
    engine_data = test_df[test_df['unit_number'] == engine_id]
    if len(engine_data) >= sequence_length:
        test_labels.append(engine_data['RUL'].values[sequence_length:])
# 合并所有测试标签
if test_labels:
    test_label_array = np.concatenate(test_labels).reshape(-1, 1).astype(np.float32)
else:
    test_label_array = np.empty((0, 1))

4、定义 LSTM 模型:
定义一个名为LSTMRULModel的PyTorch模型类
class LSTMRULModel(nn.Module):
    def __init__(self, input_size):
        super(LSTMRULModel, self).__init__()
        self.lstm1 = nn.LSTM(input_size, 128, num_layers=2, batch_first=True, dropout=0.2)
        self.lstm2 = nn.LSTM(128, 64, batch_first=True)
        self.fc1 = nn.Linear(64, 32)
        self.fc2 = nn.Linear(32, 1)
        self.relu = nn.ReLU()
        
    def forward(self, x):
        x, _ = self.lstm1(x)
        x, _ = self.lstm2(x)
        x = x[:, -1, :]  
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        return x

输入层:输入维度等于传感器特征的数量。

LSTM 层:使用两层 LSTM:第一层LSTM输入特征大小为 input_size,输出特征大小为128;第二层LSTM,输入特征大小为128,输出特征大小为64。

全连接层:用于将特征映射到最终输出标量(RUL)。

激活函数:输出层使用ReLU,确保预测值非负。


5、模型初始化:

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

X_train = torch.tensor(seq_array).to(device)
y_train = torch.tensor(label_array).to(device)
X_test = torch.tensor(test_seq_array).to(device)
y_test = torch.tensor(test_label_array).to(device)

train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=16, shuffle=True)

model = LSTMRULModel(input_size=seq_array.shape[2]).to(device)


6、模型训练:

使用均方误差(MSE)损失函数,和选择 Adam 优化器:

criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

批量训练,记录损失值,用于后续绘制训练损失曲线图:

for epoch in range(epochs):
    model.train()
    epoch_loss = 0
    for batch_X, batch_y in train_loader:
        optimizer.zero_grad()
        outputs = model(batch_X)
        loss = criterion(outputs, batch_y)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    
    avg_loss = epoch_loss / len(train_loader)
    train_losses.append(avg_loss)
    
    if (epoch + 1) % 5 == 0:
        print(f'Epoch [{epoch+1}/{epochs}], Loss: {avg_loss:.4f}')

相信没多少人有钱买GPU来搞机器学习的模型训练,我用的也是CPU,训练完一轮耗时一个半小时。

为了提高灵活性,大家可以在模型训练完毕后保存最佳模型,未来就可以直接加载,无需重新训练。保存的 .pth 文件可以在不同设备上加载使用。

if epoch_loss < best_loss:
    best_loss = epoch_loss
    torch.save(self.model.state_dict(), save_path)
    print(f"Best model saved with loss: {best_loss:.4f}")


7、模型评估:

这里选择了MAE、RMSE、R方来作为模型性能评估的指标

model.eval()
with torch.no_grad():
    y_pred_test = model(X_test).cpu().numpy()
    y_true_test = y_test.cpu().numpy()

# 计算测试集评估指标
mae_test = mean_absolute_error(y_true_test, y_pred_test)
rmse_test = np.sqrt(mean_squared_error(y_true_test, y_pred_test))
r2_test = r2_score(y_true_test, y_pred_test)
print(f"\n测试集评估结果:")
print(f"MAE: {mae_test:.4f}")
print(f"RMSE: {rmse_test:.4f}")
print(f"R2 Score: {r2_test:.4f}")

结果为:

MAE=29.49,表明每次预测的平均误差是将近30。R方=0.3564,说明模型只解释了35.64% 的数据变化。结果非常不理想啊,我觉得问题可能就出在RUL计算建模上。


8、可视化:

plt.figure(figsize=(10,5))
plt.plot(train_losses)
plt.title('训练损失曲线')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.show()

从损失曲线上看,整个训练过程的损失值是收敛的,但这并不意味着模型的泛化能力强。

不妨对比一下训练集和测试集中,预测值与真实值的对比:

测试集:

训练集:

从图中我们可以看到,模型对训练集数据的拟合太充分了,甚至有点过于完美了!训练集的R方居然高达0.9924!


考虑到篇幅已经太长了,再写下去,估计大家也没耐心看完。Youtube上有一期视频讲到估计RUL有三种常用的方法:相似性模型、生存模型和退化模型。下一篇,我将尝试使用不同RUL计算方法再尝试一下,到时候再看看效果吧。





感谢大家对机务论坛的支持,关注机务论坛,倾听机务心声!航企优秀的方面必定宣传,不足的地方也必须指出,让领导们重视问题,解决问题,营造更好的机务维修环境。

征稿:
所见所闻,个人感悟,个人成长历程及感人故事。
特别征稿:我师傅的故事!
同时,征集劳动仲裁案例,分享案例,让更多的小伙伴能了解劳动纠纷的解决方式,通过劳动仲裁维护自己的合法权益。





评论区留言,同意的点赞
扫码添加小编微信
匿名爆料




民航机务论坛微信公众平台
改名为:机务论坛
发布最新行业动向 深入解读政策法规
开辟维修工程专栏 交流飞机排故经验
分享前沿技术应用 预测职业发展前景
行业大咖讲经布道 业界专家授业解惑
致力打造一流的民航机务朋友圈----机务论坛
关注机务论坛,倾听机务心声!
投稿邮箱:duanwei0615@163.com







机务论坛
民航机务论坛改名为:机务论坛 发布最新行业动向 深入解读政策法规 开辟维修工程专栏 交流飞机排故经验 分享前沿技术应用 预测职业发展前景 行业大咖讲经布道 业界专家授业解惑 致力打造一流的民航机务朋友圈----机务论坛
 最新文章