We need to prove mathematically that our argument/thesis is correct, that is, with 100% certainty.
So, by definition, we start from a minimal network.
Decoder-only
(I haven't checked this carefully; please judge the accuracy of the AI-generated code for yourself.) Either way, the idea is that each component has a single-layer structure.
import torch
import torch.nn as nn
class MinimalDecoderOnlyTransformer(nn.Module):
    def __init__(self, vocab_size, d_model=8, nhead=2, num_layers=1):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        # Learned positional encoding, capped at 100 positions
        self.pos_encoding = nn.Parameter(torch.randn(1, 100, d_model))
        decoder_layer = nn.TransformerDecoderLayer(d_model=d_model, nhead=nhead, dim_feedforward=d_model, batch_first=True)
        self.transformer = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)
        self.fc = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        x = self.embedding(x) + self.pos_encoding[:, :x.size(1), :]
        # Causal mask: each position may only attend to earlier positions
        mask = nn.Transformer.generate_square_subsequent_mask(x.size(1)).to(x.device)
        # The same sequence is used as both target and memory, so mask both attention paths
        output = self.transformer(x, x, tgt_mask=mask, memory_mask=mask)
        return self.fc(output)
# Example usage
vocab_size = 1000
model = MinimalDecoderOnlyTransformer(vocab_size)
input_seq = torch.randint(0, vocab_size, (1, 10))
output = model(input_seq)
print(output.shape) # Should be (1, 10, 1000)
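As a sanity check that this minimal model can actually be trained, here is a rough sketch of a next-token-prediction loop. The toy random sequence, learning rate, and step count are illustrative assumptions, not part of the original example.
# Sketch only: fit the model above to one random sequence via next-token prediction.
import torch.optim as optim

toy_seq = torch.randint(0, vocab_size, (1, 11))      # one arbitrary toy sequence
inputs, targets = toy_seq[:, :-1], toy_seq[:, 1:]    # shift by one position

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

for step in range(200):
    optimizer.zero_grad()
    logits = model(inputs)                            # (1, 10, vocab_size)
    loss = criterion(logits.view(-1, vocab_size), targets.view(-1))
    loss.backward()
    optimizer.step()
print('final loss:', loss.item())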
Seq2Seq (Transformer implementation)
RNN/LSTM also works; the example below uses a plain RNN encoder and decoder, and a Transformer-based sketch follows it.
Encoder + Decoder
import torch
import torch.nn as nn
import torch.optim as optim
import random
# Define vocabulary
vocab = ['<pad>', '<sos>', '<eos>', 'a', 'girl', 'is', 'walking', 'in', 'the', 'park', 'she', 'sees', 'colorful', 'butterfly']
word2idx = {word: idx for idx, word in enumerate(vocab)}
idx2word = {idx: word for word, idx in word2idx.items()}
# Define constants
INPUT_DIM = len(vocab)
OUTPUT_DIM = len(vocab)
EMB_DIM = 8 # Embedding dimension
HID_DIM = 16 # Hidden dimension
N_LAYERS = 1 # Number of layers
BATCH_SIZE = 1
# Define the Encoder
class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hid_dim, n_layers):
        super(Encoder, self).__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.rnn = nn.RNN(emb_dim, hid_dim, n_layers)

    def forward(self, src):
        embedded = self.embedding(src)
        outputs, hidden = self.rnn(embedded)
        return hidden
# Define the Decoder
class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hid_dim, n_layers):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.rnn = nn.RNN(emb_dim, hid_dim, n_layers)
        self.fc_out = nn.Linear(hid_dim, output_dim)

    def forward(self, input, hidden):
        input = input.unsqueeze(0)  # add a sequence-length dimension of 1
        embedded = self.embedding(input)
        output, hidden = self.rnn(embedded, hidden)
        prediction = self.fc_out(output.squeeze(0))
        return prediction, hidden
# Define the Seq2Seq model
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        trg_len = trg.shape[0]
        batch_size = trg.shape[1]
        trg_vocab_size = self.decoder.fc_out.out_features
        # (trg_len, batch, vocab) buffer for the decoder logits at each step
        outputs = torch.zeros(trg_len, batch_size, trg_vocab_size)
        hidden = self.encoder(src)
        input = trg[0]  # first decoder input is the <sos> token
        for t in range(1, trg_len):
            output, hidden = self.decoder(input, hidden)
            outputs[t] = output
            teacher_force = random.random() < teacher_forcing_ratio
            top1 = output.argmax(1)
            input = trg[t] if teacher_force else top1
        return outputs
# Instantiate the model
enc = Encoder(INPUT_DIM, EMB_DIM, HID_DIM, N_LAYERS)
dec = Decoder(OUTPUT_DIM, EMB_DIM, HID_DIM, N_LAYERS)
model = Seq2Seq(enc, dec)
# Define loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())
# Prepare data (simple example)
src_sentence = ['<sos>', 'a', 'girl', 'is', 'walking', 'in', 'the', 'park', '<eos>']
trg_sentence = ['<sos>', 'she', 'sees', 'a', 'colorful', 'butterfly', '<eos>']
src_indexes = [word2idx[word] for word in src_sentence]
trg_indexes = [word2idx[word] for word in trg_sentence]
src_tensor = torch.LongTensor(src_indexes).unsqueeze(1)
trg_tensor = torch.LongTensor(trg_indexes).unsqueeze(1)
# Training loop (minimal example)
num_epochs = 1000
for epoch in range(num_epochs):
    optimizer.zero_grad()
    output = model(src_tensor, trg_tensor)
    output_dim = output.shape[-1]
    output = output[1:].view(-1, output_dim)
    trg = trg_tensor[1:].view(-1)
    loss = criterion(output, trg)
    loss.backward()
    optimizer.step()
    if epoch % 100 == 0:
        print(f'Epoch: {epoch}, Loss: {loss.item()}')
# Inference (simple example)
model.eval()
with torch.no_grad():
    src_tensor = torch.LongTensor(src_indexes).unsqueeze(1)
    hidden = model.encoder(src_tensor)
    input = torch.LongTensor([word2idx['<sos>']])
    output_sentence = []
    for _ in range(10):
        output, hidden = model.decoder(input, hidden)
        top1 = output.argmax(1).item()
        if idx2word[top1] == '<eos>':
            break
        output_sentence.append(idx2word[top1])
        input = torch.LongTensor([top1])
print('Generated sentence:', ' '.join(output_sentence))
An RNN-based seq2seq:
import torch
import torch.nn as nn
import torch.optim as optim
# Define vocabulary
vocab = list(set('<sos> <eos> a girl is walking in the park she sees colorful butterfly'.split()))
word2idx = {word: idx for idx, word in enumerate(vocab)}
idx2word = {idx: word for word, idx in word2idx.items()}
# Convert sentences to indices
src_sentence = ['<sos>', 'a', 'girl', 'is', 'walking', 'in', 'the', 'park', '<eos>']
trg_sentence = ['<sos>', 'she', 'sees', 'a', 'colorful', 'butterfly', '<eos>']
src_indices = torch.tensor([word2idx[word] for word in src_sentence]).unsqueeze(0)
trg_indices = torch.tensor([word2idx[word] for word in trg_sentence]).unsqueeze(0)
# Define the seq2seq model
class Seq2Seq(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Seq2Seq, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.rnn = nn.RNN(hidden_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, src, trg):
        embedded = self.embedding(src)
        _, hidden = self.rnn(embedded)  # encode the source sequence
        outputs = []
        for i in range(trg.size(1)):
            output = self.fc(hidden.squeeze(0))      # predict from the current hidden state
            outputs.append(output)
            trg_embed = self.embedding(trg[:, i].unsqueeze(1))
            _, hidden = self.rnn(trg_embed, hidden)  # teacher-forced step with the target token
        return torch.stack(outputs).transpose(0, 1)  # (batch, trg_len, vocab)
# Initialize the model
input_size = len(vocab)
hidden_size = 64
output_size = len(vocab)
model = Seq2Seq(input_size, hidden_size, output_size)
# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters())
# Training loop
num_epochs = 1000
for epoch in range(num_epochs):
    optimizer.zero_grad()
    output = model(src_indices, trg_indices[:, :-1])
    loss = criterion(output.reshape(-1, output_size), trg_indices[:, 1:].reshape(-1))
    loss.backward()
    optimizer.step()
    if (epoch + 1) % 100 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')
print("Training complete!")
An LSTM-based Seq2Seq:
class Seq2Seq(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Seq2Seq, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(input_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, src, trg):
        embedded = self.embedding(src)
        _, (hidden, cell) = self.lstm(embedded)
        outputs = []
        for i in range(trg.size(1)):
            output = self.fc(hidden.squeeze(0))
            outputs.append(output)
            trg_embed = self.embedding(trg[:, i].unsqueeze(1))
            _, (hidden, cell) = self.lstm(trg_embed, (hidden, cell))
        return torch.stack(outputs).transpose(0, 1)
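The LSTM variant can be trained with exactly the same loop as the RNN version; a brief sketch reusing the toy data and setup defined above (nothing here is new beyond re-instantiating the model):
# Sketch: reuse the toy data, loss, and hyperparameters from the RNN example.
lstm_model = Seq2Seq(input_size, hidden_size, output_size)
lstm_optimizer = optim.Adam(lstm_model.parameters())
for epoch in range(num_epochs):
    lstm_optimizer.zero_grad()
    output = lstm_model(src_indices, trg_indices[:, :-1])
    loss = criterion(output.reshape(-1, output_size), trg_indices[:, 1:].reshape(-1))
    loss.backward()
    lstm_optimizer.step()
print(f'LSTM final loss: {loss.item():.4f}')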