mlp_pytorch.py
import random

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset

from Adam import AdamO  # project-local optimizer module
class Feature_DataSet(Dataset):
    """Wraps feature vectors, labels, and a binary language flag (0 = "en", 1 = other)."""

    def __init__(self, X_train, Y_train, lan_detected_train):
        self.X_train = X_train
        self.Y_train = Y_train
        # Encode the detected language as a binary flag used for per-sample loss weighting.
        self.lan_detected_train = [0 if l == "en" else 1 for l in lan_detected_train]

    def __getitem__(self, index: int):
        x = self.X_train[index]
        y = self.Y_train[index]
        l = self.lan_detected_train[index]
        return x, y, l

    def __len__(self):
        return len(self.X_train)
class Proj_Model(nn.Module):
    """A single linear layer acting as the classifier head."""

    def __init__(self, in_dim, out_dim):
        super().__init__()
        self.clf = nn.Linear(in_dim, out_dim)

    def forward(self, x):
        return self.clf(x)
def set_random_seed(seed):
    """Seed torch, CUDA, numpy, and random for reproducibility."""
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        if torch.cuda.device_count() > 1:
            torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    return seed
class MLP:
    """Linear classifier trained with a language-weighted cross-entropy loss."""

    def __init__(self, seed) -> None:
        self.seed = set_random_seed(seed)
        self.use_cuda = torch.cuda.is_available()
        self.device = torch.device("cuda" if self.use_cuda else "cpu")
        self.batch_size = 32
        self.lr_init = 1e-2   # unused here; AdamO is constructed with its defaults in fit()
        self.epochs = 100
        self.num_workers = 0
        self.penalty = "l2"   # "l2" or "l1" weight regularization
        self.l_lambda = 0.1   # regularization strength
    def fit(self, X_train, Y_train, lan_detected_train):
        dataset_train = Feature_DataSet(X_train, Y_train, lan_detected_train)
        dataloader_train = DataLoader(dataset=dataset_train, batch_size=self.batch_size,
                                      num_workers=self.num_workers, drop_last=True, shuffle=True)
        self.model = Proj_Model(in_dim=len(X_train[0]), out_dim=2).to(self.device)
        optimizer = AdamO(self.model.parameters())
        # Cosine-anneal the learning rate over the total number of optimization steps.
        lr_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(
            optimizer=optimizer, T_max=len(dataloader_train) * self.epochs)
        self.model.train()
        for epoch in range(self.epochs):
            for batch_idx, (x, y, l) in enumerate(dataloader_train):
                x = x.to(self.device).float()
                y = y.to(self.device).long()
                yhat = self.model(x)
                loss = self.cross_entropy_ln_penalty(yhat, y, l)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                lr_scheduler.step()
    def predict(self, X_test):
        # Labels and language flags are unused at inference time; pass placeholders.
        dataset_test = Feature_DataSet(X_test, X_test, ["en" for _ in X_test])
        dataloader_test = DataLoader(dataset=dataset_test, batch_size=self.batch_size,
                                     num_workers=self.num_workers, drop_last=False, shuffle=False)
        yhat_all = []
        self.model.eval()
        with torch.no_grad():
            for batch_idx, (x, _, _) in enumerate(dataloader_test):
                x = x.to(self.device).float()
                logits = self.model(x)
                outputs = F.softmax(logits, dim=1)
                _, yhat = torch.max(outputs, 1)
                # atleast_1d keeps a batch of size 1 from collapsing to a 0-d array.
                yhat = np.atleast_1d(yhat.squeeze().cpu().numpy())
                yhat_all.append(yhat)
        y_pred = np.concatenate(yhat_all, axis=0)
        return y_pred
    def cross_entropy_ln_penalty(self, yhat, Y_label, lan):
        """Cross-entropy with per-sample (label, language) weights plus an L1/L2 weight penalty."""
        # weights_category[label][language_flag] -> per-sample loss weight
        weights_category = {0: {0: 0.4, 1: 0.3}, 1: {0: 0.2, 1: 0.3}}
        loss = F.cross_entropy(yhat, Y_label, reduction='none')
        weight = [weights_category[y][l]
                  for y, l in zip(Y_label.detach().cpu().tolist(), lan.detach().cpu().tolist())]
        weight = torch.tensor(weight).to(loss)
        loss = loss * weight
        loss_mean = loss.mean()
        if self.penalty == "l2":
            l2_reg = torch.tensor(0., device=loss_mean.device)
            for name, param in self.model.named_parameters():
                if 'weight' in name:
                    l2_reg = l2_reg + torch.norm(param, 2)
            loss_mean = loss_mean + self.l_lambda * l2_reg
        elif self.penalty == "l1":
            l1_reg = torch.tensor(0., device=loss_mean.device)
            for name, param in self.model.named_parameters():
                if 'weight' in name:
                    l1_reg = l1_reg + torch.norm(param, 1)
            loss_mean = loss_mean + self.l_lambda * l1_reg
        return loss_mean
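

# A minimal usage sketch (not part of the original file): it trains the classifier on
# synthetic data and checks training accuracy. It assumes the project-local `Adam.AdamO`
# optimizer is importable; the shapes and language strings below are illustrative only.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    X = rng.normal(size=(256, 16)).astype(np.float32)            # 256 samples, 16 features
    Y = rng.integers(0, 2, size=256)                              # binary labels
    langs = ["en" if i % 2 == 0 else "fr" for i in range(256)]    # detected languages

    clf = MLP(seed=42)
    clf.epochs = 5            # keep the sketch quick
    clf.fit(X, Y, langs)
    preds = clf.predict(X)
    print("train accuracy:", (preds == Y).mean())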