Занятие 5

Лабораторная 5

P.S Лабораторная будет похожа на предыдущую пока мы не дойдём до блока Transformer

Сегодня будем решать следующую задачу:

Дан текстовый корпус
$$ \mathcal{D} = \{ s_1, s_2, \dots, s_N \}, $$
где каждое $ s_i $ — последовательность слов из словаря $ V $.

Требуется:

  1. Построить word-level языковую модель на основе RNN-архитектуры, которая аппроксимирует совместное распределение слов в предложении:
    $$ P(w_1, w_2, \dots, w_T) = \prod_{t=1}^{T} P(w_t \mid w_1, \dots, w_{t-1}) $$

  2. Реализовать несколько вариантов модели, отличающихся:

    • типом рекуррентного блока (RNN, GRU, LSTM),
    • глубиной (num_layers ∈ {1, 2}),
    • размером скрытого состояния,
    • наличием регуляризации (dropout),
    • алгоритмом оптимизации (SGD, Adam).
  3. Для каждой конфигурации вычислить перплексию на тестовом подмножестве корпуса:
    $$ \text{Perplexity} = \exp\left( -\frac{1}{N} \sum_{i=1}^{N} \log P(w_i \mid w_1, \dots, w_{i-1}) \right) $$

1. Библиотеки

In [ ]:
from datasets import load_dataset
from collections import Counter
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader
import random
import math
import pandas as pd
from tabulate import tabulate
import re
import time
from tqdm import tqdm

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Используется устройство: {device}")

2. Подготовка датасета

Сегодня будем работать с датасетом IMDB

Датасет состоит из 50 000 отзывов (25k обучающих + 25k тестовых: 50% положительных, 50% отрицательных).

In [ ]:
# Загружаем IMDB
dataset = load_dataset('imdb')

# Простая токенизация
def tokenize(text):
    return re.findall(r"\b\w+\b", text.lower())

# Собираем словарь
all_words = []
for example in dataset['train']:
    all_words.extend(tokenize(example['text']))

vocab_size = 10000
counter = Counter(all_words)
special_tokens = ['<pad>', '<unk>', '<bos>', '<eos>']
most_common = [word for word, _ in counter.most_common(vocab_size)]
vocab = special_tokens + most_common
word2idx = {word: idx for idx, word in enumerate(vocab)}
idx2word = {idx: word for word, idx in word2idx.items()}

print(f"Размер словаря: {len(vocab)}")
In [ ]:
"""Данная функция преобразует текстовую строку в последовательность числовых индексов,
пригодную для подачи в нейронную сеть (например, RNN).
"""

def text_to_indices(text):
    tokens = tokenize(text)
    # добавление начала предложения
    indices = [word2idx['<bos>']]
    for token in tokens:
        indices.append(word2idx.get(token, word2idx['<unk>']))

    # добавление конца предложения
    indices.append(word2idx['<eos>'])
    return torch.tensor(indices, dtype=torch.long)
In [ ]:
"""Данная функция подготавливает подмножество данных (раздел датасета)
для обучения или тестирования языковой модели."""
def prepare_split(split_name, max_samples=5000):
    data = []
    for i, example in enumerate(dataset[split_name]):
        if i >= max_samples:
            break
        data.append(text_to_indices(example['text']))
    return data
In [ ]:
train_data = prepare_split('train', max_samples=8000)
test_data = prepare_split('test', max_samples=2000)
In [ ]:
train_data[0]

3. Обучение модели

In [ ]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        # Создаем матрицу позиционных кодировок формы (max_len, d_model)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        # Добавляем измерение batch: (1, max_len, d_model)
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: (seq_len, batch, d_model)
        # self.pe: (1, max_len, d_model)
        # Берём первые seq_len позиций и транспонируем для совпадения размерностей
        x = x + self.pe[:, :x.size(0), :].transpose(0, 1)
        return self.dropout(x)

class TransformerLM(nn.Module):
    def __init__(self, vocab_size, d_model=200, nhead=8, num_layers=2, dropout=0.1):
        super().__init__()
        self.d_model = d_model

        # Эмбеддинги
        self.embedding = nn.Embedding(vocab_size, d_model, padding_idx=word2idx['<pad>'])

        # Позиционное кодирование
        self.pos_encoder = PositionalEncoding(d_model, dropout)

        # Слой Transformer Encoder
        # В режиме language model мы используем Encoder с causal mask,
        # имитируя Decoder-only архитектуру
        encoder_layers = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=d_model*4,
            dropout=dropout,
            batch_first=False # Важно: (Seq, Batch, Feature)
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_layers=num_layers)

        self.fc_out = nn.Linear(d_model, vocab_size)
        self.init_weights()

    def init_weights(self):
        initrange = 0.1
        self.embedding.weight.data.uniform_(-initrange, initrange)
        self.fc_out.bias.data.zero_()
        self.fc_out.weight.data.uniform_(-initrange, initrange)

    def forward(self, src, mask=None, src_key_padding_mask=None):
        # src: (seq_len, batch)
        emb = self.embedding(src) * math.sqrt(self.d_model) # Scaling
        emb = self.pos_encoder(emb)

        # output: (seq_len, batch, d_model)
        output = self.transformer_encoder(emb, mask=mask, src_key_padding_mask=src_key_padding_mask)

        logits = self.fc_out(output)
        return logits

Задача 1

Трансформеры обычно содержат больше параметров, чем простые RNN. Действие: Создайте экземпляр модели TransformerLM и посчитайте общее количество обучаемых параметров.

In [ ]:
# Подсказка: sum(p.numel() for p in model.parameters() if p.requires_grad)
temp_model = TransformerLM(vocab_size=len(vocab), d_model=128, nhead=4, num_layers=2)
total_params = 0
print(f"Количество параметров: {total_params}")
In [ ]:
# Преобразуем список последовательностей разной длины в единый тензор фиксированной формы, который
#можно подать в нейронную сеть.
def collate_batch(batch):
    batch = [seq[:200] for seq in batch if len(seq) > 1]  # ограничиваем длину и фильтруем короткие
    padded = pad_sequence(batch, batch_first=False, padding_value=word2idx['<pad>'])
    return padded.to(device)
In [ ]:
# Collate функция для батчей
def collate_batch(batch):
    # Ограничиваем длину для экономии памяти (Transformer чувствителен к длине^2)
    batch = [seq[:100] for seq in batch if len(seq) > 1]
    padded = pad_sequence(batch, batch_first=False, padding_value=word2idx['<pad>'])
    return padded.to(device)

# Генерация маски (Causal + Padding)
def generate_masks(src):
    """
    src: тензор формы (seq_len, batch_size)
    Возвращает:
        mask: causal mask формы (seq_len, seq_len)
        src_key_padding_mask: padding mask формы (batch_size, seq_len)
    """
    sz = src.size(0)  # Длина последовательности
    # Causal mask (запрещает видеть будущие токены)
    mask = torch.triu(torch.ones(sz, sz, device=device), diagonal=1).bool()
    # Padding mask (игнорирует токены <pad>)
    src_key_padding_mask = src.T == word2idx['<pad>']
    return mask, src_key_padding_mask

# Оценка (Evaluation)
def evaluate(model, data_loader, criterion):
    model.eval()
    total_loss = 0.0
    total_tokens = 0
    with torch.no_grad():
        for batch in data_loader:
            if batch.size(0) <= 1:
                continue

            x = batch[:-1, :]
            y = batch[1:, :].reshape(-1)

            # Генерируем маски под размер x
            mask, padding_mask = generate_masks(x)

            logits = model(x, mask=mask, src_key_padding_mask=padding_mask)
            loss = criterion(logits.reshape(-1, len(vocab)), y)
            total_loss += loss.item() * y.numel()
            total_tokens += y.numel()

    return total_loss / total_tokens

# Обучение
def train_model(model, train_loader, val_loader, epochs=3, lr=0.001):
    model.to(device)
    criterion = nn.CrossEntropyLoss(ignore_index=word2idx['<pad>'])
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.95)

    for epoch in range(epochs):
        model.train()
        total_loss = 0
        start_time = time.time()
        progress_bar = tqdm(train_loader, desc=f"Эпоха {epoch+1}/{epochs}", leave=False)

        for batch in progress_bar:
            if batch.size(0) <= 1:
                continue

            # 1. Сначала нарезаем вход и цель
            x = batch[:-1, :]          # (seq_len-1, batch)
            y = batch[1:, :].reshape(-1)  # (seq_len-1 * batch,)

            # 2. Генерируем маски под размер x
            mask, padding_mask = generate_masks(x)

            optimizer.zero_grad()
            logits = model(x, mask=mask, src_key_padding_mask=padding_mask)
            loss = criterion(logits.reshape(-1, len(vocab)), y)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
            optimizer.step()

            total_loss += loss.item()
            progress_bar.set_postfix({'loss': f"{loss.item():.3f}"})

        avg_loss = total_loss / len(train_loader)
        elapsed = time.time() - start_time
        print(f"Эпоха {epoch+1}. Loss: {avg_loss:.3f}. Время: {elapsed:.1f} сек.")
        scheduler.step()
    return model

4. Эксперименты

Задача 2

Задача: Запустите эксперимент с разными значениями nhead (например, 2, 4, 8).

Гипотеза: Увеличение числа nhead должно улучшить качество (снизить перплексию), но увеличить время обучения.

In [ ]:
experiments = []
train_loader = DataLoader(train_data, batch_size=32, shuffle=True, collate_fn=collate_batch)
test_loader = DataLoader(test_data, batch_size=32, shuffle=False, collate_fn=collate_batch)

# Базовая конфигурация
d_model = 128

configs = [
    {'nhead': 2, 'num_layers': 2},
    {'nhead': 4, 'num_layers': 2},
    {'nhead': 8, 'num_layers': 2} # Если d_model=128, то 8 голов допустимо (128/8=16 dim на голову)
]

for cfg in configs:
    print(f"\n🚀 Эксперимент: nhead={cfg['nhead']}, layers={cfg['num_layers']}")
    model = TransformerLM(
        vocab_size=len(vocab),
        d_model=d_model,
        nhead=cfg['nhead'],
        num_layers=cfg['num_layers'],
        dropout=0.1
    )
    # Для быстрого демо ставим 1 эпоху, в реальности нужно больше
    model = train_model(model, train_loader, test_loader, epochs=1, lr=0.001)

    criterion = nn.CrossEntropyLoss(ignore_index=word2idx['<pad>'])
    test_loss = evaluate(model, test_loader, criterion)
    ppl = math.exp(test_loss)

    experiments.append({
        'nhead': cfg['nhead'],
        'Слои': cfg['num_layers'],
        'Perplexity': round(ppl, 1)
    })

df = pd.DataFrame(experiments)
print("\n=== Результаты экспериментов ===")
print(tabulate(df, headers='keys', tablefmt='grid', showindex=False))
In [ ]:
df = pd.DataFrame(experiments)
print("\n=== Результаты экспериментов ===")
print(tabulate(df, headers='keys', tablefmt='grid', showindex=False))

5. Генерация текста

In [ ]:
def generate_text(model, seed_text, max_len=20, temperature=1.0):
    model.eval()
    tokens = tokenize(seed_text)
    indices = [word2idx['<bos>']] + [word2idx.get(t, word2idx['<unk>']) for t in tokens]
    generated = tokens[:]

    with torch.no_grad():
        for _ in range(max_len):
            x = torch.tensor(indices, dtype=torch.long).unsqueeze(1).to(device)

            # Создаем маску для текущей длины
            sz = x.size(0)
            mask = torch.triu(torch.ones(sz, sz, device=device), diagonal=1).bool()
            # Паддинг маска не нужна, так как у нас один батч и нет паддинга внутри последовательности

            logits = model(x, mask=mask)

            next_logits = logits[-1, 0, :] / temperature
            probs = torch.softmax(next_logits, dim=-1)

            next_idx = torch.multinomial(probs, 1).item()

            if next_idx == word2idx['<eos>']:
                break

            word = idx2word.get(next_idx, '<unk>')
            generated.append(word)
            indices.append(next_idx)

    return ' '.join(generated)

# Пример использования (нужно обучить модель перед запуском)
# print(generate_text(model, "this movie is", max_len=10, temperature=0.8))

Задача 3

Сравните результат работы Transformer c результатом работы RNN + Attention.

In [ ]: