TensorFlow之DNN,一:构建“裸机版”全连接神经网络

博客断更了一周,干啥去了?想做个聊天机器人出来,去看教程了,然后大受打击,哭着回来补TensorFlow和自然语言处理的基础了。本来如意算盘打得挺响,作为一个初学者,直接看项目(不是指MINIST手写数字识别这种),哪里不会补哪里,这样不仅能学习到TensorFlow和算法知识,还知道如何在具体项目中应用,学完后还能出来一个项目。是不是要为博主的想法双击666?图样!

现在明白了什么叫基础不牢地动山摇,明白了什么叫步子太大直接就放弃,明白了我是适合循序渐进的学习,暂时不适合对着项目直接干。

同时也明白了一点,那就是为什么很多TensoFlow教程都用MINIST数据集来展示如何构建各种模型,我之前还很鄙视一MINIST到底,觉得这就是个toy项目。现在明白了,用这个数据集是为了把精力集中在模型搭建和优化上,而不是浪费在数据预处理上。知道数据的输入格式,明白如何构建模型和优化模型,那么面对新的任务时,只要把数据处理成相同的输入格式,就能比较快的用TensorFlow和模型来完成任务最关键的部分。

所以打算好好学习一下用TensorFlow实现深度学习模型的基础知识,主要用的这本书:《Hands On Machine Learning with Scikit-Learn and TensorFlow》。这本书真是神书,可以说是学习TensorFlow最好的资料了,一方面这本书把各种深度学习基础都用代码实现了,从DNN到CNN、RNN、自编码器,从参数初始化、选择优化器、Batch Norm、调整学习率、网络预训练等加速技巧到dropout、早停等正则化技巧,代码丰富,讲解详尽;另一方面作者一直在Github上更新代码,根据TensorFlow语法的变化更改代码,我看到前几天还在更新,感动哭。建议电子书和Github一起看。

好,接下来首先整理如何用TensorFlow构建DNN网络,实现参数初始化、选择优化算法、Batch Norm、梯度截断和学习率衰减这些加速技巧,以及实现dropout、L1范数和L2范数等正则化技巧。

这一篇博客整理如何用TensorFlow构建一个DNN网络,后面再写博客整理如何使用加速训练技巧和正则化技巧来优化模型,这样做也是为了方便对比。

一、全连接神经网络的“裸机版”

我们先用TensorFlow搭建一个没有使用任何加速优化技巧的全连接神经网络, 可以看作是低配的“裸机版”全连接神经网络。数据集是经典的MINIST数据集,训练一个DNN用于分类,有两个隐藏层(一个有300个神经元,另一个有100个神经元)和一个带有10个神经元的softmax输出层,用小批量梯度下降算法(Mini-Batch Gradient Descent)来进行训练。我们一步步来构建,最后再给出一份完整的代码。

第一步:指定输入的维度(每个样本的特征维度),输出的维度(类别数),并设置每个隐藏层神经元的数量

MINIST数据集就是手写数字识别数据集,里面的数字图片是黑白的,所以通道数是1,那么特征维度就是长和高两维,为了方便输入到网络中,把2维矩阵拉平成1维的向量,就是28 * 28。而标签是0-9这10个数字,所以输出是10维。将第2个隐藏层的神经元设为100个,小于第1个隐藏层的神经元数,因为神经网络中越深的隐藏层,提取到的特征越高级,维度越小。

import tensorflow as tf
import numpy as np

n_inputs = 28*28  
n_hidden1 = 300
n_hidden2 = 100
n_outputs = 10

第二步:切分数据集,打乱顺序,并生成小批量样本

首先获取数据,推荐用tf.keras.datasets来获取这个内置的数据集,速度快。训练集和测试集都可以直接获取,再把训练集切分为用于训练的样本和用于验证的样本(5000个)。

然后用np.random.permutation这个函数打乱训练样本的索引列表,再用np.array_split这个函数把索引列表进行切分,用来获取小批量样本。

(X_train, y_train), (X_test, y_test) = tf.keras.datasets.mnist.load_data()

# 一开始是三维数组,(60000, 28, 28)
print(X_train.shape,y_train.shape)

# 把数据重新组合为二维数组,(60000, 784)
X_train = X_train.astype(np.float32).reshape(-1, 28*28) / 255.0
X_test = X_test.astype(np.float32).reshape(-1, 28*28) / 255.0
y_train = y_train.astype(np.int32)
y_test = y_test.astype(np.int32)
print(X_train.shape)
X_valid, X_train = X_train[:5000],X_train[5000:]
y_valid, y_train = y_train[:5000],y_train[5000:]

# 打乱数据,并生成batch
def shuffle_batch(X, y, batch_size):
    # permutation不直接在原来的数组上进行操作,而是返回一个新的打乱顺序的数组,并不改变原来的数组。
    rnd_idx = np.random.permutation(len(X))
    n_batches = len(X) // batch_size
    # 把rnd_idx这个一位数组进行切分
    for batch_idx in np.array_split(rnd_idx, n_batches):
        X_batch, y_batch = X[batch_idx], y[batch_idx]
        yield X_batch, y_batch

第三步:定义占位符节点

为了使用小批量梯度下降算法,我们需要使用占位符节点,然后在每次迭代时用下一个小批量样本替换X和y。这些占位符节点在训练阶段才将小批量样本传入给TensorFlow,目前是构建图阶段,不执行运算。

注意到X和y的形状中有None,也就是只定义了一部分。为什么呢?

X是一个2D张量,第一个维度是样本数,第二个维度是特征。我们知道特征的数量是28*28,但是还不知道batch size是多少,所以第一个维度指定为None,也就是任意大小。

y是一个1D张量,维度是样本数,同样指定为None。

X = tf.placeholder(tf.float32, shape=(None, n_inputs), name="X")
y = tf.placeholder(tf.int32, shape=(None), name="y")

第四步:搭建网络层

注意最后一层并没有定义softmax函数来求出概率分布,而是得到通过softmax函数激活之前的输入值logtis。接下来会说明为什么这里不定义一个softmax激活函数。

当然输出的概率分布也可以计算出来,用tf.nn.softmax,但是这个值我们并不会用于下面的计算。

with tf.name_scope("dnn"):
    hidden1 = tf.layers.dense(X, n_hidden1, name="hidden1",
                              activation=tf.nn.relu)
    hidden2 = tf.layers.dense(hidden1, n_hidden2, name="hidden2",
                              activation=tf.nn.relu)
    logits = tf.layers.dense(hidden2, n_outputs, name="outputs")
    y_proba = tf.nn.softmax(logits)

第五步:定义损失函数和优化器

损失函数用交叉熵损失函数。tf.nn.sparse_softmax_cross_entropy_with_logits这个老长老长的函数,是用softmax激活之前的输入值直接计算交叉熵损失,为什么这么干呢?一方面是为了正确处理像log等于0的极端情况,这是为啥不先用softmax激活,另一方面是与样本标签的格式相匹配,我们说了样本的标签是1D张量,也就是1或者7这种整数,这是这个函数需要的格式,而不是[0, 1, 0, ..., 0]这种独热编码格式。独热编码格式的标签需要使用tf.nn.softmax_cross_entropy_with_logits这个函数。

然后定义一个GD优化器。

# 定义损失函数和计算损失
with tf.name_scope("loss"): xentropy = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y, logits=logits) loss = tf.reduce_mean(xentropy, name="loss") # 定义优化器 learning_rate = 0.01 with tf.name_scope("train"): optimizer = tf.train.GradientDescentOptimizer(learning_rate) training_op = optimizer.minimize(loss)

第六步:评估模型

使用准确性作为我们的模型评估指标。首先,对于每个样本,通过检查logits最大值的索引是否与标签y相等,来确定神经网络的预测是否正确。为此,可以使用in_top_k()函数,这会返回一个充满布尔值的1D张量。

然后我们需要用tf.cast这个函数将这些布尔值转换为浮点值,然后计算平均值。

# 评估模型,使用准确性作为我们的绩效指标
with tf.name_scope("eval"):
    # logists最大值的索引在0-9之间,恰好就是被预测所属于的类,因此和y进行对比,相等就是True,否则为False
    correct = tf.nn.in_top_k(logits, y, 1)
    accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))

第七步:初始化所有变量,并保存模型

init = tf.global_variables_initializer()
saver = tf.train.Saver()

第八步:定义训练轮次和batch size,训练模型和保存模型,记录训练时间

训练40轮(epoch),把所有的样本都输入进去训练一次,叫做一轮。然后一个小批量是输入200个样本。

在训练阶段可以把小批量样本传入模型中开始训练了。

定义了一个记录训练时间的函数,因为训练时间的长短也是调参的关注点之一。

# 定义好训练轮次和batch-size
n_epochs = 40
batch_size = 200

# 获取计算所花费的时间
import time
from datetime import timedelta
def get_time_dif(start_time):
    end_time = time.time()
    time_dif = end_time - start_time
    
    #timedelta是用于对间隔进行规范化输出,间隔10秒的输出为:00:00:10    
    return timedelta(seconds=int(round(time_dif)))

with tf.Session() as sess:
    init.run()
    start_time = time.time()
    
    for epoch in range(n_epochs):
        for X_batch, y_batch in shuffle_batch(X_train, y_train, batch_size):
            sess.run(training_op, feed_dict={X: X_batch, y: y_batch})
        if epoch % 5 == 0:
            acc_batch = accuracy.eval(feed_dict={X: X_batch, y: y_batch})
            acc_val = accuracy.eval(feed_dict={X: X_valid, y: y_valid})
            print(epoch, "Batch accuracy:", acc_batch, "Val accuracy:", acc_val)
            
    time_dif = get_time_dif(start_time)
    print("\nTime usage:", time_dif)
    save_path = saver.save(sess, "./my_model_final.ckpt")

得到的输出结果如下。用时22秒,感觉效果贼一般啊,最后一轮的验证精度为96.3%。后面通过调整batch size,看是否能得到更好的结果。

0 Batch accuracy: 0.835 Val accuracy: 0.8132
5 Batch accuracy: 0.895 Val accuracy: 0.9178
10 Batch accuracy: 0.94 Val accuracy: 0.934
15 Batch accuracy: 0.96 Val accuracy: 0.9414
20 Batch accuracy: 0.955 Val accuracy: 0.948
25 Batch accuracy: 0.94 Val accuracy: 0.9534
30 Batch accuracy: 0.96 Val accuracy: 0.9576
35 Batch accuracy: 0.955 Val accuracy: 0.9608
39 Batch accuracy: 0.96 Val accuracy: 0.963

Time usage: 0:00:22

第九步:调用训练好的模型进行预测

把保存好的模型恢复,然后对测试集中的20个样本进行预测,发现全部预测正确。

再评估模型在全部测试集上的正确率,得到正确率为95.76%。

效果确实贼一般。

with tf.Session() as sess:
    saver.restore(sess, "./my_model_final.ckpt") # or better, use save_path
    X_test_20 = X_test[:20]
    # 得到softmax之前的输出
    Z = logits.eval(feed_dict={X: X_test_20})
    # 得到每一行最大值的索引
    y_pred = np.argmax(Z, axis=1)
    print("Predicted classes:", y_pred)
    print("Actual calsses:   ", y_test[:20])
    # 评估在测试集上的正确率
    acc_test = accuracy.eval(feed_dict={X: X_test, y: y_test})
    print("\nTest_accuracy:", acc_test)
INFO:tensorflow:Restoring parameters from ./my_model_final.ckpt
Predicted classes: [7 2 1 0 4 1 4 9 6 9 0 6 9 0 1 5 9 7 3 4]
Actual calsses:    [7 2 1 0 4 1 4 9 5 9 0 6 9 0 1 5 9 7 3 4]

Test_accuracy: 0.9576

二、对全连接神经网络进行微调

这里的微调是调整batch size,学习率和迭代轮次。調参还是有点花时间,我就以调整batch size为例,来看看是否可以得到更好的结果。

分别把batch size设置为200,100,50,10,训练好模型后,计算在测试集上的正确率,分别为:

batch_size = 200    Test_accuracy: 0.9576

batch_size = 100    Test_accuracy: 0.9599

batch_size = 50     Test_accuracy: 0.9781

batch_size = 10     Test_accuracy: 0.9812

发现规律没有,在这个数据集上,batch size越小,则测试精度越高。batch size为10的时候,测试精度达到了98.12%,还是很不错的。

然后我想,如果用SGD,每次只输入一个样本又会怎么样呢?测试精度还能不能提高我不知道,可是能确定的一点是,我点击运行之后,我可以去听几首邓紫棋的歌再回来了。然后我就去跑步了。

我跑步和听歌回来了,用SGD训练模型耗时26分30秒,测试精度为98.51%,取得了到目前为止最好的成绩。

三、完整代码整理

import tensorflow as tf
import numpy as np
import time
from datetime import timedelta

# 记录训练花费的时间
def get_time_dif(start_time):
    end_time = time.time()
    time_dif = end_time - start_time
    #timedelta是用于对间隔进行规范化输出,间隔10秒的输出为:00:00:10    
    return timedelta(seconds=int(round(time_dif)))


n_inputs = 28*28  
n_hidden1 = 300
n_hidden2 = 100
n_outputs = 10

(X_train, y_train), (X_test, y_test) = tf.keras.datasets.mnist.load_data()

# 一开始是三维数组,(60000, 28, 28)
print(X_train.shape,y_train.shape)

# 把数据重新组合为二维数组,(60000, 784)
X_train = X_train.astype(np.float32).reshape(-1, 28*28) / 255.0
X_test = X_test.astype(np.float32).reshape(-1, 28*28) / 255.0
y_train = y_train.astype(np.int32)
y_test = y_test.astype(np.int32)
print(X_train.shape)
X_valid, X_train = X_train[:5000],X_train[5000:]
y_valid, y_train = y_train[:5000],y_train[5000:]

# 打乱数据,并生成batch
def shuffle_batch(X, y, batch_size):
    # permutation不直接在原来的数组上进行操作,而是返回一个新的打乱顺序的数组,并不改变原来的数组。
    rnd_idx = np.random.permutation(len(X))
    n_batches = len(X) // batch_size
    # 把rnd_idx这个一位数组进行切分
    for batch_idx in np.array_split(rnd_idx, n_batches):
        X_batch, y_batch = X[batch_idx], y[batch_idx]
        yield X_batch, y_batch
        
X = tf.placeholder(tf.float32, shape=(None, n_inputs), name="X")
y = tf.placeholder(tf.int32, shape=(None), name="y")

with tf.name_scope("dnn"):
    hidden1 = tf.layers.dense(X, n_hidden1, name="hidden1",
                              activation=tf.nn.relu)
    hidden2 = tf.layers.dense(hidden1, n_hidden2, name="hidden2",
                              activation=tf.nn.relu)
    logits = tf.layers.dense(hidden2, n_outputs, name="outputs")
    y_proba = tf.nn.softmax(logits)
    
# 定义损失函数和计算损失
with tf.name_scope("loss"):

    xentropy = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=y,
                                                              logits=logits)
    loss = tf.reduce_mean(xentropy, name="loss")
    
# 定义优化器
learning_rate = 0.01
with tf.name_scope("train"):
    optimizer = tf.train.GradientDescentOptimizer(learning_rate)
    training_op = optimizer.minimize(loss)
    
# 评估模型,使用准确性作为我们的绩效指标
with tf.name_scope("eval"):
    # logists最大值的索引在0-9之间,恰好就是被预测所属于的类,因此和y进行对比,相等就是True,否则为False
    correct = tf.nn.in_top_k(logits, y, 1)
    accuracy = tf.reduce_mean(tf.cast(correct, tf.float32))
    
init = tf.global_variables_initializer()
saver = tf.train.Saver()

# 定义好训练轮次和batch-size
n_epochs = 40
batch_size = 200

with tf.Session() as sess:
    init.run()
    start_time = time.time()
    
    for epoch in range(n_epochs):
        for X_batch, y_batch in shuffle_batch(X_train, y_train, batch_size):
            sess.run(training_op, feed_dict={X: X_batch, y: y_batch})
        if epoch % 5 == 0 or epoch == 39:
            acc_batch = accuracy.eval(feed_dict={X: X_batch, y: y_batch})
            acc_val = accuracy.eval(feed_dict={X: X_valid, y: y_valid})
            print(epoch, "Batch accuracy:", acc_batch, "Val accuracy:", acc_val)
            
    time_dif = get_time_dif(start_time)
    print("\nTime usage:", time_dif)
    save_path = saver.save(sess, "./my_model_final.ckpt")
with tf.Session() as sess:
    saver.restore(sess, "./my_model_final.ckpt") # or better, use save_path
    X_test_20 = X_test[:20]
    # 得到softmax之前的输出
    Z = logits.eval(feed_dict={X: X_test_20})
    # 得到每一行最大值的索引
    y_pred = np.argmax(Z, axis=1)
    print("Predicted classes:", y_pred)
    print("Actual calsses:   ", y_test[:20])
    # 评估在测试集上的正确率
    acc_test = accuracy.eval(feed_dict={X: X_test, y: y_test})
    print("\nTest_accuracy:", acc_test)

参考资料:

《Hands On Machine Learning with Scikit-Learn and TensorFlow》