# Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
import json
import argparse
import os
import sys

# Parsing Elastic Search documents to Vespa documents
# Example of usage: python ES_Vespa_parser.py my_index.json my_index_mapping.json [--application_name NAME]

__author__ = 'henrhoi'


class ElasticSearchParser:
    """Convert an Elasticsearch export into a minimal Vespa application package.

    Reads a documents file and a mappings file (one JSON object per line) and
    writes, under ``<cwd>/application/``: ``documents.json`` (Vespa put
    operations), ``searchdefinitions/<type>.sd`` for every document type seen,
    ``services.xml`` and ``hosts.xml``.
    """

    # Elasticsearch field type -> Vespa field type used in generated .sd files.
    ES_TO_VESPA_TYPES = {
        "text": "string",
        "keyword": "string",
        "date": "string",
        "long": "long",
        "integer": "int",
        "short": "int",
        "double": "double",
        "boolean": "string",
        "ip": "string",
        "byte": "byte",
        "float": "float",
    }

    document_file = None
    mapping_file = None
    application_name = None
    path = ""
    # Whether the document type currently being processed keeps Elasticsearch's
    # _all field enabled; reset for every type in getMapping.
    _all = True

    def __init__(self, argv=None):
        """Parse command-line arguments and initialise per-instance state.

        :param argv: argument list to parse; ``None`` (the default) parses ``sys.argv[1:]``.
        """
        parser = argparse.ArgumentParser()
        parser.add_argument("documents_path", help="location of file with documents to be parsed", type=str)
        parser.add_argument("mappings_path", help="location of file with mappings", type=str)
        parser.add_argument("--application_name", help="name of application", default="application_name", type=str)
        args = parser.parse_args(argv)
        self.document_file = args.documents_path
        self.mapping_file = args.mappings_path
        self.application_name = args.application_name

        # Mutable state is per instance so separate parsers never share results.
        self._all = True
        self.search_definitions = {}  # document type -> True once its .sd has been written
        self.all_mappings = {}  # document type -> {field name: Elasticsearch type}
        self.no_index = []  # fields of the current type that must not be searchable
        self.types = []  # document types in first-seen order

    def main(self):
        """Create the output folders, then write documents, search definitions and XML config."""
        self.path = os.getcwd() + "/application/"
        self._create_folder(self.path)
        self._create_folder(self.path + "searchdefinitions/")
        self.parse()
        self.createServices_xml()
        self.createHosts_xml()

    @staticmethod
    def _create_folder(folder):
        """Create ``folder`` (and any missing parents) unless it already exists."""
        if os.path.isdir(folder):
            print(" > Folder '" + folder + "' already existed")
            return
        # Real failures (e.g. permission denied) propagate instead of being
        # misreported as an already existing folder.
        os.makedirs(folder, 0o777)
        print(" > Created folder '" + folder + "'")

    def getMapping(self, type):
        """Collect the field mapping of one document type and write its search definition.

        :param type: Elasticsearch document type (the ``_type`` of a document).
        :return: dict of field name -> Elasticsearch field type.
        """
        type_mapping = {}
        # Searchability settings are per type; reset them so the previous
        # type's _all / no-index flags do not leak into this one.
        self._all = True
        self.no_index = []

        with open(self.mapping_file, "r") as unparsed_mapping_file:
            for line in unparsed_mapping_file:
                if not line.strip():
                    continue
                data = json.loads(line)
                index = list(data.keys())[0]
                type_definition = data[index]["mappings"].get(type)
                if type_definition is None:
                    # This index does not define the type; try the next line.
                    continue
                mappings = type_definition["properties"]

                # Checking if some fields could be no-index
                try:
                    _all_enabled = type_definition["_all"]["enabled"]
                except KeyError:
                    print(" > All fields in the document type '" + type + "' is searchable")
                else:
                    if not _all_enabled:
                        self._all = False
                        print(" > All fields in the document type '" + type + "' is not searchable. Go inside "
                              + self.path + "searchdefinitions/" + type
                              + ".sd to add which fields that should be searchable")

                self.walk(mappings, type_mapping, "properties")

        if type not in self.search_definitions:
            self.search_definitions[type] = True
            self.types.append(type)
            self.createSearchDefinition(type, type_mapping)

        # Adding mapping to global map with mappings
        self.all_mappings[type] = type_mapping
        return type_mapping

    def parse(self):
        """Convert every Elasticsearch document into a Vespa put operation in ``documents.json``."""
        file_path = self.path + "documents" + ".json"
        with open(self.document_file, "r") as unparsed_document_file, open(file_path, "w") as vespa_docs:
            for line in unparsed_document_file:
                if not line.strip():
                    continue
                data = json.loads(line)
                doc_type = data["_type"]
                doc_id = "id:" + self.application_name + ":" + doc_type + "::" + data["_id"]

                # Checking for already existing mapping for a type, if not create a new
                if doc_type in self.all_mappings:
                    mapping = self.all_mappings[doc_type]
                else:
                    mapping = self.getMapping(doc_type)

                # Keep only mapped fields that the document actually contains.
                source = data.get("_source", {})
                parsed_data = {
                    "put": doc_id,
                    "fields": {key: source[key] for key in mapping if key in source},
                }
                json.dump(parsed_data, vespa_docs)
                vespa_docs.write("\n")
        print(" > Parsed all documents '" + ", ".join(self.types) + "' at '" + file_path + "'")

    def createSearchDefinition(self, type, type_mapping):
        """Write ``searchdefinitions/<type>.sd`` declaring every mapped field.

        :param type: document type name, used for both the search and document name.
        :param type_mapping: dict of field name -> Elasticsearch field type.
        :raises KeyError: if a field has an Elasticsearch type with no Vespa equivalent.
        """
        file_path = self.path + "searchdefinitions/" + type + ".sd"
        # Build the whole file first so an unsupported type cannot leave a
        # half-written search definition behind.
        lines = ["search " + type + " {\n", "    document " + type + " {\n"]
        for key, item in type_mapping.items():
            vespa_type = self.get_type(item)
            lines.append("        field " + key + " type " + vespa_type + " {\n")
            lines.append("            indexing: " + self.get_indexing(key, vespa_type) + "\n")
            lines.append("        }\n")
        lines.append("    }\n")
        lines.append("}\n")
        with open(file_path, "w") as new_sd:
            new_sd.writelines(lines)
        print(" > Created search definition for '" + type + "' at '" + file_path + "'")

    def createServices_xml(self):
        """Write ``services.xml``: one container cluster and one content cluster on host alias node1."""
        file_path = self.path + "services.xml"
        documents = "".join(
            '      <document mode="index" type="' + doc_type + '"/>\n'
            for doc_type in self.types
        )
        template = (
            '<?xml version="1.0" encoding="utf-8" ?>\n'
            '<services version="1.0">\n\n'
            '  <container id="default" version="1.0">\n'
            '    <search/>\n'
            '    <document-api/>\n'
            '    <nodes>\n'
            '      <node hostalias="node1"/>\n'
            '    </nodes>\n'
            '  </container>\n\n'
            '  <content id="content" version="1.0">\n'
            '    <redundancy>1</redundancy>\n'
            '    <search>\n'
            '      <visibility-delay>1.0</visibility-delay>\n'
            '    </search>\n'
            '    <documents>\n'
            + documents +
            '    </documents>\n'
            '    <nodes>\n'
            '      <node hostalias="node1" distribution-key="0"/>\n'
            '    </nodes>\n'
            '    <engine>\n'
            '      <proton>\n'
            '        <searchable-copies>1</searchable-copies>\n'
            '      </proton>\n'
            '    </engine>\n'
            '  </content>\n\n'
            '</services>\n'
        )
        with open(file_path, "w") as new_services:
            new_services.write(template)
        print(" > Created services.xml at '" + file_path + "'")

    def createHosts_xml(self):
        """Write ``hosts.xml`` mapping alias node1 to localhost."""
        file_path = self.path + "hosts.xml"
        template = (
            '<?xml version="1.0" encoding="utf-8" ?>\n'
            '<hosts>\n'
            '  <host name="localhost">\n'
            '    <alias>node1</alias>\n'
            '  </host>\n'
            '</hosts>\n'
        )
        with open(file_path, "w") as new_hosts:
            new_hosts.write(template)
        print(" > Created hosts.xml at '" + file_path + "'")

    def get_type(self, type):
        """Return the Vespa field type for an Elasticsearch field type.

        :raises KeyError: if the Elasticsearch type is not supported.
        """
        try:
            return self.ES_TO_VESPA_TYPES[type]
        except KeyError:
            raise KeyError("Unsupported Elasticsearch field type %r" % (type,))

    def get_indexing(self, key, key_type):
        """Return the Vespa indexing statement for field ``key`` of Vespa type ``key_type``.

        Fields are summary-only when _all is disabled for the type or the field
        was marked not searchable; otherwise strings are indexed and all other
        types become attributes.
        """
        if not self._all or key in self.no_index:
            return "summary"
        if key_type == "string":
            return "summary | index"
        return "summary | attribute"

    def walk(self, node, mapping, parent):
        """Recursively flatten an Elasticsearch ``properties`` tree into ``mapping``.

        :param node: (sub)dict of the Elasticsearch mapping being visited.
        :param mapping: output dict, filled with field name -> Elasticsearch type.
        :param parent: key under which ``node`` was found (the field name for leaves).
        """
        for key, item in node.items():
            if isinstance(item, dict):
                self.walk(item, mapping, key)
            elif key == "type":
                mapping[parent] = item
            elif key == "include_in_all":
                if not item:
                    # Field should not be searchable
                    self.no_index.append(parent)
            elif key == "index" and parent != "properties":
                # "no" is the Elasticsearch 2.x form; 5.x+ uses a boolean false.
                if item == "no" or item is False:
                    # Field should not be searchable
                    self.no_index.append(parent)


if __name__ == '__main__':
    ElasticSearchParser().main()