切换git用户重新进行项目首次归档

This commit is contained in:
2025-10-17 15:12:38 +08:00
parent c07dbb586d
commit b9cce1d733
71 changed files with 82326 additions and 0 deletions

42
FC_ML_NN/NN_Basic.py Normal file
View File

@@ -0,0 +1,42 @@
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
# 数据生成
x = np.linspace(-3, 3, 100).reshape(-1, 1)
y = 2 * x + 1 + np.random.normal(0, 0.5, x.shape)
x_tensor = torch.FloatTensor(x)
y_tensor = torch.FloatTensor(y)
# 模型定义
class LinearModel(nn.Module):
def __init__(self):
super().__init__()
self.linear = nn.Linear(1, 1)
def forward(self, x):
return self.linear(x)
# 训练配置
model = LinearModel()
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)
# 训练循环
for epoch in range(1000):
pred = model(x_tensor)
loss = criterion(pred, y_tensor)
optimizer.zero_grad()
loss.backward()
optimizer.step()
# 结果输出
w = model.linear.weight.item()
b = model.linear.bias.item()
print(f'Final equation: y = {w:.2f}x + {b:.2f}')
# 可视化
plt.scatter(x, y)
plt.plot(x, w*x + b, 'r-')
plt.show()

34
FC_ML_NN/NN_Kriging.py Normal file
View File

@@ -0,0 +1,34 @@
import torch
import torch.nn as nn
class OrdinaryKriging(nn.Module):
def __init__(self, variogram_model='gaussian'):
super().__init__()
self.variogram_model = variogram_model
def forward(self, known_coords, known_values, target_coords):
# 计算距离矩阵
dists = torch.cdist(known_coords, known_coords)
target_dists = torch.cdist(target_coords, known_coords)
# 半变异函数(高斯模型)
if self.variogram_model == 'gaussian':
gamma = 1.0 - torch.exp(-(dists ** 2) / 0.5)
target_gamma = 1.0 - torch.exp(-(target_dists ** 2) / 0.5)
# 构建克里金矩阵(添加拉格朗日乘子)
K = torch.cat([
torch.cat([gamma, torch.ones(len(known_coords), 1)], dim=1),
torch.cat([torch.ones(1, len(known_coords)), torch.zeros(1, 1)], dim=1)
])
# 求解权重
weights = torch.linalg.solve(
K,
torch.cat([known_values, torch.zeros(1)])
)
# 预测值
pred = (weights[:-1] * known_values).sum()
return pred

20
FC_ML_NN/NN_LSTM.py Normal file
View File

@@ -0,0 +1,20 @@
import torch
import torch.nn as nn
class LSTMModel(nn.Module):
def __init__(self, input_size, hidden_size, output_size, num_lay):
super().__init__()
self.lstm = nn.LSTM(
input_size=input_size,#输入特征维度
hidden_size=hidden_size,#隐藏层维度
num_layers=num_lay, # 隐藏层数
batch_first=True,
# bidirectional = False, # 是否使用双向LSTM
# dropout = 0.2 # 添加正则化
)
self.fc = nn.Linear(hidden_size, output_size)
def forward(self, x):
out, (h_n, c_n) = self.lstm(x)
out = self.fc(out[:, -1, :]) # 取最后一个时间步的输出
return out

View File

@@ -0,0 +1,28 @@
import torch
import torch.nn as nn
# 数据生成 (x1, x2 -> y)
x1 = torch.rand(100, 1) * 4 - 2 # [-2, 2]
x2 = torch.rand(100, 1) * 3 - 1 # [-1, 2]
y_true = 2.7*x1 + 3.0*x2 + 5.0*x1*x2 - 1.5*x1**2*x2 # 真实多项式
# 特征工程
def make_features(x1, x2, degree=3):
features = [torch.ones_like(x1)]
for d in range(1, degree+1):
for i in range(d+1):
features.append((x1**i) * (x2**(d-i)))
return torch.cat(features, dim=1)
x_poly = make_features(x1, x2, degree=3)
# 模型训练
model = nn.Sequential(nn.Linear(10, 1)) # 3次多项式共10项
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=0.1)
for epoch in range(2000):
y_pred = model(x_poly)
loss = nn.MSELoss()(y_pred, y_true)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if epoch % 500 == 0:
print(f'Epoch {epoch}, Loss: {loss.item():.4f}')

View File

@@ -0,0 +1,30 @@
import torch
import torch.nn as nn
#多元多项式拟合
# 数据生成 (x1, x2 -> y)
x1 = torch.rand(100, 1) * 4 - 2 # [-2, 2]
x2 = torch.rand(100, 1) * 3 - 1 # [-1, 2]
y_true = 2.7*x1 + 3.0*x2 + 5.0*x1*x2 - 1.5*x1**2*x2 # 真实多项式
# 特征工程
def make_features(x1, x2, degree=3):
features = [torch.ones_like(x1)]
for d in range(1, degree+1):
for i in range(d+1):
features.append((x1**i) * (x2**(d-i)))
return torch.cat(features, dim=1)
x_poly = make_features(x1, x2, degree=3)
# 模型训练
model = nn.Sequential(nn.Linear(10, 1)) # 3次多项式共10项
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=0.1)
for epoch in range(2000):
y_pred = model(x_poly)
loss = nn.MSELoss()(y_pred, y_true)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if epoch % 500 == 0:
print(f'Epoch {epoch}, Loss: {loss.item():.4f}')

169
FC_ML_NN/NN_Polynomial.py Normal file
View File

@@ -0,0 +1,169 @@
#多项式拟合
import sys
import torch
import argparse
from torch.utils.data import TensorDataset, DataLoader
sys.path.append("D:\liyong\project\TVS_ML") # 替换为实际路径
from FC_ML_Data.FC_ML_Data_Load.Data_Load_Excel import get_data_from_excel_xy
from FC_ML_Data.FC_ML_Data_Output.Data_Output_Pytorch import export_model
from FC_ML_Loss_Function.Loss_Function_Selector import LossFunctionSelector
from FC_ML_Optim_Function.Optimizer_Selector import OptimizerSelector
# 生成训练数据
def make_features(x,degree):
return torch.stack([x**i for i in range(1,degree)], dim=1) # 构建x, x², x³特征矩阵
class PolyModel(torch.nn.Module):
def __init__(self,input_size):
super().__init__()
self.linear = torch.nn.Linear(input_size, 1) # 输入3维(x,x²,x³)输出1维
def forward(self, x):
return self.linear(x)
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if __name__ == "__main__":
# 在训练循环前初始化损失记录列表
train_losses = []
test_losses = []
#加载外部参数
parser = argparse.ArgumentParser(description='模型训练参数配置')
parser.add_argument('--data_dir',default='D:/liyong/project/TVS_ML/Test_Data/multi_poly/output.xlsx', help='数据集路径')
parser.add_argument('--model_dir',default='D:/liyong/project/TVS_ML/Test_Data/multi_poly/', help='模型导出路径')
parser.add_argument('--name', default='model', help='导出模型名称')
parser.add_argument('--model_format', default='pt', help='模型格式') ##pt onnx bin
parser.add_argument('--epochs', type=int, default=1000, help='训练轮次')
parser.add_argument('--epochs_output', type=int, default=10, help='训练轮次损失打印')
parser.add_argument('--degree', type=int, default=3, help='多项式拟合阶数')
parser.add_argument('--lr', type=float, default=0.001, help='学习率')# 0.1 - 0.0001
parser.add_argument('--batch_size', type=int, default=32, help='批量加载大小')# 越大内存消耗越大,计算数据加载速度越快
# 'mse': '均方误差', TVS
# 'l1': '平均绝对误差', TVS
# 'cross_entropy': '交叉熵',
# 'bce': '二分类交叉熵',
# 'smooth_l1': '平滑L1',
# 'kl_div': 'KL散度',
# 'hinge': '合页损失',
# 'triplet': '三元组损失'
parser.add_argument('--loss', default='mse', help='损失函数')
# 'sgd': '随机梯度下降', TVS
# 'adam': '自适应矩估计', TVS
# 'rmsprop': '均方根传播',
# 'adagrad': '自适应梯度',
# 'adamw': 'Adam权重衰减版'
parser.add_argument('--optim', default='sgd', help='优化函数')
parser.add_argument('--percent', type=float, default=0.8, help='训练集比例') #0.8表示训练集合占总数据集比例80%,区间[0,1]
parser.add_argument('--sheet', default='Sheet1', help='数据表单名')#不放出来
parser.add_argument('--normalization',action='store_true', help='是否开启数据预处理')#如果开启normalization_type会生效
# Min - Max等区间缩放法
# Z-score等方差缩放法用于数据标准化数据特征数据分布未知、存在异常值、模型依赖梯度下降
# 小数定标标准化法与min-max比保持原始数据分布形态区间≈[-1,1]
parser.add_argument('--normalization_type', default='minmax', help='数据处理方式')
parser.add_argument('--shuffle', action='store_false', help='数据乱序')#默认开启,强时序数据不开启
parser.add_argument('--num_workers',type=int, default=0, help='加速线程数量')#默认为0增加线程会提速数据加载 #不开放
parser.add_argument('--gpu', action='store_true', help='启用GPU加速')#默认采用GPU加速如果没有则CPU计算 #不开放
args = parser.parse_args()
print(f"训练数据源: {args.data_dir},模型导出路径:{args.model_dir},"
f"模型名称:{args.name} ,模型导出格式:{args.model_format},"
f"训练轮次: {args.epochs}, 多项式阶数:{args.degree},"
f"学习率:{args.lr},损失函数:{args.loss},优化函数:{args.optim},"
f"数据表单名:{args.sheet},是否开启数据预处理:{args.normalization},"
f"数据处理方式:{args.normalization_type},gpu加速:{args.gpu},"
f"批量加载:{args.batch_size},数据乱序:{args.shuffle},"
f"加速线程数量:{args.num_workers},训练集比例:{args.percent},训练轮次损失打印:{args.epochs_output}")
#默认开启GPU加速
if not args.gpu:
DEVICE = torch.device("cpu")
#加载训练数据
x_ori,y_ori,x,y,normalization = get_data_from_excel_xy(args.data_dir,args.sheet,args.normalization,args.normalization_type)
#拆分测试集和训练集
aa = len(x)
split = int(args.percent * len(x))
train_dataset = TensorDataset(x[:split], y[:split])
test_dataset = TensorDataset(x[split:], y[split:])
print(train_dataset,test_dataset)
train_loader = DataLoader(
train_dataset,
batch_size=args.batch_size, #批量加载数据
shuffle=args.shuffle, #数据打乱
num_workers=args.num_workers #多线程加速
)
test_loader = DataLoader(
test_dataset,
batch_size=args.batch_size, #批量加载数据
shuffle=False, #验证集默认不打乱
num_workers=args.num_workers #多线程加速
)
#初始化模型
model = PolyModel(input_size = args.degree).to(DEVICE)
#初始化损失函数
loss_selector = LossFunctionSelector()
criterion = loss_selector.get_loss(args.loss)
#初始化优化器
optim_selector = OptimizerSelector();
optimizer = optim_selector.get_optimizer(model.parameters(),args.optim, lr=args.lr)
for epoch in range(args.epochs):
#模型启用训练模式
epoch_train_loss = 0
model.train()
for features,labels in train_loader:
powers = torch.arange(1, args.degree + 1, dtype=x.dtype)
x_poly = features ** powers.view(1, -1)
x_poly,labels= x_poly.to(DEVICE),labels.to(DEVICE)
pred = model(x_poly)
loss = criterion(pred.squeeze(), labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
#计算损失
epoch_train_loss += loss.item() * x_poly.size(0)
# 计算平均训练损失并记录
avg_train_loss = epoch_train_loss / len(train_loader.dataset)
train_losses.append(avg_train_loss)
#模型启用评估模式
model.eval()
test_loss = 0
with torch.no_grad():#关闭梯度下降
for features, labels in test_loader:
powers = torch.arange(1, args.degree + 1, dtype=x.dtype)
x_poly = features ** powers.view(1, -1)
x_poly, labels = x_poly.to(DEVICE), labels.to(DEVICE)
preds = model(x_poly)
test_loss += criterion(preds.squeeze(), labels).item() * x_poly.size(0)
avg_test_loss = test_loss / len(test_loader.dataset)
test_losses.append(avg_test_loss)
#每100次迭代输出一次损失数值
if epoch % args.epochs_output == 0:
print(
f"Epoch {epoch} | Train Loss: {avg_train_loss:.4f} | Test Loss: {avg_test_loss:.4f} | 损失比: {avg_train_loss / avg_test_loss:.2f}:1")
#导出训练后的模型
export_model(model,args.model_dir,args.name,args.model_format)
# 可视化
# import matplotlib.pyplot as plt
# plt.scatter(x_ori, y_ori, label='ori')
# powers = torch.arange(1, args.degree + 1, dtype=x.dtype)
# x_input = x ** powers.view(1, -1)
# x_input.to(DEVICE)
# model.to(DEVICE)
# y_output = model(x_input).detach().numpy()
# y_output = torch.tensor(y_output)
# y_real = y_output
# if args.normalization:
# y_real = normalization.inverse_transform(y_output)
# plt.plot(x_ori, y_real.squeeze(), 'r', label='fit')
# plt.legend()
# plt.show()

View File

@@ -0,0 +1,58 @@
#多项式拟合
import torch
import numpy as np
from FC_ML_Data.FC_ML_Data_Output.Data_Output_File import tensor_to_excel
# 真实多项式系数
true_w = torch.tensor([0.5, 3.0, 2.4]) # 对应x, x², x³项
true_b = 0.9
# 生成训练数据
def make_features(x):
return torch.stack([x**i for i in range(1,4)], dim=1) # 构建x, x², x³特征矩阵
x = torch.linspace(-3, 3, 100)
X = make_features(x)
y = X @ true_w + true_b + torch.randn(x.size()) * 0.5 # 添加噪声
print(x,X,y)
# tensor_to_excel(torch.cat([x, y], dim=-1),"./")
class PolyModel(torch.nn.Module):
def __init__(self):
super().__init__()
self.linear = torch.nn.Linear(3, 1) # 输入3维(x,x²,x³)输出1维
def forward(self, x):
return self.linear(x)
model = PolyModel()
criterion = torch.nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
for epoch in range(1000):
pred = model(X)
loss = criterion(pred.squeeze(), y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if epoch % 100 == 0:
print(f'Epoch {epoch}, Loss: {loss.item():.4f}')
# 获取训练后的参数
w_pred = model.linear.weight.detach().numpy().flatten()
b_pred = model.linear.bias.detach().numpy()
# print(f"真实参数: w={true_w.numpy()}, b={true_b}")
# print(f"预测参数: w={w_pred}, b={b_pred:.2f}")
# 可视化
import matplotlib.pyplot as plt
plt.scatter(x, y, label='ori')
plt.plot(x, model(X).detach().numpy(), 'r', label='fit')
plt.legend()
plt.show()

54
FC_ML_NN/NN_RNN.py Normal file
View File

@@ -0,0 +1,54 @@
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif'] = ['SimHei'] # 使用黑体显示中文:ml-citation{ref="1,2" data="citationList"}
plt.rcParams['axes.unicode_minus'] = False #
# 手动实现RNN单元
class SimpleRNNCell:
def __init__(self, input_size, hidden_size):
# 权重初始化
self.W_xh = torch.randn(input_size, hidden_size) * 0.01
self.W_hh = torch.randn(hidden_size, hidden_size) * 0.01
self.b_h = torch.zeros(1, hidden_size)
def forward(self, x, h_prev):
"""
x: 当前输入 (1, input_size)
h_prev: 前一刻隐藏状态 (1, hidden_size)
"""
# RNN核心计算
h_next = torch.tanh(torch.mm(x, self.W_xh) +
torch.mm(h_prev, self.W_hh) +
self.b_h)
return h_next
# 示例:处理序列数据
input_size = 3
hidden_size = 4
seq_length = 5
# 创建RNN单元
rnn_cell = SimpleRNNCell(input_size, hidden_size)
# 初始化隐藏状态
h = torch.zeros(1, hidden_size)
# 模拟输入序列 (5个时间步每个时间步3维向量)
inputs = [torch.randn(1, input_size) for _ in range(seq_length)]
# 循环处理序列
hidden_states = []
for t in range(seq_length):
h = rnn_cell.forward(inputs[t], h)
hidden_states.append(h.detach().numpy())
print(f"时间步 {t + 1}, 隐藏状态: {h}")
# 可视化隐藏状态变化
plt.figure(figsize=(10, 6))
for i in range(hidden_size):
plt.plot(range(1, seq_length + 1), [h[0, i] for h in hidden_states],
label=f'隐藏单元 {i + 1}')
plt.title('RNN隐藏状态随时间变化')
plt.xlabel('时间步')
plt.ylabel('隐藏状态值')
plt.legend()
plt.grid(True)
plt.show()

0
FC_ML_NN/__init__.py Normal file
View File