Diffstat (limited to 'sample-apps/blog-tutorial-shared/src/python')
-rw-r--r--  sample-apps/blog-tutorial-shared/src/python/__init__.py      0
-rw-r--r--  sample-apps/blog-tutorial-shared/src/python/parse.py        72
-rwxr-xr-x  sample-apps/blog-tutorial-shared/src/python/vespaModel.py  397
3 files changed, 469 insertions, 0 deletions
diff --git a/sample-apps/blog-tutorial-shared/src/python/__init__.py b/sample-apps/blog-tutorial-shared/src/python/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
--- /dev/null
+++ b/sample-apps/blog-tutorial-shared/src/python/__init__.py
diff --git a/sample-apps/blog-tutorial-shared/src/python/parse.py b/sample-apps/blog-tutorial-shared/src/python/parse.py
new file mode 100644
index 00000000000..0d5f892eebc
--- /dev/null
+++ b/sample-apps/blog-tutorial-shared/src/python/parse.py
@@ -0,0 +1,72 @@
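+"""
+Convert raw Kaggle blog data (one JSON object per line) into the Vespa feed
+format used by the blog search tutorial. With -p/--popularity, each document
+also gets a 'popularity' field holding the blog's share of all likes in the
+input file.
+
+Example invocation (the input file name is only illustrative):
+
+    python parse.py -p trainPosts.json > blog_posts_feed.json
+"""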
+import json
+import argparse
+
+
+class KaggleRawDataParser:
+
+    def __init__(self):
+        parser = argparse.ArgumentParser()
+        parser.add_argument("-p", "--popularity", action="store_true", help="add 'popularity' field")
+        parser.add_argument("file", help="location of file to be parsed")
+        args = parser.parse_args()
+
+        self.popularity = args.popularity
+        self.raw_data_file = args.file
+        self.total_number_of_likes = 0
+        self.likes_per_blog = {}
+
+    def main(self):
+        if self.popularity:
+            self.calculate_popularity()
+        self.parse()
+
+    def calculate_popularity(self):
+        with open(self.raw_data_file, "r") as unparsed_file:
+            for line in unparsed_file:
+                data = json.loads(line)
+
+                self.total_number_of_likes += len(data["likes"])
+                if data["blog"] in self.likes_per_blog:
+                    self.likes_per_blog[data["blog"]] += len(data["likes"])
+                else:
+                    self.likes_per_blog[data["blog"]] = len(data["likes"])
+
+    def parse(self):
+        with open(self.raw_data_file, "r") as unparsed_file:
+            for line in unparsed_file:
+                data = json.loads(line)
+
+                parsed_data = {
+                    "put": "id:blog-search:blog_post::" + data["post_id"],
+                    "fields": {
+                        "blogname": data["blogname"],
+                        "post_id": data["post_id"],
+                        "author": data["author"],
+                        "language": data["language"],
+                        "categories": data["categories"],
+                        "title": data["title"],
+                        "blog": data["blog"],
+                        "date_gmt": data["date_gmt"],
+                        "url": data["url"],
+                        "content": data["content"],
+                        "tags": data["tags"],
+                        "date": int(data["date_gmt"][0:4] + data["date_gmt"][5:7] + data["date_gmt"][8:10])
+                    }
+                }
+                if self.popularity:
+                    parsed_data["fields"]["popularity"] = \
+                        float(self.likes_per_blog[data["blog"]]) / float(self.total_number_of_likes)
+
+                print(json.dumps(parsed_data))
+
+
+if __name__ == '__main__':
+    KaggleRawDataParser().main()
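For reference, the 'popularity' value emitted by parse.py above is simply a blog's share of all likes in the input file. A minimal standalone sketch of that computation (the blog ids and like counts below are made up for illustration):

    import json

    # Two hypothetical input lines in the Kaggle format parse.py reads (ids and likes are made up).
    raw_lines = [
        json.dumps({"blog": "b1", "likes": [{"uid": "1"}, {"uid": "2"}]}),
        json.dumps({"blog": "b2", "likes": [{"uid": "3"}]}),
    ]

    likes_per_blog, total_likes = {}, 0
    for line in raw_lines:
        record = json.loads(line)
        likes_per_blog[record["blog"]] = likes_per_blog.get(record["blog"], 0) + len(record["likes"])
        total_likes += len(record["likes"])

    # popularity(blog) = likes on that blog / likes across all blogs
    print({blog: float(likes) / total_likes for blog, likes in likes_per_blog.items()})
    # -> {'b1': 0.666..., 'b2': 0.333...}

As in parse.py, every post of a given blog receives the same popularity fraction.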
diff --git a/sample-apps/blog-tutorial-shared/src/python/vespaModel.py b/sample-apps/blog-tutorial-shared/src/python/vespaModel.py
new file mode 100755
index 00000000000..fd0718721eb
--- /dev/null
+++ b/sample-apps/blog-tutorial-shared/src/python/vespaModel.py
@@ -0,0 +1,397 @@
+#!/usr/bin/env python
+
+"""
+Train a two-layer neural network to compute the probability of a user
+represented by the vector u liking a document represented by the vector d.
+
+Usage: ./vespaModel.py --product_features_file_path path \
+                       --user_features_file_path path \
+                       --dataset_file_path path
+
+Expected file formats:
+
+- product_features_file_path contains a file with rows following the JSON format below:
+
+{"post_id" : 20,
+ "user_item_cf" : {"user_item_cf:5" : -0.66617566,
+                   "user_item_cf:6" : 0.29197264,
+                   "user_item_cf:1" : -0.15582734,
+                   "user_item_cf:7" : 0.3350679,
+                   "user_item_cf:2" : -0.16676047,
+                   "user_item_cf:9" : -0.31653953,
+                   "user_item_cf:3" : -0.21495385,
+                   "user_item_cf:4" : -0.036676258,
+                   "user_item_cf:8" : 0.122069225,
+                   "user_item_cf:0" : 0.20922394}}
+
+- user_features_file_path contains a file with rows following the JSON format below:
+
+{"user_id" : 270,
+ "user_item_cf" : {"user_item_cf:5" : -0.54011273,
+                   "user_item_cf:6" : 0.2723072,
+                   "user_item_cf:1" : -0.23280832,
+                   "user_item_cf:7" : -0.011183357,
+                   "user_item_cf:2" : -0.3987285,
+                   "user_item_cf:9" : -0.05703937,
+                   "user_item_cf:3" : 0.04699418,
+                   "user_item_cf:4" : 0.06679048,
+                   "user_item_cf:8" : 0.31399783,
+                   "user_item_cf:0" : 0.5000366}}
+
+- dataset_file_path contains a file with rows of tab-separated post_id, user_id, label, such as the sample below:
+
+1000054 118475 1
+10001560 666315 0
+10001560 1230226 0
+10001560 561306 1
+"""
+
+
+import tensorflow as tf
+import time
+import os
+import datetime
+import json
+import numpy as np
+
+class getData:
+    """
+    Data pre-processing
+    """
+    def __init__(self, product_features_file_path, user_features_file_path, data_set_file_path):
+        self.product_features_file_path = product_features_file_path
+        self.user_features_file_path = user_features_file_path
+        self.data_set_file_path = data_set_file_path
+
+    # Create user and document lookup features
+    def parse_cf_features(self, record, id_name):
+        record_id = record[id_name]
+        indexes = ['user_item_cf:' + str(x) for x in range(0, 10)]
+        values = [record['user_item_cf'][x] for x in indexes]
+        return [record_id, values]
+
+    def get_product_features_lookup(self):
+        product_features = [self.parse_cf_features(json.loads(line), 'post_id')
+                            for line in open(self.product_features_file_path)]
+        return dict(product_features)
+
+    def get_user_features_lookup(self):
+        user_features = [self.parse_cf_features(json.loads(line), 'user_id')
+                         for line in open(self.user_features_file_path)]
+        return dict(user_features)
+
+    def parse_dataset(self, line, lookup_user_features, lookup_product_features):
+        info = line.strip("\n").split("\t")
+        user_id = float(info[0])
+        product_id = float(info[1])
+        label = int(info[2])
+        return lookup_user_features[user_id], lookup_product_features[product_id], [label]
+
+    def prepare_dataset(self):
+        lookup_product_features = self.get_product_features_lookup()
+        lookup_user_features = self.get_user_features_lookup()
+        with open(self.data_set_file_path) as f:
+            input_u = []; input_d = []; input_y = []
+            for line in f:
+                u, d, y = self.parse_dataset(line, lookup_user_features, lookup_product_features)
+                input_u.append(u)
+                input_d.append(d)
+                input_y.append(y)
+        input_u = np.array(input_u)
+        input_d = np.array(input_d)
+        input_y = np.array(input_y)
+        return input_u, input_d, input_y
+
+    def create_train_test_sets(self, input_u, input_d, input_y, seed=10, perc=0.2):
+        # Randomly shuffle data
+        np.random.seed(seed)
+        shuffle_indices = np.random.permutation(np.arange(len(input_u)))
+        input_u_shuffled = input_u[shuffle_indices]
+        input_d_shuffled = input_d[shuffle_indices]
+        input_y_shuffled = input_y[shuffle_indices]
+
+        # Split train/test set
+        dev_samples = int(len(input_u_shuffled) * perc)
+        u_train, u_dev = input_u_shuffled[:-dev_samples], input_u_shuffled[-dev_samples:]
+        d_train, d_dev = input_d_shuffled[:-dev_samples], input_d_shuffled[-dev_samples:]
+        y_train, y_dev = input_y_shuffled[:-dev_samples], input_y_shuffled[-dev_samples:]
+        print("Train/Dev split: {:d}/{:d}".format(len(y_train), len(y_dev)))
+
+        return u_train, u_dev, d_train, d_dev, y_train, y_dev
+
+    def batch_iter(self, data, batch_size, num_epochs, shuffle=True):
+        """
+        Generates a batch iterator for a dataset.
+        """
+        data = np.array(data)
+        data_size = len(data)
+        # ceil(data_size / batch_size) batches per epoch, so no empty final batch is emitted
+        num_batches_per_epoch = int((data_size - 1) / batch_size) + 1
+        for epoch in range(num_epochs):
+            # Shuffle the data at each epoch
+            if shuffle:
+                shuffle_indices = np.random.permutation(np.arange(data_size))
+                shuffled_data = data[shuffle_indices]
+            else:
+                shuffled_data = data
+            for batch_num in range(num_batches_per_epoch):
+                start_index = batch_num * batch_size
+                end_index = min((batch_num + 1) * batch_size, data_size)
+                yield shuffled_data[start_index:end_index]
+
+class vespaRunTimeModel:
+    """
+    Model that combines user and document features and needs to be evaluated at query time.
+    """
+    def __init__(self, user_feature_length, doc_feature_length, hidden_length):
+
+        # placeholders
+        self.input_u = tf.placeholder(tf.float32, [None, user_feature_length], name='input_u')
+        self.input_d = tf.placeholder(tf.float32, [None, doc_feature_length], name='input_d')
+        self.input_y = tf.placeholder(tf.float32, [None, 1], name='input_y')
+
+        # merge user and document vector
+        self.input_concat = tf.concat(1, [self.input_d, self.input_u], name='input_concat')
+
+        # hidden layer
+        self.W_hidden = tf.Variable(
+            tf.truncated_normal([user_feature_length + doc_feature_length, hidden_length], stddev=0.1),
+            name='W_hidden')
+        self.b_hidden = tf.Variable(tf.constant(0.1, shape=[hidden_length]), name='b_hidden')
+
+        self.hidden_layer = tf.nn.relu(tf.matmul(self.input_concat, self.W_hidden) + self.b_hidden,
+                                       name='hidden_layer')
+
+        # output layer
+        self.W_final = tf.Variable(
+            tf.random_uniform([hidden_length, 1], -0.1, 0.1),
+            name="W_final")
+        self.b_final = tf.Variable(tf.zeros([1]), name="b_final")
+
+        self.y = tf.sigmoid(tf.matmul(self.hidden_layer, self.W_final) + self.b_final, name='y')
+
+        # prediction based on model output
+        self.prediction = tf.cast(tf.greater_equal(self.y, 0.5), "float", name='prediction')
+
+        # loss function: cross-entropy on the clipped sigmoid output
+        prob = tf.clip_by_value(self.y, 1e-5, 1.0 - 1e-5)
+        self.loss = tf.reduce_mean(- self.input_y * tf.log(prob) - (1 - self.input_y) * tf.log(1 - prob), name='loss')
+
+        # accuracy
+        correct_predictions = tf.equal(self.prediction, self.input_y)
+        self.accuracy = tf.reduce_mean(tf.cast(correct_predictions, "float"), name="accuracy")
+
+    def train_operation(self, learning_rate):
+        global_step = tf.Variable(0, name="global_step", trainable=False)
+        #optimizer = tf.train.GradientDescentOptimizer(learning_rate)
+        optimizer = tf.train.AdagradOptimizer(learning_rate)
+        train_op = optimizer.minimize(self.loss, global_step=global_step)
+        return train_op, global_step
+
+    def create_output_dir(self):
+        timestamp = str(int(time.time()))
+        out_dir = os.path.abspath(os.path.join(os.path.curdir, "runs", timestamp))
+        print("Writing to {}\n".format(out_dir))
+        return out_dir
+
+    def summary_operations(self):
+        loss_summary = tf.scalar_summary("loss", self.loss)
+        acc_summary = tf.scalar_summary("accuracy", self.accuracy)
+        train_summary_op = tf.merge_summary([loss_summary, acc_summary])
+        dev_summary_op = tf.merge_summary([loss_summary, acc_summary])
+        return train_summary_op, dev_summary_op
+
+    def train_step(self, sess, train_op, global_step, train_summary_op,
+                   u_batch, d_batch, y_batch, writer=None):
+        """
+        A single training step. The session and training ops created in
+        task_train() are passed in explicitly.
+        """
+        feed_dict = {
+            self.input_u: u_batch,
+            self.input_d: d_batch,
+            self.input_y: y_batch
+        }
+        _, step, summaries, loss, accuracy = sess.run(
+            [train_op, global_step, train_summary_op, self.loss, self.accuracy],
+            feed_dict)
+        time_str = datetime.datetime.now().isoformat()
+        print("{}: step {}, loss {:g}, acc {:g}".format(time_str, step, loss, accuracy))
+        if writer:
+            writer.add_summary(summaries, step)
+
+    def dev_step(self, sess, global_step, dev_summary_op,
+                 u_batch, d_batch, y_batch, writer=None):
+        """
+        Evaluates model on a dev set
+        """
+        feed_dict = {
+            self.input_u: u_batch,
+            self.input_d: d_batch,
+            self.input_y: y_batch
+        }
+        step, summaries, loss, accuracy = sess.run(
+            [global_step, dev_summary_op, self.loss, self.accuracy],
+            feed_dict)
+        time_str = datetime.datetime.now().isoformat()
+        print("{}: step {}, loss {:g}, acc {:g}".format(time_str, step, loss, accuracy))
+        if writer:
+            writer.add_summary(summaries, step)
+
+class serializeVespaModel:
+    """
+    Serialize TensorFlow variables to Vespa JSON format
+
+    Example:
+        checkpoint_dir = "./runs/1473845959/checkpoints"
+        output_dir = "./runs/1473845959/vespa_variables"
+
+        serializer = serializeVespaModel(checkpoint_dir, output_dir)
+        serializer.serialize_to_disk(variable_name="W_hidden", dimension_names=['input', 'hidden'])
+        serializer.serialize_to_disk(variable_name="b_hidden", dimension_names=['hidden'])
+        serializer.serialize_to_disk(variable_name="W_final", dimension_names=['hidden', 'final'])
+        serializer.serialize_to_disk(variable_name="b_final", dimension_names=['final'])
+    """
+    def __init__(self, checkpoint_dir, output_dir):
+        self.checkpoint_file = tf.train.latest_checkpoint(checkpoint_dir)
+        self.reader = tf.train.NewCheckpointReader(self.checkpoint_file)
+        self.output_dir = output_dir
+
+    def write_cell_value(self, variable, dimension_names, dimension_address=None):
+        if dimension_address is None:
+            dimension_address = []
+        shape = variable.shape
+        if len(shape) == 1:
+            count = 0
+            cells = []
+            for element in variable:
+                dimension_address.append((dimension_names[0], str(count)))
+                count += 1
+                cells.append({'address': dict(dimension_address), "value": float(element)})
+            return cells
+        else:
+            count = 0
+            output = []
+            for slice in variable:
+                dimension_address.append((dimension_names[0], str(count)))
+                output.extend(self.write_cell_value(slice, dimension_names[1:], dimension_address))
+                count += 1
+            return output
+
+    def write_to_vespa_json_format(self, variable_name, dimension_names):
+        variable = self.reader.get_tensor(variable_name)
+        cells = self.write_cell_value(variable, dimension_names)
+        return json.dumps({'cells': cells})
+
+    def serialize_to_disk(self, variable_name, dimension_names):
+        text_file = open(os.path.join(self.output_dir, variable_name + ".json"), "w")
+        text_file.write(self.write_to_vespa_json_format(variable_name, dimension_names))
+        text_file.close()
+
+
+def task_train():
+    # Data
+    tf.flags.DEFINE_string("product_features_file_path", '', "File containing product features")
+    tf.flags.DEFINE_string("user_features_file_path", '', "File containing user features")
+    tf.flags.DEFINE_string("dataset_file_path", '', "File containing labels for each document-user pair")
+
+    tf.flags.DEFINE_integer("hidden_length_factor", 2, "The hidden layer has size 'hidden_length_factor * input_vector_length'")
+
+    # Misc parameters
+    tf.flags.DEFINE_boolean("allow_soft_placement", True, "Allow soft device placement")
+    tf.flags.DEFINE_boolean("log_device_placement", False, "Log placement of ops on devices")
+
+    # Training parameters
+    tf.flags.DEFINE_float("learning_rate", 0.1, "Gradient descent learning rate")
+
+    tf.flags.DEFINE_integer("batch_size", 64, "Batch Size (default: 64)")
+    tf.flags.DEFINE_integer("num_epochs", 200, "Number of training epochs (default: 200)")
+    tf.flags.DEFINE_integer("evaluate_every", 100, "Evaluate model on dev set after this many steps (default: 100)")
+    tf.flags.DEFINE_integer("checkpoint_every", 100, "Save model after this many steps (default: 100)")
+
+    FLAGS = tf.flags.FLAGS
+    FLAGS._parse_flags()
+    print("\nParameters:")
+    for attr, value in sorted(FLAGS.__flags.items()):
+        print("{}={}".format(attr.upper(), value))
+    print("")
+
+    # Data preparation
+    data_pre_processing = getData(
+        FLAGS.product_features_file_path,
+        FLAGS.user_features_file_path,
+        FLAGS.dataset_file_path)
+
+    input_u, input_d, input_y = data_pre_processing.prepare_dataset()
+    u_train, u_dev, d_train, d_dev, y_train, y_dev = data_pre_processing.create_train_test_sets(
+        input_u, input_d, input_y, seed=10, perc=0.2)
+
+    user_feature_length = input_u.shape[1]
+    doc_feature_length = input_d.shape[1]
+
+    # Create a graph
+    with tf.Graph().as_default():
+
+        # Create a session
+        session_conf = tf.ConfigProto(
+            allow_soft_placement=FLAGS.allow_soft_placement,
+            log_device_placement=FLAGS.log_device_placement)
+        sess = tf.Session(config=session_conf)
+        with sess.as_default():
+
+            # instantiate a model
+            vespa_model = vespaRunTimeModel(
+                user_feature_length=user_feature_length,
+                doc_feature_length=doc_feature_length,
+                hidden_length=FLAGS.hidden_length_factor * (user_feature_length + doc_feature_length))
+
+            # create a train operation
+            train_op, global_step = vespa_model.train_operation(learning_rate=FLAGS.learning_rate)
+
+            # Summaries for loss and accuracy
+            train_summary_op, dev_summary_op = vespa_model.summary_operations()
+
+            # Output directory for models and summaries
+            out_dir = vespa_model.create_output_dir()
+
+            # Write train summaries to disk
+            train_summary_dir = os.path.join(out_dir, "summaries", "train")
+            train_summary_writer = tf.train.SummaryWriter(train_summary_dir, sess.graph)
+
+            # Dev summaries
+            dev_summary_dir = os.path.join(out_dir, "summaries", "dev")
+            dev_summary_writer = tf.train.SummaryWriter(dev_summary_dir, sess.graph)
+
+            # Checkpoint directory. TensorFlow assumes this directory already exists, so we need to create it
+            checkpoint_dir = os.path.abspath(os.path.join(out_dir, "checkpoints"))
+            checkpoint_prefix = os.path.join(checkpoint_dir, "model")
+            if not os.path.exists(checkpoint_dir):
+                os.makedirs(checkpoint_dir)
+            saver = tf.train.Saver(tf.all_variables())
+
+            # Initialize all variables
+            sess.run(tf.initialize_all_variables())
+
+            # Generate batches
+            batches = data_pre_processing.batch_iter(
+                list(zip(u_train, d_train, y_train)), FLAGS.batch_size, FLAGS.num_epochs)
+            # Training loop. For each batch...
+            for batch in batches:
+                u_batch, d_batch, y_batch = zip(*batch)
+                vespa_model.train_step(sess, train_op, global_step, train_summary_op,
+                                       u_batch, d_batch, y_batch, writer=train_summary_writer)
+                current_step = tf.train.global_step(sess, global_step)
+                if current_step % FLAGS.evaluate_every == 0:
+                    print("\nEvaluation:")
+                    vespa_model.dev_step(sess, global_step, dev_summary_op,
+                                         u_dev, d_dev, y_dev, writer=dev_summary_writer)
+                    print("")
+                if current_step % FLAGS.checkpoint_every == 0:
+                    path = saver.save(sess, checkpoint_prefix, global_step=current_step)
+                    print("Saved model checkpoint to {}\n".format(path))
+
+if __name__ == "__main__":
+
+    # Task
+    tf.flags.DEFINE_string("task", 'train', "Task to perform; 'train' trains a model from scratch")
+
+    FLAGS = tf.flags.FLAGS
+    FLAGS._parse_flags()
+    print("\nParameters:")
+    for attr, value in sorted(FLAGS.__flags.items()):
+        print("{}={}".format(attr.upper(), value))
+    print("")
+
+    if FLAGS.task == "train":
+        task_train()
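For reference, the JSON written by serializeVespaModel above is Vespa's tensor cell format: one cell per tensor element, each carrying a dimension address and a value. A minimal sketch of the same traversal on a small in-memory numpy array, without needing a TensorFlow checkpoint (the dimension names 'input' and 'hidden' are just examples):

    import json
    import numpy as np

    def cells_for(variable, dimension_names, address=()):
        # Recursively emit one cell per element, labelling each axis with its dimension name.
        if variable.ndim == 1:
            return [{'address': dict(address + ((dimension_names[0], str(i)),)), 'value': float(v)}
                    for i, v in enumerate(variable)]
        return [cell
                for i, slice_ in enumerate(variable)
                for cell in cells_for(slice_, dimension_names[1:], address + ((dimension_names[0], str(i)),))]

    W = np.array([[0.1, 0.2], [0.3, 0.4]])  # stand-in for a checkpointed variable
    print(json.dumps({'cells': cells_for(W, ['input', 'hidden'])}, indent=1))
    # Each cell looks like {"address": {"input": "0", "hidden": "1"}, "value": 0.2}

Running it prints a {'cells': [...]} object of the same shape that serialize_to_disk writes to <variable_name>.json.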