欢迎访问宙启技术站
智能推送

tensorpack中的QueueInput()函数实现多线程数据输入

发布时间:2023-12-23 07:30:11

tensorpack是一个基于TensorFlow的高性能数据输入工具,能够实现多线程数据输入。QueueInput()函数是tensorpack中用来将多线程数据输入操作封装起来的函数之一。下面将对QueueInput()函数进行详细介绍,并提供一个使用例子。

QueueInput()函数的定义如下:

def QueueInput(
        input_queues, size, dynamic_pad=False, dynamic_padding=False,
        queue_runner=True, prefetch=True, multiprocessing=False, dtypes=None,
        names=None):
    '''
    Args:
        input_queues(list): a list of input source. Each element of input_queues should be either a Queue object or a EnqueueThread object, which defines the input source.
        size(int): size of the queue from which minimibatch is popping
        dynamic_pad, dynamic_padding(bool): see below
        queue_runner(bool): whether to start a thread to run the queue.
        prefetch(bool): whether to enable prefetching. See prefetch_data for details.
        multiprocessing(bool): whether to enable multiprocessing. See PrefetchDataZMQ for details.
        dtypes(list): target dtypes of each column. If not provided, will use the dtypes from input_queues.
        names(list): names for each column, see below.

    Returns:
        a QueueInput object that produces batches by popping from the input queue.

    QueueInput is designed for the typical multi-threaded data pipeline.
    It does the following things to smooth the whole input pipeline:
    1. It starts N dataflow threads, each runs a DataFlow object.
    2. Each DataFlow object puts datapoints into an input queue.
    3. The input queue is usually backed by a tf.RandomShuffleQueue. But when dynamic_padding=True, it uses a custom
        DynamicPaddingQueue to handle padding more efficiently.
    4. A separating thread runs a QueueInput op to read tensors from the input queue.
       The separating thread also starts an EnqueueThread shadow thread for each DataFlow.

    QueueInput returns a QueueInput object, which is a kind of Tensor.

    The PrefetchData object it setups is accessible via the .input_setup attribute.
    and the actual data tensors can be accessed via .input_tensors.

    It also creates a number of threads that deploys the EnqueueThread to run the DataFlow objects.
    They do data processing parallel to the training, and you can set their priority using thread.daemon attribute.

    Random :class:tf.Tensor producer.

    Under the hood, it uses several :class:tf.QueueBase to do the work. The return value can be directly used in :meth:tf.train.QueueRunner.
    '''

QueueInput()函数使用一个input_queues参数来定义输入源, input_queues可以是一个Queue对象或者一个EnqueueThread对象的列表。Queue对象是TensorFlow中的队列,而EnqueueThread对象是队列输入器,可以看作是一个线程,用于将数据放入队列。

QueueInput()函数还有一些可选的参数,如size用于指定队列大小,dynamic_pad和dynamic_padding用于动态填充数据。

下面是一个使用QueueInput()函数的例子,该例子使用了两个线程的数据输入。首先定义了一个EnqueueThread对象,它用于将数据放入队列中。然后使用QueueInput()函数将队列封装为一个Tensor对象,最后通过Session.run()调用来获取数据。

import tensorflow as tf
from tensorpack import QueueInput, \
    BatchData, dataset

def get_data():
    return [[1], [2], [3], [4], [5], [6], [7], [8], [9], [10]]

def main():
    # 创建一个EnqueueThread对象,用于将数据放入队列中
    class MyDataEnqueueThread(tf.train.Coordinator, tf.train.Enqueuer):
        def __init__(self, sess, coord=None):
            tf.train.Coordinator.__init__(self, coord)
            tf.train.Enqueuer.__init__(self, get_data(), tf.float32, size=10)
            self.sess = sess

        def enqueue(self):
            self.start()
            while not self.coord.should_stop():
                sess.run(self.enqueue_ops)
        def close(self):
            self.coord.request_stop()
            self.coord.join(self.thread)

    sess = tf.Session()
    coord = tf.train.Coordinator()

    # 创建一个QueueInput对象,将队列封装为Tensor对象
    queue_input = QueueInput([MyDataEnqueueThread(sess, coord)], size=10)
    data_flow = BatchData(queue_input, 2)

    with tf.train.MonitoredSession() as sess:
        while not sess.should_stop():
            data = sess.run(data_flow.get_data())
            print(data)

    coord.request_stop()
    coord.join()

if __name__ == '__main__':
    main()

上述例子中,首先定义了一个自定义的EnqueueThread对象MyDataEnqueueThread,用于将数据放入队列中。然后通过QueueInput()函数将队列封装为一个Tensor对象queue_input。最后通过Session.run()调用来获取数据,数据会通过data_flow.get_data()方法返回。在每次调用sess.run(data_flow.get_data())时,数据会以batch_size的大小从队列中取出。