Multi-threaded data input with tensorpack's QueueInput
Published: 2023-12-23 07:30:11
tensorpack is a neural-network training interface built on top of TensorFlow, and its DataFlow library provides high-performance, multi-threaded data loading. QueueInput is the piece that connects the two: it is an InputSource class that wraps a DataFlow and feeds its datapoints into a TensorFlow queue from a background thread, so data preparation overlaps with training. This article describes QueueInput in detail and then walks through a usage example.
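In a nutshell, the usual pattern looks like this (a minimal sketch; the toy list DataFlow and the rest of the pipeline are explained in detail below):
from tensorpack import QueueInput
from tensorpack.dataflow import DataFromList

ds = DataFromList([[float(i)] for i in range(10)])   # any DataFlow works here
queue_input = QueueInput(ds)    # a background thread will keep a TF queue filled
# queue_input is then passed to a tensorpack trainer as its input source.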
In recent tensorpack versions QueueInput is a class (an InputSource), not a free function, and its constructor looks roughly like this:
def __init__(self, ds, queue=None):
    '''
    Args:
        ds (DataFlow): the DataFlow that produces the datapoints.
        queue (tf.QueueBase): an optional TensorFlow queue instance whose
            dtypes should match the input signature of the model.
            Defaults to a FIFOQueue of size 50.
    '''
QueueInput is designed for the typical multi-threaded input pipeline, and it smooths that pipeline roughly as follows:
1. At graph-setup time it creates a tf.FIFOQueue (or the queue you pass in), one placeholder per input component, and an enqueue op.
2. When training starts, it launches an EnqueueThread in the background. The thread iterates the DataFlow and keeps pushing datapoints into the queue through the enqueue op, so the queue acts as a buffer between data loading and training.
3. The model's input tensors come from the queue's dequeue op (exposed via get_input_tensors()), so each training step simply pops a ready datapoint instead of waiting on feed_dict.
The enqueue thread is a daemon thread and is managed by callbacks that the InputSource registers with the trainer. Parallelism inside the DataFlow itself (for example PrefetchDataZMQ / MultiProcessRunnerZMQ) is orthogonal to QueueInput: you parallelize the DataFlow first and then hand the result to QueueInput.
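To make this mechanism concrete, here is a small, self-contained sketch of what QueueInput automates, written with plain TensorFlow 1.x primitives: a FIFOQueue, a placeholder-fed enqueue op, and a Python background thread. All names here are illustrative and this is not tensorpack code; tensorpack's own EnqueueThread does the equivalent work for you.
# Illustration only: the queue + enqueue-thread pattern that QueueInput builds
# for you. Assumes TensorFlow 1.x (or tf.compat.v1 with v2 behavior disabled).
import threading
import numpy as np
import tensorflow as tf

queue = tf.FIFOQueue(capacity=50, dtypes=[tf.float32], shapes=[[1]])
datapoint = tf.placeholder(tf.float32, shape=[1])
enqueue_op = queue.enqueue([datapoint])
dequeued = queue.dequeue()            # the tensor a model would consume

def enqueue_loop(sess):
    # Plays the role of tensorpack's EnqueueThread: feed datapoints from
    # Python into the TF queue, in parallel with whoever dequeues them.
    for i in range(10):
        sess.run(enqueue_op, feed_dict={datapoint: np.array([float(i)])})

with tf.Session() as sess:
    worker = threading.Thread(target=enqueue_loop, args=(sess,), daemon=True)
    worker.start()
    for _ in range(10):
        print(sess.run(dequeued))     # a training loop would read inputs like this
    worker.join()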
The ds argument defines the input source: any DataFlow object, i.e. anything that yields datapoints (each datapoint being a list of components). You do not construct queues or enqueue threads yourself; QueueInput creates the TensorFlow queue and the EnqueueThread internally.
The optional queue argument lets you substitute your own tf.QueueBase instance, for example a FIFO queue with a larger capacity than the default 50, as sketched below; its dtypes must match the model's input signature.
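If the default 50-element FIFO queue is too small a buffer, you can pass your own queue instance. The following is a minimal sketch, assuming TensorFlow 1.x (where tf.FIFOQueue is available) and a model with a single float32 input; whether a larger capacity actually helps depends on how bursty your DataFlow is.
import tensorflow as tf
from tensorpack import QueueInput, BatchData
from tensorpack.dataflow import DataFromList

# Any DataFlow works here; this one mirrors the example further below.
ds = BatchData(DataFromList([[float(i)] for i in range(1, 11)], shuffle=False), 2)

# Replace the default 50-element FIFO queue with a larger one. The queue's
# dtypes must match the model's input signature; a single float32 component
# is assumed here.
custom_queue = tf.FIFOQueue(capacity=200, dtypes=[tf.float32])
queue_input = QueueInput(ds, queue=custom_queue)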
Below is a small end-to-end example. It builds a DataFlow from a Python list, groups it into batches of 2 with BatchData, iterates the DataFlow directly once as a sanity check, and then wraps it in QueueInput. Note that you normally do not call Session.run() on the queue yourself; inside a tensorpack trainer the dequeued tensors are wired into the graph automatically.
from tensorpack import QueueInput, BatchData
from tensorpack.dataflow import DataFromList

def get_data():
    # Build a DataFlow from a plain Python list.
    # Each datapoint is a list with one component: a single float.
    data = [[float(i)] for i in range(1, 11)]
    ds = DataFromList(data, shuffle=False)
    # Group datapoints into batches of 2 before they reach the queue.
    ds = BatchData(ds, 2)
    return ds

def main():
    ds = get_data()

    # Sanity check: a DataFlow can always be iterated in plain Python.
    ds.reset_state()
    for batch in ds:          # use ds.get_data() on very old tensorpack versions
        print(batch)

    # Wrap the DataFlow in QueueInput. Used inside a tensorpack trainer,
    # this builds a FIFO queue plus an EnqueueThread that keeps the queue
    # filled in the background, and the model reads the dequeued tensors
    # instead of using feed_dict.
    queue_input = QueueInput(ds)

    # You normally do not consume queue_input manually; you hand it to a
    # trainer, e.g. TrainConfig(model=..., data=queue_input, ...).
    # A fuller trainer sketch follows at the end of the article.

if __name__ == '__main__':
    main()
In the example above, get_data() turns a plain Python list into a DataFlow and batches its datapoints two at a time. Iterating the DataFlow in Python shows exactly what QueueInput will later enqueue: wrapping it in QueueInput does not change the data, it only changes how the data reaches the graph. A background EnqueueThread keeps a TensorFlow queue filled, and every training step dequeues one already-batched datapoint from that queue instead of going through feed_dict, which is what makes the input pipeline multi-threaded.
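To show how the wrapped input is actually consumed, here is a minimal training sketch under stated assumptions: TensorFlow 1.x (or tf.compat.v1 behavior) and a reasonably recent tensorpack whose ModelDesc.inputs() accepts tf.TensorSpec (older releases use InputDesc instead). MyModel, the toy cost, and the hyperparameters are illustrative placeholders; the relevant part is passing QueueInput as the data argument of TrainConfig.
import tensorflow as tf
from tensorpack import (ModelDesc, TrainConfig, SimpleTrainer,
                        QueueInput, BatchData, launch_train_with_config)
from tensorpack.dataflow import DataFromList

class MyModel(ModelDesc):
    def inputs(self):
        # One input component: a batch of scalars, matching the DataFlow below.
        return [tf.TensorSpec([None], tf.float32, 'x')]

    def build_graph(self, x):
        # A single trainable scalar; this toy "model" just learns the data mean.
        w = tf.get_variable('w', [], initializer=tf.constant_initializer(0.0))
        return tf.reduce_mean(tf.square(x - w), name='cost')

    def optimizer(self):
        return tf.train.GradientDescentOptimizer(0.01)

if __name__ == '__main__':
    # The same kind of DataFlow as in the example above: a list source, batched by 2.
    ds = BatchData(DataFromList([[float(i)] for i in range(1, 11)], shuffle=False), 2)

    config = TrainConfig(
        model=MyModel(),
        data=QueueInput(ds),      # the queue-backed input source
        steps_per_epoch=5,
        max_epoch=1,
    )
    launch_train_with_config(config, SimpleTrainer())
When launch_train_with_config runs, the trainer sets up the InputSource, starts the EnqueueThread through the callbacks QueueInput provides, and builds the graph on the dequeued tensors, so no feed_dict is involved during training.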
