|
import torch |
|
import torch.nn as nn |
|
import torch.optim as optim |
|
from torch.utils.data import Dataset, DataLoader |
|
from torch.nn.utils.rnn import pad_sequence |
|
import nltk |
|
from nltk.tokenize import word_tokenize |
|
from collections import Counter |
|
import numpy as np |
|
|
|
|
|
# word_tokenize() depends on the Punkt sentence-tokenizer models. NLTK
# releases before 3.8.2 load them from the 'punkt' package, while newer
# releases look for 'punkt_tab' instead, so fetch both to work with either.
nltk.download('punkt')
nltk.download('punkt_tab')
|
|
|
|
|
def load_text_data(file_path, encoding='utf-8'):
    """Read a text file and return its lines with surrounding whitespace removed.

    Args:
        file_path: Path of the text file to read.
        encoding: Text encoding used to decode the file. It is passed
            explicitly so the result does not depend on the platform's
            locale default.

    Returns:
        A list with one string per line of the file, in file order. Leading
        and trailing whitespace (including the newline) is stripped from
        every line; blank lines are kept as empty strings.
    """
    with open(file_path, 'r', encoding=encoding) as file:
        # Iterate the handle directly instead of calling readlines(), so the
        # unstripped lines are never all held in memory at the same time.
        return [line.strip() for line in file]
|
|
|
|
|
# Location of the training corpus; each line of the file becomes one entry
# of `sentences`.
file_path = 'data.txt'
sentences = load_text_data(file_path)
|
|
|
|
|
def tokenize(text):
    """Lower-case ``text`` and split it into tokens with NLTK's word_tokenize."""
    lowered = text.lower()
    return word_tokenize(lowered)
|
|
|
|
|
def build_vocab(sentences):
    """Map every distinct token of ``sentences`` to an integer id.

    Ids are handed out in order of first appearance, starting at 0. The
    special entries ``'<unk>'`` and ``'<pad>'`` are then added, each taking
    the size of the mapping at the moment it is inserted.

    Args:
        sentences: Iterable of raw sentence strings.

    Returns:
        A dict from token to integer id.
    """
    word_to_index = {}
    for sentence in sentences:
        for token in tokenize(sentence):
            # setdefault leaves the id of an already-seen token untouched.
            word_to_index.setdefault(token, len(word_to_index))

    for special in ('<unk>', '<pad>'):
        word_to_index[special] = len(word_to_index)
    return word_to_index
|
|
|
# Build the token -> id mapping for the whole corpus and report how many
# entries it holds (special tokens included).
vocab = build_vocab(sentences)
vocab_size = len(vocab)
print(f"Vocabulary size: {vocab_size}")
|
|
|
|
|
class TextDataset(Dataset):
    """Next-token prediction samples built from tokenized sentences.

    Every sample is an ``(inputs, targets)`` pair of token-id lists in which
    ``targets`` is ``inputs`` shifted one position to the right.

    Args:
        sentences: Iterable of raw sentence strings.
        vocab: Mapping from token to integer id. It must contain ``'<unk>'``,
            whose id stands in for tokens that are missing from the mapping.
        seq_length: Maximum number of tokens in the inputs (and targets) of
            one sample.
    """

    def __init__(self, sentences, vocab, seq_length=50):
        self.data = []
        self.vocab = vocab
        self.seq_length = seq_length

        unk_index = vocab['<unk>']
        for sentence in sentences:
            indices = [vocab.get(token, unk_index) for token in tokenize(sentence)]
            self.data.extend(self._make_pairs(indices, seq_length))

    @staticmethod
    def _make_pairs(indices, seq_length):
        """Return the (inputs, targets) pairs for one sentence's token ids.

        A sentence with more than ``seq_length`` tokens yields one full-length
        sliding window per start position. A shorter sentence with at least
        two tokens yields a single, shorter pair instead of being dropped;
        otherwise a corpus made of short lines would produce no samples at
        all. Fewer than two tokens cannot form an input/target pair.
        """
        if len(indices) > seq_length:
            return [
                (indices[start:start + seq_length], indices[start + 1:start + seq_length + 1])
                for start in range(len(indices) - seq_length)
            ]
        if len(indices) >= 2:
            return [(indices[:-1], indices[1:])]
        return []

    def __len__(self):
        """Return the number of (inputs, targets) samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return sample ``idx`` as a pair of 1-D ``torch.long`` tensors."""
        inputs, targets = self.data[idx]
        return torch.tensor(inputs, dtype=torch.long), torch.tensor(targets, dtype=torch.long)
|
|
|
dataset = TextDataset(sentences, vocab, seq_length=50)


def _collate_batch(batch):
    """Stack (inputs, targets) samples into two right-padded 2-D tensors.

    pad_sequence fills with 0 by default, which is the id of a real word in
    the vocabulary. Inputs are therefore padded with the dedicated ``<pad>``
    id, and targets with -100, the default ``ignore_index`` of
    ``nn.CrossEntropyLoss``, so padded positions add nothing to the loss.
    Batches whose samples all have the same length come out unchanged.
    """
    inputs = pad_sequence(
        [sample[0] for sample in batch], batch_first=True, padding_value=vocab['<pad>']
    )
    targets = pad_sequence(
        [sample[1] for sample in batch], batch_first=True, padding_value=-100
    )
    return inputs, targets


# A named function, unlike a lambda, can be pickled, which matters if the
# loader is ever given worker processes.
dataloader = DataLoader(dataset, batch_size=4, shuffle=True, collate_fn=_collate_batch)
|
|
|
|
|
class LSTMModel(nn.Module):
    """Embedding -> LSTM -> linear language model.

    Turns a batch of token-id sequences into one score per vocabulary entry
    at every time step.

    Args:
        vocab_size: Number of rows in the embedding table and number of
            output scores per time step.
        embed_size: Width of the token embeddings.
        hidden_size: Width of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, vocab_size, embed_size, hidden_size, num_layers=1):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_size)
        self.lstm = nn.LSTM(
            input_size=embed_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=0,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=vocab_size)

    def forward(self, x):
        """Return vocabulary scores for every position of the id tensor ``x``."""
        embedded = self.embedding(x)
        # Only the per-step outputs are needed; the final (h, c) state is dropped.
        hidden_states, _final_state = self.lstm(embedded)
        return self.fc(hidden_states)
|
|
|
|
|
# Model and optimiser configuration.
embed_size = 10
hidden_size = 100
model = LSTMModel(vocab_size, embed_size, hidden_size)
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=0.01)

num_epochs = 5
model.train()
for epoch in range(num_epochs):
    # Accumulate the loss over every batch so the figure printed below
    # describes the whole epoch rather than whichever batch came last.
    epoch_loss = 0.0
    num_batches = 0
    for inputs, targets in dataloader:
        optimizer.zero_grad()
        outputs = model(inputs)
        # Flatten the scores to (positions, vocab) and the targets to
        # (positions,), the layout CrossEntropyLoss expects.
        loss = criterion(outputs.view(-1, vocab_size), targets.view(-1))
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
        num_batches += 1

    if num_batches == 0:
        # With no batches there is no loss to report and nothing to learn
        # from, so further epochs would be equally empty.
        print('No training batches were produced; skipping training.')
        break
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss / num_batches:.4f}')
|
|
|
|
|
def beam_search_with_top_k_sampling(model, start_text, vocab, tokenizer, beam_width=1, top_k=2, max_length=50):
    """Extend ``start_text`` by ``max_length`` tokens using sampled beam search.

    At every step each live hypothesis is expanded by drawing ``top_k``
    samples (with replacement) from the model's ``top_k`` most likely next
    tokens, with probabilities renormalised over those candidates. Each
    distinct draw becomes a candidate whose score is the hypothesis score
    plus the log-probability of the token that was actually drawn. The
    ``beam_width`` best candidates survive to the next step.

    Args:
        model: Module called with a ``(1, T)`` tensor of token ids; its
            output is indexed as ``[0, -1, :]`` to get the scores for the
            next token. The module is switched to eval mode.
        start_text: Prompt to continue.
        vocab: Mapping from token to integer id; must contain ``'<unk>'``.
        tokenizer: Callable that splits a string into tokens.
        beam_width: Number of hypotheses kept after every step.
        top_k: Number of most likely tokens to sample from, and number of
            draws made per hypothesis.
        max_length: Number of tokens to append.

    Returns:
        ``start_text`` followed by the generated tokens of the
        highest-scoring hypothesis, separated by single spaces.

    Raises:
        ValueError: If ``start_text`` produces no tokens.
    """
    model.eval()

    unk_index = vocab['<unk>']
    # Use the tokenizer that was passed in rather than a module-level one.
    indices = [vocab.get(token, unk_index) for token in tokenizer(start_text)]
    if not indices:
        raise ValueError("start_text must contain at least one token")

    # Invert the vocabulary once; scanning it for every generated token costs
    # O(len(vocab)) per step. setdefault keeps the first word for an id.
    index_to_word = {}
    for word, index in vocab.items():
        index_to_word.setdefault(index, word)

    input_seq = torch.tensor(indices, dtype=torch.long).unsqueeze(0)
    beams = [(input_seq, start_text, 0.0)]

    for _ in range(max_length):
        new_beams = []
        for seq, text, score in beams:
            with torch.no_grad():
                next_word_logits = model(seq)[0, -1, :]

            # torch.topk raises when asked for more entries than exist.
            k = min(top_k, next_word_logits.size(0))
            topk_logits, topk_indices = torch.topk(next_word_logits, k)

            # Indexing the batch row (instead of squeeze()) keeps these
            # arrays 1-D even when k == 1, as np.random.choice requires.
            probabilities = torch.softmax(topk_logits.double(), dim=0).cpu().numpy()
            candidate_ids = topk_indices.cpu().numpy()

            drawn = set()
            for _draw in range(k):
                choice = int(np.random.choice(k, p=probabilities))
                if choice in drawn:
                    # The same token drawn twice would only duplicate a beam.
                    continue
                drawn.add(choice)

                next_index = int(candidate_ids[choice])
                next_word = index_to_word[next_index]
                new_seq = torch.cat([seq, seq.new_tensor([[next_index]])], dim=1)
                # Score with the probability of the token that was sampled.
                new_score = score + float(np.log(probabilities[choice]))
                new_beams.append((new_seq, text + ' ' + next_word, new_score))

        beams = sorted(new_beams, key=lambda beam: beam[2], reverse=True)[:beam_width]

    # Beams are sorted best-first, so the first entry is the winner.
    return beams[0][1]
|
|
|
|
|
# Continue a fixed prompt with the model and print the result.
start_text = "A Millennium Fulcrum Edition produced"
generated_text = beam_search_with_top_k_sampling(
    model,
    start_text,
    vocab,
    tokenize,
)
print(f"Generated text: {generated_text}")
|
|