Building an RNN with TensorFlow

A simple RNN, implemented with TensorFlow, that learns addition on binary-encoded integers.

TensorFlow version: 1.1
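
The trick is in the encoding: each addend is written as a fixed-length binary string and then reversed, so the least-significant bit comes first. At each time step the LSTM sees one bit of each addend and must emit the corresponding bit of the sum, which lets the hidden state play the role of the carry bit. A minimal sketch of that encoding (the helper name to_bits is illustrative; the real implementation is DataProcess.process below):

def to_bits(n, width):
    # binary digits of n, least-significant bit first, zero-padded to `width`
    bits = [int(b) for b in bin(n)[2:]][::-1]
    return bits + [0] * (width - len(bits))

print(to_bits(3, 5))  # [1, 1, 0, 0, 0]
print(to_bits(5, 5))  # [1, 0, 1, 0, 0]
print(to_bits(8, 5))  # [0, 0, 0, 1, 0]  <- 3 + 5; bit t of the sum depends only on bits <= t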

import tensorflow as tf
from tensorflow.contrib import rnn

class RNN():
    def __init__(self, input_dim, hidden_dim, step_num, class_num, learning_rate):
        # tf Graph input
        self.x = tf.placeholder(tf.float32, [None, step_num, input_dim])
        self.y = tf.placeholder(tf.float32, [None, class_num])
        # Define weights: a hidden-to-hidden projection on the last LSTM output,
        # then a layer that maps the sigmoid activations to class_num output bits
        weights = {
            'out': tf.Variable(tf.random_normal([hidden_dim, hidden_dim])),
            'sigout': tf.Variable(tf.random_normal([hidden_dim, class_num]))
        }
        biases = {
            'out': tf.Variable(tf.random_normal([hidden_dim])),
            'sigout': tf.Variable(tf.random_normal([class_num]))
        }
        # Unstack to get a list of 'step_num' tensors of shape (batch_size, input_dim)
        x_unstack = tf.unstack(self.x, step_num, 1)
        # Define an LSTM cell with TensorFlow
        lstm_cell = rnn.BasicLSTMCell(hidden_dim, forget_bias=1.0)

        # Get lstm cell output
        outputs, states = rnn.static_rnn(lstm_cell, x_unstack, dtype=tf.float32)

        # Linear activation, using the rnn inner loop's last output
        pred = tf.matmul(outputs[-1], weights['out']) + biases['out']
        # Squash through a sigmoid, then project to one value per output bit
        sigmoid_out = tf.matmul(tf.nn.sigmoid(pred), weights['sigout']) + biases['sigout']
        # Round to 0/1 to obtain the predicted bits
        self.predict = tf.round(sigmoid_out)
        correct_pred = tf.equal(self.predict, self.y)
        self.accuracy = tf.reduce_mean(tf.cast(correct_pred, tf.float32))
        # L1 cost: sum of absolute differences between outputs and target bits
        cost = tf.reduce_sum(tf.abs(tf.subtract(sigmoid_out, self.y)))
        self.loss = cost
        tf.summary.scalar('acc', self.accuracy)
        tf.summary.scalar('cost', cost)
        self.merge_all = tf.summary.merge_all()
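
A quick shape check for the class above, using the defaults defined later in the training script (max_integer = 10000 gives max_len = 15, so hidden_dim defaults to 30; the values here are purely illustrative):

net = RNN(input_dim=2, hidden_dim=30, step_num=15, class_num=15, learning_rate=0.01)
# net.x: (batch, 15, 2) - 15 time steps, one bit of each addend per step
# net.y: (batch, 15)    - the 15 bits of the sum, least-significant bit first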

import random
import numpy as np
class DataProcess():
    def __init__(self):
        self.point = 0
        self.max_integer = 10000
        # bits needed to write max_integer, plus one extra bit for the carry of a sum
        self.max_len = len(self.filter(bin(self.max_integer))) + 1
    def nextBatch(self, batch_size, fake=False, is_test=False):
        x_batch = []
        y_batch = []
        t_batch = []
        for i in range(0, batch_size):
            a = random.randint(0, self.max_integer)
            b = random.randint(0, self.max_integer)
            if fake:
                a = 1
                b = 1
            c = a + b
            abin = self.process(a)
            bbin = self.process(b)
            cbin = self.process(c)
            xa = np.array(abin)
            xb = np.array(bbin)
            x_ = np.concatenate((xa, xb), axis=0)
            x_batch.append(x_.reshape(2, self.max_len))
            y_batch.append(cbin)
            if is_test:
                # keep the raw operands so the test output can be printed
                t_batch.append([a, b])
        return np.array(x_batch), np.array(y_batch), np.array(t_batch)

    def process(self, num):
        bstr = bin(num)
        bstr = self.filter(bstr)
        bstr = self.completion(bstr)
        # reverse so the least-significant bit comes first
        return bstr[::-1]

    def filter(self, bstr):
        # strip the '0b' prefix that bin() prepends
        return bstr.replace('0b', '')

    def completion(self, bstr):
        # pad with leading zeros to max_len and convert to a list of floats
        lst = []
        for num in list(bstr):
            lst.append(int(num))
        length = len(bstr)
        for i in range(length, self.max_len):
            lst.insert(0, 0)
        return [float(i) for i in lst]
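
A quick sanity check of what nextBatch returns (with the default max_integer = 10000, max_len is 15):

dp = DataProcess()
x, y, _ = dp.nextBatch(4)
print(x.shape)  # (4, 2, 15): per sample, the 15 bits of a and the 15 bits of b
print(y.shape)  # (4, 15):    the 15 bits of the sum
# the training script transposes x to (4, 15, 2) so that each time step
# feeds one bit of a and one bit of b into the LSTM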
import tensorflow as tf
import time
import os
from src.bitplus.Process import DataProcess
from src.bitplus.rnn import RNN

dp = DataProcess()
tf.app.flags.DEFINE_float("learning_rate", 0.01, 'learning rate')
tf.app.flags.DEFINE_integer("training_iters", 500000, 'total number of training examples to consume')
tf.app.flags.DEFINE_integer("batch_size", 100, 'minibatch size')
tf.app.flags.DEFINE_integer("display_step", 10, 'log every N steps')
tf.app.flags.DEFINE_integer("input_dim", 2, 'inputs per time step: one bit from each addend')
tf.app.flags.DEFINE_integer("steps_num", dp.max_len, 'sequence length in bits')
tf.app.flags.DEFINE_integer("hidden_dim", dp.max_len * 2, 'LSTM hidden size')
tf.app.flags.DEFINE_integer("class_num", dp.max_len, 'number of output bits')
tf.app.flags.DEFINE_string("rnn_model", 'rnn.model', 'checkpoint file name')
FLAGS = tf.app.flags.FLAGS

logfolder = time.strftime("%Y%m%d_%H%M%S", time.localtime())


def train():
    rnn = RNN(FLAGS.input_dim, FLAGS.hidden_dim, FLAGS.steps_num, FLAGS.class_num, FLAGS.learning_rate)
    optimizer = tf.train.AdamOptimizer(learning_rate=FLAGS.learning_rate).minimize(rnn.loss)
    init = tf.global_variables_initializer()
    sess = tf.Session()
    sess.run(init)
    train_writer = tf.summary.FileWriter('../LOGS/' + logfolder, sess.graph)
    step = 1
    while step * FLAGS.batch_size < FLAGS.training_iters:
        batch_x, batch_y, _ = dp.nextBatch(FLAGS.batch_size, False)
        # (batch, 2, max_len) -> (batch, max_len, 2): one bit of each addend per step
        batch_x = batch_x.transpose((0, 2, 1))
        # Run optimization op (backprop)
        sess.run(optimizer, feed_dict={rnn.x: batch_x, rnn.y: batch_y})
        if step % FLAGS.display_step == 0:
            # Calculate batch loss and accuracy
            merge_all, loss, acc, predict = sess.run(
                [rnn.merge_all, rnn.loss, rnn.accuracy, rnn.predict],
                feed_dict={rnn.x: batch_x, rnn.y: batch_y})
            print("Iter " + str(step * FLAGS.batch_size) + ", Minibatch Loss= " +
                  "{:.6f}".format(loss) + ", Training Accuracy= " +
                  "{:.5f}".format(acc))
            train_writer.add_summary(merge_all, step)
        step += 1

    print("Optimization Finished!")
    train_writer.close()
    saver = tf.train.Saver(tf.global_variables())
    if not os.path.exists('model'):
        os.mkdir('model')
    saver.save(sess, './model/' + FLAGS.rnn_model)
def tModel(model):
    rnn = RNN(FLAGS.input_dim, FLAGS.hidden_dim, FLAGS.steps_num, FLAGS.class_num, FLAGS.learning_rate)
    saver = tf.train.Saver()
    sess = tf.Session()
    # restoring the checkpoint initializes all variables; no separate init is needed
    saver.restore(sess, model)
    batch_x, batch_y, batch_test = dp.nextBatch(100, False, True)
    batch_x = batch_x.transpose((0, 2, 1))
    predict = sess.run([rnn.predict], feed_dict={rnn.x: batch_x, rnn.y: batch_y})

    for ndx in range(len(predict[0])):
        print('%s + %s = %d(%d)' % (batch_test[ndx][0], batch_test[ndx][1],
                                    batch_test[ndx][0] + batch_test[ndx][1],
                                    bin2Ten(predict[0][ndx])))
    print('pause!')
def bin2Ten(bits):
    # the bits are stored least-significant first, so reverse before parsing
    lst = []
    for i in bits[::-1]:
        lst.append(str(int(i)))
    return int(''.join(lst), base=2)
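
For example, the rounded prediction for 8 arrives least-significant bit first and is decoded like this:

print(bin2Ten([0.0, 0.0, 0.0, 1.0]))  # reversed -> '1000' -> 8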
def main(_):
    print('start...')
    print('max len:', dp.max_len)
    # train()  # run this first to produce ./model/rnn.model
    tModel('./model/' + FLAGS.rnn_model)

if __name__ == '__main__':
    tf.app.run()
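
Note on usage: as written, main() only restores an existing checkpoint and prints sample predictions. To train from scratch, uncomment train() so the checkpoint is written under ./model before tModel tries to load it.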