0

Ich möchte ein Convolutional Neural Network auf MNIST auf eine verteilte Weise mit Tensorflow High-Level-Apis trainieren. Ich habe versucht, eine Clusterkonfiguration anzugeben und sie an einen Estimator weiterzuleiten (Code unten).Verteiltes Training von tf.learn Estimators?

Ich erhalte die folgenden Fehler Parameter zu MergeFrom() Instanz derselben Klasse sein muss: erwartete tensorflow.ConfigProto Eigenschaft bekam

Wer weiß, was in, wie falsch ich die Konfiguration am spezifizieren?

from __future__ import absolute_import 
from __future__ import division 
from __future__ import print_function 


import grpc 
import numpy as np 
import tensorflow as tf 
from tensorflow.contrib import learn 
from tensorflow.contrib.learn.python.learn.estimators import model_fn as model_fn_lib 
from tensorflow.contrib.learn.python.learn.estimators import run_config as run_config_lib 
from tensorflow.python import debug as tf_debug 
tf.logging.set_verbosity(tf.logging.ERROR) 
import json 
import os 
import shutil 

### Data - Mnist 

mnist=learn.datasets.load_dataset('mnist') 
train_data=mnist.train.images 
train_labels=np.asarray(mnist.train.labels, dtype=np.int32) 
eval_data=mnist.test.images 
eval_labels=np.asarray(mnist.test.labels, dtype=np.int32) 

BATCH_SIZE=100 
NUM_EPOCHS=10 
train_input_fn = learn.io.numpy_input_fn({'x': train_data}, train_labels, shuffle=True, batch_size=BATCH_SIZE, 
             num_epochs=NUM_EPOCHS) 
batch_size = 100 
num_epochs = 1 
eval_input_fn = learn.io.numpy_input_fn({'x': eval_data}, eval_labels, shuffle=False, batch_size=batch_size, num_epochs=num_epochs) 

### Cluster 

my_cluster = {'ps': ['/cpu:0'], 
       'worker': ['/gpu:0']} 
os.environ['TF_CONFIG'] = json.dumps(
      {'cluster': my_cluster, 
      'task': {'type': 'worker', 'index': 1}}) 

my_configs=learn.RunConfig() 

server = tf.train.Server(server_or_cluster_def=my_configs.cluster_spec, job_name='worker') 

### Model 

def cnn_model_fn(features, labels, mode): 

    input_layer=tf.reshape(features['x'],shape=[-1,28,28,1]) 

    #conv1 
    conv1=tf.layers.conv2d(inputs=input_layer, 
          filters=32, 
          kernel_size=[5, 5], 
          padding='same', 
          activation=tf.nn.relu) 
    pool1=tf.layers.max_pooling2d(inputs=conv1, pool_size=[2,2], strides=2) 

    #conv2 
    conv2=tf.layers.conv2d(inputs=pool1, 
          filters=64, 
          kernel_size=[5,5], 
          padding='same', 
          activation=tf.nn.relu) 
    pool2=tf.layers.max_pooling2d(inputs=conv2, pool_size=[2,2], strides=2) 

    #fully connected layers 
    pool2_flat=tf.reshape(pool2, [-1, 7*7*64]) 
    dense1=tf.layers.dense(pool2_flat, 1024, activation=tf.nn.relu) 
    dropout = tf.layers.dropout(inputs=dense1, rate=0.4, training=mode == learn.ModeKeys.TRAIN) 

    #fc2 
    logits=tf.layers.dense(dropout, 10, activation=tf.nn.relu) 
    loss = None 
    train_op = None 

    #loss 
    if mode != learn.ModeKeys.INFER: 
     onehot_labels=tf.one_hot(indices=tf.cast(labels, tf.int32),depth=10) 
     loss=tf.losses.softmax_cross_entropy(onehot_labels=onehot_labels, logits=logits) 

    #optimizer 
    if mode == learn.ModeKeys.TRAIN: 
     with tf.device("/job:worker/task:1"): 
      train_op = tf.contrib.layers.optimize_loss(
      loss=loss, 
      global_step=tf.contrib.framework.get_global_step(), 
      learning_rate=0.0001, 
      optimizer="Adam") 

    #predictions 
    predictions={ 
      'classes': tf.argmax(logits, axis=1) , 
      'predictions': tf.nn.softmax(logits,name="softmax_tensor")   
     } 
    return model_fn_lib.ModelFnOps(mode=mode, predictions=predictions, loss=loss, train_op=train_op) 

classifier=learn.Estimator(model_fn=cnn_model_fn, model_dir="/tmp/mnist_distributed", config=my_configs) 

### logging 

tensors_to_log = {"probabilities": "softmax_tensor"} 
logging_hook = tf.train.LoggingTensorHook(tensors=tensors_to_log, every_n_iter=50) 

### Metrics 

metrics = { 
    "accuracy": 
     learn.MetricSpec(
      metric_fn=tf.metrics.accuracy, prediction_key="classes"), 
} 

### Distributing training 

distributed_experiment=learn.Experiment(estimator=classifier, 
       train_input_fn=train_input_fn, 
       eval_input_fn=eval_input_fn, 
       eval_metrics=metrics, 
       #train_monitors=my_monitors, 
       train_steps=200, 
       ) 

distributed_experiment.train_and_evaluate() 

Antwort

0

my_config sollte eine Instanz von RunConfig statt RunConfig selbst sein. Wenn Sie RunConfig init starten, wird die Konfiguration von ps, workers und task von der Umgebungsvariablen TF_CONFIG geladen. https://www.tensorflow.org/api_docs/python/tf/contrib/learn/RunConfig

+0

Klasseninstanziierung behoben. Danke, ich bin jetzt in der Lage, einen Server zu starten, aber der obige Code stoppt immer noch, wenn ich renne distributed_experiment.train() Siehst du noch etwas, das ich vermisse? – smh

+0

@smh Können Sie Protokolle bereitstellen? Hast du nicht 1 PS und 1 Arbeiter? – BoscoTsang

+0

Muss ich einen anderen Server als PS starten? Ich bin schlecht definiert den Cluster oder die Art, wie Training in der cnn_model_fn verteilt wird, aber nicht herausgefunden haben, wie es zu beheben. Dies sind die Protokolle, die ich bekomme ** distributed_experiment.train() ** hält: 'DEBUG: tensorflow: Festlegen von Feature-Informationen zu {'x': TensorSignature (dtype = tf.float32, shape = TensorShape ([Dimension (Keine), Dimension (784)]), is_sparse = False)}. DEBUG: tensorflow: Setzen von Etiketteninformationen auf TensorSignature (dtype = tf.int32, shape = TensorShape ([Dimension (Keine)]), is_sparse = False) INFO: Tensorflow: Erstellen Sie CheckpointSaverHook. – smh

1

Wenn u eine verteilte Schätzer in TF ausgeführt werden soll, gibt es eine Instanz:

from tensorflow.contrib.learn.python.learn import learn_runner 
from tensorflow.contrib.learn.python.learn.estimators import run_config 

... 

learn_runner.run(
    experiment_fn=create_experiment_fn(config), 
    output_dir=output_dir) 

Und die 'experiment_fn' hier ist nur die 'distributed_experiment' in Ihrem Code. Es sollte auch ein "output_dir" in Ihrem Experiment sein

Verwandte Themen