AI Engineer
The is an opinion piece based on the author’s POV and does not necessarily reflect the views of HackerNoon.
This story contains AI-generated text. The author has used AI either for research, to generate outlines, or write the text itself.
No espírito da agenda mecanicista de interpretabilidade para redes neurais , este post (e talvez outros a seguir em uma série) investiga os “algoritmos” aprendidos por um modelo de transformador para lidar com uma tarefa técnica restrita – uma versão modificada dos “Parênteses Válidos” Problema de código Leet.
Embora a utilidade da tarefa seja muito mais modesta em escopo do que a previsão mais geral do próximo token que você esperaria em um LLM, o exercício nos ajudará a explorar algumas das primeiras intuições, ferramentas investigativas e metodologias epistemológicas gerais normalmente implantadas para sabemos o que os modelos estão fazendo (e como sabemos que estão).
Os desafios mensais da ARENA Mechinterp tiveram uma grande influência neste post, e daí virá o primeiro conjunto de problemas. (Você definitivamente deveria conferir o programa .)
O problema dos parênteses válidos visto no Leetcode:
Algumas restrições modificadas no problema que usaremos para a tarefa:
Isso elimina a necessidade de lidar com casos como "([)]”.
Exemplos
“(((())))” → Válido
“()()()(” → Inválido
“)()()()(” → Inválido
def isValid(self, s: str) -> bool: nesting_depth = 0 for bracket in s: if bracket == '(': # An opening bracket increases unresolved nesting depth nesting_depth += 1 elif bracket == ')': # A closing bracket decreases unresolved nesting depth nesting_depth -= 1 # We don't expect to ever have negative unresolved nesting depth, # so we can declare 'invalid' midway through the sequence if we see this if nesting_depth < 0: return False # Final check that all open brackets were closed. return nesting_depth == 0
Notas sobre casos de falha:
nesting_profundidade ≠ 0 no final da sequência
“()()()((” → Inválido
Para isso, não é óbvio que algo esteja errado até o final, quando vemos que os últimos colchetes abertos não possuem colchetes. O que devemos notar é que não há nenhum ponto na sequência, até o final, em que tivemos informações suficientes para saber que algo estava errado.
nesting_profundidade < 0 em qualquer ponto da sequência
exemplo: “())()()(” → Inválido
Neste caso, por outro lado, há informação suficiente da terceira posição para saber que a validade da sequência é irrecuperável, para que possamos encerrar o processo mais cedo.
Algo a ser observado é que este exemplo teria passado no primeiro teste de falha, já que nesting_depth
no final seria igual a 0. Portanto, este caso de teste não apenas nos ajuda a parar mais cedo, é vital. O mesmo se aplica ao primeiro exemplo de caso de falha, onde teria passado no teste 2.
Agora, não esperamos que um modelo de transformador autorregressivo resolva o problema exatamente da mesma maneira, visto que sua arquitetura oferece mecanismos ligeiramente diferentes do que repetir a sequência uma vez e verificar se tudo está bem. No entanto, sabemos com certeza que a arquitetura do transformador (e outras arquiteturas de processamento de sequência) é pelo menos capaz de descobrir e processar informações sobre todos os elementos de uma sequência. É importante lembrar que embora a solução possa parecer diferente, a estrutura do problema é a mesma e os limites rígidos sobre o que é conhecido e onde na sequência continuam a ser verdadeiros, seja um loop e instruções if ou um conjunto de self -varreduras de atenção e não linearidades MLP.
A questão interessante então é como essa arquitetura aproveita essas informações e se elas são facilmente discerníveis com as ferramentas existentes; porque é inevitável que uma solução com desempenho suficiente de qualquer arquitetura não teste pelo menos os dois casos de falha acima.
Esta é uma das vantagens dos problemas dos brinquedos; obtemos uma tarefa restrita suficientemente compreendida com essas garantias duras que podem ajudar a informar a investigação, como veremos em breve.
Aqui estão algumas características alvo que buscamos com a geração de dados:
Um número igual de strings balanceadas e desbalanceadas.
As strings terão comprimento par, pois uma string de comprimento ímpar é obviamente desequilibrada; o que não seria uma heurística muito interessante para o modelo aprender.
Todos os comprimentos de string (2-40) devem ser igualmente prováveis.
Para um determinado comprimento de string, todas as possíveis profundidades de aninhamento de parênteses devem ser igualmente prováveis.
Um tema comum é aparente: estamos a tentar tornar todas as estatísticas de distribuição imagináveis igualmente susceptíveis de reduzir o enviesamento em qualquer direcção, de garantir a robustez e de negar heurísticas óbvias de ganhos rápidos como uma opção para o modelo. Para gerar casos de falha, primeiro geraremos parênteses válidos com as garantias listadas acima e, em seguida, modificaremos metade deles para ficarmos desequilibrados.
from random import randint, randrange, sample from typing import List, Tuple, Union, Optional, Callable, Dict from jaxtyping import Float, Int import torch as t from torch import Tensor import plotly.express as px import einops from dataclasses import dataclass import math
def isValid(s: str) -> bool: nesting_depth = 0 for bracket in s: if bracket == '(': # An opening bracket increases unresolved nesting depth nesting_depth += 1 elif bracket == ')': # A closing bracket decreases unresolved nesting depth nesting_depth -= 1 # We don't expect to ever have negative unresolved nesting depth, # so we can declare 'invalid' midway through the sequence if we see this if nesting_depth < 0: return False # Final check that all open brackets were closed. return nesting_depth == 0
assert isValid('()()((((()())())))') == True assert isValid(')()((((()())()))(') == False
A primeira tentativa de geração de parênteses apenas faz um passeio aleatório. Mas, como você pode ver nos gráficos abaixo, o subespaço dos parênteses não balanceados é muito maior do que o dos parênteses equilibrados; então teremos que introduzir a estocasticidade de forma diferente.
PARENS = ['(', ')'] def get_random_walk_parens(parens_num: int, length_range: Tuple[int]) -> List[str]: range_start, range_end = length_range random_parens = [ # Add 1 to make passed range_end inclusive ''.join(PARENS[randint(0, 1)] for _ in range(randrange(range_start, range_end + 1, 2))) for _ in range(parens_num) ] return random_parens
random_parens = get_random_walk_parens(1000, (2, 10))
random_parens[:10] # output [')(', '(((())()', ')(((()()))', '))))))', '))())()(', '))', '(())', ')()(()()()', ')()())))((', '()']
is_valid_evals = [str(isValid(random_paren)) for random_paren in random_parens] len_evals = [len(random_paren) for random_paren in random_parens]
fig = px.histogram(is_valid_evals, title="Count of is-balanced for random walk parentheses strings") fig.show()
Podemos dividir a construção de uma sequência de parênteses balanceada em unidades discretas de parênteses aninhados. Para esta construção gananciosa, em cada etapa do processo de geração de uma string, uma profundidade de aninhamento é escolhida a partir de uma cesta de profundidades viáveis (para respeitar o comprimento alvo da string).
Por exemplo, para o comprimento alvo 6
, são possíveis as seguintes decomposições de aninhamento exclusivas:
-> [2, 1], [1, 2], [1,1,1] or [3]
Corresponding to:
-> (())(), ()(()), ()()(), ((()))
def get_balanced_parens(nest_depth: int) -> str: """Generate parentheses at the required nesting depth.""" return (PARENS[0] * nest_depth) + (PARENS[1] * nest_depth) assert get_balanced_parens(3) == '((()))'
def get_balanced_sequence_parens(nest_depth_sequence: List[int]) -> str: """Return a parentheses string following the nesting depth sequence from a given list.""" return ''.join(get_balanced_parens(nest_depth) for nest_depth in nest_depth_sequence) assert get_balanced_sequence_parens([1,1,2,3]) == '()()(())((()))'
def get_random_depth_sequence(target_paren_len: int) -> List[int]: depth_sequence = [] while target_paren_len > 0: depth = randint(1, target_paren_len / 2) depth_sequence.append(depth) target_paren_len -= 2 * depth return depth_sequence rand_depth_seq = get_random_depth_sequence(10) print(rand_depth_seq) # Example output: '[3, 1, 1]' assert sum([2 * depth for depth in rand_depth_seq]) == 10
def get_random_sequence_parens(parens_num: int, length_range: Tuple[int]) -> List[str]: random_depth_sequences = [get_random_depth_sequence( randrange(*length_range, 2) ) for _ in range(parens_num)] random_parens = [ get_balanced_sequence_parens(random_depth_sequence) for random_depth_sequence in random_depth_sequences ] return random_parens, random_depth_sequences
Obtenha pais equilibrados
random_seq_parens, depth_sequences = get_random_sequence_parens(100000, (2, 11)) is_valid_evals = [str(isValid(random_paren)) for random_paren in random_seq_parens] len_evals = [len(random_paren) for random_paren in random_seq_parens]
Vamos ver as frequências das profundidades de aninhamento
depth_freq = {} for seq in depth_sequences: for depth in seq: depth_freq.setdefault(depth, 0) depth_freq[depth] += 1 depth_freq # output -> {2: 39814, 1: 100088, 3: 20127, 4: 9908, 5: 4012}
depth_seq_hist = px.histogram(depth_sequences, title="Frequence of nesting depths in 'Random Nesting Depth Sequence' Output") depth_seq_hist.show()
E agora, para ver as frequências de comprimento.
paren_len_hist = px.histogram(len_evals, title="Frequency of string lengths") paren_len_hist.show()
Observe que há uma tensão entre as seguintes propriedades potenciais de nossa distribuição de dados.
Isso ocorre porque subsequências de baixa profundidade de aninhamento terão mais oportunidades de aparecer em uma determinada sequência de aninhamento aleatório, conforme mostrado nos gráficos acima.
Para contrariar esta tendência natural da sequência puramente aleatória, ao gerar uma determinada substring de parênteses, poderíamos amostrar a partir de uma distribuição distorcida para tornar mais prováveis valores de ninhos mais profundos.
Isso será revisto após uma primeira passagem no treinamento.
px.histogram(random_seq_parens, title="Frequency of balanced Parentheses").show()
Nosso conjunto de dados não pode ter apenas parênteses balanceados. Assim, podemos criar uma estratégia de geração de dados para derivar strings não balanceadas de nosso conjunto de dados balanceado.
def _flip_idx(idx): return (idx + 1) % 2 assert _flip_idx(0) == 1 assert _flip_idx(1) == 0
def make_parens_unbalanced(paren: str) -> str: """Take balanced-parentheses and randomly mutate it till it's unbalanced. Both the number of mutations and indices are chosen at random. """ paren_idx_dict = {'(': 0, ')': 1} paren_list = list(paren) num_flipped_positions = randint(1, len(paren)) while isValid(''.join(paren_list)): flip_points = sample(range(len(paren)), num_flipped_positions) for flip_idx in flip_points: idx_char = paren_idx_dict[paren_list[flip_idx]] flipped_idx = _flip_idx(idx_char) paren_list[flip_idx] = PARENS[flipped_idx] return ''.join(paren_list) assert not isValid(make_parens_unbalanced('((()))'))
Obtenha um conjunto de dados de parênteses desequilibrados
unbal_random_seq_parens = [make_parens_unbalanced(paren) for paren in random_seq_parens]
Agora que temos nossos conjuntos de dados, por diversão, vamos escrever nossa arquitetura Transformer do zero.
Primeiro algumas configurações
@dataclass class Config: context_len = 12 d_vocab: int = 5 d_out_vocab: int = 2 d_model: int = 56 d_head = 28 d_mlp = 56 * 4 causal_attention = False num_heads = 2 num_layers = 3 init_range: float = 1 PAD_TOKEN_IDX = 1
Em seguida, nosso tokenizer para análise de entradas:
class Tokenizer: def __init__(self, vocab: str, context_width: Int, enforce_context: bool=False): self.START_TOKEN, START_TOKEN_IDX = "<start>", 0 self.PAD_TOKEN, PAD_TOKEN_IDX = "<pad>", 1 self.END_TOKEN, END_TOKEN_IDX = "<end>", 2 util_tokens_t_to_i = {self.START_TOKEN: START_TOKEN_IDX, self.PAD_TOKEN: PAD_TOKEN_IDX, self.END_TOKEN: END_TOKEN_IDX} util_tokens_i_to_t = {START_TOKEN_IDX: self.START_TOKEN, PAD_TOKEN_IDX: self.PAD_TOKEN, END_TOKEN_IDX: self.END_TOKEN} self.enforce_context = enforce_context self.context_width = context_width self.vocab = vocab self.t_to_i = {**util_tokens_t_to_i, **{token: token_id + 3 for token_id, token in enumerate(self.vocab)}} self.i_to_t = {**util_tokens_i_to_t, **{token_id + 3: token for token_id, token in enumerate(self.vocab)}} @staticmethod def pad_sequence(sequence: str, end_token: str, pad_token: str, max_length: Int, enforce_context: bool) -> List[str]: if not enforce_context: # Truncate if sequence length is greater sequence = sequence[:max_length] else: assert len(sequence) <= max_length, f"Sequence length is greater than the max allowed data length: {max_length}" return list(sequence) + [end_token] + [pad_token] * (max_length - len(sequence)) def tokenize(self, data: Union[str, List[str]]) -> Int[Tensor, "batch seq"]: if isinstance(data, str): data = [data] def _list_tokens_to_id(tokens: List[str]) -> List[Int]: return [self.t_to_i[token] for token in tokens] # to leave room for start and end tokens max_seq_len = self.context_width - 2 data_as_tokens = [ _list_tokens_to_id([ self.START_TOKEN, *self.pad_sequence(seq, self.END_TOKEN, self.PAD_TOKEN, max_seq_len, self.enforce_context), ]) for seq in data ] return t.tensor(data_as_tokens)
(Des)Incorporações
class EmbedLayer(t.nn.Module): def __init__(self, cfg: Config): super().__init__() self.W_E = t.nn.Parameter(t.empty(cfg.d_vocab, cfg.d_model)) t.nn.init.normal_(self.W_E, mean=0.0, std=cfg.init_range) def forward(self, x: Int[Tensor, "batch seq"]) -> Int[Tensor, "batch seq d_model"]: return self.W_E[x] class UnEmbedLayer(t.nn.Module): def __init__(self, cfg: Config): super().__init__() self.W_U = t.nn.Parameter(t.empty(cfg.d_model, cfg.d_out_vocab)) t.nn.init.normal_(self.W_U, mean=0.0, std=cfg.init_range) def forward(self, x: Int[Tensor, "batch seq d_model"]) -> Int[Tensor, "batch seq d_out_vocab"]: return x @ self.W_U class PositionalEmbedding(t.nn.Module): def __init__(self, cfg: Config): super().__init__() denom = t.exp( t.arange(0, cfg.d_model, 2) * -(math.log(10000.0) / cfg.d_model) ) pos = t.arange(0, cfg.context_len).unsqueeze(1) param = pos * denom P_E = t.zeros(cfg.context_len, cfg.d_model) P_E[:, 0::2] = t.sin(param) P_E[:, 1::2] = t.cos(param) P_E = P_E.unsqueeze(0) self.register_buffer("P_E", P_E) def forward(self, x): _batch, seq_len, d_model = x.shape x = x + self.P_E[..., :seq_len, :d_model].requires_grad_(False) return x
Norma de Camada Prática
class LayerNorm(t.nn.Module): def __init__(self, cfg): super().__init__() self.scale = t.nn.Parameter(t.ones(cfg.d_model)) self.bias = t.nn.Parameter(t.zeros(cfg.d_model)) def forward(self, x): mean = t.mean(x, dim=2, keepdim=True) var = t.var(x, dim=2, keepdim=True, unbiased=False) y = (x - mean) / (var + 0.00001).sqrt() return (y * self.scale) + self.bias
E finalmente Atenção!
class AttentionLayer(t.nn.Module): def __init__(self, cfg): super().__init__() self.register_buffer("IGNORE", t.tensor(-1e5, dtype=t.float32)) self.cfg = cfg self.W_Q = t.nn.Parameter(t.empty(cfg.num_heads, cfg.d_model, cfg.d_head)) self.W_K = t.nn.Parameter(t.empty(cfg.num_heads, cfg.d_model, cfg.d_head)) self.W_V = t.nn.Parameter(t.empty(cfg.num_heads, cfg.d_model, cfg.d_head)) self.W_O = t.nn.Parameter(t.empty(cfg.num_heads, cfg.d_head, cfg.d_model)) self.b_Q = t.nn.Parameter(t.zeros(cfg.num_heads, cfg.d_head)) self.b_K = t.nn.Parameter(t.zeros(cfg.num_heads, cfg.d_head)) self.b_V = t.nn.Parameter(t.zeros(cfg.num_heads, cfg.d_head)) self.b_O = t.nn.Parameter(t.zeros(cfg.d_model)) t.nn.init.normal_(self.W_Q, mean=0.0, std=cfg.init_range) t.nn.init.normal_(self.W_K, mean=0.0, std=cfg.init_range) t.nn.init.normal_(self.W_V, mean=0.0, std=cfg.init_range) t.nn.init.normal_(self.W_O, mean=0.0, std=cfg.init_range) def forward(self, params): #TODO: revisit implementing pad_mask with hooks x, pad_mask = params Q = einops.einsum(x, self.W_Q, 'bs dm, h dm dh -> bsh dh') + self.b_Q K = einops.einsum(x, self.W_K, 'bs dm, h dm dh -> bsh dh') + self.b_K V = einops.einsum(x, self.W_V, 'bs dm, h dm dh -> bsh dh') + self.b_V attention_scores = einops.einsum(Q, K, 'b s_q h dh, b s_k h dh -> bh s_q s_k') scaled_attention_scores = attention_scores / (self.cfg.d_head ** 0.5) if self.cfg.causal_attention: scaled_attention_scores = self.apply_causal_mask(scaled_attention_scores) scaled_attention_scores = self.apply_padding_mask(scaled_attention_scores, pad_mask) attention_patterns = t.nn.Softmax(dim=-1)(scaled_attention_scores) post_attention_values = einops.einsum( attention_patterns, V, 'bh s_q s_k, b s_k h dh -> b s_q h dh' ) out = einops.einsum( post_attention_values, self.W_O, 'b s_q h dh, h dh dm -> b s_q dm' ) + self.b_O return out def apply_causal_mask(self, attention_scores): b, h, s_q, s_k = attention_scores.shape mask = t.tril(t.ones(s_q,s_k)).bool() return t.where(mask, attention_scores, self.IGNORE) def apply_padding_mask(self, attention_scores, pad_mask): return t.where(pad_mask, attention_scores, self.IGNORE)
Camadas MLP
class LinearLayer(t.nn.Module): def __init__(self, in_dim, out_dim, include_bias=True): super().__init__() self.include_bias = include_bias self.W = t.nn.Parameter(t.empty(in_dim, out_dim)) t.nn.init.normal_(self.W, mean=0.0, std=cfg.init_range) self.b = None if include_bias: self.b = t.zeros(out_dim) def forward(self, x: Int[Tensor, "batch seq in_dim"]) -> Int[Tensor, "batch seq out_dim"]: out = x @ self.W if self.include_bias: out = out + self.b return out class MLP(t.nn.Module): def __init__(self, cfg): super().__init__() self.in_layer = LinearLayer(cfg.d_model, cfg.d_mlp) self.out_layer = LinearLayer(cfg.d_mlp, cfg.d_model) self.non_linearity = t.nn.ReLU() def forward(self, x): post_W_in = self.in_layer(x) post_non_lin = self.non_linearity(post_W_in) return self.out_layer(post_non_lin)
Juntando tudo em um transformador
class TransformerBlock(t.nn.Module): def __init__(self, cfg): super().__init__() self.ln1 = LayerNorm(cfg) self.attention = AttentionLayer(cfg) self.ln2 = LayerNorm(cfg) self.mlp = MLP(cfg) def forward(self, params): x, pad_mask = params resid_mid = self.attention((self.ln1(x), pad_mask)) + x resid_post = self.mlp(self.ln2(resid_mid)) + resid_mid return resid_post, pad_mask
class Transformer(t.nn.Module): def __init__(self, cfg: Config): super().__init__() self.cfg = cfg self.embed = EmbedLayer(cfg) self.pos_embed = PositionalEmbedding(cfg) self.final_ln = LayerNorm(cfg) self.unembed = UnEmbedLayer(cfg) self.blocks = t.nn.Sequential(*([TransformerBlock(cfg)] * cfg.num_layers)) def forward(self, x): #TODO: revisit implementing pad_mask with hooks pad_mask = self.get_pad_mask(x) res_post_pos_embed = self.pos_embed(self.embed(x)) post_blocks, _ = self.blocks((res_post_pos_embed, pad_mask)) logits = self.unembed(self.final_ln(post_blocks)) return logits def get_pad_mask(self, x): batch, seq = x.shape return einops.repeat(x != self.cfg.PAD_TOKEN_IDX, 'batch seq -> batch 1 seq_q seq', seq_q=seq)
Utilitários de treinamento
def cross_entropy_loss(output, targets): log_probs = output.log_softmax(dim=-1) predictions = log_probs[:, 0] batch, out_dim = predictions.shape true_output = predictions[range(batch), targets] return -true_output.sum() / batch def test(model, data, loss_func): inputs, targets = data with t.no_grad(): output = model(inputs) loss = loss_func(output, targets) return loss def train(model, data, optimizer, loss_func): inputs, targets = data optimizer.zero_grad() output = model(inputs) loss = loss_func(output, targets) loss.backward() optimizer.step() return loss
Configuração de treinamento
cfg = Config() tokenizer = Tokenizer('()', 12, True) inputs = tokenizer.tokenize([*unbal_random_seq_parens, *random_seq_parens]) targets = t.tensor([*([0] * len(unbal_random_seq_parens)), *([1] * len(random_seq_parens))]) rand_indices = t.randperm(targets.shape[0]) rand_inputs = inputs[rand_indices, :] rand_targets = targets[rand_indices] model = Transformer(cfg) adamW = t.optim.AdamW(model.parameters(), lr=0.01)
Treinamento real
batch_size = 10000 train_size = int(0.7 * batch_size) epochs = 15 for epoch in range(epochs): for batch_id in range(0, rand_inputs.shape[0], batch_size): rand_inputs_batch, rand_targets_batch = rand_inputs[batch_id : batch_id + batch_size], rand_targets[batch_id : batch_id + batch_size] train_input, train_target = rand_inputs_batch[:train_size, :], rand_targets_batch[:train_size] test_input, test_target = rand_inputs_batch[train_size:, :], rand_targets_batch[train_size:] train(model, (train_input, train_target), adamW, cross_entropy_loss) test_loss = test(model, (test_input, test_target), cross_entropy_loss) print(f'Loss: {test_loss} on epoch: {epoch}/{epochs}')
Na Parte 3, investigaremos os componentes internos desta rede treinada. Faremos isso observando os padrões de atenção e aplicando algumas das ferramentas de diagnóstico da interpretabilidade mecanicista, como patch de ativação, para construir um modelo mecanicista de compreensão de como a rede resolveu essa tarefa.
Obrigado por ler até aqui e nos vemos em breve na Parte 3!