new experiments. code refactoring.
parent
d9aa740746
commit
a042b64d7e
@ -0,0 +1,227 @@
|
|||||||
|
import torch
|
||||||
|
import torch.nn as nn
|
||||||
|
from torch.nn import functional as F
|
||||||
|
from einops import rearrange
|
||||||
|
import optical_matrix_multiplication as omm
|
||||||
|
from optical_matrix_multiplication import propagator
|
||||||
|
import numpy as np
|
||||||
|
from torch.utils.tensorboard import SummaryWriter
|
||||||
|
from datetime import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
import sys
|
||||||
|
torch.manual_seed(1337)
|
||||||
|
|
||||||
|
#################################### Model #########################################
|
||||||
|
|
||||||
|
# def norm(matrix: torch.Tensor, max_val: float = 1) -> torch.Tensor:
|
||||||
|
# return matrix / (max_val + 1e-10)
|
||||||
|
|
||||||
|
def new_formula(sim, tensor_1, tensor_2):
|
||||||
|
tensor_1 = tensor_1[None,:,:,:] if len(tensor_1.shape) < 4 else tensor_1
|
||||||
|
tensor_2 = tensor_2[None,:,:,:] if len(tensor_2.shape) < 4 else tensor_2
|
||||||
|
device = tensor_1.device
|
||||||
|
|
||||||
|
A_pos = torch.clamp(tensor_1, min=0) # A⁺ = max(A, 0)
|
||||||
|
A_neg = torch.clamp(-tensor_1, min=0) # A⁻ = max(-A, 0)
|
||||||
|
B_pos = torch.clamp(tensor_2, min=0) # B⁺ = max(B, 0)
|
||||||
|
B_neg = torch.clamp(-tensor_2, min=0) # B⁻ = max(-B, 0)
|
||||||
|
|
||||||
|
max_A_pos = torch.max(A_pos) # Может быть 0, если нет положительных значений
|
||||||
|
max_A_neg = torch.max(A_neg) # Может быть 0, если нет отрицательных значений
|
||||||
|
max_B_pos = torch.max(B_pos)
|
||||||
|
max_B_neg = torch.max(B_neg)
|
||||||
|
|
||||||
|
zero_template = torch.zeros_like(
|
||||||
|
torch.empty(tensor_1.shape[0],tensor_1.shape[1], tensor_1.shape[2], tensor_2.shape[3]))
|
||||||
|
|
||||||
|
if max_A_pos > 0 and max_B_pos > 0:
|
||||||
|
t1 = sim(A_pos / max_A_pos, B_pos / max_B_pos) * max_A_pos * max_B_pos
|
||||||
|
else:
|
||||||
|
t1 = zero_template.clone().to(device)
|
||||||
|
|
||||||
|
if max_A_pos > 0 and max_B_neg > 0:
|
||||||
|
t2 = sim(A_pos / max_A_pos, B_neg / max_B_neg) * max_A_pos * max_B_neg
|
||||||
|
else:
|
||||||
|
t2 = zero_template.clone().to(device)
|
||||||
|
|
||||||
|
if max_A_neg > 0 and max_B_pos > 0:
|
||||||
|
t3 = sim(A_neg / max_A_neg, B_pos / max_B_pos) * max_A_neg * max_B_pos
|
||||||
|
else:
|
||||||
|
t3 = zero_template.clone().to(device)
|
||||||
|
|
||||||
|
if max_A_neg > 0 and max_B_neg > 0:
|
||||||
|
t4 = sim(A_neg / max_A_neg, B_neg / max_B_neg) * max_A_neg * max_B_neg
|
||||||
|
else:
|
||||||
|
t4 = zero_template.clone().to(device)
|
||||||
|
|
||||||
|
return (t1 - t2 - t3 + t4)[0,:,:,:]
|
||||||
|
|
||||||
|
# RoFormer: Enhanced Transformer with Rotary Position Embedding https://arxiv.org/abs/2104.09864
|
||||||
|
class RoPE(nn.Module):
|
||||||
|
def __init__(self, dim, max_seq_len=512):
|
||||||
|
super().__init__()
|
||||||
|
inv_freq = 1.0 / (10000 ** (torch.arange(0, dim, 2).float() / dim))
|
||||||
|
t = torch.arange(max_seq_len).type_as(inv_freq)
|
||||||
|
freqs = torch.einsum('i,j->ij', t, inv_freq)
|
||||||
|
self.register_buffer('emb', torch.cat([freqs, freqs], dim=-1))
|
||||||
|
|
||||||
|
def rotate_half(self, x):
|
||||||
|
x1, x2 = x.chunk(2, dim=-1)
|
||||||
|
return torch.cat((-x2, x1), dim=-1)
|
||||||
|
|
||||||
|
def forward(self, x, offset=0):
|
||||||
|
seq_len = x.size(1)
|
||||||
|
emb = self.emb[offset:offset+seq_len].view(1, seq_len, -1)
|
||||||
|
cos = emb.cos()
|
||||||
|
sin = emb.sin()
|
||||||
|
return (x * cos) + (self.rotate_half(x) * sin)
|
||||||
|
|
||||||
|
# Transformers without Normalization https://jiachenzhu.github.io/DyT/
|
||||||
|
class DyT(nn.Module):
|
||||||
|
def __init__(self, num_features, alpha_init_value=0.5):
|
||||||
|
super().__init__()
|
||||||
|
self.alpha = nn.Parameter(torch.ones(1) * alpha_init_value)
|
||||||
|
self.weight = nn.Parameter(torch.ones(num_features))
|
||||||
|
self.bias = nn.Parameter(torch.zeros(num_features))
|
||||||
|
|
||||||
|
def forward(self, x):
|
||||||
|
x = torch.tanh(self.alpha * x)
|
||||||
|
return x * self.weight + self.bias
|
||||||
|
|
||||||
|
# Attention Is All You Need https://arxiv.org/pdf/1706.03762v7
|
||||||
|
# NeoBERT: A Next-Generation BERT https://arxiv.org/html/2502.19587v1
|
||||||
|
class TransformerLayer(nn.Module):
|
||||||
|
def __init__(self, h_dim, sim_scores, sim_output, num_heads=4, dropout_rate = 0.1, max_seq_len = 128):
|
||||||
|
super().__init__()
|
||||||
|
self.__dict__.update({k:v for k,v in locals().items() if k != 'self'})
|
||||||
|
self.q_proj = nn.Linear(h_dim, h_dim)
|
||||||
|
self.k_proj = nn.Linear(h_dim, h_dim)
|
||||||
|
self.v_proj = nn.Linear(h_dim, h_dim)
|
||||||
|
self.o_proj = nn.Linear(h_dim, h_dim)
|
||||||
|
self.ff1 = nn.Linear(h_dim, 4*h_dim)
|
||||||
|
self.ff2 = nn.Linear(4*h_dim, h_dim)
|
||||||
|
self.ln1 = DyT(h_dim)
|
||||||
|
self.ln2 = DyT(h_dim)
|
||||||
|
self.rope = RoPE(dim=h_dim//self.num_heads, max_seq_len=max_seq_len)
|
||||||
|
self.k1 = nn.Parameter(torch.randn(1))
|
||||||
|
self.k2 = nn.Parameter(torch.randn(1))
|
||||||
|
|
||||||
|
def split_to_heads(self, x, B, T, H):
|
||||||
|
if self.num_heads <= 1: return x
|
||||||
|
return rearrange(x, 'b t (n h) -> (b n) t h', b=B, t=T, n=self.num_heads)
|
||||||
|
|
||||||
|
def gather_heads(self, x, B, T, H):
|
||||||
|
if self.num_heads <= 1: return x
|
||||||
|
return rearrange(x, '(b n) t h -> b t (n h)', b=B, t=T, n=self.num_heads)
|
||||||
|
|
||||||
|
def attention(self, x):
|
||||||
|
q = self.rope(self.split_to_heads(self.q_proj(x), *x.shape))
|
||||||
|
k = self.rope(self.split_to_heads(self.k_proj(x), *x.shape))
|
||||||
|
v = self.split_to_heads(self.v_proj(x), *x.shape)
|
||||||
|
scores = self.k1 * new_formula(self.sim_scores, q, k.transpose(1, 2)) * (self.h_dim ** -0.5)
|
||||||
|
tril = torch.tril(torch.ones(x.shape[1],x.shape[1])).to(self.q_proj.bias.device)
|
||||||
|
scores = scores.masked_fill(tril == 0, float('-inf'))
|
||||||
|
attention = nn.functional.softmax(scores, dim=2)
|
||||||
|
output = self.k2 * new_formula(self.sim_output, attention, v)
|
||||||
|
return self.o_proj(self.gather_heads(output, *x.shape))
|
||||||
|
|
||||||
|
def forward(self, x):
|
||||||
|
x = x + F.dropout1d(self.attention(self.ln1(x)), p=self.dropout_rate)
|
||||||
|
x = x + F.dropout1d(self.ff2(F.gelu(self.ff1(self.ln2(x)))), p=self.dropout_rate)
|
||||||
|
return x
|
||||||
|
|
||||||
|
class OpticGPT2NewFormula(nn.Module):
|
||||||
|
def __init__(self, vocab_size, layers_num=1, h_dim=64, max_seq_len=64, num_heads=1, dropout_rate = 0.1,
|
||||||
|
pixel_size = 3.6e-6):
|
||||||
|
super().__init__()
|
||||||
|
self.__dict__.update({k:v for k,v in locals().items() if k != 'self'})
|
||||||
|
if max_seq_len != 512:
|
||||||
|
self.sim_scores = omm.OpticalMul(
|
||||||
|
omm.Config(right_matrix_count_columns = max_seq_len,
|
||||||
|
right_matrix_count_rows = h_dim // num_heads,
|
||||||
|
right_matrix_width = pixel_size * max_seq_len,
|
||||||
|
right_matrix_height = pixel_size * (h_dim // num_heads),
|
||||||
|
min_height_gap = pixel_size,
|
||||||
|
right_matrix_split_x = 2,
|
||||||
|
right_matrix_split_y = 2,
|
||||||
|
left_matrix_split_x = 2,
|
||||||
|
left_matrix_split_y = 2,
|
||||||
|
result_matrix_split = 2,
|
||||||
|
distance = 0.01,
|
||||||
|
trainable_cylind_lens=False)
|
||||||
|
)
|
||||||
|
self.sim_output = omm.OpticalMul(
|
||||||
|
omm.Config(right_matrix_count_columns = h_dim // num_heads,
|
||||||
|
right_matrix_count_rows = max_seq_len,
|
||||||
|
right_matrix_width = pixel_size * (h_dim // num_heads),
|
||||||
|
right_matrix_height = pixel_size * max_seq_len,
|
||||||
|
min_height_gap = pixel_size,
|
||||||
|
right_matrix_split_x = 2,
|
||||||
|
right_matrix_split_y = 2,
|
||||||
|
left_matrix_split_x = 2,
|
||||||
|
left_matrix_split_y = 2,
|
||||||
|
result_matrix_split = 2,
|
||||||
|
distance = 0.01,
|
||||||
|
trainable_cylind_lens=False)
|
||||||
|
)
|
||||||
|
if max_seq_len == 512:
|
||||||
|
self.sim_scores = omm.OpticalMul(
|
||||||
|
omm.Config(right_matrix_count_columns = max_seq_len,
|
||||||
|
right_matrix_count_rows = h_dim // num_heads,
|
||||||
|
right_matrix_width = pixel_size * max_seq_len,
|
||||||
|
right_matrix_height = pixel_size * (h_dim // num_heads),
|
||||||
|
min_height_gap = pixel_size,
|
||||||
|
right_matrix_split_x = 2,
|
||||||
|
right_matrix_split_y = 2,
|
||||||
|
left_matrix_split_x = 2,
|
||||||
|
left_matrix_split_y = 2,
|
||||||
|
result_matrix_split = 2,
|
||||||
|
distance = 0.15,
|
||||||
|
lens_size = 8192 * 2,
|
||||||
|
trainable_cylind_lens=False)
|
||||||
|
)
|
||||||
|
self.sim_output = omm.OpticalMul(
|
||||||
|
omm.Config(right_matrix_count_columns = h_dim // num_heads,
|
||||||
|
right_matrix_count_rows = max_seq_len,
|
||||||
|
right_matrix_width = pixel_size * (h_dim // num_heads),
|
||||||
|
right_matrix_height = pixel_size * max_seq_len,
|
||||||
|
min_height_gap = pixel_size,
|
||||||
|
right_matrix_split_x = 2,
|
||||||
|
right_matrix_split_y = 2,
|
||||||
|
left_matrix_split_x = 2,
|
||||||
|
left_matrix_split_y = 2,
|
||||||
|
result_matrix_split = 2,
|
||||||
|
distance = 0.15,
|
||||||
|
lens_size = 8192 * 2,
|
||||||
|
trainable_cylind_lens=False)
|
||||||
|
)
|
||||||
|
self.sim_scores = omm.DataParallel(self.sim_scores)
|
||||||
|
self.sim_output = omm.DataParallel(self.sim_output)
|
||||||
|
|
||||||
|
self.layers = nn.ModuleList([
|
||||||
|
TransformerLayer(h_dim=self.h_dim, sim_scores=self.sim_scores, sim_output=self.sim_output, num_heads=self.num_heads,
|
||||||
|
dropout_rate=self.dropout_rate, max_seq_len=self.max_seq_len)
|
||||||
|
for _ in range(layers_num)])
|
||||||
|
self.tok_embeds = nn.Embedding(vocab_size, h_dim)
|
||||||
|
self.pos_embeds = nn.Parameter(torch.randn(1, self.max_seq_len, h_dim))
|
||||||
|
self.lm_head = nn.Linear(h_dim, vocab_size)
|
||||||
|
|
||||||
|
def forward(self, x, targets=None):
|
||||||
|
x = self.tok_embeds(x) + self.pos_embeds[:, :x.shape[1], :]
|
||||||
|
for l in self.layers:
|
||||||
|
x = l(x)
|
||||||
|
logits = self.lm_head(x) # B,T,C
|
||||||
|
loss = F.cross_entropy(rearrange(logits, "b t c -> b c t"), targets) if not targets is None else None
|
||||||
|
return logits, loss
|
||||||
|
|
||||||
|
# what is the purpose? autoregressive inference!
|
||||||
|
def generate(self, start_idx, max_new_tokens):
|
||||||
|
idx = start_idx
|
||||||
|
for i in range(max_new_tokens):
|
||||||
|
idx_cond = idx[:,-self.max_seq_len:]
|
||||||
|
logits, loss = self(idx_cond)
|
||||||
|
logits = logits[:,-1,:] # B, C
|
||||||
|
probs = F.softmax(logits, dim=-1)
|
||||||
|
idx_next = torch.multinomial(probs, num_samples=1).to(self.lm_head.bias.device)
|
||||||
|
idx = torch.cat([idx, idx_next], dim=1)
|
||||||
|
return idx
|
||||||
@ -0,0 +1,209 @@
|
|||||||
|
import torch
|
||||||
|
import torch.nn as nn
|
||||||
|
from torch.nn import functional as F
|
||||||
|
from einops import rearrange
|
||||||
|
import optical_matrix_multiplication as omm
|
||||||
|
from optical_matrix_multiplication import propagator
|
||||||
|
import numpy as np
|
||||||
|
from torch.utils.tensorboard import SummaryWriter
|
||||||
|
from datetime import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
import sys
|
||||||
|
torch.manual_seed(1337)
|
||||||
|
|
||||||
|
#################################### Model #########################################
|
||||||
|
|
||||||
|
def norm(matrix: torch.Tensor, max_val: float = 1) -> torch.Tensor:
|
||||||
|
return matrix / (max_val + 1e-10)
|
||||||
|
|
||||||
|
def optics_matmul_shift(sim, tensor_1, tensor_2):
|
||||||
|
tensor_1 = tensor_1[None,:,:,:]
|
||||||
|
tensor_2 = tensor_2[None,:,:,:]
|
||||||
|
|
||||||
|
if torch.min(tensor_1) >= 0 and torch.min(tensor_2) >= 0:
|
||||||
|
max_abs = abs(max(torch.max(tensor_1), torch.max(tensor_2)))
|
||||||
|
a, b = norm(tensor_1, max_abs), norm(tensor_2, max_abs)
|
||||||
|
return sim(a, b)[0,:,:,:] * max_abs **2
|
||||||
|
|
||||||
|
min_abs = abs(min(torch.min(tensor_1), torch.min(tensor_2)))
|
||||||
|
max_abs = abs(max(torch.max(tensor_1), torch.max(tensor_2))) + min_abs
|
||||||
|
|
||||||
|
shift_a = min_abs * torch.ones(tensor_1.shape).to(tensor_1.device)
|
||||||
|
shift_b = min_abs * torch.ones(tensor_2.shape).to(tensor_1.device)
|
||||||
|
a_a_sh = tensor_1 + shift_a
|
||||||
|
b_b_sh = tensor_2 + shift_b
|
||||||
|
|
||||||
|
a_a_sh_norm, b_b_sh_norm = norm(a_a_sh, max_abs), norm(b_b_sh, max_abs)
|
||||||
|
shift_a_norm, shift_b_norm = norm(shift_a, max_abs), norm(shift_b, max_abs)
|
||||||
|
a_a_sh_b_b_sh = sim(a_a_sh_norm, b_b_sh_norm)
|
||||||
|
a_a_sh_b_sh = sim(a_a_sh_norm, shift_b_norm)
|
||||||
|
a_sh_b_b_sh = sim(shift_a_norm, b_b_sh_norm)
|
||||||
|
a_sh_b_sh = sim(shift_a_norm, shift_b_norm)
|
||||||
|
|
||||||
|
return (a_a_sh_b_b_sh - a_a_sh_b_sh - a_sh_b_b_sh + a_sh_b_sh)[0,:,:,:] * max_abs ** 2
|
||||||
|
|
||||||
|
# RoFormer: Enhanced Transformer with Rotary Position Embedding https://arxiv.org/abs/2104.09864
|
||||||
|
class RoPE(nn.Module):
|
||||||
|
def __init__(self, dim, max_seq_len=512):
|
||||||
|
super().__init__()
|
||||||
|
inv_freq = 1.0 / (10000 ** (torch.arange(0, dim, 2).float() / dim))
|
||||||
|
t = torch.arange(max_seq_len).type_as(inv_freq)
|
||||||
|
freqs = torch.einsum('i,j->ij', t, inv_freq)
|
||||||
|
self.register_buffer('emb', torch.cat([freqs, freqs], dim=-1))
|
||||||
|
|
||||||
|
def rotate_half(self, x):
|
||||||
|
x1, x2 = x.chunk(2, dim=-1)
|
||||||
|
return torch.cat((-x2, x1), dim=-1)
|
||||||
|
|
||||||
|
def forward(self, x, offset=0):
|
||||||
|
seq_len = x.size(1)
|
||||||
|
emb = self.emb[offset:offset+seq_len].view(1, seq_len, -1)
|
||||||
|
cos = emb.cos()
|
||||||
|
sin = emb.sin()
|
||||||
|
return (x * cos) + (self.rotate_half(x) * sin)
|
||||||
|
|
||||||
|
# Transformers without Normalization https://jiachenzhu.github.io/DyT/
|
||||||
|
class DyT(nn.Module):
|
||||||
|
def __init__(self, num_features, alpha_init_value=0.5):
|
||||||
|
super().__init__()
|
||||||
|
self.alpha = nn.Parameter(torch.ones(1) * alpha_init_value)
|
||||||
|
self.weight = nn.Parameter(torch.ones(num_features))
|
||||||
|
self.bias = nn.Parameter(torch.zeros(num_features))
|
||||||
|
|
||||||
|
def forward(self, x):
|
||||||
|
x = torch.tanh(self.alpha * x)
|
||||||
|
return x * self.weight + self.bias
|
||||||
|
|
||||||
|
# Attention Is All You Need https://arxiv.org/pdf/1706.03762v7
|
||||||
|
# NeoBERT: A Next-Generation BERT https://arxiv.org/html/2502.19587v1
|
||||||
|
class TransformerLayer(nn.Module):
|
||||||
|
def __init__(self, h_dim, sim_scores, sim_output, num_heads=4, dropout_rate = 0.1, max_seq_len = 128):
|
||||||
|
super().__init__()
|
||||||
|
self.__dict__.update({k:v for k,v in locals().items() if k != 'self'})
|
||||||
|
self.q_proj = nn.Linear(h_dim, h_dim)
|
||||||
|
self.k_proj = nn.Linear(h_dim, h_dim)
|
||||||
|
self.v_proj = nn.Linear(h_dim, h_dim)
|
||||||
|
self.o_proj = nn.Linear(h_dim, h_dim)
|
||||||
|
self.ff1 = nn.Linear(h_dim, 4*h_dim)
|
||||||
|
self.ff2 = nn.Linear(4*h_dim, h_dim)
|
||||||
|
self.ln1 = DyT(h_dim)
|
||||||
|
self.ln2 = DyT(h_dim)
|
||||||
|
self.rope = RoPE(dim=h_dim//self.num_heads, max_seq_len=max_seq_len)
|
||||||
|
|
||||||
|
def split_to_heads(self, x, B, T, H):
|
||||||
|
if self.num_heads <= 1: return x
|
||||||
|
return rearrange(x, 'b t (n h) -> (b n) t h', b=B, t=T, n=self.num_heads)
|
||||||
|
|
||||||
|
def gather_heads(self, x, B, T, H):
|
||||||
|
if self.num_heads <= 1: return x
|
||||||
|
return rearrange(x, '(b n) t h -> b t (n h)', b=B, t=T, n=self.num_heads)
|
||||||
|
|
||||||
|
def attention(self, x):
|
||||||
|
q = self.rope(self.split_to_heads(self.q_proj(x), *x.shape))
|
||||||
|
k = self.rope(self.split_to_heads(self.k_proj(x), *x.shape))
|
||||||
|
v = self.split_to_heads(self.v_proj(x), *x.shape)
|
||||||
|
scores = optics_matmul_shift(self.sim_scores, q, k.transpose(1, 2)) * (self.h_dim ** -0.5)
|
||||||
|
tril = torch.tril(torch.ones(x.shape[1],x.shape[1])).to(self.q_proj.bias.device)
|
||||||
|
scores = scores.masked_fill(tril == 0, float('-inf'))
|
||||||
|
attention = nn.functional.softmax(scores, dim=2)
|
||||||
|
output = optics_matmul_shift(self.sim_output, attention, v)
|
||||||
|
return self.o_proj(self.gather_heads(output, *x.shape))
|
||||||
|
|
||||||
|
def forward(self, x):
|
||||||
|
x = x + F.dropout1d(self.attention(self.ln1(x)), p=self.dropout_rate)
|
||||||
|
x = x + F.dropout1d(self.ff2(F.gelu(self.ff1(self.ln2(x)))), p=self.dropout_rate)
|
||||||
|
return x
|
||||||
|
|
||||||
|
class OpticGPT2NOKoef(nn.Module):
|
||||||
|
def __init__(self, vocab_size, layers_num=1, h_dim=64, max_seq_len=64, num_heads=1, dropout_rate = 0.1,
|
||||||
|
pixel_size = 3.6e-6):
|
||||||
|
super().__init__()
|
||||||
|
self.__dict__.update({k:v for k,v in locals().items() if k != 'self'})
|
||||||
|
if max_seq_len < 512:
|
||||||
|
self.sim_scores = omm.OpticalMul(
|
||||||
|
omm.Config(right_matrix_count_columns = max_seq_len,
|
||||||
|
right_matrix_count_rows = h_dim // num_heads,
|
||||||
|
right_matrix_width = pixel_size * max_seq_len,
|
||||||
|
right_matrix_height = pixel_size * (h_dim // num_heads),
|
||||||
|
min_height_gap = pixel_size,
|
||||||
|
right_matrix_split_x = 2,
|
||||||
|
right_matrix_split_y = 2,
|
||||||
|
left_matrix_split_x = 2,
|
||||||
|
left_matrix_split_y = 2,
|
||||||
|
result_matrix_split = 2,
|
||||||
|
distance = 0.01,
|
||||||
|
trainable_cylind_lens=False)
|
||||||
|
)
|
||||||
|
self.sim_output = omm.OpticalMul(
|
||||||
|
omm.Config(right_matrix_count_columns = h_dim // num_heads,
|
||||||
|
right_matrix_count_rows = max_seq_len,
|
||||||
|
right_matrix_width = pixel_size * (h_dim // num_heads),
|
||||||
|
right_matrix_height = pixel_size * max_seq_len,
|
||||||
|
min_height_gap = pixel_size,
|
||||||
|
right_matrix_split_x = 2,
|
||||||
|
right_matrix_split_y = 2,
|
||||||
|
left_matrix_split_x = 2,
|
||||||
|
left_matrix_split_y = 2,
|
||||||
|
result_matrix_split = 2,
|
||||||
|
distance = 0.01,
|
||||||
|
trainable_cylind_lens=False)
|
||||||
|
)
|
||||||
|
if max_seq_len >= 512:
|
||||||
|
self.sim_scores = omm.OpticalMul(
|
||||||
|
omm.Config(right_matrix_count_columns = max_seq_len,
|
||||||
|
right_matrix_count_rows = h_dim // num_heads,
|
||||||
|
right_matrix_width = pixel_size * max_seq_len,
|
||||||
|
right_matrix_height = pixel_size * (h_dim // num_heads),
|
||||||
|
min_height_gap = pixel_size,
|
||||||
|
right_matrix_split_x = 2,
|
||||||
|
right_matrix_split_y = 2,
|
||||||
|
left_matrix_split_x = 2,
|
||||||
|
left_matrix_split_y = 2,
|
||||||
|
result_matrix_split = 2,
|
||||||
|
distance = 0.15,
|
||||||
|
lens_size = 8192 * 2,
|
||||||
|
trainable_cylind_lens=False)
|
||||||
|
)
|
||||||
|
self.sim_output = omm.OpticalMul(
|
||||||
|
omm.Config(right_matrix_count_columns = h_dim // num_heads,
|
||||||
|
right_matrix_count_rows = max_seq_len,
|
||||||
|
right_matrix_width = pixel_size * (h_dim // num_heads),
|
||||||
|
right_matrix_height = pixel_size * max_seq_len,
|
||||||
|
min_height_gap = pixel_size,
|
||||||
|
right_matrix_split_x = 2,
|
||||||
|
right_matrix_split_y = 2,
|
||||||
|
left_matrix_split_x = 2,
|
||||||
|
left_matrix_split_y = 2,
|
||||||
|
result_matrix_split = 2,
|
||||||
|
distance = 0.15,
|
||||||
|
lens_size = 8192 * 2,
|
||||||
|
trainable_cylind_lens=False)
|
||||||
|
)
|
||||||
|
|
||||||
|
self.layers = nn.ModuleList([
|
||||||
|
TransformerLayer(h_dim=self.h_dim, sim_scores=self.sim_scores, sim_output=self.sim_output, num_heads=self.num_heads,
|
||||||
|
dropout_rate=self.dropout_rate, max_seq_len=self.max_seq_len)
|
||||||
|
for _ in range(layers_num)])
|
||||||
|
self.tok_embeds = nn.Embedding(vocab_size, h_dim)
|
||||||
|
self.pos_embeds = nn.Parameter(torch.randn(1, self.max_seq_len, h_dim))
|
||||||
|
self.lm_head = nn.Linear(h_dim, vocab_size)
|
||||||
|
|
||||||
|
def forward(self, x, targets=None):
|
||||||
|
x = self.tok_embeds(x) + self.pos_embeds[:, :x.shape[1], :]
|
||||||
|
for l in self.layers:
|
||||||
|
x = l(x)
|
||||||
|
logits = self.lm_head(x) # B,T,C
|
||||||
|
loss = F.cross_entropy(rearrange(logits, "b t c -> b c t"), targets) if not targets is None else None
|
||||||
|
return logits, loss
|
||||||
|
|
||||||
|
# what is the purpose? autoregressive inference!
|
||||||
|
def generate(self, start_idx, max_new_tokens):
|
||||||
|
idx = start_idx
|
||||||
|
for i in range(max_new_tokens):
|
||||||
|
idx_cond = idx[:,-self.max_seq_len:]
|
||||||
|
logits, loss = self(idx_cond)
|
||||||
|
logits = logits[:,-1,:] # B, C
|
||||||
|
probs = F.softmax(logits, dim=-1)
|
||||||
|
idx_next = torch.multinomial(probs, num_samples=1).to(self.lm_head.bias.device)
|
||||||
|
idx = torch.cat([idx, idx_next], dim=1)
|
||||||
|
return idx
|
||||||
@ -0,0 +1,220 @@
|
|||||||
|
import torch
|
||||||
|
import torch.nn as nn
|
||||||
|
from torch.nn import functional as F
|
||||||
|
from einops import rearrange
|
||||||
|
import optical_matrix_multiplication as omm
|
||||||
|
from optical_matrix_multiplication import propagator
|
||||||
|
import numpy as np
|
||||||
|
from torch.utils.tensorboard import SummaryWriter
|
||||||
|
from datetime import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
import sys
|
||||||
|
torch.manual_seed(1337)
|
||||||
|
|
||||||
|
#################################### Model #########################################
|
||||||
|
|
||||||
|
def new_formula(sim, tensor_1, tensor_2):
|
||||||
|
tensor_1 = tensor_1[None,:,:,:] if len(tensor_1.shape) < 4 else tensor_1
|
||||||
|
tensor_2 = tensor_2[None,:,:,:] if len(tensor_2.shape) < 4 else tensor_2
|
||||||
|
device = tensor_1.device
|
||||||
|
|
||||||
|
A_pos = torch.clamp(tensor_1, min=0) # A⁺ = max(A, 0)
|
||||||
|
A_neg = torch.clamp(-tensor_1, min=0) # A⁻ = max(-A, 0)
|
||||||
|
B_pos = torch.clamp(tensor_2, min=0) # B⁺ = max(B, 0)
|
||||||
|
B_neg = torch.clamp(-tensor_2, min=0) # B⁻ = max(-B, 0)
|
||||||
|
|
||||||
|
max_A_pos = torch.max(A_pos) # Может быть 0, если нет положительных значений
|
||||||
|
max_A_neg = torch.max(A_neg) # Может быть 0, если нет отрицательных значений
|
||||||
|
max_B_pos = torch.max(B_pos)
|
||||||
|
max_B_neg = torch.max(B_neg)
|
||||||
|
|
||||||
|
zero_template = torch.zeros_like(
|
||||||
|
torch.empty(tensor_1.shape[0],tensor_1.shape[1], tensor_1.shape[2], tensor_2.shape[3]))
|
||||||
|
|
||||||
|
if max_A_pos > 0 and max_B_pos > 0:
|
||||||
|
t1 = sim(A_pos / max_A_pos, B_pos / max_B_pos) * max_A_pos * max_B_pos
|
||||||
|
else:
|
||||||
|
t1 = zero_template.clone().to(device)
|
||||||
|
|
||||||
|
if max_A_pos > 0 and max_B_neg > 0:
|
||||||
|
t2 = sim(A_pos / max_A_pos, B_neg / max_B_neg) * max_A_pos * max_B_neg
|
||||||
|
else:
|
||||||
|
t2 = zero_template.clone().to(device)
|
||||||
|
|
||||||
|
if max_A_neg > 0 and max_B_pos > 0:
|
||||||
|
t3 = sim(A_neg / max_A_neg, B_pos / max_B_pos) * max_A_neg * max_B_pos
|
||||||
|
else:
|
||||||
|
t3 = zero_template.clone().to(device)
|
||||||
|
|
||||||
|
if max_A_neg > 0 and max_B_neg > 0:
|
||||||
|
t4 = sim(A_neg / max_A_neg, B_neg / max_B_neg) * max_A_neg * max_B_neg
|
||||||
|
else:
|
||||||
|
t4 = zero_template.clone().to(device)
|
||||||
|
|
||||||
|
return (t1 - t2 - t3 + t4)[0,:,:,:]
|
||||||
|
|
||||||
|
# RoFormer: Enhanced Transformer with Rotary Position Embedding https://arxiv.org/abs/2104.09864
|
||||||
|
class RoPE(nn.Module):
|
||||||
|
def __init__(self, dim, max_seq_len=512):
|
||||||
|
super().__init__()
|
||||||
|
inv_freq = 1.0 / (10000 ** (torch.arange(0, dim, 2).float() / dim))
|
||||||
|
t = torch.arange(max_seq_len).type_as(inv_freq)
|
||||||
|
freqs = torch.einsum('i,j->ij', t, inv_freq)
|
||||||
|
self.register_buffer('emb', torch.cat([freqs, freqs], dim=-1))
|
||||||
|
|
||||||
|
def rotate_half(self, x):
|
||||||
|
x1, x2 = x.chunk(2, dim=-1)
|
||||||
|
return torch.cat((-x2, x1), dim=-1)
|
||||||
|
|
||||||
|
def forward(self, x, offset=0):
|
||||||
|
seq_len = x.size(1)
|
||||||
|
emb = self.emb[offset:offset+seq_len].view(1, seq_len, -1)
|
||||||
|
cos = emb.cos()
|
||||||
|
sin = emb.sin()
|
||||||
|
return (x * cos) + (self.rotate_half(x) * sin)
|
||||||
|
|
||||||
|
# Transformers without Normalization https://jiachenzhu.github.io/DyT/
|
||||||
|
class DyT(nn.Module):
|
||||||
|
def __init__(self, num_features, alpha_init_value=0.5):
|
||||||
|
super().__init__()
|
||||||
|
self.alpha = nn.Parameter(torch.ones(1) * alpha_init_value)
|
||||||
|
self.weight = nn.Parameter(torch.ones(num_features))
|
||||||
|
self.bias = nn.Parameter(torch.zeros(num_features))
|
||||||
|
|
||||||
|
def forward(self, x):
|
||||||
|
x = torch.tanh(self.alpha * x)
|
||||||
|
return x * self.weight + self.bias
|
||||||
|
|
||||||
|
# Attention Is All You Need https://arxiv.org/pdf/1706.03762v7
|
||||||
|
# NeoBERT: A Next-Generation BERT https://arxiv.org/html/2502.19587v1
|
||||||
|
class TransformerLayer(nn.Module):
|
||||||
|
def __init__(self, h_dim, sim_scores, sim_output, num_heads=4, dropout_rate = 0.1, max_seq_len = 128):
|
||||||
|
super().__init__()
|
||||||
|
self.__dict__.update({k:v for k,v in locals().items() if k != 'self'})
|
||||||
|
self.q_proj = nn.Linear(h_dim, h_dim)
|
||||||
|
self.k_proj = nn.Linear(h_dim, h_dim)
|
||||||
|
self.v_proj = nn.Linear(h_dim, h_dim)
|
||||||
|
self.o_proj = nn.Linear(h_dim, h_dim)
|
||||||
|
self.ff1 = nn.Linear(h_dim, 4*h_dim)
|
||||||
|
self.ff2 = nn.Linear(4*h_dim, h_dim)
|
||||||
|
self.ln1 = DyT(h_dim)
|
||||||
|
self.ln2 = DyT(h_dim)
|
||||||
|
self.rope = RoPE(dim=h_dim//self.num_heads, max_seq_len=max_seq_len)
|
||||||
|
|
||||||
|
def split_to_heads(self, x, B, T, H):
|
||||||
|
if self.num_heads <= 1: return x
|
||||||
|
return rearrange(x, 'b t (n h) -> (b n) t h', b=B, t=T, n=self.num_heads)
|
||||||
|
|
||||||
|
def gather_heads(self, x, B, T, H):
|
||||||
|
if self.num_heads <= 1: return x
|
||||||
|
return rearrange(x, '(b n) t h -> b t (n h)', b=B, t=T, n=self.num_heads)
|
||||||
|
|
||||||
|
def attention(self, x):
|
||||||
|
q = self.rope(self.split_to_heads(self.q_proj(x), *x.shape))
|
||||||
|
k = self.rope(self.split_to_heads(self.k_proj(x), *x.shape))
|
||||||
|
v = self.split_to_heads(self.v_proj(x), *x.shape)
|
||||||
|
scores = new_formula(self.sim_scores, q, k.transpose(1, 2)) * (self.h_dim ** -0.5)
|
||||||
|
tril = torch.tril(torch.ones(x.shape[1],x.shape[1])).to(self.q_proj.bias.device)
|
||||||
|
scores = scores.masked_fill(tril == 0, float('-inf'))
|
||||||
|
attention = nn.functional.softmax(scores, dim=2)
|
||||||
|
output = new_formula(self.sim_output, attention, v)
|
||||||
|
return self.o_proj(self.gather_heads(output, *x.shape))
|
||||||
|
|
||||||
|
def forward(self, x):
|
||||||
|
x = x + F.dropout1d(self.attention(self.ln1(x)), p=self.dropout_rate)
|
||||||
|
x = x + F.dropout1d(self.ff2(F.gelu(self.ff1(self.ln2(x)))), p=self.dropout_rate)
|
||||||
|
return x
|
||||||
|
|
||||||
|
class OpticGPT2NOKoefNewF(nn.Module):
|
||||||
|
def __init__(self, vocab_size, layers_num=1, h_dim=64, max_seq_len=64, num_heads=1, dropout_rate = 0.1,
|
||||||
|
pixel_size = 3.6e-6):
|
||||||
|
super().__init__()
|
||||||
|
self.__dict__.update({k:v for k,v in locals().items() if k != 'self'})
|
||||||
|
if max_seq_len < 512:
|
||||||
|
self.sim_scores = omm.OpticalMul(
|
||||||
|
omm.Config(right_matrix_count_columns = max_seq_len,
|
||||||
|
right_matrix_count_rows = h_dim // num_heads,
|
||||||
|
right_matrix_width = pixel_size * max_seq_len,
|
||||||
|
right_matrix_height = pixel_size * (h_dim // num_heads),
|
||||||
|
min_height_gap = pixel_size,
|
||||||
|
right_matrix_split_x = 2,
|
||||||
|
right_matrix_split_y = 2,
|
||||||
|
left_matrix_split_x = 2,
|
||||||
|
left_matrix_split_y = 2,
|
||||||
|
result_matrix_split = 2,
|
||||||
|
distance = 0.01,
|
||||||
|
trainable_cylind_lens=False)
|
||||||
|
)
|
||||||
|
self.sim_output = omm.OpticalMul(
|
||||||
|
omm.Config(right_matrix_count_columns = h_dim // num_heads,
|
||||||
|
right_matrix_count_rows = max_seq_len,
|
||||||
|
right_matrix_width = pixel_size * (h_dim // num_heads),
|
||||||
|
right_matrix_height = pixel_size * max_seq_len,
|
||||||
|
min_height_gap = pixel_size,
|
||||||
|
right_matrix_split_x = 2,
|
||||||
|
right_matrix_split_y = 2,
|
||||||
|
left_matrix_split_x = 2,
|
||||||
|
left_matrix_split_y = 2,
|
||||||
|
result_matrix_split = 2,
|
||||||
|
distance = 0.01,
|
||||||
|
trainable_cylind_lens=False)
|
||||||
|
)
|
||||||
|
if max_seq_len >= 512:
|
||||||
|
self.sim_scores = omm.OpticalMul(
|
||||||
|
omm.Config(right_matrix_count_columns = max_seq_len,
|
||||||
|
right_matrix_count_rows = h_dim // num_heads,
|
||||||
|
right_matrix_width = pixel_size * max_seq_len,
|
||||||
|
right_matrix_height = pixel_size * (h_dim // num_heads),
|
||||||
|
min_height_gap = pixel_size,
|
||||||
|
right_matrix_split_x = 2,
|
||||||
|
right_matrix_split_y = 2,
|
||||||
|
left_matrix_split_x = 2,
|
||||||
|
left_matrix_split_y = 2,
|
||||||
|
result_matrix_split = 2,
|
||||||
|
distance = 0.15,
|
||||||
|
lens_size = 8192 * 2,
|
||||||
|
trainable_cylind_lens=False)
|
||||||
|
)
|
||||||
|
self.sim_output = omm.OpticalMul(
|
||||||
|
omm.Config(right_matrix_count_columns = h_dim // num_heads,
|
||||||
|
right_matrix_count_rows = max_seq_len,
|
||||||
|
right_matrix_width = pixel_size * (h_dim // num_heads),
|
||||||
|
right_matrix_height = pixel_size * max_seq_len,
|
||||||
|
min_height_gap = pixel_size,
|
||||||
|
right_matrix_split_x = 2,
|
||||||
|
right_matrix_split_y = 2,
|
||||||
|
left_matrix_split_x = 2,
|
||||||
|
left_matrix_split_y = 2,
|
||||||
|
result_matrix_split = 2,
|
||||||
|
distance = 0.15,
|
||||||
|
lens_size = 8192 * 2,
|
||||||
|
trainable_cylind_lens=False)
|
||||||
|
)
|
||||||
|
|
||||||
|
self.layers = nn.ModuleList([
|
||||||
|
TransformerLayer(h_dim=self.h_dim, sim_scores=self.sim_scores, sim_output=self.sim_output, num_heads=self.num_heads,
|
||||||
|
dropout_rate=self.dropout_rate, max_seq_len=self.max_seq_len)
|
||||||
|
for _ in range(layers_num)])
|
||||||
|
self.tok_embeds = nn.Embedding(vocab_size, h_dim)
|
||||||
|
self.pos_embeds = nn.Parameter(torch.randn(1, self.max_seq_len, h_dim))
|
||||||
|
self.lm_head = nn.Linear(h_dim, vocab_size)
|
||||||
|
|
||||||
|
def forward(self, x, targets=None):
|
||||||
|
x = self.tok_embeds(x) + self.pos_embeds[:, :x.shape[1], :]
|
||||||
|
for l in self.layers:
|
||||||
|
x = l(x)
|
||||||
|
logits = self.lm_head(x) # B,T,C
|
||||||
|
loss = F.cross_entropy(rearrange(logits, "b t c -> b c t"), targets) if not targets is None else None
|
||||||
|
return logits, loss
|
||||||
|
|
||||||
|
# what is the purpose? autoregressive inference!
|
||||||
|
def generate(self, start_idx, max_new_tokens):
|
||||||
|
idx = start_idx
|
||||||
|
for i in range(max_new_tokens):
|
||||||
|
idx_cond = idx[:,-self.max_seq_len:]
|
||||||
|
logits, loss = self(idx_cond)
|
||||||
|
logits = logits[:,-1,:] # B, C
|
||||||
|
probs = F.softmax(logits, dim=-1)
|
||||||
|
idx_next = torch.multinomial(probs, num_samples=1).to(self.lm_head.bias.device)
|
||||||
|
idx = torch.cat([idx, idx_next], dim=1)
|
||||||
|
return idx
|
||||||
Loading…
Reference in New Issue