From 40889a17bd94d17411bced4edd61160fbb7d8b13 Mon Sep 17 00:00:00 2001
From: Vladimir
Date: Sat, 5 Apr 2025 17:00:35 +0400
Subject: [PATCH] update

---
 checkpoints/.gitignore |   2 +
 runs/.gitignore        |   2 +
 src/bert_training.py   | 182 +++++++++++++++++++++--------------------
 3 files changed, 99 insertions(+), 87 deletions(-)
 create mode 100644 checkpoints/.gitignore
 create mode 100644 runs/.gitignore

diff --git a/checkpoints/.gitignore b/checkpoints/.gitignore
new file mode 100644
index 0000000..c96a04f
--- /dev/null
+++ b/checkpoints/.gitignore
@@ -0,0 +1,2 @@
+*
+!.gitignore
\ No newline at end of file
diff --git a/runs/.gitignore b/runs/.gitignore
new file mode 100644
index 0000000..c96a04f
--- /dev/null
+++ b/runs/.gitignore
@@ -0,0 +1,2 @@
+*
+!.gitignore
\ No newline at end of file
diff --git a/src/bert_training.py b/src/bert_training.py
index dec488e..1e9812d 100644
--- a/src/bert_training.py
+++ b/src/bert_training.py
@@ -80,7 +80,7 @@ class CreditProductsDataset:
             'fclose_flag',
             'pre_loans5', 'pre_loans6090', 'pre_loans530', 'pre_loans90', 'pre_loans3060'
         ]
-        self.num_columns = ['pre_loans5'] # TODO empty list get DatParallel to crash
+        self.num_columns = []
 
         # make unified category index for embeddings for all columns. zero index embedding for padding will be zeroed during training
         self.cat_cardinalities = self.features_df.max(axis=0)[self.cat_columns] + 1
@@ -102,43 +102,43 @@ class CreditProductsDataset:
         self.targets_df = self.targets_df.sort_index()
         self.targets = torch.tensor(self.targets_df.flag.values).type(torch.float32)
 
-    def get_batch(self, batch_size=4):
+    def get_train_batch(self, batch_size=4):
         sampled_ids = np.random.choice(self.train_uniq_client_ids, batch_size, replace=False) # think about replace=True
-        cat_features_batch = self.cat_features[sampled_ids]
-        num_features_batch = self.num_features[sampled_ids]
-        if self.dropout_rate > 0.0:
-            cat_features_batch *= torch.empty_like(cat_features_batch).bernoulli_(1-self.dropout_rate) # argument is keep_prob
-            num_features_batch *= torch.empty_like(num_features_batch).bernoulli_(1-self.dropout_rate) # argument is keep_prob
+        cat_features_batch = self.cat_features[sampled_ids]
+        num_features_batch = self.num_features[sampled_ids]
+        cat_features_batch *= torch.empty_like(cat_features_batch).bernoulli_(1-self.dropout_rate) # arg is keep_probability
+        num_features_batch *= torch.empty_like(num_features_batch).bernoulli_(1-self.dropout_rate)
         targets_batch = self.targets[sampled_ids]
         return cat_features_batch, num_features_batch, targets_batch
 
     def get_test_batch_iterator(self, batch_size=4):
        for i in range(0, len(self.test_uniq_client_ids), batch_size):
             ids = self.test_uniq_client_ids[i:i+batch_size]
-            cat_features_batch = self.cat_features[ids]
+            cat_features_batch = self.cat_features[ids]
             num_features_batch = self.num_features[ids]
             targets_batch = self.targets[ids]
             yield cat_features_batch, num_features_batch, targets_batch
 
 # for parallel data selection
 class WrapperDataset(Dataset):
-    def __init__(self, credit_dataset, encoder, batch_size, datasets_per_epoch):
+    def __init__(self, credit_dataset, batch_size, datasets_per_epoch=1):
         self.credit_dataset = credit_dataset
-        self.encoder = encoder
         self.batch_size = batch_size
-        self.num_batches = len(self.credit_dataset.train_uniq_client_ids) // self.batch_size // torch.distributed.get_world_size() * datasets_per_epoch
+        self.num_batches = len(self.credit_dataset.train_uniq_client_ids) \
+            // self.batch_size \
+            * datasets_per_epoch
 
     def __len__(self):
         return self.num_batches
 
     def __getitem__(self, idx):
-        cat_inputs, num_inputs, targets = self.credit_dataset.get_batch(batch_size=self.batch_size)
+        cat_inputs, num_inputs, targets = self.credit_dataset.get_train_batch(batch_size=self.batch_size)
         return cat_inputs, num_inputs, targets
 
 
 ##################################### Model ###########################################################################################
 class Encoder(nn.Module):
-    def __init__(self, cat_columns, num_columns, cat_features_max_id, category_feature_dim=4, out_dim=64, dropout_rate=0.0):
+    def __init__(self, cat_columns, num_columns, cat_features_max_id, category_feature_dim=4, out_dim=64):
         super().__init__()
         self.__dict__.update({k:v for k,v in locals().items() if k != 'self'})
         self.total_h_dim = len(self.cat_columns) * category_feature_dim + len(self.num_columns)
@@ -150,14 +150,10 @@ class Encoder(nn.Module):
     def forward(self, cat_features_batch, num_features_batch, targets_batch):
         cat_embed_tensor = self.cat_embeds(cat_features_batch.type(torch.int32))
         cat_embed_tensor = cat_embed_tensor.reshape(cat_features_batch.shape[0], cat_features_batch.shape[1], -1)
-        num_embed_tensor = self.num_scales * num_features_batch + self.num_shifts
+        num_embed_tensor = self.num_scales * num_features_batch + self.num_shifts
         embed_tensor = torch.concat([cat_embed_tensor, num_embed_tensor], dim=-1)
-        inputs = self.proj(embed_tensor)
-        if self.dropout_rate > 0.0:
-            inputs = F.dropout1d(inputs, p=self.dropout_rate)
 
         targets = targets_batch
-        return inputs, targets
 
 
 # RoFormer: Enhanced Transformer with Rotary Position Embedding https://arxiv.org/abs/2104.09864
@@ -192,18 +188,6 @@ class DyT(nn.Module):
         x = torch.tanh(self.alpha * x)
         return x * self.weight + self.bias
 
-class DyC(nn.Module):
-    def __init__(self, num_features, alpha_init_value=0.5):
-        super().__init__()
-        self.alpha = nn.Parameter(torch.ones(1) * alpha_init_value)
-        self.weight = nn.Parameter(torch.ones(num_features))
-        self.bias = nn.Parameter(torch.zeros(num_features))
-
-    def forward(self, x):
-        x = torch.clip(self.alpha * x, min=-1, max=1)
-        return x * self.weight + self.bias
-
-# from layers import ChebyKANLayer
 # Attention Is All You Need https://arxiv.org/pdf/1706.03762v7
 # NeoBERT: A Next-Generation BERT https://arxiv.org/html/2502.19587v1
 class TransformerLayer(nn.Module):
@@ -216,9 +200,8 @@ class TransformerLayer(nn.Module):
         self.o_proj = nn.Linear(h_dim, h_dim)
         self.ff1 = nn.Linear(h_dim, 4*h_dim)
         self.ff2 = nn.Linear(4*h_dim, h_dim)
-        self.ln1 = DyC(h_dim)
-        self.ln2 = DyC(h_dim)
-        self.ln3 = DyC(max_seq_len)
+        self.ln1 = DyT(h_dim)
+        self.ln2 = DyT(h_dim)
         self.rope = RoPE(dim=h_dim//self.num_heads, max_seq_len=max_seq_len)
 
     def split_to_heads(self, x, B, T, H):
@@ -227,15 +210,12 @@ class TransformerLayer(nn.Module):
         return rearrange(x, '(b n) t h -> b t (n h)', b=B, t=T, n=self.num_heads) if self.num_heads > 1 else x
 
 
-    # how to check that attention is actually make some difference
     def attention(self, x):
         q = self.rope(self.split_to_heads(self.q_proj(x), *x.shape))
         k = self.rope(self.split_to_heads(self.k_proj(x), *x.shape))
         v = self.split_to_heads(self.v_proj(x), *x.shape)
         scores = (q @ k.transpose(1, 2)) * (self.h_dim ** -0.5)
-        # attention = nn.functional.softmax(F.dropout1d(scores, p=self.dropout_rate), dim=2)
-        # attention = self.ln3(F.dropout1d(scores, p=self.dropout_rate))
-        attention = self.ln3(scores)
+        attention = nn.functional.softmax(scores, dim=2)
         return self.o_proj(self.gather_heads(attention @ v, *x.shape))
 
     def forward(self, x):
@@ -271,20 +251,21 @@ class Model(nn.Module):
         inputs, targets = self.encoder(cat_inputs, num_inputs, targets)
         return self.classifier(inputs), targets
 
-def test(start_time, epoch, batches_per_epoch, batch_size, model, optimizer, credit_train_dataset, test_auroc, writer):
+def test(start_time, epoch, batches_per_epoch, batch_size, model, optimizer, credit_dataset, test_auroc, writer):
     model.eval()
     optimizer.eval()
     with torch.no_grad():
-        test_iterator = credit_train_dataset.get_test_batch_iterator(batch_size=batch_size)
+        test_iterator = credit_dataset.get_test_batch_iterator(batch_size=batch_size)
         for test_batch_id, (test_cat_inputs, test_num_inputs, test_targets) in enumerate(test_iterator):
-            test_cat_inputs, test_num_inputs, test_targets = [x.to(device_id, non_blocking=True) for x in [test_cat_inputs, test_num_inputs, test_targets]]
+            test_cat_inputs = test_cat_inputs.to("cuda", non_blocking=True)
+            test_num_inputs = test_num_inputs.to("cuda", non_blocking=True)
+            test_targets = test_targets.to("cuda", non_blocking=True)
             outputs, targets = model(test_cat_inputs, test_num_inputs, test_targets)
             test_auroc.update(outputs, targets.long())
-            print(f"\r {test_batch_id}/{len(credit_train_dataset.test_uniq_client_ids)//batch_size} {test_auroc.compute().item():.5f}", end = " "*20)
-    if torch.distributed.get_rank() == 0:
-        writer.add_scalar('test_roc_auc', test_auroc.compute().item(), epoch * batches_per_epoch)
-        print(f"\r {datetime.now() - start_time} {epoch}/{epochs} Test rocauc: {test_auroc.compute().item():.5f}", end = " "*20)
-        print()
+            print(f"\r {test_batch_id}/{len(credit_dataset.test_uniq_client_ids)//batch_size} {test_auroc.compute().item():.5f}", end = " "*20)
+    writer.add_scalar('test_roc_auc', test_auroc.compute().item(), epoch * batches_per_epoch)
+    print(f"\r {datetime.now() - start_time} {epoch}/{epochs} Test rocauc: {test_auroc.compute().item():.5f}", end = " "*20)
+    print()
 
 
 ######################################### Training ################################################################
@@ -294,11 +275,10 @@ layers_num = 6
 num_heads = 2
 class_num = 1
 dataset_dropout_rate = 0.4
-encoder_dropout_rate = 0.0
 classifier_dropout_date = 0.4
 epochs = 500
-batch_size = 2000
-datasets_per_epoch = 5
+batch_size = 30000
+datasets_per_epoch = 1
 num_workers = 10
 comment = sys.argv[1]
@@ -316,12 +296,6 @@ start_prep_time = datetime.now()
 credit_train_dataset = CreditProductsDataset(
     features_path="/wd/data/train_data/",
     targets_path="/wd/data/train_target.csv",
-    # train_uniq_client_ids_path="/wd/train_uniq_client_ids.csv",
-    # test_uniq_client_ids_path="/wd/test_uniq_client_ids.csv",
-    # train_uniq_client_ids_path="/wd/dima_train_ids.csv",
-    # test_uniq_client_ids_path="/wd/dima_test_ids.csv",
-    # train_uniq_client_ids_path=f"/wd/fold{DEVICE_IDX}_train_ids.csv",
-    # test_uniq_client_ids_path=f"/wd/fold{DEVICE_IDX}_test_ids.csv",
     train_uniq_client_ids_path=f"/wd/fold3_train_ids.csv",
     test_uniq_client_ids_path=f"/wd/fold3_test_ids.csv",
     dropout_rate=dataset_dropout_rate
@@ -333,8 +307,7 @@ encoder = Encoder(
     num_columns=credit_train_dataset.num_columns,
     cat_features_max_id=credit_train_dataset.cat_features.max(),
     category_feature_dim=category_feature_dim,
-    out_dim=h_dim,
-    dropout_rate=encoder_dropout_rate
+    out_dim=h_dim
 )
 
 classifier = BertClassifier(
@@ -346,11 +319,8 @@ classifier = BertClassifier(
     dropout_rate = classifier_dropout_date
 )
 
-device_id = int(os.environ["LOCAL_RANK"])
-model = Model(encoder=encoder, classifier=classifier).to(f"cuda:{device_id}")
+model = Model(encoder=encoder, classifier=classifier).to("cuda")
classifier=classifier).to("cuda") print(f"Model parameters count: ", sum(p.numel() for p in model.parameters())) -model = DDP(model, device_ids=[device_id]) - # The Road Less Scheduled https://arxiv.org/html/2405.15682v4 optimizer = schedulefree.AdamWScheduleFree(model.parameters()) @@ -359,15 +329,16 @@ positive_counts = credit_train_dataset.targets_df.loc[credit_train_dataset.train negative_counts = len(credit_train_dataset.targets_df.loc[credit_train_dataset.train_uniq_client_ids]) - positive_counts pos_weight = negative_counts / (positive_counts + 1e-15) print(f"Class imbalance: {negative_counts} {positive_counts}. Pos weight: {pos_weight}") + criterion = torch.nn.BCEWithLogitsLoss(pos_weight=torch.tensor(pos_weight)) -training_data = WrapperDataset(credit_train_dataset, encoder, batch_size=batch_size, datasets_per_epoch=datasets_per_epoch) +training_data = WrapperDataset(credit_dataset=credit_train_dataset, batch_size=batch_size, datasets_per_epoch=datasets_per_epoch) dataloader = DataLoader(training_data, batch_size=1, shuffle=False, num_workers=num_workers, pin_memory=True) -# number of batches to go through dataset once + batches_per_epoch = len(training_data) print(f"Number of batches per epoch: {batches_per_epoch}, Number of datasets per epoch : {datasets_per_epoch}") -test_auroc = AUROC(task='binary', sync_on_compute=True) +test_auroc = AUROC(task='binary') start_time = datetime.now() print("Started at:", start_time) @@ -375,45 +346,82 @@ last_display_time = start_time last_checkpoint_time = start_time try: for epoch in range(epochs): - test(start_time, epoch, batches_per_epoch, batch_size, model, optimizer, credit_train_dataset, test_auroc, - writer=writer ) + test( + start_time=start_time, + epoch=epoch, + batches_per_epoch=batches_per_epoch, + batch_size=batch_size, + model=model, + optimizer=optimizer, + credit_dataset=credit_train_dataset, + test_auroc=test_auroc, + writer=writer + ) for batch_id, (cat_inputs, num_inputs, targets) in enumerate(dataloader): model.train() optimizer.train() optimizer.zero_grad() - cat_inputs, num_inputs, targets = [x.to(device_id, non_blocking=True) for x in [cat_inputs[0], num_inputs[0], targets[0]]] - outputs, targets = model(cat_inputs, num_inputs, targets) - loss = criterion(outputs, targets) + outputs, targets = model( + cat_inputs[0].to("cuda"), + num_inputs[0].to("cuda"), + targets[0].to("cuda") + ) + loss = criterion(outputs, targets) loss.backward() optimizer.step() - ddp_loss[0] = loss.item() - torch.distributed.all_reduce(ddp_loss, op=torch.distributed.ReduceOp.SUM) - ddp_loss[0] /= world_size + current_time = datetime.now() if current_time - last_display_time > timedelta(seconds=1): last_display_time = current_time - if rank == 0: - writer.add_scalar(f'Loss', loss.item(), epoch*batches_per_epoch+batch_id) - print(f"\r {current_time-start_time} {epoch+1}/{epochs} {batch_id}/{batches_per_epoch} loss: {loss.item():.6f} {comment}", end = " "*2) + writer.add_scalar(f'Loss', loss.item(), epoch*batches_per_epoch+batch_id) + print(f"\r {current_time-start_time} {epoch+1}/{epochs} {batch_id}/{batches_per_epoch} loss: {loss.item():.6f} {comment}", end = " "*2) if current_time - last_checkpoint_time > timedelta(hours=8): last_checkpoint_time = current_time - test(start_time, epoch, batches_per_epoch, batch_size, model, optimizer, credit_train_dataset, test_auroc, - writer=writer ) + test( + start_time=start_time, + epoch=epoch, + batches_per_epoch=batches_per_epoch, + batch_size=batch_size, + model=model, + optimizer=optimizer, + 
+                    credit_dataset=credit_train_dataset,
+                    test_auroc=test_auroc,
+                    writer=writer
+                )
                 rocauc = test_auroc.compute().item()
                 save_checkpoint(
-                    credit_dataset=credit_train_dataset,
-                    encoder = model.module.encoder, model=model.module.classifier, optimizer=optimizer, epoch=epoch,
-                    loss=loss.item(), rocauc=rocauc, сheсkpoints_dir=сheсkpoints_dir)
+                    credit_dataset=credit_train_dataset,
+                    encoder = model.module.encoder,
+                    model=model.module.classifier,
+                    optimizer=optimizer,
+                    epoch=epoch,
+                    loss=loss.item(),
+                    rocauc=rocauc,
+                    сheсkpoints_dir=сheсkpoints_dir
+                )
 except KeyboardInterrupt:
     print()
 finally:
-    test(epoch+1, batches_per_epoch, batch_size, model, optimizer, credit_train_dataset, test_auroc,
-         writer=writer if rank==0 else None)
+    test(
+        start_time=start_time,
+        epoch=epoch+1,
+        batches_per_epoch=batches_per_epoch,
+        batch_size=batch_size,
+        model=model,
+        optimizer=optimizer,
+        credit_dataset=credit_train_dataset,
+        test_auroc=test_auroc,
+        writer=writer
+    )
     rocauc = test_auroc.compute().item()
     save_checkpoint(
-        credit_dataset=credit_train_dataset,
-        encoder = model.module.encoder, model=model.module.classifier, optimizer=optimizer, epoch=epoch,
-        loss=loss.item(), rocauc=rocauc, сheсkpoints_dir=сheсkpoints_dir)
-    writer.close()
-    torch.distributed.destroy_process_group()
-
+        credit_dataset=credit_train_dataset,
+        encoder = model.encoder,
+        model=model.classifier,
+        optimizer=optimizer,
+        epoch=epoch,
+        loss=loss.item(),
+        rocauc=rocauc,
+        сheсkpoints_dir=сheсkpoints_dir
+    )
+    writer.close()
\ No newline at end of file