From 65582937c7f7125c2489853b0ee54371a589b715 Mon Sep 17 00:00:00 2001 From: Eli Bixby Date: Tue, 28 Feb 2017 19:53:03 -0800 Subject: [PATCH 01/22] Attempt at continuous eval --- census/lowlevel-tf/trainer/model.py | 130 ++++++++++++++-------- census/lowlevel-tf/trainer/task.py | 162 ++++++++++++++++++++-------- 2 files changed, 204 insertions(+), 88 deletions(-) diff --git a/census/lowlevel-tf/trainer/model.py b/census/lowlevel-tf/trainer/model.py index 12baed53d..43249236c 100644 --- a/census/lowlevel-tf/trainer/model.py +++ b/census/lowlevel-tf/trainer/model.py @@ -55,13 +55,14 @@ UNUSED_COLUMNS = set(CSV_COLUMNS) - set( zip(*CATEGORICAL_COLS)[0] + CONTINUOUS_COLS + (LABEL_COLUMN,)) +TRAIN, EVAL, PREDICT = 'TRAIN', 'EVAL', 'PREDICT' -# Graph creation section for training and evaluation -def model_fn(features, + +def model_fn(mode, + features, labels, hidden_units=[100, 70, 50, 20], - learning_rate=0.5, - batch_size=40): + learning_rate=0.5): """Create a Feed forward network classification network Args: @@ -72,6 +73,8 @@ def model_fn(features, Returns: """ + label_values = tf.constant(LABELS) + # Convert categorical (string) values to one_hot values for col, bucket_size in HASH_BUCKET_COLS: features[col] = string_ops.string_to_hash_bucket_fast( @@ -113,43 +116,91 @@ def model_fn(features, # Make predictions logits = curr_layer - probabilities = tf.nn.softmax(logits) - predictions = tf.argmax(probabilities, 1) - # Make labels a vector - labels = tf.squeeze(labels) + if mode in (PREDICT, EVAL): + probabilities = tf.nn.softmax(logits) + predicted_indices = tf.argmax(probabilities, 1) - # Build training operation. - global_step = tf.contrib.framework.get_or_create_global_step() - cross_entropy = tf.reduce_mean( - tf.nn.sparse_softmax_cross_entropy_with_logits( - logits=logits, labels=labels)) - train_op = tf.train.GradientDescentOptimizer(learning_rate).minimize( + if mode in (TRAIN, EVAL): + # Conver the string label column to indices + # Build a Hash Table inside the graph + table = tf.contrib.lookup.string_to_index_table_from_tensor( + label_values) + # Use the hash table to convert string labels to ints + label_indices = table.lookup(labels) + # Make labels a vector + label_indices_vector = tf.squeeze(label_indices) + + if mode == PREDICT: + # Convert predicted_indices back into strings + return { + 'predictions': tf.gather(predicted_indices, label_values), + 'confidence': tf.gather(predicted_indices, probabilities) + } + + if mode == TRAIN: + # Build training operation. 
+ cross_entropy = tf.reduce_mean( + tf.nn.sparse_softmax_cross_entropy_with_logits( + logits=logits, labels=label_indices_vector)) + global_step = tf.contrib.framework.get_or_create_global_step() + train_op = tf.train.GradientDescentOptimizer(learning_rate).minimize( cross_entropy, global_step=global_step) + return train_op, global_step + + if mode == EVAL: + return { + 'accuracy': tf.contrib.metrics.streaming_accuracy( + predicted_indices, label_indices), + 'auroc': tf.contrib.metrics.streaming_auc(predicted_indices, label_indices) + } + + +def build_serving_inputs(mode, default_batch_size=None): + if mode == 'CSV': + placeholders = [tf.placeholder( + shape=[default_batch_size], + dtype=tf.string, + name='csv_rows' + )] + features = parse_csv(placeholders[0]) + else: + feature_spec = {} + for feat in CONTINUOUS_COLS: + feature_spec[feat] = tf.train.FixedLenFeature([], tf.int32) + + for feat, _ in CATEGORICAL_COLS: + feature_spec[feat] = tf.train.FixedLenFeature([], tf.string) + + tf_record = tf.placeholder( + shape=[default_batch_size], + dtype=tf.string, + name='tf_record' + ) + feature_scalars = tf.parse_example(tf_record, feature_spec) + features = { + key: tf.expand_dims(tensor, -1) + for key, tensor in feature_scalars.iteritems() + } + if mode == 'TF_RECORD': + placeholders = [tf_record] + else: + placeholders = feature_scalars.values() - accuracy_op = tf.reduce_mean(tf.to_float(tf.equal(predictions, labels))) - - return train_op, accuracy_op, global_step, predictions - + return features, placeholders -def parse_label_column(label_string_tensor): - """Parses a string tensor into the label tensor - Args: - label_string_tensor: Tensor of dtype string. Result of parsing the - CSV column specified by LABEL_COLUMN - Returns: - A Tensor of the same shape as label_string_tensor, should return - an int64 Tensor representing the label index for classification tasks, - and a float32 Tensor representing the value for a regression task. - """ - # Build a Hash Table inside the graph - table = tf.contrib.lookup.string_to_index_table_from_tensor( - tf.constant(LABELS)) - # Use the hash table to convert string labels to ints - return table.lookup(label_string_tensor) +def parse_csv(rows_string_tensor): + # model_fn expects rank 2 tensors. + row_columns = tf.expand_dims(rows_string_tensor, -1) + columns = tf.decode_csv(row_columns, record_defaults=CSV_COLUMN_DEFAULTS) + features = dict(zip(CSV_COLUMNS, columns)) + # Remove unused columns + for col in UNUSED_COLUMNS: + features.pop(col) + return features def input_fn(filenames, num_epochs=None, @@ -178,17 +229,9 @@ def input_fn(filenames, _, rows = reader.read_up_to(filename_queue, num_records=batch_size) - # model_fn expects rank 2 tensors. - row_columns = tf.expand_dims(rows, -1) + features = parse_csv(rows) # Parse the CSV File - columns = tf.decode_csv(row_columns, record_defaults=CSV_COLUMN_DEFAULTS) - features = dict(zip(CSV_COLUMNS, columns)) - - # Remove unused columns - for col in UNUSED_COLUMNS: - features.pop(col) - if shuffle: # This operation builds up a buffer of rows so that, even between batches, # rows are fed to training in a suitably randomized order. 
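To make the column handling above concrete, here is a plain-Python sketch of what `parse_csv` and `input_fn` do to each record once `tf.decode_csv` has split it: every row becomes a dict keyed by `CSV_COLUMNS`, the entries in `UNUSED_COLUMNS` are dropped, and `LABEL_COLUMN` is popped off and returned separately. The column names below are a shortened, illustrative subset of the real census columns, and this runs outside the TensorFlow graph rather than using the graph ops from the patch.

```python
# Illustration only -- plain Python, not the graph ops used in the patch.
# The column names are a hypothetical subset of CSV_COLUMNS from model.py.
CSV_COLUMNS = ['age', 'workclass', 'education', 'income_bracket']
UNUSED_COLUMNS = {'education'}        # stands in for the derived set in model.py
LABEL_COLUMN = 'income_bracket'

def parse_row(row):
    """Map one CSV row to (features dict, label), as input_fn returns them."""
    features = dict(zip(CSV_COLUMNS, row.split(',')))
    for col in UNUSED_COLUMNS:        # drop columns the model never reads
        features.pop(col)
    return features, features.pop(LABEL_COLUMN)

print(parse_row('39, State-gov, Bachelors, <=50K'))
```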
@@ -200,5 +243,4 @@ def input_fn(filenames, num_threads=multiprocessing.cpu_count(), enqueue_many=True, ) - label_tensor = parse_label_column(features.pop(LABEL_COLUMN)) - return features, label_tensor + return features, features.pop(LABEL_COLUMN) diff --git a/census/lowlevel-tf/trainer/task.py b/census/lowlevel-tf/trainer/task.py index f507bdce4..4650de8b0 100644 --- a/census/lowlevel-tf/trainer/task.py +++ b/census/lowlevel-tf/trainer/task.py @@ -28,72 +28,146 @@ tf.logging.set_verbosity(tf.logging.INFO) -EVAL = 'EVAL' -TRAIN = 'TRAIN' + +class EvalRepeatedlyHook(tf.train.SessionRunHook): + + def __init__(self, + checkpoint_dir, + metric_dict, + graph, + eval_every_n_checkpoints=1, + eval_steps=None, + **kwargs): + + self._eval_steps = eval_steps + self._checkpoint_dir = checkpoint_dir + self._kwargs = kwargs + self._eval_every = eval_every_n_checkpoints + self._last_checkpoint = None + self._checkpoints_since_eval = 0 + self._graph = graph + with graph.as_default(): + value_dict, update_dict = tf.contrib.metrics.aggregate_metric_map( + metric_dict) + + self._summary_op = tf.summary.merge([ + tf.summary.scalar(name, value_op) + for name, value_op in value_dict.iteritems() + ]) + + self._final_ops_dict = value_dict + self._eval_ops = update_dict.values() + + self._file_writer = tf.summary.FileWriter(checkpoint_dir, graph=graph) + + def after_run(self, run_context, run_values): + latest = tf.train.latest_checkpoint(self._checkpoint_dir) + if not latest == self._last_checkpoint: + self._checkpoints_since_eval += 1 + + if not self._checkpoints_since_eval > self._eval_every: + return + else: + self._checkpoints_since_eval = 0 + + with self._graph.as_default(): + with tf.Session() as session: + session.run([tf.local_variables_initializer(), tf.tables_initializer()]) + saver = tf.train.Saver() + saver.restore(session, latest) + + + coord = tf.train.Coordinator(clean_stop_exception_types=( + tf.errors.CancelledError,)) + tf.train.start_queue_runners(coord=coord, sess=session) + with coord.stop_on_exception(): + step = 0 + while (self._eval_steps is None + or step < self._eval_steps) and not coord.should_stop(): + session.run(self._eval_ops) + step += 1 + if step % 100 == 0: + tf.logging.info("On Evaluation Step: {}".format(step)) + + gs = session.run(tf.contrib.framework.get_or_create_global_step()) + summaries = session.run(self._summary_op) + self._file_writer.add_summary(summaries, global_step=gs) + tf.logging.info(session.run(self._final_ops_dict)) def run(target, is_chief, max_steps, + output_dir, train_data_path, eval_data_path, - output_dir, - train_batch_size, - eval_batch_size, + train_batch_size=40, + eval_batch_size=40, + eval_num_epochs=1, eval_every=100, - eval_steps=10, + hidden_units=[100, 70, 50, 20], + eval_steps=None, + eval_interval_secs=1, learning_rate=0.1, num_epochs=None): - training_eval_graph = tf.Graph() - with training_eval_graph.as_default(): + if is_chief: + evaluation_graph = tf.Graph() + with evaluation_graph.as_default(): + features, labels = model.input_fn( + eval_data_path, + num_epochs=None, + batch_size=eval_batch_size, + shuffle=False + ) + + metric_dict = model.model_fn( + model.EVAL, + features, + labels, + hidden_units=hidden_units, + learning_rate=learning_rate + ) + hooks = [EvalRepeatedlyHook( + output_dir, + metric_dict, + evaluation_graph, + eval_steps=eval_steps, + )] + else: + hooks = [] + + with tf.Graph().as_default(): with tf.device(tf.train.replica_device_setter()): - mode = tf.placeholder(shape=[], dtype=tf.string) - eval_features, eval_label = 
model.input_fn( - eval_data_path, shuffle=False, batch_size=eval_batch_size) - train_features, train_label = model.input_fn( - train_data_path, num_epochs=num_epochs, batch_size=train_batch_size) - - is_train = tf.equal(mode, tf.constant(TRAIN)) - sorted_keys = train_features.keys() - sorted_keys.sort() - inputs = dict(zip( - sorted_keys, - tf.cond( - is_train, - lambda: [train_features[k] for k in sorted_keys], - lambda: [eval_features[k] for k in sorted_keys] - ) - )) - labels = tf.cond(is_train, lambda: train_label, lambda: eval_label) - train_op, accuracy_op, global_step_tensor, predictions = model.model_fn( - inputs, labels, learning_rate=learning_rate) + features, labels = model.input_fn( + train_data_path, + num_epochs=num_epochs, + batch_size=train_batch_size + ) + + train_op, global_step_tensor = model.model_fn( + model.TRAIN, + features, + labels, + hidden_units=hidden_units, + learning_rate=learning_rate + ) + with tf.train.MonitoredTrainingSession(master=target, is_chief=is_chief, checkpoint_dir=output_dir, + hooks=hooks, save_checkpoint_secs=20, save_summaries_steps=50) as session: coord = tf.train.Coordinator(clean_stop_exception_types=( tf.errors.CancelledError,)) tf.train.start_queue_runners(coord=coord, sess=session) - step = 0 - last_eval = 0 + step = global_step_tensor.eval(session=session) with coord.stop_on_exception(): - while step < max_steps and not coord.should_stop(): - if is_chief and step - last_eval > eval_every: - last_eval = step - accuracies = [ - session.run(accuracy_op, feed_dict={mode: EVAL}) - for _ in range(eval_steps) - ] - accuracy = sum(accuracies) / eval_steps - print("Accuracy at step: {} is {:.2f}%".format(step, 100*accuracy)) - - step, _ = session.run( - [global_step_tensor, train_op], - feed_dict={mode: TRAIN} - ) + while (max_steps is None or step < max_steps) and not coord.should_stop(): + step, _ = session.run([global_step_tensor, train_op]) + def dispatch(*args, **kwargs): From a443083dcc56bc4773ca28776cad71631990795b Mon Sep 17 00:00:00 2001 From: Eli Bixby Date: Wed, 1 Mar 2017 11:03:48 -0800 Subject: [PATCH 02/22] Fix continuous evaluation --- census/lowlevel-tf/trainer/model.py | 1 + census/lowlevel-tf/trainer/task.py | 59 ++++++++++++++++++----------- 2 files changed, 37 insertions(+), 23 deletions(-) diff --git a/census/lowlevel-tf/trainer/model.py b/census/lowlevel-tf/trainer/model.py index 43249236c..6eb8e0676 100644 --- a/census/lowlevel-tf/trainer/model.py +++ b/census/lowlevel-tf/trainer/model.py @@ -242,5 +242,6 @@ def input_fn(filenames, min_after_dequeue=batch_size*2 + 1, num_threads=multiprocessing.cpu_count(), enqueue_many=True, + allow_smaller_final_batch=True ) return features, features.pop(LABEL_COLUMN) diff --git a/census/lowlevel-tf/trainer/task.py b/census/lowlevel-tf/trainer/task.py index 4650de8b0..3d187f8de 100644 --- a/census/lowlevel-tf/trainer/task.py +++ b/census/lowlevel-tf/trainer/task.py @@ -21,6 +21,7 @@ import argparse import json import os +import threading import tensorflow as tf @@ -58,42 +59,54 @@ def __init__(self, self._final_ops_dict = value_dict self._eval_ops = update_dict.values() + self._eval_lock = threading.Lock() self._file_writer = tf.summary.FileWriter(checkpoint_dir, graph=graph) def after_run(self, run_context, run_values): + if not self._eval_lock.acquire(False): + return + latest = tf.train.latest_checkpoint(self._checkpoint_dir) if not latest == self._last_checkpoint: self._checkpoints_since_eval += 1 - if not self._checkpoints_since_eval > self._eval_every: - return - else: + if 
self._checkpoints_since_eval > self._eval_every: self._checkpoints_since_eval = 0 + self._run_eval(latest) + + self._eval_lock.release() + def end(self, session): + # Block to ensure we always eval at the end + self._eval_lock.acquire() + latest = tf.train.latest_checkpoint(self._checkpoint_dir) + self._run_eval(latest) + self._eval_lock.release() + + def _run_eval(self, latest): with self._graph.as_default(): + gs = tf.contrib.framework.get_or_create_global_step() + saver = tf.train.Saver() + coord = tf.train.Coordinator(clean_stop_exception_types=( + tf.errors.CancelledError, tf.errors.OutOfRangeError)) + with tf.Session() as session: session.run([tf.local_variables_initializer(), tf.tables_initializer()]) - saver = tf.train.Saver() - saver.restore(session, latest) - - - coord = tf.train.Coordinator(clean_stop_exception_types=( - tf.errors.CancelledError,)) tf.train.start_queue_runners(coord=coord, sess=session) + saver.restore(session, latest) + train_step = session.run(gs) + tf.logging.info('Starting Evaluation For Step: {}'.format(train_step)) with coord.stop_on_exception(): - step = 0 - while (self._eval_steps is None - or step < self._eval_steps) and not coord.should_stop(): - session.run(self._eval_ops) - step += 1 - if step % 100 == 0: - tf.logging.info("On Evaluation Step: {}".format(step)) - - gs = session.run(tf.contrib.framework.get_or_create_global_step()) - summaries = session.run(self._summary_op) - self._file_writer.add_summary(summaries, global_step=gs) - tf.logging.info(session.run(self._final_ops_dict)) + eval_step = 0 + while self._eval_steps is None or eval_step < self._eval_steps: + summaries, final_values, _ = session.run( + [self._summary_op, self._final_ops_dict, self._eval_ops]) + if eval_step % 100 == 0: + tf.logging.info("On Evaluation Step: {}".format(eval_step)) + eval_step += 1 + self._file_writer.add_summary(summaries, global_step=train_step) + tf.logging.info(final_values) def run(target, is_chief, @@ -116,7 +129,7 @@ def run(target, with evaluation_graph.as_default(): features, labels = model.input_fn( eval_data_path, - num_epochs=None, + num_epochs=eval_num_epochs, batch_size=eval_batch_size, shuffle=False ) @@ -158,7 +171,7 @@ def run(target, is_chief=is_chief, checkpoint_dir=output_dir, hooks=hooks, - save_checkpoint_secs=20, + save_checkpoint_secs=2, save_summaries_steps=50) as session: coord = tf.train.Coordinator(clean_stop_exception_types=( tf.errors.CancelledError,)) From a1863fa6ca1bdfa29d0447694db3312019a40e7a Mon Sep 17 00:00:00 2001 From: Eli Bixby Date: Wed, 1 Mar 2017 16:34:42 -0800 Subject: [PATCH 03/22] Fix ExportRepeatedlyHook to write out correct step --- census/lowlevel-tf/trainer/model.py | 5 +- census/lowlevel-tf/trainer/task.py | 86 ++++++++++++++++------------- 2 files changed, 52 insertions(+), 39 deletions(-) diff --git a/census/lowlevel-tf/trainer/model.py b/census/lowlevel-tf/trainer/model.py index 6eb8e0676..d32f24da9 100644 --- a/census/lowlevel-tf/trainer/model.py +++ b/census/lowlevel-tf/trainer/model.py @@ -131,6 +131,10 @@ def model_fn(mode, label_indices = table.lookup(labels) # Make labels a vector label_indices_vector = tf.squeeze(label_indices) + # global_step is necessary in eval to correctly load the step + # of the checkpoint we are evaluating + global_step = tf.contrib.framework.get_or_create_global_step() + tf.logging.info(global_step.name) if mode == PREDICT: # Convert predicted_indices back into strings @@ -144,7 +148,6 @@ def model_fn(mode, cross_entropy = tf.reduce_mean( 
tf.nn.sparse_softmax_cross_entropy_with_logits( logits=logits, labels=label_indices_vector)) - global_step = tf.contrib.framework.get_or_create_global_step() train_op = tf.train.GradientDescentOptimizer(learning_rate).minimize( cross_entropy, global_step=global_step) return train_op, global_step diff --git a/census/lowlevel-tf/trainer/task.py b/census/lowlevel-tf/trainer/task.py index 3d187f8de..c71b2f620 100644 --- a/census/lowlevel-tf/trainer/task.py +++ b/census/lowlevel-tf/trainer/task.py @@ -44,7 +44,7 @@ def __init__(self, self._checkpoint_dir = checkpoint_dir self._kwargs = kwargs self._eval_every = eval_every_n_checkpoints - self._last_checkpoint = None + self._latest_checkpoint = None self._checkpoints_since_eval = 0 self._graph = graph with graph.as_default(): @@ -55,58 +55,68 @@ def __init__(self, tf.summary.scalar(name, value_op) for name, value_op in value_dict.iteritems() ]) - + self._gs = tf.contrib.framework.get_or_create_global_step() self._final_ops_dict = value_dict self._eval_ops = update_dict.values() self._eval_lock = threading.Lock() + self._checkpoint_lock = threading.Lock() self._file_writer = tf.summary.FileWriter(checkpoint_dir, graph=graph) def after_run(self, run_context, run_values): - if not self._eval_lock.acquire(False): - return - - latest = tf.train.latest_checkpoint(self._checkpoint_dir) - if not latest == self._last_checkpoint: - self._checkpoints_since_eval += 1 - - if self._checkpoints_since_eval > self._eval_every: - self._checkpoints_since_eval = 0 - self._run_eval(latest) - - self._eval_lock.release() + # Always check for new checkpoints in case a single evaluation + # takes longer than checkpoint frequency and _eval_every is >1 + self._update_latest_checkpoint() + if self._eval_lock.acquire(False): + if self._checkpoints_since_eval > self._eval_every: + self._checkpoints_since_eval = 0 + self._run_eval() + + self._eval_lock.release() + + def _update_latest_checkpoint(self): + if self._checkpoint_lock.acquire(False): + latest = tf.train.latest_checkpoint(self._checkpoint_dir) + if not latest == self._latest_checkpoint: + self._checkpoints_since_eval += 1 + self._latest_checkpoint = latest + self._checkpoint_lock.release() def end(self, session): # Block to ensure we always eval at the end self._eval_lock.acquire() latest = tf.train.latest_checkpoint(self._checkpoint_dir) - self._run_eval(latest) + self._run_eval() self._eval_lock.release() - def _run_eval(self, latest): - with self._graph.as_default(): - gs = tf.contrib.framework.get_or_create_global_step() + def _run_eval(self): + coord = tf.train.Coordinator(clean_stop_exception_types=( + tf.errors.CancelledError, tf.errors.OutOfRangeError)) + + with tf.Session(graph=self._graph) as session: saver = tf.train.Saver() - coord = tf.train.Coordinator(clean_stop_exception_types=( - tf.errors.CancelledError, tf.errors.OutOfRangeError)) - - with tf.Session() as session: - session.run([tf.local_variables_initializer(), tf.tables_initializer()]) - tf.train.start_queue_runners(coord=coord, sess=session) - saver.restore(session, latest) - train_step = session.run(gs) - tf.logging.info('Starting Evaluation For Step: {}'.format(train_step)) - with coord.stop_on_exception(): - eval_step = 0 - while self._eval_steps is None or eval_step < self._eval_steps: - summaries, final_values, _ = session.run( - [self._summary_op, self._final_ops_dict, self._eval_ops]) - if eval_step % 100 == 0: - tf.logging.info("On Evaluation Step: {}".format(eval_step)) - eval_step += 1 - - self._file_writer.add_summary(summaries, 
global_step=train_step) - tf.logging.info(final_values) + + saver.restore(session, self._latest_checkpoint) + + session.run([ + tf.local_variables_initializer(), + tf.tables_initializer() + ]) + train_step = session.run(self._gs) + + tf.train.start_queue_runners(coord=coord, sess=session) + tf.logging.info('Starting Evaluation For Step: {}'.format(train_step)) + with coord.stop_on_exception(): + eval_step = 0 + while self._eval_steps is None or eval_step < self._eval_steps: + summaries, final_values, _ = session.run( + [self._summary_op, self._final_ops_dict, self._eval_ops]) + if eval_step % 100 == 0: + tf.logging.info("On Evaluation Step: {}".format(eval_step)) + eval_step += 1 + + self._file_writer.add_summary(summaries, global_step=train_step) + tf.logging.info(final_values) def run(target, is_chief, From a15d207d746eddff82a9af7d12055fa2c1ac319a Mon Sep 17 00:00:00 2001 From: Puneith Kaul Date: Wed, 1 Mar 2017 16:39:03 -0800 Subject: [PATCH 04/22] added hptuning --- census/lowlevel-tf/README.md | 34 +++++++++++++ census/lowlevel-tf/hptuning_config.yaml | 22 +++++++++ census/lowlevel-tf/trainer/task.py | 66 ++++++++++++++++++++++--- 3 files changed, 115 insertions(+), 7 deletions(-) create mode 100644 census/lowlevel-tf/hptuning_config.yaml diff --git a/census/lowlevel-tf/README.md b/census/lowlevel-tf/README.md index 0426d8b57..f1ea0d308 100644 --- a/census/lowlevel-tf/README.md +++ b/census/lowlevel-tf/README.md @@ -15,6 +15,10 @@ on Google Cloud Storage: * Training file is `adult.data.csv` * Evaluation file is `adult.test.csv` +### Disclaimer +The source of this dataset is from a third party. Google provides no representation, +warranty, or other guarantees about the validity or any other aspects of this dataset. + ``` export CENSUS_DATA=census_data export TRAIN_FILE=adult.data.csv @@ -161,3 +165,33 @@ gcloud beta ml jobs submit training $JOB_NAME \ --max_steps $MAX_STEPS \ --output_dir $GCS_OUTPUT_DIR ``` + +# Hyperparameter Tuning +Cloud ML Engine allows you to perform Hyperparameter tuning to find out the +most optimal hyperparameters. See [Overview of Hyperparameter Tuning] +(https://cloud.google.com/ml/docs/concepts/hyperparameter-tuning-overview) for more details. + +## Running Hyperparameter Job + +Running Hyperparameter job is almost exactly same as Training job except that +you need to add the `--config` argument. 
+ +``` +export HPTUNING_CONFIG=hptuning_config.yaml +``` + +``` +gcloud beta ml jobs submit training $JOB_NAME \ + --scale-tier $SCALE_TIER \ + --runtime-version 1.0 \ + --config $HPTUNING_CONFIG + --job-dir $GCS_JOB_DIR \ + --module-name trainer.task \ + --package-path trainer/ \ + --region us-central1 \ + -- \ + --train_data_path $TRAIN_GCS_FILE \ + --eval_data_path $EVAL_GCS_FILE \ + --max_steps $MAX_STEPS \ + --output_dir $GCS_OUTPUT_DIR +``` diff --git a/census/lowlevel-tf/hptuning_config.yaml b/census/lowlevel-tf/hptuning_config.yaml new file mode 100644 index 000000000..8d6c96d0f --- /dev/null +++ b/census/lowlevel-tf/hptuning_config.yaml @@ -0,0 +1,22 @@ +trainingInput: + hyperparameters: + goal: MAXIMIZE + hyperparameterMetricTag: accuracy + maxTrials: 4 + maxParallelTrials: 2 + params: + - parameterName: first_layer_size + type: INTEGER + minValue: 50 + maxValue: 500 + scaleType: UNIT_LINEAR_SCALE + - parameterName: num_layers + type: INTEGER + minValue: 1 + maxValue: 15 + scaleType: UNIT_LINEAR_SCALE + - parameterName: scale_factor + type: DOUBLE + minValue: 0.1 + maxValue: 1.0 + scaleType: UNIT_REVERSE_LOG_SCALE diff --git a/census/lowlevel-tf/trainer/task.py b/census/lowlevel-tf/trainer/task.py index 3d187f8de..41410e3ee 100644 --- a/census/lowlevel-tf/trainer/task.py +++ b/census/lowlevel-tf/trainer/task.py @@ -110,21 +110,56 @@ def _run_eval(self, latest): def run(target, is_chief, + trial_id, max_steps, output_dir, train_data_path, eval_data_path, - train_batch_size=40, - eval_batch_size=40, + train_batch_size, + eval_batch_size, + learning_rate, + first_layer_size, + num_layers, + scale_factor, eval_num_epochs=1, eval_every=100, - hidden_units=[100, 70, 50, 20], eval_steps=None, eval_interval_secs=1, - learning_rate=0.1, num_epochs=None): + """Run the training and evaluation graph. + + Args: + target (string): Tensorflow server target + is_chief (bool): Boolean flag to specify a chief server + max_steps (int): Maximum training steps + train_data_path (string): List of CSV files to read train data + eval_data_path (string): List of CSV files to read eval data + output_dir (string): Output directory for model and checkpoint + train_batch_size (int): Batch size for training + eval_batch_size (int): Batch size for evaluation + learning_rate (float): Learning rate for Gradient Descent + first_layer_size (int): Size of the first DNN layer + num_layers (int): Number of hidden layers in the DNN + scale_factor (float): Decay rate for the size of hidden layers + eval_every (int): Run evaluation frequency + eval_steps (int): Eval steps + num_epochs (int): Number of epochs + """ + + # Add trial_id to the output path when doing Hyperparameter tuning + # so that output for each run goes in its own trial. 
+ if trial_id: + output_dir = os.path.join(output_dir, trial_id) + + # Calculate the number of hidden units + hidden_units=[ + max(2, int(first_layer_size * scale_factor**i)) + for i in range(num_layers) + ] + if is_chief: + tf.logging.info("Created DNN hidden units {}".format(hidden_units)) evaluation_graph = tf.Graph() with evaluation_graph.as_default(): features, labels = model.input_fn( @@ -189,17 +224,18 @@ def dispatch(*args, **kwargs): tf_config = os.environ.get('TF_CONFIG') if not tf_config: - return run('', True, *args, **kwargs) + return run('', True, None, *args, **kwargs) tf_config_json = json.loads(tf_config) cluster = tf_config_json.get('cluster') job_name = tf_config_json.get('task').get('type') task_index = tf_config_json.get('task').get('index') + trial_id = tf_config_json.get('task').get('trial') # If cluster information is empty run local if job_name is None or task_index is None: - return run('', True, *args, **kwargs) + return run('', True, None, *args, **kwargs) cluster_spec = tf.train.ClusterSpec(cluster) server = tf.train.Server(cluster_spec, @@ -210,7 +246,7 @@ def dispatch(*args, **kwargs): server.join() return elif job_name in ['master', 'worker']: - return run(server.target, job_name == 'master', *args, **kwargs) + return run(server.target, job_name == 'master', trial_id, *args, **kwargs) @@ -240,6 +276,22 @@ def dispatch(*args, **kwargs): type=int, default=40, help='Batch size for evaluation steps') + parser.add_argument('--learning_rate', + type=float, + default=0.5, + help='Learning rate for SGD') + parser.add_argument('--first_layer_size', + type=int, + default=100, + help='Number of nodes in the first layer of DNN') + parser.add_argument('--num_layers', + type=int, + default=4, + help='Number of layers in DNN') + parser.add_argument('--scale_factor', + type=float, + default=0.7, + help='Rate of decay size of layer for DNN') parse_args, unknown = parser.parse_known_args() dispatch(**parse_args.__dict__) From d5d8c3676cdde27012874b19da1a7011fdf605c9 Mon Sep 17 00:00:00 2001 From: Eli Bixby Date: Wed, 1 Mar 2017 16:42:12 -0800 Subject: [PATCH 05/22] Initial commit of export code (#3) --- census/lowlevel-tf/trainer/model.py | 23 +++++++------ census/lowlevel-tf/trainer/task.py | 50 ++++++++++++++++++++++++++--- 2 files changed, 57 insertions(+), 16 deletions(-) diff --git a/census/lowlevel-tf/trainer/model.py b/census/lowlevel-tf/trainer/model.py index d32f24da9..7984dec50 100644 --- a/census/lowlevel-tf/trainer/model.py +++ b/census/lowlevel-tf/trainer/model.py @@ -134,13 +134,12 @@ def model_fn(mode, # global_step is necessary in eval to correctly load the step # of the checkpoint we are evaluating global_step = tf.contrib.framework.get_or_create_global_step() - tf.logging.info(global_step.name) if mode == PREDICT: # Convert predicted_indices back into strings return { - 'predictions': tf.gather(predicted_indices, label_values), - 'confidence': tf.gather(predicted_indices, probabilities) + 'predictions': tf.gather(label_values, predicted_indices), + 'confidence': tf.gather(probabilities, predicted_indices) } if mode == TRAIN: @@ -162,19 +161,19 @@ def model_fn(mode, def build_serving_inputs(mode, default_batch_size=None): if mode == 'CSV': - placeholders = [tf.placeholder( + placeholders = {'csv_row': tf.placeholder( shape=[default_batch_size], - dtype=tf.string, - name='csv_rows' - )] - features = parse_csv(placeholders[0]) + dtype=tf.string + )} + features = parse_csv(placeholders['csv_row']) + features.pop(LABEL_COLUMN) else: feature_spec = {} for feat in 
CONTINUOUS_COLS: - feature_spec[feat] = tf.train.FixedLenFeature([], tf.int32) + feature_spec[feat] = tf.FixedLenFeature(shape=[], dtype=tf.float32) for feat, _ in CATEGORICAL_COLS: - feature_spec[feat] = tf.train.FixedLenFeature([], tf.string) + feature_spec[feat] = tf.FixedLenFeature(shape=[], dtype=tf.string) tf_record = tf.placeholder( shape=[default_batch_size], @@ -187,9 +186,9 @@ def build_serving_inputs(mode, default_batch_size=None): for key, tensor in feature_scalars.iteritems() } if mode == 'TF_RECORD': - placeholders = [tf_record] + placeholders = {'tf_record': tf_record} else: - placeholders = feature_scalars.values() + placeholders = feature_scalars return features, placeholders diff --git a/census/lowlevel-tf/trainer/task.py b/census/lowlevel-tf/trainer/task.py index 9e24b90ff..6890452fa 100644 --- a/census/lowlevel-tf/trainer/task.py +++ b/census/lowlevel-tf/trainer/task.py @@ -55,6 +55,7 @@ def __init__(self, tf.summary.scalar(name, value_op) for name, value_op in value_dict.iteritems() ]) + self._saver = tf.train.Saver() self._gs = tf.contrib.framework.get_or_create_global_step() self._final_ops_dict = value_dict self._eval_ops = update_dict.values() @@ -94,17 +95,15 @@ def _run_eval(self): tf.errors.CancelledError, tf.errors.OutOfRangeError)) with tf.Session(graph=self._graph) as session: - saver = tf.train.Saver() - - saver.restore(session, self._latest_checkpoint) + self._saver.restore(session, self._latest_checkpoint) session.run([ tf.local_variables_initializer(), tf.tables_initializer() ]) + tf.train.start_queue_runners(coord=coord, sess=session) train_step = session.run(self._gs) - tf.train.start_queue_runners(coord=coord, sess=session) tf.logging.info('Starting Evaluation For Step: {}'.format(train_step)) with coord.stop_on_exception(): eval_step = 0 @@ -226,6 +225,49 @@ def run(target, while (max_steps is None or step < max_steps) and not coord.should_stop(): step, _ = session.run([global_step_tensor, train_op]) + latest_checkpoint = tf.train.latest_checkpoint(output_dir) + for mode in ['CSV', 'TF_RECORD', 'JSON']: + build_and_run_exports(latest_checkpoint, output_dir, mode, hidden_units, learning_rate) + + +def build_and_run_exports(latest, output_dir, mode, hidden_units, learning_rate): + prediction_graph = tf.Graph() + exporter = tf.saved_model.builder.SavedModelBuilder(os.path.join(output_dir, mode)) + with prediction_graph.as_default(): + features, placeholder_dict = model.build_serving_inputs(mode) + prediction_dict = model.model_fn( + model.PREDICT, + features, + None, # labels + hidden_units=hidden_units, + learning_rate=learning_rate + ) + saver = tf.train.Saver() + + placeholder_info = { + name: tf.saved_model.utils.build_tensor_info(tensor) + for name, tensor in placeholder_dict.iteritems() + } + output_info = { + name: tf.saved_model.utils.build_tensor_info(tensor) + for name, tensor in prediction_dict.iteritems() + } + signature_def = tf.saved_model.signature_def_utils.build_signature_def( + inputs=placeholder_info, outputs=output_info) + + + with tf.Session(graph=prediction_graph) as session: + session.run([tf.local_variables_initializer(), tf.tables_initializer()]) + saver.restore(session, latest) + exporter.add_meta_graph_and_variables( + session, + tags=[tf.saved_model.tag_constants.SERVING], + signature_def_map={ + tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: signature_def + } + ) + + exporter.save() def dispatch(*args, **kwargs): From 61783b705c65a6e41d16741ec0c27355ed5a6ab5 Mon Sep 17 00:00:00 2001 From: Eli Bixby 
Date: Wed, 1 Mar 2017 16:55:04 -0800 Subject: [PATCH 06/22] Use PREDICT_METHOD_NAME --- census/lowlevel-tf/trainer/task.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/census/lowlevel-tf/trainer/task.py b/census/lowlevel-tf/trainer/task.py index 6890452fa..581f88103 100644 --- a/census/lowlevel-tf/trainer/task.py +++ b/census/lowlevel-tf/trainer/task.py @@ -253,7 +253,10 @@ def build_and_run_exports(latest, output_dir, mode, hidden_units, learning_rate) for name, tensor in prediction_dict.iteritems() } signature_def = tf.saved_model.signature_def_utils.build_signature_def( - inputs=placeholder_info, outputs=output_info) + inputs=placeholder_info, + outputs=output_info, + method_name=tf.saved_model.signature_constants.PREDICT_METHOD_NAME + ) with tf.Session(graph=prediction_graph) as session: From 1d8d67adc4036c73438e6c0bb8220712c5edc255 Mon Sep 17 00:00:00 2001 From: Puneith Kaul Date: Wed, 1 Mar 2017 17:09:41 -0800 Subject: [PATCH 07/22] added args docstring --- census/lowlevel-tf/trainer/task.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/census/lowlevel-tf/trainer/task.py b/census/lowlevel-tf/trainer/task.py index 6ea6e22ca..cf9bcab23 100644 --- a/census/lowlevel-tf/trainer/task.py +++ b/census/lowlevel-tf/trainer/task.py @@ -142,10 +142,11 @@ def run(target, Args: target (string): Tensorflow server target is_chief (bool): Boolean flag to specify a chief server + trial_id (string): Trial id for hyperparameter tuning max_steps (int): Maximum training steps + output_dir (string): Output dir for checkpoint and summary train_data_path (string): List of CSV files to read train data eval_data_path (string): List of CSV files to read eval data - output_dir (string): Output directory for model and checkpoint train_batch_size (int): Batch size for training eval_batch_size (int): Batch size for evaluation learning_rate (float): Learning rate for Gradient Descent From 5b7d48aced17fa779cb53d6e03391f6a25e8b542 Mon Sep 17 00:00:00 2001 From: Puneith Kaul Date: Wed, 1 Mar 2017 17:49:04 -0800 Subject: [PATCH 08/22] fixed underscores and job-dir --- census/lowlevel-tf/trainer/task.py | 58 ++++++++++++++---------------- 1 file changed, 27 insertions(+), 31 deletions(-) diff --git a/census/lowlevel-tf/trainer/task.py b/census/lowlevel-tf/trainer/task.py index cf9bcab23..868d1e273 100644 --- a/census/lowlevel-tf/trainer/task.py +++ b/census/lowlevel-tf/trainer/task.py @@ -47,6 +47,7 @@ def __init__(self, self._latest_checkpoint = None self._checkpoints_since_eval = 0 self._graph = graph + with graph.as_default(): value_dict, update_dict = tf.contrib.metrics.aggregate_metric_map( metric_dict) @@ -63,6 +64,9 @@ def __init__(self, self._checkpoint_lock = threading.Lock() self._file_writer = tf.summary.FileWriter(checkpoint_dir, graph=graph) + #def before_run(self, run_context): + # return tf.train.SessionRunArgs(self._gs) + def after_run(self, run_context, run_values): # Always check for new checkpoints in case a single evaluation # takes longer than checkpoint frequency and _eval_every is >1 @@ -120,11 +124,10 @@ def _run_eval(self): def run(target, is_chief, - trial_id, max_steps, - output_dir, - train_data_path, - eval_data_path, + job_dir, + train_data_paths, + eval_data_paths, train_batch_size, eval_batch_size, learning_rate, @@ -142,11 +145,10 @@ def run(target, Args: target (string): Tensorflow server target is_chief (bool): Boolean flag to specify a chief server - trial_id (string): Trial id for hyperparameter tuning max_steps (int): Maximum training 
steps - output_dir (string): Output dir for checkpoint and summary - train_data_path (string): List of CSV files to read train data - eval_data_path (string): List of CSV files to read eval data + job_dir (string): Output dir for checkpoint and summary + train_data_paths (string): List of CSV files to read train data + eval_data_paths (string): List of CSV files to read eval data train_batch_size (int): Batch size for training eval_batch_size (int): Batch size for evaluation learning_rate (float): Learning rate for Gradient Descent @@ -158,11 +160,6 @@ def run(target, num_epochs (int): Number of epochs """ - # Add trial_id to the output path when doing Hyperparameter tuning - # so that output for each run goes in its own trial. - if trial_id: - output_dir = os.path.join(output_dir, trial_id) - # Calculate the number of hidden units hidden_units=[ max(2, int(first_layer_size * scale_factor**i)) @@ -174,7 +171,7 @@ def run(target, evaluation_graph = tf.Graph() with evaluation_graph.as_default(): features, labels = model.input_fn( - eval_data_path, + eval_data_paths, num_epochs=eval_num_epochs, batch_size=eval_batch_size, shuffle=False @@ -188,7 +185,7 @@ def run(target, learning_rate=learning_rate ) hooks = [EvalRepeatedlyHook( - output_dir, + job_dir, metric_dict, evaluation_graph, eval_steps=eval_steps, @@ -199,7 +196,7 @@ def run(target, with tf.Graph().as_default(): with tf.device(tf.train.replica_device_setter()): features, labels = model.input_fn( - train_data_path, + train_data_paths, num_epochs=num_epochs, batch_size=train_batch_size ) @@ -215,7 +212,7 @@ def run(target, with tf.train.MonitoredTrainingSession(master=target, is_chief=is_chief, - checkpoint_dir=output_dir, + checkpoint_dir=job_dir, hooks=hooks, save_checkpoint_secs=2, save_summaries_steps=50) as session: @@ -236,18 +233,17 @@ def dispatch(*args, **kwargs): # If TF_CONFIG not available run local if not tf_config: - return run('', True, None, *args, **kwargs) + return run('', True, *args, **kwargs) tf_config_json = json.loads(tf_config) cluster = tf_config_json.get('cluster') job_name = tf_config_json.get('task').get('type') task_index = tf_config_json.get('task').get('index') - trial_id = tf_config_json.get('task').get('trial') # If cluster information is empty run local if job_name is None or task_index is None: - return run('', True, None, *args, **kwargs) + return run('', True, *args, **kwargs) cluster_spec = tf.train.ClusterSpec(cluster) server = tf.train.Server(cluster_spec, @@ -258,49 +254,49 @@ def dispatch(*args, **kwargs): server.join() return elif job_name in ['master', 'worker']: - return run(server.target, job_name == 'master', trial_id, *args, **kwargs) + return run(server.target, job_name == 'master', *args, **kwargs) if __name__ == "__main__": parser = argparse.ArgumentParser() - parser.add_argument('--train_data_path', + parser.add_argument('--train-data-paths', required=True, type=str, help='Training file location', nargs='+') - parser.add_argument('--eval_data_path', + parser.add_argument('--eval-data-paths', required=True, type=str, help='Evaluation file location', nargs='+') - parser.add_argument('--output_dir', + parser.add_argument('--job-dir', required=True, type=str, help='GCS or local dir to write checkpoints and export model') - parser.add_argument('--max_steps', + parser.add_argument('--max-steps', type=int, default=1000, help='Maximum number of training steps to perform') - parser.add_argument('--train_batch_size', + parser.add_argument('--train-batch-size', type=int, default=40, help='Batch 
size for training steps') - parser.add_argument('--eval_batch_size', + parser.add_argument('--eval-batch-size', type=int, default=40, help='Batch size for evaluation steps') - parser.add_argument('--learning_rate', + parser.add_argument('--learning-rate', type=float, default=0.5, help='Learning rate for SGD') - parser.add_argument('--first_layer_size', + parser.add_argument('--first-layer-size', type=int, default=100, help='Number of nodes in the first layer of DNN') - parser.add_argument('--num_layers', + parser.add_argument('--num-layers', type=int, default=4, help='Number of layers in DNN') - parser.add_argument('--scale_factor', + parser.add_argument('--scale-factor', type=float, default=0.7, help='Rate of decay size of layer for DNN') From 03de5d57a1612e59067ea9a28ea074e72b92a833 Mon Sep 17 00:00:00 2001 From: Puneith Kaul Date: Wed, 1 Mar 2017 17:49:29 -0800 Subject: [PATCH 09/22] fixed underscores and job-dir --- census/lowlevel-tf/trainer/task.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/census/lowlevel-tf/trainer/task.py b/census/lowlevel-tf/trainer/task.py index 868d1e273..6233d2c4a 100644 --- a/census/lowlevel-tf/trainer/task.py +++ b/census/lowlevel-tf/trainer/task.py @@ -64,12 +64,13 @@ def __init__(self, self._checkpoint_lock = threading.Lock() self._file_writer = tf.summary.FileWriter(checkpoint_dir, graph=graph) - #def before_run(self, run_context): - # return tf.train.SessionRunArgs(self._gs) + def before_run(self, run_context): + return tf.train.SessionRunArgs(self._gs) def after_run(self, run_context, run_values): # Always check for new checkpoints in case a single evaluation # takes longer than checkpoint frequency and _eval_every is >1 + self._gs = run_values.values self._update_latest_checkpoint() if self._eval_lock.acquire(False): if self._checkpoints_since_eval > self._eval_every: From 9d512b3c58c7fa9db78f57824586cf136b7549c6 Mon Sep 17 00:00:00 2001 From: Puneith Kaul Date: Wed, 1 Mar 2017 17:51:24 -0800 Subject: [PATCH 10/22] fixed underscores and job-dir --- census/lowlevel-tf/trainer/task.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/census/lowlevel-tf/trainer/task.py b/census/lowlevel-tf/trainer/task.py index 6233d2c4a..8ac239041 100644 --- a/census/lowlevel-tf/trainer/task.py +++ b/census/lowlevel-tf/trainer/task.py @@ -64,13 +64,9 @@ def __init__(self, self._checkpoint_lock = threading.Lock() self._file_writer = tf.summary.FileWriter(checkpoint_dir, graph=graph) - def before_run(self, run_context): - return tf.train.SessionRunArgs(self._gs) - def after_run(self, run_context, run_values): # Always check for new checkpoints in case a single evaluation # takes longer than checkpoint frequency and _eval_every is >1 - self._gs = run_values.values self._update_latest_checkpoint() if self._eval_lock.acquire(False): if self._checkpoints_since_eval > self._eval_every: From 5aa9ff1e0d4798f76d541dfc011c8411e4a9ebdf Mon Sep 17 00:00:00 2001 From: Eli Bixby Date: Wed, 1 Mar 2017 17:52:57 -0800 Subject: [PATCH 11/22] Updates to export --- census/lowlevel-tf/trainer/task.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/census/lowlevel-tf/trainer/task.py b/census/lowlevel-tf/trainer/task.py index 581f88103..cbce982ad 100644 --- a/census/lowlevel-tf/trainer/task.py +++ b/census/lowlevel-tf/trainer/task.py @@ -85,8 +85,8 @@ def _update_latest_checkpoint(self): def end(self, session): # Block to ensure we always eval at the end + self._update_latest_checkpoint() self._eval_lock.acquire() - latest = 
tf.train.latest_checkpoint(self._checkpoint_dir) self._run_eval() self._eval_lock.release() @@ -267,7 +267,8 @@ def build_and_run_exports(latest, output_dir, mode, hidden_units, learning_rate) tags=[tf.saved_model.tag_constants.SERVING], signature_def_map={ tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: signature_def - } + }, + main_op=tf.tables_initializer() ) exporter.save() From b27a9b3be188fe7ae3daca3364910572c2ae29fd Mon Sep 17 00:00:00 2001 From: Eli Bixby Date: Wed, 1 Mar 2017 18:25:51 -0800 Subject: [PATCH 12/22] Fix merge errors and partial update README --- census/lowlevel-tf/README.md | 20 ++++++++++---------- census/lowlevel-tf/trainer/model.py | 2 +- census/lowlevel-tf/trainer/task.py | 11 ++++++----- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/census/lowlevel-tf/README.md b/census/lowlevel-tf/README.md index 65efcc031..1b817f41b 100644 --- a/census/lowlevel-tf/README.md +++ b/census/lowlevel-tf/README.md @@ -66,10 +66,10 @@ export OUTPUT_DIR=census_output ``` ``` -python trainer/task.py --train_data_path $CENSUS_DATA/$TRAIN_FILE \ - --eval_data_path $CENSUS_DATA/$EVAL_FILE \ - --output_dir $OUTPUT_DIR - [--max_steps $MAX_STEPS] +python trainer/task.py --train-data-path $CENSUS_DATA/$TRAIN_FILE \ + --eval-data-path $CENSUS_DATA/$EVAL_FILE \ + --output-dir $OUTPUT_DIR + [--max-steps $MAX_STEPS] ``` ### Using gcloud local @@ -85,9 +85,9 @@ export OUTPUT_DIR=census_output gcloud beta ml local train --package-path trainer \ --module-name trainer.task \ -- \ - --train_data_path $CENSUS_DATA/$TRAIN_FILE \ - --eval_data_path $CENSUS_DATA/$EVAL_FILE \ - --output_dir $OUTPUT_DIR + --train-data-path $CENSUS_DATA/$TRAIN_FILE \ + --eval-data-path $CENSUS_DATA/$EVAL_FILE \ + --output-dir $OUTPUT_DIR ``` ### Using Cloud ML Engine @@ -106,9 +106,9 @@ gcloud beta ml jobs submit training $JOB_NAME \ --package-path trainer/ \ --region us-central1 \ -- \ - --train_data_path $TRAIN_GCS_FILE \ - --eval_data_path $EVAL_GCS_FILE \ - --output_dir $GCS_OUTPUT_DIR + --train-data-path $TRAIN_GCS_FILE \ + --eval-data-path $EVAL_GCS_FILE \ + --output-dir $GCS_OUTPUT_DIR ``` ## Accuracy and Output You should see the output for default number of training steps and approx accuracy close to `80.25%`. 
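A small numeric aside on the hyperparameter-tuning change from patch 04: instead of the fixed `hidden_units=[100, 70, 50, 20]`, `run()` now derives the layer sizes from `first_layer_size`, `scale_factor` and `num_layers`, which are the parameters `hptuning_config.yaml` tunes. The snippet below uses the same expression as `run()` in `task.py`; the argument defaults are assumed to match the argparse defaults added in that patch.

```python
# Same layer-size schedule as in run() in task.py (patch 04); defaults assumed
# from the argparse defaults (first_layer_size=100, num_layers=4, scale_factor=0.7).
def hidden_units(first_layer_size=100, num_layers=4, scale_factor=0.7):
    return [max(2, int(first_layer_size * scale_factor ** i))
            for i in range(num_layers)]

print(hidden_units())              # geometric decay from the first layer size
print(hidden_units(500, 15, 0.1))  # extreme values allowed by hptuning_config.yaml
```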
diff --git a/census/lowlevel-tf/trainer/model.py b/census/lowlevel-tf/trainer/model.py index 087c62326..4e182fecc 100644 --- a/census/lowlevel-tf/trainer/model.py +++ b/census/lowlevel-tf/trainer/model.py @@ -62,7 +62,7 @@ def model_fn(mode, features, labels, hidden_units=[100, 70, 50, 20], - learning_rate=0.5): + learning_rate=0.1): """Create a Feed forward network classification network Args: diff --git a/census/lowlevel-tf/trainer/task.py b/census/lowlevel-tf/trainer/task.py index ba8949ee3..2cb5cf2ec 100644 --- a/census/lowlevel-tf/trainer/task.py +++ b/census/lowlevel-tf/trainer/task.py @@ -220,14 +220,15 @@ def run(target, while (max_steps is None or step < max_steps) and not coord.should_stop(): step, _ = session.run([global_step_tensor, train_op]) - latest_checkpoint = tf.train.latest_checkpoint(output_dir) + latest_checkpoint = tf.train.latest_checkpoint(job_dir) for mode in ['CSV', 'TF_RECORD', 'JSON']: - build_and_run_exports(latest_checkpoint, output_dir, mode, hidden_units, learning_rate) + build_and_run_exports(latest_checkpoint, job_dir, mode, hidden_units, learning_rate) -def build_and_run_exports(latest, output_dir, mode, hidden_units, learning_rate): +def build_and_run_exports(latest, job_dir, mode, hidden_units, learning_rate): prediction_graph = tf.Graph() - exporter = tf.saved_model.builder.SavedModelBuilder(os.path.join(output_dir, mode)) + exporter = tf.saved_model.builder.SavedModelBuilder( + os.path.join(job_dir, 'exports', mode)) with prediction_graph.as_default(): features, placeholder_dict = model.build_serving_inputs(mode) prediction_dict = model.model_fn( @@ -329,7 +330,7 @@ def dispatch(*args, **kwargs): help='Batch size for evaluation steps') parser.add_argument('--learning-rate', type=float, - default=0.5, + default=0.1, help='Learning rate for SGD') parser.add_argument('--first-layer-size', type=int, From 49ddd49f67d5f388dc0d5a00a95238223a47117f Mon Sep 17 00:00:00 2001 From: Eli Bixby Date: Wed, 1 Mar 2017 18:41:40 -0800 Subject: [PATCH 13/22] Separate eval run so TensorBoard works --- census/lowlevel-tf/trainer/task.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/census/lowlevel-tf/trainer/task.py b/census/lowlevel-tf/trainer/task.py index 2cb5cf2ec..cbed78d94 100644 --- a/census/lowlevel-tf/trainer/task.py +++ b/census/lowlevel-tf/trainer/task.py @@ -63,7 +63,8 @@ def __init__(self, self._eval_lock = threading.Lock() self._checkpoint_lock = threading.Lock() - self._file_writer = tf.summary.FileWriter(checkpoint_dir, graph=graph) + self._file_writer = tf.summary.FileWriter( + os.path.join(checkpoint_dir, 'eval'), graph=graph) def after_run(self, run_context, run_values): # Always check for new checkpoints in case a single evaluation From 0cb55bd01a4e64f93644177740cb001d601eedd1 Mon Sep 17 00:00:00 2001 From: Puneith Kaul Date: Wed, 1 Mar 2017 19:12:52 -0800 Subject: [PATCH 14/22] changed param names in README --- census/lowlevel-tf/README.md | 41 +++++++++++++----------------- census/lowlevel-tf/trainer/task.py | 8 +++--- 2 files changed, 22 insertions(+), 27 deletions(-) diff --git a/census/lowlevel-tf/README.md b/census/lowlevel-tf/README.md index 65efcc031..de7e2045d 100644 --- a/census/lowlevel-tf/README.md +++ b/census/lowlevel-tf/README.md @@ -66,10 +66,10 @@ export OUTPUT_DIR=census_output ``` ``` -python trainer/task.py --train_data_path $CENSUS_DATA/$TRAIN_FILE \ - --eval_data_path $CENSUS_DATA/$EVAL_FILE \ - --output_dir $OUTPUT_DIR - [--max_steps $MAX_STEPS] +python trainer/task.py --train-data-paths 
$CENSUS_DATA/$TRAIN_FILE \ + --eval-data-paths $CENSUS_DATA/$EVAL_FILE \ + --job-dir $OUTPUT_DIR + [--max-steps $MAX_STEPS] ``` ### Using gcloud local @@ -85,9 +85,9 @@ export OUTPUT_DIR=census_output gcloud beta ml local train --package-path trainer \ --module-name trainer.task \ -- \ - --train_data_path $CENSUS_DATA/$TRAIN_FILE \ - --eval_data_path $CENSUS_DATA/$EVAL_FILE \ - --output_dir $OUTPUT_DIR + --train-data-paths $CENSUS_DATA/$TRAIN_FILE \ + --eval-data-paths $CENSUS_DATA/$EVAL_FILE \ + --job-dir $OUTPUT_DIR ``` ### Using Cloud ML Engine @@ -95,7 +95,6 @@ Run the code on Cloud ML Engine using `gcloud`: ``` export GCS_JOB_DIR=gs:///path/to/my/jobs/job3 -export GCS_OUTPUT_DIR=gs:///path/to/my/models/run3 ``` ``` @@ -106,9 +105,8 @@ gcloud beta ml jobs submit training $JOB_NAME \ --package-path trainer/ \ --region us-central1 \ -- \ - --train_data_path $TRAIN_GCS_FILE \ - --eval_data_path $EVAL_GCS_FILE \ - --output_dir $GCS_OUTPUT_DIR + --train-data-paths $TRAIN_GCS_FILE \ + --eval-data-paths $EVAL_GCS_FILE ``` ## Accuracy and Output You should see the output for default number of training steps and approx accuracy close to `80.25%`. @@ -161,10 +159,10 @@ gcloud beta ml local train --package-path trainer \ --worker-count $WORKER_COUNT \ --distributed \ -- \ - --train_data_path $CENSUS_DATA/$TRAIN_FILE \ - --eval_data_path $CENSUS_DATA/$EVAL_FILE \ - --max_steps $MAX_STEPS \ - --output_dir $OUTPUT_DIR + --train-data-paths $CENSUS_DATA/$TRAIN_FILE \ + --eval-data-paths $CENSUS_DATA/$EVAL_FILE \ + --max-steps $MAX_STEPS \ + --job-dir $OUTPUT_DIR ``` ### Using Cloud ML Engine @@ -173,7 +171,6 @@ Run the distributed training code on cloud using `gcloud`. ``` export SCALE_TIER=STANDARD_1 export GCS_JOB_DIR=gs:///path/to/my/models/run3 -export GCS_OUTPUT_DIR=gs:///path/to/my/models/run3 ``` ``` @@ -185,9 +182,8 @@ gcloud beta ml jobs submit training $JOB_NAME \ --package-path trainer/ \ --region us-central1 \ -- \ - --train_data_path $TRAIN_GCS_FILE \ - --eval_data_path $EVAL_GCS_FILE \ - --output_dir $GCS_OUTPUT_DIR + --train-data-paths $TRAIN_GCS_FILE \ + --eval-data-paths $EVAL_GCS_FILE ``` # Hyperparameter Tuning @@ -214,8 +210,7 @@ gcloud beta ml jobs submit training $JOB_NAME \ --package-path trainer/ \ --region us-central1 \ -- \ - --train_data_path $TRAIN_GCS_FILE \ - --eval_data_path $EVAL_GCS_FILE \ - --max_steps $MAX_STEPS \ - --output_dir $GCS_OUTPUT_DIR + --train-data-paths $TRAIN_GCS_FILE \ + --eval-data-paths $EVAL_GCS_FILE \ + --max-steps $MAX_STEPS ``` diff --git a/census/lowlevel-tf/trainer/task.py b/census/lowlevel-tf/trainer/task.py index f79ab8ff2..2b9734b98 100644 --- a/census/lowlevel-tf/trainer/task.py +++ b/census/lowlevel-tf/trainer/task.py @@ -220,14 +220,14 @@ def run(target, while (max_steps is None or step < max_steps) and not coord.should_stop(): step, _ = session.run([global_step_tensor, train_op]) - latest_checkpoint = tf.train.latest_checkpoint(output_dir) + latest_checkpoint = tf.train.latest_checkpoint(job_dir) for mode in ['CSV', 'TF_RECORD', 'JSON']: - build_and_run_exports(latest_checkpoint, output_dir, mode, hidden_units, learning_rate) + build_and_run_exports(latest_checkpoint, job_dir, mode, hidden_units, learning_rate) -def build_and_run_exports(latest, output_dir, mode, hidden_units, learning_rate): +def build_and_run_exports(latest, job_dir, mode, hidden_units, learning_rate): prediction_graph = tf.Graph() - exporter = tf.saved_model.builder.SavedModelBuilder(os.path.join(output_dir, mode)) + exporter = 
tf.saved_model.builder.SavedModelBuilder(os.path.join(job_dir, mode)) with prediction_graph.as_default(): features, placeholder_dict = model.build_serving_inputs(mode) prediction_dict = model.model_fn( From 6a233f4f5d5b90719c5c7d159fcbd935d838c2f5 Mon Sep 17 00:00:00 2001 From: Puneith Kaul Date: Wed, 1 Mar 2017 21:26:39 -0800 Subject: [PATCH 15/22] added docstring --- census/lowlevel-tf/trainer/task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/census/lowlevel-tf/trainer/task.py b/census/lowlevel-tf/trainer/task.py index cbed78d94..eb9805652 100644 --- a/census/lowlevel-tf/trainer/task.py +++ b/census/lowlevel-tf/trainer/task.py @@ -31,7 +31,7 @@ class EvalRepeatedlyHook(tf.train.SessionRunHook): - + """EvalRepeatedlyHook performs continuous evaluation of the model.""" def __init__(self, checkpoint_dir, metric_dict, From 0653ef213e3ca0602f7e2f7c8cd8d2a898db2a65 Mon Sep 17 00:00:00 2001 From: Puneith Kaul Date: Thu, 2 Mar 2017 00:11:28 -0800 Subject: [PATCH 16/22] added comments in the code --- census/lowlevel-tf/trainer/model.py | 10 ++++++++++ census/lowlevel-tf/trainer/task.py | 30 +++++++++++++++++++++++------ 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/census/lowlevel-tf/trainer/model.py b/census/lowlevel-tf/trainer/model.py index 4e182fecc..5305a7889 100644 --- a/census/lowlevel-tf/trainer/model.py +++ b/census/lowlevel-tf/trainer/model.py @@ -128,10 +128,13 @@ def model_fn(mode, # Build a Hash Table inside the graph table = tf.contrib.lookup.string_to_index_table_from_tensor( label_values) + # Use the hash table to convert string labels to ints label_indices = table.lookup(labels) + # Make labels a vector label_indices_vector = tf.squeeze(label_indices) + # global_step is necessary in eval to correctly load the step # of the checkpoint we are evaluating global_step = tf.contrib.framework.get_or_create_global_step() @@ -153,6 +156,9 @@ def model_fn(mode, return train_op, global_step if mode == EVAL: + # Return accuracy and area under ROC curve metrics + # See https://en.wikipedia.org/wiki/Receiver_operating_characteristic + # See https://www.kaggle.com/wiki/AreaUnderCurve return { 'accuracy': tf.contrib.metrics.streaming_accuracy( predicted_indices, label_indices), @@ -205,12 +211,16 @@ def parse_csv(rows_string_tensor): features.pop(col) return features + def input_fn(filenames, num_epochs=None, shuffle=True, skip_header_lines=0, batch_size=40): """Generates an input function for training or evaluation. + This uses the input pipeline based approach using file name queue + to read data so that entire data is not loaded in memory. + Args: filenames: [str] list of CSV files to read data from. num_epochs: int how many times through to read the data. 
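Since several of the patches above rework how `dispatch()` reads the `TF_CONFIG` environment variable, here is a minimal, self-contained sketch of that parsing with a hand-written config. The host:port addresses are placeholders, and the only keys shown are the ones `task.py` actually reads (`cluster`, `task.type`, `task.index`); the real variable set by Cloud ML Engine may carry additional fields.

```python
# Sketch with placeholder addresses; mirrors the .get() calls in dispatch().
import json

tf_config = json.dumps({
    'cluster': {
        'master': ['master-0.example:2222'],
        'ps': ['ps-0.example:2222'],
        'worker': ['worker-0.example:2222', 'worker-1.example:2222'],
    },
    'task': {'type': 'worker', 'index': 1},
})

tf_config_json = json.loads(tf_config)
cluster = tf_config_json.get('cluster')
job_name = tf_config_json.get('task').get('type')     # 'worker'
task_index = tf_config_json.get('task').get('index')  # 1
print(job_name, task_index, sorted(cluster))
```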
diff --git a/census/lowlevel-tf/trainer/task.py b/census/lowlevel-tf/trainer/task.py index eb9805652..ff71cef59 100644 --- a/census/lowlevel-tf/trainer/task.py +++ b/census/lowlevel-tf/trainer/task.py @@ -30,8 +30,8 @@ tf.logging.set_verbosity(tf.logging.INFO) -class EvalRepeatedlyHook(tf.train.SessionRunHook): - """EvalRepeatedlyHook performs continuous evaluation of the model.""" +class EvaluationRunHook(tf.train.SessionRunHook): + """EvaluationRunHook performs continuous evaluation of the model.""" def __init__(self, checkpoint_dir, metric_dict, @@ -86,7 +86,7 @@ def _update_latest_checkpoint(self): self._checkpoint_lock.release() def end(self, session): - # Block to ensure we always eval at the end + """Called at then end of session to make sure we always evaluate.""" self._update_latest_checkpoint() self._eval_lock.acquire() self._run_eval() @@ -119,6 +119,7 @@ def _run_eval(self): self._file_writer.add_summary(summaries, global_step=train_step) tf.logging.info(final_values) + def run(target, is_chief, max_steps, @@ -181,16 +182,18 @@ def run(target, hidden_units=hidden_units, learning_rate=learning_rate ) - hooks = [EvalRepeatedlyHook( + hooks = [EvaluationRunHook( job_dir, metric_dict, evaluation_graph, eval_steps=eval_steps, )] - else: - hooks = [] + # Create a new graph and specify that as default with tf.Graph().as_default(): + # Placement of ops on devices using replica device setter + # which automatically places the parameters on the `ps` server + # and the `ops` on the workers with tf.device(tf.train.replica_device_setter()): features, labels = model.input_fn( train_data_paths, @@ -198,6 +201,7 @@ def run(target, batch_size=train_batch_size ) + # Returns the training graph and global step tensor train_op, global_step_tensor = model.model_fn( model.TRAIN, features, @@ -207,16 +211,30 @@ def run(target, ) + # Creates a MonitoredSession for training + # https://www.tensorflow.org/api_docs/python/tf/train/MonitoredTrainingSession with tf.train.MonitoredTrainingSession(master=target, is_chief=is_chief, checkpoint_dir=job_dir, hooks=hooks, save_checkpoint_secs=2, save_summaries_steps=50) as session: + + # Tuple of exceptions that should cause a clean stop of the coordinator + # https://www.tensorflow.org/api_docs/python/train/coordinator_and_queuerunner#Coordinator coord = tf.train.Coordinator(clean_stop_exception_types=( tf.errors.CancelledError,)) + + # Important to start all queue runners so that data is available + # for reading tf.train.start_queue_runners(coord=coord, sess=session) + + # Global step to keep track of global number of steps particularly in + # distributed setting step = global_step_tensor.eval(session=session) + + # Run the training graph which returns the step number as tracked by + # the global step tensorr with coord.stop_on_exception(): while (max_steps is None or step < max_steps) and not coord.should_stop(): step, _ = session.run([global_step_tensor, train_op]) From 1638f638eeb9d0041b1b3c6cc7f031f358608c64 Mon Sep 17 00:00:00 2001 From: Puneith Kaul Date: Thu, 2 Mar 2017 00:35:48 -0800 Subject: [PATCH 17/22] added more comments --- census/lowlevel-tf/trainer/task.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/census/lowlevel-tf/trainer/task.py b/census/lowlevel-tf/trainer/task.py index ff71cef59..6e1f80fb3 100644 --- a/census/lowlevel-tf/trainer/task.py +++ b/census/lowlevel-tf/trainer/task.py @@ -290,11 +290,15 @@ def build_and_run_exports(latest, job_dir, mode, hidden_units, learning_rate): def dispatch(*args, 
**kwargs): - """Parse TF_CONFIG to cluster_spec and call run().""" + """Parse TF_CONFIG to cluster_spec and call run() method. + The TF_CONFIG environment variable is available when running using + gcloud, either locally or in the cloud. It has all the information required + to create a ClusterSpec, which is important for running distributed code. + """ tf_config = os.environ.get('TF_CONFIG') - # If TF_CONFIG not available run local + # If TF_CONFIG is not available run locally if not tf_config: return run('', True, *args, **kwargs) @@ -313,6 +317,9 @@ def dispatch(*args, **kwargs): job_name=job_name, task_index=task_index) + # Wait for incoming connections forever + # Worker ships the graph to the ps server + # The ps server manages the parameters if job_name == 'ps': server.join() return From 741c699e22d6d20cdd5b54a46ffc125ffde40727 Mon Sep 17 00:00:00 2001 From: Puneith Kaul Date: Thu, 2 Mar 2017 00:50:23 -0800 Subject: [PATCH 18/22] fixed hptuning command --- census/lowlevel-tf/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/census/lowlevel-tf/README.md b/census/lowlevel-tf/README.md index de7e2045d..0407226cf 100644 --- a/census/lowlevel-tf/README.md +++ b/census/lowlevel-tf/README.md @@ -204,7 +204,7 @@ export HPTUNING_CONFIG=hptuning_config.yaml gcloud beta ml jobs submit training $JOB_NAME \ --scale-tier $SCALE_TIER \ --runtime-version 1.0 \ - --config $HPTUNING_CONFIG + --config $HPTUNING_CONFIG \ --job-dir $GCS_JOB_DIR \ --module-name trainer.task \ --package-path trainer/ \ From 0cc09b129145059687f0ddd62046dfc20a0a83e4 Mon Sep 17 00:00:00 2001 From: Puneith Kaul Date: Thu, 2 Mar 2017 01:01:26 -0800 Subject: [PATCH 19/22] added parse_csv comment --- census/lowlevel-tf/trainer/model.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/census/lowlevel-tf/trainer/model.py b/census/lowlevel-tf/trainer/model.py index 5305a7889..7060b94d4 100644 --- a/census/lowlevel-tf/trainer/model.py +++ b/census/lowlevel-tf/trainer/model.py @@ -66,9 +66,11 @@ def model_fn(mode, """Create a Feed forward network classification network Args: - input_x (tf.placeholder): Feature placeholder input + mode (string): Mode the graph is running in: training, evaluation or prediction + features (dict): Dictionary of input feature Tensors + labels (Tensor): Class label Tensor hidden_units (list): Hidden units - num_classes (int): Number of classes + learning_rate (float): Learning rate for the SGD Returns: Tuple (train_op, accuracy_op, global_step, predictions): Tuple containing @@ -201,7 +203,10 @@ def parse_csv(rows_string_tensor): - # model_fn expects rank 2 tensors.
+ """Takes the string input tensor and returns a dict of rank-2 tensors.""" + + # Takes a rank-1 tensor and converts it into a rank-2 tensor + # For example, [1,2,3,4] is converted into [[1],[2],[3],[4]] row_columns = tf.expand_dims(rows_string_tensor, -1) columns = tf.decode_csv(row_columns, record_defaults=CSV_COLUMN_DEFAULTS) features = dict(zip(CSV_COLUMNS, columns)) From a0e3c5e0da8c606ac6286bb3850c8bc171bd0e50 Mon Sep 17 00:00:00 2001 From: Puneith Kaul Date: Thu, 2 Mar 2017 01:14:24 -0800 Subject: [PATCH 20/22] added comments --- census/lowlevel-tf/trainer/task.py | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/census/lowlevel-tf/trainer/task.py b/census/lowlevel-tf/trainer/task.py index 6e1f80fb3..e2cb5d2ff 100644 --- a/census/lowlevel-tf/trainer/task.py +++ b/census/lowlevel-tf/trainer/task.py @@ -164,6 +164,7 @@ def run(target, for i in range(num_layers) ] + # If this server is the chief, i.e. the `master` task if is_chief: tf.logging.info("Created DNN hidden units {}".format(hidden_units)) evaluation_graph = tf.Graph() with evaluation_graph.as_default(): @@ -182,6 +183,7 @@ def run(target, hidden_units=hidden_units, learning_rate=learning_rate ) + hooks = [EvaluationRunHook( job_dir, metric_dict, @@ -195,6 +197,8 @@ def run(target, # which automatically places the parameters on the `ps` server # and the `ops` on the workers with tf.device(tf.train.replica_device_setter()): + + # Features and label tensors read from the filename queue features, labels = model.input_fn( train_data_paths, num_epochs=num_epochs, @@ -234,17 +238,33 @@ def run(target, step = global_step_tensor.eval(session=session) # Run the training graph which returns the step number as tracked by - # the global step tensorr + # the global step tensor with coord.stop_on_exception(): while (max_steps is None or step < max_steps) and not coord.should_stop(): step, _ = session.run([global_step_tensor, train_op]) + # Find the latest saved checkpoint file latest_checkpoint = tf.train.latest_checkpoint(job_dir) + for mode in ['CSV', 'TF_RECORD', 'JSON']: - build_and_run_exports(latest_checkpoint, job_dir, mode, hidden_units, learning_rate) + build_and_run_exports(latest_checkpoint, + job_dir, + mode, + hidden_units, + learning_rate) def build_and_run_exports(latest, job_dir, mode, hidden_units, learning_rate): + """Given the latest checkpoint file, export the saved model.
+ + Args: + latest (string): Latest checkpoint file + job_dir (string): Location of checkpoints and model files + mode (string): Train, Eval or Predict + hidden_units (list): Number of hidden units + learning_rate (float): Learning rate for the SGD + """ + prediction_graph = tf.Graph() exporter = tf.saved_model.builder.SavedModelBuilder( os.path.join(job_dir, 'exports', mode)) From 6b2b78642f5ca1f67a3d926e1ae83861c5135fb5 Mon Sep 17 00:00:00 2001 From: Puneith Kaul Date: Thu, 2 Mar 2017 01:42:47 -0800 Subject: [PATCH 21/22] added comments --- census/lowlevel-tf/trainer/task.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/census/lowlevel-tf/trainer/task.py b/census/lowlevel-tf/trainer/task.py index e2cb5d2ff..82dcefcba 100644 --- a/census/lowlevel-tf/trainer/task.py +++ b/census/lowlevel-tf/trainer/task.py @@ -169,13 +169,15 @@ def run(target, tf.logging.info("Created DNN hidden units {}".format(hidden_units)) evaluation_graph = tf.Graph() with evaluation_graph.as_default(): + + # Features and label tensors features, labels = model.input_fn( eval_data_paths, num_epochs=eval_num_epochs, batch_size=eval_batch_size, shuffle=False ) - + # Accuracy and AUROC metrics metric_dict = model.model_fn( model.EVAL, features, @@ -260,7 +262,7 @@ def build_and_run_exports(latest, job_dir, mode, hidden_units, learning_rate): Args: latest (string): Latest checkpoint file job_dir (string): Location of checkpoints and model files - mode (string): Train, Eval or Predict + mode (string): Model export format hidden_units (list): Number of hidden units learning_rate (float): Learning rate for the SGD """ From c959e0229b6f35776cbe0f3a71f75bc42d9b5675 Mon Sep 17 00:00:00 2001 From: Eli Bixby Date: Thu, 2 Mar 2017 10:14:18 -0800 Subject: [PATCH 22/22] Remove key columns as table_initializers cannot be run by prediction service (#4) * workaround for bad init op in ModelServer * Completely remove key columns --- census/lowlevel-tf/trainer/model.py | 21 ++++----------------- census/lowlevel-tf/trainer/task.py | 2 +- 2 files changed, 5 insertions(+), 18 deletions(-) diff --git a/census/lowlevel-tf/trainer/model.py b/census/lowlevel-tf/trainer/model.py index 7060b94d4..cc8b15e43 100644 --- a/census/lowlevel-tf/trainer/model.py +++ b/census/lowlevel-tf/trainer/model.py @@ -35,20 +35,13 @@ [''], [0.], [0.], [0.], [''], ['']] # Categorical columns with vocab size -HASH_BUCKET_COLS = (('education', 16), ('marital_status', 7), +CATEGORICAL_COLS = (('education', 16), ('marital_status', 7), ('relationship', 6), ('workclass', 9), ('occupation', 15), - ('native_country', 42)) -KEY_COLS = (('gender', ('female', 'male')), ('race', ('Amer-Indian-Eskimo', - 'Asian-Pac-Islander', - 'Black', - 'Other', - 'White'))) - + ('native_country', 42), ('gender', 2), ('race', 5)) CONTINUOUS_COLS = ('age', 'education_num', 'capital_gain', 'capital_loss', 'hours_per_week') -CATEGORICAL_COLS = HASH_BUCKET_COLS + tuple((col, len(keys)) for col, keys in KEY_COLS) LABELS = [' <=50K', ' >50K'] LABEL_COLUMN = 'income_bracket' @@ -79,19 +72,13 @@ def model_fn(mode, label_values = tf.constant(LABELS) # Convert categorical (string) values to one_hot values - for col, bucket_size in HASH_BUCKET_COLS: + for col, bucket_size in CATEGORICAL_COLS: features[col] = string_ops.string_to_hash_bucket_fast( features[col], bucket_size) - for col, keys in KEY_COLS: - table = tf.contrib.lookup.string_to_index_table_from_tensor( - tf.constant(keys)) - features[col] = table.lookup(features[col]) - - for col, size in CATEGORICAL_COLS: 
features[col] = tf.squeeze(tf.one_hot( features[col], - size, + bucket_size, axis=1, dtype=tf.float32), axis=[2]) diff --git a/census/lowlevel-tf/trainer/task.py b/census/lowlevel-tf/trainer/task.py index 82dcefcba..df6db6bea 100644 --- a/census/lowlevel-tf/trainer/task.py +++ b/census/lowlevel-tf/trainer/task.py @@ -305,7 +305,7 @@ def build_and_run_exports(latest, job_dir, mode, hidden_units, learning_rate): signature_def_map={ tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: signature_def }, - main_op=tf.tables_initializer() +# main_op=tf.tables_initializer() ) exporter.save()
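For context on why dropping the lookup-table-backed key columns sidesteps the initializer problem: tf.string_to_hash_bucket_fast hashes a string directly to an integer bucket and needs no vocabulary table to initialize, so a graph built only from hashed categorical columns can be served without the main_op that was commented out above. The following is a small illustrative sketch rather than code from the sample; it assumes a TensorFlow 1.x runtime and reuses the 'workclass' bucket size from the diff with a few toy input values.

    import tensorflow as tf

    bucket_size = 9  # bucket size used for the 'workclass' column in CATEGORICAL_COLS
    workclass = tf.constant(['Private', 'State-gov', 'Never-worked'])

    # Deterministic hash of each string into [0, bucket_size); no lookup table,
    # and therefore nothing for a tables initializer to populate at serving time.
    indices = tf.string_to_hash_bucket_fast(workclass, bucket_size)

    # One-hot encode the hashed indices: rank-1 [3] becomes rank-2 [3, bucket_size].
    one_hot = tf.one_hot(indices, bucket_size, dtype=tf.float32)

    with tf.Session() as session:
        print(session.run(indices))
        print(session.run(one_hot))

The trade-off is that distinct strings can hash to the same bucket, which a vocabulary table avoids, but in exchange the exported serving graph stays free of table initialization.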