Improving a Machine Translation Model in Python with an Attention Mechanism
The attention mechanism is a technique for improving machine translation models: it helps the model focus on the important parts of the input sentence and produce more accurate translations. In Python, we can implement an attention-based translation model with open-source libraries such as TensorFlow and PyTorch.
To demonstrate how attention improves a translation model, we will use English-to-French translation as the example task. First, we need a dataset containing a large number of English-French sentence pairs for training and evaluating the model.
### Data Preprocessing
We first preprocess the dataset, which involves tokenization and building vocabularies. We can use NLTK for English word tokenization and spaCy for French word tokenization.
```python
import nltk
import spacy

nltk.download('punkt')
# Run once from the shell (or prefix with "!" in a notebook):
# python -m spacy download fr_core_news_sm

fr_tokenizer = spacy.load('fr_core_news_sm')

def tokenize_en(text):
    # Word-level tokenization for English using NLTK.
    return nltk.word_tokenize(text)

def tokenize_fr(text):
    # Word-level tokenization for French using spaCy's tokenizer.
    return [tok.text for tok in fr_tokenizer.tokenizer(text)]
```
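As a quick check of the two tokenizers (the example sentences are arbitrary, and the exact tokens may vary slightly across library versions):

```python
print(tokenize_en("The cat sat on the mat."))
# e.g. ['The', 'cat', 'sat', 'on', 'the', 'mat', '.']
print(tokenize_fr("Le chat est assis sur le tapis."))
# e.g. ['Le', 'chat', 'est', 'assis', 'sur', 'le', 'tapis', '.']
```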
Next, we apply these tokenization functions to the dataset. We can use the torchtext library to load and batch the data. Note that Field, TabularDataset, and BucketIterator belong to torchtext's legacy API, so an older torchtext release (or the torchtext.legacy namespace) is required.
```python
from torchtext.data import Field, TabularDataset, BucketIterator

# Fields define how each column is tokenized and numericalized.
EN = Field(tokenize=tokenize_en, lower=True, init_token='<sos>', eos_token='<eos>')
FR = Field(tokenize=tokenize_fr, lower=True, init_token='<sos>', eos_token='<eos>')

data_fields = [('en', EN), ('fr', FR)]

train_data, valid_data, test_data = TabularDataset.splits(
    path='data_path',
    train='train.csv',
    validation='valid.csv',
    test='test.csv',
    format='csv',
    fields=data_fields
)

# Build the vocabularies from the training split only.
EN.build_vocab(train_data, min_freq=2)
FR.build_vocab(train_data, min_freq=2)

train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
    (train_data, valid_data, test_data),
    batch_size=32,
    sort_key=lambda x: len(x.en),  # TabularDataset has no default sort_key
    device='cuda'  # use 'cpu' if no GPU is available
)
```
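Assuming the CSV files exist under data_path, a quick sanity check confirms the vocabulary sizes and batch shapes (sequence length comes first, since Field defaults to batch_first=False):

```python
print(f"English vocab size: {len(EN.vocab)}")
print(f"French vocab size:  {len(FR.vocab)}")

batch = next(iter(train_iterator))
print(batch.en.shape)  # [en seq len, 32]
print(batch.fr.shape)  # [fr seq len, 32]
```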
### Building the Model
Next, we define a translation model with an attention mechanism. We use an RNN-based encoder-decoder architecture in which, at every time step, the decoder combines the encoder outputs with attention weights to generate the next word.
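Concretely, at decoding step t this additive (Bahdanau-style) attention scores each encoder output h_i against the previous decoder state s_{t-1}, normalizes the scores with a softmax, and forms a context vector as the weighted sum of encoder outputs. In the Attention module below, W_a corresponds to self.attn and v to self.v:

```math
e_{t,i} = v^\top \tanh\left(W_a [s_{t-1};\, h_i]\right), \qquad
\alpha_{t,i} = \frac{\exp(e_{t,i})}{\sum_{j} \exp(e_{t,j})}, \qquad
c_t = \sum_{i} \alpha_{t,i}\, h_i
```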
```python
import random

import torch
import torch.nn as nn
import torch.nn.functional as F

class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hid_dim):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim)
        # Bidirectional GRU: encoder outputs have dimension hid_dim * 2.
        self.rnn = nn.GRU(emb_dim, hid_dim, bidirectional=True)
        # Project the concatenated final forward/backward states to the decoder's
        # initial hidden state (assumes encoder and decoder hidden sizes match).
        self.fc = nn.Linear(hid_dim * 2, hid_dim)

    def forward(self, src):
        # src: [src len, batch size]
        embedded = self.embedding(src)
        outputs, hidden = self.rnn(embedded)
        # hidden[-2] is the final forward state, hidden[-1] the final backward state.
        hidden = torch.tanh(self.fc(torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim=1)))
        # outputs: [src len, batch size, hid_dim * 2], hidden: [batch size, hid_dim]
        return outputs, hidden

class Attention(nn.Module):
    def __init__(self, enc_hid_dim, dec_hid_dim):
        super().__init__()
        self.enc_hid_dim = enc_hid_dim
        self.dec_hid_dim = dec_hid_dim
        self.attn = nn.Linear((enc_hid_dim * 2) + dec_hid_dim, dec_hid_dim)
        self.v = nn.Linear(dec_hid_dim, 1, bias=False)

    def forward(self, hidden, encoder_outputs):
        # hidden: [batch size, dec_hid_dim]
        # encoder_outputs: [src len, batch size, enc_hid_dim * 2]
        src_len = encoder_outputs.shape[0]
        hidden = hidden.unsqueeze(1).repeat(1, src_len, 1)
        encoder_outputs = encoder_outputs.permute(1, 0, 2)
        energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=2)))
        attention = self.v(energy).squeeze(2)
        # Attention weights over source positions: [batch size, src len]
        return F.softmax(attention, dim=1)

class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, enc_hid_dim, dec_hid_dim):
        super().__init__()
        self.output_dim = output_dim
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.rnn = nn.GRU((enc_hid_dim * 2) + emb_dim, dec_hid_dim)
        self.attention = Attention(enc_hid_dim, dec_hid_dim)
        self.fc = nn.Linear((enc_hid_dim * 2) + dec_hid_dim + emb_dim, output_dim)

    def forward(self, input, hidden, encoder_outputs):
        # input: [batch size] (the previous target token)
        input = input.unsqueeze(0)
        embedded = self.embedding(input)
        # Compute attention weights and the context vector (weighted encoder outputs).
        a = self.attention(hidden, encoder_outputs)
        a = a.unsqueeze(1)
        encoder_outputs = encoder_outputs.permute(1, 0, 2)
        weighted = torch.bmm(a, encoder_outputs)
        weighted = weighted.permute(1, 0, 2)
        # Feed the embedding and the context vector to the GRU together.
        rnn_input = torch.cat((embedded, weighted), dim=2)
        output, hidden = self.rnn(rnn_input, hidden.unsqueeze(0))
        embedded = embedded.squeeze(0)
        output = output.squeeze(0)
        weighted = weighted.squeeze(0)
        # Predict the next token from the GRU output, context and embedding.
        prediction = self.fc(torch.cat((output, weighted, embedded), dim=1))
        return prediction, hidden.squeeze(0)

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        batch_size = trg.shape[1]
        trg_len = trg.shape[0]
        trg_vocab_size = self.decoder.output_dim
        outputs = torch.zeros(trg_len, batch_size, trg_vocab_size).to(self.device)
        encoder_outputs, hidden = self.encoder(src)
        # The first decoder input is the <sos> token.
        input = trg[0,:]
        for t in range(1, trg_len):
            output, hidden = self.decoder(input, hidden, encoder_outputs)
            outputs[t] = output
            # With teacher forcing, feed the ground-truth token next;
            # otherwise feed the model's own prediction.
            teacher_force = random.random() < teacher_forcing_ratio
            top1 = output.argmax(1)
            input = trg[t] if teacher_force else top1
        return outputs
```
### Training and Evaluation
With the model defined, we can train and evaluate it. We use the cross-entropy loss and the Adam optimizer.
```python
import torch.optim as optim

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

INPUT_DIM = len(EN.vocab)
OUTPUT_DIM = len(FR.vocab)
EMB_DIM = 256
ENC_HID_DIM = 512
DEC_HID_DIM = 512

enc = Encoder(INPUT_DIM, EMB_DIM, ENC_HID_DIM).to(device)
dec = Decoder(OUTPUT_DIM, EMB_DIM, ENC_HID_DIM, DEC_HID_DIM).to(device)
model = Seq2Seq(enc, dec, device).to(device)

optimizer = optim.Adam(model.parameters())

def train(model, iterator, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0
    for i, batch in enumerate(iterator):
        src = batch.en
        trg = batch.fr
        optimizer.zero_grad()
        output = model(src, trg)
        output_dim = output.shape[-1]
        # Skip the first position (<sos>) and flatten for the loss.
        output = output[1:].view(-1, output_dim)
        trg = trg[1:].view(-1)
        loss = criterion(output, trg)
        loss.backward()
        # Clip gradients to stabilise RNN training.
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        epoch_loss += loss.item()
    return epoch_loss / len(iterator)

def evaluate(model, iterator, criterion):
    model.eval()
    epoch_loss = 0
    with torch.no_grad():
        for i, batch in enumerate(iterator):
            src = batch.en
            trg = batch.fr
            # Turn off teacher forcing during evaluation.
            output = model(src, trg, 0)
            output_dim = output.shape[-1]
            output = output[1:].view(-1, output_dim)
            trg = trg[1:].view(-1)
            loss = criterion(output, trg)
            epoch_loss += loss.item()
    return epoch_loss / len(iterator)

N_EPOCHS = 10
CLIP = 1
# Ignore padding positions when computing the loss.
criterion = nn.CrossEntropyLoss(ignore_index=FR.vocab.stoi['<pad>'])

best_valid_loss = float('inf')
for epoch in range(N_EPOCHS):
    train_loss = train(model, train_iterator, optimizer, criterion, CLIP)
    valid_loss = evaluate(model, valid_iterator, criterion)
    print(f'Epoch {epoch+1:02}: train loss {train_loss:.3f}, valid loss {valid_loss:.3f}')
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'model.pt')
```
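After training, the saved weights can be reloaded and used for translation. Below is a minimal greedy-decoding sketch; the helper translate_sentence and its max_len cutoff are illustrative additions rather than part of the code above:

```python
def translate_sentence(sentence, src_field, trg_field, model, device, max_len=50):
    model.eval()
    # Tokenize and numericalize the source sentence the same way the Field does.
    tokens = [src_field.init_token] + [t.lower() for t in tokenize_en(sentence)] + [src_field.eos_token]
    src_indexes = [src_field.vocab.stoi[t] for t in tokens]
    src_tensor = torch.LongTensor(src_indexes).unsqueeze(1).to(device)  # [src len, 1]
    with torch.no_grad():
        encoder_outputs, hidden = model.encoder(src_tensor)
    trg_indexes = [trg_field.vocab.stoi[trg_field.init_token]]
    for _ in range(max_len):
        trg_tensor = torch.LongTensor([trg_indexes[-1]]).to(device)
        with torch.no_grad():
            output, hidden = model.decoder(trg_tensor, hidden, encoder_outputs)
        pred_token = output.argmax(1).item()
        trg_indexes.append(pred_token)
        if pred_token == trg_field.vocab.stoi[trg_field.eos_token]:
            break
    # Drop the leading <sos>; the trailing <eos> (if reached) is kept for clarity.
    return [trg_field.vocab.itos[i] for i in trg_indexes[1:]]

model.load_state_dict(torch.load('model.pt'))
print(translate_sentence("How are you today?", EN, FR, model, device))
```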
