diff --git a/census/lowlevel-tf/README.md b/census/lowlevel-tf/README.md
index 271d3f793..0407226cf 100644
--- a/census/lowlevel-tf/README.md
+++ b/census/lowlevel-tf/README.md
@@ -15,6 +15,10 @@ on Google Cloud Storage:
 * Training file is `adult.data.csv`
 * Evaluation file is `adult.test.csv`
+### Disclaimer
+The source of this dataset is from a third party. Google provides no representation,
+warranty, or other guarantees about the validity or any other aspects of this dataset.
+
 
 ```
 export CENSUS_DATA=census_data
 export TRAIN_FILE=adult.data.csv
@@ -62,10 +66,10 @@ export OUTPUT_DIR=census_output
 ```
 
 ```
-python trainer/task.py --train_data_path $CENSUS_DATA/$TRAIN_FILE \
-    --eval_data_path $CENSUS_DATA/$EVAL_FILE \
-    --output_dir $OUTPUT_DIR
-    [--max_steps $MAX_STEPS]
+python trainer/task.py --train-data-paths $CENSUS_DATA/$TRAIN_FILE \
+    --eval-data-paths $CENSUS_DATA/$EVAL_FILE \
+    --job-dir $OUTPUT_DIR
+    [--max-steps $MAX_STEPS]
 ```
 
 ### Using gcloud local
@@ -81,9 +85,9 @@ export OUTPUT_DIR=census_output
 gcloud beta ml local train --package-path trainer \
     --module-name trainer.task \
     -- \
-    --train_data_path $CENSUS_DATA/$TRAIN_FILE \
-    --eval_data_path $CENSUS_DATA/$EVAL_FILE \
-    --output_dir $OUTPUT_DIR
+    --train-data-paths $CENSUS_DATA/$TRAIN_FILE \
+    --eval-data-paths $CENSUS_DATA/$EVAL_FILE \
+    --job-dir $OUTPUT_DIR
 ```
 
 ### Using Cloud ML Engine
@@ -91,7 +95,6 @@ Run the code on Cloud ML Engine using `gcloud`:
 
 ```
 export GCS_JOB_DIR=gs:///path/to/my/jobs/job3
-export GCS_OUTPUT_DIR=gs:///path/to/my/models/run3
 ```
 
 ```
@@ -102,9 +105,8 @@ gcloud beta ml jobs submit training $JOB_NAME \
     --package-path trainer/ \
     --region us-central1 \
     -- \
-    --train_data_path $TRAIN_GCS_FILE \
-    --eval_data_path $EVAL_GCS_FILE \
-    --output_dir $GCS_OUTPUT_DIR
+    --train-data-paths $TRAIN_GCS_FILE \
+    --eval-data-paths $EVAL_GCS_FILE
 ```
 ## Accuracy and Output
 You should see the output for default number of training steps and approx accuracy close to `80.25%`.
@@ -157,10 +159,10 @@ gcloud beta ml local train --package-path trainer \
     --worker-count $WORKER_COUNT \
     --distributed \
     -- \
-    --train_data_path $CENSUS_DATA/$TRAIN_FILE \
-    --eval_data_path $CENSUS_DATA/$EVAL_FILE \
-    --max_steps $MAX_STEPS \
-    --output_dir $OUTPUT_DIR
+    --train-data-paths $CENSUS_DATA/$TRAIN_FILE \
+    --eval-data-paths $CENSUS_DATA/$EVAL_FILE \
+    --max-steps $MAX_STEPS \
+    --job-dir $OUTPUT_DIR
 ```
 
 ### Using Cloud ML Engine
@@ -169,7 +171,6 @@ Run the distributed training code on cloud using `gcloud`.
 ```
 export SCALE_TIER=STANDARD_1
 export GCS_JOB_DIR=gs:///path/to/my/models/run3
-export GCS_OUTPUT_DIR=gs:///path/to/my/models/run3
 ```
 
 ```
@@ -181,7 +182,35 @@ gcloud beta ml jobs submit training $JOB_NAME \
     --package-path trainer/ \
     --region us-central1 \
     -- \
-    --train_data_path $TRAIN_GCS_FILE \
-    --eval_data_path $EVAL_GCS_FILE \
-    --output_dir $GCS_OUTPUT_DIR
+    --train-data-paths $TRAIN_GCS_FILE \
+    --eval-data-paths $EVAL_GCS_FILE
+```
+
+# Hyperparameter Tuning
+Cloud ML Engine allows you to perform hyperparameter tuning to find the optimal
+hyperparameters for your model. See
+[Overview of Hyperparameter Tuning](https://cloud.google.com/ml/docs/concepts/hyperparameter-tuning-overview) for more details.
+
+## Running a Hyperparameter Tuning Job
+
+Running a hyperparameter tuning job is almost exactly the same as running a training
+job, except that you need to add the `--config` argument.
+ +``` +export HPTUNING_CONFIG=hptuning_config.yaml +``` + +``` +gcloud beta ml jobs submit training $JOB_NAME \ + --scale-tier $SCALE_TIER \ + --runtime-version 1.0 \ + --config $HPTUNING_CONFIG \ + --job-dir $GCS_JOB_DIR \ + --module-name trainer.task \ + --package-path trainer/ \ + --region us-central1 \ + -- \ + --train-data-paths $TRAIN_GCS_FILE \ + --eval-data-paths $EVAL_GCS_FILE \ + --max-steps $MAX_STEPS ``` diff --git a/census/lowlevel-tf/hptuning_config.yaml b/census/lowlevel-tf/hptuning_config.yaml new file mode 100644 index 000000000..8d6c96d0f --- /dev/null +++ b/census/lowlevel-tf/hptuning_config.yaml @@ -0,0 +1,22 @@ +trainingInput: + hyperparameters: + goal: MAXIMIZE + hyperparameterMetricTag: accuracy + maxTrials: 4 + maxParallelTrials: 2 + params: + - parameterName: first_layer_size + type: INTEGER + minValue: 50 + maxValue: 500 + scaleType: UNIT_LINEAR_SCALE + - parameterName: num_layers + type: INTEGER + minValue: 1 + maxValue: 15 + scaleType: UNIT_LINEAR_SCALE + - parameterName: scale_factor + type: DOUBLE + minValue: 0.1 + maxValue: 1.0 + scaleType: UNIT_REVERSE_LOG_SCALE diff --git a/census/lowlevel-tf/trainer/model.py b/census/lowlevel-tf/trainer/model.py index d6960980e..cc8b15e43 100644 --- a/census/lowlevel-tf/trainer/model.py +++ b/census/lowlevel-tf/trainer/model.py @@ -35,58 +35,50 @@ [''], [0.], [0.], [0.], [''], ['']] # Categorical columns with vocab size -HASH_BUCKET_COLS = (('education', 16), ('marital_status', 7), +CATEGORICAL_COLS = (('education', 16), ('marital_status', 7), ('relationship', 6), ('workclass', 9), ('occupation', 15), - ('native_country', 42)) -KEY_COLS = (('gender', ('female', 'male')), ('race', ('Amer-Indian-Eskimo', - 'Asian-Pac-Islander', - 'Black', - 'Other', - 'White'))) - + ('native_country', 42), ('gender', 2), ('race', 5)) CONTINUOUS_COLS = ('age', 'education_num', 'capital_gain', 'capital_loss', 'hours_per_week') -CATEGORICAL_COLS = HASH_BUCKET_COLS + tuple((col, len(keys)) for col, keys in KEY_COLS) LABELS = [' <=50K', ' >50K'] LABEL_COLUMN = 'income_bracket' UNUSED_COLUMNS = set(CSV_COLUMNS) - set( zip(*CATEGORICAL_COLS)[0] + CONTINUOUS_COLS + (LABEL_COLUMN,)) +TRAIN, EVAL, PREDICT = 'TRAIN', 'EVAL', 'PREDICT' + -# Graph creation section for training and evaluation -def model_fn(features, +def model_fn(mode, + features, labels, hidden_units=[100, 70, 50, 20], - learning_rate=0.5, - batch_size=40): + learning_rate=0.1): """Create a Feed forward network classification network Args: - input_x (tf.placeholder): Feature placeholder input + mode (string): Mode running training, evaluation or prediction + features (dict): Dictionary of input feature Tensors + labels (Tensor): Class label Tensor hidden_units (list): Hidden units - num_classes (int): Number of classes + learning_rate (float): Learning rate for the SGD Returns: Tuple (train_op, accuracy_op, global_step, predictions): Tuple containing training graph, accuracy graph, global step and predictions """ + label_values = tf.constant(LABELS) + # Convert categorical (string) values to one_hot values - for col, bucket_size in HASH_BUCKET_COLS: + for col, bucket_size in CATEGORICAL_COLS: features[col] = string_ops.string_to_hash_bucket_fast( features[col], bucket_size) - for col, keys in KEY_COLS: - table = tf.contrib.lookup.string_to_index_table_from_tensor( - tf.constant(keys)) - features[col] = table.lookup(features[col]) - - for col, size in CATEGORICAL_COLS: features[col] = tf.squeeze(tf.one_hot( features[col], - size, + bucket_size, axis=1, dtype=tf.float32), axis=[2]) @@ 
-114,42 +106,102 @@ def model_fn(features, # Make predictions logits = curr_layer - probabilities = tf.nn.softmax(logits) - predictions = tf.argmax(probabilities, 1) - # Make labels a vector - labels = tf.squeeze(labels) + if mode in (PREDICT, EVAL): + probabilities = tf.nn.softmax(logits) + predicted_indices = tf.argmax(probabilities, 1) - # Build training operation. - global_step = tf.contrib.framework.get_or_create_global_step() - cross_entropy = tf.reduce_mean( - tf.nn.sparse_softmax_cross_entropy_with_logits( - logits=logits, labels=labels)) - train_op = tf.train.GradientDescentOptimizer(learning_rate).minimize( + if mode in (TRAIN, EVAL): + # Conver the string label column to indices + # Build a Hash Table inside the graph + table = tf.contrib.lookup.string_to_index_table_from_tensor( + label_values) + + # Use the hash table to convert string labels to ints + label_indices = table.lookup(labels) + + # Make labels a vector + label_indices_vector = tf.squeeze(label_indices) + + # global_step is necessary in eval to correctly load the step + # of the checkpoint we are evaluating + global_step = tf.contrib.framework.get_or_create_global_step() + + if mode == PREDICT: + # Convert predicted_indices back into strings + return { + 'predictions': tf.gather(label_values, predicted_indices), + 'confidence': tf.gather(probabilities, predicted_indices) + } + + if mode == TRAIN: + # Build training operation. + cross_entropy = tf.reduce_mean( + tf.nn.sparse_softmax_cross_entropy_with_logits( + logits=logits, labels=label_indices_vector)) + train_op = tf.train.GradientDescentOptimizer(learning_rate).minimize( cross_entropy, global_step=global_step) + return train_op, global_step + + if mode == EVAL: + # Return accuracy and area under ROC curve metrics + # See https://en.wikipedia.org/wiki/Receiver_operating_characteristic + # See https://www.kaggle.com/wiki/AreaUnderCurve + return { + 'accuracy': tf.contrib.metrics.streaming_accuracy( + predicted_indices, label_indices), + 'auroc': tf.contrib.metrics.streaming_auc(predicted_indices, label_indices) + } + + +def build_serving_inputs(mode, default_batch_size=None): + if mode == 'CSV': + placeholders = {'csv_row': tf.placeholder( + shape=[default_batch_size], + dtype=tf.string + )} + features = parse_csv(placeholders['csv_row']) + features.pop(LABEL_COLUMN) + else: + feature_spec = {} + for feat in CONTINUOUS_COLS: + feature_spec[feat] = tf.FixedLenFeature(shape=[], dtype=tf.float32) + + for feat, _ in CATEGORICAL_COLS: + feature_spec[feat] = tf.FixedLenFeature(shape=[], dtype=tf.string) + + tf_record = tf.placeholder( + shape=[default_batch_size], + dtype=tf.string, + name='tf_record' + ) + feature_scalars = tf.parse_example(tf_record, feature_spec) + features = { + key: tf.expand_dims(tensor, -1) + for key, tensor in feature_scalars.iteritems() + } + if mode == 'TF_RECORD': + placeholders = {'tf_record': tf_record} + else: + placeholders = feature_scalars - accuracy_op = tf.reduce_mean(tf.to_float(tf.equal(predictions, labels))) + return features, placeholders - return train_op, accuracy_op, global_step, predictions +def parse_csv(rows_string_tensor): + """Takes the string input tensor and returns a dict of rank-2 tensors.""" -def parse_label_column(label_string_tensor): - """Parses a string tensor into the label tensor - Args: - label_string_tensor: Tensor of dtype string. 
Result of parsing the - CSV column specified by LABEL_COLUMN - Returns: - A Tensor of the same shape as label_string_tensor, should return - an int64 Tensor representing the label index for classification tasks, - and a float32 Tensor representing the value for a regression task. - """ - # Build a Hash Table inside the graph - table = tf.contrib.lookup.string_to_index_table_from_tensor( - tf.constant(LABELS)) + # Takes a rank-1 tensor and converts it into rank-2 tensor + # Example if the data is [1,2,3,4] then converts it into [[1],[2],[3],[4]] + row_columns = tf.expand_dims(rows_string_tensor, -1) + columns = tf.decode_csv(row_columns, record_defaults=CSV_COLUMN_DEFAULTS) + features = dict(zip(CSV_COLUMNS, columns)) - # Use the hash table to convert string labels to ints - return table.lookup(label_string_tensor) + # Remove unused columns + for col in UNUSED_COLUMNS: + features.pop(col) + return features def input_fn(filenames, @@ -158,6 +210,9 @@ def input_fn(filenames, skip_header_lines=0, batch_size=40): """Generates an input function for training or evaluation. + This uses the input pipeline based approach using file name queue + to read data so that entire data is not loaded in memory. + Args: filenames: [str] list of CSV files to read data from. num_epochs: int how many times through to read the data. @@ -179,17 +234,9 @@ def input_fn(filenames, _, rows = reader.read_up_to(filename_queue, num_records=batch_size) - # model_fn expects rank 2 tensors. - row_columns = tf.expand_dims(rows, -1) + features = parse_csv(rows) # Parse the CSV File - columns = tf.decode_csv(row_columns, record_defaults=CSV_COLUMN_DEFAULTS) - features = dict(zip(CSV_COLUMNS, columns)) - - # Remove unused columns - for col in UNUSED_COLUMNS: - features.pop(col) - if shuffle: # This operation builds up a buffer of rows so that, even between batches, # rows are fed to training in a suitably randomized order. 
@@ -200,6 +247,6 @@ def input_fn(filenames, min_after_dequeue=batch_size*2 + 1, num_threads=multiprocessing.cpu_count(), enqueue_many=True, + allow_smaller_final_batch=True ) - label_tensor = parse_label_column(features.pop(LABEL_COLUMN)) - return features, label_tensor + return features, features.pop(LABEL_COLUMN) diff --git a/census/lowlevel-tf/trainer/task.py b/census/lowlevel-tf/trainer/task.py index 0d85b0ffe..df6db6bea 100644 --- a/census/lowlevel-tf/trainer/task.py +++ b/census/lowlevel-tf/trainer/task.py @@ -21,6 +21,7 @@ import argparse import json import os +import threading import tensorflow as tf @@ -28,95 +29,298 @@ tf.logging.set_verbosity(tf.logging.INFO) -EVAL = 'EVAL' -TRAIN = 'TRAIN' + +class EvaluationRunHook(tf.train.SessionRunHook): + """EvaluationRunHook performs continuous evaluation of the model.""" + def __init__(self, + checkpoint_dir, + metric_dict, + graph, + eval_every_n_checkpoints=1, + eval_steps=None, + **kwargs): + + self._eval_steps = eval_steps + self._checkpoint_dir = checkpoint_dir + self._kwargs = kwargs + self._eval_every = eval_every_n_checkpoints + self._latest_checkpoint = None + self._checkpoints_since_eval = 0 + self._graph = graph + + with graph.as_default(): + value_dict, update_dict = tf.contrib.metrics.aggregate_metric_map( + metric_dict) + + self._summary_op = tf.summary.merge([ + tf.summary.scalar(name, value_op) + for name, value_op in value_dict.iteritems() + ]) + self._saver = tf.train.Saver() + self._gs = tf.contrib.framework.get_or_create_global_step() + self._final_ops_dict = value_dict + self._eval_ops = update_dict.values() + + self._eval_lock = threading.Lock() + self._checkpoint_lock = threading.Lock() + self._file_writer = tf.summary.FileWriter( + os.path.join(checkpoint_dir, 'eval'), graph=graph) + + def after_run(self, run_context, run_values): + # Always check for new checkpoints in case a single evaluation + # takes longer than checkpoint frequency and _eval_every is >1 + self._update_latest_checkpoint() + if self._eval_lock.acquire(False): + if self._checkpoints_since_eval > self._eval_every: + self._checkpoints_since_eval = 0 + self._run_eval() + + self._eval_lock.release() + + def _update_latest_checkpoint(self): + if self._checkpoint_lock.acquire(False): + latest = tf.train.latest_checkpoint(self._checkpoint_dir) + if not latest == self._latest_checkpoint: + self._checkpoints_since_eval += 1 + self._latest_checkpoint = latest + self._checkpoint_lock.release() + + def end(self, session): + """Called at then end of session to make sure we always evaluate.""" + self._update_latest_checkpoint() + self._eval_lock.acquire() + self._run_eval() + self._eval_lock.release() + + def _run_eval(self): + coord = tf.train.Coordinator(clean_stop_exception_types=( + tf.errors.CancelledError, tf.errors.OutOfRangeError)) + + with tf.Session(graph=self._graph) as session: + self._saver.restore(session, self._latest_checkpoint) + + session.run([ + tf.local_variables_initializer(), + tf.tables_initializer() + ]) + tf.train.start_queue_runners(coord=coord, sess=session) + train_step = session.run(self._gs) + + tf.logging.info('Starting Evaluation For Step: {}'.format(train_step)) + with coord.stop_on_exception(): + eval_step = 0 + while self._eval_steps is None or eval_step < self._eval_steps: + summaries, final_values, _ = session.run( + [self._summary_op, self._final_ops_dict, self._eval_ops]) + if eval_step % 100 == 0: + tf.logging.info("On Evaluation Step: {}".format(eval_step)) + eval_step += 1 + + self._file_writer.add_summary(summaries, 
global_step=train_step) + tf.logging.info(final_values) def run(target, is_chief, max_steps, - train_data_path, - eval_data_path, - output_dir, + job_dir, + train_data_paths, + eval_data_paths, train_batch_size, eval_batch_size, + learning_rate, + first_layer_size, + num_layers, + scale_factor, + eval_num_epochs=1, eval_every=100, - eval_steps=10, - learning_rate=0.1, + eval_steps=None, + eval_interval_secs=1, num_epochs=None): + """Run the training and evaluation graph. Args: target (string): Tensorflow server target is_chief (bool): Boolean flag to specify a chief server max_steps (int): Maximum training steps - train_data_path (string): List of CSV files to read train data - eval_data_path (string): List of CSV files to read eval data - output_dir (string): Output directory for model and checkpoint + job_dir (string): Output dir for checkpoint and summary + train_data_paths (string): List of CSV files to read train data + eval_data_paths (string): List of CSV files to read eval data train_batch_size (int): Batch size for training eval_batch_size (int): Batch size for evaluation + learning_rate (float): Learning rate for Gradient Descent + first_layer_size (int): Size of the first DNN layer + num_layers (int): Number of hidden layers in the DNN + scale_factor (float): Decay rate for the size of hidden layers eval_every (int): Run evaluation frequency eval_steps (int): Eval steps - learning_rate (float): Learning rate for Gradient Descent num_epochs (int): Number of epochs """ - training_eval_graph = tf.Graph() - with training_eval_graph.as_default(): + + # Calculate the number of hidden units + hidden_units=[ + max(2, int(first_layer_size * scale_factor**i)) + for i in range(num_layers) + ] + + # If the server is chief which is `master` + if is_chief: + tf.logging.info("Created DNN hidden units {}".format(hidden_units)) + evaluation_graph = tf.Graph() + with evaluation_graph.as_default(): + + # Features and label tensors + features, labels = model.input_fn( + eval_data_paths, + num_epochs=eval_num_epochs, + batch_size=eval_batch_size, + shuffle=False + ) + # Accuracy and AUROC metrics + metric_dict = model.model_fn( + model.EVAL, + features, + labels, + hidden_units=hidden_units, + learning_rate=learning_rate + ) + + hooks = [EvaluationRunHook( + job_dir, + metric_dict, + evaluation_graph, + eval_steps=eval_steps, + )] + + # Create a new graph and specify that as default + with tf.Graph().as_default(): + # Placement of ops on devices using replica device setter + # which automatically places the parameters on the `ps` server + # and the `ops` on the workers with tf.device(tf.train.replica_device_setter()): - mode = tf.placeholder(shape=[], dtype=tf.string) - eval_features, eval_label = model.input_fn( - eval_data_path, shuffle=False, batch_size=eval_batch_size) - train_features, train_label = model.input_fn( - train_data_path, num_epochs=num_epochs, batch_size=train_batch_size) - - is_train = tf.equal(mode, tf.constant(TRAIN)) - sorted_keys = train_features.keys() - sorted_keys.sort() - inputs = dict(zip( - sorted_keys, - tf.cond( - is_train, - lambda: [train_features[k] for k in sorted_keys], - lambda: [eval_features[k] for k in sorted_keys] - ) - )) - labels = tf.cond(is_train, lambda: train_label, lambda: eval_label) - train_op, accuracy_op, global_step_tensor, predictions = model.model_fn( - inputs, labels, learning_rate=learning_rate) + # Features and label tensors as read using filename queue + features, labels = model.input_fn( + train_data_paths, + num_epochs=num_epochs, + 
batch_size=train_batch_size + ) + + # Returns the training graph and global step tensor + train_op, global_step_tensor = model.model_fn( + model.TRAIN, + features, + labels, + hidden_units=hidden_units, + learning_rate=learning_rate + ) + + + # Creates a MonitoredSession for training + # https://www.tensorflow.org/api_docs/python/tf/train/MonitoredTrainingSession with tf.train.MonitoredTrainingSession(master=target, is_chief=is_chief, - checkpoint_dir=output_dir, - save_checkpoint_secs=20, + checkpoint_dir=job_dir, + hooks=hooks, + save_checkpoint_secs=2, save_summaries_steps=50) as session: + + # Tuple of exceptions that should cause a clean stop of the coordinator + # https://www.tensorflow.org/api_docs/python/train/coordinator_and_queuerunner#Coordinator coord = tf.train.Coordinator(clean_stop_exception_types=( tf.errors.CancelledError,)) + + # Important to start all queue runners so that data is available + # for reading tf.train.start_queue_runners(coord=coord, sess=session) - step = 0 - last_eval = 0 + + # Global step to keep track of global number of steps particularly in + # distributed setting + step = global_step_tensor.eval(session=session) + + # Run the training graph which returns the step number as tracked by + # the global step tensor with coord.stop_on_exception(): - while step < max_steps and not coord.should_stop(): - if is_chief and step - last_eval > eval_every: - last_eval = step - accuracies = [ - session.run(accuracy_op, feed_dict={mode: EVAL}) - for _ in range(eval_steps) - ] - accuracy = sum(accuracies) / eval_steps - print("Accuracy at step: {} is {:.2f}%".format(step, 100*accuracy)) - - step, _ = session.run( - [global_step_tensor, train_op], - feed_dict={mode: TRAIN} - ) + while (max_steps is None or step < max_steps) and not coord.should_stop(): + step, _ = session.run([global_step_tensor, train_op]) + + # Find the filename of the latest saved checkpoint file + latest_checkpoint = tf.train.latest_checkpoint(job_dir) + + for mode in ['CSV', 'TF_RECORD', 'JSON']: + build_and_run_exports(latest_checkpoint, + job_dir, + mode, + hidden_units, + learning_rate) + + +def build_and_run_exports(latest, job_dir, mode, hidden_units, learning_rate): + """Given the latest checkpoint file export the saved model. 
+ + Args: + latest (string): Latest checkpoint file + job_dir (string): Location of checkpoints and model files + mode (string): Model export format + hidden_units (list): Number of hidden units + learning_rate (float): Learning rate for the SGD + """ + + prediction_graph = tf.Graph() + exporter = tf.saved_model.builder.SavedModelBuilder( + os.path.join(job_dir, 'exports', mode)) + with prediction_graph.as_default(): + features, placeholder_dict = model.build_serving_inputs(mode) + prediction_dict = model.model_fn( + model.PREDICT, + features, + None, # labels + hidden_units=hidden_units, + learning_rate=learning_rate + ) + saver = tf.train.Saver() + + placeholder_info = { + name: tf.saved_model.utils.build_tensor_info(tensor) + for name, tensor in placeholder_dict.iteritems() + } + output_info = { + name: tf.saved_model.utils.build_tensor_info(tensor) + for name, tensor in prediction_dict.iteritems() + } + signature_def = tf.saved_model.signature_def_utils.build_signature_def( + inputs=placeholder_info, + outputs=output_info, + method_name=tf.saved_model.signature_constants.PREDICT_METHOD_NAME + ) + + + with tf.Session(graph=prediction_graph) as session: + session.run([tf.local_variables_initializer(), tf.tables_initializer()]) + saver.restore(session, latest) + exporter.add_meta_graph_and_variables( + session, + tags=[tf.saved_model.tag_constants.SERVING], + signature_def_map={ + tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: signature_def + }, +# main_op=tf.tables_initializer() + ) + + exporter.save() def dispatch(*args, **kwargs): - """Parse TF_CONFIG to cluster_spec and call run().""" + """Parse TF_CONFIG to cluster_spec and call run() method + TF_CONFIG environment variable is available when running using + gcloud either locally or on cloud. It has all the information required + to create a ClusterSpec which is important for running distributed code. 
+  """
   tf_config = os.environ.get('TF_CONFIG')
 
-  # If TF_CONFIG not available run local
+  # If TF_CONFIG is not available, run locally
   if not tf_config:
     return run('', True, *args, **kwargs)
 
@@ -135,6 +339,9 @@ def dispatch(*args, **kwargs):
       job_name=job_name,
       task_index=task_index)
 
+  # Wait for incoming connections forever
+  # Worker ships the graph to the ps server
+  # The ps server manages the parameters
   if job_name == 'ps':
     server.join()
     return
@@ -145,30 +352,46 @@ if __name__ == "__main__":
   parser = argparse.ArgumentParser()
-  parser.add_argument('--train_data_path',
+  parser.add_argument('--train-data-paths',
                       required=True,
                       type=str,
                       help='Training file location',
                       nargs='+')
-  parser.add_argument('--eval_data_path',
+  parser.add_argument('--eval-data-paths',
                       required=True,
                       type=str,
                       help='Evaluation file location',
                       nargs='+')
-  parser.add_argument('--output_dir',
+  parser.add_argument('--job-dir',
                       required=True,
                       type=str,
                       help='GCS or local dir to write checkpoints and export model')
-  parser.add_argument('--max_steps',
+  parser.add_argument('--max-steps',
                       type=int,
                       default=1000,
                       help='Maximum number of training steps to perform')
-  parser.add_argument('--train_batch_size',
+  parser.add_argument('--train-batch-size',
                       type=int,
                       default=40,
                       help='Batch size for training steps')
-  parser.add_argument('--eval_batch_size',
+  parser.add_argument('--eval-batch-size',
                       type=int,
                       default=40,
                       help='Batch size for evaluation steps')
+  parser.add_argument('--learning-rate',
+                      type=float,
+                      default=0.1,
+                      help='Learning rate for SGD')
+  parser.add_argument('--first-layer-size',
+                      type=int,
+                      default=100,
+                      help='Number of nodes in the first layer of the DNN')
+  parser.add_argument('--num-layers',
+                      type=int,
+                      default=4,
+                      help='Number of hidden layers in the DNN')
+  parser.add_argument('--scale-factor',
+                      type=float,
+                      default=0.7,
+                      help='Decay rate for the size of successive DNN layers')
 
   parse_args, unknown = parser.parse_known_args()
   dispatch(**parse_args.__dict__)
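
For readers unfamiliar with how `dispatch()` chooses between local and distributed execution: Cloud ML Engine communicates the cluster layout through the `TF_CONFIG` environment variable, which `dispatch()` parses to build a `ClusterSpec`. The sketch below is only illustrative — the host names, ports, and task assignment are hypothetical placeholders, not values produced by this sample — but it shows the general shape of that JSON.

```
# Illustrative TF_CONFIG for a distributed job (all values are hypothetical).
# gcloud / Cloud ML Engine sets this variable automatically; you would not
# normally construct it by hand.
import json
import os

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'master': ['master-host:2222'],                # the chief
        'ps': ['ps-host:2222'],                        # parameter server(s)
        'worker': ['worker-0:2222', 'worker-1:2222'],  # regular workers
    },
    'task': {'type': 'worker', 'index': 0},            # this process's role
})
```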