2017-05-29 4 views
0

Ich lege Elemente in einer bestimmten Reihenfolge auf einen TensorFlow FIFOQueue und erwarte, dass sie in der gleichen Reihenfolge aus der Warteschlange entfernt werden können, aber das ist nicht das Verhalten, das ich beobachte.TensorFlow FIFOQueue nicht FIFO?

Das Ausführen des folgenden eigenständigen Codes zeigt die Vorgehensweise und das Verhalten. Dies wurde mit Python 2.7 (aber könnte in Python 3 funktionieren) mit TensorFlow 1.1 ausgeführt.

from __future__ import division, print_function, unicode_literals 
import math 
import numpy 
import tensorflow as tf 
from tensorflow.python.training import queue_runner 
from tensorflow.python.ops import control_flow_ops 

row_count, column_count = 7, 5 
batch_size, step_size = 3, 2 

# Create some random data 
data = numpy.arange(row_count * column_count).reshape(
    (row_count, column_count)) 
print(data) 

batch_count = int(math.ceil(row_count/batch_size)) 
step_count = int(math.ceil(column_count/step_size)) 
print(batch_count, step_count) 

slices = tf.train.slice_input_producer([data], num_epochs=1, shuffle=False) 
batch = tf.train.batch(slices, batch_size, allow_smaller_final_batch=True) 

queue = tf.FIFOQueue(32, dtypes=[batch.dtype]) 
enqueue_ops = [] 
dependency = None 

for step_index in range(step_count): 
    step = tf.strided_slice(
     batch, [0, step_index * step_size], 
     [tf.shape(batch)[0], (step_index + 1) * step_size]) 

    if dependency is None: 
     dependency = step 
    else: 
     step = control_flow_ops.with_dependencies([dependency], step) 

    enqueue_ops.append(queue.enqueue(step)) 

queue_runner.add_queue_runner(queue_runner.QueueRunner(
    queue=queue, enqueue_ops=[tf.group(*enqueue_ops)])) 
step = queue.dequeue() 

supervisor = tf.train.Supervisor() 

with supervisor.managed_session() as session: 
    for batch_index in range(batch_count): 
     for step_index in range(step_count): 
      print("Batch %d, step %d" % (batch_index, step_index)) 
      print(session.run(step)) 

die erwartete Ausgabe ist

Batch 0, step 0 
[[ 0 1] 
[ 5 6] 
[10 11]] 
Batch 0, step 1 
[[ 2 3] 
[ 7 8] 
[12 13]] 
Batch 0, step 2 
[[ 4] 
[ 9] 
[14]] 
Batch 1, step 0 
[[15 16] 
[20 21] 
[25 26]] 
Batch 1, step 1 
[[17 18] 
[22 23] 
[27 28]] 
Batch 1, step 2 
[[19] 
[24] 
[29]] 
Batch 2, step 0 
[[30 31]] 
Batch 2, step 1 
[[32 33]] 
Batch 2, step 2 
[[34]] 

die tatsächliche Ausgabe ist

Batch 0, step 0 
[[ 0 1] 
[ 5 6] 
[10 11]] 
Batch 0, step 1 
[[ 4] 
[ 9] 
[14]] 
Batch 0, step 2 
[[ 2 3] 
[ 7 8] 
[12 13]] 
Batch 1, step 0 
[[15 16] 
[20 21] 
[25 26]] 
Batch 1, step 1 
[[19] 
[24] 
[29]] 
Batch 1, step 2 
[[17 18] 
[22 23] 
[27 28]] 
Batch 2, step 0 
[[30 31]] 
Batch 2, step 1 
[[32 33]] 
Batch 2, step 2 
[[34]] 

beachte, dass die Reihenfolge der Schritte in den Reihen 0 und 1 nicht korrekt ist. Ich konnte die Reihenfolge der Schritte nicht bestimmen. Es scheint, dass die Chargen immer in der richtigen Reihenfolge sind, aber die Schritte innerhalb jeder Charge kommen in "zufälliger" Reihenfolge heraus: sie erscheint deterministisch, aber nicht FIFO.

Ich habe versucht mit und ohne die expliziten Abhängigkeitsdeklarationen wie im obigen Code verwendet. Ich habe versucht, die Warteschlangenkapazität auf 1 zu setzen. Ich habe versucht, enqueue_ops=enqueue_ops statt tf.group zu setzen, aber keine dieser Änderungen half und das letzte verursachte sehr seltsame Ausgabe.

Vielleicht ignoriert tf.group irgendwie Abhängigkeiten?

Antwort

0

Es erscheint tensorflow.python.ops.control_flow_ops.with_dependencies funktioniert nicht, wie ich dachte, oder ich habe es falsch verwendet. Wenn ich auf die Verwendung tf.control_dependencies stattdessen wechseln, habe ich das Verhalten, das ich brauche:

from __future__ import division, print_function, unicode_literals 
import math 
import numpy 
import tensorflow as tf 
from tensorflow.python.training import queue_runner 

row_count, column_count = 7, 5 
batch_size, step_size = 3, 2 

# Create some random data 
data = numpy.arange(row_count * column_count).reshape(
    (row_count, column_count)) 
print(data) 

batch_count = int(math.ceil(row_count/batch_size)) 
step_count = int(math.ceil(column_count/step_size)) 
print(batch_count, step_count) 

slices = tf.train.slice_input_producer([data], num_epochs=1, shuffle=False) 
batch = tf.train.batch(slices, batch_size, allow_smaller_final_batch=True) 

queue = tf.FIFOQueue(32, dtypes=[batch.dtype]) 
enqueue_ops = [] 
dependency = None 

for step_index in range(step_count): 
    step = tf.strided_slice(
     batch, [0, step_index * step_size], 
     [tf.shape(batch)[0], (step_index + 1) * step_size]) 

    if dependency is None: 
     dependency = queue.enqueue(step) 
    else: 
     with tf.control_dependencies([dependency]): 
      step = queue.enqueue(step) 
      dependency = step 

    enqueue_ops.append(step) 

queue_runner.add_queue_runner(queue_runner.QueueRunner(
    queue=queue, enqueue_ops=[tf.group(*enqueue_ops)])) 
step = queue.dequeue() 

supervisor = tf.train.Supervisor() 

with supervisor.managed_session() as session: 
    for batch_index in range(batch_count): 
     for step_index in range(step_count): 
      print("Batch %d, step %d" % (batch_index, step_index)) 
      print(session.run(step)) 

Diese Antwort des answer to another SO question motiviert war.

Verwandte Themen