
Improving a Machine Translation Model in Python with the Attention Mechanism

Published: 2023-12-19 05:30:46

The attention mechanism is a technique for improving machine translation models: it lets the model focus on the most relevant parts of the input sentence at each decoding step, which generally leads to more accurate translations. In Python, attention-based translation models can be implemented with open-source libraries such as TensorFlow and PyTorch; the examples in this article use PyTorch.
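
Before looking at a full model, a minimal sketch of the core idea may help: given one decoder state and a set of encoder states, attention computes alignment scores, normalizes them with softmax, and forms a weighted context vector. The snippet below uses simple dot-product scores and made-up tensor sizes purely for illustration; the model later in this article uses additive (Bahdanau-style) attention instead.

```python
import torch
import torch.nn.functional as F

# Hypothetical shapes, for illustration only
dec_state = torch.randn(1, 512)           # one decoder hidden state
enc_states = torch.randn(10, 512)         # encoder outputs for 10 source tokens

scores = enc_states @ dec_state.t()       # dot-product alignment scores, shape [10, 1]
weights = F.softmax(scores, dim=0)        # attention weights over source tokens (sum to 1)
context = (weights * enc_states).sum(0)   # weighted sum -> context vector, shape [512]
```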

To show how the attention mechanism can improve a machine translation model, we will use English-to-French translation as the running example. First we need a dataset containing a large number of English-French sentence pairs for training and evaluating the model.
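
The article does not specify the data source or file layout; since the loading code below uses torchtext's TabularDataset with CSV files and fields named en and fr, each file could look something like this (hypothetical rows). Note that the legacy TabularDataset treats the first row as data unless skip_header=True is passed, so no header row is shown here.

```
"hello , how are you ?","bonjour , comment allez-vous ?"
"i like machine translation .","j'aime la traduction automatique ."
```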

### Data Preprocessing

First we preprocess the dataset, which includes tokenization and building the vocabularies. We use NLTK for English tokenization and spaCy for French tokenization.

```python
import nltk
import spacy

nltk.download('punkt')
nltk.download('punkt_tab')  # required by newer NLTK releases
# Install the French spaCy model once beforehand, e.g.:
#   python -m spacy download fr_core_news_sm

fr_tokenizer = spacy.load('fr_core_news_sm')

def tokenize_en(text):
    # Word-level tokenization with NLTK (the punkt sentence tokenizer alone
    # would split text into sentences, not words)
    return nltk.word_tokenize(text)

def tokenize_fr(text):
    return [tok.text for tok in fr_tokenizer.tokenizer(text)]
```
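
A quick spot check of the two tokenizers (the exact tokens depend on the installed NLTK and spaCy versions):

```python
print(tokenize_en("I love machine translation."))
# expected output along the lines of: ['I', 'love', 'machine', 'translation', '.']

print(tokenize_fr("J'adore la traduction automatique."))
# expected output along the lines of: ["J'", 'adore', 'la', 'traduction', 'automatique', '.']
```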

Next, these tokenization functions are applied to the dataset. We use torchtext to load and batch the data; note that the Field, TabularDataset, and BucketIterator classes used below belong to torchtext's legacy API (torchtext ≤ 0.8; in later releases they moved to torchtext.legacy or were removed).

```python
from torchtext.data import Field, TabularDataset, BucketIterator

EN = Field(tokenize=tokenize_en, lower=True, init_token='<sos>', eos_token='<eos>')
FR = Field(tokenize=tokenize_fr, lower=True, init_token='<sos>', eos_token='<eos>')

data_fields = [('en', EN), ('fr', FR)]
train_data, valid_data, test_data = TabularDataset.splits(
    path='data_path',   # directory containing the CSV files
    train='train.csv',
    validation='valid.csv',
    test='test.csv',
    format='csv',
    fields=data_fields
)

EN.build_vocab(train_data, min_freq=2)
FR.build_vocab(train_data, min_freq=2)

train_iterator, valid_iterator, test_iterator = BucketIterator.splits(
    (train_data, valid_data, test_data),
    batch_size=32,
    device='cuda'   # assumes a GPU is available
)
```
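
Assuming the CSV files load correctly, each batch from the iterators exposes the two fields as LongTensors shaped [sequence length, batch size], which is the layout the model below expects:

```python
batch = next(iter(train_iterator))
print(batch.en.shape)  # e.g. torch.Size([src_len, 32])
print(batch.fr.shape)  # e.g. torch.Size([trg_len, 32])
```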

### Building the Model

Next, we define a translation model with an attention mechanism. It is an RNN-based encoder-decoder architecture in which, at every time step, the decoder combines the encoder outputs with attention weights to generate the next word.

```python
import random

import torch
import torch.nn as nn
import torch.nn.functional as F


class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hid_dim):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim)
        # Bidirectional GRU over the source sentence
        self.rnn = nn.GRU(emb_dim, hid_dim, bidirectional=True)
        # Project the concatenated forward/backward final states to the
        # decoder's initial hidden state
        self.fc = nn.Linear(hid_dim * 2, hid_dim)

    def forward(self, src):
        # src: [src_len, batch]
        embedded = self.embedding(src)
        outputs, hidden = self.rnn(embedded)
        # outputs: [src_len, batch, hid_dim * 2]
        hidden = torch.tanh(self.fc(torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)))
        return outputs, hidden


class Attention(nn.Module):
    def __init__(self, enc_hid_dim, dec_hid_dim):
        super().__init__()
        self.enc_hid_dim = enc_hid_dim
        self.dec_hid_dim = dec_hid_dim
        self.attn = nn.Linear((enc_hid_dim * 2) + dec_hid_dim, dec_hid_dim)
        self.v = nn.Linear(dec_hid_dim, 1, bias=False)

    def forward(self, hidden, encoder_outputs):
        # hidden: [batch, dec_hid_dim]; encoder_outputs: [src_len, batch, enc_hid_dim * 2]
        src_len = encoder_outputs.shape[0]
        hidden = hidden.unsqueeze(1).repeat(1, src_len, 1)
        encoder_outputs = encoder_outputs.permute(1, 0, 2)
        # Additive (Bahdanau-style) alignment scores
        energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=2)))
        attention = self.v(energy).squeeze(2)
        # Normalized attention weights over the source positions
        return F.softmax(attention, dim=1)


class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, enc_hid_dim, dec_hid_dim):
        super().__init__()
        self.output_dim = output_dim
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.rnn = nn.GRU((enc_hid_dim * 2) + emb_dim, dec_hid_dim)
        self.attention = Attention(enc_hid_dim, dec_hid_dim)
        self.fc = nn.Linear((enc_hid_dim * 2) + dec_hid_dim + emb_dim, output_dim)

    def forward(self, input, hidden, encoder_outputs):
        # input: [batch] (previous target token); hidden: [batch, dec_hid_dim]
        input = input.unsqueeze(0)
        embedded = self.embedding(input)
        a = self.attention(hidden, encoder_outputs)
        a = a.unsqueeze(1)
        encoder_outputs = encoder_outputs.permute(1, 0, 2)
        # Context vector: attention-weighted sum of the encoder outputs
        weighted = torch.bmm(a, encoder_outputs)
        weighted = weighted.permute(1, 0, 2)
        rnn_input = torch.cat((embedded, weighted), dim=2)
        output, hidden = self.rnn(rnn_input, hidden.unsqueeze(0))
        embedded = embedded.squeeze(0)
        output = output.squeeze(0)
        weighted = weighted.squeeze(0)
        # Predict the next token from the RNN output, the context and the embedding
        prediction = self.fc(torch.cat((output, weighted, embedded), dim=1))
        return prediction, hidden.squeeze(0)


class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        batch_size = trg.shape[1]
        trg_len = trg.shape[0]
        trg_vocab_size = self.decoder.output_dim
        outputs = torch.zeros(trg_len, batch_size, trg_vocab_size).to(self.device)
        encoder_outputs, hidden = self.encoder(src)
        # The first decoder input is the <sos> token
        input = trg[0, :]
        for t in range(1, trg_len):
            output, hidden = self.decoder(input, hidden, encoder_outputs)
            outputs[t] = output
            # With probability teacher_forcing_ratio, feed the ground-truth token back in
            teacher_force = random.random() < teacher_forcing_ratio
            top1 = output.argmax(1)
            input = trg[t] if teacher_force else top1
        return outputs
```
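
Before wiring up real data, a forward pass with dummy tensors is a cheap way to confirm that the shapes line up. The sizes below are arbitrary; note that because the Encoder uses a single hid_dim for both its RNN and its output projection, that value must match the decoder's dec_hid_dim.

```python
# Shape sanity check with arbitrary (hypothetical) sizes
device = torch.device('cpu')
enc = Encoder(input_dim=100, emb_dim=32, hid_dim=64)
dec = Decoder(output_dim=120, emb_dim=32, enc_hid_dim=64, dec_hid_dim=64)
model = Seq2Seq(enc, dec, device)

src = torch.randint(0, 100, (7, 4))   # [src_len=7, batch=4]
trg = torch.randint(0, 120, (9, 4))   # [trg_len=9, batch=4]

out = model(src, trg)
print(out.shape)                      # expected: torch.Size([9, 4, 120])
```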

### Training and Evaluation

With the model defined, we can train and evaluate it, using the cross-entropy loss and the Adam optimizer.

```python
import torch.optim as optim

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

INPUT_DIM = len(EN.vocab)
OUTPUT_DIM = len(FR.vocab)
EMB_DIM = 256
ENC_HID_DIM = 512   # must equal DEC_HID_DIM, since the Encoder uses one hid_dim
DEC_HID_DIM = 512

enc = Encoder(INPUT_DIM, EMB_DIM, ENC_HID_DIM).to(device)
dec = Decoder(OUTPUT_DIM, EMB_DIM, ENC_HID_DIM, DEC_HID_DIM).to(device)
model = Seq2Seq(enc, dec, device).to(device)

optimizer = optim.Adam(model.parameters())


def train(model, iterator, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0
    for i, batch in enumerate(iterator):
        src = batch.en
        trg = batch.fr
        optimizer.zero_grad()
        output = model(src, trg)
        output_dim = output.shape[-1]
        # Skip the <sos> position and flatten for the loss
        output = output[1:].view(-1, output_dim)
        trg = trg[1:].view(-1)
        loss = criterion(output, trg)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        epoch_loss += loss.item()
    return epoch_loss / len(iterator)


def evaluate(model, iterator, criterion):
    model.eval()
    epoch_loss = 0
    with torch.no_grad():
        for i, batch in enumerate(iterator):
            src = batch.en
            trg = batch.fr
            # Turn off teacher forcing during evaluation
            output = model(src, trg, 0)
            output_dim = output.shape[-1]
            output = output[1:].view(-1, output_dim)
            trg = trg[1:].view(-1)
            loss = criterion(output, trg)
            epoch_loss += loss.item()
    return epoch_loss / len(iterator)


N_EPOCHS = 10
CLIP = 1

criterion = nn.CrossEntropyLoss(ignore_index=FR.vocab.stoi['<pad>'])

best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):
    train_loss = train(model, train_iterator, optimizer, criterion, CLIP)
    valid_loss = evaluate(model, valid_iterator, criterion)
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'model.pt')
```
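
The test split built earlier is not used above. Once training finishes, the best checkpoint can be restored and scored on the test set with the same evaluate function:

```python
model.load_state_dict(torch.load('model.pt'))
test_loss = evaluate(model, test_iterator, criterion)
print(f'Test loss: {test_loss:.3f}')
```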