diff options
author | tmartins <thigm85@gmail.com> | 2020-09-01 19:36:05 +0200 |
---|---|---|
committer | tmartins <thigm85@gmail.com> | 2020-09-01 19:36:05 +0200 |
commit | b32126c1383cd3c12e64096e329252748bd7f644 (patch) | |
tree | 99d6a4211a5acd9d2a6579ea4f9e28efbb1fb4c3 /python | |
parent | 4772c87da9ac3350d9c28291e76c17ec576f1fbf (diff) | |
parent | 1838608019b8004a6a86c6b6c530a60bc151f633 (diff) |
Merge branch 'master' into tgm/reference-doc
Diffstat (limited to 'python')
-rw-r--r-- | python/vespa/setup.py | 2 | ||||
-rw-r--r-- | python/vespa/vespa/application.py | 8 | ||||
-rw-r--r-- | python/vespa/vespa/package.py | 376 |
3 files changed, 353 insertions, 33 deletions
diff --git a/python/vespa/setup.py b/python/vespa/setup.py index c9ee130c197..8a1bcd0a6b6 100644 --- a/python/vespa/setup.py +++ b/python/vespa/setup.py @@ -23,7 +23,7 @@ setuptools.setup( ), packages=setuptools.find_packages(), include_package_data=True, - install_requires=["requests", "pandas", "docker", "jinja2"], + install_requires=["requests", "pandas", "docker", "jinja2", "cryptography"], python_requires=">=3.6", zip_safe=False, data_files=[ diff --git a/python/vespa/vespa/application.py b/python/vespa/vespa/application.py index 3ab31c4dd8c..6ccf5e8d854 100644 --- a/python/vespa/vespa/application.py +++ b/python/vespa/vespa/application.py @@ -15,6 +15,7 @@ class Vespa(object): url: str, port: Optional[int] = None, deployment_message: Optional[List[str]] = None, + cert: Optional[str] = None, ) -> None: """ Establish a connection with a Vespa application. @@ -22,14 +23,17 @@ class Vespa(object): :param url: URL :param port: Port :param deployment_message: Message returned by Vespa engine after deployment. + :param cert: Path to certificate and key file >>> Vespa(url = "https://cord19.vespa.ai") >>> Vespa(url = "http://localhost", port = 8080) + >>> Vespa(url = "https://api.vespa-external.aws.oath.cloud", port = 4443, cert = "/path/to/cert-and-key.pem") """ self.url = url self.port = port self.deployment_message = deployment_message + self.cert = cert if port is None: self.end_point = self.url @@ -87,7 +91,7 @@ class Vespa(object): if debug_request: return VespaResult(vespa_result={}, request_body=body) else: - r = post(self.search_end_point, json=body) + r = post(self.search_end_point, json=body, cert=self.cert) return VespaResult(vespa_result=r.json()) def feed_data_point(self, schema: str, data_id: str, fields: Dict) -> Response: @@ -103,7 +107,7 @@ class Vespa(object): self.end_point, schema, schema, str(data_id) ) vespa_format = {"fields": fields} - response = post(end_point, json=vespa_format) + response = post(end_point, json=vespa_format, cert=self.cert) return response def collect_training_data_point( diff --git a/python/vespa/vespa/package.py b/python/vespa/vespa/package.py index 4b5d1e701d5..301a9df43bd 100644 --- a/python/vespa/vespa/package.py +++ b/python/vespa/vespa/package.py @@ -1,11 +1,23 @@ +import sys +import http.client +import json import os import re -from time import sleep -from typing import List, Mapping, Optional +import zipfile +from base64 import standard_b64encode +from datetime import datetime, timedelta +from io import BytesIO from pathlib import Path +from time import sleep, strftime, gmtime +from typing import List, Mapping, Optional, IO -from jinja2 import Environment, PackageLoader, select_autoescape import docker +from cryptography import x509 +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives import serialization +from jinja2 import Environment, PackageLoader, select_autoescape from vespa.json_serialization import ToJson, FromJson from vespa.application import Vespa @@ -300,7 +312,9 @@ class ApplicationPackage(ToJson, FromJson["ApplicationPackage"]): env = Environment( loader=PackageLoader("vespa", "templates"), autoescape=select_autoescape( - disabled_extensions=("txt",), default_for_string=True, default=True, + disabled_extensions=("txt",), + default_for_string=True, + default=True, ), ) env.trim_blocks = True @@ -319,7 +333,9 @@ class ApplicationPackage(ToJson, FromJson["ApplicationPackage"]): env = Environment( loader=PackageLoader("vespa", "templates"), autoescape=select_autoescape( - disabled_extensions=("txt",), default_for_string=True, default=True, + disabled_extensions=("txt",), + default_for_string=True, + default=True, ), ) env.trim_blocks = True @@ -332,32 +348,19 @@ class ApplicationPackage(ToJson, FromJson["ApplicationPackage"]): env = Environment( loader=PackageLoader("vespa", "templates"), autoescape=select_autoescape( - disabled_extensions=("txt",), default_for_string=True, default=True, + disabled_extensions=("txt",), + default_for_string=True, + default=True, ), ) env.trim_blocks = True env.lstrip_blocks = True schema_template = env.get_template("services.xml") return schema_template.render( - application_name=self.name, document_name=self.schema.name, + application_name=self.name, + document_name=self.schema.name, ) - def create_application_package_files(self, dir_path): - Path(os.path.join(dir_path, "application/schemas")).mkdir( - parents=True, exist_ok=True - ) - with open( - os.path.join( - dir_path, "application/schemas/{}.sd".format(self.schema.name) - ), - "w", - ) as f: - f.write(self.schema_to_text) - with open(os.path.join(dir_path, "application/hosts.xml"), "w") as f: - f.write(self.hosts_to_text) - with open(os.path.join(dir_path, "application/services.xml"), "w") as f: - f.write(self.services_to_text) - @staticmethod def from_dict(mapping: Mapping) -> "ApplicationPackage": schema = mapping.get("schema", None) @@ -384,17 +387,23 @@ class ApplicationPackage(ToJson, FromJson["ApplicationPackage"]): class VespaDocker(object): - def __init__(self, application_package: ApplicationPackage) -> None: + def __init__( + self, + application_package: ApplicationPackage, + output_file: IO = sys.stdout, + ) -> None: """ Deploy application to a Vespa container :param application_package: ApplicationPackage to be deployed. + :param output_file: Output file to write output messages. """ self.application_package = application_package self.container = None self.local_port = 8080 + self.output = output_file - def run_vespa_engine_container(self, disk_folder: str, container_memory: str): + def _run_vespa_engine_container(self, disk_folder: str, container_memory: str): """ Run a vespa container. @@ -418,7 +427,7 @@ class VespaDocker(object): ports={self.local_port: self.local_port, 19112: 19112}, ) - def check_configuration_server(self) -> bool: + def _check_configuration_server(self) -> bool: """ Check if configuration server is running and ready for deployment @@ -434,6 +443,25 @@ class VespaDocker(object): == "HTTP/1.1 200 OK" ) + def _create_application_package_files(self, dir_path): + Path(os.path.join(dir_path, "application/schemas")).mkdir( + parents=True, exist_ok=True + ) + with open( + os.path.join( + dir_path, + "application/schemas/{}.sd".format( + self.application_package.schema.name + ), + ), + "w", + ) as f: + f.write(self.application_package.schema_to_text) + with open(os.path.join(dir_path, "application/hosts.xml"), "w") as f: + f.write(self.application_package.hosts_to_text) + with open(os.path.join(dir_path, "application/services.xml"), "w") as f: + f.write(self.application_package.services_to_text) + def deploy(self, disk_folder: str, container_memory: str = "4G"): """ Deploy the application into a Vespa container. @@ -444,14 +472,14 @@ class VespaDocker(object): :return: a Vespa connection instance. """ - self.application_package.create_application_package_files(dir_path=disk_folder) + self._create_application_package_files(dir_path=disk_folder) - self.run_vespa_engine_container( + self._run_vespa_engine_container( disk_folder=disk_folder, container_memory=container_memory ) - while not self.check_configuration_server(): - print("Waiting for configuration server.") + while not self._check_configuration_server(): + print("Waiting for configuration server.", file=self.output) sleep(5) deployment = self.container.exec_run( @@ -468,3 +496,291 @@ class VespaDocker(object): port=self.local_port, deployment_message=deployment_message, ) + + +class VespaCloud(object): + def __init__( + self, + tenant: str, + application: str, + key_location: str, + application_package: ApplicationPackage, + output_file: IO = sys.stdout, + ) -> None: + """ + Deploy application to the Vespa Cloud (cloud.vespa.ai) + + :param tenant: Tenant name registered in the Vespa Cloud. + :param application: Application name registered in the Vespa Cloud. + :param key_location: Location of the private key used for signing HTTP requests to the Vespa Cloud. + :param application_package: ApplicationPackage to be deployed. + :param output_file: Output file to write output messages. + """ + self.tenant = tenant + self.application = application + self.application_package = application_package + self.api_key = self._read_private_key(key_location) + self.api_public_key_bytes = standard_b64encode( + self.api_key.public_key().public_bytes( + serialization.Encoding.PEM, + serialization.PublicFormat.SubjectPublicKeyInfo, + ) + ) + self.data_key, self.data_certificate = self._create_certificate_pair() + self.private_cert_file_name = "private_cert.txt" + self.connection = http.client.HTTPSConnection( + "api.vespa-external.aws.oath.cloud", 4443 + ) + self.output = output_file + + @staticmethod + def _read_private_key(key_location: str) -> ec.EllipticCurvePrivateKey: + with open(key_location, "rb") as key_data: + key = serialization.load_pem_private_key( + key_data.read(), None, default_backend() + ) + if not isinstance(key, ec.EllipticCurvePrivateKey): + raise TypeError( + "Key at " + key_location + " must be an elliptic curve private key" + ) + return key + + def _write_private_key_and_cert( + self, key: ec.EllipticCurvePrivateKey, cert: x509.Certificate, disk_folder: str + ) -> None: + cert_file = os.path.join(disk_folder, self.private_cert_file_name) + with open(cert_file, "w+") as file: + file.write( + key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.TraditionalOpenSSL, + serialization.NoEncryption(), + ).decode("UTF-8") + ) + file.write(cert.public_bytes(serialization.Encoding.PEM).decode("UTF-8")) + + @staticmethod + def _create_certificate_pair() -> (ec.EllipticCurvePrivateKey, x509.Certificate): + key = ec.generate_private_key(ec.SECP384R1, default_backend()) + name = x509.Name([x509.NameAttribute(x509.NameOID.COMMON_NAME, u"localhost")]) + certificate = ( + x509.CertificateBuilder() + .subject_name(name) + .issuer_name(name) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.utcnow() - timedelta(minutes=1)) + .not_valid_after(datetime.utcnow() + timedelta(days=7)) + .public_key(key.public_key()) + .sign(key, hashes.SHA256(), default_backend()) + ) + return (key, certificate) + + def _request( + self, method: str, path: str, body: BytesIO = BytesIO(), headers={} + ) -> dict: + digest = hashes.Hash(hashes.SHA256(), default_backend()) + body.seek(0) + digest.update(body.read()) + content_hash = standard_b64encode(digest.finalize()).decode("UTF-8") + timestamp = ( + datetime.utcnow().isoformat() + "Z" + ) # Java's Instant.parse requires the neutral time zone appended + url = "https://" + self.connection.host + ":" + str(self.connection.port) + path + + canonical_message = method + "\n" + url + "\n" + timestamp + "\n" + content_hash + signature = self.api_key.sign( + canonical_message.encode("UTF-8"), ec.ECDSA(hashes.SHA256()) + ) + + headers = { + "X-Timestamp": timestamp, + "X-Content-Hash": content_hash, + "X-Key-Id": self.tenant + ":" + self.application + ":" + "default", + "X-Key": self.api_public_key_bytes, + "X-Authorization": standard_b64encode(signature), + **headers, + } + + body.seek(0) + self.connection.request(method, path, body, headers) + with self.connection.getresponse() as response: + parsed = json.load(response) + if response.status != 200: + raise RuntimeError( + "Status code " + + str(response.status) + + " doing " + + method + + " at " + + url + + ":\n" + + parsed["message"] + ) + return parsed + + def _get_dev_region(self) -> str: + return self._request("GET", "/zone/v1/environment/dev/default")["name"] + + def _get_endpoint(self, instance: str, region: str) -> str: + endpoints = self._request( + "GET", + "/application/v4/tenant/{}/application/{}/instance/{}/environment/dev/region/{}".format( + self.tenant, self.application, instance, region + ), + )["endpoints"] + container_url = [ + endpoint["url"] + for endpoint in endpoints + if endpoint["cluster"] + == "{}_container".format(self.application_package.name) + ] + if not container_url: + raise RuntimeError("No endpoints found for container 'test_app_container'") + return container_url[0] + + def _to_application_zip(self) -> BytesIO: + buffer = BytesIO() + with zipfile.ZipFile(buffer, "a") as zip_archive: + zip_archive.writestr( + "application/schemas/{}.sd".format( + self.application_package.schema.name + ), + self.application_package.schema_to_text, + ) + zip_archive.writestr( + "application/services.xml", self.application_package.services_to_text + ) + zip_archive.writestr( + "application/security/clients.pem", + self.data_certificate.public_bytes(serialization.Encoding.PEM), + ) + + return buffer + + def _start_deployment(self, instance: str, job: str, disk_folder: str) -> int: + deploy_path = ( + "/application/v4/tenant/{}/application/{}/instance/{}/deploy/{}".format( + self.tenant, self.application, instance, job + ) + ) + + application_zip_bytes = self._to_application_zip() + + self._write_private_key_and_cert( + self.data_key, self.data_certificate, disk_folder + ) + + response = self._request( + "POST", + deploy_path, + application_zip_bytes, + {"Content-Type": "application/zip"}, + ) + print(response["message"], file=self.output) + return response["run"] + + def _get_deployment_status( + self, instance: str, job: str, run: int, last: int + ) -> (str, int): + + update = self._request( + "GET", + "/application/v4/tenant/{}/application/{}/instance/{}/job/{}/run/{}?after={}".format( + self.tenant, self.application, instance, job, run, last + ), + ) + + for step, entries in update["log"].items(): + for entry in entries: + self._print_log_entry(step, entry) + last = update.get("lastId", last) + + fail_status_message = { + "error": "Unexpected error during deployment; see log for details", + "aborted": "Deployment was aborted, probably by a newer deployment", + "outOfCapacity": "No capacity left in zone; please contact the Vespa team", + "deploymentFailed": "Deployment failed; see log for details", + "installationFailed": "Installation failed; see Vespa log for details", + "running": "Deployment not completed", + "endpointCertificateTimeout": "Endpoint certificate not ready in time; please contact Vespa team", + "testFailure": "Unexpected status; tests are not run for manual deployments", + } + + if update["active"]: + return "active", last + else: + status = update["status"] + if status == "success": + return "success", last + elif status in fail_status_message.keys(): + raise RuntimeError(fail_status_message[status]) + else: + raise RuntimeError("Unexpected status: {}".format(status)) + + def _follow_deployment(self, instance: str, job: str, run: int) -> None: + last = -1 + while True: + try: + status, last = self._get_deployment_status(instance, job, run, last) + except RuntimeError: + raise + + if status == "active": + sleep(1) + elif status == "success": + return + else: + raise RuntimeError("Unexpected status: {}".format(status)) + + def _print_log_entry(self, step: str, entry: dict): + timestamp = strftime("%H:%M:%S", gmtime(entry["at"] / 1e3)) + message = entry["message"].replace("\n", "\n" + " " * 23) + if step != "copyVespaLogs" or entry["type"] == "error": + print( + "{:<7} [{}] {}".format(entry["type"].upper(), timestamp, message), + file=self.output, + ) + + def deploy(self, instance: str, disk_folder: str) -> Vespa: + """ + Deploy the given application package as the given instance in the Vespa Cloud dev environment. + + :param instance: Name of this instance of the application, in the Vespa Cloud. + :param disk_folder: Disk folder to save the required Vespa config files. + + :return: a Vespa connection instance. + """ + region = self._get_dev_region() + job = "dev-" + region + run = self._start_deployment(instance, job, disk_folder) + self._follow_deployment(instance, job, run) + endpoint_url = self._get_endpoint(instance=instance, region=region) + return Vespa( + url=endpoint_url, + cert=os.path.join(disk_folder, self.private_cert_file_name), + ) + + def delete(self, instance: str): + """ + Delete the specified instance from the dev environment in the Vespa Cloud. + :param instance: Name of the instance to delete. + :return: + """ + print( + self._request( + "DELETE", + "/application/v4/tenant/{}/application/{}/instance/{}/environment/dev/region/{}".format( + self.tenant, self.application, instance, self._get_dev_region() + ), + )["message"], + file=self.output, + ) + + def close(self): + self.connection.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() |