# simple_nn_xy.py
# standard imports
import numpy as np
import matplotlib.pyplot as plt
import math
import random
# data handling imports
import pandas as pd
import csv
# tensorflow imports
import h5py
import tensorflow as tf
from tensorflow.python.framework import ops
import tf_utils
from data_augmentation_and_transformation import augment_scalar

# Creates random minibatches (credit: Andrew Ng and deeplearning.ai)
def random_mini_batches(X, Y, mini_batch_size=180, seed=0):
    """
    Creates a list of random minibatches from (X, Y)
    Arguments:
    X -- input data, of shape (input size, number of examples)
    Y -- true "label" vector, of shape (output size, number of examples)
    mini_batch_size -- size of the mini-batches, integer
    seed -- fixes the shuffle so the "random" minibatches are reproducible
    Returns:
    mini_batches -- list of synchronous (mini_batch_X, mini_batch_Y)
    """
    m = X.shape[1]  # number of training examples
    mini_batches = []
    np.random.seed(seed)
    # Step 1: Shuffle (X, Y)
    permutation = list(np.random.permutation(m))
    shuffled_X = X[:, permutation]
    shuffled_Y = Y[:, permutation].reshape((Y.shape[0], m))
    # Step 2: Partition (shuffled_X, shuffled_Y), minus the end case.
    num_complete_minibatches = math.floor(m / mini_batch_size)  # number of minibatches of size mini_batch_size in the partitioning
    for k in range(0, num_complete_minibatches):
        mini_batch_X = shuffled_X[:, k * mini_batch_size : (k + 1) * mini_batch_size]
        mini_batch_Y = shuffled_Y[:, k * mini_batch_size : (k + 1) * mini_batch_size]
        mini_batch = (mini_batch_X, mini_batch_Y)
        mini_batches.append(mini_batch)
    # Handle the end case (last minibatch < mini_batch_size)
    if m % mini_batch_size != 0:
        mini_batch_X = shuffled_X[:, num_complete_minibatches * mini_batch_size : m]
        mini_batch_Y = shuffled_Y[:, num_complete_minibatches * mini_batch_size : m]
        mini_batch = (mini_batch_X, mini_batch_Y)
        mini_batches.append(mini_batch)
    return mini_batches
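
# A worked example of the batching arithmetic above (hypothetical shapes):
# with m = 1000 examples and mini_batch_size = 180, math.floor(1000 / 180) = 5
# full batches cover 900 examples, and the end case adds a final batch of the
# remaining 100, so random_mini_batches returns a list of 6 batches.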

# Initializes TF parameters
def initialize_parameters():
    # initializes W1 randomly as a 12 x 12 matrix (Xavier initialization)
    W1 = tf.get_variable("W1", [12, 12], initializer=tf.contrib.layers.xavier_initializer(seed=1))
    # initializes b1 as a 12 x 1 vector of zeros
    b1 = tf.get_variable("b1", [12, 1], initializer=tf.zeros_initializer())
    # initializes W2 randomly as a 12 x 12 matrix
    W2 = tf.get_variable("W2", [12, 12], initializer=tf.contrib.layers.xavier_initializer(seed=1))
    # initializes b2 as a 12 x 1 vector of zeros
    b2 = tf.get_variable("b2", [12, 1], initializer=tf.zeros_initializer())
    # initializes W3 randomly as a 12 x 12 matrix
    W3 = tf.get_variable("W3", [12, 12], initializer=tf.contrib.layers.xavier_initializer(seed=1))
    # initializes b3 as a 12 x 1 vector of zeros
    b3 = tf.get_variable("b3", [12, 1], initializer=tf.zeros_initializer())
    # returns parameters in a dictionary
    return {"W1": W1, "b1": b1, "W2": W2, "b2": b2, "W3": W3, "b3": b3}

def forward_prop(X, parameters):
    # takes input X and model parameters for LINEAR -> RELU -> LINEAR -> RELU -> LINEAR,
    # consistent with the shapes from initialize_parameters;
    # returns the output of the model
    Z1 = tf.add(tf.matmul(parameters['W1'], X), parameters['b1'])   # vectorized multiply/add step
    A1 = tf.nn.relu(Z1)                                             # ReLU step
    Z2 = tf.add(tf.matmul(parameters['W2'], A1), parameters['b2'])  # vectorized multiply/add step
    A2 = tf.nn.relu(Z2)                                             # ReLU step
    Z3 = tf.add(tf.matmul(parameters['W3'], A2), parameters['b3'])  # vectorized multiply/add step
    return Z3  # returns output
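
# Shape sketch (assuming the 12 x 12 weights and 12 x 1 biases above): for an
# input X of shape (12, m), each Z and A is (12, m), since the (12, 1) biases
# broadcast across the m columns, so the returned Z3 is also (12, m).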

# Cost function
def compute_cost(Yhat, Y):
    # takes the true output Y and the estimated output Yhat;
    # returns the cost as the position mean squared error
    return tf.reduce_mean(tf.keras.losses.MSE(Yhat[0:6], Y[0:6]))
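
# Note: Yhat[0:6] and Y[0:6] slice the first 6 of the 12 output rows, so only
# those components (presumably the position coordinates, per the comment
# above) enter the loss; the remaining 6 rows are unconstrained by training.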

def optimize_params(X_train, Y_train, X_test, Y_test, learning_rate, num_epochs, minibatch_size):
    # takes training X and Y, test X and Y, the learning rate, the number of
    # epochs, and the minibatch size; returns the optimized parameters
    # (X_test and Y_test are accepted for interface symmetry but are not used below)
    # start from a fresh graph so initialize_parameters can be called again
    ops.reset_default_graph()
    # cost record
    costs = []
    # placeholders for X and Y
    X = tf.placeholder(tf.float32, shape=[X_train.shape[0], None])
    Y = tf.placeholder(tf.float32, shape=[Y_train.shape[0], None])
    # initial parameters
    parameters = initialize_parameters()
    # forward propagation
    Z3 = forward_prop(X, parameters)
    # cost node
    cost = compute_cost(Z3, Y)
    # implements the Adam optimizer
    optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(cost)
    # variable initialization
    init = tf.global_variables_initializer()
    # start session
    with tf.Session() as sess:
        # run initialization
        sess.run(init)
        for epoch in range(num_epochs):  # loops over epochs
            epoch_cost = 0  # initialize zero epoch cost
            # reshuffle into a new set of random minibatches each epoch
            minibatches = random_mini_batches(X_train, Y_train, minibatch_size, seed=epoch)
            # number of minibatches
            num_minibatches = len(minibatches)
            for minibatch in minibatches:  # loops over minibatches
                # runs the optimizer on the minibatch
                (minibatch_X, minibatch_Y) = minibatch
                _, minibatch_cost = sess.run([optimizer, cost], feed_dict={X: minibatch_X, Y: minibatch_Y})
                # averages over minibatches for the per-epoch cost
                epoch_cost += minibatch_cost / num_minibatches
            # appends to the cost list
            costs.append(epoch_cost)
        # plot the cost
        plt.plot(np.squeeze(costs))
        plt.ylabel('mean squared error')
        plt.xlabel('Epoch')
        plt.title("Simple Model on x-y Data: Learning rate =" + str(learning_rate))
        plt.show()
        # evaluates and returns the final optimized parameters
        return sess.run(parameters)
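
# A minimal smoke-test sketch (illustrative only: the data here is random
# noise with the expected (12, m) layout; a real run would load x-y data,
# e.g. via the pandas/h5py/tf_utils imports above, and the hyperparameter
# values below are placeholders, not tuned settings).
if __name__ == "__main__":
    rng = np.random.RandomState(0)
    X_train = rng.randn(12, 900).astype(np.float32)
    Y_train = rng.randn(12, 900).astype(np.float32)
    X_test = rng.randn(12, 100).astype(np.float32)
    Y_test = rng.randn(12, 100).astype(np.float32)
    trained_parameters = optimize_params(X_train, Y_train, X_test, Y_test,
                                         learning_rate=0.0001,
                                         num_epochs=10,
                                         minibatch_size=180)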