Занятие 4

Лабораторная 3

P.S Лабораторная будет похожа на предыдущую пока мы не дойдём до блока Attention

Сегодня будем решать следующую задачу:

Дан текстовый корпус
$$ \mathcal{D} = \{ s_1, s_2, \dots, s_N \}, $$
где каждое $ s_i $ — последовательность слов из словаря $ V $.

Требуется:

  1. Построить word-level языковую модель на основе RNN-архитектуры, которая аппроксимирует совместное распределение слов в предложении:
    $$ P(w_1, w_2, \dots, w_T) = \prod_{t=1}^{T} P(w_t \mid w_1, \dots, w_{t-1}) $$

  2. Реализовать несколько вариантов модели, отличающихся:

    • типом рекуррентного блока (RNN, GRU, LSTM),
    • глубиной (num_layers ∈ {1, 2}),
    • размером скрытого состояния,
    • наличием регуляризации (dropout),
    • алгоритмом оптимизации (SGD, Adam).
  3. Для каждой конфигурации вычислить перплексию на тестовом подмножестве корпуса:
    $$ \text{Perplexity} = \exp\left( -\frac{1}{N} \sum_{i=1}^{N} \log P(w_i \mid w_1, \dots, w_{i-1}) \right) $$

1. Библиотеки

In [ ]:
from datasets import load_dataset
from collections import Counter
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader
import random
import math
import pandas as pd
from tabulate import tabulate

import re

import time
from tqdm import tqdm  # для отображения прогресс-бара

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Используется устройство: {device}")

2. Подготовка датасета

Сегодня будем работать с датасетом IMDB

Датасет состоит из 50 000 отзывов (25k обучающих + 25k тестовых: 50% положительных, 50% отрицательных).

In [ ]:
# Загружаем IMDB
dataset = load_dataset('imdb')

# Простая токенизация без NLTK
def tokenize(text):
    # Оставляем только буквы и апострофы (для "don't" и т.п.)
    return re.findall(r"\b\w+\b", text.lower())

# Собираем все слова из обучающей выборки
all_words = []
for example in dataset['train']:
    all_words.extend(tokenize(example['text']))
In [ ]:
# обучающий пример
print(dataset['train'][3])
In [ ]:
# Зададим, сколько самых частых слов из корпуса мы хотим включить в словарь.
# Это гиперпараметр: можно уменьшить (для быстрого обучения) или увеличить (для лучшего покрытия).
vocab_size = 10000

# Counter подсчитывает, сколько раз каждое слово встречается в обучающем корпусе.
counter = Counter(all_words)

# Служебные токены
#<pad> - Заполнение коротких последовательностей до одинаковой длины (для батчей)
# <unk> Замена редких/неизвестных слов (out-of-vocabulary)
# <bos> Beginning of sentence — маркер начала предложения
# <eos> End of sentence — маркер конца предложения
special_tokens = ['<pad>', '<unk>', '<bos>', '<eos>']

# Берём 10 000 самых частых слов из корпуса (без учёта служебных).
# Редкие слова автоматически будут заменяться на <unk>.
most_common = [word for word, _ in counter.most_common(vocab_size)]

# Формируем итоговый словарь: сначала идут служебные токены, затем частые слова.
vocab = special_tokens + most_common
word2idx = {word: idx for idx, word in enumerate(vocab)}
idx2word = {idx: word for word, idx in word2idx.items()}

print(f"Размер словаря: {len(vocab)}")

Задача 1

Найдите самое редкое слово в словаре.

In [ ]:

In [ ]:
"""Данная функция преобразует текстовую строку в последовательность числовых индексов,
пригодную для подачи в нейронную сеть (например, RNN).
"""

def text_to_indices(text):
    tokens = tokenize(text)
    # добавление начала предложения
    indices = [word2idx['<bos>']]
    for token in tokens:
        indices.append(word2idx.get(token, word2idx['<unk>']))

    # добавление конца предложения
    indices.append(word2idx['<eos>'])
    return torch.tensor(indices, dtype=torch.long)
In [ ]:
"""Данная функция подготавливает подмножество данных (раздел датасета)
для обучения или тестирования языковой модели."""
def prepare_split(split_name, max_samples=5000):
    data = []
    for i, example in enumerate(dataset[split_name]):
        if i >= max_samples:
            break
        data.append(text_to_indices(example['text']))
    return data
In [ ]:
train_data = prepare_split('train', max_samples=8000)
test_data = prepare_split('test', max_samples=2000)
In [ ]:
train_data[0]

Задача 2

train_data[0] - это набор цифр. Что он означает?

3. Обучение модели

In [ ]:
class RNNLanguageModel(nn.Module):
    def __init__(self, vocab_size, embed_dim=200, hidden_dim=200,
                 num_layers=1, rnn_type='LSTM', dropout=0.0):
        super().__init__()
        # Эмбеддинг-слой: преобразует индексы слов в плотные векторы
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=word2idx['<pad>'])
        # Dropout после эмбеддингов для регуляризации
        self.dropout_emb = nn.Dropout(dropout)

        # Выбор типа рекуррентного слоя
        rnn_types = {
            'RNN': nn.RNN,
            'GRU': nn.GRU,
            'LSTM': nn.LSTM
        }
        # Инициализация RNN-блока
        self.rnn = rnn_types[rnn_type](
            input_size=embed_dim,      # размер входного эмбеддинга
            hidden_size=hidden_dim,    # размер скрытого состояния
            num_layers=num_layers,     # количество слоёв
            dropout=dropout if num_layers > 1 else 0,  # dropout между слоями (только если их >1)
            batch_first=False          # ожидаем форму (seq_len, batch, ...)
        )
        # Dropout на выходе RNN
        self.dropout_out = nn.Dropout(dropout)
        # Линейный слой: преобразует скрытое состояние в логиты над словарём
        self.fc = nn.Linear(hidden_dim, vocab_size)
        # Сохраняем тип RNN для возможного использования в forward
        self.rnn_type = rnn_type

    def forward(self, x, hidden=None):
        # x: тензор формы (seq_len, batch_size) с индексами слов
        emb = self.embedding(x)               # (seq_len, batch, embed_dim)
        emb = self.dropout_emb(emb)           # применяем dropout к эмбеддингам

        # Пропускаем через RNN
        output, hidden = self.rnn(emb, hidden)  # output: (seq_len, batch, hidden_dim)

        output = self.dropout_out(output)      # dropout на выходе RNN
        logits = self.fc(output)              # (seq_len, batch, vocab_size)

        return logits, hidden
In [ ]:
# Преобразуем список последовательностей разной длины в единый тензор фиксированной формы, который
#можно подать в нейронную сеть.
def collate_batch(batch):
    batch = [seq[:200] for seq in batch if len(seq) > 1]  # ограничиваем длину и фильтруем короткие
    padded = pad_sequence(batch, batch_first=False, padding_value=word2idx['<pad>'])
    return padded.to(device)
In [ ]:
# Оценка модели на тестовых данных: вычисляет среднюю кросс-энтропию (loss)
def evaluate(model, data_loader, criterion):
    model.eval()
    total_loss = 0.0
    total_tokens = 0
    with torch.no_grad():  # отключаем градиенты для ускорения
        for batch in data_loader:
            if batch.size(0) <= 1:
                continue
            x = batch[:-1, :]          # вход: все слова, кроме последнего
            y = batch[1:, :].reshape(-1)  # цель: все слова, кроме первого
            logits, _ = model(x)
            loss = criterion(logits.reshape(-1, len(vocab)), y)
            total_loss += loss.item() * y.numel()
            total_tokens += y.numel()
    return total_loss / total_tokens  # средний loss на токен
In [ ]:
# Обучение модели
def train_model(model, train_loader, epochs=3, lr=0.002, clip=0.5):
    # Переносим модель на GPU (если доступен) или CPU
    model.to(device)

    # Функция потерь: кросс-энтропия, игнорирующая паддинг-токены при подсчёте
    criterion = nn.CrossEntropyLoss(ignore_index=word2idx['<pad>'])

    # Используем только Adam — адаптивный оптимизатор, устойчивый к выбору lr
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    # Планировщик: уменьшает learning rate на 5% после каждой эпохи для стабильной сходимости
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.95)

    for epoch in range(epochs):
        model.train()  # включаем режим обучения (включает dropout и т.п.)
        total_loss = 0
        start_time = time.time()

        # Прогресс-бар для отслеживания обучения внутри эпохи
        progress_bar = tqdm(train_loader, desc=f"Эпоха {epoch+1}/{epochs}", leave=False)
        for batch in progress_bar:
            # Пропускаем слишком короткие последовательности (нельзя предсказать следующее слово)
            if batch.size(0) <= 1:
                continue

            # x — вход: все слова, кроме последнего (<eos>)
            # y — цель: все слова, кроме первого (<bos>)
            x = batch[:-1, :]
            y = batch[1:, :].reshape(-1)

            optimizer.zero_grad()          # обнуляем градиенты с прошлой итерации
            logits, _ = model(x)           # прямой проход: получаем логиты
            loss = criterion(logits.reshape(-1, len(vocab)), y)  # вычисляем потерю
            loss.backward()                # обратное распространение ошибки
            torch.nn.utils.clip_grad_norm_(model.parameters(), clip)  # обрезка градиентов против взрыва
            optimizer.step()               # обновление весов

            total_loss += loss.item()
            progress_bar.set_postfix({'loss': f"{loss.item():.3f}"})  # отображаем текущий loss

        # Средний loss за эпоху и затраченное время
        avg_loss = total_loss / len(train_loader)
        elapsed = time.time() - start_time
        print(f"Эпоха {epoch+1} завершена. Средний loss: {avg_loss:.3f}. Время: {elapsed:.1f} сек.")

        # Уменьшаем learning rate
        scheduler.step()

    return model

4. Эксперименты

In [ ]:
experiments = []

# Подготовка данных
train_loader = DataLoader(train_data, batch_size=32, shuffle=True, collate_fn=collate_batch)
test_loader = DataLoader(test_data, batch_size=32, shuffle=False, collate_fn=collate_batch)

print("\n🚀 Эксперимент 3: Размер скрытого состояния")
for hidden_dim in [50, 100, 200]:
    print(f"  Обучение LSTM, hidden_dim={hidden_dim}...")
    model = RNNLanguageModel(
        vocab_size=len(vocab),
        embed_dim=hidden_dim,  # обычно embed_dim = hidden_dim для простоты
        hidden_dim=hidden_dim,
        num_layers=1,
        rnn_type='LSTM',
        dropout=0.2
    )
    model = train_model(model, train_loader, epochs=3, lr=0.002)

    criterion = nn.CrossEntropyLoss(ignore_index=word2idx['<pad>'])
    test_loss = evaluate(model, test_loader, criterion)
    ppl = math.exp(test_loss)
    experiments.append({
        'Модель': f'LSTM (hidden={hidden_dim})',
        'Скрытый размер': hidden_dim,
        'Слои': 1,
        'Dropout': 0.2,
        'Оптимизатор': 'Adam',
        'Перплексия': round(ppl, 1)
    })
In [ ]:
df = pd.DataFrame(experiments)
print("\n=== Результаты экспериментов ===")
print(tabulate(df, headers='keys', tablefmt='grid', showindex=False))

5. Генерация текста

In [ ]:
def generate_text(model, seed_text, max_len=40, temperature=1.0, method='greedy'):
    """
    Генерирует текст с помощью обученной RNN-модели.

    Аргументы:
        model: обученная RNNLanguageModel
        seed_text (str): начальный текст (например, "I love")
        max_len (int): максимальная длина генерируемого текста (в словах)
        temperature (float): температура для sampling'а (1.0 = стандартная)
        method (str): 'greedy' или 'sample'
    """
    model.eval()
    tokens = tokenize(seed_text)
    indices = [word2idx['<bos>']] + [word2idx.get(t, word2idx['<unk>']) for t in tokens]
    generated = tokens[:]

    with torch.no_grad():
        for _ in range(max_len):
            # Подготавливаем входной тензор
            x = torch.tensor(indices, dtype=torch.long).unsqueeze(1).to(device)  # (seq_len, 1)

            # Прямой проход
            logits, _ = model(x)

            # Берём логиты последнего слова
            next_logits = logits[-1, 0, :] / temperature
            probs = torch.softmax(next_logits, dim=-1)

            if method == 'greedy':
                next_idx = torch.argmax(probs).item()
            else:  # sampling
                next_idx = torch.multinomial(probs, 1).item()

            # Остановка при <eos>
            if next_idx == word2idx['<eos>']:
                break

            # Добавляем слово
            word = idx2word.get(next_idx, '<unk>')
            generated.append(word)
            indices.append(next_idx)

    return ' '.join(generated)
In [ ]:
# После обучения модели (например, lstm_model)
print("=== Примеры генерации ===")
seeds = ["i", "i think", "i think this", "i think this film"]

for seed in seeds:
    greedy = generate_text(model, seed, max_len=25, method='greedy')
    sampled = generate_text(model, seed, max_len=25, method='sample', temperature=0.8)
    print(f"Начало: '{seed}'")
    print(f"  Жадно:     {greedy}")
    print(f"  Sampling:  {sampled}\n")

6. Attention

In [ ]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
In [ ]:
# Настройка стилей для графиков
sns.set_theme(style="whitegrid")
plt.rcParams['figure.figsize'] = (10, 8)
plt.rcParams['font.size'] = 12
In [ ]:
class BahdanauAttention(nn.Module):
    """
    Механизм внимания Бахданау (Additive Attention).
    Вычисляет веса внимания для скрытых состояний RNN.
    """
    def __init__(self, hidden_dim):
        super().__init__()
        # Линейные слои для вычисления энергии внимания
        self.W1 = nn.Linear(hidden_dim, hidden_dim)
        self.W2 = nn.Linear(hidden_dim, hidden_dim)
        self.V = nn.Linear(hidden_dim, 1, bias=False)

    def forward(self, query, values, mask=None):
        """
        query: текущее скрытое состояние (batch_size, hidden_dim)
        values: все выходы RNN (seq_len, batch_size, hidden_dim)
        mask: маска для игнорирования паддинга или будущих токенов (seq_len, batch_size)
        """
        # query_with_time_axis: (seq_len, batch_size, hidden_dim) для сложения
        query = query.unsqueeze(0)

        # Вычисление энергии (score)
        # tanh(W1 * values + W2 * query)
        score = self.V(torch.tanh(self.W1(values) + self.W2(query)))
        score = score.squeeze(-1)  # (seq_len, batch_size)

        # Применение маски (если есть)
        if mask is not None:
            score = score.masked_fill(mask, -1e9)

        # Softmax для получения весов (вероятностей)
        attention_weights = torch.softmax(score, dim=0) # (seq_len, batch_size)

        # Контекстный вектор: взвешенная сумма значений
        # (seq_len, batch, hidden) * (seq_len, batch, 1) -> sum over seq_len
        context_vector = torch.sum(attention_weights.unsqueeze(-1) * values, dim=0)

        return context_vector, attention_weights
In [ ]:
class RNNAttentionModel(nn.Module):
    def __init__(self, vocab_size, embed_dim=200, hidden_dim=200,
                 num_layers=1, rnn_type='LSTM', dropout=0.0):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=word2idx['<pad>'])
        self.dropout_emb = nn.Dropout(dropout)

        rnn_types = {'RNN': nn.RNN, 'GRU': nn.GRU, 'LSTM': nn.LSTM}
        self.rnn = rnn_types[rnn_type](
            input_size=embed_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            dropout=dropout if num_layers > 1 else 0,
            batch_first=False
        )

        # Механизм внимания
        self.attention = BahdanauAttention(hidden_dim)

        self.dropout_out = nn.Dropout(dropout)
        # Вход в FC теперь: hidden_dim (из RNN) + hidden_dim (из Context)
        self.fc = nn.Linear(hidden_dim * 2, vocab_size)
        self.hidden_dim = hidden_dim
        self.rnn_type = rnn_type

        # Для визуализации
        self.last_attention_weights = None

    def forward(self, x, hidden=None, return_attention=False):
        # x: (seq_len, batch)
        emb = self.embedding(x)
        emb = self.dropout_emb(emb)

        output, hidden = self.rnn(emb, hidden)
        # output: (seq_len, batch, hidden_dim)

        # Для LM мы предсказываем следующее слово для каждого шага.
        # Внимание вычисляется для каждого шага времени отдельно.
        # Чтобы ускорить, сделаем это циклом по длине последовательности (для наглядности)
        # или векторизованно. Здесь упрощенный вариант:
        # берем последний hidden state как query для предсказания следующего токена в конце,
        # НО для Language Modeling нам нужно предсказание для КАЖДОГО шага.

        # Реализация Attention для каждого шага времени (Time-step Attention)
        seq_len = x.size(0)
        batch_size = x.size(1)

        all_contexts = []
        all_attentions = []

        # Создаем маску (causal mask + pad mask)
        # Мы не можем смотреть в будущее. На шаге t доступны токены 0..t
        mask = torch.triu(torch.ones(seq_len, seq_len, device=x.device), diagonal=1).bool()
        # mask[i, j] = True, если j > i (будущее)

        # Также маска паддинга
        pad_mask = (x == word2idx['<pad>']).transpose(0, 1) # (batch, seq_len)
        # Нужно привести к (seq_len, batch)
        pad_mask = pad_mask.transpose(0, 1)

        combined_mask = mask.unsqueeze(1).expand(-1, batch_size, -1) | pad_mask.unsqueeze(-1).expand(-1, -1, seq_len)
        # combined_mask: (seq_len, batch, seq_len) - сложно.
        # Упростим: для задачи LM часто используют просто output RNN.
        # Но чтобы реализовать Attention, сделаем так:
        # Для каждого шага t, query = output[t], values = output[:t+1]

        outputs_list = []
        attn_list = []

        for t in range(seq_len):
            query = output[t] # (batch, hidden)
            values = output[:t+1] # (t+1, batch, hidden)

            # Маска для текущего шага: игнорируем паддинг в values
            current_pad_mask = (x[:t+1, :] == word2idx['<pad>']).transpose(0, 1) # (batch, t+1)

            ctx, attn = self.attention(query, values, mask=current_pad_mask.transpose(0, 1))
            # ctx: (batch, hidden), attn: (t+1, batch)

            # Конкатенация скрытого состояния и контекста
            concat = torch.cat((query, ctx), dim=1) # (batch, hidden*2)
            out = self.fc(concat)
            outputs_list.append(out)
            attn_list.append(attn)

        logits = torch.stack(outputs_list, dim=0) # (seq_len, batch, vocab)

        if return_attention:
            # Сохраняем веса для последней итерации (для визуализации)
            # Для простоты возьмем среднее внимание по батчу для последнего шага
            self.last_attention_weights = attn_list[-1].cpu().detach()

        return logits, hidden
In [ ]:
def visualize_attention(model, text, max_len=30):
    """
    Визуализирует веса внимания для данного текста.
    """
    model.eval()
    tokens = tokenize(text)
    # Обрезаем для наглядности
    tokens = tokens[:max_len]
    indices = [word2idx['<bos>']] + [word2idx.get(t, word2idx['<unk>']) for t in tokens]

    x = torch.tensor(indices, dtype=torch.long).unsqueeze(1).to(device)

    with torch.no_grad():
        # Прямой проход с возвратом внимания
        # Примечание: в нашей реализации attention возвращается для последнего шага
        # Для полной матрицы нужно модифицировать forward, но для примера
        # покажем, на что смотрит модель при предсказании ПОСЛЕДНЕГО слова.
        logits, _ = model(x, return_attention=True)

        # Получаем сохраненные веса (внимание последнего токена к предыдущим)
        attn_weights = model.last_attention_weights.squeeze().numpy() # (seq_len,)

        # Добавляем <bos> в метки
        words = ['<bos>'] + tokens

        plt.figure(figsize=(15, 5))

        # График 1: Тепловая карта (вектор внимания для последнего слова)
        plt.subplot(1, 2, 1)
        # Создаем матрицу для отображения (1 строка, N столбцов)
        attn_matrix = attn_weights[:len(words)].reshape(1, -1)
        sns.heatmap(attn_matrix, annot=True, fmt='.2f', cmap='YlGnBu',
                    xticklabels=words, yticklabels=['Last Token'])
        plt.title(f"Внимание при генерации слова после: '{tokens[-1] if tokens else ''}'")
        plt.xlabel("Входные токены")
        plt.ylabel("Целевой токен")

        # График 2: Столбчатая диаграмма весов
        plt.subplot(1, 2, 2)
        plt.bar(range(len(words)), attn_weights[:len(words)])
        plt.xticks(range(len(words)), words, rotation=45)
        plt.title("Вес внимания для каждого слова")
        plt.xlabel("Токены")
        plt.ylabel("Weight")

        plt.tight_layout()
        plt.show()

7. Эксперименты с Attention

In [ ]:
attn_loader = DataLoader(train_data, batch_size=32, shuffle=True, collate_fn=collate_batch)

model_attn = RNNAttentionModel(
    vocab_size=len(vocab),
    embed_dim=200,
    hidden_dim=200,
    num_layers=1,
    rnn_type='LSTM',
    dropout=0.2
)

# Функция train_model универсальна, но нужно убедиться, что criterion подходит
# В RNNAttentionModel выход FC имеет размер hidden*2, но это внутри модели, для loss важно logits shape
model_attn = train_model(model_attn, attn_loader, epochs=3, lr=0.002)

criterion = nn.CrossEntropyLoss(ignore_index=word2idx['<pad>'])
test_loss_attn = evaluate(model_attn, test_loader, criterion)
ppl_attn = math.exp(test_loss_attn)

experiments.append({
    'Модель': 'LSTM + Attention',
    'Скрытый размер': 200,
    'Слои': 1,
    'Dropout': 0.2,
    'Оптимизатор': 'Adam',
    'Перплексия': round(ppl_attn, 1)
})

# Обновляем таблицу
df = pd.DataFrame(experiments)
print("\n=== Итоговые Результаты Экспериментов ===")
print(tabulate(df, headers='keys', tablefmt='grid', showindex=False))
In [ ]:
print("\n=== Визуализация Attention ===")
# Возьмем пример из теста
sample_text = dataset['test'][25]['text']
print(f"Текст для анализа: {sample_text[:100]}...")
visualize_attention(model_attn, sample_text, max_len=20)
In [ ]:
tasks = [
    {
        "№": 3,
        "Задача": "Сравнение Перплексии",
        "Описание": "Запустите эксперименты для обычного LSTM и LSTM+Attention с одинаковыми гиперпараметрами (hidden_dim=200, epochs=5). Какая модель показывает лучшую перплексию на тесте? Объясните, почему внимание могло помочь (или не помочь)."
    },
    {
        "№": 4,
        "Задача": "Анализ Внимания на Отрицаниях",
        "Описание": "Найдите в тестовом датасете отзывы, содержащие слова 'not', 'never', 'bad'. Визуализируйте внимание модели на этих словах. Фокусируется ли модель на отрицаниях при предсказании следующего слова (например, при прогнозе тональности или продолжения фразы)?"
    },
    {
        "№": 5,
        "Задача": "Влияние Длины Последовательности",
        "Описание": "Измените функцию collate_batch так, чтобы она обрезала последовательности до 50, 100 и 500 токенов. Обучите модель с Attention на этих настройках. Как длина контекста влияет на качество генерации и скорость обучения?"
    },
    {
        "№": 6,
        "Задача": "GRU vs LSTM с Attention",
        "Описание": "Замените тип RNN на 'GRU' в классе RNNAttentionModel. Проведите обучение. Сравните время обучения и итоговую перплексию с LSTM. Какой архитектурный блок эффективнее в связке с Attention для этого датасета?"
    },
    {
        "№": 7,
        "Задача": "Температура и Разнообразие",
        "Описание": "Используя модель с Attention, сгенерируйте 5 вариантов продолжения фразы 'The movie was' при temperature=[0.5, 1.0, 1.5, 2.0]. Оцените, как температура влияет на грамматическую правильность и смысловую связность текста."
    }
]

df_tasks = pd.DataFrame(tasks)
print("\n=== 📚 Задачи для закрепления материала ===")
print(tabulate(df_tasks, headers='keys', tablefmt='grid', showindex=False))