Занятие 5
Лабораторная 5¶
P.S Лабораторная будет похожа на предыдущую пока мы не дойдём до блока Transformer
Сегодня будем решать следующую задачу:
Дан текстовый корпус
$$
\mathcal{D} = \{ s_1, s_2, \dots, s_N \},
$$
где каждое $ s_i $ — последовательность слов из словаря $ V $.
Требуется:
Построить word-level языковую модель на основе RNN-архитектуры, которая аппроксимирует совместное распределение слов в предложении:
$$ P(w_1, w_2, \dots, w_T) = \prod_{t=1}^{T} P(w_t \mid w_1, \dots, w_{t-1}) $$Реализовать несколько вариантов модели, отличающихся:
- типом рекуррентного блока (RNN, GRU, LSTM),
- глубиной (
num_layers∈ {1, 2}), - размером скрытого состояния,
- наличием регуляризации (dropout),
- алгоритмом оптимизации (SGD, Adam).
Для каждой конфигурации вычислить перплексию на тестовом подмножестве корпуса:
$$ \text{Perplexity} = \exp\left( -\frac{1}{N} \sum_{i=1}^{N} \log P(w_i \mid w_1, \dots, w_{i-1}) \right) $$
1. Библиотеки¶
from datasets import load_dataset
from collections import Counter
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader
import random
import math
import pandas as pd
from tabulate import tabulate
import re
import time
from tqdm import tqdm
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Используется устройство: {device}")
2. Подготовка датасета¶
Сегодня будем работать с датасетом IMDB
Датасет состоит из 50 000 отзывов (25k обучающих + 25k тестовых: 50% положительных, 50% отрицательных).
# Загружаем IMDB
dataset = load_dataset('imdb')
# Простая токенизация
def tokenize(text):
return re.findall(r"\b\w+\b", text.lower())
# Собираем словарь
all_words = []
for example in dataset['train']:
all_words.extend(tokenize(example['text']))
vocab_size = 10000
counter = Counter(all_words)
special_tokens = ['<pad>', '<unk>', '<bos>', '<eos>']
most_common = [word for word, _ in counter.most_common(vocab_size)]
vocab = special_tokens + most_common
word2idx = {word: idx for idx, word in enumerate(vocab)}
idx2word = {idx: word for word, idx in word2idx.items()}
print(f"Размер словаря: {len(vocab)}")
"""Данная функция преобразует текстовую строку в последовательность числовых индексов,
пригодную для подачи в нейронную сеть (например, RNN).
"""
def text_to_indices(text):
tokens = tokenize(text)
# добавление начала предложения
indices = [word2idx['<bos>']]
for token in tokens:
indices.append(word2idx.get(token, word2idx['<unk>']))
# добавление конца предложения
indices.append(word2idx['<eos>'])
return torch.tensor(indices, dtype=torch.long)
"""Данная функция подготавливает подмножество данных (раздел датасета)
для обучения или тестирования языковой модели."""
def prepare_split(split_name, max_samples=5000):
data = []
for i, example in enumerate(dataset[split_name]):
if i >= max_samples:
break
data.append(text_to_indices(example['text']))
return data
train_data = prepare_split('train', max_samples=8000)
test_data = prepare_split('test', max_samples=2000)
train_data[0]
3. Обучение модели¶
class PositionalEncoding(nn.Module):
def __init__(self, d_model, dropout=0.1, max_len=5000):
super().__init__()
self.dropout = nn.Dropout(p=dropout)
# Создаем матрицу позиционных кодировок формы (max_len, d_model)
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
# Добавляем измерение batch: (1, max_len, d_model)
pe = pe.unsqueeze(0)
self.register_buffer('pe', pe)
def forward(self, x):
# x: (seq_len, batch, d_model)
# self.pe: (1, max_len, d_model)
# Берём первые seq_len позиций и транспонируем для совпадения размерностей
x = x + self.pe[:, :x.size(0), :].transpose(0, 1)
return self.dropout(x)
class TransformerLM(nn.Module):
def __init__(self, vocab_size, d_model=200, nhead=8, num_layers=2, dropout=0.1):
super().__init__()
self.d_model = d_model
# Эмбеддинги
self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=word2idx['<pad>'])
# Позиционное кодирование
self.pos_encoder = PositionalEncoding(d_model, dropout)
# Слой Transformer Encoder
# В режиме language model мы используем Encoder с causal mask,
# имитируя Decoder-only архитектуру
encoder_layers = nn.TransformerEncoderLayer(
d_model=d_model,
nhead=nhead,
dim_feedforward=d_model*4,
dropout=dropout,
batch_first=False # Важно: (Seq, Batch, Feature)
)
self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_layers=num_layers)
self.fc_out = nn.Linear(d_model, vocab_size)
self.init_weights()
def init_weights(self):
initrange = 0.1
self.embedding.weight.data.uniform_(-initrange, initrange)
self.fc_out.bias.data.zero_()
self.fc_out.weight.data.uniform_(-initrange, initrange)
def forward(self, src, mask=None, src_key_padding_mask=None):
# src: (seq_len, batch)
emb = self.embedding(src) * math.sqrt(self.d_model) # Scaling
emb = self.pos_encoder(emb)
# output: (seq_len, batch, d_model)
output = self.transformer_encoder(emb, mask=mask, src_key_padding_mask=src_key_padding_mask)
logits = self.fc_out(output)
return logits
Задача 1¶
Трансформеры обычно содержат больше параметров, чем простые RNN. Действие: Создайте экземпляр модели TransformerLM и посчитайте общее количество обучаемых параметров.
# Подсказка: sum(p.numel() for p in model.parameters() if p.requires_grad)
temp_model = TransformerLM(vocab_size=len(vocab), d_model=128, nhead=4, num_layers=2)
total_params = 0
print(f"Количество параметров: {total_params}")
# Преобразуем список последовательностей разной длины в единый тензор фиксированной формы, который
#можно подать в нейронную сеть.
def collate_batch(batch):
batch = [seq[:200] for seq in batch if len(seq) > 1] # ограничиваем длину и фильтруем короткие
padded = pad_sequence(batch, batch_first=False, padding_value=word2idx['<pad>'])
return padded.to(device)
# Collate функция для батчей
def collate_batch(batch):
# Ограничиваем длину для экономии памяти (Transformer чувствителен к длине^2)
batch = [seq[:100] for seq in batch if len(seq) > 1]
padded = pad_sequence(batch, batch_first=False, padding_value=word2idx['<pad>'])
return padded.to(device)
# Генерация маски (Causal + Padding)
def generate_masks(src):
"""
src: тензор формы (seq_len, batch_size)
Возвращает:
mask: causal mask формы (seq_len, seq_len)
src_key_padding_mask: padding mask формы (batch_size, seq_len)
"""
sz = src.size(0) # Длина последовательности
# Causal mask (запрещает видеть будущие токены)
mask = torch.triu(torch.ones(sz, sz, device=device), diagonal=1).bool()
# Padding mask (игнорирует токены <pad>)
src_key_padding_mask = src.T == word2idx['<pad>']
return mask, src_key_padding_mask
# Оценка (Evaluation)
def evaluate(model, data_loader, criterion):
model.eval()
total_loss = 0.0
total_tokens = 0
with torch.no_grad():
for batch in data_loader:
if batch.size(0) <= 1:
continue
x = batch[:-1, :]
y = batch[1:, :].reshape(-1)
# Генерируем маски под размер x
mask, padding_mask = generate_masks(x)
logits = model(x, mask=mask, src_key_padding_mask=padding_mask)
loss = criterion(logits.reshape(-1, len(vocab)), y)
total_loss += loss.item() * y.numel()
total_tokens += y.numel()
return total_loss / total_tokens
# Обучение
def train_model(model, train_loader, val_loader, epochs=3, lr=0.001):
model.to(device)
criterion = nn.CrossEntropyLoss(ignore_index=word2idx['<pad>'])
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.95)
for epoch in range(epochs):
model.train()
total_loss = 0
start_time = time.time()
progress_bar = tqdm(train_loader, desc=f"Эпоха {epoch+1}/{epochs}", leave=False)
for batch in progress_bar:
if batch.size(0) <= 1:
continue
# 1. Сначала нарезаем вход и цель
x = batch[:-1, :] # (seq_len-1, batch)
y = batch[1:, :].reshape(-1) # (seq_len-1 * batch,)
# 2. Генерируем маски под размер x
mask, padding_mask = generate_masks(x)
optimizer.zero_grad()
logits = model(x, mask=mask, src_key_padding_mask=padding_mask)
loss = criterion(logits.reshape(-1, len(vocab)), y)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
optimizer.step()
total_loss += loss.item()
progress_bar.set_postfix({'loss': f"{loss.item():.3f}"})
avg_loss = total_loss / len(train_loader)
elapsed = time.time() - start_time
print(f"Эпоха {epoch+1}. Loss: {avg_loss:.3f}. Время: {elapsed:.1f} сек.")
scheduler.step()
return model
4. Эксперименты¶
Задача 2¶
Задача: Запустите эксперимент с разными значениями nhead (например, 2, 4, 8).
Гипотеза: Увеличение числа nhead должно улучшить качество (снизить перплексию), но увеличить время обучения.
experiments = []
train_loader = DataLoader(train_data, batch_size=32, shuffle=True, collate_fn=collate_batch)
test_loader = DataLoader(test_data, batch_size=32, shuffle=False, collate_fn=collate_batch)
# Базовая конфигурация
d_model = 128
configs = [
{'nhead': 2, 'num_layers': 2},
{'nhead': 4, 'num_layers': 2},
{'nhead': 8, 'num_layers': 2} # Если d_model=128, то 8 голов допустимо (128/8=16 dim на голову)
]
for cfg in configs:
print(f"\n🚀 Эксперимент: nhead={cfg['nhead']}, layers={cfg['num_layers']}")
model = TransformerLM(
vocab_size=len(vocab),
d_model=d_model,
nhead=cfg['nhead'],
num_layers=cfg['num_layers'],
dropout=0.1
)
# Для быстрого демо ставим 1 эпоху, в реальности нужно больше
model = train_model(model, train_loader, test_loader, epochs=1, lr=0.001)
criterion = nn.CrossEntropyLoss(ignore_index=word2idx['<pad>'])
test_loss = evaluate(model, test_loader, criterion)
ppl = math.exp(test_loss)
experiments.append({
'nhead': cfg['nhead'],
'Слои': cfg['num_layers'],
'Perplexity': round(ppl, 1)
})
df = pd.DataFrame(experiments)
print("\n=== Результаты экспериментов ===")
print(tabulate(df, headers='keys', tablefmt='grid', showindex=False))
df = pd.DataFrame(experiments)
print("\n=== Результаты экспериментов ===")
print(tabulate(df, headers='keys', tablefmt='grid', showindex=False))
5. Генерация текста¶
def generate_text(model, seed_text, max_len=20, temperature=1.0):
model.eval()
tokens = tokenize(seed_text)
indices = [word2idx['<bos>']] + [word2idx.get(t, word2idx['<unk>']) for t in tokens]
generated = tokens[:]
with torch.no_grad():
for _ in range(max_len):
x = torch.tensor(indices, dtype=torch.long).unsqueeze(1).to(device)
# Создаем маску для текущей длины
sz = x.size(0)
mask = torch.triu(torch.ones(sz, sz, device=device), diagonal=1).bool()
# Паддинг маска не нужна, так как у нас один батч и нет паддинга внутри последовательности
logits = model(x, mask=mask)
next_logits = logits[-1, 0, :] / temperature
probs = torch.softmax(next_logits, dim=-1)
next_idx = torch.multinomial(probs, 1).item()
if next_idx == word2idx['<eos>']:
break
word = idx2word.get(next_idx, '<unk>')
generated.append(word)
indices.append(next_idx)
return ' '.join(generated)
# Пример использования (нужно обучить модель перед запуском)
# print(generate_text(model, "this movie is", max_len=10, temperature=0.8))
Задача 3¶
Сравните результат работы Transformer c результатом работы RNN + Attention.