思路是这样子的:从手搓代码的角度去学习transformer,代码会一个一个模块地从头到尾添加,以便学习者跟着敲,到最后再手搓一个基于tansformer的机器翻译实战项目。
transformer整体架构
一、输入部分
词向量
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import math
import matplotlib.pyplot as plt
import numpy as np
import copyembedding = nn.Embedding(10, 3)
input = torch.LongTensor([[1, 2, 3, 4], [5, 6, 7, 8]])
print(embedding(input))"""
tensor([[[ 1.1585, -0.2142, 0.2379], #1[-0.0137, 0.4797, -1.0865], #2[ 0.7403, -1.1992, -0.0105], #3[-1.7339, -0.1899, -0.7764]], #4[[ 1.0883, -0.4474, -0.4151], #5[-0.8517, -0.2821, 1.3511], #6[-0.9131, -0.0999, -0.1846], #7[-3.0283, 2.6045, -1.3109]]], #8grad_fn=<EmbeddingBackward0>)
"""embedding = nn.Embedding(10, 3,padding_idx=0)
input = torch.LongTensor([[1, 2, 3, 4], [5, 6, 7, 8]])
print(embedding(input))
"""
tensor([[[-0.4958, -1.1462, 0.2109],[ 1.1422, -0.4182, 0.2201],[-0.7329, 1.1556, -0.0757],[ 1.3903, 0.3619, 0.5569]],[[ 0.0434, 2.1415, 0.2626],[ 0.3113, -0.2618, -1.6705],[ 0.8060, 0.1640, 1.4943],[-0.5313, 0.7362, 0.9071]]], grad_fn=<EmbeddingBackward0>)
"""
⭐ 1.词嵌入层
变成高纬度的词向量,以便保存更多的词
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import math
import matplotlib.pyplot as plt
import numpy as np
import copyclass Embeddings(nn.Module):def __init__(self, d_model, vocab):super(Embeddings, self).__init__()self.lut = nn.Embedding(vocab, d_model)self.d_model = d_modeldef forward(self, x):return self.lut(x) * math.sqrt(self.d_model)d_model=512
vocab=1000
x = Variable(torch.LongTensor([[100,2,421,508],[491,998,1,221]]))
emb = Embeddings(d_model, vocab)
embr = emb(x)
print("embr:", embr)
print("形状:", embr.shape)"""
embr: tensor([[[ 13.7968, -14.4595, 28.3401, ..., 1.9699, -16.2531, 0.4690],[ 20.9855, 10.0422, 0.5572, ..., 33.0242, 20.5869, 27.3373],[-25.8328, -20.8624, 15.1385, ..., -38.3399, -33.6920, -15.9326],[-19.9724, 17.2694, 22.7562, ..., -25.8548, -47.9648, 38.4995]],[[-49.9396, -43.8692, -24.5790, ..., 2.9931, -34.2201, 1.7027],[ -2.4900, 15.1773, -7.8220, ..., 19.9114, -24.9212, 11.0202],[ 21.6143, -0.7228, -11.8343, ..., -0.3574, -21.0696, 13.9079],[ 26.5733, 2.4455, -26.7212, ..., -38.3939, -1.6351, -32.0217]]],grad_fn=<MulBackward0>)
形状: torch.Size([2, 4, 512])
"""
nn.Dropout演示
torch.unsqueeze演示
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import math
import matplotlib.pyplot as plt
import numpy as np
import copy# nn.Dropout演示
m = nn.Dropout(p=0.2)
input = torch.randn(4, 5)
output = m(input)
print(output)
"""
tensor([[ 0.5801, -0.8529, 0.2143, -0.5226, 0.0000],[ 0.2660, 0.8704, -1.8572, -0.0000, -2.0312],[-0.0000, -1.1344, -0.3601, -1.9231, -0.0159],[ 0.0000, 0.0000, 0.1374, -1.6314, -0.0000]])
"""# torch.unsqueeze演示
x = torch.tensor([1, 2, 3, 4])
print(torch.unsqueeze(x, 0))
print(torch.unsqueeze(x, 1))
"""
tensor([[1, 2, 3, 4]])
tensor([[1],[2],[3],[4]])
"""
⭐ 2.位置编码器
词与词之间存在位置关系
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import math
import matplotlib.pyplot as plt
import numpy as np
import copy# 1.文本嵌入层
class Embeddings(nn.Module):def __init__(self, d_model, vocab):super(Embeddings, self).__init__()self.lut = nn.Embedding(vocab, d_model)self.d_model = d_modeldef forward(self, x):return self.lut(x) * math.sqrt(self.d_model)# 2.位置编码器类
class PositionalEncoding(nn.Module):def __init__(self, d_model, dropout, max_len=5000):super(PositionalEncoding, self).__init__()self.dropout = nn.Dropout(p=dropout)pe = torch.zeros(max_len, d_model)position = torch.arange(0, max_len).unsqueeze(1)div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))pe[:, 0::2] = torch.sin(position * div_term)pe[:, 1::2] = torch.cos(position * div_term)pe = pe.unsqueeze(0)self.register_buffer('pe', pe)def forward(self, x):x = x + Variable(self.pe[:, :x.size(1)], requires_grad=False)return self.dropout(x)d_model = 512
dropout = 0.1
max_len = 60vocab = 1000
x = Variable(torch.LongTensor([[100, 2, 421, 508], [491, 998, 1, 221]]))
emb = Embeddings(d_model, vocab)
embr = emb(x)x = embr
pe = PositionalEncoding(d_model, dropout, max_len)
pe_result = pe(x)
print("pe结果:", pe_result)
print(pe_result.shape)
"""
pe结果: tensor([[[ 26.0749, 1.7394, 18.1979, ..., 17.9599, 17.3468, 24.0999],[-26.7084, 29.3180, 41.9102, ..., -37.2804, -4.7909, 1.0968],[-41.5618, 1.7244, 2.7057, ..., -0.0000, 47.2770, -13.1729],[ 25.5094, 32.7570, 51.9276, ..., -12.3927, 5.0286, -28.2805]],[[ 16.1884, -7.0750, -18.7670, ..., -15.6387, 7.5007, 51.3489],[-32.2040, 36.8715, 11.7979, ..., -17.9770, 65.2743, 34.6677],[ 3.7295, -16.0210, -24.0060, ..., 25.5953, 13.9014, -0.0000],[-11.5124, -16.6056, -17.1153, ..., -21.1416, -28.6649, -24.2164]]],grad_fn=<MulBackward0>)
torch.Size([2, 4, 512])
"""
绘制词汇向量中特征的分布曲线
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import math
import matplotlib.pyplot as plt
import numpy as np
import copy# 1.文本嵌入层
class Embeddings(nn.Module):def __init__(self, d_model, vocab):super(Embeddings, self).__init__()self.lut = nn.Embedding(vocab, d_model)self.d_model = d_modeldef forward(self, x):return self.lut(x) * math.sqrt(self.d_model)# 2.位置编码器类
class PositionalEncoding(nn.Module):def __init__(self, d_model, dropout, max_len=5000):super(PositionalEncoding, self).__init__()self.dropout = nn.Dropout(p=dropout)pe = torch.zeros(max_len, d_model)position = torch.arange(0, max_len).unsqueeze(1)div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))pe[:, 0::2] = torch.sin(position * div_term)pe[:, 1::2] = torch.cos(position * div_term)pe = pe.unsqueeze(0)self.register_buffer('pe', pe)def forward(self, x):x = x + Variable(self.pe[:, :x.size(1)], requires_grad=False)return self.dropout(x)d_model = 512
dropout = 0.1
max_len = 60vocab = 1000
x = Variable(torch.LongTensor([[100, 2, 421, 508], [491, 998, 1, 221]]))
emb = Embeddings(d_model, vocab)
embr = emb(x)x = embr
pe = PositionalEncoding(d_model, dropout, max_len)
pe_result = pe(x)# 3.绘制词汇向量中特征的分布曲线
plt.figure(figsize=(15, 5)) # 创建一张15 x 5大小的画布
pe = PositionalEncoding(20, 0)
y = pe(Variable(torch.zeros(1, 100, 20)))
plt.plot(np.arange(100), y[0, :, 4:8].data.numpy())
plt.legend(["dim %d" % p for p in [4, 5, 6, 7]])
plt.show()
二、编码部分
- 由N个编码器层堆叠而成
- 每个编码器层由两个子层连接结构组成
- 第一个子层连接结构包括一个多头自注意力子层和规范化层以及一个残差连接
- 第二个子层连接结构包括一个前馈全连接子层和规范化层以及一个残差连接
掩码张量
np.triu演示
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import math
import matplotlib.pyplot as plt
import numpy as np
import copy# np.triu演示
print(np.triu([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]], k=-1))
print(np.triu([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]], k=0))
print(np.triu([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]], k=1))
"""
[[ 1 2 3][ 4 5 6][ 0 8 9][ 0 0 12]]
[[1 2 3][0 5 6][0 0 9][0 0 0]]
[[0 2 3][0 0 6][0 0 0][0 0 0]]
"""
⭐3.掩码张量函数
多次输入学习,最后学到最好。已经知道下文,所以要掩盖。
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import math
import matplotlib.pyplot as plt
import numpy as np
import copy# 一、输入部分
# 1.文本嵌入层
class Embeddings(nn.Module):def __init__(self, d_model, vocab):super(Embeddings, self).__init__()self.lut = nn.Embedding(vocab, d_model)self.d_model = d_modeldef forward(self, x):return self.lut(x) * math.sqrt(self.d_model)# 2.位置编码器
class PositionalEncoding(nn.Module):def __init__(self, d_model, dropout, max_len=5000):super(PositionalEncoding, self).__init__()self.dropout = nn.Dropout(p=dropout)pe = torch.zeros(max_len, d_model)position = torch.arange(0, max_len).unsqueeze(1)div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))pe[:, 0::2] = torch.sin(position * div_term)pe[:, 1::2] = torch.cos(position * div_term)pe = pe.unsqueeze(0)self.register_buffer('pe', pe)def forward(self, x):x = x + Variable(self.pe[:, :x.size(1)], requires_grad=False)return self.dropout(x)d_model = 512
dropout = 0.1
max_len = 60vocab = 1000
x = Variable(torch.LongTensor([[100, 2, 421, 508], [491, 998, 1, 221]]))
emb = Embeddings(d_model, vocab)
embr = emb(x)x = embr
pe = PositionalEncoding(d_model, dropout, max_len)
pe_result = pe(x)# 绘制词汇向量中特征的分布曲线
plt.figure(figsize=(15, 5)) # 创建一张15 x 5大小的画布
pe = PositionalEncoding(20, 0)
y = pe(Variable(torch.zeros(1, 100, 20)))
plt.plot(np.arange(100), y[0, :, 4:8].data.numpy())
plt.legend(["dim %d" % p for p in [4, 5, 6, 7]])
# plt.show()# 二、编码部分
# 1.掩码张量函数
def subsequent_mask(size):attn_shape = (1, size, size)subsequent_mask = np.triu(np.ones(attn_shape), k=1).astype('uint8')return torch.from_numpy(1 - subsequent_mask)
# 生成的掩码张量的最后两维的大小
size = 5
sm = subsequent_mask(size)
print("掩码张量:", sm)
"""
掩码张量: tensor([[[1, 0, 0, 0, 0],[1, 1, 0, 0, 0],[1, 1, 1, 0, 0],[1, 1, 1, 1, 0],[1, 1, 1, 1, 1]]], dtype=torch.uint8)
"""# 掩码张量的可视化
plt.figure(figsize=(5,5))
plt.imshow(subsequent_mask(20)[0])
plt.show()
注意力机制
你在做一道题,key是提示,query是详细答案,value是你看完答案后你自己写的答案
tensor.masked_fill演示
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import math
import matplotlib.pyplot as plt
import numpy as np
import copyinput = Variable(torch.randn(5, 5))
print(input)mask = Variable(torch.zeros(5, 5))
print(mask)input.masked_fill(mask == 0, -1e9)
print(input)
"""
tensor([[-2.0163, -0.7226, -0.5435, 0.3623, 0.7278],[-0.8157, -0.6707, -1.4750, -0.4648, 0.4925],[ 0.7696, -0.9166, -0.2969, -0.0952, -0.0676],[ 0.6840, 0.4322, 1.5707, -0.2410, 0.9939],[ 0.2432, -0.8106, -0.8171, 2.3484, -0.3595]])
tensor([[0., 0., 0., 0., 0.],[0., 0., 0., 0., 0.],[0., 0., 0., 0., 0.],[0., 0., 0., 0., 0.],[0., 0., 0., 0., 0.]])
tensor([[-2.0163, -0.7226, -0.5435, 0.3623, 0.7278],[-0.8157, -0.6707, -1.4750, -0.4648, 0.4925],[ 0.7696, -0.9166, -0.2969, -0.0952, -0.0676],[ 0.6840, 0.4322, 1.5707, -0.2410, 0.9939],[ 0.2432, -0.8106, -0.8171, 2.3484, -0.3595]])
"""
⭐4.注意力机制
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import math
import matplotlib.pyplot as plt
import numpy as np
import copy# 一、输入部分
# 1.文本嵌入层
class Embeddings(nn.Module):def __init__(self, d_model, vocab):super(Embeddings, self).__init__()self.lut = nn.Embedding(vocab, d_model)self.d_model = d_modeldef forward(self, x):return self.lut(x) * math.sqrt(self.d_model)# 2.位置编码器
class PositionalEncoding(nn.Module):def __init__(self, d_model, dropout, max_len=5000):super(PositionalEncoding, self).__init__()self.dropout = nn.Dropout(p=dropout)pe = torch.zeros(max_len, d_model)position = torch.arange(0, max_len).unsqueeze(1)div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))pe[:, 0::2] = torch.sin(position * div_term)pe[:, 1::2] = torch.cos(position * div_term)pe = pe.unsqueeze(0)self.register_buffer('pe', pe)def forward(self, x):x = x + Variable(self.pe[:, :x.size(1)], requires_grad=False)return self.dropout(x)d_model = 512
dropout = 0.1
max_len = 60vocab = 1000
x = Variable(torch.LongTensor([[100, 2, 421, 508], [491, 998, 1, 221]]))
emb = Embeddings(d_model, vocab)
embr = emb(x)x = embr
pe = PositionalEncoding(d_model, dropout, max_len)
pe_result = pe(x)# 绘制词汇向量中特征的分布曲线
plt.figure(figsize=(15, 5)) # 创建一张15 x 5大小的画布
pe = PositionalEncoding(20, 0)
y = pe(Variable(torch.zeros(1, 100, 20)))
plt.plot(np.arange(100), y[0, :, 4:8].data.numpy())
plt.legend(["dim %d" % p for p in [4, 5, 6, 7]])# plt.show()# 二、编码部分
# 1.掩码张量函数
def subsequent_mask(size):attn_shape = (1, size, size)subsequent_mask = np.triu(np.ones(attn_shape), k=1).astype('uint8')return torch.from_numpy(1 - subsequent_mask)# 掩码张量的可视化
plt.figure(figsize=(5, 5))
plt.imshow(subsequent_mask(20)[0])
# plt.show()# 2.注意力机制
def attention(query, key, value, mask=None, dropout=None):d_k = query.size(-1)scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)if mask is not None:scores = scores.masked_fill(mask == 0, -1e9)p_attn = F.softmax(scores, dim=-1)if dropout is not None:p_attn = dropout(p_attn)return torch.matmul(p_attn, value), p_attnquery = key = value = pe_result
attn, p_attn = attention(query, key, value)
print("query的注意力表示:", attn) # 2x4x512
print("注意力张量:", p_attn) # size 2x4x4print("*****************************************************************")
# 带有mask的输入参数
query = key = value = pe_result
mask = Variable(torch.zeros(2, 4, 4))
attn, p_attn = attention(query, key, value, mask=mask)
print("query的注意力表示:", attn) # size 2x4x512
print("注意力张量:", p_attn) # size 2x4x4"""
query的注意力表示: tensor([[[-1.0675e+01, -8.0456e+00, -2.2159e+01, ..., -1.7814e+01,3.0499e+01, 4.1339e+01],[ 3.2106e+01, 2.4037e+01, 1.3494e+01, ..., 2.4034e+01,1.8157e+00, -2.0683e+01],[ 6.6581e+00, 1.4371e+01, 1.6482e+01, ..., -9.3249e-01,1.4465e+01, -2.8638e+01],[-1.4626e+00, -8.2685e+00, 4.5742e+01, ..., 3.5178e+01,1.2451e+01, -5.7837e+00]],[[ 0.0000e+00, 1.4930e+01, 2.3648e+00, ..., -1.5506e+01,-3.2476e+01, -9.5132e+00],[ 0.0000e+00, 4.5180e-02, -3.4786e+01, ..., 9.0967e+00,-9.1057e+00, -2.0643e+01],[ 0.0000e+00, -6.6465e+00, -7.8801e+00, ..., 5.4841e+00,3.9251e+01, 2.5519e+01],[ 0.0000e+00, 2.9907e+01, -9.8955e+00, ..., -8.6210e+00,0.0000e+00, 0.0000e+00]]], grad_fn=<UnsafeViewBackward0>)
注意力张量: tensor([[[1., 0., 0., 0.],[0., 1., 0., 0.],[0., 0., 1., 0.],[0., 0., 0., 1.]],[[1., 0., 0., 0.],[0., 1., 0., 0.],[0., 0., 1., 0.],[0., 0., 0., 1.]]], grad_fn=<SoftmaxBackward0>)
*****************************************************************
query的注意力表示: tensor([[[ 6.6567, 5.5234, 13.3898, ..., 10.1163, 14.8077, -3.4414],[ 6.6567, 5.5234, 13.3898, ..., 10.1163, 14.8077, -3.4414],[ 6.6567, 5.5234, 13.3898, ..., 10.1163, 14.8077, -3.4414],[ 6.6567, 5.5234, 13.3898, ..., 10.1163, 14.8077, -3.4414]],[[ 0.0000, 9.5590, -12.5492, ..., -2.3865, -0.5825, -1.1594],[ 0.0000, 9.5590, -12.5492, ..., -2.3865, -0.5825, -1.1594],[ 0.0000, 9.5590, -12.5492, ..., -2.3865, -0.5825, -1.1594],[ 0.0000, 9.5590, -12.5492, ..., -2.3865, -0.5825, -1.1594]]],grad_fn=<UnsafeViewBackward0>)
注意力张量: tensor([[[0.2500, 0.2500, 0.2500, 0.2500],[0.2500, 0.2500, 0.2500, 0.2500],[0.2500, 0.2500, 0.2500, 0.2500],[0.2500, 0.2500, 0.2500, 0.2500]],[[0.2500, 0.2500, 0.2500, 0.2500],[0.2500, 0.2500, 0.2500, 0.2500],[0.2500, 0.2500, 0.2500, 0.2500],[0.2500, 0.2500, 0.2500, 0.2500]]], grad_fn=<SoftmaxBackward0>)
"""
多头注意力机制
一千个哈姆雷特有《哈姆雷特》
tensor.view演示
torch.transpose演示
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import math
import matplotlib.pyplot as plt
import numpy as np
import copy# tensor.view演示
x = torch.randn(4, 4)
print(x.size())# torch.Size([4, 4])y = x.view(16)
print(y.size())# torch.Size([16])z = x.view(-1, 8)
print(z.size())# torch.Size([2, 8])a = torch.randn(1, 2, 3, 4)
print(a.size())# torch.Size([1, 2, 3, 4])b = a.transpose(1, 2)# 序号为1的和序号为2的交换位置
print(b.size())# torch.Size([1, 3, 2, 4])c = a.view(1, 3, 2, 4)
print(c.size())# torch.Size([1, 3, 2, 4])
print(torch.equal(b, c))# False# torch.transpose演示
x = torch.randn(2, 3)
print(x)
# tensor([[-0.8869, 1.2497, 0.3226],
# [-0.6379, -1.4205, -1.2025]])
print(torch.transpose(x, 0, 1))
# tensor([[-0.8869, -0.6379],
# [ 1.2497, -1.4205],
# [ 0.3226, -1.2025]])
⭐5.多头注意力机制
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import math
import matplotlib.pyplot as plt
import numpy as np
import copy# 一、输入部分
# 1.文本嵌入层
class Embeddings(nn.Module):def __init__(self, d_model, vocab):super(Embeddings, self).__init__()self.lut = nn.Embedding(vocab, d_model)self.d_model = d_modeldef forward(self, x):return self.lut(x) * math.sqrt(self.d_model)# 2.位置编码器
class PositionalEncoding(nn.Module):def __init__(self, d_model, dropout, max_len=5000):super(PositionalEncoding, self).__init__()self.dropout = nn.Dropout(p=dropout)pe = torch.zeros(max_len, d_model)position = torch.arange(0, max_len).unsqueeze(1)div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))pe[:, 0::2] = torch.sin(position * div_term)pe[:, 1::2] = torch.cos(position * div_term)pe = pe.unsqueeze(0)self.register_buffer('pe', pe)def forward(self, x):x = x + Variable(self.pe[:, :x.size(1)], requires_grad=False)return self.dropout(x)d_model = 512
dropout = 0.1
max_len = 60vocab = 1000
x = Variable(torch.LongTensor([[100, 2, 421, 508], [491, 998, 1, 221]]))
emb = Embeddings(d_model, vocab)
embr = emb(x)x = embr
pe = PositionalEncoding(d_model, dropout, max_len)
pe_result = pe(x)# 绘制词汇向量中特征的分布曲线
plt.figure(figsize=(15, 5)) # 创建一张15 x 5大小的画布
pe = PositionalEncoding(20, 0)
y = pe(Variable(torch.zeros(1, 100, 20)))
plt.plot(np.arange(100), y[0, :, 4:8].data.numpy())
plt.legend(["dim %d" % p for p in [4, 5, 6, 7]])# plt.show()# 二、编码部分
# 1.掩码张量函数
def subsequent_mask(size):attn_shape = (1, size, size)subsequent_mask = np.triu(np.ones(attn_shape), k=1).astype('uint8')return torch.from_numpy(1 - subsequent_mask)# 掩码张量的可视化
plt.figure(figsize=(5, 5))
plt.imshow(subsequent_mask(20)[0])# plt.show()# 2.注意力机制
def attention(query, key, value, mask=None, dropout=None):d_k = query.size(-1)scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)if mask is not None:scores = scores.masked_fill(mask == 0, -1e9)p_attn = F.softmax(scores, dim=-1)if dropout is not None:p_attn = dropout(p_attn)return torch.matmul(p_attn, value), p_attnquery = key = value = pe_result
attn, p_attn = attention(query, key, value)
# print("query的注意力表示:", attn) # 2x4x512
# print("注意力张量:", p_attn) # size 2x4x4
#
# print("*****************************************************************")
# 带有mask的输入参数
query = key = value = pe_result
mask = Variable(torch.zeros(2, 4, 4))
attn, p_attn = attention(query, key, value, mask=mask)# print("query的注意力表示:", attn) # size 2x4x512
# print("注意力张量:", p_attn) # size 2x4x4# 深拷贝
def clones(module, N):return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])# 多头注意力机制
class MultiHeadedAttention(nn.Module):def __init__(self, head, embedding_dim, dropout=0.1):super(MultiHeadedAttention, self).__init__()assert embedding_dim % head == 0self.d_k = embedding_dim // headself.head = head# 在多头注意力中,Q,K,V各需要一个,最后拼接的矩阵还需要一个,一共是4个self.linears = clones(nn.Linear(embedding_dim, embedding_dim), 4)self.attn = Noneself.dropout = nn.Dropout(p=dropout)def forward(self, query, key, value, mask=None):if mask is not None:mask = mask.unsqueeze(0)batch_size = query.size(0)query, key, value = [model(x).view(batch_size, -1, self.head, self.d_k).transpose(1, 2)for model, x in zip(self.linears, (query, key, value))]x, self.attn = attention(query, key, value, mask=mask, dropout=self.dropout)x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.head * self.d_k)return self.linears[-1](x)head = 8
embedding_dim = 512
dropout = 0.2
query = value = key = pe_result
mask = Variable(torch.zeros(8, 4, 4))
mha = MultiHeadedAttention(head, embedding_dim, dropout)
mha_result = mha(query, key, value, mask)
print(mha_result)
"""
tensor([[[ 5.1117, 2.7441, -3.6746, ..., 5.4250, 2.4214, 0.8056],[ 6.1471, 2.2109, -3.5177, ..., 5.3436, 3.8831, 4.9805],[ 1.4831, 0.4307, -2.5829, ..., 2.0772, 0.9475, 3.2005],[ 3.5892, 2.9082, -1.7384, ..., 2.9132, 4.1973, 5.0990]],[[ -1.3965, -6.1177, -7.4958, ..., -0.5587, -6.4261, -3.2176],[ -1.2701, -4.3102, -6.2340, ..., -4.0173, -3.0431, -0.6736],[ 0.8762, -5.1155, -6.8253, ..., -4.9823, -1.4425, -2.7415],[ 0.3864, -8.2357, -11.1042, ..., 0.3552, -4.3414, -4.0765]]],grad_fn=<ViewBackward0>)
"""
⭐6.前馈全连接层
注意力机制可能对复杂过程的拟合程度不够, 通过增加两层网络来增强模型的能力
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import math
import matplotlib.pyplot as plt
import numpy as np
import copy# 一、输入部分
# 1.文本嵌入层
class Embeddings(nn.Module):def __init__(self, d_model, vocab):super(Embeddings, self).__init__()self.lut = nn.Embedding(vocab, d_model)self.d_model = d_modeldef forward(self, x):return self.lut(x) * math.sqrt(self.d_model)# 2.位置编码器
class PositionalEncoding(nn.Module):def __init__(self, d_model, dropout, max_len=5000):super(PositionalEncoding, self).__init__()self.dropout = nn.Dropout(p=dropout)pe = torch.zeros(max_len, d_model)position = torch.arange(0, max_len).unsqueeze(1)div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))pe[:, 0::2] = torch.sin(position * div_term)pe[:, 1::2] = torch.cos(position * div_term)pe = pe.unsqueeze(0)self.register_buffer('pe', pe)def forward(self, x):x = x + Variable(self.pe[:, :x.size(1)], requires_grad=False)return self.dropout(x)d_model = 512
dropout = 0.1
max_len = 60vocab = 1000
x = Variable(torch.LongTensor([[100, 2, 421, 508], [491, 998, 1, 221]]))
emb = Embeddings(d_model, vocab)
embr = emb(x)x = embr
pe = PositionalEncoding(d_model, dropout, max_len)
pe_result = pe(x)# 绘制词汇向量中特征的分布曲线
plt.figure(figsize=(15, 5)) # 创建一张15 x 5大小的画布
pe = PositionalEncoding(20, 0)
y = pe(Variable(torch.zeros(1, 100, 20)))
plt.plot(np.arange(100), y[0, :, 4:8].data.numpy())
plt.legend(["dim %d" % p for p in [4, 5, 6, 7]])# plt.show()# 二、编码部分
# 1.掩码张量函数
def subsequent_mask(size):attn_shape = (1, size, size)subsequent_mask = np.triu(np.ones(attn_shape), k=1).astype('uint8')return torch.from_numpy(1 - subsequent_mask)# 掩码张量的可视化
plt.figure(figsize=(5, 5))
plt.imshow(subsequent_mask(20)[0])# plt.show()# 2.注意力机制
def attention(query, key, value, mask=None, dropout=None):d_k = query.size(-1)scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)if mask is not None:scores = scores.masked_fill(mask == 0, -1e9)p_attn = F.softmax(scores, dim=-1)if dropout is not None:p_attn = dropout(p_attn)return torch.matmul(p_attn, value), p_attnquery = key = value = pe_result
attn, p_attn = attention(query, key, value)
# print("query的注意力表示:", attn) # 2x4x512
# print("注意力张量:", p_attn) # size 2x4x4
#
# print("*****************************************************************")
# 带有mask的输入参数
query = key = value = pe_result
mask = Variable(torch.zeros(2, 4, 4))
attn, p_attn = attention(query, key, value, mask=mask)
# print("query的注意力表示:", attn) # size 2x4x512
# print("注意力张量:", p_attn) # size 2x4x4# 3.多头注意力机制
# 深拷贝
def clones(module, N):return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])class MultiHeadedAttention(nn.Module):def __init__(self, head, embedding_dim, dropout=0.1):super(MultiHeadedAttention, self).__init__()assert embedding_dim % head == 0self.d_k = embedding_dim // headself.head = head# 在多头注意力中,Q,K,V各需要一个,最后拼接的矩阵还需要一个,一共是4个self.linears = clones(nn.Linear(embedding_dim, embedding_dim), 4)self.attn = Noneself.dropout = nn.Dropout(p=dropout)def forward(self, query, key, value, mask=None):if mask is not None:mask = mask.unsqueeze(0)batch_size = query.size(0)query, key, value = [model(x).view(batch_size, -1, self.head, self.d_k).transpose(1, 2)for model, x in zip(self.linears, (query, key, value))]x, self.attn = attention(query, key, value, mask=mask, dropout=self.dropout)x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.head * self.d_k)return self.linears[-1](x)head = 8
embedding_dim = 512
dropout = 0.2
query = value = key = pe_result
mask = Variable(torch.zeros(8, 4, 4))
mha = MultiHeadedAttention(head, embedding_dim, dropout)
mha_result = mha(query, key, value, mask)
# print(mha_result)# 4.前馈全连接层
class PositionwiseFeedForward(nn.Module):def __init__(self, d_model, d_ff, dropout=0.1):super(PositionwiseFeedForward, self).__init__()self.w1 = nn.Linear(d_model, d_ff)self.w2 = nn.Linear(d_ff, d_model)self.dropout = nn.Dropout(dropout)def forward(self, x):return self.w2(self.dropout(F.relu(self.w1(x))))d_model = 512
d_ff = 64
dropout = 0.2
x = mha_result
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
ff_result = ff(x)
print(ff_result)
"""
tensor([[[ 1.3699, -0.0291, -0.3212, ..., -0.7105, 0.1728, -1.6720],[ 1.8951, 0.6111, -0.5830, ..., -1.4471, -0.2291, -2.0005],[ 1.1673, 0.0624, 0.8014, ..., 1.3812, -0.4503, -2.1730],[ 1.5105, 0.2297, 0.2027, ..., 1.0533, 0.9179, -0.9378]],[[-0.5993, 1.5654, -0.5952, ..., 0.9375, -0.1775, -2.4535],[ 0.1358, 1.8777, -0.6284, ..., 2.0970, 1.4326, -1.5991],[-0.4315, 0.3731, 0.6662, ..., 1.8709, 0.2463, -0.8921],[-0.6862, 1.1372, -0.1283, ..., 2.5608, 0.7814, -1.5519]]],grad_fn=<ViewBackward0>)
"""
⭐7.规范化层
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import math
import matplotlib.pyplot as plt
import numpy as np
import copy# 一、输入部分
# 1.文本嵌入层
class Embeddings(nn.Module):def __init__(self, d_model, vocab):super(Embeddings, self).__init__()self.lut = nn.Embedding(vocab, d_model)self.d_model = d_modeldef forward(self, x):return self.lut(x) * math.sqrt(self.d_model)# 2.位置编码器
class PositionalEncoding(nn.Module):def __init__(self, d_model, dropout, max_len=5000):super(PositionalEncoding, self).__init__()self.dropout = nn.Dropout(p=dropout)pe = torch.zeros(max_len, d_model)position = torch.arange(0, max_len).unsqueeze(1)div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))pe[:, 0::2] = torch.sin(position * div_term)pe[:, 1::2] = torch.cos(position * div_term)pe = pe.unsqueeze(0)self.register_buffer('pe', pe)def forward(self, x):x = x + Variable(self.pe[:, :x.size(1)], requires_grad=False)return self.dropout(x)d_model = 512
dropout = 0.1
max_len = 60vocab = 1000
x = Variable(torch.LongTensor([[100, 2, 421, 508], [491, 998, 1, 221]]))
emb = Embeddings(d_model, vocab)
embr = emb(x)x = embr
pe = PositionalEncoding(d_model, dropout, max_len)
pe_result = pe(x)# 绘制词汇向量中特征的分布曲线
plt.figure(figsize=(15, 5)) # 创建一张15 x 5大小的画布
pe = PositionalEncoding(20, 0)
y = pe(Variable(torch.zeros(1, 100, 20)))
plt.plot(np.arange(100), y[0, :, 4:8].data.numpy())
plt.legend(["dim %d" % p for p in [4, 5, 6, 7]])# plt.show()# 二、编码部分
# 1.掩码张量函数
def subsequent_mask(size):attn_shape = (1, size, size)subsequent_mask = np.triu(np.ones(attn_shape), k=1).astype('uint8')return torch.from_numpy(1 - subsequent_mask)# 掩码张量的可视化
plt.figure(figsize=(5, 5))
plt.imshow(subsequent_mask(20)[0])# plt.show()# 2.注意力机制
def attention(query, key, value, mask=None, dropout=None):d_k = query.size(-1)scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)if mask is not None:scores = scores.masked_fill(mask == 0, -1e9)p_attn = F.softmax(scores, dim=-1)if dropout is not None:p_attn = dropout(p_attn)return torch.matmul(p_attn, value), p_attnquery = key = value = pe_result
attn, p_attn = attention(query, key, value)
# print("query的注意力表示:", attn) # 2x4x512
# print("注意力张量:", p_attn) # size 2x4x4
#
# print("*****************************************************************")
# 带有mask的输入参数
query = key = value = pe_result
mask = Variable(torch.zeros(2, 4, 4))
attn, p_attn = attention(query, key, value, mask=mask)
# print("query的注意力表示:", attn) # size 2x4x512
# print("注意力张量:", p_attn) # size 2x4x4# 3.多头注意力机制
# 深拷贝
def clones(module, N):return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])class MultiHeadedAttention(nn.Module):def __init__(self, head, embedding_dim, dropout=0.1):super(MultiHeadedAttention, self).__init__()assert embedding_dim % head == 0self.d_k = embedding_dim // headself.head = head# 在多头注意力中,Q,K,V各需要一个,最后拼接的矩阵还需要一个,一共是4个self.linears = clones(nn.Linear(embedding_dim, embedding_dim), 4)self.attn = Noneself.dropout = nn.Dropout(p=dropout)def forward(self, query, key, value, mask=None):if mask is not None:mask = mask.unsqueeze(0)batch_size = query.size(0)query, key, value = [model(x).view(batch_size, -1, self.head, self.d_k).transpose(1, 2)for model, x in zip(self.linears, (query, key, value))]x, self.attn = attention(query, key, value, mask=mask, dropout=self.dropout)x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.head * self.d_k)return self.linears[-1](x)head = 8
embedding_dim = 512
dropout = 0.2
query = value = key = pe_result
mask = Variable(torch.zeros(8, 4, 4))
mha = MultiHeadedAttention(head, embedding_dim, dropout)
mha_result = mha(query, key, value, mask)
# print(mha_result)# 4.前馈全连接层
class PositionwiseFeedForward(nn.Module):def __init__(self, d_model, d_ff, dropout=0.1):super(PositionwiseFeedForward, self).__init__()self.w1 = nn.Linear(d_model, d_ff)self.w2 = nn.Linear(d_ff, d_model)self.dropout = nn.Dropout(dropout)def forward(self, x):return self.w2(self.dropout(F.relu(self.w1(x))))d_model = 512
d_ff = 64
dropout = 0.2
x = mha_result
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
ff_result = ff(x)# 5.规范化层
# 通过LayerNorm实现规范化层的类
class LayerNorm(nn.Module):def __init__(self, features, eps=1e-6):super(LayerNorm, self).__init__()self.a2 = nn.Parameter(torch.ones(features))self.b2 = nn.Parameter(torch.zeros(features))self.eps = epsdef forward(self, x):mean = x.mean(-1, keepdim=True)std = x.std(-1, keepdim=True)return self.a2 * (x - mean) / (std + self.eps) + self.b2features = d_model = 512
eps = 1e-6
x = ff_result
ln = LayerNorm(features, eps)
ln_result = ln(x)
print(ln_result)
"""
tensor([[[-8.3460e-01, -4.5519e-01, -4.8425e-01, ..., 4.5406e-01,9.2949e-01, 9.3043e-01],[-1.1315e+00, -9.1994e-01, -6.4669e-01, ..., 7.5945e-01,7.6214e-01, 8.0217e-01],[-6.3322e-01, 4.7747e-01, -5.0195e-01, ..., 4.6353e-04,3.2654e-01, -1.6072e-02],[-9.1272e-01, -3.7506e-01, -1.4400e+00, ..., -2.3055e-01,4.1403e-01, -1.4555e-01]],[[-1.1166e-01, -1.3829e+00, -5.9005e-01, ..., 1.5550e+00,9.5446e-01, 4.0732e-02],[ 6.8869e-01, -8.0725e-01, -1.4566e+00, ..., 1.2550e+00,6.6449e-01, -1.1773e+00],[-5.8408e-01, -1.1875e+00, -7.8642e-01, ..., 1.1239e+00,6.7882e-01, 5.9670e-01],[ 9.4805e-01, -1.3687e+00, 2.0909e-01, ..., 6.0416e-01,2.0030e+00, -5.7529e-02]]], grad_fn=<AddBackward0>)
"""
⭐8.残差连接(子层连接、跳跃连接)
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import math
import matplotlib.pyplot as plt
import numpy as np
import copy# 一、输入部分
# 1.文本嵌入层
class Embeddings(nn.Module):def __init__(self, d_model, vocab):super(Embeddings, self).__init__()self.lut = nn.Embedding(vocab, d_model)self.d_model = d_modeldef forward(self, x):return self.lut(x) * math.sqrt(self.d_model)# 2.位置编码器
class PositionalEncoding(nn.Module):def __init__(self, d_model, dropout, max_len=5000):super(PositionalEncoding, self).__init__()self.dropout = nn.Dropout(p=dropout)pe = torch.zeros(max_len, d_model)position = torch.arange(0, max_len).unsqueeze(1)div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))pe[:, 0::2] = torch.sin(position * div_term)pe[:, 1::2] = torch.cos(position * div_term)pe = pe.unsqueeze(0)self.register_buffer('pe', pe)def forward(self, x):x = x + Variable(self.pe[:, :x.size(1)], requires_grad=False)return self.dropout(x)d_model = 512
dropout = 0.1
max_len = 60vocab = 1000
x = Variable(torch.LongTensor([[100, 2, 421, 508], [491, 998, 1, 221]]))
emb = Embeddings(d_model, vocab)
embr = emb(x)x = embr
pe = PositionalEncoding(d_model, dropout, max_len)
pe_result = pe(x)# 绘制词汇向量中特征的分布曲线
plt.figure(figsize=(15, 5)) # 创建一张15 x 5大小的画布
pe = PositionalEncoding(20, 0)
y = pe(Variable(torch.zeros(1, 100, 20)))
plt.plot(np.arange(100), y[0, :, 4:8].data.numpy())
plt.legend(["dim %d" % p for p in [4, 5, 6, 7]])# plt.show()# 二、编码部分
# 1.掩码张量函数
def subsequent_mask(size):attn_shape = (1, size, size)subsequent_mask = np.triu(np.ones(attn_shape), k=1).astype('uint8')return torch.from_numpy(1 - subsequent_mask)# 掩码张量的可视化
plt.figure(figsize=(5, 5))
plt.imshow(subsequent_mask(20)[0])# plt.show()# 2.注意力机制
def attention(query, key, value, mask=None, dropout=None):d_k = query.size(-1)scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)if mask is not None:scores = scores.masked_fill(mask == 0, -1e9)p_attn = F.softmax(scores, dim=-1)if dropout is not None:p_attn = dropout(p_attn)return torch.matmul(p_attn, value), p_attnquery = key = value = pe_result
attn, p_attn = attention(query, key, value)
# print("query的注意力表示:", attn) # 2x4x512
# print("注意力张量:", p_attn) # size 2x4x4
#
# print("*****************************************************************")
# 带有mask的输入参数
query = key = value = pe_result
mask = Variable(torch.zeros(2, 4, 4))
attn, p_attn = attention(query, key, value, mask=mask)
# print("query的注意力表示:", attn) # size 2x4x512
# print("注意力张量:", p_attn) # size 2x4x4# 3.多头注意力机制
# 深拷贝
def clones(module, N):return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])class MultiHeadedAttention(nn.Module):def __init__(self, head, embedding_dim, dropout=0.1):super(MultiHeadedAttention, self).__init__()assert embedding_dim % head == 0self.d_k = embedding_dim // headself.head = head# 在多头注意力中,Q,K,V各需要一个,最后拼接的矩阵还需要一个,一共是4个self.linears = clones(nn.Linear(embedding_dim, embedding_dim), 4)self.attn = Noneself.dropout = nn.Dropout(p=dropout)def forward(self, query, key, value, mask=None):if mask is not None:mask = mask.unsqueeze(0)batch_size = query.size(0)query, key, value = [model(x).view(batch_size, -1, self.head, self.d_k).transpose(1, 2)for model, x in zip(self.linears, (query, key, value))]x, self.attn = attention(query, key, value, mask=mask, dropout=self.dropout)x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.head * self.d_k)return self.linears[-1](x)head = 8
embedding_dim = 512
dropout = 0.2
query = value = key = pe_result
mask = Variable(torch.zeros(8, 4, 4))
mha = MultiHeadedAttention(head, embedding_dim, dropout)
mha_result = mha(query, key, value, mask)
# print(mha_result)# 4.前馈全连接层
class PositionwiseFeedForward(nn.Module):def __init__(self, d_model, d_ff, dropout=0.1):super(PositionwiseFeedForward, self).__init__()self.w1 = nn.Linear(d_model, d_ff)self.w2 = nn.Linear(d_ff, d_model)self.dropout = nn.Dropout(dropout)def forward(self, x):return self.w2(self.dropout(F.relu(self.w1(x))))d_model = 512
d_ff = 64
dropout = 0.2
x = mha_result
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
ff_result = ff(x)# 5.规范化层
# 通过LayerNorm实现规范化层的类
class LayerNorm(nn.Module):def __init__(self, features, eps=1e-6):super(LayerNorm, self).__init__()self.a2 = nn.Parameter(torch.ones(features))self.b2 = nn.Parameter(torch.zeros(features))self.eps = epsdef forward(self, x):mean = x.mean(-1, keepdim=True)std = x.std(-1, keepdim=True)return self.a2 * (x - mean) / (std + self.eps) + self.b2features = d_model = 512
eps = 1e-6
x = ff_result
ln = LayerNorm(features, eps)
ln_result = ln(x)# 6.残差连接
class SublayerConnection(nn.Module):def __init__(self, size, dropout=0.1):super(SublayerConnection, self).__init__()self.norm = LayerNorm(size)self.dropout = nn.Dropout(p=dropout)def forward(self, x, sublayer):return x + self.dropout(sublayer(self.norm(x)))size = 512
dropout = 0.2
head = 8
d_model = 512
x = pe_result
mask = Variable(torch.zeros(8, 4, 4))
self_attn = MultiHeadedAttention(head, d_model)
sublayer = lambda x: self_attn(x, x, x, mask)
sc = SublayerConnection(size, dropout)
sc_result = sc(x, sublayer)
print(sc_result)
print(sc_result.shape)
"""
tensor([[[-8.1750e+00, 0.0000e+00, 7.1912e+00, ..., 1.4916e+01,-1.9816e+01, -1.5434e+01],[-1.0226e+01, 2.1595e+00, 6.9106e+00, ..., -1.8356e+01,-2.3092e+01, 1.7498e+00],[-2.8452e+01, -1.0691e-01, 1.9114e-01, ..., 6.0072e+00,2.7866e+01, -2.8865e+01],[ 2.7632e+01, 2.2874e+01, -5.3257e+00, ..., -2.7372e-01,-2.7839e+01, 3.2575e+01]],[[-7.4514e+00, 1.0837e+01, 1.2139e+01, ..., -4.2897e+01,4.9849e+00, -6.1880e+00],[-2.3347e+01, -2.6158e-02, 3.0347e+01, ..., -1.1466e+01,-2.5094e+01, 3.5434e+01],[ 1.8800e+01, -2.8887e+01, -5.4066e+00, ..., -1.9323e+01,3.9585e-01, -1.9223e+01],[-2.0564e+00, 1.3380e+01, 3.6210e+01, ..., -2.6659e+01,-9.5822e+00, 3.5938e+01]]], grad_fn=<AddBackward0>)
torch.Size([2, 4, 512])
"""
⭐9.编码器层
作为编码器的组成单元, 每个编码器层完成一次对输入的特征提取过程
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import math
import matplotlib.pyplot as plt
import numpy as np
import copy# 一、输入部分
# 1.文本嵌入层
class Embeddings(nn.Module):def __init__(self, d_model, vocab):super(Embeddings, self).__init__()self.lut = nn.Embedding(vocab, d_model)self.d_model = d_modeldef forward(self, x):return self.lut(x) * math.sqrt(self.d_model)# 2.位置编码器
class PositionalEncoding(nn.Module):def __init__(self, d_model, dropout, max_len=5000):super(PositionalEncoding, self).__init__()self.dropout = nn.Dropout(p=dropout)pe = torch.zeros(max_len, d_model)position = torch.arange(0, max_len).unsqueeze(1)div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))pe[:, 0::2] = torch.sin(position * div_term)pe[:, 1::2] = torch.cos(position * div_term)pe = pe.unsqueeze(0)self.register_buffer('pe', pe)def forward(self, x):x = x + Variable(self.pe[:, :x.size(1)], requires_grad=False)return self.dropout(x)d_model = 512
dropout = 0.1
max_len = 60vocab = 1000
x = Variable(torch.LongTensor([[100, 2, 421, 508], [491, 998, 1, 221]]))
emb = Embeddings(d_model, vocab)
embr = emb(x)x = embr
pe = PositionalEncoding(d_model, dropout, max_len)
pe_result = pe(x)# 绘制词汇向量中特征的分布曲线
plt.figure(figsize=(15, 5)) # 创建一张15 x 5大小的画布
pe = PositionalEncoding(20, 0)
y = pe(Variable(torch.zeros(1, 100, 20)))
plt.plot(np.arange(100), y[0, :, 4:8].data.numpy())
plt.legend(["dim %d" % p for p in [4, 5, 6, 7]])# plt.show()# 二、编码部分
# 1.掩码张量函数
def subsequent_mask(size):attn_shape = (1, size, size)subsequent_mask = np.triu(np.ones(attn_shape), k=1).astype('uint8')return torch.from_numpy(1 - subsequent_mask)# 掩码张量的可视化
plt.figure(figsize=(5, 5))
plt.imshow(subsequent_mask(20)[0])# plt.show()# 2.注意力机制
def attention(query, key, value, mask=None, dropout=None):d_k = query.size(-1)scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)if mask is not None:scores = scores.masked_fill(mask == 0, -1e9)p_attn = F.softmax(scores, dim=-1)if dropout is not None:p_attn = dropout(p_attn)return torch.matmul(p_attn, value), p_attnquery = key = value = pe_result
attn, p_attn = attention(query, key, value)
# print("query的注意力表示:", attn) # 2x4x512
# print("注意力张量:", p_attn) # size 2x4x4
#
# print("*****************************************************************")
# 带有mask的输入参数
query = key = value = pe_result
mask = Variable(torch.zeros(2, 4, 4))
attn, p_attn = attention(query, key, value, mask=mask)
# print("query的注意力表示:", attn) # size 2x4x512
# print("注意力张量:", p_attn) # size 2x4x4# 3.多头注意力机制
# 深拷贝
def clones(module, N):return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])class MultiHeadedAttention(nn.Module):def __init__(self, head, embedding_dim, dropout=0.1):super(MultiHeadedAttention, self).__init__()assert embedding_dim % head == 0self.d_k = embedding_dim // headself.head = head# 在多头注意力中,Q,K,V各需要一个,最后拼接的矩阵还需要一个,一共是4个self.linears = clones(nn.Linear(embedding_dim, embedding_dim), 4)self.attn = Noneself.dropout = nn.Dropout(p=dropout)def forward(self, query, key, value, mask=None):if mask is not None:mask = mask.unsqueeze(0)batch_size = query.size(0)query, key, value = [model(x).view(batch_size, -1, self.head, self.d_k).transpose(1, 2)for model, x in zip(self.linears, (query, key, value))]x, self.attn = attention(query, key, value, mask=mask, dropout=self.dropout)x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.head * self.d_k)return self.linears[-1](x)head = 8
embedding_dim = 512
dropout = 0.2
query = value = key = pe_result
mask = Variable(torch.zeros(8, 4, 4))
mha = MultiHeadedAttention(head, embedding_dim, dropout)
mha_result = mha(query, key, value, mask)
# print(mha_result)# 4.前馈全连接层
class PositionwiseFeedForward(nn.Module):def __init__(self, d_model, d_ff, dropout=0.1):super(PositionwiseFeedForward, self).__init__()self.w1 = nn.Linear(d_model, d_ff)self.w2 = nn.Linear(d_ff, d_model)self.dropout = nn.Dropout(dropout)def forward(self, x):return self.w2(self.dropout(F.relu(self.w1(x))))d_model = 512
d_ff = 64
dropout = 0.2
x = mha_result
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
ff_result = ff(x)# 5.规范化层
# 通过LayerNorm实现规范化层的类
class LayerNorm(nn.Module):def __init__(self, features, eps=1e-6):super(LayerNorm, self).__init__()self.a2 = nn.Parameter(torch.ones(features))self.b2 = nn.Parameter(torch.zeros(features))self.eps = epsdef forward(self, x):mean = x.mean(-1, keepdim=True)std = x.std(-1, keepdim=True)return self.a2 * (x - mean) / (std + self.eps) + self.b2features = d_model = 512
eps = 1e-6
x = ff_result
ln = LayerNorm(features, eps)
ln_result = ln(x)# 6.残差连接
class SublayerConnection(nn.Module):def __init__(self, size, dropout=0.1):super(SublayerConnection, self).__init__()self.norm = LayerNorm(size)self.dropout = nn.Dropout(p=dropout)def forward(self, x, sublayer):return x + self.dropout(sublayer(self.norm(x)))size = 512
dropout = 0.2
head = 8
d_model = 512
x = pe_result
mask = Variable(torch.zeros(8, 4, 4))
self_attn = MultiHeadedAttention(head, d_model)
sublayer = lambda x: self_attn(x, x, x, mask)
sc = SublayerConnection(size, dropout)
sc_result = sc(x, sublayer)# 7.编码器层
class EncoderLayer(nn.Module):def __init__(self, size, self_attn, feed_forward, dropout):super(EncoderLayer, self).__init__()self.self_attn = self_attnself.feed_forward = feed_forwardself.sublayer = clones(SublayerConnection(size, dropout), 2)self.size = sizedef forward(self, x, mask):x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))return self.sublayer[1](x, self.feed_forward)size = 512
head = 8
d_model = 512
d_ff = 64
x = pe_result
dropout = 0.2
self_attn = MultiHeadedAttention(head, d_model)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
mask = Variable(torch.zeros(8, 4, 4))
el = EncoderLayer(size, self_attn, ff, dropout)
el_result = el(x, mask)# 8.编码器
class Encoder(nn.Module):def __init__(self, layer, N):super(Encoder, self).__init__()self.layers = clones(layer, N)self.norm = LayerNorm(layer.size)def forward(self, x, mask):for layer in self.layers:x = layer(x, mask)return self.norm(x)size = 512
head = 8
d_model = 512
d_ff = 64
c = copy.deepcopy
attn = MultiHeadedAttention(head, d_model)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
dropout = 0.2
layer = EncoderLayer(size, c(attn), c(ff), dropout)
N = 8
mask = Variable(torch.zeros(8, 4, 4))
en = Encoder(layer, N)
en_result = en(x, mask)
print(en_result)
print(en_result.shape)
"""
tensor([[[-1.2431e-01, -2.3363e+00, 1.9084e-02, ..., -9.8174e-02,-2.0241e+00, -2.8970e-01],[-3.9608e-01, 5.2420e-02, 2.4076e-02, ..., -1.2182e-01,4.7777e-01, 4.0544e-01],[-6.3494e-01, -2.5631e-03, -1.7992e-01, ..., -5.5367e-02,-4.3454e-02, 1.0005e+00],[-8.5996e-01, 2.6673e+00, 9.2570e-01, ..., 6.2907e-01,3.7063e-01, 6.4456e-01]],[[ 3.3140e-01, 1.4327e+00, 4.1478e-02, ..., 4.5121e-01,-1.7026e+00, 8.7472e-01],[-2.5319e-01, 1.8512e+00, -3.0673e-02, ..., 7.9770e-02,1.1026e-01, -2.9194e-01],[ 1.3375e-01, -1.7779e-01, 2.6414e-03, ..., -5.6526e-01,6.5849e-01, 1.1001e+00],[ 1.5610e+00, -1.4482e+00, 2.5439e-01, ..., -5.4919e-01,-7.2307e-01, 1.4985e+00]]], grad_fn=<AddBackward0>)
torch.Size([2, 4, 512])
"""
三、解码部分
- 由N个解码器层堆叠而成
- 每个解码器层由三个子层连接结构组成
- 第一个子层连接结构包括一个多头自注意力子层和规范化层以及一个残差连接
- 第二个子层连接结构包括一个多头注意力子层和规范化层以及一个残差连接
- 第三个子层连接结构包括一个前馈全连接子层和规范化层以及一个残差连接
⭐10.编码器层
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import math
import matplotlib.pyplot as plt
import numpy as np
import copy# 一、输入部分
# 1.文本嵌入层
class Embeddings(nn.Module):def __init__(self, d_model, vocab):super(Embeddings, self).__init__()self.lut = nn.Embedding(vocab, d_model)self.d_model = d_modeldef forward(self, x):return self.lut(x) * math.sqrt(self.d_model)# 2.位置编码器
class PositionalEncoding(nn.Module):def __init__(self, d_model, dropout, max_len=5000):super(PositionalEncoding, self).__init__()self.dropout = nn.Dropout(p=dropout)pe = torch.zeros(max_len, d_model)position = torch.arange(0, max_len).unsqueeze(1)div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))pe[:, 0::2] = torch.sin(position * div_term)pe[:, 1::2] = torch.cos(position * div_term)pe = pe.unsqueeze(0)self.register_buffer('pe', pe)def forward(self, x):x = x + Variable(self.pe[:, :x.size(1)], requires_grad=False)return self.dropout(x)d_model = 512
dropout = 0.1
max_len = 60vocab = 1000
x = Variable(torch.LongTensor([[100, 2, 421, 508], [491, 998, 1, 221]]))
emb = Embeddings(d_model, vocab)
embr = emb(x)x = embr
pe = PositionalEncoding(d_model, dropout, max_len)
pe_result = pe(x)# 绘制词汇向量中特征的分布曲线
plt.figure(figsize=(15, 5)) # 创建一张15 x 5大小的画布
pe = PositionalEncoding(20, 0)
y = pe(Variable(torch.zeros(1, 100, 20)))
plt.plot(np.arange(100), y[0, :, 4:8].data.numpy())
plt.legend(["dim %d" % p for p in [4, 5, 6, 7]])# plt.show()# 二、编码器部分
# 1.掩码张量函数
def subsequent_mask(size):attn_shape = (1, size, size)subsequent_mask = np.triu(np.ones(attn_shape), k=1).astype('uint8')return torch.from_numpy(1 - subsequent_mask)# 掩码张量的可视化
plt.figure(figsize=(5, 5))
plt.imshow(subsequent_mask(20)[0])# plt.show()# 2.注意力机制
def attention(query, key, value, mask=None, dropout=None):d_k = query.size(-1)scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)if mask is not None:scores = scores.masked_fill(mask == 0, -1e9)p_attn = F.softmax(scores, dim=-1)if dropout is not None:p_attn = dropout(p_attn)return torch.matmul(p_attn, value), p_attnquery = key = value = pe_result
attn, p_attn = attention(query, key, value)
# print("query的注意力表示:", attn) # 2x4x512
# print("注意力张量:", p_attn) # size 2x4x4
#
# print("*****************************************************************")
# 带有mask的输入参数
query = key = value = pe_result
mask = Variable(torch.zeros(2, 4, 4))
attn, p_attn = attention(query, key, value, mask=mask)
# print("query的注意力表示:", attn) # size 2x4x512
# print("注意力张量:", p_attn) # size 2x4x4# 3.多头注意力机制
# 深拷贝
def clones(module, N):return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])class MultiHeadedAttention(nn.Module):def __init__(self, head, embedding_dim, dropout=0.1):super(MultiHeadedAttention, self).__init__()assert embedding_dim % head == 0self.d_k = embedding_dim // headself.head = head# 在多头注意力中,Q,K,V各需要一个,最后拼接的矩阵还需要一个,一共是4个self.linears = clones(nn.Linear(embedding_dim, embedding_dim), 4)self.attn = Noneself.dropout = nn.Dropout(p=dropout)def forward(self, query, key, value, mask=None):if mask is not None:mask = mask.unsqueeze(0)batch_size = query.size(0)query, key, value = [model(x).view(batch_size, -1, self.head, self.d_k).transpose(1, 2)for model, x in zip(self.linears, (query, key, value))]x, self.attn = attention(query, key, value, mask=mask, dropout=self.dropout)x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.head * self.d_k)return self.linears[-1](x)head = 8
embedding_dim = 512
dropout = 0.2
query = value = key = pe_result
mask = Variable(torch.zeros(8, 4, 4))
mha = MultiHeadedAttention(head, embedding_dim, dropout)
mha_result = mha(query, key, value, mask)
# print(mha_result)# 4.前馈全连接层
class PositionwiseFeedForward(nn.Module):def __init__(self, d_model, d_ff, dropout=0.1):super(PositionwiseFeedForward, self).__init__()self.w1 = nn.Linear(d_model, d_ff)self.w2 = nn.Linear(d_ff, d_model)self.dropout = nn.Dropout(dropout)def forward(self, x):return self.w2(self.dropout(F.relu(self.w1(x))))d_model = 512
d_ff = 64
dropout = 0.2
x = mha_result
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
ff_result = ff(x)# 5.规范化层
# 通过LayerNorm实现规范化层的类
class LayerNorm(nn.Module):def __init__(self, features, eps=1e-6):super(LayerNorm, self).__init__()self.a2 = nn.Parameter(torch.ones(features))self.b2 = nn.Parameter(torch.zeros(features))self.eps = epsdef forward(self, x):mean = x.mean(-1, keepdim=True)std = x.std(-1, keepdim=True)return self.a2 * (x - mean) / (std + self.eps) + self.b2features = d_model = 512
eps = 1e-6
x = ff_result
ln = LayerNorm(features, eps)
ln_result = ln(x)# 6.残差连接
class SublayerConnection(nn.Module):def __init__(self, size, dropout=0.1):super(SublayerConnection, self).__init__()self.norm = LayerNorm(size)self.dropout = nn.Dropout(p=dropout)def forward(self, x, sublayer):return x + self.dropout(sublayer(self.norm(x)))size = 512
dropout = 0.2
head = 8
d_model = 512
x = pe_result
mask = Variable(torch.zeros(8, 4, 4))
self_attn = MultiHeadedAttention(head, d_model)
sublayer = lambda x: self_attn(x, x, x, mask)
sc = SublayerConnection(size, dropout)
sc_result = sc(x, sublayer)# 7.编码器层
class EncoderLayer(nn.Module):def __init__(self, size, self_attn, feed_forward, dropout):super(EncoderLayer, self).__init__()self.self_attn = self_attnself.feed_forward = feed_forwardself.sublayer = clones(SublayerConnection(size, dropout), 2)self.size = sizedef forward(self, x, mask):x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))return self.sublayer[1](x, self.feed_forward)size = 512
head = 8
d_model = 512
d_ff = 64
x = pe_result
dropout = 0.2
self_attn = MultiHeadedAttention(head, d_model)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
mask = Variable(torch.zeros(8, 4, 4))
el = EncoderLayer(size, self_attn, ff, dropout)
el_result = el(x, mask)# 8.编码器
class Encoder(nn.Module):def __init__(self, layer, N):super(Encoder, self).__init__()self.layers = clones(layer, N)self.norm = LayerNorm(layer.size)def forward(self, x, mask):for layer in self.layers:x = layer(x, mask)return self.norm(x)size = 512
head = 8
d_model = 512
d_ff = 64
c = copy.deepcopy
attn = MultiHeadedAttention(head, d_model)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
dropout = 0.2
layer = EncoderLayer(size, c(attn), c(ff), dropout)
N = 8
mask = Variable(torch.zeros(8, 4, 4))
en = Encoder(layer, N)
en_result = en(x, mask)
print(en_result)
print(en_result.shape)
"""
tensor([[[-1.2431e-01, -2.3363e+00, 1.9084e-02, ..., -9.8174e-02,-2.0241e+00, -2.8970e-01],[-3.9608e-01, 5.2420e-02, 2.4076e-02, ..., -1.2182e-01,4.7777e-01, 4.0544e-01],[-6.3494e-01, -2.5631e-03, -1.7992e-01, ..., -5.5367e-02,-4.3454e-02, 1.0005e+00],[-8.5996e-01, 2.6673e+00, 9.2570e-01, ..., 6.2907e-01,3.7063e-01, 6.4456e-01]],[[ 3.3140e-01, 1.4327e+00, 4.1478e-02, ..., 4.5121e-01,-1.7026e+00, 8.7472e-01],[-2.5319e-01, 1.8512e+00, -3.0673e-02, ..., 7.9770e-02,1.1026e-01, -2.9194e-01],[ 1.3375e-01, -1.7779e-01, 2.6414e-03, ..., -5.6526e-01,6.5849e-01, 1.1001e+00],[ 1.5610e+00, -1.4482e+00, 2.5439e-01, ..., -5.4919e-01,-7.2307e-01, 1.4985e+00]]], grad_fn=<AddBackward0>)
torch.Size([2, 4, 512])
"""# 三、解码器部分
# 1.解码器层
class DecoderLayer(nn.Module):def __init__(self, size, self_attn, src_attn, feed_forward, dropout):super(DecoderLayer, self).__init__()self.size = sizeself.self_attn = self_attnself.src_attn = src_attnself.feed_forward = feed_forwardself.sublayer = clones(SublayerConnection(size, dropout), 3)def forward(self, x, memory, source_mask, target_mask):m = memoryx = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, target_mask))x = self.sublayer[1](x, lambda x: self.src_attn(x, m, m, source_mask))return self.sublayer[2](x, self.feed_forward)head = 8
size = 512
d_model = 512
d_ff = 64
dropout = 0.2
self_attn = src_attn = MultiHeadedAttention(head, d_model, dropout)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
x = pe_result
memory = en_result
mask = Variable(torch.zeros(8, 4, 4))
source_mask = target_mask = mask
dl = DecoderLayer(size, self_attn, src_attn, ff, dropout)
dl_result = dl(x, memory, source_mask, target_mask)
print(dl_result)
print(dl_result.shape)
"""
tensor([[[ 1.9604e+00, 3.9288e+01, -5.2422e+01, ..., 2.1041e-01,-5.5063e+01, 1.5233e-01],[ 1.0135e-01, -3.7779e-01, 6.5491e+01, ..., 2.8062e+01,-3.7780e+01, -3.9577e+01],[ 1.9526e+01, -2.5741e+01, 2.6926e-01, ..., -1.5316e+01,1.4543e+00, 2.7714e+00],[-2.1528e+01, 2.0141e+01, 2.1999e+01, ..., 2.2099e+00,-1.7267e+01, -1.6687e+01]],[[ 6.7259e+00, -2.6918e+01, 1.1807e+01, ..., -3.6453e+01,-2.9231e+01, 1.1288e+01],[ 7.7484e+01, -5.0572e-01, -1.3096e+01, ..., 3.6302e-01,1.9907e+01, -1.2160e+00],[ 2.6703e+01, 4.4737e+01, -3.1590e+01, ..., 4.1540e-03,5.2587e+00, 5.2382e+00],[ 4.7435e+01, -3.7599e-01, 5.0898e+01, ..., 5.6361e+00,3.5891e+01, 1.5697e+01]]], grad_fn=<AddBackward0>)
torch.Size([2, 4, 512])
"""
⭐11.编码器
根据编码器的结果以及上一次预测的结果, 对下一次可能出现的'值'进行特征表示
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import math
import matplotlib.pyplot as plt
import numpy as np
import copy# 一、输入部分
# 1.文本嵌入层
class Embeddings(nn.Module):def __init__(self, d_model, vocab):super(Embeddings, self).__init__()self.lut = nn.Embedding(vocab, d_model)self.d_model = d_modeldef forward(self, x):return self.lut(x) * math.sqrt(self.d_model)# 2.位置编码器
class PositionalEncoding(nn.Module):def __init__(self, d_model, dropout, max_len=5000):super(PositionalEncoding, self).__init__()self.dropout = nn.Dropout(p=dropout)pe = torch.zeros(max_len, d_model)position = torch.arange(0, max_len).unsqueeze(1)div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))pe[:, 0::2] = torch.sin(position * div_term)pe[:, 1::2] = torch.cos(position * div_term)pe = pe.unsqueeze(0)self.register_buffer('pe', pe)def forward(self, x):x = x + Variable(self.pe[:, :x.size(1)], requires_grad=False)return self.dropout(x)d_model = 512
dropout = 0.1
max_len = 60vocab = 1000
x = Variable(torch.LongTensor([[100, 2, 421, 508], [491, 998, 1, 221]]))
emb = Embeddings(d_model, vocab)
embr = emb(x)x = embr
pe = PositionalEncoding(d_model, dropout, max_len)
pe_result = pe(x)# 绘制词汇向量中特征的分布曲线
plt.figure(figsize=(15, 5)) # 创建一张15 x 5大小的画布
pe = PositionalEncoding(20, 0)
y = pe(Variable(torch.zeros(1, 100, 20)))
plt.plot(np.arange(100), y[0, :, 4:8].data.numpy())
plt.legend(["dim %d" % p for p in [4, 5, 6, 7]])# plt.show()# 二、编码器部分
# 1.掩码张量函数
def subsequent_mask(size):attn_shape = (1, size, size)subsequent_mask = np.triu(np.ones(attn_shape), k=1).astype('uint8')return torch.from_numpy(1 - subsequent_mask)# 掩码张量的可视化
plt.figure(figsize=(5, 5))
plt.imshow(subsequent_mask(20)[0])# plt.show()# 2.注意力机制
def attention(query, key, value, mask=None, dropout=None):d_k = query.size(-1)scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)if mask is not None:scores = scores.masked_fill(mask == 0, -1e9)p_attn = F.softmax(scores, dim=-1)if dropout is not None:p_attn = dropout(p_attn)return torch.matmul(p_attn, value), p_attnquery = key = value = pe_result
attn, p_attn = attention(query, key, value)
# print("query的注意力表示:", attn) # 2x4x512
# print("注意力张量:", p_attn) # size 2x4x4
#
# print("*****************************************************************")
# 带有mask的输入参数
query = key = value = pe_result
mask = Variable(torch.zeros(2, 4, 4))
attn, p_attn = attention(query, key, value, mask=mask)
# print("query的注意力表示:", attn) # size 2x4x512
# print("注意力张量:", p_attn) # size 2x4x4# 3.多头注意力机制
# 深拷贝
def clones(module, N):return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])class MultiHeadedAttention(nn.Module):def __init__(self, head, embedding_dim, dropout=0.1):super(MultiHeadedAttention, self).__init__()assert embedding_dim % head == 0self.d_k = embedding_dim // headself.head = head# 在多头注意力中,Q,K,V各需要一个,最后拼接的矩阵还需要一个,一共是4个self.linears = clones(nn.Linear(embedding_dim, embedding_dim), 4)self.attn = Noneself.dropout = nn.Dropout(p=dropout)def forward(self, query, key, value, mask=None):if mask is not None:mask = mask.unsqueeze(0)batch_size = query.size(0)query, key, value = [model(x).view(batch_size, -1, self.head, self.d_k).transpose(1, 2)for model, x in zip(self.linears, (query, key, value))]x, self.attn = attention(query, key, value, mask=mask, dropout=self.dropout)x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.head * self.d_k)return self.linears[-1](x)head = 8
embedding_dim = 512
dropout = 0.2
query = value = key = pe_result
mask = Variable(torch.zeros(8, 4, 4))
mha = MultiHeadedAttention(head, embedding_dim, dropout)
mha_result = mha(query, key, value, mask)
# print(mha_result)# 4.前馈全连接层
class PositionwiseFeedForward(nn.Module):def __init__(self, d_model, d_ff, dropout=0.1):super(PositionwiseFeedForward, self).__init__()self.w1 = nn.Linear(d_model, d_ff)self.w2 = nn.Linear(d_ff, d_model)self.dropout = nn.Dropout(dropout)def forward(self, x):return self.w2(self.dropout(F.relu(self.w1(x))))d_model = 512
d_ff = 64
dropout = 0.2
x = mha_result
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
ff_result = ff(x)# 5.规范化层
# 通过LayerNorm实现规范化层的类
class LayerNorm(nn.Module):def __init__(self, features, eps=1e-6):super(LayerNorm, self).__init__()self.a2 = nn.Parameter(torch.ones(features))self.b2 = nn.Parameter(torch.zeros(features))self.eps = epsdef forward(self, x):mean = x.mean(-1, keepdim=True)std = x.std(-1, keepdim=True)return self.a2 * (x - mean) / (std + self.eps) + self.b2features = d_model = 512
eps = 1e-6
x = ff_result
ln = LayerNorm(features, eps)
ln_result = ln(x)# 6.残差连接
class SublayerConnection(nn.Module):def __init__(self, size, dropout=0.1):super(SublayerConnection, self).__init__()self.norm = LayerNorm(size)self.dropout = nn.Dropout(p=dropout)def forward(self, x, sublayer):return x + self.dropout(sublayer(self.norm(x)))size = 512
dropout = 0.2
head = 8
d_model = 512
x = pe_result
mask = Variable(torch.zeros(8, 4, 4))
self_attn = MultiHeadedAttention(head, d_model)
sublayer = lambda x: self_attn(x, x, x, mask)
sc = SublayerConnection(size, dropout)
sc_result = sc(x, sublayer)# 7.编码器层
class EncoderLayer(nn.Module):def __init__(self, size, self_attn, feed_forward, dropout):super(EncoderLayer, self).__init__()self.self_attn = self_attnself.feed_forward = feed_forwardself.sublayer = clones(SublayerConnection(size, dropout), 2)self.size = sizedef forward(self, x, mask):x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))return self.sublayer[1](x, self.feed_forward)size = 512
head = 8
d_model = 512
d_ff = 64
x = pe_result
dropout = 0.2
self_attn = MultiHeadedAttention(head, d_model)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
mask = Variable(torch.zeros(8, 4, 4))
el = EncoderLayer(size, self_attn, ff, dropout)
el_result = el(x, mask)# 8.编码器
class Encoder(nn.Module):def __init__(self, layer, N):super(Encoder, self).__init__()self.layers = clones(layer, N)self.norm = LayerNorm(layer.size)def forward(self, x, mask):for layer in self.layers:x = layer(x, mask)return self.norm(x)size = 512
head = 8
d_model = 512
d_ff = 64
c = copy.deepcopy
attn = MultiHeadedAttention(head, d_model)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
dropout = 0.2
layer = EncoderLayer(size, c(attn), c(ff), dropout)
N = 8
mask = Variable(torch.zeros(8, 4, 4))
en = Encoder(layer, N)
en_result = en(x, mask)
# print(en_result)
# print(en_result.shape)
"""
tensor([[[-1.2431e-01, -2.3363e+00, 1.9084e-02, ..., -9.8174e-02,-2.0241e+00, -2.8970e-01],[-3.9608e-01, 5.2420e-02, 2.4076e-02, ..., -1.2182e-01,4.7777e-01, 4.0544e-01],[-6.3494e-01, -2.5631e-03, -1.7992e-01, ..., -5.5367e-02,-4.3454e-02, 1.0005e+00],[-8.5996e-01, 2.6673e+00, 9.2570e-01, ..., 6.2907e-01,3.7063e-01, 6.4456e-01]],[[ 3.3140e-01, 1.4327e+00, 4.1478e-02, ..., 4.5121e-01,-1.7026e+00, 8.7472e-01],[-2.5319e-01, 1.8512e+00, -3.0673e-02, ..., 7.9770e-02,1.1026e-01, -2.9194e-01],[ 1.3375e-01, -1.7779e-01, 2.6414e-03, ..., -5.6526e-01,6.5849e-01, 1.1001e+00],[ 1.5610e+00, -1.4482e+00, 2.5439e-01, ..., -5.4919e-01,-7.2307e-01, 1.4985e+00]]], grad_fn=<AddBackward0>)
torch.Size([2, 4, 512])
"""# 三、解码器部分
# 1.解码器层
class DecoderLayer(nn.Module):def __init__(self, size, self_attn, src_attn, feed_forward, dropout):super(DecoderLayer, self).__init__()self.size = sizeself.self_attn = self_attnself.src_attn = src_attnself.feed_forward = feed_forwardself.sublayer = clones(SublayerConnection(size, dropout), 3)def forward(self, x, memory, source_mask, target_mask):m = memoryx = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, target_mask))x = self.sublayer[1](x, lambda x: self.src_attn(x, m, m, source_mask))return self.sublayer[2](x, self.feed_forward)head = 8
size = 512
d_model = 512
d_ff = 64
dropout = 0.2
self_attn = src_attn = MultiHeadedAttention(head, d_model, dropout)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
x = pe_result
memory = en_result
mask = Variable(torch.zeros(8, 4, 4))
source_mask = target_mask = mask
dl = DecoderLayer(size, self_attn, src_attn, ff, dropout)
dl_result = dl(x, memory, source_mask, target_mask)
# print(dl_result)
# print(dl_result.shape)
"""
tensor([[[ 1.9604e+00, 3.9288e+01, -5.2422e+01, ..., 2.1041e-01,-5.5063e+01, 1.5233e-01],[ 1.0135e-01, -3.7779e-01, 6.5491e+01, ..., 2.8062e+01,-3.7780e+01, -3.9577e+01],[ 1.9526e+01, -2.5741e+01, 2.6926e-01, ..., -1.5316e+01,1.4543e+00, 2.7714e+00],[-2.1528e+01, 2.0141e+01, 2.1999e+01, ..., 2.2099e+00,-1.7267e+01, -1.6687e+01]],[[ 6.7259e+00, -2.6918e+01, 1.1807e+01, ..., -3.6453e+01,-2.9231e+01, 1.1288e+01],[ 7.7484e+01, -5.0572e-01, -1.3096e+01, ..., 3.6302e-01,1.9907e+01, -1.2160e+00],[ 2.6703e+01, 4.4737e+01, -3.1590e+01, ..., 4.1540e-03,5.2587e+00, 5.2382e+00],[ 4.7435e+01, -3.7599e-01, 5.0898e+01, ..., 5.6361e+00,3.5891e+01, 1.5697e+01]]], grad_fn=<AddBackward0>)
torch.Size([2, 4, 512])
"""# 2.解码器
class Decoder(nn.Module):def __init__(self, layer, N):super(Decoder, self).__init__()self.layers = clones(layer, N)self.norm = LayerNorm(layer.size)def forward(self, x, memory, source_mask, target_mask):for layer in self.layers:x = layer(x, memory, source_mask, target_mask)return self.norm(x)size = 512
d_model = 512
head = 8
d_ff = 64
dropout = 0.2
c = copy.deepcopy
attn = MultiHeadedAttention(head, d_model)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
layer = DecoderLayer(d_model, c(attn), c(attn), c(ff), dropout)
N = 8
x = pe_result
memory = en_result
mask = Variable(torch.zeros(8, 4, 4))
source_mask = target_mask = mask
de = Decoder(layer, N)
de_result = de(x, memory, source_mask, target_mask)
print(de_result)
print(de_result.shape)
"""
tensor([[[ 0.2436, 0.8310, 1.1406, ..., 1.2474, 1.0660, -0.7125],[ 0.8292, -0.1330, -0.2391, ..., -1.0578, -0.8154, 1.4003],[ 0.8909, 0.1255, 0.9115, ..., 0.0775, 0.0753, 0.3909],[-1.9148, 0.2801, 1.7520, ..., -0.7988, -2.0647, -0.5999]],[[ 0.9265, 0.5207, -1.8971, ..., -2.2877, 0.1123, 0.2563],[ 0.8011, 1.0716, -0.0627, ..., -1.2644, 1.6997, 0.8083],[-0.6971, -1.6886, -0.7169, ..., 1.0697, -1.0679, 0.8851],[-0.9620, -0.2029, 1.2966, ..., -0.3927, 1.6059, 1.6047]]],grad_fn=<AddBackward0>)
torch.Size([2, 4, 512])
"""
四、输出部分
- 线性层
- softmax层
nn.Linear演示
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import math
import matplotlib.pyplot as plt
import numpy as np
import copym = nn.Linear(20, 30)
input = torch.randn(128, 20)
output = m(input)
print(output.size())# torch.Size([128, 30])
⭐12.线性层和softmax层
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import math
import matplotlib.pyplot as plt
import numpy as np
import copy# 一、输入部分
# 1.文本嵌入层
class Embeddings(nn.Module):def __init__(self, d_model, vocab):super(Embeddings, self).__init__()self.lut = nn.Embedding(vocab, d_model)self.d_model = d_modeldef forward(self, x):return self.lut(x) * math.sqrt(self.d_model)# 2.位置编码器
class PositionalEncoding(nn.Module):def __init__(self, d_model, dropout, max_len=5000):super(PositionalEncoding, self).__init__()self.dropout = nn.Dropout(p=dropout)pe = torch.zeros(max_len, d_model)position = torch.arange(0, max_len).unsqueeze(1)div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))pe[:, 0::2] = torch.sin(position * div_term)pe[:, 1::2] = torch.cos(position * div_term)pe = pe.unsqueeze(0)self.register_buffer('pe', pe)def forward(self, x):x = x + Variable(self.pe[:, :x.size(1)], requires_grad=False)return self.dropout(x)d_model = 512
dropout = 0.1
max_len = 60vocab = 1000
x = Variable(torch.LongTensor([[100, 2, 421, 508], [491, 998, 1, 221]]))
emb = Embeddings(d_model, vocab)
embr = emb(x)x = embr
pe = PositionalEncoding(d_model, dropout, max_len)
pe_result = pe(x)# 绘制词汇向量中特征的分布曲线
plt.figure(figsize=(15, 5)) # 创建一张15 x 5大小的画布
pe = PositionalEncoding(20, 0)
y = pe(Variable(torch.zeros(1, 100, 20)))
plt.plot(np.arange(100), y[0, :, 4:8].data.numpy())
plt.legend(["dim %d" % p for p in [4, 5, 6, 7]])# plt.show()# 二、编码器部分
# 1.掩码张量函数
def subsequent_mask(size):attn_shape = (1, size, size)subsequent_mask = np.triu(np.ones(attn_shape), k=1).astype('uint8')return torch.from_numpy(1 - subsequent_mask)# 掩码张量的可视化
plt.figure(figsize=(5, 5))
plt.imshow(subsequent_mask(20)[0])# plt.show()# 2.注意力机制
def attention(query, key, value, mask=None, dropout=None):d_k = query.size(-1)scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)if mask is not None:scores = scores.masked_fill(mask == 0, -1e9)p_attn = F.softmax(scores, dim=-1)if dropout is not None:p_attn = dropout(p_attn)return torch.matmul(p_attn, value), p_attnquery = key = value = pe_result
attn, p_attn = attention(query, key, value)
# print("query的注意力表示:", attn) # 2x4x512
# print("注意力张量:", p_attn) # size 2x4x4
#
# print("*****************************************************************")
# 带有mask的输入参数
query = key = value = pe_result
mask = Variable(torch.zeros(2, 4, 4))
attn, p_attn = attention(query, key, value, mask=mask)
# print("query的注意力表示:", attn) # size 2x4x512
# print("注意力张量:", p_attn) # size 2x4x4# 3.多头注意力机制
# 深拷贝
def clones(module, N):return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])class MultiHeadedAttention(nn.Module):def __init__(self, head, embedding_dim, dropout=0.1):super(MultiHeadedAttention, self).__init__()assert embedding_dim % head == 0self.d_k = embedding_dim // headself.head = head# 在多头注意力中,Q,K,V各需要一个,最后拼接的矩阵还需要一个,一共是4个self.linears = clones(nn.Linear(embedding_dim, embedding_dim), 4)self.attn = Noneself.dropout = nn.Dropout(p=dropout)def forward(self, query, key, value, mask=None):if mask is not None:mask = mask.unsqueeze(0)batch_size = query.size(0)query, key, value = [model(x).view(batch_size, -1, self.head, self.d_k).transpose(1, 2)for model, x in zip(self.linears, (query, key, value))]x, self.attn = attention(query, key, value, mask=mask, dropout=self.dropout)x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.head * self.d_k)return self.linears[-1](x)head = 8
embedding_dim = 512
dropout = 0.2
query = value = key = pe_result
mask = Variable(torch.zeros(8, 4, 4))
mha = MultiHeadedAttention(head, embedding_dim, dropout)
mha_result = mha(query, key, value, mask)
# print(mha_result)# 4.前馈全连接层
class PositionwiseFeedForward(nn.Module):def __init__(self, d_model, d_ff, dropout=0.1):super(PositionwiseFeedForward, self).__init__()self.w1 = nn.Linear(d_model, d_ff)self.w2 = nn.Linear(d_ff, d_model)self.dropout = nn.Dropout(dropout)def forward(self, x):return self.w2(self.dropout(F.relu(self.w1(x))))d_model = 512
d_ff = 64
dropout = 0.2
x = mha_result
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
ff_result = ff(x)# 5.规范化层
# 通过LayerNorm实现规范化层的类
class LayerNorm(nn.Module):def __init__(self, features, eps=1e-6):super(LayerNorm, self).__init__()self.a2 = nn.Parameter(torch.ones(features))self.b2 = nn.Parameter(torch.zeros(features))self.eps = epsdef forward(self, x):mean = x.mean(-1, keepdim=True)std = x.std(-1, keepdim=True)return self.a2 * (x - mean) / (std + self.eps) + self.b2features = d_model = 512
eps = 1e-6
x = ff_result
ln = LayerNorm(features, eps)
ln_result = ln(x)# 6.残差连接
class SublayerConnection(nn.Module):def __init__(self, size, dropout=0.1):super(SublayerConnection, self).__init__()self.norm = LayerNorm(size)self.dropout = nn.Dropout(p=dropout)def forward(self, x, sublayer):return x + self.dropout(sublayer(self.norm(x)))size = 512
dropout = 0.2
head = 8
d_model = 512
x = pe_result
mask = Variable(torch.zeros(8, 4, 4))
self_attn = MultiHeadedAttention(head, d_model)
sublayer = lambda x: self_attn(x, x, x, mask)
sc = SublayerConnection(size, dropout)
sc_result = sc(x, sublayer)# 7.编码器层
class EncoderLayer(nn.Module):def __init__(self, size, self_attn, feed_forward, dropout):super(EncoderLayer, self).__init__()self.self_attn = self_attnself.feed_forward = feed_forwardself.sublayer = clones(SublayerConnection(size, dropout), 2)self.size = sizedef forward(self, x, mask):x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))return self.sublayer[1](x, self.feed_forward)size = 512
head = 8
d_model = 512
d_ff = 64
x = pe_result
dropout = 0.2
self_attn = MultiHeadedAttention(head, d_model)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
mask = Variable(torch.zeros(8, 4, 4))
el = EncoderLayer(size, self_attn, ff, dropout)
el_result = el(x, mask)# 8.编码器
class Encoder(nn.Module):def __init__(self, layer, N):super(Encoder, self).__init__()self.layers = clones(layer, N)self.norm = LayerNorm(layer.size)def forward(self, x, mask):for layer in self.layers:x = layer(x, mask)return self.norm(x)size = 512
head = 8
d_model = 512
d_ff = 64
c = copy.deepcopy
attn = MultiHeadedAttention(head, d_model)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
dropout = 0.2
layer = EncoderLayer(size, c(attn), c(ff), dropout)
N = 8
mask = Variable(torch.zeros(8, 4, 4))
en = Encoder(layer, N)
en_result = en(x, mask)
# print(en_result)
# print(en_result.shape)
"""
tensor([[[-1.2431e-01, -2.3363e+00, 1.9084e-02, ..., -9.8174e-02,-2.0241e+00, -2.8970e-01],[-3.9608e-01, 5.2420e-02, 2.4076e-02, ..., -1.2182e-01,4.7777e-01, 4.0544e-01],[-6.3494e-01, -2.5631e-03, -1.7992e-01, ..., -5.5367e-02,-4.3454e-02, 1.0005e+00],[-8.5996e-01, 2.6673e+00, 9.2570e-01, ..., 6.2907e-01,3.7063e-01, 6.4456e-01]],[[ 3.3140e-01, 1.4327e+00, 4.1478e-02, ..., 4.5121e-01,-1.7026e+00, 8.7472e-01],[-2.5319e-01, 1.8512e+00, -3.0673e-02, ..., 7.9770e-02,1.1026e-01, -2.9194e-01],[ 1.3375e-01, -1.7779e-01, 2.6414e-03, ..., -5.6526e-01,6.5849e-01, 1.1001e+00],[ 1.5610e+00, -1.4482e+00, 2.5439e-01, ..., -5.4919e-01,-7.2307e-01, 1.4985e+00]]], grad_fn=<AddBackward0>)
torch.Size([2, 4, 512])
"""# 三、解码器部分
# 1.解码器层
class DecoderLayer(nn.Module):def __init__(self, size, self_attn, src_attn, feed_forward, dropout):super(DecoderLayer, self).__init__()self.size = sizeself.self_attn = self_attnself.src_attn = src_attnself.feed_forward = feed_forwardself.sublayer = clones(SublayerConnection(size, dropout), 3)def forward(self, x, memory, source_mask, target_mask):m = memoryx = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, target_mask))x = self.sublayer[1](x, lambda x: self.src_attn(x, m, m, source_mask))return self.sublayer[2](x, self.feed_forward)head = 8
size = 512
d_model = 512
d_ff = 64
dropout = 0.2
self_attn = src_attn = MultiHeadedAttention(head, d_model, dropout)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
x = pe_result
memory = en_result
mask = Variable(torch.zeros(8, 4, 4))
source_mask = target_mask = mask
dl = DecoderLayer(size, self_attn, src_attn, ff, dropout)
dl_result = dl(x, memory, source_mask, target_mask)
# print(dl_result)
# print(dl_result.shape)
"""
tensor([[[ 1.9604e+00, 3.9288e+01, -5.2422e+01, ..., 2.1041e-01,-5.5063e+01, 1.5233e-01],[ 1.0135e-01, -3.7779e-01, 6.5491e+01, ..., 2.8062e+01,-3.7780e+01, -3.9577e+01],[ 1.9526e+01, -2.5741e+01, 2.6926e-01, ..., -1.5316e+01,1.4543e+00, 2.7714e+00],[-2.1528e+01, 2.0141e+01, 2.1999e+01, ..., 2.2099e+00,-1.7267e+01, -1.6687e+01]],[[ 6.7259e+00, -2.6918e+01, 1.1807e+01, ..., -3.6453e+01,-2.9231e+01, 1.1288e+01],[ 7.7484e+01, -5.0572e-01, -1.3096e+01, ..., 3.6302e-01,1.9907e+01, -1.2160e+00],[ 2.6703e+01, 4.4737e+01, -3.1590e+01, ..., 4.1540e-03,5.2587e+00, 5.2382e+00],[ 4.7435e+01, -3.7599e-01, 5.0898e+01, ..., 5.6361e+00,3.5891e+01, 1.5697e+01]]], grad_fn=<AddBackward0>)
torch.Size([2, 4, 512])
"""# 2.解码器
class Decoder(nn.Module):def __init__(self, layer, N):super(Decoder, self).__init__()self.layers = clones(layer, N)self.norm = LayerNorm(layer.size)def forward(self, x, memory, source_mask, target_mask):for layer in self.layers:x = layer(x, memory, source_mask, target_mask)return self.norm(x)size = 512
d_model = 512
head = 8
d_ff = 64
dropout = 0.2
c = copy.deepcopy
attn = MultiHeadedAttention(head, d_model)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
layer = DecoderLayer(d_model, c(attn), c(attn), c(ff), dropout)
N = 8
x = pe_result
memory = en_result
mask = Variable(torch.zeros(8, 4, 4))
source_mask = target_mask = mask
de = Decoder(layer, N)
de_result = de(x, memory, source_mask, target_mask)
# print(de_result)
# print(de_result.shape)
"""
tensor([[[ 0.2436, 0.8310, 1.1406, ..., 1.2474, 1.0660, -0.7125],[ 0.8292, -0.1330, -0.2391, ..., -1.0578, -0.8154, 1.4003],[ 0.8909, 0.1255, 0.9115, ..., 0.0775, 0.0753, 0.3909],[-1.9148, 0.2801, 1.7520, ..., -0.7988, -2.0647, -0.5999]],[[ 0.9265, 0.5207, -1.8971, ..., -2.2877, 0.1123, 0.2563],[ 0.8011, 1.0716, -0.0627, ..., -1.2644, 1.6997, 0.8083],[-0.6971, -1.6886, -0.7169, ..., 1.0697, -1.0679, 0.8851],[-0.9620, -0.2029, 1.2966, ..., -0.3927, 1.6059, 1.6047]]],grad_fn=<AddBackward0>)
torch.Size([2, 4, 512])
"""# 四、输出部分
# 线性层和softmax层一起实现, 因为二者的共同目标是生成最后的结构
# 因此把类的名字叫做Generator
class Generator(nn.Module):def __init__(self, d_model, vocab_size):super(Generator, self).__init__()self.project = nn.Linear(d_model, vocab_size)def forward(self, x):return F.log_softmax(self.project(x), dim=-1)d_model = 512
vocab_size = 1000
x = de_result
gen = Generator(d_model, vocab_size)
gen_result = gen(x)
print(gen_result)
print(gen_result.shape)
"""
tensor([[[-7.0677, -6.3155, -6.8694, ..., -6.8623, -6.4482, -7.2010],[-7.8073, -7.6669, -6.3424, ..., -7.0006, -6.8322, -6.1138],[-9.0578, -7.1061, -6.2095, ..., -7.3074, -7.2882, -7.3483],[-8.1861, -7.2428, -6.7725, ..., -6.8366, -7.3286, -6.8935]],[[-7.3694, -6.7055, -6.8839, ..., -6.7879, -6.8398, -7.0582],[-6.5527, -6.8104, -7.6633, ..., -8.0519, -7.0640, -6.3101],[-8.4895, -7.9180, -6.4888, ..., -6.7811, -5.6739, -6.5447],[-6.2718, -7.3904, -7.8301, ..., -6.6355, -5.7487, -8.1378]]],grad_fn=<LogSoftmaxBackward0>)
torch.Size([2, 4, 1000])
"""
五、完整代码
13.编码器-解码器
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import math
import matplotlib.pyplot as plt
import numpy as np
import copy# 一、输入部分
# 1.文本嵌入层
class Embeddings(nn.Module):def __init__(self, d_model, vocab):super(Embeddings, self).__init__()self.lut = nn.Embedding(vocab, d_model)self.d_model = d_modeldef forward(self, x):return self.lut(x) * math.sqrt(self.d_model)# 2.位置编码器
class PositionalEncoding(nn.Module):def __init__(self, d_model, dropout, max_len=5000):super(PositionalEncoding, self).__init__()self.dropout = nn.Dropout(p=dropout)pe = torch.zeros(max_len, d_model)position = torch.arange(0, max_len).unsqueeze(1)div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))pe[:, 0::2] = torch.sin(position * div_term)pe[:, 1::2] = torch.cos(position * div_term)pe = pe.unsqueeze(0)self.register_buffer('pe', pe)def forward(self, x):x = x + Variable(self.pe[:, :x.size(1)], requires_grad=False)return self.dropout(x)d_model = 512
dropout = 0.1
max_len = 60vocab = 1000
x = Variable(torch.LongTensor([[100, 2, 421, 508], [491, 998, 1, 221]]))
emb = Embeddings(d_model, vocab)
embr = emb(x)x = embr
pe = PositionalEncoding(d_model, dropout, max_len)
pe_result = pe(x)# 绘制词汇向量中特征的分布曲线
plt.figure(figsize=(15, 5)) # 创建一张15 x 5大小的画布
pe = PositionalEncoding(20, 0)
y = pe(Variable(torch.zeros(1, 100, 20)))
plt.plot(np.arange(100), y[0, :, 4:8].data.numpy())
plt.legend(["dim %d" % p for p in [4, 5, 6, 7]])# plt.show()# 二、编码器部分
# 1.掩码张量函数
def subsequent_mask(size):attn_shape = (1, size, size)subsequent_mask = np.triu(np.ones(attn_shape), k=1).astype('uint8')return torch.from_numpy(1 - subsequent_mask)# 掩码张量的可视化
plt.figure(figsize=(5, 5))
plt.imshow(subsequent_mask(20)[0])# plt.show()# 2.注意力机制
def attention(query, key, value, mask=None, dropout=None):d_k = query.size(-1)scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)if mask is not None:scores = scores.masked_fill(mask == 0, -1e9)p_attn = F.softmax(scores, dim=-1)if dropout is not None:p_attn = dropout(p_attn)return torch.matmul(p_attn, value), p_attnquery = key = value = pe_result
attn, p_attn = attention(query, key, value)
# print("query的注意力表示:", attn) # 2x4x512
# print("注意力张量:", p_attn) # size 2x4x4
#
# print("*****************************************************************")
# 带有mask的输入参数
query = key = value = pe_result
mask = Variable(torch.zeros(2, 4, 4))
attn, p_attn = attention(query, key, value, mask=mask)
# print("query的注意力表示:", attn) # size 2x4x512
# print("注意力张量:", p_attn) # size 2x4x4# 3.多头注意力机制
# 深拷贝
def clones(module, N):return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])class MultiHeadedAttention(nn.Module):def __init__(self, head, embedding_dim, dropout=0.1):super(MultiHeadedAttention, self).__init__()assert embedding_dim % head == 0self.d_k = embedding_dim // headself.head = head# 在多头注意力中,Q,K,V各需要一个,最后拼接的矩阵还需要一个,一共是4个self.linears = clones(nn.Linear(embedding_dim, embedding_dim), 4)self.attn = Noneself.dropout = nn.Dropout(p=dropout)def forward(self, query, key, value, mask=None):if mask is not None:mask = mask.unsqueeze(0)batch_size = query.size(0)query, key, value = [model(x).view(batch_size, -1, self.head, self.d_k).transpose(1, 2)for model, x in zip(self.linears, (query, key, value))]x, self.attn = attention(query, key, value, mask=mask, dropout=self.dropout)x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.head * self.d_k)return self.linears[-1](x)head = 8
embedding_dim = 512
dropout = 0.2
query = value = key = pe_result
mask = Variable(torch.zeros(8, 4, 4))
mha = MultiHeadedAttention(head, embedding_dim, dropout)
mha_result = mha(query, key, value, mask)
# print(mha_result)# 4.前馈全连接层
class PositionwiseFeedForward(nn.Module):def __init__(self, d_model, d_ff, dropout=0.1):super(PositionwiseFeedForward, self).__init__()self.w1 = nn.Linear(d_model, d_ff)self.w2 = nn.Linear(d_ff, d_model)self.dropout = nn.Dropout(dropout)def forward(self, x):return self.w2(self.dropout(F.relu(self.w1(x))))d_model = 512
d_ff = 64
dropout = 0.2
x = mha_result
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
ff_result = ff(x)# 5.规范化层
# 通过LayerNorm实现规范化层的类
class LayerNorm(nn.Module):def __init__(self, features, eps=1e-6):super(LayerNorm, self).__init__()self.a2 = nn.Parameter(torch.ones(features))self.b2 = nn.Parameter(torch.zeros(features))self.eps = epsdef forward(self, x):mean = x.mean(-1, keepdim=True)std = x.std(-1, keepdim=True)return self.a2 * (x - mean) / (std + self.eps) + self.b2features = d_model = 512
eps = 1e-6
x = ff_result
ln = LayerNorm(features, eps)
ln_result = ln(x)# 6.残差连接
class SublayerConnection(nn.Module):def __init__(self, size, dropout=0.1):super(SublayerConnection, self).__init__()self.norm = LayerNorm(size)self.dropout = nn.Dropout(p=dropout)def forward(self, x, sublayer):return x + self.dropout(sublayer(self.norm(x)))size = 512
dropout = 0.2
head = 8
d_model = 512
x = pe_result
mask = Variable(torch.zeros(8, 4, 4))
self_attn = MultiHeadedAttention(head, d_model)
sublayer = lambda x: self_attn(x, x, x, mask)
sc = SublayerConnection(size, dropout)
sc_result = sc(x, sublayer)# 7.编码器层
class EncoderLayer(nn.Module):def __init__(self, size, self_attn, feed_forward, dropout):super(EncoderLayer, self).__init__()self.self_attn = self_attnself.feed_forward = feed_forwardself.sublayer = clones(SublayerConnection(size, dropout), 2)self.size = sizedef forward(self, x, mask):x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))return self.sublayer[1](x, self.feed_forward)size = 512
head = 8
d_model = 512
d_ff = 64
x = pe_result
dropout = 0.2
self_attn = MultiHeadedAttention(head, d_model)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
mask = Variable(torch.zeros(8, 4, 4))
el = EncoderLayer(size, self_attn, ff, dropout)
el_result = el(x, mask)# 8.编码器
class Encoder(nn.Module):def __init__(self, layer, N):super(Encoder, self).__init__()self.layers = clones(layer, N)self.norm = LayerNorm(layer.size)def forward(self, x, mask):for layer in self.layers:x = layer(x, mask)return self.norm(x)size = 512
head = 8
d_model = 512
d_ff = 64
c = copy.deepcopy
attn = MultiHeadedAttention(head, d_model)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
dropout = 0.2
layer = EncoderLayer(size, c(attn), c(ff), dropout)
N = 8
mask = Variable(torch.zeros(8, 4, 4))
en = Encoder(layer, N)
en_result = en(x, mask)
# print(en_result)
# print(en_result.shape)
"""
tensor([[[-1.2431e-01, -2.3363e+00, 1.9084e-02, ..., -9.8174e-02,-2.0241e+00, -2.8970e-01],[-3.9608e-01, 5.2420e-02, 2.4076e-02, ..., -1.2182e-01,4.7777e-01, 4.0544e-01],[-6.3494e-01, -2.5631e-03, -1.7992e-01, ..., -5.5367e-02,-4.3454e-02, 1.0005e+00],[-8.5996e-01, 2.6673e+00, 9.2570e-01, ..., 6.2907e-01,3.7063e-01, 6.4456e-01]],[[ 3.3140e-01, 1.4327e+00, 4.1478e-02, ..., 4.5121e-01,-1.7026e+00, 8.7472e-01],[-2.5319e-01, 1.8512e+00, -3.0673e-02, ..., 7.9770e-02,1.1026e-01, -2.9194e-01],[ 1.3375e-01, -1.7779e-01, 2.6414e-03, ..., -5.6526e-01,6.5849e-01, 1.1001e+00],[ 1.5610e+00, -1.4482e+00, 2.5439e-01, ..., -5.4919e-01,-7.2307e-01, 1.4985e+00]]], grad_fn=<AddBackward0>)
torch.Size([2, 4, 512])
"""# 三、解码器部分
# 1.解码器层
class DecoderLayer(nn.Module):def __init__(self, size, self_attn, src_attn, feed_forward, dropout):super(DecoderLayer, self).__init__()self.size = sizeself.self_attn = self_attnself.src_attn = src_attnself.feed_forward = feed_forwardself.sublayer = clones(SublayerConnection(size, dropout), 3)def forward(self, x, memory, source_mask, target_mask):m = memoryx = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, target_mask))x = self.sublayer[1](x, lambda x: self.src_attn(x, m, m, source_mask))return self.sublayer[2](x, self.feed_forward)head = 8
size = 512
d_model = 512
d_ff = 64
dropout = 0.2
self_attn = src_attn = MultiHeadedAttention(head, d_model, dropout)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
x = pe_result
memory = en_result
mask = Variable(torch.zeros(8, 4, 4))
source_mask = target_mask = mask
dl = DecoderLayer(size, self_attn, src_attn, ff, dropout)
dl_result = dl(x, memory, source_mask, target_mask)
# print(dl_result)
# print(dl_result.shape)
"""
tensor([[[ 1.9604e+00, 3.9288e+01, -5.2422e+01, ..., 2.1041e-01,-5.5063e+01, 1.5233e-01],[ 1.0135e-01, -3.7779e-01, 6.5491e+01, ..., 2.8062e+01,-3.7780e+01, -3.9577e+01],[ 1.9526e+01, -2.5741e+01, 2.6926e-01, ..., -1.5316e+01,1.4543e+00, 2.7714e+00],[-2.1528e+01, 2.0141e+01, 2.1999e+01, ..., 2.2099e+00,-1.7267e+01, -1.6687e+01]],[[ 6.7259e+00, -2.6918e+01, 1.1807e+01, ..., -3.6453e+01,-2.9231e+01, 1.1288e+01],[ 7.7484e+01, -5.0572e-01, -1.3096e+01, ..., 3.6302e-01,1.9907e+01, -1.2160e+00],[ 2.6703e+01, 4.4737e+01, -3.1590e+01, ..., 4.1540e-03,5.2587e+00, 5.2382e+00],[ 4.7435e+01, -3.7599e-01, 5.0898e+01, ..., 5.6361e+00,3.5891e+01, 1.5697e+01]]], grad_fn=<AddBackward0>)
torch.Size([2, 4, 512])
"""# 2.解码器
class Decoder(nn.Module):def __init__(self, layer, N):super(Decoder, self).__init__()self.layers = clones(layer, N)self.norm = LayerNorm(layer.size)def forward(self, x, memory, source_mask, target_mask):for layer in self.layers:x = layer(x, memory, source_mask, target_mask)return self.norm(x)size = 512
d_model = 512
head = 8
d_ff = 64
dropout = 0.2
c = copy.deepcopy
attn = MultiHeadedAttention(head, d_model)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
layer = DecoderLayer(d_model, c(attn), c(attn), c(ff), dropout)
N = 8
x = pe_result
memory = en_result
mask = Variable(torch.zeros(8, 4, 4))
source_mask = target_mask = mask
de = Decoder(layer, N)
de_result = de(x, memory, source_mask, target_mask)
# print(de_result)
# print(de_result.shape)
"""
tensor([[[ 0.2436, 0.8310, 1.1406, ..., 1.2474, 1.0660, -0.7125],[ 0.8292, -0.1330, -0.2391, ..., -1.0578, -0.8154, 1.4003],[ 0.8909, 0.1255, 0.9115, ..., 0.0775, 0.0753, 0.3909],[-1.9148, 0.2801, 1.7520, ..., -0.7988, -2.0647, -0.5999]],[[ 0.9265, 0.5207, -1.8971, ..., -2.2877, 0.1123, 0.2563],[ 0.8011, 1.0716, -0.0627, ..., -1.2644, 1.6997, 0.8083],[-0.6971, -1.6886, -0.7169, ..., 1.0697, -1.0679, 0.8851],[-0.9620, -0.2029, 1.2966, ..., -0.3927, 1.6059, 1.6047]]],grad_fn=<AddBackward0>)
torch.Size([2, 4, 512])
"""# 四、输出部分
# 线性层和softmax层一起实现, 因为二者的共同目标是生成最后的结构
# 因此把类的名字叫做Generator
class Generator(nn.Module):def __init__(self, d_model, vocab_size):super(Generator, self).__init__()self.project = nn.Linear(d_model, vocab_size)def forward(self, x):return F.log_softmax(self.project(x), dim=-1)d_model = 512
vocab_size = 1000
x = de_result
gen = Generator(d_model, vocab_size)
gen_result = gen(x)
print(gen_result)
print(gen_result.shape)
"""
tensor([[[-7.0677, -6.3155, -6.8694, ..., -6.8623, -6.4482, -7.2010],[-7.8073, -7.6669, -6.3424, ..., -7.0006, -6.8322, -6.1138],[-9.0578, -7.1061, -6.2095, ..., -7.3074, -7.2882, -7.3483],[-8.1861, -7.2428, -6.7725, ..., -6.8366, -7.3286, -6.8935]],[[-7.3694, -6.7055, -6.8839, ..., -6.7879, -6.8398, -7.0582],[-6.5527, -6.8104, -7.6633, ..., -8.0519, -7.0640, -6.3101],[-8.4895, -7.9180, -6.4888, ..., -6.7811, -5.6739, -6.5447],[-6.2718, -7.3904, -7.8301, ..., -6.6355, -5.7487, -8.1378]]],grad_fn=<LogSoftmaxBackward0>)
torch.Size([2, 4, 1000])
"""# 编码器-解码器
class EncoderDecoder(nn.Module):def __init__(self, encoder, decoder, source_embed, target_embed, generator):super(EncoderDecoder, self).__init__()self.encoder = encoderself.decoder = decoderself.src_embed = source_embedself.tgt_embed = target_embedself.generator = generatordef forward(self, source, target, source_mask, target_mask):return self.decode(self.encode(source, source_mask), source_mask,target, target_mask)def encode(self, source, source_mask):return self.encoder(self.src_embed(source), source_mask)def decode(self, memory, source_mask, target, target_mask):return self.decoder(self.tgt_embed(target), memory, source_mask, target_mask)vocab_size = 1000
d_model = 512
encoder = en
decoder = de
source_embed = nn.Embedding(vocab_size, d_model)
target_embed = nn.Embedding(vocab_size, d_model)
generator = gen
source = target = Variable(torch.LongTensor([[100, 2, 421, 508], [491, 998, 1, 221]]))
source_mask = target_mask = Variable(torch.zeros(8, 4, 4))
ed = EncoderDecoder(encoder, decoder, source_embed, target_embed, generator)
ed_result = ed(source, target, source_mask, target_mask)
print(ed_result)
print(ed_result.shape)
"""
tensor([[[ 0.2102, -0.0826, -0.0550, ..., 1.5555, 1.3025, -0.6296],[ 0.8270, -0.5372, -0.9559, ..., 0.3665, 0.4338, -0.7505],[ 0.4956, -0.5133, -0.9323, ..., 1.0773, 1.1913, -0.6240],[ 0.5770, -0.6258, -0.4833, ..., 0.1171, 1.0069, -1.9030]],[[-0.4355, -1.7115, -1.5685, ..., -0.6941, -0.1878, -0.1137],[-0.8867, -1.2207, -1.4151, ..., -0.9618, 0.1722, -0.9562],[-0.0946, -0.9012, -1.6388, ..., -0.2604, -0.3357, -0.6436],[-1.1204, -1.4481, -1.5888, ..., -0.8816, -0.6497, 0.0606]]],grad_fn=<AddBackward0>)
torch.Size([2, 4, 512])
"""
14.transformer模型
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import math
import matplotlib.pyplot as plt
import numpy as np
import copy# 一、输入部分
# 1.文本嵌入层
class Embeddings(nn.Module):def __init__(self, d_model, vocab):super(Embeddings, self).__init__()self.lut = nn.Embedding(vocab, d_model)self.d_model = d_modeldef forward(self, x):return self.lut(x) * math.sqrt(self.d_model)# 2.位置编码器
class PositionalEncoding(nn.Module):def __init__(self, d_model, dropout, max_len=5000):super(PositionalEncoding, self).__init__()self.dropout = nn.Dropout(p=dropout)pe = torch.zeros(max_len, d_model)position = torch.arange(0, max_len).unsqueeze(1)div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))pe[:, 0::2] = torch.sin(position * div_term)pe[:, 1::2] = torch.cos(position * div_term)pe = pe.unsqueeze(0)self.register_buffer('pe', pe)def forward(self, x):x = x + Variable(self.pe[:, :x.size(1)], requires_grad=False)return self.dropout(x)d_model = 512
dropout = 0.1
max_len = 60vocab = 1000
x = Variable(torch.LongTensor([[100, 2, 421, 508], [491, 998, 1, 221]]))
emb = Embeddings(d_model, vocab)
embr = emb(x)x = embr
pe = PositionalEncoding(d_model, dropout, max_len)
pe_result = pe(x)# 绘制词汇向量中特征的分布曲线
plt.figure(figsize=(15, 5)) # 创建一张15 x 5大小的画布
pe = PositionalEncoding(20, 0)
y = pe(Variable(torch.zeros(1, 100, 20)))
plt.plot(np.arange(100), y[0, :, 4:8].data.numpy())
plt.legend(["dim %d" % p for p in [4, 5, 6, 7]])# plt.show()# 二、编码器部分
# 1.掩码张量函数
def subsequent_mask(size):attn_shape = (1, size, size)subsequent_mask = np.triu(np.ones(attn_shape), k=1).astype('uint8')return torch.from_numpy(1 - subsequent_mask)# 掩码张量的可视化
plt.figure(figsize=(5, 5))
plt.imshow(subsequent_mask(20)[0])# plt.show()# 2.注意力机制
def attention(query, key, value, mask=None, dropout=None):d_k = query.size(-1)scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)if mask is not None:scores = scores.masked_fill(mask == 0, -1e9)p_attn = F.softmax(scores, dim=-1)if dropout is not None:p_attn = dropout(p_attn)return torch.matmul(p_attn, value), p_attnquery = key = value = pe_result
attn, p_attn = attention(query, key, value)
# print("query的注意力表示:", attn) # 2x4x512
# print("注意力张量:", p_attn) # size 2x4x4
#
# print("*****************************************************************")
# 带有mask的输入参数
query = key = value = pe_result
mask = Variable(torch.zeros(2, 4, 4))
attn, p_attn = attention(query, key, value, mask=mask)
# print("query的注意力表示:", attn) # size 2x4x512
# print("注意力张量:", p_attn) # size 2x4x4# 3.多头注意力机制
# 深拷贝
def clones(module, N):return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])class MultiHeadedAttention(nn.Module):def __init__(self, head, embedding_dim, dropout=0.1):super(MultiHeadedAttention, self).__init__()assert embedding_dim % head == 0self.d_k = embedding_dim // headself.head = head# 在多头注意力中,Q,K,V各需要一个,最后拼接的矩阵还需要一个,一共是4个self.linears = clones(nn.Linear(embedding_dim, embedding_dim), 4)self.attn = Noneself.dropout = nn.Dropout(p=dropout)def forward(self, query, key, value, mask=None):if mask is not None:mask = mask.unsqueeze(0)batch_size = query.size(0)query, key, value = [model(x).view(batch_size, -1, self.head, self.d_k).transpose(1, 2)for model, x in zip(self.linears, (query, key, value))]x, self.attn = attention(query, key, value, mask=mask, dropout=self.dropout)x = x.transpose(1, 2).contiguous().view(batch_size, -1, self.head * self.d_k)return self.linears[-1](x)head = 8
embedding_dim = 512
dropout = 0.2
query = value = key = pe_result
mask = Variable(torch.zeros(8, 4, 4))
mha = MultiHeadedAttention(head, embedding_dim, dropout)
mha_result = mha(query, key, value, mask)
# print(mha_result)# 4.前馈全连接层
class PositionwiseFeedForward(nn.Module):def __init__(self, d_model, d_ff, dropout=0.1):super(PositionwiseFeedForward, self).__init__()self.w1 = nn.Linear(d_model, d_ff)self.w2 = nn.Linear(d_ff, d_model)self.dropout = nn.Dropout(dropout)def forward(self, x):return self.w2(self.dropout(F.relu(self.w1(x))))d_model = 512
d_ff = 64
dropout = 0.2
x = mha_result
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
ff_result = ff(x)# 5.规范化层
# 通过LayerNorm实现规范化层的类
class LayerNorm(nn.Module):def __init__(self, features, eps=1e-6):super(LayerNorm, self).__init__()self.a2 = nn.Parameter(torch.ones(features))self.b2 = nn.Parameter(torch.zeros(features))self.eps = epsdef forward(self, x):mean = x.mean(-1, keepdim=True)std = x.std(-1, keepdim=True)return self.a2 * (x - mean) / (std + self.eps) + self.b2features = d_model = 512
eps = 1e-6
x = ff_result
ln = LayerNorm(features, eps)
ln_result = ln(x)# 6.残差连接
class SublayerConnection(nn.Module):def __init__(self, size, dropout=0.1):super(SublayerConnection, self).__init__()self.norm = LayerNorm(size)self.dropout = nn.Dropout(p=dropout)def forward(self, x, sublayer):return x + self.dropout(sublayer(self.norm(x)))size = 512
dropout = 0.2
head = 8
d_model = 512
x = pe_result
mask = Variable(torch.zeros(8, 4, 4))
self_attn = MultiHeadedAttention(head, d_model)
sublayer = lambda x: self_attn(x, x, x, mask)
sc = SublayerConnection(size, dropout)
sc_result = sc(x, sublayer)# 7.编码器层
class EncoderLayer(nn.Module):def __init__(self, size, self_attn, feed_forward, dropout):super(EncoderLayer, self).__init__()self.self_attn = self_attnself.feed_forward = feed_forwardself.sublayer = clones(SublayerConnection(size, dropout), 2)self.size = sizedef forward(self, x, mask):x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))return self.sublayer[1](x, self.feed_forward)size = 512
head = 8
d_model = 512
d_ff = 64
x = pe_result
dropout = 0.2
self_attn = MultiHeadedAttention(head, d_model)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
mask = Variable(torch.zeros(8, 4, 4))
el = EncoderLayer(size, self_attn, ff, dropout)
el_result = el(x, mask)# 8.编码器
class Encoder(nn.Module):def __init__(self, layer, N):super(Encoder, self).__init__()self.layers = clones(layer, N)self.norm = LayerNorm(layer.size)def forward(self, x, mask):for layer in self.layers:x = layer(x, mask)return self.norm(x)size = 512
head = 8
d_model = 512
d_ff = 64
c = copy.deepcopy
attn = MultiHeadedAttention(head, d_model)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
dropout = 0.2
layer = EncoderLayer(size, c(attn), c(ff), dropout)
N = 8
mask = Variable(torch.zeros(8, 4, 4))
en = Encoder(layer, N)
en_result = en(x, mask)
# print(en_result)
# print(en_result.shape)
"""
tensor([[[-1.2431e-01, -2.3363e+00, 1.9084e-02, ..., -9.8174e-02,-2.0241e+00, -2.8970e-01],[-3.9608e-01, 5.2420e-02, 2.4076e-02, ..., -1.2182e-01,4.7777e-01, 4.0544e-01],[-6.3494e-01, -2.5631e-03, -1.7992e-01, ..., -5.5367e-02,-4.3454e-02, 1.0005e+00],[-8.5996e-01, 2.6673e+00, 9.2570e-01, ..., 6.2907e-01,3.7063e-01, 6.4456e-01]],[[ 3.3140e-01, 1.4327e+00, 4.1478e-02, ..., 4.5121e-01,-1.7026e+00, 8.7472e-01],[-2.5319e-01, 1.8512e+00, -3.0673e-02, ..., 7.9770e-02,1.1026e-01, -2.9194e-01],[ 1.3375e-01, -1.7779e-01, 2.6414e-03, ..., -5.6526e-01,6.5849e-01, 1.1001e+00],[ 1.5610e+00, -1.4482e+00, 2.5439e-01, ..., -5.4919e-01,-7.2307e-01, 1.4985e+00]]], grad_fn=<AddBackward0>)
torch.Size([2, 4, 512])
"""# 三、解码器部分
# 1.解码器层
class DecoderLayer(nn.Module):def __init__(self, size, self_attn, src_attn, feed_forward, dropout):super(DecoderLayer, self).__init__()self.size = sizeself.self_attn = self_attnself.src_attn = src_attnself.feed_forward = feed_forwardself.sublayer = clones(SublayerConnection(size, dropout), 3)def forward(self, x, memory, source_mask, target_mask):m = memoryx = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, target_mask))x = self.sublayer[1](x, lambda x: self.src_attn(x, m, m, source_mask))return self.sublayer[2](x, self.feed_forward)head = 8
size = 512
d_model = 512
d_ff = 64
dropout = 0.2
self_attn = src_attn = MultiHeadedAttention(head, d_model, dropout)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
x = pe_result
memory = en_result
mask = Variable(torch.zeros(8, 4, 4))
source_mask = target_mask = mask
dl = DecoderLayer(size, self_attn, src_attn, ff, dropout)
dl_result = dl(x, memory, source_mask, target_mask)
# print(dl_result)
# print(dl_result.shape)
"""
tensor([[[ 1.9604e+00, 3.9288e+01, -5.2422e+01, ..., 2.1041e-01,-5.5063e+01, 1.5233e-01],[ 1.0135e-01, -3.7779e-01, 6.5491e+01, ..., 2.8062e+01,-3.7780e+01, -3.9577e+01],[ 1.9526e+01, -2.5741e+01, 2.6926e-01, ..., -1.5316e+01,1.4543e+00, 2.7714e+00],[-2.1528e+01, 2.0141e+01, 2.1999e+01, ..., 2.2099e+00,-1.7267e+01, -1.6687e+01]],[[ 6.7259e+00, -2.6918e+01, 1.1807e+01, ..., -3.6453e+01,-2.9231e+01, 1.1288e+01],[ 7.7484e+01, -5.0572e-01, -1.3096e+01, ..., 3.6302e-01,1.9907e+01, -1.2160e+00],[ 2.6703e+01, 4.4737e+01, -3.1590e+01, ..., 4.1540e-03,5.2587e+00, 5.2382e+00],[ 4.7435e+01, -3.7599e-01, 5.0898e+01, ..., 5.6361e+00,3.5891e+01, 1.5697e+01]]], grad_fn=<AddBackward0>)
torch.Size([2, 4, 512])
"""# 2.解码器
class Decoder(nn.Module):def __init__(self, layer, N):super(Decoder, self).__init__()self.layers = clones(layer, N)self.norm = LayerNorm(layer.size)def forward(self, x, memory, source_mask, target_mask):for layer in self.layers:x = layer(x, memory, source_mask, target_mask)return self.norm(x)size = 512
d_model = 512
head = 8
d_ff = 64
dropout = 0.2
c = copy.deepcopy
attn = MultiHeadedAttention(head, d_model)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
layer = DecoderLayer(d_model, c(attn), c(attn), c(ff), dropout)
N = 8
x = pe_result
memory = en_result
mask = Variable(torch.zeros(8, 4, 4))
source_mask = target_mask = mask
de = Decoder(layer, N)
de_result = de(x, memory, source_mask, target_mask)
# print(de_result)
# print(de_result.shape)
"""
tensor([[[ 0.2436, 0.8310, 1.1406, ..., 1.2474, 1.0660, -0.7125],[ 0.8292, -0.1330, -0.2391, ..., -1.0578, -0.8154, 1.4003],[ 0.8909, 0.1255, 0.9115, ..., 0.0775, 0.0753, 0.3909],[-1.9148, 0.2801, 1.7520, ..., -0.7988, -2.0647, -0.5999]],[[ 0.9265, 0.5207, -1.8971, ..., -2.2877, 0.1123, 0.2563],[ 0.8011, 1.0716, -0.0627, ..., -1.2644, 1.6997, 0.8083],[-0.6971, -1.6886, -0.7169, ..., 1.0697, -1.0679, 0.8851],[-0.9620, -0.2029, 1.2966, ..., -0.3927, 1.6059, 1.6047]]],grad_fn=<AddBackward0>)
torch.Size([2, 4, 512])
"""# 四、输出部分
# 线性层和softmax层一起实现, 因为二者的共同目标是生成最后的结构
# 因此把类的名字叫做Generator
class Generator(nn.Module):def __init__(self, d_model, vocab_size):super(Generator, self).__init__()self.project = nn.Linear(d_model, vocab_size)def forward(self, x):return F.log_softmax(self.project(x), dim=-1)d_model = 512
vocab_size = 1000
x = de_result
gen = Generator(d_model, vocab_size)
gen_result = gen(x)
print(gen_result)
print(gen_result.shape)
"""
tensor([[[-7.0677, -6.3155, -6.8694, ..., -6.8623, -6.4482, -7.2010],[-7.8073, -7.6669, -6.3424, ..., -7.0006, -6.8322, -6.1138],[-9.0578, -7.1061, -6.2095, ..., -7.3074, -7.2882, -7.3483],[-8.1861, -7.2428, -6.7725, ..., -6.8366, -7.3286, -6.8935]],[[-7.3694, -6.7055, -6.8839, ..., -6.7879, -6.8398, -7.0582],[-6.5527, -6.8104, -7.6633, ..., -8.0519, -7.0640, -6.3101],[-8.4895, -7.9180, -6.4888, ..., -6.7811, -5.6739, -6.5447],[-6.2718, -7.3904, -7.8301, ..., -6.6355, -5.7487, -8.1378]]],grad_fn=<LogSoftmaxBackward0>)
torch.Size([2, 4, 1000])
"""# 编码器-解码器
class EncoderDecoder(nn.Module):def __init__(self, encoder, decoder, source_embed, target_embed, generator):super(EncoderDecoder, self).__init__()self.encoder = encoderself.decoder = decoderself.src_embed = source_embedself.tgt_embed = target_embedself.generator = generatordef forward(self, source, target, source_mask, target_mask):return self.decode(self.encode(source, source_mask), source_mask,target, target_mask)def encode(self, source, source_mask):return self.encoder(self.src_embed(source), source_mask)def decode(self, memory, source_mask, target, target_mask):return self.decoder(self.tgt_embed(target), memory, source_mask, target_mask)vocab_size = 1000
d_model = 512
encoder = en
decoder = de
source_embed = nn.Embedding(vocab_size, d_model)
target_embed = nn.Embedding(vocab_size, d_model)
generator = gen
source = target = Variable(torch.LongTensor([[100, 2, 421, 508], [491, 998, 1, 221]]))
source_mask = target_mask = Variable(torch.zeros(8, 4, 4))
ed = EncoderDecoder(encoder, decoder, source_embed, target_embed, generator)
ed_result = ed(source, target, source_mask, target_mask)
# print(ed_result)
# print(ed_result.shape)
"""
tensor([[[ 0.2102, -0.0826, -0.0550, ..., 1.5555, 1.3025, -0.6296],[ 0.8270, -0.5372, -0.9559, ..., 0.3665, 0.4338, -0.7505],[ 0.4956, -0.5133, -0.9323, ..., 1.0773, 1.1913, -0.6240],[ 0.5770, -0.6258, -0.4833, ..., 0.1171, 1.0069, -1.9030]],[[-0.4355, -1.7115, -1.5685, ..., -0.6941, -0.1878, -0.1137],[-0.8867, -1.2207, -1.4151, ..., -0.9618, 0.1722, -0.9562],[-0.0946, -0.9012, -1.6388, ..., -0.2604, -0.3357, -0.6436],[-1.1204, -1.4481, -1.5888, ..., -0.8816, -0.6497, 0.0606]]],grad_fn=<AddBackward0>)
torch.Size([2, 4, 512])
"""# Tansformer模型
def make_model(source_vocab, target_vocab, N=6,d_model=512, d_ff=2048, head=8, dropout=0.1):c = copy.deepcopyattn = MultiHeadedAttention(head, d_model)ff = PositionwiseFeedForward(d_model, d_ff, dropout)position = PositionalEncoding(d_model, dropout)model = EncoderDecoder(Encoder(EncoderLayer(d_model, c(attn), c(ff), dropout), N),Decoder(DecoderLayer(d_model, c(attn), c(attn),c(ff), dropout), N),nn.Sequential(Embeddings(d_model, source_vocab), c(position)),nn.Sequential(Embeddings(d_model, target_vocab), c(position)),Generator(d_model, target_vocab))for p in model.parameters():if p.dim() > 1:nn.init.xavier_uniform(p)return modelsource_vocab = 11
target_vocab = 11
N = 6
if __name__ == '__main__':res = make_model(source_vocab, target_vocab, N)print(res)
"""
EncoderDecoder((encoder): Encoder((layers): ModuleList((0): EncoderLayer((self_attn): MultiHeadedAttention((linears): ModuleList((0): Linear(in_features=512, out_features=512)(1): Linear(in_features=512, out_features=512)(2): Linear(in_features=512, out_features=512)(3): Linear(in_features=512, out_features=512))(dropout): Dropout(p=0.1))(feed_forward): PositionwiseFeedForward((w_1): Linear(in_features=512, out_features=2048)(w_2): Linear(in_features=2048, out_features=512)(dropout): Dropout(p=0.1))(sublayer): ModuleList((0): SublayerConnection((norm): LayerNorm()(dropout): Dropout(p=0.1))(1): SublayerConnection((norm): LayerNorm()(dropout): Dropout(p=0.1))))(1): EncoderLayer((self_attn): MultiHeadedAttention((linears): ModuleList((0): Linear(in_features=512, out_features=512)(1): Linear(in_features=512, out_features=512)(2): Linear(in_features=512, out_features=512)(3): Linear(in_features=512, out_features=512))(dropout): Dropout(p=0.1))(feed_forward): PositionwiseFeedForward((w_1): Linear(in_features=512, out_features=2048)(w_2): Linear(in_features=2048, out_features=512)(dropout): Dropout(p=0.1))(sublayer): ModuleList((0): SublayerConnection((norm): LayerNorm()(dropout): Dropout(p=0.1))(1): SublayerConnection((norm): LayerNorm()(dropout): Dropout(p=0.1)))))(norm): LayerNorm())(decoder): Decoder((layers): ModuleList((0): DecoderLayer((self_attn): MultiHeadedAttention((linears): ModuleList((0): Linear(in_features=512, out_features=512)(1): Linear(in_features=512, out_features=512)(2): Linear(in_features=512, out_features=512)(3): Linear(in_features=512, out_features=512))(dropout): Dropout(p=0.1))(src_attn): MultiHeadedAttention((linears): ModuleList((0): Linear(in_features=512, out_features=512)(1): Linear(in_features=512, out_features=512)(2): Linear(in_features=512, out_features=512)(3): Linear(in_features=512, out_features=512))(dropout): Dropout(p=0.1))(feed_forward): PositionwiseFeedForward((w_1): Linear(in_features=512, out_features=2048)(w_2): Linear(in_features=2048, out_features=512)(dropout): Dropout(p=0.1))(sublayer): ModuleList((0): SublayerConnection((norm): LayerNorm()(dropout): Dropout(p=0.1))(1): SublayerConnection((norm): LayerNorm()(dropout): Dropout(p=0.1))(2): SublayerConnection((norm): LayerNorm()(dropout): Dropout(p=0.1))))(1): DecoderLayer((self_attn): MultiHeadedAttention((linears): ModuleList((0): Linear(in_features=512, out_features=512)(1): Linear(in_features=512, out_features=512)(2): Linear(in_features=512, out_features=512)(3): Linear(in_features=512, out_features=512))(dropout): Dropout(p=0.1))(src_attn): MultiHeadedAttention((linears): ModuleList((0): Linear(in_features=512, out_features=512)(1): Linear(in_features=512, out_features=512)(2): Linear(in_features=512, out_features=512)(3): Linear(in_features=512, out_features=512))(dropout): Dropout(p=0.1))(feed_forward): PositionwiseFeedForward((w_1): Linear(in_features=512, out_features=2048)(w_2): Linear(in_features=2048, out_features=512)(dropout): Dropout(p=0.1))(sublayer): ModuleList((0): SublayerConnection((norm): LayerNorm()(dropout): Dropout(p=0.1))(1): SublayerConnection((norm): LayerNorm()(dropout): Dropout(p=0.1))(2): SublayerConnection((norm): LayerNorm()(dropout): Dropout(p=0.1)))))(norm): LayerNorm())(src_embed): Sequential((0): Embeddings((lut): Embedding(11, 512))(1): PositionalEncoding((dropout): Dropout(p=0.1)))(tgt_embed): Sequential((0): Embeddings((lut): Embedding(11, 512))(1): PositionalEncoding((dropout): Dropout(p=0.1)))(generator): Generator((proj): Linear(in_features=512, out_features=11))
)
"""
六、实战项目
基于Transformer的机器翻译模型
-
seq2seq架构
-
seq2seq模型架构分析
-
从图中可知, seq2seq模型架构, 包括两部分分别是encoder(编码器)和decoder(解码器), 编码器和解码器的内部实现都使用了GRU模型, 这里它要完成的是一个中文到英文的翻译: 欢迎 来 北京 --> welcome to BeiJing. 编码器首先处理中文输入"欢迎 来 北京", 通过GRU模型获得每个时间步的输出张量,最后将它们拼接成一个中间语义张量c, 接着解码器将使用这个中间语义张量c以及每一个时间步的隐层张量, 逐个生成对应的翻译语言.
-
-
Transformer模型架构分析
-
从图中可知, Transformer模型架构, 大范围内包括两部分分别是encoder(编码器)和decoder(解码器), 编码器和解码器的内部实现都使用了注意力机制实现, 这里它要完成的是一个德文到英文的翻译: Willkommen in peking --> welcome to BeiJing. 编码器首先处理中文输入"Willkommen in peking", 通过Transformer内部的注意力机制提取信息之后的输出张量,就是一个中间语义张量c, 接着解码器将使用这个中间语义张量c以及每一个时间步的目标语言的语义张量, 逐个生成对应的翻译语言.
-
-
数据集:
-
使用的是torchtext中自带的数据集Multi30k, 直接可以使用内置的API函数即可下载
-
# 默认下载的路径为: /root/.torchtext/cache/Multi30k └── Multi30k├── mmt16_task1_test.tar.gz├── test.de├── test.en├── train.de├── train.en├── training.tar.gz├── val.de├── val.en└── validation.tar.gz
数据说明:
train.en和train.de数据量为: 29001条, val.de和val.en数据量为:1015条 test.de和test.en数据量为:1000条
基于Transformer模型架构实现机器翻译过程
-
第一步: 导入必备的工具包
-
第二步: 导入Multi30k数据集并做基本处理
-
第三步: 构建Transformer模型
-
第四步: 定义mask的函数, 创建对应的不同的mask
-
第五步: 定义批次数据处理的回调函数
-
第六步: 构建训练函数和评估函数
-
第七步: 训练Transformer模型
-
第八步: 进行解码生成目标语言语句
-
第九步: 模型的保存和重加载
第一步: 导入必备的工具包
-
pytorch版本使用的是1.10.1, python版本使用的是3.7.x
pip install torch==1.10.1
import torch import torch.nn as nn import math from torchtext.data.utils import get_tokenizer from torchtext.vocab import build_vocab_from_iterator from torchtext.datasets import Multi30k from typing import Iterable, List from torch import Tensor from torch.nn import Transformer from torch.nn.utils.rnn import pad_sequence from torch.utils.data import DataLoader from timeit import default_timer as timer DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
第二步: 导入Multi30k数据集并做基本处理
加载对应的tokenizer
# 源语言是德语 SRC_LANGUAGE = 'de' # 目标语言是英语 TGT_LANGUAGE = 'en' # 定义token的字典, 定义vocab字典 token_transform = {} vocab_transform = {} # 创建源语言和目标语言的kokenizer, 确保依赖关系已经安装 # pip install -U spacy # python -m spacy download en_core_web_sm # python -m spacy download de_core_news_sm # get_tokenizer是分词函数, 如果没有特殊的则按照英语的空格分割, 如果有这按照对应的分词库返回. 比如spacy, 返回对应的分词库 token_transform[SRC_LANGUAGE] = get_tokenizer('spacy', language='de_core_news_sm') token_transform[TGT_LANGUAGE] = get_tokenizer('spacy', language='en_core_web_sm')
构建生成分词的迭代器
def yield_tokens(data_iter: Iterable, language: str):# data_iter: 对象的迭代对象 Multi30k对象# language: 对应的翻译语言 {'de': 0, 'en': 1}language_index = {SRC_LANGUAGE: 0, TGT_LANGUAGE: 1}# 返回对应的数据迭代器对象for data_sample in data_iter:# data_sample:(德文, 英文)# data_sample:('Zwei junge weiße Männer sind im Freien in der Nähe vieler Büsche.\n', 'Two young, White males are outside near many bushes.\n')# token_transform['de']()=['Zwei', 'junge', 'weiße', 'Männer', 'sind', 'im', 'Freien', 'in', 'der', 'Nähe', 'vieler', 'Büsche', '.', '\n']# or token_transform['en']分别进行构造对应的字典yield token_transform[language](data_sample[language_index[language]])
定义特殊字符并下载数据设置默认索引
# 定义特殊字符及其对应的索引值 UNK_IDX, PAD_IDX, BOS_IDX, EOS_IDX = 0, 1, 2, 3 # 确保标记按其索引的顺序正确插入到词汇表中 special_symbols = ['<unk>', '<pad>', '<bos>', '<eos>'] for ln in [SRC_LANGUAGE, TGT_LANGUAGE]:# 训练数据集的迭代器,# 数据集是用英文描述图像的英文语句, 然后人工将其翻译为德文的语句,有两个文件, 一个是train.de 一个是train.en文件,# 然后将其构建为(德文, 英文)的形式train_iter = Multi30k(split='train', language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))# 创建torchtext的vocab对象, 即词汇表vocab_transform[ln] = build_vocab_from_iterator(yield_tokens(train_iter, ln), # 用于构建 Vocab 的迭代器。必须产生令牌列表或迭代器min_freq=1,#在词汇表中包含一个标记所需的最低频率specials=special_symbols, # 用于添加的特殊字符special_first=True) # 指示是在开头还是结尾插入符号 # 将 UNK_IDX 设置为默认索引。未找到令牌时返回此索引 # 如果未设置,则在 Vocabulary 中找不到查询的标记时抛出 RuntimeError for ln in [SRC_LANGUAGE, TGT_LANGUAGE]:vocab_transform[ln].set_default_index(UNK_IDX)
第三步: 构建Transformer模型
定义位置编码器类
class PositionalEncoding(nn.Module):def __init__(self,emb_size: int,dropout: float, maxlen: int = 5000):'''emb_size: 词嵌入的维度大小dropout: 正则化的大小maxlen: 句子的最大长度'''super(PositionalEncoding, self).__init__()# 将1000的2i/d_model变型为e的指数形式den = torch.exp(- torch.arange(0, emb_size, 2)* math.log(10000) / emb_size)# 效果等价与torch.arange(0, maxlen).unsqueeze(1)pos = torch.arange(0, maxlen).reshape(maxlen, 1)# 构建一个(maxlen, emb_size)大小的全零矩阵pos_embedding = torch.zeros((maxlen, emb_size))# 偶数列是正弦函数填充pos_embedding[:, 0::2] = torch.sin(pos * den)# 奇数列是余弦函数填充pos_embedding[:, 1::2] = torch.cos(pos * den)# 将其维度变成三维, 为了后期方便计算pos_embedding = pos_embedding.unsqueeze(-2)# 添加dropout层, 防止过拟合self.dropout = nn.Dropout(dropout)'''向模块添加持久缓冲区。这通常用于注册不应被视为模型参数的缓冲区。例如,pos_embedding不是一个参数,而是持久状态的一部分。缓冲区可以使用给定的名称作为属性访问。说明:应该就是在内存中定义一个常量,同时,模型保存和加载的时候可以写入和读出'''self.register_buffer('pos_embedding', pos_embedding) def forward(self, token_embedding: Tensor):# 将token_embedding和位置编码相融合return self.dropout(token_embedding + self.pos_embedding[:token_embedding.size(0), :])
定义词嵌入层类
class TokenEmbedding(nn.Module):def __init__(self, vocab_size: int, emb_size):'''vocab_size:词表的大小emb_size:词嵌入的维度'''super(TokenEmbedding, self).__init__()# 调用nn中的预定义层Embedding, 获取一个词嵌入对象self.embeddingself.embedding = nn.Embedding(vocab_size, emb_size)# 将emb_size传入类内, 变成类内的变量self.emb_size = emb_size def forward(self, tokens: Tensor):# 让 embeddings vector 在增加 之后的 postion encoing 之前相对大一些的操作,# 主要是为了让position encoding 相对的小,这样会让原来的 embedding vector 中的信息在和 position encoding 的信息相加时不至于丢失掉# 让 embeddings vector 相对大一些return self.embedding(tokens.long()) * math.sqrt(self.emb_size)
构建Seq2SeqTransformer模型
class Seq2SeqTransformer(nn.Module):def __init__(self,num_encoder_layers, num_decoder_layers,emb_size,nhead,src_vocab_size,tgt_vocab_size,dim_feedforward = 512,dropout = 0.1):'''num_encoder_layers: 编码器的层数num_decoder_layers: 解码器的层数emb_size: 词嵌入的维度nhead: 头数src_vocab_size: 源语言的词表大小tgt_vocab_size: 目标语言的词表大小dim_feedforward: 前馈全连接层的维度dropout: 正则化的大小'''# 继承nn.Module类, 一般继承习惯行的写法super(Seq2SeqTransformer, self).__init__()# 创建Transformer对象self.transformer = Transformer(d_model=emb_size,nhead=nhead,num_encoder_layers=num_encoder_layers,num_decoder_layers=num_decoder_layers,dim_feedforward=dim_feedforward,dropout=dropout)# 创建全连接线性层self.generator = nn.Linear(emb_size, tgt_vocab_size)# 创建源语言的embedding层self.src_tok_emb = TokenEmbedding(src_vocab_size, emb_size)# 创建目标语言的embedding层self.tgt_tok_emb = TokenEmbedding(tgt_vocab_size, emb_size)# 创建位置编码器层对象self.positional_encoding = PositionalEncoding(emb_size, dropout=dropout) def forward(self,src, trg, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask, memory_key_padding_mask):'''src: 源语言trg: 目标语言src_mask: 源语言掩码tgt_mask: 目标语言掩码src_padding_mask: 源语言的padding_masktgt_padding_mask: 目标语言的padding_maskmemory_key_padding_mask: 中间语义张量的padding_mask'''# 获取源语言的embedding张量融合了位置编码src_emb = self.positional_encoding(self.src_tok_emb(src))# 获取目标语言的embedding张量融合了位置编码tgt_emb = self.positional_encoding(self.tgt_tok_emb(trg))# 经过Transformer进行编解码之后输出out值outs = self.transformer(src_emb, tgt_emb, src_mask, tgt_mask, None, src_padding_mask, tgt_padding_mask, memory_key_padding_mask)# outs值经过输出层得到最后的输出分布值return self.generator(outs)# 定义Transformer的编码器def encode(self, src, src_mask):'''src:源语言src_mask:源语言掩码'''return self.transformer.encoder(self.positional_encoding(self.src_tok_emb(src)), src_mask)# 定义Transformer的解码器def decode(self, tgt, memory, tgt_mask):'''tgt:目标语言memory:中间语言张量输出tgt_mask: 目标语言的掩码'''return self.transformer.decoder(self.positional_encoding(self.tgt_tok_emb(tgt)), memory, tgt_mask)
第四步: 定义mask的函数, 创建对应的不同的mask
定义掩码, 以防止模型在进行预测的过程中查看到未来的单词. 同时需要掩码来隐藏源语言和目标语言的padding tokens
def generate_square_subsequent_mask(sz):# sz: 句子的长度# triu生成的是上三角, 经过transpose之后变成了下三角矩阵mask = (torch.triu(torch.ones((sz, sz), device=DEVICE)) == 1).transpose(0, 1)# 将0的位置填充负无穷小, 将1的位置填充为0mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))return mask def create_mask(src, tgt):'''src: 源语言张量形状为: [seq_length , batch_size]tgt: 目标语言张量形状为: [seq_length , batch_size]'''# 获取源语言的句子长度src_seq_len = src.shape[0]# 获取目标语言的句子长度tgt_seq_len = tgt.shape[0]# 产生目标语言的掩码张量tgt_mask = generate_square_subsequent_mask(tgt_seq_len)# 产生源语言的掩码张量src_mask = torch.zeros((src_seq_len, src_seq_len),device=DEVICE).type(torch.bool)# 构建源语言的padding_mask src_padding_mask==> [batch_size, seq_len]src_padding_mask = (src == PAD_IDX).transpose(0, 1)# 构建目标语言的padding_mask tgt_paddig_mask ==> [batch_size, seq_len-1]tgt_padding_mask = (tgt == PAD_IDX).transpose(0, 1)return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask
第五步: 定义批次数据处理的回调函数
将字符串转化为整数的tensor张量
# 将句子字符转化为对应的tensor张量 def sequential_transforms(*transforms):'''Transformers中会传入三个迭代器: 第一个是Tokenization的, 第二个是Numericalization, 第三个是Add BOS/EOS and create tensor'''def func(txt_input):# 循环三个迭代器, 第一个进行语句的分割, 第二个将对应的词语映射为对应的张量表示, 第三个是在整个句子的首尾部分添加起始和结束标志.for transform in transforms:txt_input = transform(txt_input)return txt_inputreturn func
在句子首尾添加起始和结束符号
# 辅助函数, 完成句子首尾BOS/EOS的添加过程 def tensor_transform(token_ids: List[int]):# 添加的是列表形式的数据, 将BOS和EOS添加到句子的首尾部分return torch.cat((torch.tensor([BOS_IDX]),torch.tensor(token_ids),torch.tensor([EOS_IDX]))) text_transform = {} # 循环添加源语言和目标语言 for ln in [SRC_LANGUAGE, TGT_LANGUAGE]:text_transform[ln] = sequential_transforms(token_transform[ln], #Tokenizationvocab_transform[ln], #Numericalizationtensor_transform) # Add BOS/EOS and create tensor
数据进行批次化处理
# 按照批次进行源语言和目标语言的组装 def collate_fn(batch):# 定义源语言和目标语言的批次列表src_batch, tgt_batch = [], []# 循环批次样本for src_sample, tgt_sample in batch:# 添加源语言句子到列表中src_batch.append(text_transform[SRC_LANGUAGE](src_sample.rstrip("\n")))# 添加目标语言句子到列表中tgt_batch.append(text_transform[TGT_LANGUAGE](tgt_sample.rstrip("\n")))# 将源语言和目标语言进行截断补齐 PAD_IDX=1# src_batch的形状为: [seq_length, batch] seq_length是最长的句子长度src_batch = pad_sequence(src_batch, padding_value=PAD_IDX)# tgt_batch的形状为: [seq_length, batch] seq_length是最长的句子长度tgt_batch = pad_sequence(tgt_batch, padding_value=PAD_IDX)return src_batch, tgt_batch
第六步: 构建训练函数和评估函数
实例化模型并定义损失函数和优化器
# 设置种子用于生成随机数,以使得结果是确定的 torch.manual_seed(0) # 设置调用时候使用的参数 SRC_VOCAB_SIZE = len(vocab_transform[SRC_LANGUAGE]) TGT_VOCAB_SIZE = len(vocab_transform[TGT_LANGUAGE]) EMB_SIZE = 512 NHEAD = 8 FFN_HID_DIM = 512 BATCH_SIZE = 128 NUM_ENCODER_LAYERS = 3 NUM_DECODER_LAYERS = 3 # 实例化Transformer对象 transformer = Seq2SeqTransformer(NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, EMB_SIZE,NHEAD, SRC_VOCAB_SIZE, TGT_VOCAB_SIZE, FFN_HID_DIM) # 为了保证每层的输入和输出的方差相同, 防止梯度消失问题 for p in transformer.parameters():if p.dim() > 1:# 此处使用的是xavier的均匀分布nn.init.xavier_uniform_(p) # 如果有GPU则将模型移动到GPU上 transformer = transformer.to(DEVICE) # 定义损失函数 loss_fn = torch.nn.CrossEntropyLoss(ignore_index=PAD_IDX) # 定义优化器 betas: 用于计算梯度及其平方的运行平均值的系数 eps:添加到分母以提高数值稳定性 optimizer = torch.optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)
定义批次训练函数
def train_epoch(model, optimizer):# 开启训练模式model.train()# 定义其实的损失值为0losses = 0# 获取训练数据集的迭代器, 语言对为(de, en)train_iter = Multi30k(split='train', language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))# 加载数据, 按照一个批次一个批次进行加载, 返回一个迭代器train_dataloader = DataLoader(train_iter, batch_size=BATCH_SIZE, collate_fn=collate_fn)# 循环数据迭代器for src, tgt in train_dataloader:# 将源语言数据移动到对应的设备上去src = src.to(DEVICE)# 将目标语言数据移动到对应设备上去tgt = tgt.to(DEVICE)# 获取输入真实的张量 第一个单词到倒数第二个单词tgt_input = tgt[:-1, :]# 调用mask函数, 生成对应的四个masksrc_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)# 调用模型进行训练, 得到最后的张量分布logits = model(src, tgt_input, src_mask, tgt_mask,src_padding_mask, tgt_padding_mask, src_padding_mask)# 梯度清零optimizer.zero_grad()# 获取输出真实的标签数据 第二个单词到最后一个单词tgt_out = tgt[1:, :]# 计算损失loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))# 反向传播loss.backward()# 梯度更新optimizer.step()# 损失值累加求和losses += loss.item()# 返回平均损失值return losses / len(train_dataloader)
定义批次评估函数
def evaluate(model):# 开启模型评估模式model.eval()# 定义起始损失值losses = 0# 加载验证数据集, 语言对为(de, en)val_iter = Multi30k(split='valid', language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))# 返回验证集的数据加载器val_dataloader = DataLoader(val_iter, batch_size=BATCH_SIZE, collate_fn=collate_fn)# 循环验证集for src, tgt in val_dataloader:# 源语言数据移动到对应的设备上src = src.to(DEVICE)# 目标语言数据移动到对应的设备上tgt = tgt.to(DEVICE)# 获取输入的真实的张量tgt_input = tgt[:-1, :]# 调用mask函数, 产生对应的四个mask值src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)# 调用模型, 得到对应的输出分布值logits = model(src, tgt_input, src_mask, tgt_mask,src_padding_mask, tgt_padding_mask, src_padding_mask)# 获取输出的真实张量tgt_out = tgt[1:, :]# 计算损失值loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))# 损失值累加, 求和losses += loss.item()# 求得对应的平均损失return losses / len(val_dataloader)
####第七步: 训练Transformer模型
利用循环训练Transformer模型
# 定义epoch的次数 NUM_EPOCHS = 18 # 循环整个数据集num_epochs次 for epoch in range(1, NUM_EPOCHS+1):# 获取开始时间start_time = timer()# 将整个训练数据集进行训练train_loss = train_epoch(transformer, optimizer)# 获取结束时间end_time = timer()# 将整个验证集进行评估val_loss = evaluate(transformer)# 打印每个epoch的训练损失, 验证损失, 和训练时间.print((f"Epoch: {epoch}, Train loss: {train_loss:.3f}, Val loss: {val_loss:.3f}, "f"Epoch time = {(end_time - start_time):.3f}s"))
输出效果展示
Epoch: 1, Train loss: 5.342, Val loss: 4.138, Epoch time = 653.749s Epoch: 2, Train loss: 3.799, Val loss: 3.370, Epoch time = 649.536s Epoch: 3, Train loss: 3.184, Val loss: 2.921, Epoch time = 644.899s Epoch: 4, Train loss: 2.782, Val loss: 2.642, Epoch time = 648.685s Epoch: 5, Train loss: 2.490, Val loss: 2.453, Epoch time = 650.243s Epoch: 6, Train loss: 2.256, Val loss: 2.321, Epoch time = 647.609s Epoch: 7, Train loss: 2.064, Val loss: 2.210, Epoch time = 654.674s Epoch: 8, Train loss: 1.905, Val loss: 2.132, Epoch time = 659.779s Epoch: 9, Train loss: 1.761, Val loss: 2.070, Epoch time = 652.363s Epoch: 10, Train loss: 1.637, Val loss: 2.016, Epoch time = 646.682s Epoch: 11, Train loss: 1.527, Val loss: 1.977, Epoch time = 643.913s Epoch: 12, Train loss: 1.427, Val loss: 1.970, Epoch time = 640.084s Epoch: 13, Train loss: 1.335, Val loss: 1.964, Epoch time = 639.331s Epoch: 14, Train loss: 1.253, Val loss: 1.936, Epoch time = 639.232s Epoch: 15, Train loss: 1.173, Val loss: 1.928, Epoch time = 649.990s Epoch: 16, Train loss: 1.106, Val loss: 1.909, Epoch time = 636.465s Epoch: 17, Train loss: 1.038, Val loss: 1.905, Epoch time = 644.609s Epoch: 18, Train loss: 0.976, Val loss: 1.914, Epoch time = 644.115s
注意: 这个训练的过程是4核8G内存的CPU服务器,大家可以更换为GPU服务器, 速度会更快.
第八步: 进行解码生成目标语言语句
使用贪心算法构建生成序列函数
def greedy_decode(model, src, src_mask, max_len, start_symbol):# 将对应的源语言数据移动的对应的设备上src = src.to(DEVICE)# 将对应的源语言的mask移动到对应的设备上src_mask = src_mask.to(DEVICE)# 将源语言使用模型的编码器, 得到中间语义张量 memory的形状为: [seq_len, batch_size, dim]memory = model.encode(src, src_mask)# 构建一个起始的二维矩阵, 然后准备开始句子的解码过程. ys形状为[1, 1]二维的ys = torch.ones(1, 1).fill_(start_symbol).type(torch.long).to(DEVICE)for i in range(max_len-1):# 将中间语义张量的数据一定到对应的设备上memory = memory.to(DEVICE)# 生成目标语言的mask值tgt_mask = (generate_square_subsequent_mask(ys.size(0)).type(torch.bool)).to(DEVICE)# 调用模型的解码器进行解码 out形状为:[seq_len, 1, 512]==> [seq_len, batch_size, emb_size]out = model.decode(ys, memory, tgt_mask)# 输出张量进行形状的转换out = out.transpose(0, 1)# 经过最后输出层, 获取最后的输出概率分布 out[:, -1]形状为: [1, 512] --> [seq_len, emb_size]# prob的形状为: [1, tgt_vocab_size]prob = model.generator(out[:, -1])# 在1维度上, 获取概率最大的那个就是最后预测的那个值 max返回两个值, 第一个是返回的最大值的概率, 第二个是返回最大概率的下标值._, next_word = torch.max(prob, dim=1)# 获取对应的那个下标值next_word = next_word.item()# 拼接上一步和这一步产生的单词, 作为下一步使用的ys fill_()表示用括号中的数字去填充整个矩阵ys = torch.cat([ys, torch.ones(1, 1).type_as(src.data).fill_(next_word)], dim=0)if next_word == EOS_IDX:breakreturn ys
定义最终的翻译转化函数
def translate(model: torch.nn.Module, src_sentence: str):'''model: 输入整个Transformer模型src_sentence:要翻译的语句'''# 开启模型的评估模式model.eval()# 将源语句转化为对应的张量表示 起初是一维的(seq_len, ), 后经过view(-1, 1)转化为[seq_len, 1]二维的形状.src = text_transform[SRC_LANGUAGE](src_sentence).view(-1, 1)# src.shape==> [seq_len, 1]num_tokens = src.shape[0]# 创建一个全零的矩阵作为src_mask的起始矩阵src_mask = (torch.zeros(num_tokens, num_tokens)).type(torch.bool)# 使用贪心算法进行解码tgt_tokens = greedy_decode(model, src, src_mask, max_len=num_tokens + 5, start_symbol=BOS_IDX).flatten()# 现将数据从GPU上迁移到CPU上, 然后进行tensor类型转化为numpy.ndarray类型的整数值# 使用lookup_tokens进行索引到对应字符的查找, 反转为对应的字符, 然后将句子的首尾的bos和eos替换掉, 即为解码之后的语句.return " ".join(vocab_transform[TGT_LANGUAGE].lookup_tokens(list(tgt_tokens.cpu().numpy()))).replace("<bos>", "").replace("<eos>", "")
验证
print(translate(transformer, "Eine Gruppe von Menschen steht vor einem Iglu ."))
输出效果
A group of people stand in front of an aquarium .
第九步: 模型的保存和重加载
模型的保存
path = './model/transformer_translation_18.pth' torch.save(transformer.state_dict(), path)
模型的重加载
transformer = Seq2SeqTransformer(NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, EMB_SIZE,NHEAD, SRC_VOCAB_SIZE, TGT_VOCAB_SIZE, FFN_HID_DIM) transformer.load_state_dict(torch.load(path))
"""
第一步: 导入必备的工具包
第二步: 导入Multi30k数据集并做基本处理
第三步: 构建Transformer模型
第四步: 定义mask的函数, 创建对应的不同的mask
第五步: 定义批次数据处理的回调函数
第六步: 构建训练函数和评估函数
第七步: 训练Transformer模型
第八步: 进行解码生成目标语言语句
第九步: 模型的保存和重加载
"""# 1 导入必备的工具包
import torch
import torch.nn as nn
import math
#import spacy#还没研究明白from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torchtext.datasets import Multi30k
from typing import Iterable, List
from torch import Tensor
from torch.nn import Transformer
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader
from timeit import default_timer as timerDEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')# 2 导入Multi30k数据集并做基本处理
# 2.1 加载对应的tokenizer
SRC_LANGUAGE = 'de'# 源语言是德语
TGT_LANGUAGE = 'en'# 目标语言是英语# 定义token的字典, 定义vocab字典
token_transform = {}
vocab_transform = {}# 创建源语言和目标语言的kokenizer, 确保依赖关系已经安装
# pip install -U spacy
# python -m spacy download en_core_web_sm
# python -m spacy download de_core_news_sm
# get_tokenizer是分词函数, 如果没有特殊的则按照英语的空格分割, 如果有这按照对应的分词库返回. 比如spacy, 返回对应的分词库
token_transform[SRC_LANGUAGE] = get_tokenizer('spacy', language='de_core_news_sm')
token_transform[TGT_LANGUAGE] = get_tokenizer('spacy', language='en_core_web_sm')def yield_tokens(data_iter: Iterable, language: str) -> List[str]:# data_iter: 对象的迭代对象 Multi30k对象# language: 对应的翻译语言 {'de': 0, 'en': 1}language_index = {SRC_LANGUAGE: 0, TGT_LANGUAGE: 1}# 返回对应的数据迭代器对象for data_sample in data_iter:# data_sample:(德文, 英文)# data_sample:('Zwei junge weiße Männer sind im Freien in der Nähe vieler Büsche.\n', 'Two young, White males are outside near many bushes.\n')# token_transform['de']()=['Zwei', 'junge', 'weiße', 'Männer', 'sind', 'im', 'Freien', 'in', 'der', 'Nähe', 'vieler', 'Büsche', '.', '\n']# or token_transform['en']分别进行构造对应的字典yield token_transform[language](data_sample[language_index[language]])# 2.2 构建生成分词的迭代器
def yield_tokens(data_iter: Iterable, language: str) -> List[str]:# data_iter: 对象的迭代对象 Multi30k对象# language: 对应的翻译语言 {'de': 0, 'en': 1}language_index = {SRC_LANGUAGE: 0, TGT_LANGUAGE: 1}# 返回对应的数据迭代器对象for data_sample in data_iter:# data_sample:(德文, 英文)# data_sample:('Zwei junge weiße Männer sind im Freien in der Nähe vieler Büsche.\n', 'Two young, White males are outside near many bushes.\n')# token_transform['de']()=['Zwei', 'junge', 'weiße', 'Männer', 'sind', 'im', 'Freien', 'in', 'der', 'Nähe', 'vieler', 'Büsche', '.', '\n']# or token_transform['en']分别进行构造对应的字典yield token_transform[language](data_sample[language_index[language]])# 2.3 定义特殊字符并下载数据设置默认索引
# 定义特殊字符及其对应的索引值
UNK_IDX, PAD_IDX, BOS_IDX, EOS_IDX = 0, 1, 2, 3
# 确保标记按其索引的顺序正确插入到词汇表中
special_symbols = ['<unk>', '<pad>', '<bos>', '<eos>']for ln in [SRC_LANGUAGE, TGT_LANGUAGE]:# 训练数据集的迭代器,# 数据集是用英文描述图像的英文语句, 然后人工将其翻译为德文的语句,有两个文件, 一个是train.de 一个是train.en文件,# 然后将其构建为(德文, 英文)的形式train_iter = Multi30k(split='train', language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))# 创建torchtext的vocab对象, 即词汇表vocab_transform[ln] = build_vocab_from_iterator(yield_tokens(train_iter, ln), # 用于构建 Vocab 的迭代器。必须产生令牌列表或迭代器min_freq=1,#在词汇表中包含一个标记所需的最低频率specials=special_symbols, # 用于添加的特殊字符special_first=True) # 指示是在开头还是结尾插入符号# 将 UNK_IDX 设置为默认索引。未找到令牌时返回此索引
# 如果未设置,则在 Vocabulary 中找不到查询的标记时抛出 RuntimeError
for ln in [SRC_LANGUAGE, TGT_LANGUAGE]:vocab_transform[ln].set_default_index(UNK_IDX)# 3 构建Transformer模型
# 3.1 定义位置编码器类
class PositionalEncoding(nn.Module):def __init__(self,emb_size: int,dropout: float, maxlen: int = 5000):'''emb_size: 词嵌入的维度大小dropout: 正则化的大小maxlen: 句子的最大长度'''super(PositionalEncoding, self).__init__()# 将1000的2i/d_model变型为e的指数形式den = torch.exp(- torch.arange(0, emb_size, 2)* math.log(10000) / emb_size)# 效果等价与torch.arange(0, maxlen).unsqueeze(1)pos = torch.arange(0, maxlen).reshape(maxlen, 1)# 构建一个(maxlen, emb_size)大小的全零矩阵pos_embedding = torch.zeros((maxlen, emb_size))# 偶数列是正弦函数填充pos_embedding[:, 0::2] = torch.sin(pos * den)# 奇数列是余弦函数填充pos_embedding[:, 1::2] = torch.cos(pos * den)# 将其维度变成三维, 为了后期方便计算pos_embedding = pos_embedding.unsqueeze(-2)# 添加dropout层, 防止过拟合self.dropout = nn.Dropout(dropout)'''向模块添加持久缓冲区。这通常用于注册不应被视为模型参数的缓冲区。例如,pos_embedding不是一个参数,而是持久状态的一部分。缓冲区可以使用给定的名称作为属性访问。说明:应该就是在内存中定义一个常量,同时,模型保存和加载的时候可以写入和读出'''self.register_buffer('pos_embedding', pos_embedding)def forward(self, token_embedding: Tensor):# 将token_embedding和位置编码相融合return self.dropout(token_embedding + self.pos_embedding[:token_embedding.size(0), :])# 3.2 定义词嵌入层类
class TokenEmbedding(nn.Module):def __init__(self, vocab_size: int, emb_size):'''vocab_size:词表的大小emb_size:词嵌入的维度'''super(TokenEmbedding, self).__init__()# 调用nn中的预定义层Embedding, 获取一个词嵌入对象self.embeddingself.embedding = nn.Embedding(vocab_size, emb_size)# 将emb_size传入类内, 变成类内的变量self.emb_size = emb_sizedef forward(self, tokens: Tensor):# 让 embeddings vector 在增加 之后的 postion encoing 之前相对大一些的操作,# 主要是为了让position encoding 相对的小,这样会让原来的 embedding vector 中的信息在和 position encoding 的信息相加时不至于丢失掉# 让 embeddings vector 相对大一些return self.embedding(tokens.long()) * math.sqrt(self.emb_size)# 3.3 构建Seq2SeqTransformer模型
class Seq2SeqTransformer(nn.Module):def __init__(self,num_encoder_layers, num_decoder_layers,emb_size,nhead,src_vocab_size,tgt_vocab_size,dim_feedforward = 512,dropout = 0.1):'''num_encoder_layers: 编码器的层数num_decoder_layers: 解码器的层数emb_size: 词嵌入的维度nhead: 头数src_vocab_size: 源语言的词表大小tgt_vocab_size: 目标语言的词表大小dim_feedforward: 前馈全连接层的维度dropout: 正则化的大小'''# 继承nn.Module类, 一般继承习惯行的写法super(Seq2SeqTransformer, self).__init__()# 创建Transformer对象self.transformer = Transformer(d_model=emb_size,nhead=nhead,num_encoder_layers=num_encoder_layers,num_decoder_layers=num_decoder_layers,dim_feedforward=dim_feedforward,dropout=dropout)# 创建全连接线性层self.generator = nn.Linear(emb_size, tgt_vocab_size)# 创建源语言的embedding层self.src_tok_emb = TokenEmbedding(src_vocab_size, emb_size)# 创建目标语言的embedding层self.tgt_tok_emb = TokenEmbedding(tgt_vocab_size, emb_size)# 创建位置编码器层对象self.positional_encoding = PositionalEncoding(emb_size, dropout=dropout)def forward(self,src, trg, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask, memory_key_padding_mask):'''src: 源语言trg: 目标语言src_mask: 源语言掩码tgt_mask: 目标语言掩码src_padding_mask: 源语言的padding_masktgt_padding_mask: 目标语言的padding_maskmemory_key_padding_mask: 中间语义张量的padding_mask'''# 获取源语言的embedding张量融合了位置编码src_emb = self.positional_encoding(self.src_tok_emb(src))# 获取目标语言的embedding张量融合了位置编码tgt_emb = self.positional_encoding(self.tgt_tok_emb(trg))# 经过Transformer进行编解码之后输出out值outs = self.transformer(src_emb, tgt_emb, src_mask, tgt_mask, None, src_padding_mask, tgt_padding_mask, memory_key_padding_mask)# outs值经过输出层得到最后的输出分布值return self.generator(outs)# 定义Transformer的编码器def encode(self, src, src_mask):'''src:源语言src_mask:源语言掩码'''return self.transformer.encoder(self.positional_encoding(self.src_tok_emb(src)), src_mask)# 定义Transformer的解码器def decode(self, tgt, memory, tgt_mask):'''tgt:目标语言memory:中间语言张量输出tgt_mask: 目标语言的掩码'''return self.transformer.decoder(self.positional_encoding(self.tgt_tok_emb(tgt)), memory, tgt_mask)# 4 定义mask的函数, 创建对应的不同的mask
# 4.1 定义掩码
def generate_square_subsequent_mask(sz):# sz: 句子的长度# triu生成的是上三角, 经过transpose之后变成了下三角矩阵mask = (torch.triu(torch.ones((sz, sz), device=DEVICE)) == 1).transpose(0, 1)# 将0的位置填充负无穷小, 将1的位置填充为0mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))return maskdef create_mask(src, tgt):'''src: 源语言张量形状为: [seq_length , batch_size]tgt: 目标语言张量形状为: [seq_length , batch_size]'''# 获取源语言的句子长度src_seq_len = src.shape[0]# 获取目标语言的句子长度tgt_seq_len = tgt.shape[0]# 产生目标语言的掩码张量tgt_mask = generate_square_subsequent_mask(tgt_seq_len)# 产生源语言的掩码张量src_mask = torch.zeros((src_seq_len, src_seq_len),device=DEVICE).type(torch.bool)# 构建源语言的padding_mask src_padding_mask==> [batch_size, seq_len]src_padding_mask = (src == PAD_IDX).transpose(0, 1)# 构建目标语言的padding_mask tgt_paddig_mask ==> [batch_size, seq_len-1]tgt_padding_mask = (tgt == PAD_IDX).transpose(0, 1)return src_mask, tgt_mask, src_padding_mask, tgt_padding_mask# 5 定义批次数据处理的回调函数
# 5.1 将字符串转化为整数的tensor张量
# 将句子字符转化为对应的tensor张量
def sequential_transforms(*transforms):'''Transformers中会传入三个迭代器:第一个是Tokenization的,第二个是Numericalization,第三个是Add BOS/EOS and create tensor'''def func(txt_input):# 循环三个迭代器, 第一个进行语句的分割, 第二个将对应的词语映射为对应的张量表示, 第三个是在整个句子的首尾部分添加起始和结束标志.for transform in transforms:txt_input = transform(txt_input)return txt_inputreturn func# 5.2 在句子首尾添加起始和结束符号# 辅助函数, 完成句子首尾BOS/EOS的添加过程
def tensor_transform(token_ids: List[int]):# 添加的是列表形式的数据, 将BOS和EOS添加到句子的首尾部分return torch.cat((torch.tensor([BOS_IDX]),torch.tensor(token_ids),torch.tensor([EOS_IDX])))text_transform = {}
# 循环添加源语言和目标语言
for ln in [SRC_LANGUAGE, TGT_LANGUAGE]:text_transform[ln] = sequential_transforms(token_transform[ln], #Tokenizationvocab_transform[ln], #Numericalizationtensor_transform) # Add BOS/EOS and create tensor# 5.3 数据进行批次化处理# 按照批次进行源语言和目标语言的组装
def collate_fn(batch):# 定义源语言和目标语言的批次列表src_batch, tgt_batch = [], []# 循环批次样本for src_sample, tgt_sample in batch:# 添加源语言句子到列表中src_batch.append(text_transform[SRC_LANGUAGE](src_sample.rstrip("\n")))# 添加目标语言句子到列表中tgt_batch.append(text_transform[TGT_LANGUAGE](tgt_sample.rstrip("\n")))# 将源语言和目标语言进行截断补齐 PAD_IDX=1# src_batch的形状为: [seq_length, batch] seq_length是最长的句子长度src_batch = pad_sequence(src_batch, padding_value=PAD_IDX)# tgt_batch的形状为: [seq_length, batch] seq_length是最长的句子长度tgt_batch = pad_sequence(tgt_batch, padding_value=PAD_IDX)return src_batch, tgt_batch# 6 构建训练函数和评估函数¶
# 6.1 实例化模型并定义损失函数和优化器¶# 设置种子用于生成随机数,以使得结果是确定的
torch.manual_seed(0)# 设置调用时候使用的参数
SRC_VOCAB_SIZE = len(vocab_transform[SRC_LANGUAGE])
TGT_VOCAB_SIZE = len(vocab_transform[TGT_LANGUAGE])
EMB_SIZE = 512
NHEAD = 8
FFN_HID_DIM = 512
BATCH_SIZE = 128
NUM_ENCODER_LAYERS = 3
NUM_DECODER_LAYERS = 3# 实例化Transformer对象
transformer = Seq2SeqTransformer(NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, EMB_SIZE,NHEAD, SRC_VOCAB_SIZE, TGT_VOCAB_SIZE, FFN_HID_DIM)
# 为了保证每层的输入和输出的方差相同, 防止梯度消失问题
for p in transformer.parameters():if p.dim() > 1:# 此处使用的是xavier的均匀分布nn.init.xavier_uniform_(p)
# 如果有GPU则将模型移动到GPU上
transformer = transformer.to(DEVICE)
# 定义损失函数
loss_fn = torch.nn.CrossEntropyLoss(ignore_index=PAD_IDX)
# 定义优化器 betas: 用于计算梯度及其平方的运行平均值的系数 eps:添加到分母以提高数值稳定性
optimizer = torch.optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)# 6.2 定义批次训练函数def train_epoch(model, optimizer):# 开启训练模式model.train()# 定义其实的损失值为0losses = 0# 获取训练数据集的迭代器, 语言对为(de, en)train_iter = Multi30k(split='train', language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))# 加载数据, 按照一个批次一个批次进行加载, 返回一个迭代器train_dataloader = DataLoader(train_iter, batch_size=BATCH_SIZE, collate_fn=collate_fn)# 循环数据迭代器for src, tgt in train_dataloader:# 将源语言数据移动到对应的设备上去src = src.to(DEVICE)# 将目标语言数据移动到对应设备上去tgt = tgt.to(DEVICE)# 获取输入真实的张量 第一个单词到倒数第二个单词tgt_input = tgt[:-1, :]# 调用mask函数, 生成对应的四个masksrc_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)# 调用模型进行训练, 得到最后的张量分布logits = model(src, tgt_input, src_mask, tgt_mask,src_padding_mask, tgt_padding_mask, src_padding_mask)# 梯度清零optimizer.zero_grad()# 获取输出真实的标签数据 第二个单词到最后一个单词tgt_out = tgt[1:, :]# 计算损失loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))# 反向传播loss.backward()# 梯度更新optimizer.step()# 损失值累加求和losses += loss.item()# 返回平均损失值return losses / len(train_dataloader)# 6.3 定义批次评估函数def evaluate(model):# 开启模型评估模式model.eval()# 定义起始损失值losses = 0# 加载验证数据集, 语言对为(de, en)val_iter = Multi30k(split='valid', language_pair=(SRC_LANGUAGE, TGT_LANGUAGE))# 返回验证集的数据加载器val_dataloader = DataLoader(val_iter, batch_size=BATCH_SIZE, collate_fn=collate_fn)# 循环验证集for src, tgt in val_dataloader:# 源语言数据移动到对应的设备上src = src.to(DEVICE)# 目标语言数据移动到对应的设备上tgt = tgt.to(DEVICE)# 获取输入的真实的张量tgt_input = tgt[:-1, :]# 调用mask函数, 产生对应的四个mask值src_mask, tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)# 调用模型, 得到对应的输出分布值logits = model(src, tgt_input, src_mask, tgt_mask,src_padding_mask, tgt_padding_mask, src_padding_mask)# 获取输出的真实张量tgt_out = tgt[1:, :]# 计算损失值loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))# 损失值累加, 求和losses += loss.item()# 求得对应的平均损失return losses / len(val_dataloader)# 7 训练Transformer模型
# 7.1 利用循环训练Transformer模型
# 定义epoch的次数
NUM_EPOCHS = 3# 循环整个数据集num_epochs次
for epoch in range(1, NUM_EPOCHS+1):# 获取开始时间start_time = timer()# 将整个训练数据集进行训练train_loss = train_epoch(transformer, optimizer)# 获取结束时间end_time = timer()# 将整个验证集进行评估val_loss = evaluate(transformer)# 打印每个epoch的训练损失, 验证损失, 和训练时间.print((f"Epoch: {epoch}, Train loss: {train_loss:.3f}, Val loss: {val_loss:.3f}, "f"Epoch time = {(end_time - start_time):.3f}s"))# 8 进行解码生成目标语言语句
# 8.1 使用贪心算法构建生成序列函数def greedy_decode(model, src, src_mask, max_len, start_symbol):# 将对应的源语言数据移动的对应的设备上src = src.to(DEVICE)# 将对应的源语言的mask移动到对应的设备上src_mask = src_mask.to(DEVICE)# 将源语言使用模型的编码器, 得到中间语义张量 memory的形状为: [seq_len, batch_size, dim]memory = model.encode(src, src_mask)# 构建一个起始的二维矩阵, 然后准备开始句子的解码过程. ys形状为[1, 1]二维的ys = torch.ones(1, 1).fill_(start_symbol).type(torch.long).to(DEVICE)for i in range(max_len-1):# 将中间语义张量的数据一定到对应的设备上memory = memory.to(DEVICE)# 生成目标语言的mask值tgt_mask = (generate_square_subsequent_mask(ys.size(0)).type(torch.bool)).to(DEVICE)# 调用模型的解码器进行解码 out形状为:[seq_len, 1, 512]==> [seq_len, batch_size, emb_size]out = model.decode(ys, memory, tgt_mask)# 输出张量进行形状的转换out = out.transpose(0, 1)# 经过最后输出层, 获取最后的输出概率分布 out[:, -1]形状为: [1, 512] --> [seq_len, emb_size]# prob的形状为: [1, tgt_vocab_size]prob = model.generator(out[:, -1])# 在1维度上, 获取概率最大的那个就是最后预测的那个值 max返回两个值, 第一个是返回的最大值的概率, 第二个是返回最大概率的下标值._, next_word = torch.max(prob, dim=1)# 获取对应的那个下标值next_word = next_word.item()# 拼接上一步和这一步产生的单词, 作为下一步使用的ys fill_()表示用括号中的数字去填充整个矩阵ys = torch.cat([ys, torch.ones(1, 1).type_as(src.data).fill_(next_word)], dim=0)if next_word == EOS_IDX:breakreturn ys# 8.2 定义最终的翻译转化函数¶def translate(model: torch.nn.Module, src_sentence: str):'''model: 输入整个Transformer模型src_sentence:要翻译的语句'''# 开启模型的评估模式model.eval()# 将源语句转化为对应的张量表示 起初是一维的(seq_len, ), 后经过view(-1, 1)转化为[seq_len, 1]二维的形状.src = text_transform[SRC_LANGUAGE](src_sentence).view(-1, 1)# src.shape==> [seq_len, 1]num_tokens = src.shape[0]# 创建一个全零的矩阵作为src_mask的起始矩阵src_mask = (torch.zeros(num_tokens, num_tokens)).type(torch.bool)# 使用贪心算法进行解码tgt_tokens = greedy_decode(model, src, src_mask, max_len=num_tokens + 5, start_symbol=BOS_IDX).flatten()# 现将数据从GPU上迁移到CPU上, 然后进行tensor类型转化为numpy.ndarray类型的整数值# 使用lookup_tokens进行索引到对应字符的查找, 反转为对应的字符, 然后将句子的首尾的bos和eos替换掉, 即为解码之后的语句.return " ".join(vocab_transform[TGT_LANGUAGE].lookup_tokens(list(tgt_tokens.cpu().numpy()))).replace("<bos>", "").replace("<eos>", "")# 验证print(translate(transformer, "Eine Gruppe von Menschen steht vor einem Iglu ."))# 9 模型的保存和重加载¶
# 9.1 模型的保存¶path = './model/transformer_translation_18.pth'
torch.save(transformer.state_dict(), path)
print("模型保存完毕")
# 9.2 模型的重加载¶transformer = Seq2SeqTransformer(NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, EMB_SIZE,NHEAD, SRC_VOCAB_SIZE, TGT_VOCAB_SIZE, FFN_HID_DIM)
transformer.load_state_dict(torch.load(path))
print("模型保存完毕")# 验证
print(translate(transformer, "Eine Gruppe von Menschen steht vor einem Iglu ."))