From aa132c4f3c91f6b05ec1015febe02bc5214b2e59 Mon Sep 17 00:00:00 2001 From: tmartins Date: Wed, 28 Sep 2016 22:41:47 +0200 Subject: include all python scripts on blog-tutorial-shared plus change in folder organization --- .../src/main/python/vespaModel.py | 397 --------------------- .../src/main/python/__init__.py | 0 .../blog-tutorial-shared/src/main/python/parse.py | 72 ---- .../blog-tutorial-shared/src/python/__init__.py | 0 .../blog-tutorial-shared/src/python/parse.py | 72 ++++ .../blog-tutorial-shared/src/python/vespaModel.py | 397 +++++++++++++++++++++ 6 files changed, 469 insertions(+), 469 deletions(-) delete mode 100755 sample-apps/blog-recommendation/src/main/python/vespaModel.py delete mode 100644 sample-apps/blog-tutorial-shared/src/main/python/__init__.py delete mode 100644 sample-apps/blog-tutorial-shared/src/main/python/parse.py create mode 100644 sample-apps/blog-tutorial-shared/src/python/__init__.py create mode 100644 sample-apps/blog-tutorial-shared/src/python/parse.py create mode 100755 sample-apps/blog-tutorial-shared/src/python/vespaModel.py (limited to 'sample-apps') diff --git a/sample-apps/blog-recommendation/src/main/python/vespaModel.py b/sample-apps/blog-recommendation/src/main/python/vespaModel.py deleted file mode 100755 index fd0718721eb..00000000000 --- a/sample-apps/blog-recommendation/src/main/python/vespaModel.py +++ /dev/null @@ -1,397 +0,0 @@ -#! /Users/tmartins/anaconda/envs/tensorflow/bin/python - -""" -Train a 2 layers neural network to compute the probability of a user -represented by the vector u liking a document represented by the vector d. - -Usage: ./vespaModel.py --product_features_file_path path \ - --user_features_file_path path \ - --dataset_file_path path - -Expected File formats: - -- product_features_file_path contains a file with rows following the JSON format below: - -{"post_id" : 20, - "user_item_cf" : {"user_item_cf:5" : -0.66617566, - "user_item_cf:6" : 0.29197264, - "user_item_cf:1" : -0.15582734, - "user_item_cf:7" : 0.3350679, - "user_item_cf:2" : -0.16676047, - "user_item_cf:9" : -0.31653953, - "user_item_cf:3" : -0.21495385, - "user_item_cf:4" : -0.036676258, - "user_item_cf:8" : 0.122069225, - "user_item_cf:0" : 0.20922394}} - -- user_features_file_path contains a file with rows following the JSON format below: - -{"user_id" : 270, - "user_item_cf" : {"user_item_cf:5" : -0.54011273, - "user_item_cf:6" : 0.2723072, - "user_item_cf:1" : -0.23280832, - "user_item_cf:7" : -0.011183357, - "user_item_cf:2" : -0.3987285, - "user_item_cf:9" : -0.05703937, - "user_item_cf:3" : 0.04699418, - "user_item_cf:4" : 0.06679048, - "user_item_cf:8" : 0.31399783, - "user_item_cf:0" : 0.5000366}} - -- dataset_file_path contains a file with rows containing tab-separated post_id, user_id, label such as the sample below: - -1000054 118475 1 -10001560 666315 0 -10001560 1230226 0 -10001560 561306 1 -""" - - -import tensorflow as tf -import time -import os -import datetime -import json -import numpy as np - -class getData: - """ - Data pre-processing - """ - def __init__(self, product_features_file_path, user_features_file_path, data_set_file_path): - self.product_features_file_path = product_features_file_path - self.user_features_file_path = user_features_file_path - self.data_set_file_path = data_set_file_path - - # Create user and document lookup features - def parse_cf_features(self, json, id_name): - id = json[id_name] - indexes = ['user_item_cf:' + str(x) for x in range(0,10,1)] - values = [json['user_item_cf'][x] for x in indexes] - return 
[id, values] - - def get_product_features_lookup(self): - product_features = [self.parse_cf_features(json.loads(line), 'post_id') for line in open(self.product_features_file_path)] - return dict(product_features) - - def get_user_features_lookup(self): - user_features = [self.parse_cf_features(json.loads(line), 'user_id') for line in open(self.user_features_file_path)] - return dict(user_features) - - def parse_dataset(self, line, lookup_user_features, lookup_product_features): - info = line.strip("\n").split("\t") - user_id = float(info[0]) - product_id = float(info[1]) - label = int(info[2]) - return lookup_user_features[user_id], lookup_product_features[product_id], [label] - - def prepare_dataset(self): - lookup_product_features = self.get_product_features_lookup() - lookup_user_features = self.get_user_features_lookup() - with open(self.data_set_file_path) as f: - input_u = []; input_d = []; input_y = [] - for line in f: - u, d, y = self.parse_dataset(line, lookup_user_features, lookup_product_features) - input_u.append(u) - input_d.append(d) - input_y.append(y) - input_u = np.array(input_u) - input_d = np.array(input_d) - input_y = np.array(input_y) - return input_u, input_d, input_y - - def create_train_test_sets(self, input_u, input_d, input_y, seed = 10, perc = 0.2): - # Randomly shuffle data - np.random.seed(seed) - shuffle_indices = np.random.permutation(np.arange(len(input_u))) - input_u_shuffled = input_u[shuffle_indices] - input_d_shuffled = input_d[shuffle_indices] - input_y_shuffled = input_y[shuffle_indices] - - # Split train/test set - dev_samples = int(len(input_u_shuffled)*perc) - u_train, u_dev = input_u_shuffled[:-dev_samples], input_u_shuffled[-dev_samples:] - d_train, d_dev = input_d_shuffled[:-dev_samples], input_d_shuffled[-dev_samples:] - y_train, y_dev = input_y_shuffled[:-dev_samples], input_y_shuffled[-dev_samples:] - print("Train/Dev split: {:d}/{:d}".format(len(y_train), len(y_dev))) - - return u_train, u_dev, d_train, d_dev, y_train, y_dev - - def batch_iter(self, data, batch_size, num_epochs, shuffle=True): - """ - Generates a batch iterator for a dataset. - """ - data = np.array(data) - data_size = len(data) - num_batches_per_epoch = int(len(data)/batch_size) + 1 - for epoch in range(num_epochs): - # Shuffle the data at each epoch - if shuffle: - shuffle_indices = np.random.permutation(np.arange(data_size)) - shuffled_data = data[shuffle_indices] - else: - shuffled_data = data - for batch_num in range(num_batches_per_epoch): - start_index = batch_num * batch_size - end_index = min((batch_num + 1) * batch_size, data_size) - yield shuffled_data[start_index:end_index] - -class vespaRunTimeModel: - """ - Model that combine user and document features and needs to be evaluated at query time. 
- """ - def __init__(self, user_feature_length, doc_feature_length, hidden_length): - - # placeholders - self.input_u = tf.placeholder(tf.float32, [None, user_feature_length], name = 'input_u') - self.input_d = tf.placeholder(tf.float32, [None, doc_feature_length], name = 'input_d') - self.input_y = tf.placeholder(tf.float32, [None, 1], name = 'input_y') - - # merge user and document vector - self.input_concat = tf.concat(1, [self.input_d, self.input_u], name = 'input_concat') - - # hidden layer - self.W_hidden = tf.Variable( - tf.truncated_normal([user_feature_length + - doc_feature_length, hidden_length], stddev=0.1), name = 'W_hidden') - self.b_hidden = tf.Variable(tf.constant(0.1, shape=[hidden_length]), name = 'b_hidden') - - self.hidden_layer = tf.nn.relu(tf.matmul(self.input_concat, self.W_hidden) + self.b_hidden, - name = 'hidden_layer') - - # output layer - self.W_final = tf.Variable( - tf.random_uniform([hidden_length, 1], -0.1, 0.1), - name="W_final") - self.b_final = tf.Variable(tf.zeros([1]), name="b_final") - - self.y = tf.sigmoid(tf.matmul(self.hidden_layer, self.W_final) + self.b_final, name = 'y') - - # prediction based on model output - self.prediction = tf.cast(tf.greater_equal(self.y, 0.5), "float", name = 'prediction') - - # loss function - prob = tf.clip_by_value(self.y,1e-5,1.0 - 1e-5) - self.loss = tf.reduce_mean(- self.input_y * tf.log(prob) - (1 - self.input_y) * tf.log(1 - prob), name = 'loss') - - # accuracy - correct_predictions = tf.equal(self.prediction, self.input_y) - self.accuracy = tf.reduce_mean(tf.cast(correct_predictions, "float"), name="accuracy") - - def train_operation(self, learning_rate): - global_step = tf.Variable(0, name="global_step", trainable=False) - #optimizer = tf.train.GradientDescentOptimizer(learning_rate) - optimizer = tf.train.AdagradOptimizer(learning_rate) - train_op = optimizer.minimize(self.loss, global_step=global_step) - return train_op, global_step - - def create_output_dir(self): - timestamp = str(int(time.time())) - out_dir = os.path.abspath(os.path.join(os.path.curdir, "runs", timestamp)) - print("Writing to {}\n".format(out_dir)) - return out_dir - - def summary_oprations(self): - loss_summary = tf.scalar_summary("loss", self.loss) - acc_summary = tf.scalar_summary("accuracy", self.accuracy) - train_summary_op = tf.merge_summary([loss_summary, acc_summary]) - dev_summary_op = tf.merge_summary([loss_summary, acc_summary]) - return train_summary_op, dev_summary_op - - def train_step(self, u_batch, d_batch, y_batch, writer=None): - """ - A single training step - """ - feed_dict = { - self.input_u: u_batch, - self.input_d: d_batch, - self.input_y: y_batch - } - _, step, summaries, loss, accuracy = sess.run( - [train_op, global_step, train_summary_op, self.loss, self.accuracy], - feed_dict) - time_str = datetime.datetime.now().isoformat() - print("{}: step {}, loss {:g}, acc {:g}".format(time_str, step, loss, accuracy)) - if writer: - writer.add_summary(summaries, step) - - def dev_step(self, u_batch, d_batch, y_batch, writer=None): - """ - Evaluates model on a dev set - """ - feed_dict = { - self.input_u: u_batch, - self.input_d: d_batch, - self.input_y: y_batch - } - step, summaries, loss, accuracy = sess.run( - [global_step, dev_summary_op, self.loss, self.accuracy], - feed_dict) - time_str = datetime.datetime.now().isoformat() - print("{}: step {}, loss {:g}, acc {:g}".format(time_str, step, loss, accuracy)) - if writer: - writer.add_summary(summaries, step) - -class serializeVespaModel: - """ - Serialize TensorFlow 
variables to Vespa JSON format - - Example: - checkpoint_dir = "./runs/1473845959/checkpoints" - output_dir = "./runs/1473845959/vespa_variables" - - serializer = serializeVespaModel(checkpoint_dir, output_dir) - serializer.serialize_to_disk(variable_name = "W_hidden", dimension_names = ['input', 'hidden']) - serializer.serialize_to_disk(variable_name = "b_hidden", dimension_names = ['hidden']) - serializer.serialize_to_disk(variable_name = "W_final", dimension_names = ['hidden', 'final']) - serializer.serialize_to_disk(variable_name = "b_final", dimension_names = ['final']) - """ - def __init__(self, checkpoint_dir, output_dir): - self.checkpoint_file = tf.train.latest_checkpoint(checkpoint_dir) - self.reader = tf.train.NewCheckpointReader(self.checkpoint_file) - self.output_dir = output_dir - - def write_cell_value(self, variable, dimension_names, dimension_address = None): - if dimension_address is None: - dimension_address = [] - shape = variable.shape - if len(shape) == 1: - count = 0 - cells = [] - for element in variable: - dimension_address.append((dimension_names[0], str(count))) - count += 1 - cells.append({ 'address': dict(dimension_address), "value": float(element) }) - return cells - else: - count = 0 - output = [] - for slice in variable: - dimension_address.append((dimension_names[0], str(count))) - output.extend(self.write_cell_value(slice, dimension_names[1:], dimension_address)) - count += 1 - return output - - def write_to_vespa_json_format(self, variable_name, dimension_names): - variable = self.reader.get_tensor(variable_name) - cells = self.write_cell_value(variable, dimension_names) - return json.dumps({'cells': cells}) - - def serialize_to_disk(self, variable_name, dimension_names): - text_file = open(os.path.join(output_dir, variable_name + ".json"), "w") - text_file.write(serializer.write_to_vespa_json_format(variable_name, dimension_names)) - text_file.close() - - -def task_train(): - # Data - tf.flags.DEFINE_string("product_features_file_path", '', "File containing product features") - tf.flags.DEFINE_string("user_features_file_path", '', "File containing user features") - tf.flags.DEFINE_string("dataset_file_path", '', "File containing labels for each document user pair") - - tf.flags.DEFINE_integer("hidden_length_factor", 2, "The hidden layer has size 'hidden_length_factor * input_vector_length'") - - # Misc Parameters - tf.flags.DEFINE_boolean("allow_soft_placement", True, "Allow device soft device placement") - tf.flags.DEFINE_boolean("log_device_placement", False, "Log placement of ops on devices") - - # Training parameters - tf.flags.DEFINE_float("learning_rate", 0.1, "Gradient Descent learning rate") - - tf.flags.DEFINE_integer("batch_size", 64, "Batch Size (default: 64)") - tf.flags.DEFINE_integer("num_epochs", 200, "Number of training epochs (default: 200)") - tf.flags.DEFINE_integer("evaluate_every", 100, "Evaluate model on dev set after this many steps (default: 100)") - tf.flags.DEFINE_integer("checkpoint_every", 100, "Save model after this many steps (default: 100)") - - FLAGS = tf.flags.FLAGS - FLAGS._parse_flags() - print("\nParameters:") - for attr, value in sorted(FLAGS.__flags.items()): - print("{}={}".format(attr.upper(), value)) - print("") - - # Data preparation - data_pre_processing = getData( - FLAGS.product_features_file_path, - FLAGS.user_features_file_path, - FLAGS.dataset_file_path) - - input_u, input_d, input_y = data_pre_processing.prepare_dataset() - u_train, u_dev, d_train, d_dev, y_train, y_dev = 
data_pre_processing.create_train_test_sets(input_u, input_d, input_y, seed = 10, perc = 0.2) - - user_feature_length = input_u.shape[1] - doc_feature_length = input_d.shape[1] - - - # Create a graph - with tf.Graph().as_default(): - - # Create a session - session_conf = tf.ConfigProto( - allow_soft_placement=FLAGS.allow_soft_placement, - log_device_placement=FLAGS.log_device_placement) - sess = tf.Session(config=session_conf) - with sess.as_default(): - - # instanciate a model - vespa_model = vespaRunTimeModel(user_feature_length = user_feature_length, - doc_feature_length = doc_feature_length, - hidden_length = FLAGS.hidden_length_factor * (user_feature_length + doc_feature_length)) - - # create a train operation - train_op, global_step = vespa_model.train_operation(learning_rate = FLAGS.learning_rate) - - # Summaries for loss and accuracy - train_summary_op, dev_summary_op = vespa_model.summary_oprations() - - # Output directory for models and summaries - out_dir = vespa_model.create_output_dir() - - # Write train summaries to disk - train_summary_dir = os.path.join(out_dir, "summaries", "train") - train_summary_writer = tf.train.SummaryWriter(train_summary_dir, sess.graph) - - # Dev summaries - dev_summary_dir = os.path.join(out_dir, "summaries", "dev") - dev_summary_writer = tf.train.SummaryWriter(dev_summary_dir, sess.graph) - - # Checkpoint directory. Tensorflow assumes this directory already exists so we need to create it - checkpoint_dir = os.path.abspath(os.path.join(out_dir, "checkpoints")) - checkpoint_prefix = os.path.join(checkpoint_dir, "model") - if not os.path.exists(checkpoint_dir): - os.makedirs(checkpoint_dir) - saver = tf.train.Saver(tf.all_variables()) - - # Initialize all variables - sess.run(tf.initialize_all_variables()) - - # Generate batches - batches = data_pre_processing.batch_iter( - list(zip(u_train, d_train, y_train)), FLAGS.batch_size, FLAGS.num_epochs) - # Training loop. For each batch... 
- for batch in batches: - u_batch, d_batch, y_batch = zip(*batch) - vespa_model.train_step(u_batch, d_batch, y_batch, writer=train_summary_writer) - current_step = tf.train.global_step(sess, global_step) - if current_step % FLAGS.evaluate_every == 0: - print("\nEvaluation:") - vespa_model.dev_step(u_dev, d_dev, y_dev, writer=dev_summary_writer) - print("") - if current_step % FLAGS.checkpoint_every == 0: - path = saver.save(sess, checkpoint_prefix, global_step=current_step) - print("Saved model checkpoint to {}\n".format(path)) - -if __name__ == "__main__": - - # Task - tf.flags.DEFINE_string("task", 'train', "Train a model from scratch") - - FLAGS = tf.flags.FLAGS - FLAGS._parse_flags() - print("\nParameters:") - for attr, value in sorted(FLAGS.__flags.items()): - print("{}={}".format(attr.upper(), value)) - print("") - - if FLAGS.task == "train": - task_train() diff --git a/sample-apps/blog-tutorial-shared/src/main/python/__init__.py b/sample-apps/blog-tutorial-shared/src/main/python/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/sample-apps/blog-tutorial-shared/src/main/python/parse.py b/sample-apps/blog-tutorial-shared/src/main/python/parse.py deleted file mode 100644 index 0d5f892eebc..00000000000 --- a/sample-apps/blog-tutorial-shared/src/main/python/parse.py +++ /dev/null @@ -1,72 +0,0 @@ -import json -import argparse - - -class KaggleRawDataParser: - - popularity = False - raw_data_file = None - total_number_of_likes = 0 - likes_per_blog = {} - - def __init__(self): - parser = argparse.ArgumentParser() - parser.add_argument("-p", "--popularity", action="store_true", help="add 'popularity' field") - parser.add_argument("file", help="location of file to be parsed") - args = parser.parse_args() - - self.popularity = args.popularity - self.raw_data_file = args.file - - def main(self): - if self.popularity: - self.calculate_popularity() - self.parse() - - def calculate_popularity(self): - unparsed_file = open(self.raw_data_file, "r") - - for line in unparsed_file: - data = json.loads(line) - - self.total_number_of_likes += len(data["likes"]) - if data["blog"] in self.likes_per_blog: - self.likes_per_blog[data["blog"]] += len(data["likes"]) - else: - self.likes_per_blog[data["blog"]] = len(data["likes"]) - - unparsed_file.close() - - def parse(self): - unparsed_file = open(self.raw_data_file, "r") - - for line in unparsed_file: - data = json.loads(line) - - parsed_data = { - "put": "id:blog-search:blog_post::" + data["post_id"], - "fields": { - "blogname": data["blogname"], - "post_id": data["post_id"], - "author": data["author"], - "language": data["language"], - "categories": data["categories"], - "title": data["title"], - "blog": data["blog"], - "date_gmt": data["date_gmt"], - "url": data["url"], - "content": data["content"], - "tags": data["tags"], - "date": int(data["date_gmt"][0:4] + data["date_gmt"][5:7] + data["date_gmt"][8:10]) - } - } - if self.popularity: - parsed_data["fields"]["popularity"] = \ - float(self.likes_per_blog[data["blog"]]) / float(self.total_number_of_likes) - - print(json.dumps(parsed_data)) - - unparsed_file.close() - -if __name__ == '__main__': - KaggleRawDataParser().main() diff --git a/sample-apps/blog-tutorial-shared/src/python/__init__.py b/sample-apps/blog-tutorial-shared/src/python/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/sample-apps/blog-tutorial-shared/src/python/parse.py b/sample-apps/blog-tutorial-shared/src/python/parse.py new file mode 100644 index 00000000000..0d5f892eebc --- 
/dev/null +++ b/sample-apps/blog-tutorial-shared/src/python/parse.py @@ -0,0 +1,72 @@ +import json +import argparse + + +class KaggleRawDataParser: + + popularity = False + raw_data_file = None + total_number_of_likes = 0 + likes_per_blog = {} + + def __init__(self): + parser = argparse.ArgumentParser() + parser.add_argument("-p", "--popularity", action="store_true", help="add 'popularity' field") + parser.add_argument("file", help="location of file to be parsed") + args = parser.parse_args() + + self.popularity = args.popularity + self.raw_data_file = args.file + + def main(self): + if self.popularity: + self.calculate_popularity() + self.parse() + + def calculate_popularity(self): + unparsed_file = open(self.raw_data_file, "r") + + for line in unparsed_file: + data = json.loads(line) + + self.total_number_of_likes += len(data["likes"]) + if data["blog"] in self.likes_per_blog: + self.likes_per_blog[data["blog"]] += len(data["likes"]) + else: + self.likes_per_blog[data["blog"]] = len(data["likes"]) + + unparsed_file.close() + + def parse(self): + unparsed_file = open(self.raw_data_file, "r") + + for line in unparsed_file: + data = json.loads(line) + + parsed_data = { + "put": "id:blog-search:blog_post::" + data["post_id"], + "fields": { + "blogname": data["blogname"], + "post_id": data["post_id"], + "author": data["author"], + "language": data["language"], + "categories": data["categories"], + "title": data["title"], + "blog": data["blog"], + "date_gmt": data["date_gmt"], + "url": data["url"], + "content": data["content"], + "tags": data["tags"], + "date": int(data["date_gmt"][0:4] + data["date_gmt"][5:7] + data["date_gmt"][8:10]) + } + } + if self.popularity: + parsed_data["fields"]["popularity"] = \ + float(self.likes_per_blog[data["blog"]]) / float(self.total_number_of_likes) + + print(json.dumps(parsed_data)) + + unparsed_file.close() + +if __name__ == '__main__': + KaggleRawDataParser().main() diff --git a/sample-apps/blog-tutorial-shared/src/python/vespaModel.py b/sample-apps/blog-tutorial-shared/src/python/vespaModel.py new file mode 100755 index 00000000000..fd0718721eb --- /dev/null +++ b/sample-apps/blog-tutorial-shared/src/python/vespaModel.py @@ -0,0 +1,397 @@ +#! /Users/tmartins/anaconda/envs/tensorflow/bin/python + +""" +Train a 2 layers neural network to compute the probability of a user +represented by the vector u liking a document represented by the vector d. 
+ +Usage: ./vespaModel.py --product_features_file_path path \ + --user_features_file_path path \ + --dataset_file_path path + +Expected File formats: + +- product_features_file_path contains a file with rows following the JSON format below: + +{"post_id" : 20, + "user_item_cf" : {"user_item_cf:5" : -0.66617566, + "user_item_cf:6" : 0.29197264, + "user_item_cf:1" : -0.15582734, + "user_item_cf:7" : 0.3350679, + "user_item_cf:2" : -0.16676047, + "user_item_cf:9" : -0.31653953, + "user_item_cf:3" : -0.21495385, + "user_item_cf:4" : -0.036676258, + "user_item_cf:8" : 0.122069225, + "user_item_cf:0" : 0.20922394}} + +- user_features_file_path contains a file with rows following the JSON format below: + +{"user_id" : 270, + "user_item_cf" : {"user_item_cf:5" : -0.54011273, + "user_item_cf:6" : 0.2723072, + "user_item_cf:1" : -0.23280832, + "user_item_cf:7" : -0.011183357, + "user_item_cf:2" : -0.3987285, + "user_item_cf:9" : -0.05703937, + "user_item_cf:3" : 0.04699418, + "user_item_cf:4" : 0.06679048, + "user_item_cf:8" : 0.31399783, + "user_item_cf:0" : 0.5000366}} + +- dataset_file_path contains a file with rows containing tab-separated post_id, user_id, label such as the sample below: + +1000054 118475 1 +10001560 666315 0 +10001560 1230226 0 +10001560 561306 1 +""" + + +import tensorflow as tf +import time +import os +import datetime +import json +import numpy as np + +class getData: + """ + Data pre-processing + """ + def __init__(self, product_features_file_path, user_features_file_path, data_set_file_path): + self.product_features_file_path = product_features_file_path + self.user_features_file_path = user_features_file_path + self.data_set_file_path = data_set_file_path + + # Create user and document lookup features + def parse_cf_features(self, json, id_name): + id = json[id_name] + indexes = ['user_item_cf:' + str(x) for x in range(0,10,1)] + values = [json['user_item_cf'][x] for x in indexes] + return [id, values] + + def get_product_features_lookup(self): + product_features = [self.parse_cf_features(json.loads(line), 'post_id') for line in open(self.product_features_file_path)] + return dict(product_features) + + def get_user_features_lookup(self): + user_features = [self.parse_cf_features(json.loads(line), 'user_id') for line in open(self.user_features_file_path)] + return dict(user_features) + + def parse_dataset(self, line, lookup_user_features, lookup_product_features): + info = line.strip("\n").split("\t") + user_id = float(info[0]) + product_id = float(info[1]) + label = int(info[2]) + return lookup_user_features[user_id], lookup_product_features[product_id], [label] + + def prepare_dataset(self): + lookup_product_features = self.get_product_features_lookup() + lookup_user_features = self.get_user_features_lookup() + with open(self.data_set_file_path) as f: + input_u = []; input_d = []; input_y = [] + for line in f: + u, d, y = self.parse_dataset(line, lookup_user_features, lookup_product_features) + input_u.append(u) + input_d.append(d) + input_y.append(y) + input_u = np.array(input_u) + input_d = np.array(input_d) + input_y = np.array(input_y) + return input_u, input_d, input_y + + def create_train_test_sets(self, input_u, input_d, input_y, seed = 10, perc = 0.2): + # Randomly shuffle data + np.random.seed(seed) + shuffle_indices = np.random.permutation(np.arange(len(input_u))) + input_u_shuffled = input_u[shuffle_indices] + input_d_shuffled = input_d[shuffle_indices] + input_y_shuffled = input_y[shuffle_indices] + + # Split train/test set + dev_samples = 
int(len(input_u_shuffled)*perc) + u_train, u_dev = input_u_shuffled[:-dev_samples], input_u_shuffled[-dev_samples:] + d_train, d_dev = input_d_shuffled[:-dev_samples], input_d_shuffled[-dev_samples:] + y_train, y_dev = input_y_shuffled[:-dev_samples], input_y_shuffled[-dev_samples:] + print("Train/Dev split: {:d}/{:d}".format(len(y_train), len(y_dev))) + + return u_train, u_dev, d_train, d_dev, y_train, y_dev + + def batch_iter(self, data, batch_size, num_epochs, shuffle=True): + """ + Generates a batch iterator for a dataset. + """ + data = np.array(data) + data_size = len(data) + num_batches_per_epoch = int(len(data)/batch_size) + 1 + for epoch in range(num_epochs): + # Shuffle the data at each epoch + if shuffle: + shuffle_indices = np.random.permutation(np.arange(data_size)) + shuffled_data = data[shuffle_indices] + else: + shuffled_data = data + for batch_num in range(num_batches_per_epoch): + start_index = batch_num * batch_size + end_index = min((batch_num + 1) * batch_size, data_size) + yield shuffled_data[start_index:end_index] + +class vespaRunTimeModel: + """ + Model that combine user and document features and needs to be evaluated at query time. + """ + def __init__(self, user_feature_length, doc_feature_length, hidden_length): + + # placeholders + self.input_u = tf.placeholder(tf.float32, [None, user_feature_length], name = 'input_u') + self.input_d = tf.placeholder(tf.float32, [None, doc_feature_length], name = 'input_d') + self.input_y = tf.placeholder(tf.float32, [None, 1], name = 'input_y') + + # merge user and document vector + self.input_concat = tf.concat(1, [self.input_d, self.input_u], name = 'input_concat') + + # hidden layer + self.W_hidden = tf.Variable( + tf.truncated_normal([user_feature_length + + doc_feature_length, hidden_length], stddev=0.1), name = 'W_hidden') + self.b_hidden = tf.Variable(tf.constant(0.1, shape=[hidden_length]), name = 'b_hidden') + + self.hidden_layer = tf.nn.relu(tf.matmul(self.input_concat, self.W_hidden) + self.b_hidden, + name = 'hidden_layer') + + # output layer + self.W_final = tf.Variable( + tf.random_uniform([hidden_length, 1], -0.1, 0.1), + name="W_final") + self.b_final = tf.Variable(tf.zeros([1]), name="b_final") + + self.y = tf.sigmoid(tf.matmul(self.hidden_layer, self.W_final) + self.b_final, name = 'y') + + # prediction based on model output + self.prediction = tf.cast(tf.greater_equal(self.y, 0.5), "float", name = 'prediction') + + # loss function + prob = tf.clip_by_value(self.y,1e-5,1.0 - 1e-5) + self.loss = tf.reduce_mean(- self.input_y * tf.log(prob) - (1 - self.input_y) * tf.log(1 - prob), name = 'loss') + + # accuracy + correct_predictions = tf.equal(self.prediction, self.input_y) + self.accuracy = tf.reduce_mean(tf.cast(correct_predictions, "float"), name="accuracy") + + def train_operation(self, learning_rate): + global_step = tf.Variable(0, name="global_step", trainable=False) + #optimizer = tf.train.GradientDescentOptimizer(learning_rate) + optimizer = tf.train.AdagradOptimizer(learning_rate) + train_op = optimizer.minimize(self.loss, global_step=global_step) + return train_op, global_step + + def create_output_dir(self): + timestamp = str(int(time.time())) + out_dir = os.path.abspath(os.path.join(os.path.curdir, "runs", timestamp)) + print("Writing to {}\n".format(out_dir)) + return out_dir + + def summary_oprations(self): + loss_summary = tf.scalar_summary("loss", self.loss) + acc_summary = tf.scalar_summary("accuracy", self.accuracy) + train_summary_op = tf.merge_summary([loss_summary, acc_summary]) + 
dev_summary_op = tf.merge_summary([loss_summary, acc_summary]) + return train_summary_op, dev_summary_op + + def train_step(self, u_batch, d_batch, y_batch, writer=None): + """ + A single training step + """ + feed_dict = { + self.input_u: u_batch, + self.input_d: d_batch, + self.input_y: y_batch + } + _, step, summaries, loss, accuracy = sess.run( + [train_op, global_step, train_summary_op, self.loss, self.accuracy], + feed_dict) + time_str = datetime.datetime.now().isoformat() + print("{}: step {}, loss {:g}, acc {:g}".format(time_str, step, loss, accuracy)) + if writer: + writer.add_summary(summaries, step) + + def dev_step(self, u_batch, d_batch, y_batch, writer=None): + """ + Evaluates model on a dev set + """ + feed_dict = { + self.input_u: u_batch, + self.input_d: d_batch, + self.input_y: y_batch + } + step, summaries, loss, accuracy = sess.run( + [global_step, dev_summary_op, self.loss, self.accuracy], + feed_dict) + time_str = datetime.datetime.now().isoformat() + print("{}: step {}, loss {:g}, acc {:g}".format(time_str, step, loss, accuracy)) + if writer: + writer.add_summary(summaries, step) + +class serializeVespaModel: + """ + Serialize TensorFlow variables to Vespa JSON format + + Example: + checkpoint_dir = "./runs/1473845959/checkpoints" + output_dir = "./runs/1473845959/vespa_variables" + + serializer = serializeVespaModel(checkpoint_dir, output_dir) + serializer.serialize_to_disk(variable_name = "W_hidden", dimension_names = ['input', 'hidden']) + serializer.serialize_to_disk(variable_name = "b_hidden", dimension_names = ['hidden']) + serializer.serialize_to_disk(variable_name = "W_final", dimension_names = ['hidden', 'final']) + serializer.serialize_to_disk(variable_name = "b_final", dimension_names = ['final']) + """ + def __init__(self, checkpoint_dir, output_dir): + self.checkpoint_file = tf.train.latest_checkpoint(checkpoint_dir) + self.reader = tf.train.NewCheckpointReader(self.checkpoint_file) + self.output_dir = output_dir + + def write_cell_value(self, variable, dimension_names, dimension_address = None): + if dimension_address is None: + dimension_address = [] + shape = variable.shape + if len(shape) == 1: + count = 0 + cells = [] + for element in variable: + dimension_address.append((dimension_names[0], str(count))) + count += 1 + cells.append({ 'address': dict(dimension_address), "value": float(element) }) + return cells + else: + count = 0 + output = [] + for slice in variable: + dimension_address.append((dimension_names[0], str(count))) + output.extend(self.write_cell_value(slice, dimension_names[1:], dimension_address)) + count += 1 + return output + + def write_to_vespa_json_format(self, variable_name, dimension_names): + variable = self.reader.get_tensor(variable_name) + cells = self.write_cell_value(variable, dimension_names) + return json.dumps({'cells': cells}) + + def serialize_to_disk(self, variable_name, dimension_names): + text_file = open(os.path.join(output_dir, variable_name + ".json"), "w") + text_file.write(serializer.write_to_vespa_json_format(variable_name, dimension_names)) + text_file.close() + + +def task_train(): + # Data + tf.flags.DEFINE_string("product_features_file_path", '', "File containing product features") + tf.flags.DEFINE_string("user_features_file_path", '', "File containing user features") + tf.flags.DEFINE_string("dataset_file_path", '', "File containing labels for each document user pair") + + tf.flags.DEFINE_integer("hidden_length_factor", 2, "The hidden layer has size 'hidden_length_factor * input_vector_length'") + + # 
Misc Parameters + tf.flags.DEFINE_boolean("allow_soft_placement", True, "Allow device soft device placement") + tf.flags.DEFINE_boolean("log_device_placement", False, "Log placement of ops on devices") + + # Training parameters + tf.flags.DEFINE_float("learning_rate", 0.1, "Gradient Descent learning rate") + + tf.flags.DEFINE_integer("batch_size", 64, "Batch Size (default: 64)") + tf.flags.DEFINE_integer("num_epochs", 200, "Number of training epochs (default: 200)") + tf.flags.DEFINE_integer("evaluate_every", 100, "Evaluate model on dev set after this many steps (default: 100)") + tf.flags.DEFINE_integer("checkpoint_every", 100, "Save model after this many steps (default: 100)") + + FLAGS = tf.flags.FLAGS + FLAGS._parse_flags() + print("\nParameters:") + for attr, value in sorted(FLAGS.__flags.items()): + print("{}={}".format(attr.upper(), value)) + print("") + + # Data preparation + data_pre_processing = getData( + FLAGS.product_features_file_path, + FLAGS.user_features_file_path, + FLAGS.dataset_file_path) + + input_u, input_d, input_y = data_pre_processing.prepare_dataset() + u_train, u_dev, d_train, d_dev, y_train, y_dev = data_pre_processing.create_train_test_sets(input_u, input_d, input_y, seed = 10, perc = 0.2) + + user_feature_length = input_u.shape[1] + doc_feature_length = input_d.shape[1] + + + # Create a graph + with tf.Graph().as_default(): + + # Create a session + session_conf = tf.ConfigProto( + allow_soft_placement=FLAGS.allow_soft_placement, + log_device_placement=FLAGS.log_device_placement) + sess = tf.Session(config=session_conf) + with sess.as_default(): + + # instanciate a model + vespa_model = vespaRunTimeModel(user_feature_length = user_feature_length, + doc_feature_length = doc_feature_length, + hidden_length = FLAGS.hidden_length_factor * (user_feature_length + doc_feature_length)) + + # create a train operation + train_op, global_step = vespa_model.train_operation(learning_rate = FLAGS.learning_rate) + + # Summaries for loss and accuracy + train_summary_op, dev_summary_op = vespa_model.summary_oprations() + + # Output directory for models and summaries + out_dir = vespa_model.create_output_dir() + + # Write train summaries to disk + train_summary_dir = os.path.join(out_dir, "summaries", "train") + train_summary_writer = tf.train.SummaryWriter(train_summary_dir, sess.graph) + + # Dev summaries + dev_summary_dir = os.path.join(out_dir, "summaries", "dev") + dev_summary_writer = tf.train.SummaryWriter(dev_summary_dir, sess.graph) + + # Checkpoint directory. Tensorflow assumes this directory already exists so we need to create it + checkpoint_dir = os.path.abspath(os.path.join(out_dir, "checkpoints")) + checkpoint_prefix = os.path.join(checkpoint_dir, "model") + if not os.path.exists(checkpoint_dir): + os.makedirs(checkpoint_dir) + saver = tf.train.Saver(tf.all_variables()) + + # Initialize all variables + sess.run(tf.initialize_all_variables()) + + # Generate batches + batches = data_pre_processing.batch_iter( + list(zip(u_train, d_train, y_train)), FLAGS.batch_size, FLAGS.num_epochs) + # Training loop. For each batch... 
+            for batch in batches:
+                u_batch, d_batch, y_batch = zip(*batch)
+                vespa_model.train_step(u_batch, d_batch, y_batch, writer=train_summary_writer)
+                current_step = tf.train.global_step(sess, global_step)
+                if current_step % FLAGS.evaluate_every == 0:
+                    print("\nEvaluation:")
+                    vespa_model.dev_step(u_dev, d_dev, y_dev, writer=dev_summary_writer)
+                    print("")
+                if current_step % FLAGS.checkpoint_every == 0:
+                    path = saver.save(sess, checkpoint_prefix, global_step=current_step)
+                    print("Saved model checkpoint to {}\n".format(path))
+
+if __name__ == "__main__":
+
+    # Task
+    tf.flags.DEFINE_string("task", 'train', "Train a model from scratch")
+
+    FLAGS = tf.flags.FLAGS
+    FLAGS._parse_flags()
+    print("\nParameters:")
+    for attr, value in sorted(FLAGS.__flags.items()):
+        print("{}={}".format(attr.upper(), value))
+    print("")
+
+    if FLAGS.task == "train":
+        task_train()