2017-07-24 11 views
4

Ich versuche herauszufinden, wie ich meine Input-Pipeline für Tensorflow in verteilten Training einrichten. Es ist nicht klar, ob die Leser von einem einzigen Prozess lesen und die Daten an alle Arbeiter senden oder jeder Server startet seine eigene Eingangspipeline? Wie stellen wir sicher, dass jeder Mitarbeiter einen anderen Input erhält?Tensorflow Eingangspipeline für verteiltes Training

+0

, wenn Sie eine der Standardbeispiele von Google folgen, jeder Arbeiter hat einen eigenen Leser –

+0

Bitte lesen Sie die Beschreibungen der von Ihnen verwendeten Tags. "ML" bezieht sich auf die Programmiersprache. – molbdnilo

Antwort

0

Ich will ein Beispiel geben, wie ich es tun:

import tensorflow as tf 
batch_size = 50 
task_index = 2 
num_workers = 10 
input_pattern = "gs://backet/dir/part-00*" 

alle Namen von Dateien auf den heißen Stein, die task_index zu input_pattern

files_names = tf.train.match_filenames_once(
       input_pattern, name = "myFiles") 

wählen Sie die Namen für Arbeitnehmer entsprechen erhalten. tf.strided_slice ist wie Slice für Listen: a [::, task_index] (wählen Sie jede task_index te Datei für Arbeiter task_index)

to_process = tf.strided_slice(files_names, [task_index], 
       [999999999], strides=[num_workers]) 
filename_queue = tf.train.string_input_producer(to_process, 
        shuffle=True, #shufle files 
        num_epochs=num_epochs) 

reader = tf.TextLineReader() 
_ , value = reader.read(filename_queue) 
col1,col2 = tf.decode_csv(value, 
     record_defaults=[[1],[1]], field_delim="\t") 

train_inputs, train_labels = tf.train.shuffle_batch([col1,[col2]], 
     batch_size=batch_size, 
     capacity=50*batch_size, 
     num_threads=10, 
     min_after_dequeue = 10*batch_size, 
     allow_smaller_final_batch = True) 

loss = f(...,train_inputs, train_labels) 
optimizer = ... 

with tf.train.MonitoredTrainingSession(...) as mon_sess: 
    coord = tf.train.Coordinator() 
    with coord.stop_on_exception(): 
     _ = tf.train.start_queue_runners(sess = mon_sess, coord=coord) 
     while not coord.should_stop() and not mon_sess.should_stop(): 
      optimizer.run() 

Ich bin nicht sicher, ob meine Methode die beste Art und Weise eingegeben Pipeline bei verteilt zu implementieren TensorFlow Implementierung, da jeder Arbeiter liest Namen aller Dateien auf dem heißen Stein


Guter Vortrag über Eingang Pipeline in TensorFlow: http://web.stanford.edu/class/cs20si/lectures/notes_09.pdf

Verwandte Themen