# Load the raw C-MAPSS FD001 data: training set, test set, and the true
# test-set RUL values.
train_path = r"H:\剩余寿命预测\CMAPSSData\train_FD001.txt"
test_path = r"H:\剩余寿命预测\CMAPSSData\test_FD001.txt"
rul_path = r"H:\剩余寿命预测\CMAPSSData\RUL_FD001.txt"
fd_001_train = pd.read_csv(train_path, sep=" ", header=None)
fd_001_test = pd.read_csv(test_path, sep=" ", header=None)
truth_df = pd.read_csv(rul_path, header=None)
# The space-separated files end each row with trailing blanks, which pandas
# parses as two extra all-NaN columns (26 and 27) — remove them from both sets.
for frame in (fd_001_train, fd_001_test):
    frame.drop(columns=[26, 27], inplace=True)
# Assign descriptive column names: unit id, cycle counter, 3 operational
# settings, then the 21 sensor channels.
columns = [
    'unit_number', 'time_in_cycles', 'setting_1', 'setting_2', 'TRA', 'T2', 'T24', 'T30', 'T50',
    'P2', 'P15', 'P30', 'Nf', 'Nc', 'epr', 'Ps30', 'phi', 'NRf', 'NRc', 'BPR',
    'farB', 'htBleed', 'Nf_dmd', 'PCNfR_dmd', 'W31', 'W32'
]
fd_001_train.columns = columns
fd_001_test.columns = columns
# Drop parameters considered unrelated to engine health degradation.
unrelated = ['Nf_dmd', 'PCNfR_dmd', 'P2', 'T2', 'TRA', 'farB', 'epr']
fd_001_train.drop(columns=unrelated, inplace=True)
fd_001_test.drop(columns=unrelated, inplace=True)
# Correlation heatmap, used to spot weakly-correlated / redundant parameters.
sns.heatmap(fd_001_train.corr(), annot=True, cmap='RdYlGn', linewidths=0.2)
fig = plt.gcf()
fig.set_size_inches(20, 20)
plt.show()
# Drop the weakly-correlated / redundant parameters identified above.
columns_to_drop = ['setting_1', 'setting_2', 'P15', 'NRc']
fd_001_train = fd_001_train.drop(columns=columns_to_drop)
fd_001_test = fd_001_test.drop(columns=columns_to_drop)
# Each training engine runs to failure, so its last observed cycle is the
# failure time; RUL at any row is (failure cycle - current cycle).
fd_RUL = fd_001_train.groupby('unit_number')['time_in_cycles'].max().reset_index()
fd_RUL.columns = ['unit_number', 'max']
train_df = fd_001_train.merge(fd_RUL, on=['unit_number'], how='left')
train_df['RUL'] = train_df['max'] - train_df['time_in_cycles']
train_df.drop(columns=['max'], inplace=True)
而在测试集中,我们使用外部数据(RUL_FD001.txt)提供的真实RUL值来评估模型的性能。
# Test set: unlike training, engines stop BEFORE failure, so the true RUL at
# the cutoff comes from the external RUL_FD001.txt file (truth_df).
test_df = fd_001_test
# Last observed cycle of each test engine.
rul = fd_001_test.groupby('unit_number')['time_in_cycles'].max().reset_index()
rul.columns = ['unit_number', 'max']
# truth_df holds one true-RUL value per engine; row i corresponds to unit i+1.
truth_df.columns = ['more']
truth_df['unit_number'] = truth_df.index + 1
# Total life = cycles already run + true remaining RUL at the cutoff.
# Join explicitly on unit_number instead of relying on positional index
# alignment between the two frames (same result, but robust to reordering).
truth_df = truth_df.merge(rul, on='unit_number', how='left')
truth_df['max'] = truth_df['max'] + truth_df['more']
truth_df.drop('more', axis=1, inplace=True)
# Attach each engine's total life to every test row and derive per-row RUL.
test_df = test_df.merge(truth_df, on=['unit_number'], how='left')
test_df['RUL'] = test_df['max'] - test_df['time_in_cycles']
test_df.drop('max', axis=1, inplace=True)
w1 = 30  # label horizon 1: "fails within 30 cycles"
w0 = 15  # label horizon 2: "fails within 15 cycles"
sequence_length = 50  # LSTM input window length
那么,窗口化时,以固定长度(sequence_length=50)划分数据:
第一个窗口:x1,x2,…,x50
第二个窗口:x2,x3,…,x51
……
这样就形成了多个相互重叠的滑动窗口,每个窗口都可以作为模型的一个输入样本。注意 w1、w0 并不是窗口长度,而是后面用于生成分类标签的预测时间窗(分别为 30 和 15 个周期)。
3.2、生成标签:
将数据分割为固定长度的时间序列窗口后,还需要输入(传感器数据)和输出(RUL 标签)来建立输入与输出之间的映射关系,模型才能学习传感器数据变化与部件寿命退化之间的关系,然后通过比较预测值和标签值之间的误差来更新参数。
# Classification labels: label1 = 1 when failure is within w1 (30) cycles;
# label2 refines this to 2 when failure is within w0 (15) cycles.
for frame in (train_df, test_df):
    frame['label1'] = (frame['RUL'] <= w1).astype(int)
    frame['label2'] = frame['label1']
    frame.loc[frame['RUL'] <= w0, 'label2'] = 2
3.3、归一化:
# Min-max scale every feature column; fit on the training set only, then
# apply the same fitted scaler to the test set (no leakage).
scaler = MinMaxScaler()
excluded = ['unit_number', 'time_in_cycles', 'RUL', 'label1', 'label2']
# Training set: add a copy of the cycle counter so it is scaled as a feature.
train_df['cycle_norm'] = train_df['time_in_cycles']
cols_normalize = train_df.columns.difference(excluded)
norm_data = pd.DataFrame(
    scaler.fit_transform(train_df[cols_normalize]),
    columns=cols_normalize,
    index=train_df.index,
)
train_df = (
    train_df[train_df.columns.difference(cols_normalize)]
    .join(norm_data)
    .reindex(columns=train_df.columns)
)
# Test set: transform with the scaler fitted above.
test_df['cycle_norm'] = test_df['time_in_cycles']
norm_test_df = pd.DataFrame(
    scaler.transform(test_df[cols_normalize]),
    columns=cols_normalize,
    index=test_df.index,
)
test_df = (
    test_df[test_df.columns.difference(cols_normalize)]
    .join(norm_test_df)
    .reindex(columns=test_df.columns)
    .reset_index(drop=True)
)
def gen_sequence(id_df, seq_length, seq_cols):
    """Slice one engine's history into overlapping windows of seq_length rows.

    Returns an array of shape (num_windows, seq_length, len(seq_cols)) with
    num_windows = n - seq_length. Note that the last window ends one row
    before the final observation; this matches the label slicing elsewhere,
    which uses the RUL of the row immediately AFTER each window.
    """
    values = id_df[seq_cols].values
    n = values.shape[0]
    return np.array([values[i:i + seq_length] for i in range(n - seq_length)])
# Feature columns fed to the LSTM (all normalized columns).
sequence_cols = list(norm_data.columns)
# Build the training windows engine by engine.
sequences = []
for engine_id in train_df['unit_number'].unique():
    engine_data = norm_data[train_df['unit_number'] == engine_id]
    if len(engine_data) >= sequence_length:
        seq = gen_sequence(engine_data, sequence_length, sequence_cols)
        if seq.ndim == 3:  # skip engines that produced no full window
            sequences.append(seq)
# Stack all windows into one (N, sequence_length, n_features) array.
# Guard against an empty list — np.concatenate raises on [] — mirroring the
# guard already present in the test-set code below.
if sequences:
    seq_array = np.concatenate(sequences).astype(np.float32)
else:
    seq_array = np.empty((0, sequence_length, len(sequence_cols)), dtype=np.float32)
# Build the matching RUL labels: the window starting at row i covers rows
# i .. i+L-1 and is labelled with the RUL at row i+L (the row right after
# the window), matching gen_sequence's window count of n - L.
labels = []
for engine_id in train_df['unit_number'].unique():
    engine_data = train_df[train_df['unit_number'] == engine_id]
    if len(engine_data) >= sequence_length:
        labels.append(engine_data['RUL'].values[sequence_length:])
if labels:
    label_array = np.concatenate(labels).reshape(-1, 1).astype(np.float32)
else:
    label_array = np.empty((0, 1), dtype=np.float32)
# Build the test windows engine by engine (same procedure as for training).
test_sequences = []
for engine_id in test_df['unit_number'].unique():
    engine_data = norm_test_df[test_df['unit_number'] == engine_id]
    if len(engine_data) >= sequence_length:
        seq = gen_sequence(engine_data, sequence_length, sequence_cols)
        if seq.ndim == 3:  # skip engines that produced no full window
            test_sequences.append(seq)
# Stack all test windows. Use an explicit float32 dtype in the empty
# fallback so both branches yield the same dtype (np.empty defaults to
# float64, which would not match the float32 tensors built later).
if test_sequences:
    test_seq_array = np.concatenate(test_sequences).astype(np.float32)
else:
    test_seq_array = np.empty((0, sequence_length, len(sequence_cols)), dtype=np.float32)
# Matching test labels: RUL of the row immediately following each window.
test_labels = []
for engine_id in test_df['unit_number'].unique():
    engine_data = test_df[test_df['unit_number'] == engine_id]
    if len(engine_data) >= sequence_length:
        test_labels.append(engine_data['RUL'].values[sequence_length:])
if test_labels:
    test_label_array = np.concatenate(test_labels).reshape(-1, 1).astype(np.float32)
else:
    test_label_array = np.empty((0, 1), dtype=np.float32)
class LSTMRULModel(nn.Module):
    """Two-stage LSTM regressor mapping a sensor window to a scalar RUL.

    Input:  float tensor of shape (batch, seq_len, input_size).
    Output: tensor of shape (batch, 1) — the predicted remaining useful life.
    """

    def __init__(self, input_size):
        super(LSTMRULModel, self).__init__()
        # Stacked 2-layer LSTM (hidden size 128, dropout between the layers),
        # followed by a single-layer LSTM compressing to 64 features.
        self.lstm1 = nn.LSTM(input_size, 128, num_layers=2, batch_first=True, dropout=0.2)
        self.lstm2 = nn.LSTM(128, 64, batch_first=True)
        # Regression head: 64 -> 32 -> 1. The final layer is linear — there is
        # no activation on the output.
        self.fc1 = nn.Linear(64, 32)
        self.fc2 = nn.Linear(32, 1)
        self.relu = nn.ReLU()

    def forward(self, x):
        out, _ = self.lstm1(x)
        out, _ = self.lstm2(out)
        last_step = out[:, -1, :]  # keep only the final time step's features
        hidden = self.relu(self.fc1(last_step))
        return self.fc2(hidden)
输入层:输入维度等于传感器特征的数量。
LSTM 层:使用两层 LSTM:第一层LSTM输入特征大小为 input_size
,输出特征大小为128;第二层LSTM,输入特征大小为128,输出特征大小为64。
全连接层:用于将特征映射到最终输出标量(RUL)。
激活函数:隐藏全连接层(fc1)使用ReLU;输出层(fc2)是线性层,没有激活函数,因此预测值并不保证非负。
5、模型初始化:
# Use the GPU when available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Wrap the numpy arrays as tensors on the chosen device.
X_train, y_train, X_test, y_test = (
    torch.tensor(arr).to(device)
    for arr in (seq_array, label_array, test_seq_array, test_label_array)
)
# Mini-batches of 16 windows, reshuffled every epoch.
train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=16, shuffle=True)
# Input size = number of features per time step.
model = LSTMRULModel(input_size=seq_array.shape[2]).to(device)
6、模型训练:
使用均方误差(MSE)损失函数,和选择 Adam 优化器:
# Adam optimizer (learning rate 0.001) minimizing mean squared error.
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.MSELoss()
批量训练,记录损失值,用于后续绘制训练损失曲线图:
# One full pass over the training data per epoch; the summed batch losses
# are averaged and stored in train_losses for the loss-curve plot later.
for epoch in range(epochs):
    model.train()
    epoch_loss = 0
    for xb, yb in train_loader:
        optimizer.zero_grad()  # clear gradients left over from the last step
        preds = model(xb)
        loss = criterion(preds, yb)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
    avg_loss = epoch_loss / len(train_loader)
    train_losses.append(avg_loss)
    # Log every 5th epoch to keep console output short.
    if (epoch + 1) % 5 == 0:
        print(f'Epoch [{epoch+1}/{epochs}], Loss: {avg_loss:.4f}')
相信没多少人有钱买GPU来搞机器学习的模型训练,我用的也是CPU,训练完一轮耗时一个半小时。
为了提高灵活性,大家可以在模型训练完毕后保存最佳模型,未来就可以直接加载,无需重新训练。保存的 .pth
文件可以在不同设备上加载使用。
# Checkpoint the best model seen so far. Fix: the original called
# `self.model.state_dict()`, but there is no `self` in this script — the
# network lives in the module-level variable `model`.
if epoch_loss < best_loss:
    best_loss = epoch_loss
    torch.save(model.state_dict(), save_path)
    print(f"Best model saved with loss: {best_loss:.4f}")
7、模型评估:
这里选择了MAE、RMSE、R方来作为模型性能评估的指标
# Evaluate on the held-out test windows (inference mode, no gradients).
model.eval()
with torch.no_grad():
    predictions = model(X_test)
y_pred_test = predictions.cpu().numpy()
y_true_test = y_test.cpu().numpy()
# Report MAE, RMSE and the coefficient of determination (R^2).
mae_test = mean_absolute_error(y_true_test, y_pred_test)
rmse_test = np.sqrt(mean_squared_error(y_true_test, y_pred_test))
r2_test = r2_score(y_true_test, y_pred_test)
print(f"\n测试集评估结果:")
for metric_name, value in (("MAE", mae_test), ("RMSE", rmse_test), ("R2 Score", r2_test)):
    print(f"{metric_name}: {value:.4f}")
结果为:
MAE=29.49,表明每次预测的平均误差是将近30。R方=0.3564,说明模型只解释了35.64% 的数据变化。结果非常不理想啊,我觉得问题可能就出在RUL计算建模上。
8、可视化:
# Plot the per-epoch average training loss collected during training.
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(train_losses)
ax.set_title('训练损失曲线')
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
plt.show()
从损失曲线上看,整个训练过程的损失值是收敛的,但这并不意味着模型的泛化能力强。
不妨对比一下训练集和测试集中,预测值与真实值的对比:
测试集:
训练集:
从图中我们可以看到,模型对训练集数据的拟合太充分了,甚至有点过于完美了!训练集的R方居然高达0.9924!
考虑到篇幅已经太长了,再写下去,估计大家也没耐心看完。Youtube上有一期视频讲到估计RUL有三种常用的方法:相似性模型、生存模型和退化模型。下一篇,我将尝试使用不同RUL计算方法再尝试一下,到时候再看看效果吧。