Building a Deep Neural Network (DNN) in Python, Continued

This article builds on the previous post, Building a Deep Neural Network (DNN) in Python, and adds the following:

1) An L2 regularization term (see the formulas after this list)

2) Monitoring the intermediate cost during training

3) A cross-entropy cost function

4) Saving the trained network and loading it back to test new data
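
For reference, these are the formulas the code below implements, in the notation of the code (n training samples, mini-batch size m, learning rate eta, regularization strength lambda): the regularized cross-entropy cost

C = -\frac{1}{n}\sum_{x}\sum_{j}\left[\,y_j \ln a_j + (1-y_j)\ln(1-a_j)\,\right] + \frac{\lambda}{2n}\sum_{w} w^2

the output-layer error for a sigmoid output, where the \sigma'(z) factor cancels (this is exactly what CrossEntropyCost.delta returns)

\delta^{L} = a^{L} - y

and the mini-batch update rule with weight decay

w \rightarrow \Big(1-\frac{\eta\lambda}{n}\Big)\,w - \frac{\eta}{m}\sum_{x}\frac{\partial C_x}{\partial w}, \qquad b \rightarrow b - \frac{\eta}{m}\sum_{x}\frac{\partial C_x}{\partial b}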

1 Data preprocessing

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2017-03-12 15:11
# @Author  : CC
# @File    : net_load_data.py

from numpy import *
import numpy as np
import cPickle
def load_data():
    """载入解压后的数据,并读取"""
    with open('data/mnist_pkl/mnist.pkl','rb') as f:
        try:
            train_data,validation_data,test_data = cPickle.load(f)
            print " the file open sucessfully"
            # print train_data[0].shape  #(50000,784)
            # print train_data[1].shape   #(50000,)
            return (train_data,validation_data,test_data)
        except EOFError:
            print 'failed to read the file'
            return None

def data_transform():
    """将数据转化为计算格式"""
    t_d,va_d,te_d = load_data()
    # print t_d[0].shape  # (50000,784)
    # print te_d[0].shape  # (10000,784)
    # print va_d[0].shape  # (10000,784)
    # n1 = [np.reshape(x,784,1) for x in t_d[0]]  # earlier attempt, kept for the shape comparison below
    n = [np.reshape(x, (784, 1)) for x in t_d[0]]  # reshape each of the 50,000 training samples to (784, 1)
    # print 'n1',n1[0].shape
    # print 'n',n[0].shape
    m = [vectors(y) for y in t_d[1]]  # turn each of the 50,000 labels into a (10, 1) one-hot vector
    train_data = zip(n,m)  # zip samples and labels into (x, y) tuples
    n = [np.reshape(x, (784, 1)) for x in va_d[0]]  # reshape each of the 10,000 validation samples to (784, 1)
    validation_data = zip(n,va_d[1])   # validation labels are left as integers (not vectorized)
    n = [np.reshape(x, (784, 1)) for x in te_d[0]]  # reshape each of the 10,000 test samples to (784, 1)
    test_data = zip(n, te_d[1])  # test labels are left as integers (not vectorized)
    # print train_data[0][0].shape  #(784,)
    # print "len(train_data[0])",len(train_data[0]) #2
    # print "len(train_data[100])",len(train_data[100]) #2
    # print "len(train_data[0][0])", len(train_data[0][0]) #784
    # print "train_data[0][0].shape", train_data[0][0].shape #(784,1)
    # print "len(train_data)", len(train_data)  #50000
    # print train_data[0][1].shape  #(10,1)
    # print test_data[0][1] # 7
    return (train_data,validation_data,test_data)
def vectors(y):
    "赋予标签"
    label = np.zeros((10,1))
    label[y] = 1.0 #浮点计算
    return label
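
A quick way to check that the shapes come out as the comments claim. This is a minimal, hypothetical snippet (not one of the article's files) and assumes data/mnist_pkl/mnist.pkl is in place:

# check_shapes.py -- hypothetical helper for inspecting the transformed data
import net_load_data

train_data, validation_data, test_data = net_load_data.data_transform()
print len(train_data), len(validation_data), len(test_data)    # 50000 10000 10000
print train_data[0][0].shape, train_data[0][1].shape           # (784, 1) (10, 1): image and one-hot label
print test_data[0][0].shape, test_data[0][1]                   # (784, 1) and an integer label, e.g. 7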

2 Network definition and training

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2017-03-28 10:18
# @Author  : CC
# @File    : net_network2.py

from numpy import *
import numpy as np
import operator
import json
import sys   # used by load_net to look up the cost class by name

class QuadraticCost():
    """定义二次代价函数类的方法"""
    @staticmethod
    def fn(a,y):
        cost = 0.5*np.linalg.norm(a-y)**2
        return cost
    @staticmethod
    def delta(z,a,y):
        delta = (a-y)*sig_derivate(z)
        return delta

class CrossEntropyCost():
    """Cross-entropy cost"""
    @staticmethod
    def fn(a, y):
        cost = np.sum(np.nan_to_num(-y*np.log(a)-(1-y)*np.log(1-a)))  # nan_to_num maps nan -> 0 and inf -> a large finite number
        return cost
    @staticmethod
    def delta(z, a, y):
        delta = (a - y)  # for a sigmoid output layer the sigma'(z) factor cancels, so z is not needed
        return delta

class Network(object):
    """定义网络结构和方法"""
    def __init__(self,sizes,cost):
        self.num_layer = len(sizes)
        self.sizes = sizes
        self.cost = cost
        # print "self.cost.__name__:",self.cost.__name__  # CrossEntropyCost
        self.default_weight_initializer()
    def default_weight_initializer(self):
        """Default initializer: Gaussian weights scaled by 1/sqrt(fan-in), Gaussian biases"""
        self.bias = [np.random.randn(x, 1) for x in self.sizes[1:]]
        self.weight = [np.random.randn(y, x)/np.sqrt(x) for (x, y) in zip(self.sizes[:-1], self.sizes[1:])]

    def large_weight_initializer(self):
        """Alternative initializer: unscaled Gaussian weights (large weights)"""
        self.bias = [np.random.randn(x, 1) for x in self.sizes[1:]]
        self.weight = [np.random.randn(y, x) for x, y in zip(self.sizes[:-1], self.sizes[1:])]
    def forward(self,a):
        """forward the network"""
        for w,b in zip(self.weight,self.bias):
            a=sigmoid(np.dot(w,a)+b)
        return a

    def SGD(self,train_data,min_batch_size,epochs,eta,test_data=False,
            lambd = 0,
            monitor_train_cost = False,
            monitor_train_accuracy = False,
            monitor_test_cost=False,
            monitor_test_accuracy=False
            ):
        """1)Set the train_data,shuffle;
           2) loop the epoches,
           3) set the min_batches,and rule of update"""
        if test_data: n_test=len(test_data)
        n = len(train_data)
        for i in xrange(epochs):
            random.shuffle(train_data)
            min_batches = [train_data[k:k+min_batch_size] for k in xrange(0,n,min_batch_size)]

            for min_batch in min_batches:    # 每次提取一个批次的样本
                self.update_minbatch_parameter(min_batch,eta,lambd,n)
            train_cost = []
            if monitor_train_cost:
                cost1 = self.total_cost(train_data,lambd,cont=False)
                train_cost.append(cost1)
                print "epoche {0},train_cost: {1}".format(i,cost1)
            if monitor_train_accuracy:
                accuracy = self.accuracy(train_data,cont=True)
                train_cost.append(accuracy)
                print "epoche {0}/{1},train_accuracy: {2}".format(i,epochs,accuracy)
            test_cost = []
            if monitor_test_cost:
                cost1 = self.total_cost(test_data,lambd)
                test_cost.append(cost1)
                print "epoche {0},test_cost: {1}".format(i,cost1)
            test_accuracy = []
            if monitor_test_accuracy:
                accuracy = self.accuracy(test_data)
                test_cost.append(accuracy)
                print "epoche:{0}/{1},test_accuracy:{2}".format(i,epochs,accuracy)
        self.save(filename= "net_save") #保存网络网络参数

    def total_cost(self,train_data,lambd,cont=True):
        """Average cost over the data set, including the L2 regularization term"""
        cost1 = 0.0
        for x,y in train_data:
            a = self.forward(x)
            if cont: y = vectors(y)  # vectorize the integer test labels
            cost1 += (self.cost).fn(a,y)/len(train_data)
        cost1 += 0.5*lambd/len(train_data)*sum(np.linalg.norm(weight)**2 for weight in self.weight)  # add lambda/(2n) * sum ||w||^2
        return cost1
    def accuracy(self,train_data,cont=False):
        if cont:
            output1 = [(np.argmax(self.forward(x)),np.argmax(y)) for (x,y) in train_data]
        else:
            output1 = [(np.argmax(self.forward(x)), y) for (x, y) in train_data]
        return sum(int(out1 == y) for (out1, y) in output1)
    def update_minbatch_parameter(self,min_batch, eta,lambd,n):
        """1) accumulate the gradients over the mini-batch
           2) apply the L2 weight decay
           3) update the weights and biases"""
        able_b = [np.zeros(b.shape) for b in self.bias]
        able_w = [np.zeros(w.shape) for w in self.weight]
        for x,y in min_batch:  # one sample at a time
            deltab,deltaw = self.backprop(x,y)
            able_b = [a_b+dab for a_b, dab in zip(able_b,deltab)]  # accumulate db, dw over the batch; the update below divides by the batch size
            able_w = [a_w + daw for a_w, daw in zip(able_w, deltaw)]
        self.weight = [weight - eta * (dw) / len(min_batch) - eta*(lambd*weight)/n  for weight, dw in zip(self.weight,able_w)]
        # L2 weight-decay term: eta*lambda/n * weight
        self.bias = [bias - eta * db / len(min_batch) for bias, db in zip(self.bias, able_b)]

    def backprop(self,x,y):
        """" 1) clacu the forward value
            2) calcu the delta: delta =(y-f(z));  deltak = delta*w(k)*fz(k-1)'
           3) clacu the delta in every layer: deltab=delta; deltaw=delta*fz(k-1)"""
        deltab = [np.zeros(b.shape) for b in self.bias]
        deltaw = [np.zeros(w.shape) for w in self.weight]
        zs = []
        activate = x
        activates = [x]
        for w,b in zip(self.weight,self.bias):
            z =np.dot(w, activate) +b
            zs.append(z)
            activate = sigmoid(z)
            activates.append(activate)
        # backprop
        delta = self.cost.delta(zs[-1],activates[-1],y)   # the cost class's delta method gives the output-layer error
        deltab[-1] = delta
        deltaw[-1] = np.dot(delta ,activates[-2].transpose())
        for i in xrange(2,self.num_layer):
            z = zs[-i]
            delta = np.dot(self.weight[-i+1].transpose(),delta)* sig_derivate(z)
            deltab[-i] = delta
            deltaw[-i] = np.dot(delta,activates[-i-1].transpose())
        return (deltab,deltaw)

    def save(self,filename):
        """将训练好的网络采用json(java script object notation)将对象保存成字符串保存,用于生产部署
        encoder=json.dumps(data)
        python 原始类型(没有数组类型)向 json 类型的转化对照表:
         python              json
         dict               object
        list/tuple         arrary
        int/long/float     number
        .tolist() 将数组转化为列表
        >>> a = np.array([[1, 2], [3, 4]])
        >>> list(a)
        [array([1, 2]), array([3, 4])]
        >>> a.tolist()
        [[1, 2], [3, 4]]
        """
        data = {"sizes": self.sizes,"weight": [weight.tolist() for weight in self.weight],
                "bias": ([bias.tolist() for bias in self.bias]),
                "cost": str(self.cost.__name__)}
        # save the trained weights, biases, and the name of the cost class
        f = open(filename, "w")
        json.dump(data,f)
        f.close()

def load_net(filename):
    """采用data=json.load(json.dumps(data))进行解码,
    decoder = json.load(encoder)
    编码后和解码后键不会按照原始data的键顺序排列,但每个键对应的值不会变
    载入训练好的网络用于测试"""
    f = open(filename,"r")
    data = json.load(f)
    f.close()
    # print "data[cost]", getattr(sys.modules[__name__], data["cost"])#获得属性__main__.CrossEntropyCost
    # print "data[cost]", data["cost"], data["sizes"]
    net = Network(data["sizes"], cost=data["cost"])   #网络初始化
    net.weight = [np.array(w) for w in data["weight"]]  #赋予训练好的权值,并将list--->array
    net.bias = [np.array(b) for b in data["bias"]]
    return net

def sig_derivate(z):
    """derivate sigmoid"""
    return sigmoid(z) * (1-sigmoid(z))

def sigmoid(x):
    sigm = 1.0/(1.0 + np.exp(-x))
    return sigm

def vectors(y):
    """赋予标签"""
    label = np.zeros((10,1))
    label[y] = 1.0 #浮点计算
    return label
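
The backprop method above implements the standard backpropagation recursion. Written out in the code's notation (z^l is the weighted input and a^l the activation of layer l, \odot the elementwise product):

\delta^{L} = \text{cost.delta}(z^{L}, a^{L}, y), \qquad \delta^{l} = \big(W^{l+1}\big)^{T}\delta^{l+1} \odot \sigma'(z^{l})

\frac{\partial C}{\partial b^{l}} = \delta^{l}, \qquad \frac{\partial C}{\partial W^{l}} = \delta^{l}\,\big(a^{l-1}\big)^{T}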

3 Network testing

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2017-03-12 15:24
# @Author  : CC
# @File    : net_test.py

import net_load_data
# net_load_data.load_data()
train_data,validation_data,test_data = net_load_data.data_transform()

import net_network2 as net
# cost = net.QuadraticCost     # alternative: quadratic cost
cost = net.CrossEntropyCost
lambd = 0
net1 = net.Network([784,50,10],cost)
min_batch_size = 30
eta = 3.0
epochs = 2
net1.SGD(train_data,min_batch_size,epochs,eta,test_data,
         lambd,
         monitor_train_cost=True,
         monitor_train_accuracy=True,
         monitor_test_cost=True,
         monitor_test_accuracy=True
         )
print "complete"

4 Loading the trained network for testing

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2017-03-28 17:27
# @Author  : CC
# @File    : forward_test.py

import numpy as np
# Load the trained network and evaluate it on the test samples
import net_load_data   # provides the test data
import net_network2 as net
train_data,validation_data,test_data = net_load_data.data_transform()
net1 = net.load_net(filename= "net_save")        # load the saved network (avoid shadowing the module name)
output = [(np.argmax(net1.forward(x)),y) for (x,y) in test_data]  # forward pass over every test sample
print sum(int(y1 == y2) for (y1,y2) in output)     # number of correctly classified samples
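
To look at a single prediction rather than the aggregate count, a minimal sketch (the variable names here are only for illustration):

x, y = test_data[0]                       # first test image and its integer label
prediction = np.argmax(net1.forward(x))   # index of the most active output neuron
print "predicted:", prediction, "true label:", y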

This concludes Building a Deep Neural Network (DNN) in Python, Continued.
