diff --git a/readme.md b/readme.md
index 8fee9b7..e0127b1 100644
--- a/readme.md
+++ b/readme.md
@@ -4,11 +4,11 @@
 https://ods.ai/competitions/dl-fintech-bki
 ```python
 # single GPU version
-python src/bert_training.py fold3_18l_dyt_04_04_3750
+CUDA_VISIBLE_DEVICES=0 python src/bert_training.py experiment_name
 # data parallel version
-CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 python src/bert_training_dp.py fold3_18l_dyt_04_04_3750
+CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 python src/bert_training_dp.py experiment_name
 # distributed data parallel version
-CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 torchrun --standalone --nproc-per-node=8 src/bert_training_ddp.py fold3_18l_dyt_04_04_3750
+CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7 torchrun --standalone --nproc-per-node=8 src/bert_training_ddp.py experiment_name
 ```
 Logging goes to TensorBoard in the `./logs/` directory; the current version of the script is copied into the log directory at launch. Model checkpoints are saved to the `./checkpoints/` directory.
diff --git a/src/bert_training.py b/src/bert_training.py
index 49636dd..08ff8c3 100644
--- a/src/bert_training.py
+++ b/src/bert_training.py
@@ -147,16 +147,28 @@ class Encoder(nn.Module):
         super().__init__()
         self.__dict__.update({k:v for k,v in locals().items() if k != 'self'}) # all args are added as object variables
         self.total_h_dim = len(self.cat_columns) * category_feature_dim + len(self.num_columns)
-        self.cat_embeds = nn.Embedding(cat_features_max_id + 1, self.category_feature_dim, padding_idx=0)
-        self.num_scales = nn.Parameter(torch.randn(1, len(self.num_columns)))
-        self.num_shifts = nn.Parameter(torch.randn(1, len(self.num_columns)))
+        if len(self.cat_columns) > 0:
+            self.cat_embeds = nn.Embedding(cat_features_max_id + 1, self.category_feature_dim, padding_idx=0)
+        if len(self.num_columns) > 0:
+            # in case == 0 the script crashes during backprop without this if in data parallel mode
+            self.num_scales = nn.Parameter(torch.randn(1, len(self.num_columns)))
+            self.num_shifts = nn.Parameter(torch.randn(1, len(self.num_columns)))
         self.proj = nn.Linear(self.total_h_dim, self.out_dim, bias=False)

     def forward(self, cat_features_batch, num_features_batch):
-        cat_embed_tensor = self.cat_embeds(cat_features_batch.data.type(torch.int32))
-        cat_embed_tensor = cat_embed_tensor.reshape(cat_features_batch.data.shape[0], cat_features_batch.data.shape[1], -1)
-        num_embed_tensor = self.num_scales * num_features_batch.data + self.num_shifts
-        embed_tensor = torch.concat([cat_embed_tensor.data, num_embed_tensor.data], dim=-1)
+        if len(self.cat_columns) > 0 and len(self.num_columns) > 0:
+            cat_embed_tensor = self.cat_embeds(cat_features_batch.data.type(torch.int32))
+            cat_embed_tensor = cat_embed_tensor.reshape(cat_features_batch.data.shape[0], cat_features_batch.data.shape[1], -1)
+            num_embed_tensor = self.num_scales * num_features_batch.data + self.num_shifts
+            embed_tensor = torch.concat([cat_embed_tensor.data, num_embed_tensor.data], dim=-1)
+        elif len(self.cat_columns) == 0:
+            embed_tensor = self.num_scales * num_features_batch.data + self.num_shifts
+        elif len(self.num_columns) == 0:
+            cat_embed_tensor = self.cat_embeds(cat_features_batch.data.type(torch.int32))
+            cat_embed_tensor = cat_embed_tensor.reshape(cat_features_batch.data.shape[0], cat_features_batch.data.shape[1], -1)
+            embed_tensor = cat_embed_tensor.data
+        else:
+            raise Exception("The batch is empty.")
         embed_tensor = F.dropout(embed_tensor, self.features_dropout_rate)
         inputs = self.proj(embed_tensor)
         return inputs
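The Encoder change above guards parameter creation so that a feature group with zero columns never registers parameters that receive no gradient, which is what breaks backprop under DataParallel/DDP. The following is a condensed, self-contained sketch of that guard, not code from the repository; the class body is simplified and the toy shapes are made up for illustration.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

class GuardedEncoder(nn.Module):
    """Only creates parameters for feature groups that actually exist."""
    def __init__(self, cat_columns, num_columns, cat_features_max_id,
                 category_feature_dim=4, out_dim=64, features_dropout_rate=0.0):
        super().__init__()
        self.cat_columns, self.num_columns = cat_columns, num_columns
        self.features_dropout_rate = features_dropout_rate
        self.total_h_dim = len(cat_columns) * category_feature_dim + len(num_columns)
        if len(cat_columns) > 0:
            self.cat_embeds = nn.Embedding(cat_features_max_id + 1, category_feature_dim, padding_idx=0)
        if len(num_columns) > 0:  # unused parameters would break DataParallel/DDP backprop
            self.num_scales = nn.Parameter(torch.randn(1, len(num_columns)))
            self.num_shifts = nn.Parameter(torch.randn(1, len(num_columns)))
        self.proj = nn.Linear(self.total_h_dim, out_dim, bias=False)

    def forward(self, cat_features, num_features):
        parts = []
        if len(self.cat_columns) > 0:
            cat = self.cat_embeds(cat_features.int())
            parts.append(cat.reshape(cat_features.shape[0], cat_features.shape[1], -1))
        if len(self.num_columns) > 0:
            parts.append(self.num_scales * num_features + self.num_shifts)
        if not parts:
            raise ValueError("Both cat_columns and num_columns are empty.")
        embed = F.dropout(torch.cat(parts, dim=-1), self.features_dropout_rate)
        return self.proj(embed)

# categorical-only setup, as in the patch where num_columns = []
enc = GuardedEncoder(cat_columns=[f"c{i}" for i in range(59)], num_columns=[],
                     cat_features_max_id=400, category_feature_dim=8, out_dim=64)
cat = torch.randint(0, 401, (2, 10, 59))  # (batch, seq_len, n_cat_features)
num = torch.zeros(2, 10, 0)               # empty numeric block
print(enc(cat, num).shape)                # torch.Size([2, 10, 64])
```

With `num_columns=[]` the module simply has no `num_scales`/`num_shifts`, so every registered parameter participates in the backward pass.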
diff --git a/src/bert_training_ddp.py b/src/bert_training_ddp.py
index f6e460d..ca74cf3 100644
--- a/src/bert_training_ddp.py
+++ b/src/bert_training_ddp.py
@@ -25,7 +25,6 @@ from torch.distributed.fsdp.wrap import (
     wrap,
 )
 import functools
-from layers import *

 def save_checkpoint(credit_dataset, encoder, model, optimizer, epoch, loss, rocauc, сheсkpoints_dir):
     checkpoint = {
@@ -54,21 +53,20 @@ def save_checkpoint(credit_dataset, encoder, model, optimizer, epoch, loss, roca
 ######################################## Dataset #########################################################
 class CreditProductsDataset:
-    def __init__(self,
+    def __init__(self,
             features_path, targets_path, train_test_split_ratio=0.9,
-            train_uniq_client_ids_path=None, test_uniq_client_ids_path=None,
-            dropout_rate=0.0
+            train_uniq_client_ids_path=None, test_uniq_client_ids_path=None
         ):
         self.__dict__.update({k:v for k,v in locals().items() if k != 'self'})
         if Path(self.train_uniq_client_ids_path).exists():
             self.train_uniq_client_ids = pd.read_csv(self.train_uniq_client_ids_path).iloc[:,0].values
             print("Loaded", self.train_uniq_client_ids_path)
-        else:
+        else:
             raise Exception(f"No {self.train_uniq_client_ids_path}")
         if Path(self.test_uniq_client_ids_path).exists():
             self.test_uniq_client_ids = pd.read_csv(self.test_uniq_client_ids_path).iloc[:,0].values
             print("Loaded", self.test_uniq_client_ids_path)
-        else:
+        else:
             raise Exception(f"No {self.test_uniq_client_ids_path}")
         assert(len(np.intersect1d(self.train_uniq_client_ids, self.test_uniq_client_ids)) == 0), "Train contains test examples."
         self.features_df = pd.read_parquet(features_path)
@@ -88,8 +86,8 @@ class CreditProductsDataset:
             'fclose_flag', 'pre_loans5', 'pre_loans6090', 'pre_loans530', 'pre_loans90', 'pre_loans3060'
         ]
-        self.num_columns = ['pre_loans5'] # TODO empty list get DatParallel to crash
-
+        self.num_columns = []
+
         # make unified category index for embeddings for all columns. zero index embedding for padding will be zeroed during training
         self.cat_cardinalities = self.features_df.max(axis=0)[self.cat_columns] + 1
         self.cat_cardinalities_integral = self.cat_cardinalities.cumsum()
@@ -105,68 +103,80 @@ class CreditProductsDataset:
         self.cat_features = pad_sequence(torch.split(self.cat_features, self.user_seq_lengths.tolist()), batch_first=True) # implicit max seq
         self.num_features = torch.tensor(self.features_df[self.num_columns].values, dtype=torch.float32)
         self.num_features = pad_sequence(torch.split(self.num_features, self.user_seq_lengths.tolist()), batch_first=True)
+        self.padding_mask = torch.ones(len(self.features_df), dtype=torch.bool)
+        self.padding_mask = pad_sequence(torch.split(self.padding_mask, self.user_seq_lengths.tolist()), batch_first=True)
         self.targets_df = self.targets_df.set_index('id')
         self.targets_df = self.targets_df.sort_index()
         self.targets = torch.tensor(self.targets_df.flag.values).type(torch.float32)

-    def get_batch(self, batch_size=4):
+    def get_train_batch(self, batch_size=4):
         sampled_ids = np.random.choice(self.train_uniq_client_ids, batch_size, replace=False) # think about replace=True
-        cat_features_batch = self.cat_features[sampled_ids]
-        num_features_batch = self.num_features[sampled_ids]
-        if self.dropout_rate > 0.0:
-            cat_features_batch *= torch.empty_like(cat_features_batch).bernoulli_(1-self.dropout_rate) # argument is keep_prob
-            num_features_batch *= torch.empty_like(num_features_batch).bernoulli_(1-self.dropout_rate) # argument is keep_prob
-        targets_batch = self.targets[sampled_ids]
-        return cat_features_batch, num_features_batch, targets_batch
+        return (
+            self.cat_features[sampled_ids],
+            self.num_features[sampled_ids],
+            self.padding_mask[sampled_ids],
+            self.targets[sampled_ids]
+        )

     def get_test_batch_iterator(self, batch_size=4):
         for i in range(0, len(self.test_uniq_client_ids), batch_size):
-            ids = self.test_uniq_client_ids[i:i+batch_size]
-            cat_features_batch = self.cat_features[ids]
-            num_features_batch = self.num_features[ids]
-            targets_batch = self.targets[ids]
-            yield cat_features_batch, num_features_batch, targets_batch
-
-# for parallel data selection
+            sampled_ids = self.test_uniq_client_ids[i:i+batch_size]
+            yield (
+                self.cat_features[sampled_ids],
+                self.num_features[sampled_ids],
+                self.padding_mask[sampled_ids],
+                self.targets[sampled_ids]
+            )
+
+# for parallel data selection
 class WrapperDataset(Dataset):
-    def __init__(self, credit_dataset, encoder, batch_size, datasets_per_epoch):
+    def __init__(self, credit_dataset, batch_size, datasets_per_epoch=1):
         self.credit_dataset = credit_dataset
-        self.encoder = encoder
         self.batch_size = batch_size
-        self.num_batches = len(self.credit_dataset.train_uniq_client_ids) // self.batch_size // torch.distributed.get_world_size() * datasets_per_epoch
+        self.num_batches = len(self.credit_dataset.train_uniq_client_ids) \
+            // self.batch_size \
+            * datasets_per_epoch

     def __len__(self):
         return self.num_batches

     def __getitem__(self, idx):
-        cat_inputs, num_inputs, targets = self.credit_dataset.get_batch(batch_size=self.batch_size)
-        return cat_inputs, num_inputs, targets
+        cat_inputs, num_inputs, padding_mask, targets = self.credit_dataset.get_train_batch(batch_size=self.batch_size)
+        return cat_inputs, num_inputs, padding_mask, targets

 ##################################### Model ###########################################################################################
 class Encoder(nn.Module):
-    def __init__(self, cat_columns, num_columns, cat_features_max_id, category_feature_dim=4, out_dim=64, dropout_rate=0.0):
+    def __init__(self, cat_columns, num_columns, cat_features_max_id, category_feature_dim=4, out_dim=64, features_dropout_rate=0.0):
         super().__init__()
-        self.__dict__.update({k:v for k,v in locals().items() if k != 'self'})
+        self.__dict__.update({k:v for k,v in locals().items() if k != 'self'}) # all args are added as object variables
         self.total_h_dim = len(self.cat_columns) * category_feature_dim + len(self.num_columns)
-        self.cat_embeds = nn.Embedding(cat_features_max_id + 1, self.category_feature_dim, padding_idx=0)
-        self.num_scales = nn.Parameter(torch.randn(1, len(self.num_columns)))
-        self.num_shifts = nn.Parameter(torch.randn(1, len(self.num_columns)))
+        if len(self.cat_columns) > 0:
+            self.cat_embeds = nn.Embedding(cat_features_max_id + 1, self.category_feature_dim, padding_idx=0)
+        if len(self.num_columns) > 0:
+            # in case == 0 the script crashes during backprop without this if in data parallel mode
+            self.num_scales = nn.Parameter(torch.randn(1, len(self.num_columns)))
+            self.num_shifts = nn.Parameter(torch.randn(1, len(self.num_columns)))
         self.proj = nn.Linear(self.total_h_dim, self.out_dim, bias=False)
-
-    def forward(self, cat_features_batch, num_features_batch, targets_batch):
-        cat_embed_tensor = self.cat_embeds(cat_features_batch.type(torch.int32))
-        cat_embed_tensor = cat_embed_tensor.reshape(cat_features_batch.shape[0], cat_features_batch.shape[1], -1)
-        num_embed_tensor = self.num_scales * num_features_batch + self.num_shifts
-        embed_tensor = torch.concat([cat_embed_tensor, num_embed_tensor], dim=-1)
+    def forward(self, cat_features_batch, num_features_batch):
+        if len(self.cat_columns) > 0 and len(self.num_columns) > 0:
+            cat_embed_tensor = self.cat_embeds(cat_features_batch.data.type(torch.int32))
+            cat_embed_tensor = cat_embed_tensor.reshape(cat_features_batch.data.shape[0], cat_features_batch.data.shape[1], -1)
+            num_embed_tensor = self.num_scales * num_features_batch.data + self.num_shifts
+            embed_tensor = torch.concat([cat_embed_tensor.data, num_embed_tensor.data], dim=-1)
+        elif len(self.cat_columns) == 0:
+            embed_tensor = self.num_scales * num_features_batch.data + self.num_shifts
+        elif len(self.num_columns) == 0:
+            cat_embed_tensor = self.cat_embeds(cat_features_batch.data.type(torch.int32))
+            cat_embed_tensor = cat_embed_tensor.reshape(cat_features_batch.data.shape[0], cat_features_batch.data.shape[1], -1)
+            embed_tensor = cat_embed_tensor.data
+        else:
+            raise Exception("The batch is empty.")
+        embed_tensor = F.dropout(embed_tensor, self.features_dropout_rate)
         inputs = self.proj(embed_tensor)
-        if self.dropout_rate > 0.0:
-            inputs = F.dropout1d(inputs, p=self.dropout_rate)
-        targets = targets_batch
-
-        return inputs, targets
+        return inputs

 # RoFormer: Enhanced Transformer with Rotary Position Embedding https://arxiv.org/abs/2104.09864
 class RoPE(nn.Module):
@@ -195,23 +205,11 @@ class DyT(nn.Module):
         self.alpha = nn.Parameter(torch.ones(1) * alpha_init_value)
         self.weight = nn.Parameter(torch.ones(num_features))
         self.bias = nn.Parameter(torch.zeros(num_features))
-
-    def forward(self, x):
-        x = torch.tanh(self.alpha * x)
-        return x * self.weight + self.bias
-class DyC(nn.Module):
-    def __init__(self, num_features, alpha_init_value=0.5):
-        super().__init__()
-        self.alpha = nn.Parameter(torch.ones(1) * alpha_init_value)
-        self.weight = nn.Parameter(torch.ones(num_features))
-        self.bias = nn.Parameter(torch.zeros(num_features))
-
     def forward(self, x):
-        x = torch.clip(self.alpha * x, min=-1, max=1)
+        x = torch.tanh(self.alpha * x)
         return x * self.weight + self.bias

-# from layers import ChebyKANLayer
 # Attention Is All You Need https://arxiv.org/pdf/1706.03762v7
 # NeoBERT: A Next-Generation BERT https://arxiv.org/html/2502.19587v1
 class TransformerLayer(nn.Module):
@@ -224,30 +222,31 @@ class TransformerLayer(nn.Module):
         self.o_proj = nn.Linear(h_dim, h_dim)
         self.ff1 = nn.Linear(h_dim, 4*h_dim)
         self.ff2 = nn.Linear(4*h_dim, h_dim)
-        self.ln1 = DyC(h_dim)
-        self.ln2 = DyC(h_dim)
-        self.ln3 = DyC(max_seq_len)
+        self.ln1 = DyT(h_dim)
+        self.ln2 = DyT(h_dim)
         self.rope = RoPE(dim=h_dim//self.num_heads, max_seq_len=max_seq_len)

     def split_to_heads(self, x, B, T, H):
-        return rearrange(x, 'b t (n h) -> (b n) t h', b=B, t=T, n=self.num_heads) if self.num_heads > 1 else x
+        if self.num_heads <= 1: return x
+        return rearrange(x, 'b t (n h) -> (b n) t h', b=B, t=T, n=self.num_heads)

     def gather_heads(self, x, B, T, H):
-        return rearrange(x, '(b n) t h -> b t (n h)', b=B, t=T, n=self.num_heads) if self.num_heads > 1 else x
+        if self.num_heads <= 1: return x
+        return rearrange(x, '(b n) t h -> b t (n h)', b=B, t=T, n=self.num_heads)

-    # how to check that attention is actually make some difference
-    def attention(self, x):
+    def attention(self, x, padding_mask):
+        padding_mask = padding_mask.unsqueeze(-1).expand(*padding_mask.shape+(self.num_heads,))
+        padding_mask = self.split_to_heads(padding_mask, *padding_mask.shape)
         q = self.rope(self.split_to_heads(self.q_proj(x), *x.shape))
         k = self.rope(self.split_to_heads(self.k_proj(x), *x.shape))
         v = self.split_to_heads(self.v_proj(x), *x.shape)
-        scores = (q @ k.transpose(1, 2)) * (self.h_dim ** -0.5)
-        # attention = nn.functional.softmax(F.dropout1d(scores, p=self.dropout_rate), dim=2)
-        # attention = self.ln3(F.dropout1d(scores, p=self.dropout_rate))
-        attention = self.ln3(scores)
+        scores = (q @ k.transpose(1, 2)) * (self.h_dim ** -0.5)
+        scores = scores.masked_fill(~padding_mask, -1e9)
+        attention = nn.functional.softmax(scores, dim=2)
         return self.o_proj(self.gather_heads(attention @ v, *x.shape))

-    def forward(self, x):
-        x = x + F.dropout1d(self.attention(self.ln1(x)), p=self.dropout_rate)
+    def forward(self, x, padding_mask):
+        x = x + F.dropout1d(self.attention(self.ln1(x), padding_mask), p=self.dropout_rate)
         x = x + F.dropout1d(self.ff2(F.gelu(self.ff1(self.ln2(x)))), p=self.dropout_rate)
         return x

@@ -255,17 +254,18 @@ class BertClassifier(nn.Module):
     def __init__(self, layers_num=1, h_dim=64, class_num=2, max_seq_len=128, num_heads=4, dropout_rate = 0.1):
         super().__init__()
         self.__dict__.update({k:v for k,v in locals().items() if k != 'self'})
-        self.cls_token = nn.Parameter(torch.randn(1,1,h_dim))
+        self.cls_token = nn.Parameter(torch.randn(1,1,h_dim))
         self.max_seq_len = max_seq_len + self.cls_token.shape[1]
         self.layers = nn.ModuleList([TransformerLayer(h_dim=h_dim, num_heads=num_heads, dropout_rate = dropout_rate, max_seq_len=self.max_seq_len) for _ in range(layers_num)])
         self.classifier_head = nn.Sequential(nn.Linear(h_dim, h_dim), nn.GELU(), nn.Linear(h_dim, class_num))
         self.pos_embeds = nn.Parameter(torch.randn(1, self.max_seq_len, h_dim))

-    def forward(self, x):
-        x = torch.concat([self.cls_token.expand([x.shape[0], self.cls_token.shape[1], self.cls_token.shape[2]]), x], dim=1)
+    def forward(self, x, padding_mask):
+        x = torch.cat([self.cls_token.expand([x.shape[0], self.cls_token.shape[1], x.shape[2]]), x], dim=1) # prepend
+        padding_mask = torch.cat([torch.ones(x.shape[0], 1, dtype=torch.bool, device=x.device), padding_mask], dim=1) # prepend
         x = x + self.pos_embeds[:, :x.shape[1], :]
         for l in self.layers:
-            x = l(x)
+            x = l(x, padding_mask)
         x = self.classifier_head(x[:,0,:])
         return x[:,:] if self.class_num > 1 else x[:,0]

@@ -274,20 +274,23 @@ class Model(nn.Module):
         super().__init__()
         self.encoder = encoder
         self.classifier = classifier
-
-    def forward(self, cat_inputs, num_inputs, targets):
-        inputs, targets = self.encoder(cat_inputs, num_inputs, targets)
-        return self.classifier(inputs), targets
+
+    def forward(self, cat_inputs, num_inputs, padding_mask):
+        inputs = self.encoder(cat_inputs, num_inputs)
+        return self.classifier(inputs, padding_mask)

 def test(start_time, epoch, batches_per_epoch, batch_size, model, optimizer, credit_train_dataset, test_auroc, writer):
     model.eval()
     optimizer.eval()
     with torch.no_grad():
         test_iterator = credit_train_dataset.get_test_batch_iterator(batch_size=batch_size)
-        for test_batch_id, (test_cat_inputs, test_num_inputs, test_targets) in enumerate(test_iterator):
-            test_cat_inputs, test_num_inputs, test_targets = [x.to(device_id, non_blocking=True) for x in [test_cat_inputs, test_num_inputs, test_targets]]
-            outputs, targets = model(test_cat_inputs, test_num_inputs, test_targets)
-            test_auroc.update(outputs, targets.long())
+        for test_batch_id, (test_cat_inputs, test_num_inputs, test_padding_mask, test_targets) in enumerate(test_iterator):
+            test_cat_inputs = test_cat_inputs.to("cuda", non_blocking=True)
+            test_num_inputs = test_num_inputs.to("cuda", non_blocking=True)
+            test_padding_mask = test_padding_mask.to("cuda", non_blocking=True)
+            test_targets = test_targets.to("cuda", non_blocking=True)
+            outputs = model(test_cat_inputs, test_num_inputs, test_padding_mask)
+            test_auroc.update(outputs, test_targets.long())
             print(f"\r {test_batch_id}/{len(credit_train_dataset.test_uniq_client_ids)//batch_size} {test_auroc.compute().item():.5f}", end = " "*20)
     if torch.distributed.get_rank() == 0:
         writer.add_scalar('test_roc_auc', test_auroc.compute().item(), epoch * batches_per_epoch)
@@ -296,17 +299,16 @@ def test(start_time, epoch, batches_per_epoch, batch_size, model, optimizer, cre
 ######################################### Training ################################################################
-h_dim = 32
+h_dim = 64
 category_feature_dim = 8
 layers_num = 6
 num_heads = 2
 class_num = 1
-dataset_dropout_rate = 0.4
-encoder_dropout_rate = 0.0
+features_dropout_rate = 0.4
 classifier_dropout_date = 0.4
 epochs = 500
 batch_size = 2000
-datasets_per_epoch = 5
+datasets_per_epoch = 2
 num_workers = 10

 if __name__ == "__main__":
@@ -326,7 +328,7 @@ if __name__ == "__main__":
     # DEVICE = "cuda"
     if rank == 0:
         comment = sys.argv[1]
-        logs_dir = f'runs/{datetime.now().date()}_{datetime.now().hour:02d}_{datetime.now().minute:02d}_{datetime.now().second:02d}_{comment}/'
+        logs_dir = f'logs/{datetime.now().date()}_{datetime.now().hour:02d}_{datetime.now().minute:02d}_{datetime.now().second:02d}_{comment}/'
         writer = SummaryWriter(logs_dir)
         сheсkpoints_dir = f'checkpoints/{datetime.now().date()}_{datetime.now().hour:02d}_{datetime.now().minute:02d}_{datetime.now().second:02d}_{comment}/'
         script_snapshot_path = Path(logs_dir + Path(sys.argv[0]).name)
@@ -340,15 +342,8 @@ if __name__ == "__main__":
     credit_train_dataset = CreditProductsDataset(
         features_path="/wd/data/train_data/",
         targets_path="/wd/data/train_target.csv",
-        # train_uniq_client_ids_path="/wd/train_uniq_client_ids.csv",
-        # test_uniq_client_ids_path="/wd/test_uniq_client_ids.csv",
-        # train_uniq_client_ids_path="/wd/dima_train_ids.csv",
-        # test_uniq_client_ids_path="/wd/dima_test_ids.csv",
-        # train_uniq_client_ids_path=f"/wd/fold{DEVICE_IDX}_train_ids.csv",
-        # test_uniq_client_ids_path=f"/wd/fold{DEVICE_IDX}_test_ids.csv",
         train_uniq_client_ids_path=f"/wd/fold3_train_ids.csv",
-        test_uniq_client_ids_path=f"/wd/fold3_test_ids.csv",
-        dropout_rate=dataset_dropout_rate
+        test_uniq_client_ids_path=f"/wd/fold3_test_ids.csv"
     )
     if rank == 0: print(f"Dataset preparation time: {datetime.now() - start_prep_time}")
@@ -358,7 +353,7 @@ if __name__ == "__main__":
         cat_features_max_id=credit_train_dataset.cat_features.max(),
         category_feature_dim=category_feature_dim,
         out_dim=h_dim,
-        dropout_rate=encoder_dropout_rate
+        features_dropout_rate=features_dropout_rate
     )

     classifier = BertClassifier(
@@ -373,7 +368,8 @@ if __name__ == "__main__":
     device_id = int(os.environ["LOCAL_RANK"])
     model = Model(encoder=encoder, classifier=classifier).to(f"cuda:{device_id}")
     if rank == 0: print(f"Model parameters count: ", sum(p.numel() for p in model.parameters()))
-    model = DDP(model, device_ids=[device_id])
+    model = DDP(model, device_ids=[device_id], find_unused_parameters=True)
+    # TODO if find_unused_parameters is False then crashes with the message that asks to find unused parameters
     # my_auto_wrap_policy = functools.partial(
     #     size_based_auto_wrap_policy, min_num_params=20000
    # )
@@ -388,7 +384,7 @@ if __name__ == "__main__":
     if rank == 0: print(f"Class imbalance: {negative_counts} {positive_counts}. Pos weight: {pos_weight}")
     criterion = torch.nn.BCEWithLogitsLoss(pos_weight=torch.tensor(pos_weight))

-    training_data = WrapperDataset(credit_train_dataset, encoder, batch_size=batch_size, datasets_per_epoch=datasets_per_epoch)
+    training_data = WrapperDataset(credit_dataset=credit_train_dataset, batch_size=batch_size, datasets_per_epoch=datasets_per_epoch)
     dataloader = DataLoader(training_data, batch_size=1, shuffle=False, num_workers=num_workers, pin_memory=True)
     # number of batches to go through dataset once
     batches_per_epoch = len(training_data)
@@ -399,19 +395,22 @@ if __name__ == "__main__":
     start_time = datetime.now()
     if rank == 0: print("Started at:", start_time)
     last_display_time = start_time
-    last_checkpoint_time = start_time
+    last_checkpoint_time = start_time
     ddp_loss = torch.zeros(1).to(rank) # quickly goes to nan TODO debug. Are all replicas training properly?
     try:
         for epoch in range(epochs):
             test(start_time, epoch, batches_per_epoch, batch_size, model, optimizer, credit_train_dataset, test_auroc, writer=writer if rank==0 else None)
-            for batch_id, (cat_inputs, num_inputs, targets) in enumerate(dataloader):
+            for batch_id, (cat_inputs, num_inputs, padding_mask, targets) in enumerate(dataloader):
                 model.train()
                 optimizer.train()
                 optimizer.zero_grad()
-                cat_inputs, num_inputs, targets = [x.to(device_id, non_blocking=True) for x in [cat_inputs[0], num_inputs[0], targets[0]]]
-                outputs, targets = model(cat_inputs, num_inputs, targets)
-                loss = criterion(outputs, targets)
+                outputs = model(
+                    cat_inputs[0].to("cuda"),
+                    num_inputs[0].to("cuda"),
+                    padding_mask[0].to("cuda")
+                )
+                loss = criterion(outputs, targets[0].to("cuda"))
                 loss.backward()
                 optimizer.step()
                 ddp_loss[0] = loss.item()
@@ -438,7 +437,7 @@ if __name__ == "__main__":
     except KeyboardInterrupt:
         print()
     finally:
-        test(epoch+1, batches_per_epoch, batch_size, model, optimizer, credit_train_dataset, test_auroc,
+        test(start_time, epoch+1, batches_per_epoch, batch_size, model, optimizer, credit_train_dataset, test_auroc,
             writer=writer if rank==0 else None)
         rocauc = test_auroc.compute().item()
         if rank == 0:
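In both training scripts, `WrapperDataset.__getitem__` already returns a full collated batch and the `DataLoader` is created with `batch_size=1`, which is why the training loop indexes `cat_inputs[0]`, `num_inputs[0]` and so on. Below is a minimal sketch of that pattern, not repository code; the class and sizes are made up for illustration.

```python
import torch
from torch.utils.data import Dataset, DataLoader

class BatchPerItemDataset(Dataset):
    """Each __getitem__ returns a whole (batch_size, ...) tensor, like WrapperDataset does."""
    def __init__(self, n_items=4, batch_size=3):
        self.n_items, self.batch_size = n_items, batch_size

    def __len__(self):
        return self.n_items

    def __getitem__(self, idx):
        return torch.randn(self.batch_size, 5)

loader = DataLoader(BatchPerItemDataset(), batch_size=1, shuffle=False, num_workers=0)
for batch in loader:
    print(batch.shape)   # torch.Size([1, 3, 5]): the DataLoader adds a dummy leading dim
    features = batch[0]  # hence the [0] indexing in the training loop
    break
```

The `DataLoader`'s own batching only adds a leading dimension of size 1; worker processes do the expensive batch sampling in parallel.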
diff --git a/src/bert_training_dp.py b/src/bert_training_dp.py
index 71daa3c..3959eec 100644
--- a/src/bert_training_dp.py
+++ b/src/bert_training_dp.py
@@ -18,17 +18,8 @@ import torch.nn.functional as F
 DEVICE = "cuda"
 comment = sys.argv[1]
-logs_dir = f'runs/{datetime.now().date()}_{datetime.now().hour:02d}_{datetime.now().minute:02d}_{datetime.now().second:02d}_{DEVICE_IDX}_{comment}/'
-сheсkpoints_dir = f'checkpoints/{datetime.now().date()}_{datetime.now().hour:02d}_{datetime.now().minute:02d}_{datetime.now().second:02d}_{DEVICE_IDX}_{comment}/'
-Path(сheсkpoints_dir).mkdir(parents=True, exist_ok=True)
-print("Logs dir:", logs_dir)
-print("Chekpoints dir:", сheсkpoints_dir)
-writer = SummaryWriter(logs_dir)
-script_snapshot_path = Path(logs_dir + Path(sys.argv[0]).name)
-script_snapshot_path.write_bytes(Path(sys.argv[0]).read_bytes()) # copy this version of script
-script_snapshot_path.chmod(0o400) # with read-only permission
-def save_checkpoint(credit_dataset, encoder, model, optimizer, epoch, loss, rocauc, сheсkpoints_dir):
+def save_checkpoint(credit_dataset, encoder, model, optimizer, epoch, loss, rocauc, checkpoints_dir):
     checkpoint = {
         'encoder': {
             'state_dict': encoder.state_dict(),
@@ -47,28 +38,28 @@ def save_checkpoint(credit_dataset, encoder, model, optimizer, epoch, loss, roca
         'train_uniq_client_ids_path': credit_dataset.train_uniq_client_ids_path,
         'test_uniq_client_ids_path': credit_dataset.test_uniq_client_ids_path
     }
-    path = сheсkpoints_dir + f"epoch_{epoch}_{rocauc:.4f}.pth"
+    path = checkpoints_dir + f"epoch_{epoch}_{rocauc:.4f}.pth"
+    # if torch.distributed.get_rank() == 0:
     torch.save(checkpoint, path)
     print(f"\nCheckpoint saved to {path}")

 #################################################################################################
 class CreditProductsDataset:
-    def __init__(self,
+    def __init__(self,
             features_path, targets_path, train_test_split_ratio=0.9,
-            train_uniq_client_ids_path=None, test_uniq_client_ids_path=None,
-            dropout_rate=0.0
+            train_uniq_client_ids_path=None, test_uniq_client_ids_path=None
         ):
         self.__dict__.update({k:v for k,v in locals().items() if k != 'self'})
         if Path(self.train_uniq_client_ids_path).exists():
             self.train_uniq_client_ids = pd.read_csv(self.train_uniq_client_ids_path).iloc[:,0].values
             print("Loaded", self.train_uniq_client_ids_path)
-        else:
+        else:
             raise Exception(f"No {self.train_uniq_client_ids_path}")
         if Path(self.test_uniq_client_ids_path).exists():
             self.test_uniq_client_ids = pd.read_csv(self.test_uniq_client_ids_path).iloc[:,0].values
             print("Loaded", self.test_uniq_client_ids_path)
-        else:
+        else:
             raise Exception(f"No {self.test_uniq_client_ids_path}")
         assert(len(np.intersect1d(self.train_uniq_client_ids, self.test_uniq_client_ids)) == 0), "Train contains test examples."
         self.features_df = pd.read_parquet(features_path)
@@ -88,7 +79,8 @@ class CreditProductsDataset:
             'fclose_flag', 'pre_loans5', 'pre_loans6090', 'pre_loans530', 'pre_loans90', 'pre_loans3060'
         ]
-        self.num_columns = ['pre_loans5'] # TODO empty list get DatParallel to crash
+        self.num_columns = []
+        # make unified category index for embeddings for all columns. zero index embedding for padding will be zeroed during training
         self.cat_cardinalities = self.features_df.max(axis=0)[self.cat_columns] + 1
         self.cat_cardinalities_integral = self.cat_cardinalities.cumsum()
@@ -97,8 +89,6 @@ class CreditProductsDataset:
         self.features_df = self.features_df.sort_values(self.id_columns, ascending=[True, True])
         self.features_df = self.features_df.set_index('id')
-        self.targets_df = self.targets_df.set_index('id')
-        self.targets_df = self.targets_df.sort_index()

         self.user_seq_lengths = self.features_df.index.value_counts().sort_index()
@@ -106,43 +96,80 @@ class CreditProductsDataset:
         self.cat_features = pad_sequence(torch.split(self.cat_features, self.user_seq_lengths.tolist()), batch_first=True) # implicit max seq
         self.num_features = torch.tensor(self.features_df[self.num_columns].values, dtype=torch.float32)
         self.num_features = pad_sequence(torch.split(self.num_features, self.user_seq_lengths.tolist()), batch_first=True)
+        self.padding_mask = torch.ones(len(self.features_df), dtype=torch.bool)
+        self.padding_mask = pad_sequence(torch.split(self.padding_mask, self.user_seq_lengths.tolist()), batch_first=True)
+
+        self.targets_df = self.targets_df.set_index('id')
+        self.targets_df = self.targets_df.sort_index()
         self.targets = torch.tensor(self.targets_df.flag.values).type(torch.float32)

-    def get_batch(self, batch_size=4):
+    def get_train_batch(self, batch_size=4):
         sampled_ids = np.random.choice(self.train_uniq_client_ids, batch_size, replace=False) # think about replace=True
-        cat_features_batch = self.cat_features[sampled_ids] * torch.empty_like(self.cat_features[sampled_ids]).bernoulli_(1-self.dropout_rate) # argument is keep_prob
-        num_features_batch = self.num_features[sampled_ids] * torch.empty_like(self.num_features[sampled_ids]).bernoulli_(1-self.dropout_rate) # argument is keep_prob
-        targets_batch = self.targets[sampled_ids]
-        return cat_features_batch, num_features_batch, targets_batch
+        return (
+            self.cat_features[sampled_ids],
+            self.num_features[sampled_ids],
+            self.padding_mask[sampled_ids],
+            self.targets[sampled_ids]
+        )

     def get_test_batch_iterator(self, batch_size=4):
         for i in range(0, len(self.test_uniq_client_ids), batch_size):
-            ids = self.test_uniq_client_ids[i:i+batch_size]
-            cat_features_batch = self.cat_features[ids]
-            num_features_batch = self.num_features[ids]
-            targets_batch = self.targets[ids]
-            yield cat_features_batch, num_features_batch, targets_batch
+            sampled_ids = self.test_uniq_client_ids[i:i+batch_size]
+            yield (
+                self.cat_features[sampled_ids],
+                self.num_features[sampled_ids],
+                self.padding_mask[sampled_ids],
+                self.targets[sampled_ids]
+            )
+
+# for parallel data selection
+class WrapperDataset(Dataset):
+    def __init__(self, credit_dataset, batch_size, datasets_per_epoch=1):
+        self.credit_dataset = credit_dataset
+        self.batch_size = batch_size
+        self.num_batches = len(self.credit_dataset.train_uniq_client_ids) \
+            // self.batch_size \
+            * datasets_per_epoch
+
+    def __len__(self):
+        return self.num_batches
+
+    def __getitem__(self, idx):
+        cat_inputs, num_inputs, padding_mask, targets = self.credit_dataset.get_train_batch(batch_size=self.batch_size)
+        return cat_inputs, num_inputs, padding_mask, targets
+
+##################################### Model ###########################################################################################
 class Encoder(nn.Module):
-    def __init__(self, cat_columns, num_columns, cat_features_max_id, category_feature_dim=4, out_dim=64, dropout_rate=0.5):
+    def __init__(self, cat_columns, num_columns, cat_features_max_id, category_feature_dim=4, out_dim=64, features_dropout_rate=0.0):
         super().__init__()
-        self.__dict__.update({k:v for k,v in locals().items() if k != 'self'})
+        self.__dict__.update({k:v for k,v in locals().items() if k != 'self'}) # all args are added as object variables
         self.total_h_dim = len(self.cat_columns) * category_feature_dim + len(self.num_columns)
-        self.cat_embeds = nn.Embedding(cat_features_max_id + 1, self.category_feature_dim, padding_idx=0)
-        self.num_scales = nn.Parameter(torch.randn(1, len(self.num_columns)))
-        self.num_shifts = nn.Parameter(torch.randn(1, len(self.num_columns)))
+        if len(self.cat_columns) > 0:
+            self.cat_embeds = nn.Embedding(cat_features_max_id + 1, self.category_feature_dim, padding_idx=0)
+        if len(self.num_columns) > 0:
+            # in case == 0 the script crashes during backprop without this if in data parallel mode
+            self.num_scales = nn.Parameter(torch.randn(1, len(self.num_columns)))
+            self.num_shifts = nn.Parameter(torch.randn(1, len(self.num_columns)))
         self.proj = nn.Linear(self.total_h_dim, self.out_dim, bias=False)
-
-    def forward(self, cat_features_batch, num_features_batch, targets_batch):
-        cat_embed_tensor = self.cat_embeds(cat_features_batch.type(torch.int32))
-        cat_embed_tensor = cat_embed_tensor.reshape(cat_features_batch.shape[0], cat_features_batch.shape[1], -1)
-        num_embed_tensor = self.num_scales * num_features_batch + self.num_shifts
-        embed_tensor = torch.concat([cat_embed_tensor, num_embed_tensor], dim=-1)
+    def forward(self, cat_features_batch, num_features_batch):
+        if len(self.cat_columns) > 0 and len(self.num_columns) > 0:
+            cat_embed_tensor = self.cat_embeds(cat_features_batch.data.type(torch.int32))
+            cat_embed_tensor = cat_embed_tensor.reshape(cat_features_batch.data.shape[0], cat_features_batch.data.shape[1], -1)
+            num_embed_tensor = self.num_scales * num_features_batch.data + self.num_shifts
+            embed_tensor = torch.concat([cat_embed_tensor.data, num_embed_tensor.data], dim=-1)
+        elif len(self.cat_columns) == 0:
+            embed_tensor = self.num_scales * num_features_batch.data + self.num_shifts
+        elif len(self.num_columns) == 0:
+            cat_embed_tensor = self.cat_embeds(cat_features_batch.data.type(torch.int32))
+            cat_embed_tensor = cat_embed_tensor.reshape(cat_features_batch.data.shape[0], cat_features_batch.data.shape[1], -1)
+            embed_tensor = cat_embed_tensor.data
+        else:
+            raise Exception("The batch is empty.")
+        embed_tensor = F.dropout(embed_tensor, self.features_dropout_rate)
         inputs = self.proj(embed_tensor)
-        targets = targets_batch
-
-        return inputs, targets
+        return inputs

 # RoFormer: Enhanced Transformer with Rotary Position Embedding https://arxiv.org/abs/2104.09864
 class RoPE(nn.Module):
@@ -171,7 +198,7 @@ class DyT(nn.Module):
         self.alpha = nn.Parameter(torch.ones(1) * alpha_init_value)
         self.weight = nn.Parameter(torch.ones(num_features))
         self.bias = nn.Parameter(torch.zeros(num_features))
-
+
     def forward(self, x):
         x = torch.tanh(self.alpha * x)
         return x * self.weight + self.bias
@@ -188,27 +215,31 @@ class TransformerLayer(nn.Module):
         self.o_proj = nn.Linear(h_dim, h_dim)
         self.ff1 = nn.Linear(h_dim, 4*h_dim)
         self.ff2 = nn.Linear(4*h_dim, h_dim)
-        self.ln1 = DyT(h_dim)
-        self.ln2 = DyT(h_dim)
-        self.ln3 = DyT(max_seq_len)
+        self.ln1 = DyT(h_dim)
+        self.ln2 = DyT(h_dim)
         self.rope = RoPE(dim=h_dim//self.num_heads, max_seq_len=max_seq_len)

     def split_to_heads(self, x, B, T, H):
-        return rearrange(x, 'b t (n h) -> (b n) t h', b=B, t=T, n=self.num_heads) if self.num_heads > 1 else x
+        if self.num_heads <= 1: return x
+        return rearrange(x, 'b t (n h) -> (b n) t h', b=B, t=T, n=self.num_heads)

     def gather_heads(self, x, B, T, H):
-        return rearrange(x, '(b n) t h -> b t (n h)', b=B, t=T, n=self.num_heads) if self.num_heads > 1 else x
+        if self.num_heads <= 1: return x
+        return rearrange(x, '(b n) t h -> b t (n h)', b=B, t=T, n=self.num_heads)

-    def attention(self, x):
+    def attention(self, x, padding_mask):
+        padding_mask = padding_mask.unsqueeze(-1).expand(*padding_mask.shape+(self.num_heads,))
+        padding_mask = self.split_to_heads(padding_mask, *padding_mask.shape)
         q = self.rope(self.split_to_heads(self.q_proj(x), *x.shape))
         k = self.rope(self.split_to_heads(self.k_proj(x), *x.shape))
         v = self.split_to_heads(self.v_proj(x), *x.shape)
         scores = (q @ k.transpose(1, 2)) * (self.h_dim ** -0.5)
-        attention = self.ln3(F.dropout1d(scores, p=self.dropout_rate))
+        scores = scores.masked_fill(~padding_mask, -1e9)
+        attention = nn.functional.softmax(scores, dim=2)
         return self.o_proj(self.gather_heads(attention @ v, *x.shape))

-    def forward(self, x):
-        x = x + F.dropout1d(self.attention(self.ln1(x)), p=self.dropout_rate)
+    def forward(self, x, padding_mask):
+        x = x + F.dropout1d(self.attention(self.ln1(x), padding_mask), p=self.dropout_rate)
         x = x + F.dropout1d(self.ff2(F.gelu(self.ff1(self.ln2(x)))), p=self.dropout_rate)
         return x

@@ -216,17 +247,18 @@ class BertClassifier(nn.Module):
     def __init__(self, layers_num=1, h_dim=64, class_num=2, max_seq_len=128, num_heads=4, dropout_rate = 0.1):
         super().__init__()
         self.__dict__.update({k:v for k,v in locals().items() if k != 'self'})
-        self.cls_token = nn.Parameter(torch.randn(1,1,h_dim))
+        self.cls_token = nn.Parameter(torch.randn(1,1,h_dim))
         self.max_seq_len = max_seq_len + self.cls_token.shape[1]
         self.layers = nn.ModuleList([TransformerLayer(h_dim=h_dim, num_heads=num_heads, dropout_rate = dropout_rate, max_seq_len=self.max_seq_len) for _ in range(layers_num)])
-        self.classifier_head = nn.Sequential(nn.Linear(h_dim, h_dim), nn.Dropout(0.1), nn.GELU(), nn.Linear(h_dim, class_num))
+        self.classifier_head = nn.Sequential(nn.Linear(h_dim, h_dim), nn.GELU(), nn.Linear(h_dim, class_num))
         self.pos_embeds = nn.Parameter(torch.randn(1, self.max_seq_len, h_dim))

-    def forward(self, x):
-        x = torch.concat([self.cls_token.expand([x.shape[0], self.cls_token.shape[1], self.cls_token.shape[2]]), x], dim=1)
+    def forward(self, x, padding_mask):
+        x = torch.cat([self.cls_token.expand([x.shape[0], self.cls_token.shape[1], x.shape[2]]), x], dim=1) # prepend
+        padding_mask = torch.cat([torch.ones(x.shape[0], 1, dtype=torch.bool, device=x.device), padding_mask], dim=1) # prepend
         x = x + self.pos_embeds[:, :x.shape[1], :]
         for l in self.layers:
-            x = l(x)
+            x = l(x, padding_mask)
         x = self.classifier_head(x[:,0,:])
         return x[:,:] if self.class_num > 1 else x[:,0]

@@ -235,56 +267,83 @@ class Model(nn.Module):
         super().__init__()
         self.encoder = encoder
         self.classifier = classifier
-
-    def forward(self, cat_inputs, num_inputs, targets):
-        inputs, targets = self.encoder(cat_inputs, num_inputs, targets)
-        return self.classifier(inputs), targets
+
+    def forward(self, cat_inputs, num_inputs, padding_mask):
+        inputs = self.encoder(cat_inputs, num_inputs)
+        return self.classifier(inputs, padding_mask)
+
+def test(start_time, epoch, batches_per_epoch, batch_size, model, optimizer, credit_dataset, test_auroc, writer):
+    model.eval()
+    optimizer.eval()
+    with torch.no_grad():
+        test_iterator = credit_dataset.get_test_batch_iterator(batch_size=batch_size)
+        for test_batch_id, (test_cat_inputs, test_num_inputs, test_padding_mask, test_targets) in enumerate(test_iterator):
+            test_cat_inputs = test_cat_inputs.to("cuda", non_blocking=True)
+            test_num_inputs = test_num_inputs.to("cuda", non_blocking=True)
+            test_padding_mask = test_padding_mask.to("cuda", non_blocking=True)
+            test_targets = test_targets.to("cuda", non_blocking=True)
+            outputs = model(test_cat_inputs, test_num_inputs, test_padding_mask)
+            test_auroc.update(outputs, test_targets.long())
+            print(f"\r {test_batch_id}/{len(credit_dataset.test_uniq_client_ids)//batch_size} {test_auroc.compute().item():.5f}", end = " "*20)
+    if not writer is None:
+        writer.add_scalar('test_roc_auc', test_auroc.compute().item(), epoch * batches_per_epoch)
+    print(f"\r {datetime.now() - start_time} {epoch}/{epochs} Test rocauc: {test_auroc.compute().item():.5f}", end = " "*20)
+    print()
+
+######################################### Training ################################################################
 h_dim = 64
 category_feature_dim = 8
 layers_num = 6
 num_heads = 2
 class_num = 1
-dropout_rate = 0.4
-epochs = 800
-batch_size = 30000
+features_dropout_rate = 0.4
+model_dropout_date = 0.4
+epochs = 500
+batch_size = 30000*len(DEVICE_IDX.split(','))
+datasets_per_epoch = len(DEVICE_IDX.split(','))
+num_workers = 10
+
+logs_dir = f'logs/{datetime.now().date()}_{datetime.now().hour:02d}_{datetime.now().minute:02d}_{datetime.now().second:02d}_{comment}/'
+writer = SummaryWriter(logs_dir)
+checkpoints_dir = f'checkpoints/{datetime.now().date()}_{datetime.now().hour:02d}_{datetime.now().minute:02d}_{datetime.now().second:02d}_{comment}/'
+script_snapshot_path = Path(logs_dir + Path(sys.argv[0]).name)
+Path(checkpoints_dir).mkdir(parents=True, exist_ok=True)
+print("Logs dir:", logs_dir)
+print("Checkpoints dir:", checkpoints_dir)
+script_snapshot_path.write_bytes(Path(sys.argv[0]).read_bytes()) # copy this version of script
+script_snapshot_path.chmod(0o400) # with read-only permission
 start_prep_time = datetime.now()
 credit_train_dataset = CreditProductsDataset(
     features_path="/wd/data/train_data/",
     targets_path="/wd/data/train_target.csv",
-    # train_uniq_client_ids_path="/wd/train_uniq_client_ids.csv",
-    # test_uniq_client_ids_path="/wd/test_uniq_client_ids.csv",
-    # train_uniq_client_ids_path="/wd/dima_train_ids.csv",
-    # test_uniq_client_ids_path="/wd/dima_test_ids.csv",
-    # train_uniq_client_ids_path=f"/wd/fold{DEVICE_IDX}_train_ids.csv",
-    # test_uniq_client_ids_path=f"/wd/fold{DEVICE_IDX}_test_ids.csv",
     train_uniq_client_ids_path=f"/wd/fold3_train_ids.csv",
     test_uniq_client_ids_path=f"/wd/fold3_test_ids.csv",
-    dropout_rate=dropout_rate
 )
 batches_per_epoch = len(credit_train_dataset.uniq_client_ids) // batch_size
 print(f"Dataset preparation time: {datetime.now() - start_prep_time}")

 encoder = Encoder(
     cat_columns=credit_train_dataset.cat_columns,
-    num_columns=credit_train_dataset.num_columns,
+    num_columns=credit_train_dataset.num_columns,
     cat_features_max_id=credit_train_dataset.cat_features.max(),
-    category_feature_dim=category_feature_dim,
+    category_feature_dim=category_feature_dim,
     out_dim=h_dim,
-    dropout_rate=dropout_rate
+    features_dropout_rate=features_dropout_rate
 )

 classifier = BertClassifier(
-    layers_num=layers_num,
+    layers_num=layers_num,
     num_heads=num_heads,
-    h_dim=h_dim,
-    class_num=class_num,
+    h_dim=h_dim,
+    class_num=class_num,
     max_seq_len=credit_train_dataset.max_user_history,
-    dropout_rate = dropout_rate
+    dropout_rate = model_dropout_date
 )

 model = Model(encoder=encoder, classifier=classifier)
+print(f"Model parameters count: ", sum(p.numel() for p in model.parameters()))
 model = torch.nn.DataParallel(model, device_ids=[int(idx) for idx in DEVICE_IDX.split(",")]).to(DEVICE)

 # The Road Less Scheduled https://arxiv.org/html/2405.15682v4
@@ -294,76 +353,99 @@ positive_counts = credit_train_dataset.targets_df.loc[credit_train_dataset.train
 negative_counts = len(credit_train_dataset.targets_df.loc[credit_train_dataset.train_uniq_client_ids]) - positive_counts
 pos_weight = negative_counts / (positive_counts + 1e-15)
 print(f"Class imbalance: {negative_counts} {positive_counts}. Pos weight: {pos_weight}")
-criterion = torch.nn.BCEWithLogitsLoss(pos_weight=torch.tensor(pos_weight))
-
-
-# for parallel data selection
-class WrapperDataset(Dataset):
-    def __init__(self, credit_dataset, encoder, batch_size):
-        self.credit_dataset = credit_dataset
-        self.encoder = encoder
-        self.batch_size = batch_size
-    def __len__(self):
-        return len(self.credit_dataset.uniq_client_ids) // self.batch_size
+criterion = torch.nn.BCEWithLogitsLoss(pos_weight=torch.tensor(pos_weight))
-    def __getitem__(self, idx):
-        cat_inputs, num_inputs, targets = credit_train_dataset.get_batch(batch_size=self.batch_size)
-        return cat_inputs, num_inputs, targets
+training_data = WrapperDataset(credit_dataset=credit_train_dataset, batch_size=batch_size, datasets_per_epoch=datasets_per_epoch)
+dataloader = DataLoader(training_data, batch_size=1, shuffle=False, num_workers=num_workers, pin_memory=True)
-training_data = WrapperDataset(credit_train_dataset, encoder, batch_size=batch_size)
-dataloader = DataLoader(training_data, batch_size=1, shuffle=False, num_workers=8*2, pin_memory=True)
+batches_per_epoch = len(training_data)
+print(f"Number of batches per epoch: {batches_per_epoch}, Number of datasets per epoch : {datasets_per_epoch}")

 test_auroc = AUROC(task='binary')

-def test(epoch):
-    model.eval()
-    optimizer.eval()
-    with torch.no_grad():
-        test_iterator = credit_train_dataset.get_test_batch_iterator(batch_size=batch_size)
-        for test_batch_id, (test_cat_inputs, test_num_inputs, test_targets) in enumerate(test_iterator):
-            test_cat_inputs, test_num_inputs, test_targets = [x.to("cuda", non_blocking=True) for x in [test_cat_inputs, test_num_inputs, test_targets]]
-            outputs, targets = model(test_cat_inputs, test_num_inputs, test_targets)
-            test_auroc.update(outputs, targets.long())
-            print(f"\r {test_batch_id}/{len(credit_train_dataset.test_uniq_client_ids)//batch_size} {test_auroc.compute().item():.5f}", end = " "*2)
-    writer.add_scalar('test_roc_auc', test_auroc.compute().item(), epoch * batches_per_epoch)
-    print(f"\r {datetime.now() - start_time} {epoch}/{epochs} Test rocauc: {test_auroc.compute().item():.5f}", end = " "*2)
-    print()
-
 start_time = datetime.now()
 print("Started at:", start_time)
 last_display_time = start_time
 last_checkpoint_time = start_time
 try:
     for epoch in range(epochs):
-        test(epoch)
-        for batch_id, (cat_inputs, num_inputs, targets) in enumerate(dataloader):
+        test(
+            start_time=start_time,
+            epoch=epoch,
+            batches_per_epoch=batches_per_epoch,
+            batch_size=batch_size,
+            model=model,
+            optimizer=optimizer,
+            credit_dataset=credit_train_dataset,
+            test_auroc=test_auroc,
+            writer=writer
+        )
+        for batch_id, (cat_inputs, num_inputs, padding_mask, targets) in enumerate(dataloader):
             model.train()
             optimizer.train()
             optimizer.zero_grad()
-            cat_inputs, num_inputs, targets = [x.to("cuda", non_blocking=True) for x in [cat_inputs[0], num_inputs[0], targets[0]]]
-            outputs, targets = model(cat_inputs, num_inputs, targets)
-            loss = criterion(outputs, targets)
+            outputs = model(
+                cat_inputs[0].to("cuda"),
+                num_inputs[0].to("cuda"),
+                padding_mask[0].to("cuda")
+            )
+            loss = criterion(outputs, targets[0].to("cuda"))
             loss.backward()
             optimizer.step()
+
             current_time = datetime.now()
             if current_time - last_display_time > timedelta(seconds=1):
                 last_display_time = current_time
-                writer.add_scalar('Loss', loss.item(), epoch*batches_per_epoch+batch_id)
+                writer.add_scalar(f'Loss', loss.item(), epoch*batches_per_epoch+batch_id)
                 print(f"\r {current_time-start_time} {epoch+1}/{epochs} {batch_id}/{batches_per_epoch} loss: {loss.item():.6f} {comment}", end = " "*2)
-            if current_time - last_checkpoint_time > timedelta(hours=4):
+            if current_time - last_checkpoint_time > timedelta(hours=8):
                 last_checkpoint_time = current_time
-                test(epoch)
+                test(
+                    start_time=start_time,
+                    epoch=epoch,
+                    batches_per_epoch=batches_per_epoch,
+                    batch_size=batch_size,
+                    model=model,
+                    optimizer=optimizer,
+                    credit_dataset=credit_train_dataset,
+                    test_auroc=test_auroc,
+                    writer=None
+                )
+                rocauc = test_auroc.compute().item()
                 save_checkpoint(
                     credit_dataset=credit_train_dataset,
-                    encoder = encoder, model=model, optimizer=optimizer, epoch=epoch,
-                    loss=loss.item(), rocauc=test_auroc.compute().item(), сheсkpoints_dir=сheсkpoints_dir)
+                    encoder = model.module.encoder,
+                    model=model.module.classifier,
+                    optimizer=optimizer,
+                    epoch=epoch,
+                    loss=loss.item(),
+                    rocauc=rocauc,
+                    checkpoints_dir=checkpoints_dir
+                )
 except KeyboardInterrupt:
     print()
-finally:
-    test(epoch+1)
+finally:
+    test(
+        start_time=start_time,
+        epoch=epoch+1,
+        batches_per_epoch=batches_per_epoch,
+        batch_size=batch_size,
+        model=model,
+        optimizer=optimizer,
+        credit_dataset=credit_train_dataset,
+        test_auroc=test_auroc,
+        writer=writer
+    )
+    rocauc = test_auroc.compute().item()
     save_checkpoint(
         credit_dataset=credit_train_dataset,
-        encoder = encoder, model=model, optimizer=optimizer, epoch=epoch+1,
-        loss=loss.item(), rocauc=test_auroc.compute().item(), сheсkpoints_dir=сheсkpoints_dir)
-    writer.close()
+        encoder = model.module.encoder,
+        model=model.module.classifier,
+        optimizer=optimizer,
+        epoch=epoch,
+        loss=loss.item(),
+        rocauc=rocauc,
+        checkpoints_dir=checkpoints_dir
+    )
+    writer.close()
\ No newline at end of file
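The attention change in both scripts replaces the `DyC`/`DyT` normalization of the score matrix with a regular softmax over masked scores: positions that correspond to padding are pushed to `-1e9` via `masked_fill` before the softmax. Below is a standalone sketch of that standard key-masking idiom; it is illustrative only and does not reproduce the repository's exact per-head mask layout.

```python
import torch

def masked_attention_weights(scores: torch.Tensor, padding_mask: torch.Tensor) -> torch.Tensor:
    """scores: (batch, query_len, key_len); padding_mask: (batch, key_len), True = real token."""
    # Broadcast the key-side mask over the query dimension and push padded keys to -1e9,
    # so softmax assigns them (numerically) zero attention weight.
    masked = scores.masked_fill(~padding_mask.unsqueeze(1), -1e9)
    return torch.softmax(masked, dim=-1)

scores = torch.randn(1, 4, 4)
padding_mask = torch.tensor([[True, True, True, False]])  # last position is padding
weights = masked_attention_weights(scores, padding_mask)
print(weights[0].sum(dim=-1))  # each row sums to 1
print(weights[0, :, 3])        # ~0 attention on the padded key
```

Using a large negative constant such as `-1e9` instead of `-inf` keeps the softmax numerically safe even if an entire row happens to be masked.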