Files
ModelTrainingPython/FC_ML_NN/NN_Polynomial.py

169 lines
7.9 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#多项式拟合
import sys
import torch
import argparse
from torch.utils.data import TensorDataset, DataLoader
sys.path.append("D:\liyong\project\TVS_ML") # 替换为实际路径
from FC_ML_Data.FC_ML_Data_Load.Data_Load_Excel import get_data_from_excel_xy
from FC_ML_Data.FC_ML_Data_Output.Data_Output_Pytorch import export_model
from FC_ML_Loss_Function.Loss_Function_Selector import LossFunctionSelector
from FC_ML_Optim_Function.Optimizer_Selector import OptimizerSelector
# 生成训练数据
def make_features(x,degree):
return torch.stack([x**i for i in range(1,degree)], dim=1) # 构建x, x², x³特征矩阵
class PolyModel(torch.nn.Module):
def __init__(self,input_size):
super().__init__()
self.linear = torch.nn.Linear(input_size, 1) # 输入3维(x,x²,x³)输出1维
def forward(self, x):
return self.linear(x)
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if __name__ == "__main__":
# 在训练循环前初始化损失记录列表
train_losses = []
test_losses = []
#加载外部参数
parser = argparse.ArgumentParser(description='模型训练参数配置')
parser.add_argument('--data_dir',default='D:/liyong/project/TVS_ML/Test_Data/multi_poly/output.xlsx', help='数据集路径')
parser.add_argument('--model_dir',default='D:/liyong/project/TVS_ML/Test_Data/multi_poly/', help='模型导出路径')
parser.add_argument('--name', default='model', help='导出模型名称')
parser.add_argument('--model_format', default='pt', help='模型格式') ##pt onnx bin
parser.add_argument('--epochs', type=int, default=1000, help='训练轮次')
parser.add_argument('--epochs_output', type=int, default=10, help='训练轮次损失打印')
parser.add_argument('--degree', type=int, default=3, help='多项式拟合阶数')
parser.add_argument('--lr', type=float, default=0.001, help='学习率')# 0.1 - 0.0001
parser.add_argument('--batch_size', type=int, default=32, help='批量加载大小')# 越大内存消耗越大,计算数据加载速度越快
# 'mse': '均方误差', TVS
# 'l1': '平均绝对误差', TVS
# 'cross_entropy': '交叉熵',
# 'bce': '二分类交叉熵',
# 'smooth_l1': '平滑L1',
# 'kl_div': 'KL散度',
# 'hinge': '合页损失',
# 'triplet': '三元组损失'
parser.add_argument('--loss', default='mse', help='损失函数')
# 'sgd': '随机梯度下降', TVS
# 'adam': '自适应矩估计', TVS
# 'rmsprop': '均方根传播',
# 'adagrad': '自适应梯度',
# 'adamw': 'Adam权重衰减版'
parser.add_argument('--optim', default='sgd', help='优化函数')
parser.add_argument('--percent', type=float, default=0.8, help='训练集比例') #0.8表示训练集合占总数据集比例80%,区间[0,1]
parser.add_argument('--sheet', default='Sheet1', help='数据表单名')#不放出来
parser.add_argument('--normalization',action='store_true', help='是否开启数据预处理')#如果开启normalization_type会生效
# Min - Max等区间缩放法
# Z-score等方差缩放法用于数据标准化数据特征数据分布未知、存在异常值、模型依赖梯度下降
# 小数定标标准化法与min-max比保持原始数据分布形态区间≈[-1,1]
parser.add_argument('--normalization_type', default='minmax', help='数据处理方式')
parser.add_argument('--shuffle', action='store_false', help='数据乱序')#默认开启,强时序数据不开启
parser.add_argument('--num_workers',type=int, default=0, help='加速线程数量')#默认为0增加线程会提速数据加载 #不开放
parser.add_argument('--gpu', action='store_true', help='启用GPU加速')#默认采用GPU加速如果没有则CPU计算 #不开放
args = parser.parse_args()
print(f"训练数据源: {args.data_dir},模型导出路径:{args.model_dir},"
f"模型名称:{args.name} ,模型导出格式:{args.model_format},"
f"训练轮次: {args.epochs}, 多项式阶数:{args.degree},"
f"学习率:{args.lr},损失函数:{args.loss},优化函数:{args.optim},"
f"数据表单名:{args.sheet},是否开启数据预处理:{args.normalization},"
f"数据处理方式:{args.normalization_type},gpu加速:{args.gpu},"
f"批量加载:{args.batch_size},数据乱序:{args.shuffle},"
f"加速线程数量:{args.num_workers},训练集比例:{args.percent},训练轮次损失打印:{args.epochs_output}")
#默认开启GPU加速
if not args.gpu:
DEVICE = torch.device("cpu")
#加载训练数据
x_ori,y_ori,x,y,normalization = get_data_from_excel_xy(args.data_dir,args.sheet,args.normalization,args.normalization_type)
#拆分测试集和训练集
aa = len(x)
split = int(args.percent * len(x))
train_dataset = TensorDataset(x[:split], y[:split])
test_dataset = TensorDataset(x[split:], y[split:])
print(train_dataset,test_dataset)
train_loader = DataLoader(
train_dataset,
batch_size=args.batch_size, #批量加载数据
shuffle=args.shuffle, #数据打乱
num_workers=args.num_workers #多线程加速
)
test_loader = DataLoader(
test_dataset,
batch_size=args.batch_size, #批量加载数据
shuffle=False, #验证集默认不打乱
num_workers=args.num_workers #多线程加速
)
#初始化模型
model = PolyModel(input_size = args.degree).to(DEVICE)
#初始化损失函数
loss_selector = LossFunctionSelector()
criterion = loss_selector.get_loss(args.loss)
#初始化优化器
optim_selector = OptimizerSelector();
optimizer = optim_selector.get_optimizer(model.parameters(),args.optim, lr=args.lr)
for epoch in range(args.epochs):
#模型启用训练模式
epoch_train_loss = 0
model.train()
for features,labels in train_loader:
powers = torch.arange(1, args.degree + 1, dtype=x.dtype)
x_poly = features ** powers.view(1, -1)
x_poly,labels= x_poly.to(DEVICE),labels.to(DEVICE)
pred = model(x_poly)
loss = criterion(pred.squeeze(), labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
#计算损失
epoch_train_loss += loss.item() * x_poly.size(0)
# 计算平均训练损失并记录
avg_train_loss = epoch_train_loss / len(train_loader.dataset)
train_losses.append(avg_train_loss)
#模型启用评估模式
model.eval()
test_loss = 0
with torch.no_grad():#关闭梯度下降
for features, labels in test_loader:
powers = torch.arange(1, args.degree + 1, dtype=x.dtype)
x_poly = features ** powers.view(1, -1)
x_poly, labels = x_poly.to(DEVICE), labels.to(DEVICE)
preds = model(x_poly)
test_loss += criterion(preds.squeeze(), labels).item() * x_poly.size(0)
avg_test_loss = test_loss / len(test_loader.dataset)
test_losses.append(avg_test_loss)
#每100次迭代输出一次损失数值
if epoch % args.epochs_output == 0:
print(
f"Epoch {epoch} | Train Loss: {avg_train_loss:.4f} | Test Loss: {avg_test_loss:.4f} | 损失比: {avg_train_loss / avg_test_loss:.2f}:1")
#导出训练后的模型
export_model(model,args.model_dir,args.name,args.model_format)
# 可视化
# import matplotlib.pyplot as plt
# plt.scatter(x_ori, y_ori, label='ori')
# powers = torch.arange(1, args.degree + 1, dtype=x.dtype)
# x_input = x ** powers.view(1, -1)
# x_input.to(DEVICE)
# model.to(DEVICE)
# y_output = model(x_input).detach().numpy()
# y_output = torch.tensor(y_output)
# y_real = y_output
# if args.normalization:
# y_real = normalization.inverse_transform(y_output)
# plt.plot(x_ori, y_real.squeeze(), 'r', label='fit')
# plt.legend()
# plt.show()