'''Rough draft of a Fully Input Convex Neural Network (FICNN), built with PyTorch. The network has
a simple structure, includes biases on all linear layers, and uses a softplus activation function.'''
import torch
from torch import nn, Tensor
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import r2_score
'''The MPS device enables high-performance training on the GPU of macOS devices with the Metal
programming framework. It introduces a new device that maps machine-learning computational graphs
and primitives onto the highly efficient Metal Performance Shaders Graph framework and the tuned
kernels provided by the Metal Performance Shaders framework, respectively.'''
# Check whether MPS is available; if it is, set it up, otherwise print why MPS is unavailable.
if not torch.backends.mps.is_available():
    if not torch.backends.mps.is_built():
        print("MPS not available because the current PyTorch install was not "
              "built with MPS enabled.")
    else:
        print("MPS not available because the current MacOS version is not 12.3+ "
              "and/or you do not have an MPS-enabled device on this machine.")
else:
    mps_device = torch.device("mps")
    print("running on the GPU")
class FICNN(nn.Module):
    def __init__(self, input_size, hidden_layers_dim, num_hidden_layers, output_size):
        super().__init__()
        self.input_size = input_size
        self.hidden_layers_dim = hidden_layers_dim
        self.num_hidden_layers = num_hidden_layers
        self.output_size = output_size
        # z_flow is a list of linear layers representing the forward linear flow from y to z1 to z2 to ... zk
        z_flow = []
        # nn.Linear applies a linear transformation to incoming data: output = input @ weight.T + bias
        z_flow.append(nn.Linear(input_size, hidden_layers_dim))
        for _ in range(num_hidden_layers - 1):
            z_flow.append(nn.Linear(hidden_layers_dim, hidden_layers_dim))
            # clamp the z-flow weights to be non-negative, a constraint from Proposition 1 of the
            # ICNN paper; note that F.relu(weight) alone would return a new tensor and leave the
            # layer unchanged, so the clamp must happen in place
            with torch.no_grad():
                z_flow[-1].weight.clamp_(min=0)
        z_flow.append(nn.Linear(hidden_layers_dim, output_size))
        with torch.no_grad():
            z_flow[-1].weight.clamp_(min=0)
        # nn.ModuleList registers every layer in the flow so its parameters are tracked by the module
        self.z_traversal = nn.ModuleList(z_flow)
        # y_flow is a list of linear layers representing the passthrough connections between y and z2, y and z3, ..., y and zk
        y_flow = []
        for _ in range(num_hidden_layers - 1):
            y_flow.append(nn.Linear(input_size, hidden_layers_dim))
        y_flow.append(nn.Linear(input_size, output_size))
        self.y_traversal = nn.ModuleList(y_flow)
        # Softplus is convex and nondecreasing, so it satisfies Proposition 1's criteria for network convexity
        self.activation_function = nn.Softplus()

    def forward(self, x):
        # vector holds the activations at each step and becomes the final output of the network
        vector = self.activation_function(self.z_traversal[0](x))
        for z_layer, y_layer in zip(self.z_traversal[1:-1], self.y_traversal[:-1]):
            vector = self.activation_function(z_layer(vector) + y_layer(x))
        vector = self.z_traversal[-1](vector) + self.y_traversal[-1](x)
        return self.activation_function(vector)
'''This next section runs test forward and backward passes of the network.
Instantiate with: icnn = FICNN(input_size, hidden_layers_dim, num_hidden_layers, output_size)'''
icnn = FICNN(1, 8, 1, 1)
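# Added sanity check (an assumption about what is useful, not part of the original draft):
# numerically spot-check input convexity, i.e. f(t*a + (1-t)*b) <= t*f(a) + (1-t)*f(b) for random
# points a, b and t in [0, 1]. check_convexity is a hypothetical helper name, and passing on a
# handful of sampled points is evidence of convexity, not a proof.
def check_convexity(model, trials=100, tol=1e-6):
    with torch.no_grad():
        for _ in range(trials):
            a, b = torch.randn(1, 1), torch.randn(1, 1)
            t = torch.rand(1)
            lhs = model(t * a + (1 - t) * b)          # f at the interpolated point
            rhs = t * model(a) + (1 - t) * model(b)   # interpolated value of f
            if (lhs > rhs + tol).any():
                return False
    return True
print("Convex on sampled points:", check_convexity(icnn))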
# Build the training data 'trainset': num_of_batches batches, each an array of shape
# [batch_size, 2] where column 0 holds an input x and column 1 holds the target x^2
num_of_batches = 2048 * 16
batch_size = 16
trainset = []
for _ in range(num_of_batches):
    # allocate a fresh array for every batch; reusing one array would make every entry of
    # trainset point at the same memory, silently overwriting all earlier batches
    batch = np.empty([batch_size, 2])
    for i in range(batch_size):
        n = np.random.rand()  # use np.random.rand() * 2 - 1 for a uniform distribution on [-1, 1]
        batch[i, 0] = n
        batch[i, 1] = n ** 2
    trainset.append(batch)
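# Added alternative (a sketch, not the original approach): the same trainset can be generated
# without nested Python loops by sampling every input at once; make_trainset is a hypothetical name.
def make_trainset(num_batches, size):
    xs = np.random.rand(num_batches, size)              # all inputs at once, shape [num_batches, size]
    return [np.stack([x, x ** 2], axis=1) for x in xs]  # each batch pairs x (col 0) with x^2 (col 1)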
# MSE is the standard loss for regression problems; L1 loss is used for training here because the
# targets lie in [0, 1], where squaring already-small errors shrinks them even further
mse_loss = nn.MSELoss()
l1 = nn.L1Loss()
optimizer = optim.Adam(icnn.parameters(), lr=0.0001)  # Adam is a solid default optimizer for this problem
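# Added illustration (purely illustrative, not in the original draft) of the loss comment above:
# for an error of 0.1, squaring shrinks the loss to 0.01, while L1 keeps it at 0.1.
demo_err = torch.tensor([0.1])
print("MSE on an error of 0.1:", mse_loss(demo_err, torch.zeros(1)).item())  # prints ~0.01
print("L1  on an error of 0.1:", l1(demo_err, torch.zeros(1)).item())        # prints ~0.1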
# Training loop (forward and backward passes over the training set)
EPOCHS = 2  # number of times to loop through the data
for epoch in range(EPOCHS):
    # initialize values used for visualization
    batch_num = 0
    batch_graph = []
    for data in trainset:
        X = torch.tensor(data[:, 0:1], dtype=torch.float32)  # inputs, shape [batch_size, 1]
        y = torch.tensor(data[:, 1:2], dtype=torch.float32)  # targets, shape [batch_size, 1]
        optimizer.zero_grad()  # clear the gradients accumulated by the previous batch
        output = icnn(X)
        loss = l1(output, y)
        loss.backward()
        optimizer.step()
        # record (batch number, loss) points; .item() detaches the scalar from the autograd graph
        batch_num += 1
        batch_graph.append([batch_num, loss.item()])
        # project the z-flow weights back onto the non-negative orthant after each step;
        # the first z layer acts directly on the input and is exempt from the constraint
        with torch.no_grad():
            for z_layer in icnn.z_traversal[1:]:
                z_layer.weight.clamp_(min=0)
    print("The loss for epoch", (epoch + 1), "is:", loss.item())
    # plot the loss for each batch in the epoch and display
    plt.figure(figsize=(9, 9))
    batch_plot_X = [point[0] for point in batch_graph]
    batch_plot_Y = [point[1] for point in batch_graph]
    plt.plot(batch_plot_X, batch_plot_Y, label='Loss over batches in Training Set')
    plt.legend()
    plt.show()
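# Added sketch (an assumption, not in the original draft): save the trained weights so the test
# section below could be rerun later without retraining; 'ficnn_roughdraft.pt' is a hypothetical filename.
torch.save(icnn.state_dict(), 'ficnn_roughdraft.pt')
# to restore: icnn.load_state_dict(torch.load('ficnn_roughdraft.pt'))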
should_test = True
if should_test:
    # Testing batch of manual inputs
    test_values = [1, 0, 0.5, -1, -0.5]
    for x in test_values:
        fx = x ** 2
        test_X = torch.tensor([[float(x)]])   # shape [1, 1]
        test_Y = torch.tensor([[float(fx)]])  # shape [1, 1]
        with torch.no_grad():  # no gradients are needed for evaluation
            test_output = icnn(test_X)
        test_loss = mse_loss(test_output, test_Y)
        print("Testing: x =", x, " Expected: y =", fx, " Actual: y =", test_output.item())
        print("Loss:", test_loss.item())
# Testing batch of random inputs
test_size = 50
test = np.zeros([test_size, 2])
# np.random.seed(0)  # uncomment for a reproducible test batch
for i in range(test_size):
    test[i, 0] = np.random.rand()  # * 2 - 1 for the interval [-1, 1]
    test[i, 1] = test[i, 0] ** 2
print(test)
X = torch.tensor(test[:, 0:1], dtype=torch.float32)
y = torch.tensor(test[:, 1:2], dtype=torch.float32)
with torch.no_grad():
    output = icnn(X)
print('\nOutput is:\n', output)
loss = l1(output, y)
print('\nLoss is:', loss.item())
# set the style and figure size
plt.style.use('ggplot')
plt.figure(figsize=(9, 9))
# flatten the X tensor into a list of floats
plot_X = [x[0] for x in X.tolist()]
pred_y = [y_hat[0] for y_hat in output.tolist()]
# sort the (x, prediction) pairs together by x, which matplotlib needs to draw each curve as one
# line; sorting pred_y on its own would break the pairing between inputs and predictions
plot_X, pred_y = map(list, zip(*sorted(zip(plot_X, pred_y))))
# the true y-values corresponding to the sorted x-values
true_y = [x * x for x in plot_X]
# plot the true line with true points
plt.plot(plot_X, true_y, label='true line')
plt.scatter(plot_X, true_y)
# plot the predicted line with predicted points
plt.plot(plot_X, pred_y, 'blue', label='predicted line')
plt.scatter(plot_X, pred_y)
# add the function and r2 score to the title, and display
score = 'r2 score: ' + str(r2_score(true_y, pred_y))
plt.title('x^2 with ' + score)
plt.legend()
plt.show()
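# Added reference check (an assumption about what's worth showing, not in the original draft):
# r2_score computes 1 - sum((y - y_hat)^2) / sum((y - mean(y))^2), so 1.0 means a perfect fit.
manual_r2 = 1 - (np.sum((np.array(true_y) - np.array(pred_y)) ** 2)
                 / np.sum((np.array(true_y) - np.mean(true_y)) ** 2))
print('manual r2 check:', manual_r2)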