第二讲 Linear_Model(线性模型)
DataSet(数据集) -> Model(模型) -> Training(训练) -> inferring(推理)
例子
已知数据集学习时间x[1,2,3] 成绩y[2,4,6],测试数据集x=4,y=?
此时数据集分为训练数据集和测试数据集
一般情况下为了避免过拟合,会将测试数据集的一部分作为开发(验证)数据集,用来验证模型的准确程度
监督学习:有标签的数据学习,根据输入值和输出值对模型进行调整;
利用一组已知类别的样本调整分类器的参数,使其达到所要求性能的过程
线性模型
获取最优的线性模型:因为此时数据集较少采用简单的线性模型。随机选取w以后,计算损失值,然后不停调整w的值(在某个范围内穷举)使得损失值最小
Training Loss针对一个样本
Mean Square Error(MSE平均平方误差)针对整个训练集
代码实现:
损失曲线:
作业2:
代码:
import numpy as np
import matplotlib.pyplot as plt
# 定义数据集
x_data = [1.0, 2.0, 3.0]
y_data = [1.9, 3.9, 5.9]
# 生成矩阵坐标
W, B = np.arange(0.0, 4.1, 0.1).round(1), np.arange(-2.0, 2.1, 0.1).round(1)
w, b = np.meshgrid(W, B)
# 定义模型:y = x * w - b
def forward(x):
return x * w + b
# 损失函数
def loss(y_pre, y):
return (y_pre - y) ** 2
l_sum = 0 # 计算损失之和
for x_val, y_val in zip(x_data, y_data):
y_pre = forward(x_val)
loss_val = loss(y_pre, y_val)
l_sum += loss_val
mse = l_sum / len(x_data)
# 定义figure
fig = plt.figure(figsize=(10, 10), dpi=300)
# 画3D图
ax = plt.axes(projection='3d')
surf = ax.plot_surface(w, b, mse, rstride=1, cstride=1, cmap='rainbow')
# 设置z轴
ax.set_zlim(0, 40)
# 设置图标
ax.set_xlabel('w')
ax.set_ylabel('b')
ax.set_zlabel('Loss')
ax.text(0.2, 2, 43, 'Cost Value', color='black')
# 增加颜色条
fig.colorbar(surf, shrink=0.5, aspect=5)
fig.show()
效果:
第三讲 Gradient_Descent(梯度下降)
类似牛顿迭代法/ 二分法,对cost func 求导 , 利用偏导进行迭代,使得cost func 达到最小值。
关键迭代式:
课上代码:
x_data = [1.0, 2.0, 3.0]
y_data = [2.0, 4.0, 6.0]
w = 1.0
#线性函数
def forward(x):
return x * w
# 求MSE
def cost(xs, ys):
cost = 0
for x, y in zip(xs, ys):
y_pred = forward(x)
cost += (y_pred - y) ** 2
return cost / len(xs)
#求导函数
def gradient(xs, ys):
grad = 0
for x, y in zip(xs, ys):
grad += 2 * x * (x * w - y)
return grad / len(xs)
print('Predict (before training)', 4, forward(4))
#迭代!求w
for epoch in range(100):
cost_val = cost(x_data, y_data)
grad_val = gradient(x_data, y_data)
w -= 0.01 * grad_val
print('Epoch:', epoch, 'w=', w, 'loss=', cost_val)
print('Predict (after training)', 4, forward(4))
梯度下降与随机梯度下降(SGD)对比
梯度下降法遇到鞍点无法跳出,但是随机梯度下降可能会跳跃鞍点
SGD算法是从样本中随机抽出一组,训练后按梯度更新一次,然后再抽取一组,再更新一次,在样本量及其大的情况下,可能不用训练完所有的样本就可以获得一个损失值在可接受范围之内的模型了。
这里的随机是指每次迭代过程中,样本都要被随机打乱,打乱是有效减小样本之间造成的参数更新抵消问题。
SGD代码实现:
对梯度下降和随机梯度下降综合一下获取更好的性能
对数据进行分组mini-batch
:组内梯度下降,组间随机梯度下降
Mini-Batch
full batch:在梯度下降中需要对所有样本进行处理过后然后走一步,如果样本规模的特别大的话效率就会比较低。假如有500万,甚至5000万个样本(业务场景中,一般有几千万行,有些大数据有10亿行)的话走一轮迭代就会非常的耗时。
为了提高效率,我们可以把样本分成等量的子集。 例如我们把100万样本分成1000份, 每份1000个样本, 这些子集就称为mini batch。mini-batch的大小一般取2的n次方然后我们分别用一个for循环遍历这1000个子集。 针对每一个子集做一次梯度下降。 然后更新参数w和b的值。接着到下一个子集中继续进行梯度下降。 这样在遍历完所有的mini batch之后我们相当于在梯度下降中做了1000次迭代。 我们将遍历一次所有样本的行为叫做一个 epoch,也就是一个世代。 在mini batch下的梯度下降中做的事情其实跟full batch一样,只不过我们训练的数据不再是所有的样本,而是一个个的子集。 这样在mini batch我们在一个epoch中就能进行1000次的梯度下降,而在full batch中只有一次。 这样就大大的提高了我们算法的运行速度。
第四讲 Back Propagation(反向传播)
可以在图上进行梯度传播帮助我们建立一个更好的模型结构
线性模型叠加的神经网络
如图所示线性模型可以看做一个简单的神经网络
由线性模型组成简单的神经网络
可以发现这个神经网络进行的运算无论叠加多少层一直都是线性运算,提高层数没有意义
因此为了提高模型的复杂程度,我们为神经网络添加一个非线性因素,例如sigmoid函数
在进行完加法运算以后对这个中间变量进行非线性的变换
复杂的神经网络一般需要多层连接,其中主要是升维或降维的操作
反向传播
反向传播的链式求导:
1.创建计算图进行前馈运算,沿箭头方向进行运算
2.求z,同时算出z关于w,z关于x的偏导
3.求最终的输出L,并得到最终的损失值关于输出z的偏导(前馈的过程就是一步一步算到loss,而loss再一步一步反向传播,就可以拿到上一层L对z的偏导)
4.利用链式法则反向求偏导(损失关于w和x的偏导)
简单线性模型的完整计算图(包括前馈与反馈)
在PyTorch中进行前馈和反馈的运算
Tensor:
代码实现:
import torch
x_data = [1.0,2.0,3.0]
y_data = [2.0,4.0,6.0]
w = torch.tensor([1.0]) # w 是tensor类型
w.requires_grad = True # 为true表示需要计算梯度
def forward(x):
return x * w # 此时w为tensor类型,会自动将x也转换成tensor类型
def loss(x,y): # 失败函数 构建计算图直接用张量
y_pred = forward(x)
return (y_pred - y) ** 2
print("predict (before training",4,forward(4).item())
for epoch in range(100):
for x,y in zip(x_data,y_data):
l = loss(x,y) # 前馈,计算loss
l.backward() # 反馈,l是张量,调用backward函数,可以把数据链路上所有需要梯度的地方都求出来并存到w.grad里
# 每进行一次反馈重新构造一个计算图
print('\tgrad:',x,y,w.grad.item()) # item 直接把数值拿出来做成标量
w.data = w.data - 0.01 * w.grad.data # 此时的w.grad还是一个张量,需要取到data值进行计算,这样不会建立计算图
w.grad.data.zero_() # 梯度数据清零
print("progress:",epoch,l.item())
print("predict (after training)",4,forward(4).item())
第五讲 Pytorch实现线性回归
1.准备数据集
2.设计模型
3.构造损失函数和优化器
4.训练过程
5.代码实现
import torch
x_data = torch.Tensor([[1.0],[2.0],[3.0]])
y_data = torch.Tensor([[2.0],[4.0],[6.0]])
# 线性回归模型
class LinearModel(torch.nn.Module): # 从module继承类
def __init__(self):
# super调用父类构造
super(LinearModel,self).__init__()
self.linear = torch.nn.Linear(1,1) # (1,1)输入1维,输出也是1维
def forward(self,x): #只能写forward
y_pred = self.linear(x) # 实现一个可调用的对象
return y_pred
model = LinearModel() # callable
criterion = torch.nn.MSELoss(reduction='sum') # MSEloss继承自nn.module
optimizer = torch.optim.SGD(model.parameters(),lr=0.01)
for epoch in range(1000): # 迭代100次
y_pred = model(x_data)
loss = criterion(y_pred,y_data) # 前馈
print(epoch,loss.item()) #此处要用loss.item()
#loss是个对象,调用时会自动调用__str__()函数,不会产生计算图
optimizer.zero_grad() # 梯度归零
loss.backward() # 反向传播
optimizer.step() # 更新
print('w=',model.linear.weight.item())
print('b=',model.linear.bias.item())
x_test = torch.Tensor([[4.0]])
y_test = model(x_test)
print('y_pred=',y_test.data)
第六讲 Logistics_Regression(逻辑回归)
逻辑回归中因变量是离散的,而线性回归中因变量是连续的这是两者最大的区别。因此分类问题中最好使用逻辑回归。
逻辑回归本质是线性回归,但是它加了sigmoid函数
1 二分类和sigmoid函数
二分类损失函数
利用交叉熵函数计算概率,计算的是分布的差异
Mini-Batch
2 逻辑回归
由于逻辑回归本质也是线性回归,所以利用pytorch解决逻辑回归时参考线性回归的四步。
- 准备数据集
- 设计模型
- 构造损失函数和优化器
代码:
import torch
x_data = torch.Tensor([[1.0],[2.0],[3.0]])
y_data = torch.Tensor([[2.0],[4.0],[6.0]])
# 线性回归模型
class LinearModel(torch.nn.Module): # 从module继承类
def __init__(self):
# super调用父类构造
super(LinearModel,self).__init__()
self.linear = torch.nn.Linear(1,1) # (1,1)输入1维,输出也是1维
def forward(self,x): #只能写forward
y_pred = self.linear(x) # 实现一个可调用的对象
return y_pred
model = LinearModel() # callable
criterion = torch.nn.MSELoss(reduction='sum') # MSEloss继承自nn.module
optimizer = torch.optim.SGD(model.parameters(),lr=0.01)
for epoch in range(1000): # 迭代100次
y_pred = model(x_data)
loss = criterion(y_pred,y_data) # 前馈
print(epoch,loss.item()) #此处要用loss.item()
#loss是个对象,调用时会自动调用__str__()函数,不会产生计算图
optimizer.zero_grad() # 梯度归零
loss.backward() # 反向传播
optimizer.step() # 更新
print('w=',model.linear.weight.item())
print('b=',model.linear.bias.item())
x_test = torch.Tensor([[4.0]])
y_test = model(x_test)
print('y_pred=',y_test.data)
结果:
第七讲 Multiple Dimension Input(多维特征的输入)
1 多维逻辑回归模型
Mini-batch
2 线性层和人工神经网络
逻辑回归只有一层,多个类似逻辑回归的变换首尾相连就可以创建神经网络
矩阵是空间变换的函数,所以可以改变维度
神经网络是寻找一种非线性的空间变换的函数
linear可以做到空间维度的变换
3 例子:糖尿病是否恶化的预测
3.1 数据集
每个样本有好几个特征
四步走:
1. 准备数据集
2. 设计模型
3. 构造损失函数和优化器
4. 训练周期
3.2 代码实现
import numpy as np
import torch
import matplotlib.pyplot as plt
xy = np.loadtxt('D:\桌面文件\深度学习\刘二大人\PyTorch深度学习实践\diabetes.csv.gz',delimiter=',',dtype = np.float32) #32位浮点数
x_data = torch.from_numpy(xy[:,:-1]) # 最后一列不要
y_data = torch.from_numpy(xy[:,[-1]]) # 只要最后一列 []保证得到矩阵
#-------------------------#准备数据集
class Model(torch.nn.Module):
def __init__(self):
super(Model,self).__init__()
# 不同的地方就是多次降维
self.linear1 = torch.nn.Linear(8,6) #8维到6维
self.linear2 = torch.nn.Linear(6,4)
self.linear3 = torch.nn.Linear(4,1)
# 添加非线性的变化,此处是nn下的sigmoid,是一个模块
# 把此处sigmoid作为一个运算模块,继承自module,不需要传参,只构建一个
# 与functional下的没有区别
self.sigmoid = torch.nn.Sigmoid()
self.activate = torch.nn.ReLU()
def forward(self,x):
# x = self.sigmoid(self.linear1(x))
# x = self.sigmoid(self.linear2(x))
# x = self.sigmoid(self.linear3(x))
x = self.activate(self.linear1(x))
x = self.activate(self.linear2(x))
x = self.sigmoid(self.linear3(x))
return x
model = Model()
#----------------------------------------#创建模型
criterion = torch.nn.BCELoss(reduction='sum')
# 损失函数有所不同,BCE是二分类交叉熵,MSE是均方误差
# loss是否乘1/N,影响学习率的取值
optimizer = torch.optim.SGD(model.parameters(),lr=0.01)
#-----------------------------------#构造损失函数和优化器
epoch_x = []
loss_y = []
for epoch in range(100):
#前馈
y_pred = model(x_data) # 所有数据
loss = criterion(y_pred,y_data)
print(epoch,loss.item())
#画epoch-loss图,x和y轴的数据
epoch_x.extend(epoch)
loss_y.extend(loss.item())
#反馈
optimizer.zero_grad()
loss.backward()
#更新
optimizer.step()
#---------------------------------------#训练周期
# 画图
plt.plot(epoch_x, loss_y)
plt.xlabel('epoch')
plt.ylabel('loss')
plt.show()
第八讲 dataset和dataloader (加载数据集)
知识点:Dataset和Dataload,以及minibatch训练
问题描述:将数据集进行统一的封装
主要思路:利用Dataset划分数据集,转tensor张量,并支持索引;利用Dataload进行批处理并打乱数据集
代码实现:
由于Dataset为抽象类,需要自定义后继承
__init__:一般根据传入的文件名去指定文件获取数据集,并定义len(shape后取行数), x, y作为属性
__getitem__:利用索引直接返回即可
__len__:直接返回数据长度的属性
而Dataload对其进行实例化即可,需要指定:数据集、是否打乱、多线程等(由于win和linux下多线程调用库不一致,因此主循环需要加入判断语句)
由于采用batch进行训练,因此要进行嵌套for循环,Dataload处理成一批批数据了,每次读入一批
代码:
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
# 1.数据准备(Dataset为抽象类,需要实例化)
class DiabetesDataset(Dataset):
def __init__(self, filepath):
xy = np.loadtxt(filepath, delimiter=',', dtype=np.float32)
self.len = xy.shape[0]
self.x = torch.from_numpy(xy[:, :-1])
self.y = torch.from_numpy(xy[:, [-1]])
def __getitem__(self, index):
return self.x[index], self.y[index]
def __len__(self):
return self.len
dataset = DiabetesDataset('diabetes.csv.gz')
train_loader = DataLoader(dataset=dataset, batch_size=32, shuffle=True, num_workers=0)
# 2.模型定义
class LogicModule(torch.nn.Module):
def __init__(self):
super(LogicModule, self).__init__()
self.linear1 = torch.nn.Linear(8, 6)
self.linear2 = torch.nn.Linear(6, 4)
self.linear3 = torch.nn.Linear(4, 1)
self.sigmoid = torch.nn.Sigmoid()
def forward(self, x):
x = self.sigmoid(self.linear1(x))
x = self.sigmoid(self.linear2(x))
x = self.sigmoid(self.linear3(x))
return x
model = LogicModule()
# 3.损失函数和优化器
criterion = torch.nn.BCELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
# 4.模型训练(由于new_work在win和linux下不兼容,需要用if)
if __name__ == '__main__':
loss_list = []
for epoch in range(500):
loss_epoch = 0
for i, (x, y) in enumerate(train_loader): # 这里只有训练数据
y_pre = model(x)
loss = criterion(y_pre, y)
loss_epoch += loss.item()
# print('epoch=', epoch, 'loss=', loss.item())
optimizer.zero_grad()
loss.backward()
optimizer.step()
loss_list.append(loss_epoch / i)
# 绘制每个epoch中每个batch平均损失随epoch的图像
plt.plot(loss_list)
plt.show()
作业实现:
import torch
import numpy as np
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler
# 数据预处理
def preprocess_data(df):
# 处理缺失值
df['Age'].fillna(df['Age'].median(), inplace=True)
df['Fare'].fillna(df['Fare'].median(), inplace=True)
df['Embarked'].fillna(df['Embarked'].mode()[0], inplace=True)
# 将分类数据转换为数值数据
df['Sex'] = df['Sex'].map({'male': 0, 'female': 1})
df['Embarked'] = df['Embarked'].map({'C': 0, 'Q': 1, 'S': 2})
# 选择特征
features = ['Pclass', 'Sex', 'Age', 'SibSp', 'Parch', 'Fare', 'Embarked']
df = df[features]
return df
# 读取训练数据并预处理
train_df = pd.read_csv('./dataset/train.csv')
train_df = preprocess_data(train_df)
# 提取特征和标签
x_train = train_df.values
y_train = pd.read_csv('./dataset/train.csv')['Survived'].values
# 标准化特征
scaler = StandardScaler()
x_train = scaler.fit_transform(x_train)
# 将测试数据加载并预处理
test_df = pd.read_csv('./dataset/test.csv')
passenger_ids = test_df['PassengerId'].values
test_df = preprocess_data(test_df)
Xtest = scaler.transform(test_df.values)
Xtest = torch.from_numpy(Xtest).float()
# 创建自定义数据集
class TitanicDataset(Dataset):
def __init__(self, data, labels):
self.data = torch.from_numpy(data).float()
self.labels = torch.from_numpy(labels).float()
def __len__(self):
return len(self.data)
def __getitem__(self, idx):
return self.data[idx], self.labels[idx]
# 创建数据加载器
train_dataset = TitanicDataset(x_train, y_train)
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
# 定义模型
class Model(torch.nn.Module):
def __init__(self):
super().__init__()
self.linear1 = torch.nn.Linear(7, 32)
self.linear2 = torch.nn.Linear(32, 16)
self.linear3 = torch.nn.Linear(16, 1)
self.activate = torch.nn.ReLU()
def forward(self, x):
x = self.activate(self.linear1(x))
x = self.activate(self.linear2(x))
x = torch.sigmoid(self.linear3(x))
return x
# 实例化模型
model = Model()
# 定义损失函数和优化器
criterion = torch.nn.BCELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
# 训练模型
def train(epoch):
model.train()
train_loss = 0.0
for inputs, labels in train_loader:
optimizer.zero_grad()
outputs = model(inputs)
labels = labels.view(-1, 1) # 调整标签形状以匹配输出
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
train_loss += loss.item()
if epoch % 200 == 199:
print(f"Epoch {epoch + 1}, Train Loss: {train_loss / len(train_loader)}")
# 训练循环
if __name__ == '__main__':
for epoch in range(5000):
train(epoch)
# 预测测试数据
with torch.no_grad():
model.eval()
y_pred = model(Xtest)
y_pred_label = (y_pred >= 0.5).float().numpy().astype(int)
print(y_pred_label)
# 将预测结果保存为CSV文件
output = pd.DataFrame({'PassengerId': passenger_ids, 'Survived': y_pred_label.flatten()})
output.to_csv('submission.csv', index=False)
第九讲 softmax classifier
知识点:SoftMax
激活函数,多分类交叉熵CrossEntropyLoss
,图像transform
预处理,训练测试单独封装,torchvision
库
问题描述:dataset中MINIST
数据集读取方式为PIL
不能直接输入网络,同时多分类问题输出需要为一个分布
主要思路:利用transform
进行预处理,将图片拉直后进入线性网络,利用多分类交叉熵CrossEntropyLoss
计算损失值(线性层传入即可,CrossEntropyLoss = SoftMax + NLLLoss
)
代码实现:
数据处理:transforms
构建Compose
时用ToTensor
转为张量,用Normalize
进行标准化(涉及到的函数都来自于torchvision.transforms)
利用dataset
中MINIST
数据集指定下载、路径、是否为训练集、transform
利用Dataloader
整理成分批数据集
由于用线性网络实现,前向传播时先利用view
将图像拉直过线性层(注意最后一层不用过激活函数),采用CrossEntropyLoss
损失函数
训练:内层循环单独拿出来(传入epoch
)记录batch
数,进行输出(loss
存的是item()
,同时输出后记得置零)
测试:
不用计算梯度,利用with torch.no_grad()
:
利用正确数除以总数记录精确度
总数:累加每个batch
的size(0)
正确数:利用torch.max
找到最大概率值,获取其索引于真实值进行比较,利用sum()
汇总数据
import torch
import matplotlib.pyplot as plt
from torchvision import transforms # 针对图像处理
from torchvision import datasets
from torch.utils.data import DataLoader
import torch.nn.functional as F # 使用ReLU
import torch.optim as optim # 优化器
# 1.数据集准备
batch_size = 64
# transform pytorch读图像时,神经网络希望输入比较小,
# pillow把图像转化为图像张量,单通道转化为多通道
transform = transforms.Compose([ # compose可以把[]里的数据进行pipline处理
transforms.ToTensor(), # 转化成张量
transforms.Normalize((0.1307,), (0.3081,)) # normalize归一化,(均值,标准差)
])
# transform放到数据集里是为了对第i个数据集直接操作
train_dataset = datasets.MNIST(root='./dataset/mnist',
train=True,
download=True,
transform=transform)
train_loader = DataLoader(train_dataset,
shuffle=True,
batch_size=batch_size)
test_dataset = datasets.MNIST(root='./dataset/mnist/',
train=False,
download=True,
transform=transform)
test_loader = DataLoader(test_dataset,
shuffle=False,
batch_size=batch_size)
# 2.构造模型
class Net(torch.nn.Module):
def __init__(self):
super(Net, self).__init__()
self.l1 = torch.nn.Linear(784, 512)
self.l2 = torch.nn.Linear(512, 256)
self.l3 = torch.nn.Linear(256, 128)
self.l4 = torch.nn.Linear(128, 64)
self.l5 = torch.nn.Linear(64, 10)
def forward(self, x):
# 如果是x.view(1,-1),表示需要转化成一行的向量,但是不知道多少列,需要电脑计算
x = x.view(-1, 784) # view改变张量的形式,把(N,1,28,28)变成二阶,-1表示0维度的数字不变
x = F.relu(self.l1(x))
x = F.relu(self.l2(x))
x = F.relu(self.l3(x))
x = F.relu(self.l4(x))
return self.l5(x) # 最后一层不激活
model = Net()
# 3.损失函数和优化器
criterion = torch.nn.CrossEntropyLoss() # 交叉熵损失
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5) # 用带冲量的
# 4.训练周期+测试集
def train(epoch):
running_loss = 0.0
for batch_size, data in enumerate(train_loader, 0):
inputs, target = data # x,y
optimizer.zero_grad() # 在优化器优化之前,进行权重清零;
outputs = model(inputs)
loss = criterion(outputs, target)
loss.backward()
optimizer.step()
running_loss += loss.item() # 累计loss
if batch_size % 300 == 299:
print('[%d, %5d] loss: %.3f' % (epoch + 1, batch_size + 1, running_loss / 300))
def test():
correct = 0
total = 0
with torch.no_grad(): # 不需要计算梯度
for data in test_loader:
images, labels = data
outputs = model(images)
# 求每一行最大值的下标,返回最大值,和下标
_, predicted = torch.max(outputs.data, dim=1)
total += labels.size(0) # batch_size
correct += (predicted == labels).sum().item() # 比较下标与预测值是否接近,求和表示猜对了几个
print('Accuracy on test set: %d %%' % (100 * correct / total))
if __name__ == '__main__':
for epoch in range(100):
train(epoch)
test()
由于 transforms.Compose() 接受的参数是一个列表,所以需要使用 [] 将变换组织成一个列表,以便传递给这个函数。
输入线性层之前一定要记得拉直成向量
torch.max(dim=1)会返回两个值(数和索引),dim表示计算最值的维度,1表示列,也就是取每行的所有列中的最值
进行计数使用tensor.data
课后作业代码:
import pandas as pd
import torch
from torch.utils.data import DataLoader, Dataset
import torch.nn.functional as F
import torch.optim as optim
# 自定义数据集
class OttoDataset(Dataset):
def __init__(self, data, targets=None, transform=None):
self.data = data
self.targets = targets
self.transform = transform
def __len__(self):
return len(self.data)
def __getitem__(self, idx):
x = self.data[idx]
if self.targets is not None:
y = self.targets[idx]
return torch.tensor(x, dtype=torch.float32), torch.tensor(y, dtype=torch.long)
else:
return torch.tensor(x, dtype=torch.float32)
# 加载数据
train_df = pd.read_csv('./dataset/otto_train.csv')
test_df = pd.read_csv('./dataset/otto_test.csv')
# 提取特征和标签
X_train = train_df.drop(['id', 'target'], axis=1).values
y_train = train_df['target'].map(lambda x: int(x.split('_')[1]) - 1).values # target 转换为 0-8
X_test = test_df.drop(['id'], axis=1).values
test_ids = test_df['id'].values
# 转换为数据集
train_dataset = OttoDataset(X_train, y_train)
test_dataset = OttoDataset(X_test)
batch_size = 64
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=batch_size)
# 设计模型
class Net(torch.nn.Module):
def __init__(self):
super(Net, self).__init__()
self.l1 = torch.nn.Linear(93, 512)
self.l2 = torch.nn.Linear(512, 256)
self.l3 = torch.nn.Linear(256, 128)
self.l4 = torch.nn.Linear(128, 64)
self.l5 = torch.nn.Linear(64, 9) # 输出9类
def forward(self, x):
x = F.relu(self.l1(x))
x = F.relu(self.l2(x))
x = F.relu(self.l3(x))
x = F.relu(self.l4(x))
return self.l5(x)
model = Net()
# 构造损失函数和优化器
criterion = torch.nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
# 训练循环
def train(epoch):
running_loss = 0.0
for batch_idx, (inputs, target) in enumerate(train_loader, 0):
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, target)
loss.backward()
optimizer.step()
running_loss += loss.item()
if batch_idx % 300 == 299:
print('[%d, %5d] loss: %.3f' % (epoch + 1, batch_idx + 1, running_loss / 300))
running_loss = 0.0
# 生成测试集结果并保存为CSV
def generate_test_results():
model.eval()
results = []
with torch.no_grad():
for data in test_loader:
inputs = data
outputs = model(inputs)
probabilities = F.softmax(outputs, dim=1)
results.extend(probabilities.cpu().numpy())
# 将结果转换为 DataFrame
results_df = pd.DataFrame(results, columns=[f'Class_{i+1}' for i in range(9)])
results_df.insert(0, 'id', test_ids)
# 保存为 CSV 文件
results_df.to_csv('otto_predictions.csv', index=False, float_format='%.1f')
# 主函数
if __name__ == '__main__':
for epoch in range(10):
train(epoch)
generate_test_results()
第十讲 卷积神经网络(基础篇)
1 卷积神经网络的基本结构
卷积神经网络=特征提取+分类
特征提取:
特征提取器,通过卷积运算,找到某种特征。由卷积层convolution和下采样Subsampling 构成。
一个图像,丢到一个卷积层里面仍然是一个3维的张量;
下采样,通道数不变,但是图片的宽度和高度变小(减少维数数量,减少计算量)
分类器
将特征向量化后,用全连接网络进行来分类。
Convolution (卷积层)
背景知识:
RGB:
栅格图:按照像素格保存图像,每个像素格里面保存度值,如果是彩色图,每个格子里就是RGB三个通道的灰度。
矢量图:根据描述来绘制
2.卷积的运算过程
- 单通道卷积:
卷积核
卷积核能够输出一层
卷积核数量=最终输出的通道数
- 三通道卷积
每个通道对应一个卷积核(所以卷积核也得有三个)
做数乘,再求和。
卷积核的通道数量 = 输入通道数 (三通道输入—卷积核也有三个通道)
卷积核的总数= 输出通道的数量 (M通道输出– 卷积核(filter)有M个)
卷积核的大小自己定
卷积层对图像的大小没有要求,只对通道数有关。所以定义一个卷积层只需要定义:1. 输入通道,2. 输出通道数,3. 卷积核数;
conv_layer = torch.nn.Conv2d(in_channels,
out_channels,
kernel_size=kernel_size)
- 卷积的Padding
若对于一个大小为N×N的原图,经过大小为M×M的卷积核卷积后,仍然想要得到一个大小为N×N的图像,则需要对原图进行Padding,即外围填充。
Padding的圈数: M/2 圈 (比如M=3 , padding 3/2 = 1 圈; M=5, padding 5/2 = 2 圈)
例如,对于一个5×5的原图,若想使用一个3×3的卷积核进行卷积,并获得一个同样5×5的图像,则需要进行Padding,通常外围填充0
Stride
每次移动卷积核时的步长。
大的步长能够有效地减小维数。
Maxpooling 池化层
对于一个M*M的图像而言,被分割成大小相同的块,每个块的值取块内最大值,称为MaxPooling; (用均值就是AveragePooling)
通过池化层可以有效减小宽度和高度
只是缩小图片的宽度和高度,不改变通道数
- 简例
卷积层Conv2d Layer和 池化层Pooling Layer 对
输入图像的大小没有要求(因为做的是简单的运算都能处理)
全过程中图像大小影响最大的是最后传入分类器的维度(传入的是向量,长度与图像大小有关)
模型:
代码:
import torch
from torchvision import transforms
from torchvision import datasets
from torch.utils.data import DataLoader
import torch.nn.functional as F
import torch.optim as optim
# prepare dataset
batch_size = 64
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
train_dataset = datasets.MNIST(root='./dataset/mnist/', train=True, download=True, transform=transform)
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
test_dataset = datasets.MNIST(root='./dataset/mnist/', train=False, download=True, transform=transform)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=batch_size)
# design model using class
class Net(torch.nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = torch.nn.Conv2d(1, 10, kernel_size=5)
self.conv2 = torch.nn.Conv2d(10, 20, kernel_size=5)
self.pooling = torch.nn.MaxPool2d(2)
self.fc = torch.nn.Linear(320, 10)
def forward(self, x):
# flatten data from (n,1,28,28) to (n, 784)
batch_size = x.size(0)
x = F.relu(self.pooling(self.conv1(x)))
x = F.relu(self.pooling(self.conv2(x)))
x = x.view(batch_size, -1) # -1 此处自动算出的是320
x = self.fc(x)
return x
model = Net()
# construct loss and optimizer
criterion = torch.nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
# training cycle forward, backward, update
def train(epoch):
running_loss = 0.0
for batch_idx, data in enumerate(train_loader, 0):
inputs, target = data
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, target)
loss.backward()
optimizer.step()
running_loss += loss.item()
if batch_idx % 300 == 299:
print('[%d, %5d] loss: %.3f' % (epoch + 1, batch_idx + 1, running_loss / 300))
running_loss = 0.0
def test():
correct = 0
total = 0
with torch.no_grad():
for data in test_loader:
images, labels = data
outputs = model(images)
_, predicted = torch.max(outputs.data, dim=1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
print('accuracy on test set: %d %% ' % (100 * correct / total))
if __name__ == '__main__':
for epoch in range(10):
train(epoch)
test()
3.Using GPU
1.模型迁移到GPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model.to(device)
2.把Input和target迁移到GPU(同一块显卡)
inputs, target = inputs.to(device), target.to(device)
第十一讲 卷积神经网络(高级篇)
GoogleNet
1. nception Module
为了减少代码的冗余,有两种重要的方法,以抽象出相似的地方
面向过程的函数
面向对象的类
我们把网络中的相似块都封装起来,减少代码重复,每个模块称为Inception。
超参数难以选择
解决: 尝试不同的超参数配置,不同配置赋予不同的权重,然后根据效果好坏调整权重。
最终不同路径得出的结果 width和Height必须一致
如何保持输出的宽度和高度不变:
Conv
用padding外围填充
Average Pooling :
设置stride=1;padding
2. 1×1 的卷积
理解起来很简单,就是按位置加权求和的一个算子;
what is it?
1*1卷积核的数目和输入的通道有关(这里3个)
why we use it?
减少运算量
运算量直接缩小到1/10 , 节约时间,也被称为network in network
3 四条路径的模型与代码
Concatenate 拼接:
说明dim =1
输入的张量是(bacth,Channel,W,H) ,第一个维度是Channel
我们沿着Channel将四个拼接
代码1:
定义Inception
class InceptionA(nn.Module):
def __init__(self, in_channels): # 通道数作为输入
super(InceptionA, self).__init__()
self.branch1x1 = nn.Conv2d(in_channels, 16, kernel_size=1)
self.branch5x5_1 = nn.Conv2d(in_channels,16, kernel_size=1)
self.branch5x5_2 = nn.Conv2d(16, 24, kernel_size=5, padding=2)
self.branch3x3_1 = nn.Conv2d(in_channels, 16, kernel_size=1)
self.branch3x3_2 = nn.Conv2d(16, 24, kernel_size=3, padding=1)
self.branch3x3_3 = nn.Conv2d(24, 24, kernel_size=3, padding=1)
self.branch_pool = nn.Conv2d(in_channels, 24, kernel_size=1)
def forward(self, x):
branch1x1 = self.branch1x1(x)
branch5x5 = self.branch5x5_1(x)
branch5x5 = self.branch5x5_2(branch5x5)
branch3x3 = self.branch3x3_1(x)
branch3x3 = self.branch3x3_2(branch3x3)
branch3x3 = self.branch3x3_3(branch3x3)
branch_pool = F.avg_pool2d(x, kernel_size=3, stride=1, padding=1)
branch_pool = self.branch_pool(branch_pool)
outputs = [branch1x1, branch5x5, branch3x3, branch_pool]
return torch.cat(outputs, dim=1) # 按照通道数合并,总共16+24+24+24=88个通道
代码2:
定义网络
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
self.conv2 = nn.Conv2d(88, 20, kernel_size=5)
self.incep1 = InceptionA(in_channels=10)
self.incep2 = InceptionA(in_channels=20)
self.mp = nn.MaxPool2d(2)
self.fc = nn.Linear(1408, 10)
def forward(self, x):
in_size = x.size(0)
x = F.relu(self.mp(self.conv1(x)))
x = self.incep1(x)
x = F.relu(self.mp(self.conv2(x)))
x = self.incep2(x)
x = x.view(in_size, -1)
x = self.fc(x)
return x
训练轮数不是越多越好,可以在实验过程中保留较优的参数值
由于层数太多,带来了梯度消失问题:https://blog.csdn.net/superCally/article/details/55671064
可以用逐层训练解决(但是有些网络层数过多)
Residual Net : (DRN :Deep Residual Net)
H(x) = F(x) + x 这种设计可以有效解决梯度消失
F(x)与 x 是同维度的才能相加,所以经过层后的输出,尺寸要和原来的相同。
残差网络(Residual Net)
两种不同神经网络结构
1.一般的
2.跳链接
例子:
在“ 卷积-> ReLU-> 池化” 的架构后面加一层ResidualNet(红色块标注)
经过Residual Block, 数据的尺寸不变
ResidualBlock的实现:
第十二讲 循环神经网络(基础篇)
basic_RNN
对于一个全连接网络,即全部由线性层组成的网络,也称作dense(稠密型) 或者deep(深度型)网络
对于一个卷积神经网络,卷积核对多层图像处理,卷积核不变,所以权重数量少,而全连接网络的线性模型,权重数多,计算量大。
对于输入的特征具有明显的序列关系,如天气时间序列预测,就适合使用RNN循环神经网络
注意信息融合和线性层共享
在pytorch中实现RNN
2.1 定义RNNCell
怎样使用RNN?
2.1.1 代码实现
import torch
# 参数设置
batch_size = 1
seq_len = 3
input_size = 4
hidden_size = 2
cell = torch.nn.RNNCell(input_size = input_size,hidden_size=hidden_size)
#(seq,batch,features)
dataset = torch.randn(seq_len,batch_size,input_size)
hidden = torch.zeros(batch_size,hidden_size)
for idx,input in enumerate(dataset):
print('='*20,idx,'='*20)
print('Input size',input.shape)
hidden = cell(input,hidden)
print('output size',hidden.shape)
print(hidden)
2.2 使用RNN模型
数据维度
2.2.1 numlayers
2.2.2 batch_first
代码实现:
import torch
# 参数设置
batch_size = 1
seq_len = 3
input_size = 4
hidden_size = 2
#比rnncell多一个numlayers
num_layers = 1
cell = torch.nn.RNN(input_size = input_size,hidden_size=hidden_size,
num_layers = num_layers)
#(seq,batch,features)
# 指明维度
inputs = torch.randn(seq_len,batch_size,input_size)
hidden = torch.zeros(num_layers,batch_size,hidden_size)
out,hidden = cell(inputs,hidden)
print('Ouput size',out.shape)
print('Output:',out)
print('gidden size',hidden.shape)
print('hidden:',hidden)
3 hello->ohlol
代码实现:
# 使用RNN
import torch
from torch import nn, optim
from torchvision import datasets
from torch.utils.data import DataLoader
from torchvision import transforms
import torch.nn.functional as F
import matplotlib.pyplot as plt
input_size = 4
hidden_size = 4
num_layers = 1
batch_size = 1
seq_len = 5
# 准备数据
idx2char = ['e', 'h', 'l', 'o']
x_data = [1, 0, 2, 2, 3] # hello
y_data = [3, 1, 2, 3, 2] # ohlol
one_hot_lookup = [[1, 0, 0, 0],
[0, 1, 0, 0],
[0, 0, 1, 0],
[0, 0, 0, 1]] # 分别对应0,1,2,3项
x_one_hot = [one_hot_lookup[x] for x in x_data] # 组成序列张量
print('x_one_hot:', x_one_hot)
# 构造输入序列和标签
inputs = torch.Tensor(x_one_hot).view(seq_len, batch_size, input_size)
labels = torch.LongTensor(y_data) # labels维度是: (seqLen * batch_size ,1)
# design model
class Model(torch.nn.Module):
def __init__(self, input_size, hidden_size, batch_size, num_layers=1):
super(Model, self).__init__()
self.num_layers = num_layers
self.batch_size = batch_size
self.input_size = input_size
self.hidden_size = hidden_size
self.rnn = torch.nn.RNN(input_size=self.input_size,
hidden_size=self.hidden_size,
num_layers=self.num_layers)
def forward(self, input):
hidden = torch.zeros(self.num_layers, self.batch_size, self.hidden_size)
out, _ = self.rnn(input, hidden)
# 为了能和labels做交叉熵,需要reshape一下:(seqlen*batchsize, hidden_size),即二维向量,变成一个矩阵
return out.view(-1, self.hidden_size)
net = Model(input_size, hidden_size, batch_size, num_layers)
# loss and optimizer
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(net.parameters(), lr=0.05)
# train cycle
if __name__ == '__main__':
epoch_list = []
loss_list = []
for epoch in range(20):
optimizer.zero_grad()
# inputs维度是: (seqLen, batch_size, input_size) labels维度是: (seqLen * batch_size * 1)
# outputs维度是: (seqLen, batch_size, hidden_size)
outputs = net(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
_, idx = outputs.max(dim=1)
idx = idx.data.numpy()
print('Predicted: ', ''.join([idx2char[x] for x in idx]), end='')
print(',Epoch [%d/20] loss=%.3f' % (epoch + 1, loss.item()))
epoch_list.append(epoch)
loss_list.append(loss.item())
plt.plot(epoch_list, loss_list)
plt.xlabel('epoch')
plt.ylabel('loss')
plt.show()
第十三讲 循环神经网络(高级篇)
双向循环神经网络
import torch
import csv
from torch.utils.data import DataLoader,Dataset
from torch.nn.utils.rnn import pack_padded_sequence
import matplotlib.pyplot as plt
# 输入姓名 输出属于哪个国家
import gzip
import time
HIDDEN_SIZE = 100
BATCH_SIZE = 1024
N_LAYER = 2
N_EPOCHS = 100
N_CHARS = 128
USE_GPU = False
start = time.time()
############################################################################# 读入数据
class NameDataset(Dataset):
def __init__(self,is_train_set=True):
filename = './names_train.csv' if is_train_set else './names_test.csv'
with open (filename,'rt') as f:
reader = csv.reader(f)
rows = list(reader)
self.names = [row[0] for row in rows] #第一列人名
self.len = len(self.names) #名字长度
self.countries = [row[1] for row in rows] #对应国家名
self.country_list = list(sorted(set(self.countries))) #对国家名长度排序
self.country_dict = self.getCountryDict() #构造字典 key:国家名 value:index
self.country_num = len(self.country_list) #国家个数
def __getitem__(self, index): #必须重写__getitem__和__len__方法
return self.names[index],self.country_dict[self.countries[index]]
def __len__(self):
return self.len
def getCountryDict(self):
country_dict = dict()
for idx,country_name in enumerate(self.country_list,0):
country_dict[country_name] = idx
return country_dict
def idx2country(self,index):
return self.country_list[index]
def getCountriesNum(self):
return self.country_num
trainset = NameDataset(is_train_set=True)
trainloader = DataLoader(trainset, batch_size=BATCH_SIZE, shuffle=True)
testset = NameDataset(is_train_set=False)
testloader = DataLoader(testset, batch_size=BATCH_SIZE, shuffle=True)
N_COUNTRY = trainset.getCountriesNum()
############################################################################## 模型
class RNNClassifier(torch.nn.Module):
def __init__(self,input_size,hidden_size,output_size,n_layers=1,bidirectional=True):
super(RNNClassifier,self).__init__()
self.hidden_size = hidden_size
self.n_layers=n_layers
self.n_directions = 2 if bidirectional else 1 #单向还是双向循环神经网络
self.embedding = torch.nn.Embedding(input_size,hidden_size)
self.gru = torch.nn.GRU(hidden_size,hidden_size,n_layers,bidirectional=bidirectional)
self.fc = torch.nn.Linear(hidden_size*self.n_directions,output_size) #如果是双向则维度*2
def _init_hidden(self,batch_size):
hidden = torch.zeros(self.n_layers*self.n_directions,batch_size,self.hidden_size)
return create_tensor(hidden)
def forward(self,input,seq_lengths):
#input shape Batchsize*SeqLen->SeqLen*Batchsize
input = input.t() #矩阵转置
batch_size = input.size(1)
hidden = self._init_hidden(batch_size)
embedding = self.embedding(input)
# pack them up
gru_input = pack_padded_sequence(embedding,seq_lengths) ### make be sorted by descendent 打包变长序列
output , hidden = self.gru(gru_input,hidden)
if self.n_directions == 2:
hidden_cat = torch.cat([hidden[-1],hidden[-2]],dim=1)
else:
hidden_cat = hidden[-1]
fc_output = self.fc(hidden_cat)
return fc_output
############################################################################ 数据处理
def create_tensor(tensor):
if USE_GPU:
device = torch.device("cuda:0")
tensor = tensor.to(device)
return tensor
def name2list(name): #返回ascll值和长度
arr = [ord(c) for c in name]
return arr, len(arr)
def make_tensors(names,countries):
sequences_and_lengths = [name2list(name) for name in names]
name_sequences = [sl[0] for sl in sequences_and_lengths] #名字的ascll值
seq_lengths = torch.LongTensor([sl[1] for sl in sequences_and_lengths]) #单独把列表长度拿出来 (名字的长度)
countries = countries.long()
#make tensor of name,BatchSize x SeqLen padding
seq_tensor = torch.zeros(len(name_sequences),seq_lengths.max()).long() #先做一个batchsize*max(seq_lengths)全0的张量
for idx,(seq,seq_len) in enumerate(zip(name_sequences,seq_lengths),0):
seq_tensor[idx,:seq_len]= torch.LongTensor(seq) #把数据贴到全0的张量上去
#sort by length to use pack_padded_sequence
seq_lengths,perm_idx = seq_lengths.sort(dim=0,descending=True) #sort返回排完序的序列和对应的index
seq_tensor = seq_tensor[perm_idx]
countries = countries[perm_idx]
return create_tensor(seq_tensor),\
create_tensor(seq_lengths),\
create_tensor(countries)
############################################################################ 训练测试模块
def trainModel():
total_loss = 0
for i,(names,countries) in enumerate(trainloader,1):
inputs,seq_lengths,target = make_tensors(names,countries)
output = classifier(inputs,seq_lengths)
loss = criterion(output,target)
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss+=loss.item()
if i%10==0:
print(f'[{time_since(start)}] Epoch{epoch}',end='')
print(f'[{i*len(inputs)}/{ len(trainset)}]',end='')
print(f'loss={total_loss/(i*len(inputs))}')
return total_loss
def testModel():
correct = 0
total = len(testset)
print("evaluating trained model...")
with torch.no_grad():
for i,(names,countrise) in enumerate(testloader,1):
inputs,seq_lengths,target = make_tensors(names,countrise)
output = classifier(inputs,seq_lengths)
pred = output.max(dim = 1,keepdim=True)[1]
correct+=pred.eq(target.view_as(pred)).sum().item()
percent = '%.2f'%(100*correct/total)
print(f'Test set:Accuracy {correct}/{total} {percent}%')
return correct/total
def time_since(start):
"""
计算给定时间戳 `start` 与当前时间之间的时间差
"""
return time.time() - start
if __name__=='__main__':
classifier = RNNClassifier(N_CHARS, HIDDEN_SIZE, N_COUNTRY, N_LAYER)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
if USE_GPU:
device = torch.device("cuda:0")
classifier.to(device)
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(classifier.parameters(), lr=0.001)
print("Training for %d epochs..." % N_EPOCHS)
acc_list = []
epoch_list=[]
for epoch in range(1,N_EPOCHS+1):
trainModel()
acc=testModel()
acc_list.append(acc)
epoch_list.append(epoch)
plt.plot(epoch_list,acc_list)
plt.ylabel('Accuracy')
plt.xlabel('epoch')
plt.grid()
plt.show()
我也再看刘老师的。加油
加油加油