Tensorflow多线程输入数据处理框架

Tensorflow提供了一系列的对图像进行预处理的方法,但是复杂的预处理过程会减慢整个训练过程,所以,为了避免图像的预处理成为训练神经网络效率的瓶颈,Tensorflow提供了多线程处理输入数据的框架!

下面将一一阐述多线程处理输入数据的理论及Tensorflow的源代码!

1 队列和多线程

首先来介绍Tensorflow中定义队列的方式,在Tensorflow中,可以定义FIFOQueue和RandomShuffleQueue两种类型的队列,FIFO顾名思义是先入先出型队列,而RandomShuffleQueue就是随机读出的操作,RandomShufleQueue操作在训练过程中的作用很大,可以随机的从数据集中取出数据,提高鲁棒性

tensorflow中,队列和变量类似,都是计算图上有状态的节点,其他的状态节点可以修改他们的状态,对于一个变量来说,赋值操作可以修改变量的取值,而对于队列来说,修改队列的相应状态主要有三个方法:

1Enquene Enqueue的操作相当于进入队列的操作,也就是想要将数据插入到队列中时,调用此方法,假如已经定义好队列q,那么调用q.enqueue(([55]))就可以将数据写入队列

2 EnqueneMany q.EnqueneMany方法是一次性写入多个数据!举例:q.EnqueneMany(([34,55]))

3 Dequeue 此方法相当于从队列中调出数据的操作, x = q.dequeue()

队列的作用不仅仅是一种数据结构,它可以提供一种多线程的机制,提高数据的读取存储速度,比如多线程可以同时向一个队列中写元素,或者同时从一个队列中读取元素,多线程并行处理数据,提高数据预处理的速度就是Tensorflow提供的解决上述瓶颈问题的方案!

既然是多线程处理,那么必须需要某个交通指挥官来协调处理多线程的工作,没错,协调-coordinator,tensorflow提供了一个tf.Coordinator类和tf.QueueRunner类来完成多线程工作状态的管理和启动!

Coordinator提供了三个函数:1 should_stop 2 request_stop 3 join 来进行线程的管理操作,在每个线程启动之前,都需要先定义一个coordinator类然后当做参数传入到线程里面去,表明接管本线程的所有操作,线程在启动运行的过程中不断的查询should_stop的状态,一旦发现should_stop变成True,那么该线程就必须得退出,同样,每一个启动的线程都可以调用request_stop函数来通知其他线程退出,下面给出一段使用这两个类来操作线程的代码:

 1 import tensorflow as tf 
 2 import numpy as np 
 3 import threading 
 4 import time 
 5 
 6 
 7 
 8 def MyLooP(coord,worker_id):
 9 
10     while not coord.should_stop():
11 
12         if np.random.rand() < 0.1 
13         print("Stoping form id: %d\n" %worker_id)
14         coord.request_stop()
15 
16     else :
17         print("Working on id : %d\n" %worker_id)
18         time.sleep(1)
19 
20 
21 coord = tf.train.coordinator()
22 
23 threads = [ threading.Thread(target = MyLooP,args=(coord,i,)) for i in xrange(5)]
24 
25 for t in threads: t.start()
26 
27 coord.join(threads) #等待所有线程退出
28 
29 #运行以上程序,得到结果如下:
30 #working on id : 0
31 #working on id : 1
32 #working on id : 2
33 #working on id : 4
34 #working on id : 3
35 #working on id : 0
36 #stoping on id : 4
37 #working on id : 1

如上面程序描述的,首先定义了一个MyLoop函数,接受coord和workerid两个参数,当coord.should_stop != True的时候,产生一个随机数,如果随机数小于0.1,那么对应的这个进程就会去申请request_stop()函数,通知其他的所有进程结束!

否则的话就打印当前进程的workerid,接下来定义了协调器类的实例coordinator()和五个进程,进程id分别是0-4,之后启动所有的进程等待某个进程的request_stop(),最后调用一个coord.join()来等待所有进程合理的退出,从运行的结果中我们

我们可以看出,working on id :0,1,2,4,3五个进程分别启动完成,然后重新进入MyLoop中继续执行,然后working on id:0, 突然进程4在执行的过程中它随机生成数小于0.1,这导致进程4会去申请request.stop(),从而导致五个进程查询should_stop全部变成了True,于是5个进程便全部结束自身进程,可以看到最后一行仍然有working on id:1打印出来,实际上是因为在进程4申请Request_stop的过程中,进程1已经完成了判断进入到了下一条else中,所以它仍然可以打印出来,但是再一次循环之前查询should_stop的时候就执行结束进程的相关操作了!

未完待续.......