190 lines
6.3 KiB
Python
190 lines
6.3 KiB
Python
import torch
|
||
import torch.nn as nn
|
||
import pandas as pd
|
||
import numpy as np
|
||
from sklearn.preprocessing import StandardScaler, MinMaxScaler
|
||
from torch.utils.data import Dataset, DataLoader
|
||
import matplotlib.pyplot as plt
|
||
|
||
# 参数设置
|
||
SEQ_LENGTH = 10 # 时间序列窗口长度
|
||
PRE_LENGTH = 1 #预测时间序列窗口长度
|
||
BATCH_SIZE = 4096
|
||
EPOCHS = 2000
|
||
HIDDEN_SIZE = 64
|
||
HIDDEN_LAYER = 3 #隐藏层
|
||
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
||
|
||
|
||
# 数据预处理
|
||
def process_data(file_path):
|
||
# 读取Excel文件,跳过首行,使用科学计数法解析
|
||
df = pd.read_excel(file_path, header=None, skiprows=1,sheet_name="sample1")
|
||
|
||
# 提取输入输出数据
|
||
inputs = df.iloc[:, 1:9].values.astype(np.float32) # 第2-9列
|
||
outputs = df.iloc[:, 10:19].values.astype(np.float32) # 第11-19列
|
||
print("标准化之前:")
|
||
ori = inputs
|
||
print(inputs)
|
||
# 数据标准化
|
||
input_scaler = StandardScaler()
|
||
output_scaler = StandardScaler()
|
||
inputs = input_scaler.fit_transform(inputs)
|
||
outputs = output_scaler.fit_transform(outputs)
|
||
#
|
||
# input_scaler = MinMaxScaler(feature_range=(-1, 1))
|
||
# output_scaler = MinMaxScaler(feature_range=(-1, 1))
|
||
# inputs = input_scaler.fit_transform(inputs)
|
||
# outputs = output_scaler.fit_transform(outputs)
|
||
print("标准化之后:")
|
||
print(inputs)
|
||
|
||
# 创建对比可视化
|
||
plt.figure(figsize=(12, 8))
|
||
colors = plt.cm.tab10(np.arange(8)) # 生成8种不同颜色
|
||
# 绘制图像通道数据分布
|
||
plt.subplot(2, 2, 1)
|
||
plt.hist(ori, bins=30, alpha=0.7,color=colors, label='Original')
|
||
plt.title('Image Channel (Original)')
|
||
plt.xlabel('Pixel Value')
|
||
|
||
plt.subplot(2, 2, 1)
|
||
plt.hist(inputs, bins=30, alpha=0.7, color=colors, label='StandardScaler')
|
||
plt.title('StandardScaler Comparison')
|
||
plt.xlabel('StandardScaler Value')
|
||
plt.legend()
|
||
|
||
plt.tight_layout()
|
||
plt.show()
|
||
|
||
return inputs, outputs, input_scaler, output_scaler
|
||
|
||
|
||
# 创建序列数据集 look_back:依据时间序列 pred_step:推测时间序列
|
||
def create_sequences(inputs, outputs, look_back=8, pred_step=1):
|
||
X, y = [], []
|
||
for i in range(len(inputs) - look_back - pred_step):
|
||
X.append(inputs[i:i + look_back])
|
||
y.append(outputs[(i+look_back):(i+look_back+pred_step)])
|
||
return torch.FloatTensor(np.array(X)), torch.FloatTensor(np.array(y))
|
||
|
||
|
||
# 自定义Dataset
|
||
class TimeSeriesDataset(Dataset):
|
||
def __init__(self, X, y):
|
||
self.X = X
|
||
self.y = y
|
||
|
||
def __len__(self):
|
||
return len(self.X)
|
||
|
||
def __getitem__(self, idx):
|
||
return self.X[idx], self.y[idx]
|
||
# 在训练循环前初始化损失记录列表
|
||
train_losses = []
|
||
test_losses = []
|
||
|
||
# LSTM模型
|
||
class LSTMModel(nn.Module):
|
||
def __init__(self, input_size, hidden_size, output_size, num_lay):
|
||
super().__init__()
|
||
self.lstm = nn.LSTM(
|
||
input_size=input_size,
|
||
hidden_size=hidden_size,
|
||
batch_first=True,
|
||
num_layers = num_lay, # 增加LSTM层数
|
||
# bidirectional = True, # 使用双向LSTM
|
||
# dropout = 0.2 # 添加正则化
|
||
)
|
||
self.fc = nn.Linear(hidden_size, output_size)
|
||
|
||
def forward(self, x):
|
||
out, (h_n, c_n) = self.lstm(x)
|
||
out = self.fc(out[:, -1, :]) # 取最后一个时间步的输出
|
||
return out
|
||
|
||
|
||
# 主流程
|
||
if __name__ == "__main__":
|
||
# 数据准备,归一化和标准化
|
||
inputs, outputs, input_scaler, output_scaler = process_data("D:\liyong\文档\项目文档\中汽TVS\机器学习\降阶模型数据.xlsx")
|
||
|
||
print(inputs[:5])#输出前五个
|
||
print(outputs[:5])#输出前五个
|
||
X, y = create_sequences(inputs, outputs, SEQ_LENGTH,PRE_LENGTH)
|
||
|
||
# 数据集分割
|
||
split = int(0.8 * len(X))
|
||
train_dataset = TimeSeriesDataset(X[:split], y[:split])
|
||
test_dataset = TimeSeriesDataset(X[split:], y[split:])
|
||
|
||
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
|
||
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)
|
||
|
||
# 模型初始化
|
||
model = LSTMModel(
|
||
input_size=8, # 输入特征数
|
||
hidden_size=HIDDEN_SIZE,
|
||
output_size=8 , # 输出特征数
|
||
num_lay = HIDDEN_LAYER
|
||
).to(DEVICE)
|
||
|
||
# 训练配置
|
||
criterion = nn.MSELoss()
|
||
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
|
||
|
||
for epoch in range(EPOCHS):
|
||
model.train()
|
||
epoch_train_loss = 0
|
||
|
||
for batch_X, batch_y in train_loader:
|
||
batch_X, batch_y = batch_X.to(DEVICE), batch_y.to(DEVICE)
|
||
outputs = model(batch_X)
|
||
loss = criterion(outputs, batch_y)
|
||
|
||
optimizer.zero_grad()
|
||
loss.backward()
|
||
optimizer.step()
|
||
|
||
epoch_train_loss += loss.item() * batch_X.size(0)
|
||
|
||
# 计算平均训练损失并记录
|
||
avg_train_loss = epoch_train_loss / len(train_loader.dataset)
|
||
train_losses.append(avg_train_loss)
|
||
|
||
# 验证损失计算(原代码逻辑)
|
||
model.eval()
|
||
test_loss = 0
|
||
with torch.no_grad():
|
||
for batch_X, batch_y in test_loader:
|
||
batch_X, batch_y = batch_X.to(DEVICE), batch_y.to(DEVICE)
|
||
preds = model(batch_X)
|
||
test_loss += criterion(preds, batch_y).item() * batch_X.size(0)
|
||
|
||
avg_test_loss = test_loss / len(test_loader.dataset)
|
||
test_losses.append(avg_test_loss)
|
||
|
||
# 输出损失比例
|
||
if epoch % 10 == 0:
|
||
print(
|
||
f"Epoch {epoch} | Train Loss: {avg_train_loss:.4f} | Test Loss: {avg_test_loss:.4f} | 损失比: {avg_train_loss / avg_test_loss:.2f}:1")
|
||
torch.save(model,"D:\liyong\lstm.pth")
|
||
# 动态绘制损失曲线
|
||
plt.figure(figsize=(10, 5))
|
||
plt.plot(train_losses, label='Train Loss', color='blue', alpha=0.7)
|
||
plt.plot(test_losses, label='Test Loss', color='red', alpha=0.7)
|
||
plt.title("LSTM TrainLine (train vs test)")
|
||
plt.xlabel("Epoch")
|
||
plt.ylabel("Loss")
|
||
plt.legend()
|
||
plt.grid(True, linestyle='--', alpha=0.5)
|
||
# //plt.savefig('training_loss_curve.png', dpi=300) # 保存高清图像
|
||
plt.show()
|
||
|
||
# 示例预测
|
||
sample_input = X[0:1].to(DEVICE) # 取第一个样本
|
||
prediction = model(sample_input)
|
||
|
||
print("Sample Prediction:", prediction)
|
||
print("Real Value:", y[0:1]) |