# backprop_network.py
import numpy as np
from scipy.special import softmax, logsumexp
class Network(object):
def __init__(self, sizes):
"""
The list ``sizes`` contains the number of neurons in the
respective layers of the network. For example, if the list
is [784, 40, 10] then it would be a three-layer network, with the
first layer (the input layer) containing 784 neurons, the second layer 40 neurons,
        and the third layer (the output layer) 10 neurons. The weights are
        initialized with a zero-mean Gaussian scaled by sqrt(2 / fan_in)
        (He initialization), and the biases are initialized to zero.
"""
        self.num_layers = len(sizes) - 1  # number of weight layers (the input layer has no parameters)
self.sizes = sizes
self.parameters = {}
for l in range(1, len(sizes)):
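            # He initialization: zero-mean Gaussian scaled by sqrt(2 / fan_in),
            # a standard choice for ReLU networks; biases start at zero.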
self.parameters['W' + str(l)] = np.random.randn(sizes[l], sizes[l-1]) * np.sqrt(2. / sizes[l-1])
self.parameters['b' + str(l)] = np.zeros((sizes[l], 1))
def cross_entropy_loss(self, logits, y_true):
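        """Computes the mean softmax cross-entropy loss over the batch.
        Input: "logits": numpy array of shape (10, batch_size), the network output before softmax
               "y_true": numpy array of shape (batch_size,) containing the integer class labels
        """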
m = y_true.shape[0]
        # Log-softmax via the log-sum-exp trick: normalize over the class axis
        # (axis 0) separately for each example (column).
        log_probs = logits - logsumexp(logits, axis=0)
y_one_hot = np.eye(10)[y_true].T # Assuming 10 classes
# Compute the cross-entropy loss
loss = -np.sum(y_one_hot * log_probs) / m
return loss
    def relu(self, x):
        return np.maximum(0, x)
    def relu_derivative(self, x):
        # Derivative of ReLU; the value at 0 is taken to be 0 (a valid subgradient).
        return np.where(x > 0, 1, 0)
def cross_entropy_derivative(self, logits, y_true):
""" Input: "logits": numpy array of shape (10, batch_size) where each column is the network output on the given example (before softmax)
"y_true": numpy array of shape (batch_size,) containing the true labels of the batch
Returns: a numpy array of shape (10,batch_size) where each column is the gradient of the loss with respect to y_pred (the output of the network before the softmax layer) for the given example.
"""
batch_size = y_true.shape[0]
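        # For softmax + cross-entropy, the gradient with respect to the logits
        # has the closed form softmax(logits) - y_one_hot; dividing by the batch
        # size matches the mean reduction used in cross_entropy_loss.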
y_one_hot = np.eye(10)[y_true].T
grad = softmax(logits, axis=0) - y_one_hot
grad /= batch_size
return grad
def forward_propagation(self, X):
"""Implement the forward step of the backpropagation algorithm.
Input: "X" - numpy array of shape (784, batch_size) - the input to the network
        Returns: "ZL" - numpy array of shape (10, batch_size), the output of the network on the input X (before the softmax layer)
        "forward_outputs" - A list of length self.num_layers + 1 containing the forward computation: the input activation, followed by the (Z, A) pair of each layer.
"""
forward_outputs = []
        A = X  # the activation of the input layer
        forward_outputs.append(('A0', A))  # layer 0 has no pre-activation, so a placeholder label fills the Z slot
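        # Each layer computes Z_l = W_l A_{l-1} + b_l; hidden layers apply ReLU,
        # while the output layer returns the raw logits (softmax is applied
        # inside the loss instead).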
for l in range(1, self.num_layers + 1):
W = self.parameters['W' + str(l)]
b = self.parameters['b' + str(l)]
Z = np.dot(W, A) + b
if l < self.num_layers:
A = self.relu(Z)
else:
A = Z
forward_outputs.append((Z, A))
ZL = A
return ZL, forward_outputs
def backpropagation(self, ZL, Y, forward_outputs):
"""Implement the backward step of the backpropagation algorithm.
Input: "ZL" - numpy array of shape (10, batch_size), the output of the network on the input X (before the softmax layer)
"Y" - numpy array of shape (batch_size,) containing the labels of each example in the current batch.
"forward_outputs" - list of length self.num_layers given by the output of the forward function
Returns: "grads" - dictionary containing the gradients of the loss with respect to the network parameters across the batch.
grads["dW" + str(l)] is a numpy array of shape (sizes[l], sizes[l-1]),
grads["db" + str(l)] is a numpy array of shape (sizes[l],1).
"""
grads = {}
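        # Backward recursion: dZ_L = softmax(Z_L) - Y_one_hot (already averaged
        # over the batch), then for each layer l: dW_l = dZ_l A_{l-1}^T,
        # db_l = sum of dZ_l over the batch, and for hidden layers
        # dZ_{l-1} = (W_l^T dZ_l) * relu'(Z_{l-1}).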
dZL = self.cross_entropy_derivative(ZL, Y)
for l in range(self.num_layers, 0, -1):
A_prev = forward_outputs[l - 1][1] # the activation from the previous layer
dW = np.dot(dZL, A_prev.T)
db = np.sum(dZL, axis=1, keepdims=True)
grads['dW' + str(l)] = dW
grads['db' + str(l)] = db
if l > 1:
W = self.parameters['W' + str(l)]
                Z_prev = forward_outputs[l - 1][0]  # pre-activation of layer l-1
dA_prev = np.dot(W.T, dZL)
dZL = dA_prev * self.relu_derivative(Z_prev)
return grads
def sgd_step(self, grads, learning_rate):
"""
Updates the network parameters via SGD with the given gradients and learning rate.
"""
parameters = self.parameters
L = self.num_layers
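        # Vanilla SGD update: for every layer, W <- W - lr * dW and b <- b - lr * db.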
for l in range(L):
parameters["W" + str(l+1)] -= learning_rate * grads["dW" + str(l+1)]
parameters["b" + str(l+1)] -= learning_rate * grads["db" + str(l+1)]
return parameters
def train(self, x_train, y_train, epochs, batch_size, learning_rate, x_test, y_test):
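        """
        Runs mini-batch SGD for the given number of epochs. Expects "x_train" and
        "x_test" of shape (784, n) with integer label vectors, and returns the final
        parameters together with the per-epoch training/test loss and accuracy.
        """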
epoch_train_cost = []
epoch_test_cost = []
epoch_train_acc = []
epoch_test_acc = []
for epoch in range(epochs):
costs = []
acc = []
for i in range(0, x_train.shape[1], batch_size):
X_batch = x_train[:, i:i+batch_size]
Y_batch = y_train[i:i+batch_size]
ZL, caches = self.forward_propagation(X_batch)
cost = self.cross_entropy_loss(ZL, Y_batch)
costs.append(cost)
grads = self.backpropagation(ZL, Y_batch, caches)
self.parameters = self.sgd_step(grads, learning_rate)
preds = np.argmax(ZL, axis=0)
train_acc = self.calculate_accuracy(preds, Y_batch, batch_size)
acc.append(train_acc)
average_train_cost = np.mean(costs)
average_train_acc = np.mean(acc)
print(f"Epoch: {epoch + 1}, Training loss: {average_train_cost:.20f}, Training accuracy: {average_train_acc:.20f}")
epoch_train_cost.append(average_train_cost)
epoch_train_acc.append(average_train_acc)
# Evaluate test error
ZL, caches = self.forward_propagation(x_test)
test_cost = self.cross_entropy_loss(ZL, y_test)
preds = np.argmax(ZL, axis=0)
test_acc = self.calculate_accuracy(preds, y_test, len(y_test))
# print(f"Epoch: {epoch + 1}, Test loss: {test_cost:.20f}, Test accuracy: {test_acc:.20f}")
epoch_test_cost.append(test_cost)
epoch_test_acc.append(test_acc)
return self.parameters, epoch_train_cost, epoch_test_cost, epoch_train_acc, epoch_test_acc
    def calculate_accuracy(self, y_pred, y_true, batch_size):
        """Returns the fraction of correct predictions over the batch."""
return np.sum(y_pred == y_true) / batch_size
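# A minimal usage sketch, assuming MNIST-like inputs: images flattened into
# (784, n) column vectors and integer labels in [0, 9]. Synthetic random data
# stands in for a real dataset here, so the reported accuracy is only around
# chance level; swap in real arrays of the same shapes to train properly.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    x_train = rng.standard_normal((784, 512))
    y_train = rng.integers(0, 10, size=512)
    x_test = rng.standard_normal((784, 128))
    y_test = rng.integers(0, 10, size=128)
    net = Network([784, 40, 10])
    net.train(x_train, y_train, epochs=2, batch_size=64,
              learning_rate=0.05, x_test=x_test, y_test=y_test)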