How to use a TensorFlow reader and queue to read two files at the same time?

Issue

My training set contains two kinds of files: training images with file names like “1.png” and label files with names like “1.label.txt”.

I found some usage of Queue and Reader in tutorials like this:

filename_queue = tf.train.string_input_producer(filenames)
result.key, value = reader.read(filename_queue)

However, my training set contains two kinds of files that correspond one to one. How can I use Queue and Reader, as in the code above, to read them in pairs?


EDIT

I am thinking about using one queue containing base names to feed two other queues, one for images and one for labels. The code looks like this:

with tf.Session() as sess:
  base_name_queue = tf.train.string_input_producer(['image_names'], num_epochs=20)
  base_name = base_name_queue.dequeue()
  image_name = base_name + ".png"
  image_name_queue = data_flow_ops.FIFOQueue(32, image_name.dtype.base_dtype)
  image_name_queue.enqueue([image_name])
  x = image_name_queue.dequeue()
  print_op = tf.Print(image_name, [image_name])

  qr = tf.train.QueueRunner(base_name_queue, [base_name_queue] * 4)
  coord = tf.train.Coordinator()
  enqueue_threads = qr.create_threads(sess, coord=coord, start=True)

  for step in range(1000000):
    if coord.should_stop():
      break
    print(sess.run(print_op))

  coord.request_stop()
  coord.join(enqueue_threads)

But running this code results in an error:

TypeError: Fetch argument of has invalid type , must be a string or Tensor. (Can not convert a FIFOQueue into a Tensor or Operation.)

and the error points to this line:

coord.join(enqueue_threads)

I think I must be misunderstanding how TensorFlow queues work.

Solution

I have figured out the solution to my problem. I would like to post the answer here instead of deleting my question, hoping it will help people who are new to TensorFlow.

The answer contains two parts:

Part 1: How to read files pair by pair using TensorFlow’s queue

The solution is simple:

  1. Use two queues to store the two sets of files. Note that the two sets must be ordered in the same way.
  2. Dequeue from each queue and preprocess the results separately.
  3. Combine the two preprocessed tensors into one list and pass the list to shuffle_batch.

Code here:

import tensorflow as tf

base_names = ['file1', 'file2']
base_tensor = tf.convert_to_tensor(base_names)
image_name_queue = tf.train.string_input_producer(
  base_tensor + '.png',
  shuffle=False # Note: must set shuffle to False
)
label_queue = tf.train.string_input_producer(
  base_tensor + '.label.txt',
  shuffle=False # Note: must set shuffle to False
)

# use reader to read file
image_reader = tf.WholeFileReader()
image_key, image_raw = image_reader.read(image_name_queue)
image = tf.image.decode_png(image_raw)
label_reader = tf.WholeFileReader()
label_key, label_raw = label_reader.read(label_queue)
# decode_raw needs an output type; tf.uint8 is an assumption about the label format
label = tf.decode_raw(label_raw, tf.uint8)

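# preprocess image
# (per_image_whitening is named per_image_standardization in newer TensorFlow 1.x releases)
processed_image = tf.image.per_image_whitening(image)
# capacity must be larger than min_after_dequeue
batch = tf.train.shuffle_batch(
  [processed_image, label], batch_size=10, capacity=100, min_after_dequeue=50)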

# print batch
sess = tf.Session()
queue_threads = tf.train.start_queue_runners(sess=sess)
print(sess.run(batch))
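Because both producers rely on shuffle=False and on identical ordering, it may help to build the two filename lists from a single list of base names and to check that every file exists before creating the queues. The following is a minimal plain-Python sketch of that idea; the helper names `paired_filenames` and `missing_files` and the `data_dir` parameter are hypothetical and not part of the original answer.

```python
import os

def paired_filenames(base_names, data_dir="."):
    """Return (image_paths, label_paths), both in the order of base_names."""
    image_paths = [os.path.join(data_dir, name + ".png") for name in base_names]
    label_paths = [os.path.join(data_dir, name + ".label.txt") for name in base_names]
    return image_paths, label_paths

def missing_files(paths):
    """Return the subset of paths that do not exist on disk."""
    return [p for p in paths if not os.path.isfile(p)]

# "train" is an assumed directory name used only for illustration.
images, labels = paired_filenames(["file1", "file2"], data_dir="train")
```

The two returned lists could then be passed to the two string_input_producer calls, so that the i-th image and the i-th label always share a base name.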

Part 2: Queue, QueueRunner, Coordinator and helper functions

A Queue is just what its name says: a queue. It has two methods, enqueue and dequeue. The input of enqueue is a Tensor (you can enqueue ordinary Python data, but it will be converted to a Tensor internally). The return value of dequeue is a Tensor. So you can build a pipeline of queues like this:

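import tensorflow as tf
from tensorflow.python.ops import data_flow_ops

q1 = data_flow_ops.FIFOQueue(32, tf.int32)
q2 = data_flow_ops.FIFOQueue(32, tf.int32)
enq1 = q1.enqueue_many([[1, 2, 3, 4, 5]])  # enqueue five scalar elements
v1 = q1.dequeue()
enq2 = q2.enqueue(v1)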

The benefit of using queues in TensorFlow is that data is loaded asynchronously, which improves performance and saves memory. The code above does nothing when run by itself, because no thread is running those operations. QueueRunner is designed to describe how to enqueue data in parallel, so a QueueRunner is initialized with a queue and a list of enqueue operations (the outputs of enqueue). Passing the queue itself in place of an enqueue operation, as in the [base_name_queue] * 4 of my question, is what triggers the TypeError.
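As a rough analogy only, the same two-stage pipeline can be sketched with Python's standard `queue` and `threading` modules instead of TensorFlow ops. The function names and the `DONE` sentinel are hypothetical, and this is not how TensorFlow implements its queues. The point is that each enqueue step needs a thread to drive it, which is the role QueueRunner plays.

```python
import queue
import threading

q1 = queue.Queue(maxsize=32)
q2 = queue.Queue(maxsize=32)
DONE = object()  # sentinel marking the end of the stream

def fill_q1():
    # Plays the role of the first enqueue operation.
    for value in [1, 2, 3, 4, 5]:
        q1.put(value)
    q1.put(DONE)

def forward_q1_to_q2():
    # Plays the role of "dequeue from q1, then enqueue into q2".
    while True:
        value = q1.get()
        q2.put(value)
        if value is DONE:
            break

# Without these threads nothing would ever move through the queues.
threads = [threading.Thread(target=fill_q1),
           threading.Thread(target=forward_q1_to_q2)]
for t in threads:
    t.start()

# The main thread consumes from the end of the pipeline.
results = []
while True:
    item = q2.get()
    if item is DONE:
        break
    results.append(item)

for t in threads:
    t.join()
print(results)  # [1, 2, 3, 4, 5]
```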

After setting up all the QueueRunners, you have to start all the threads. One way is to start them when creating them:

enqueue_threads = qr.create_threads(sess, coord=coord, start=True)

Alternatively, you can start all the threads after all the setup is done:

# add queue runner
tf.train.add_queue_runner(tf.train.QueueRunner(q, [enq]))

# start all queue runners
queue_threads = tf.train.start_queue_runners(sess=sess, coord=coord)

Once all the threads have started, you have to decide when to exit. Coordinator is there to do this. A Coordinator is like a shared flag between all the running threads. If one of them finishes or runs into an error, it calls coord.request_stop(), after which every thread gets True when calling coord.should_stop(). So the pattern for using a Coordinator is:

coord = tf.train.Coordinator()

for step in range(1000000):
  if coord.should_stop():
    break
  print(sess.run(print_op))

coord.request_stop()
coord.join(enqueue_threads)
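The "shared flag" idea can be illustrated without TensorFlow. The sketch below is an analogy that uses `threading.Event` from the standard library in place of a Coordinator; the names `stop_event`, `work` and `producer` are hypothetical. Setting the event corresponds to coord.request_stop(), checking it corresponds to coord.should_stop(), and joining the threads corresponds to coord.join(...).

```python
import queue
import threading

stop_event = threading.Event()   # stands in for the Coordinator's stop flag
work = queue.Queue(maxsize=4)

def producer():
    n = 0
    while not stop_event.is_set():        # like coord.should_stop()
        try:
            # Use a timeout so the stop flag is re-checked even when the queue is full.
            work.put(n, timeout=0.1)
            n += 1
        except queue.Full:
            continue

threads = [threading.Thread(target=producer) for _ in range(2)]
for t in threads:
    t.start()

# Main loop: consume ten items, then ask everyone to stop.
consumed = [work.get() for _ in range(10)]

stop_event.set()                          # like coord.request_stop()
for t in threads:
    t.join()                              # like coord.join(threads)
```

The timeout on `put` matters here: a producer blocked forever on a full queue would never see the stop flag, and the final join would hang.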

Answered By – Da Tong

This answer was collected from Stack Overflow and is licensed under CC BY-SA 2.5, CC BY-SA 3.0 and CC BY-SA 4.0.
