summary | refs | log | tree | commit | diff | stats
path: root/python
diff options
context:
space:
mode:
author tmartins <thigm85@gmail.com> 2020-09-01 19:36:05 +0200
committer tmartins <thigm85@gmail.com> 2020-09-01 19:36:05 +0200
commit b32126c1383cd3c12e64096e329252748bd7f644 (patch)
tree 99d6a4211a5acd9d2a6579ea4f9e28efbb1fb4c3 /python
parent 4772c87da9ac3350d9c28291e76c17ec576f1fbf (diff)
parent 1838608019b8004a6a86c6b6c530a60bc151f633 (diff)
Merge branch 'master' into tgm/reference-doc
Diffstat (limited to 'python')
-rw-r--r-- python/vespa/setup.py 2
-rw-r--r-- python/vespa/vespa/application.py 8
-rw-r--r-- python/vespa/vespa/package.py 376
3 files changed, 353 insertions, 33 deletions
diff --git a/python/vespa/setup.py b/python/vespa/setup.py
index c9ee130c197..8a1bcd0a6b6 100644
--- a/python/vespa/setup.py
+++ b/python/vespa/setup.py
@@ -23,7 +23,7 @@ setuptools.setup(
),
packages=setuptools.find_packages(),
include_package_data=True,
- install_requires=["requests", "pandas", "docker", "jinja2"],
+ install_requires=["requests", "pandas", "docker", "jinja2", "cryptography"],
python_requires=">=3.6",
zip_safe=False,
data_files=[
diff --git a/python/vespa/vespa/application.py b/python/vespa/vespa/application.py
index 3ab31c4dd8c..6ccf5e8d854 100644
--- a/python/vespa/vespa/application.py
+++ b/python/vespa/vespa/application.py
@@ -15,6 +15,7 @@ class Vespa(object):
url: str,
port: Optional[int] = None,
deployment_message: Optional[List[str]] = None,
+ cert: Optional[str] = None,
) -> None:
"""
Establish a connection with a Vespa application.
@@ -22,14 +23,17 @@ class Vespa(object):
:param url: URL
:param port: Port
:param deployment_message: Message returned by Vespa engine after deployment.
+ :param cert: Path to certificate and key file
>>> Vespa(url = "https://cord19.vespa.ai")
>>> Vespa(url = "http://localhost", port = 8080)
+ >>> Vespa(url = "https://api.vespa-external.aws.oath.cloud", port = 4443, cert = "/path/to/cert-and-key.pem")
"""
self.url = url
self.port = port
self.deployment_message = deployment_message
+ self.cert = cert
if port is None:
self.end_point = self.url
@@ -87,7 +91,7 @@ class Vespa(object):
if debug_request:
return VespaResult(vespa_result={}, request_body=body)
else:
- r = post(self.search_end_point, json=body)
+ r = post(self.search_end_point, json=body, cert=self.cert)
return VespaResult(vespa_result=r.json())
def feed_data_point(self, schema: str, data_id: str, fields: Dict) -> Response:
@@ -103,7 +107,7 @@ class Vespa(object):
self.end_point, schema, schema, str(data_id)
)
vespa_format = {"fields": fields}
- response = post(end_point, json=vespa_format)
+ response = post(end_point, json=vespa_format, cert=self.cert)
return response
def collect_training_data_point(
diff --git a/python/vespa/vespa/package.py b/python/vespa/vespa/package.py
index 4b5d1e701d5..301a9df43bd 100644
--- a/python/vespa/vespa/package.py
+++ b/python/vespa/vespa/package.py
@@ -1,11 +1,23 @@
+import sys
+import http.client
+import json
import os
import re
-from time import sleep
-from typing import List, Mapping, Optional
+import zipfile
+from base64 import standard_b64encode
+from datetime import datetime, timedelta
+from io import BytesIO
from pathlib import Path
+from time import sleep, strftime, gmtime
+from typing import List, Mapping, Optional, IO
-from jinja2 import Environment, PackageLoader, select_autoescape
import docker
+from cryptography import x509
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives.asymmetric import ec
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.primitives import serialization
+from jinja2 import Environment, PackageLoader, select_autoescape
from vespa.json_serialization import ToJson, FromJson
from vespa.application import Vespa
@@ -300,7 +312,9 @@ class ApplicationPackage(ToJson, FromJson["ApplicationPackage"]):
env = Environment(
loader=PackageLoader("vespa", "templates"),
autoescape=select_autoescape(
- disabled_extensions=("txt",), default_for_string=True, default=True,
+ disabled_extensions=("txt",),
+ default_for_string=True,
+ default=True,
),
)
env.trim_blocks = True
@@ -319,7 +333,9 @@ class ApplicationPackage(ToJson, FromJson["ApplicationPackage"]):
env = Environment(
loader=PackageLoader("vespa", "templates"),
autoescape=select_autoescape(
- disabled_extensions=("txt",), default_for_string=True, default=True,
+ disabled_extensions=("txt",),
+ default_for_string=True,
+ default=True,
),
)
env.trim_blocks = True
@@ -332,32 +348,19 @@ class ApplicationPackage(ToJson, FromJson["ApplicationPackage"]):
env = Environment(
loader=PackageLoader("vespa", "templates"),
autoescape=select_autoescape(
- disabled_extensions=("txt",), default_for_string=True, default=True,
+ disabled_extensions=("txt",),
+ default_for_string=True,
+ default=True,
),
)
env.trim_blocks = True
env.lstrip_blocks = True
schema_template = env.get_template("services.xml")
return schema_template.render(
- application_name=self.name, document_name=self.schema.name,
+ application_name=self.name,
+ document_name=self.schema.name,
)
- def create_application_package_files(self, dir_path):
- Path(os.path.join(dir_path, "application/schemas")).mkdir(
- parents=True, exist_ok=True
- )
- with open(
- os.path.join(
- dir_path, "application/schemas/{}.sd".format(self.schema.name)
- ),
- "w",
- ) as f:
- f.write(self.schema_to_text)
- with open(os.path.join(dir_path, "application/hosts.xml"), "w") as f:
- f.write(self.hosts_to_text)
- with open(os.path.join(dir_path, "application/services.xml"), "w") as f:
- f.write(self.services_to_text)
-
@staticmethod
def from_dict(mapping: Mapping) -> "ApplicationPackage":
schema = mapping.get("schema", None)
@@ -384,17 +387,23 @@ class ApplicationPackage(ToJson, FromJson["ApplicationPackage"]):
class VespaDocker(object):
- def __init__(self, application_package: ApplicationPackage) -> None:
+ def __init__(
+ self,
+ application_package: ApplicationPackage,
+ output_file: IO = sys.stdout,
+ ) -> None:
"""
Deploy application to a Vespa container
:param application_package: ApplicationPackage to be deployed.
+ :param output_file: Output file to write output messages.
"""
self.application_package = application_package
self.container = None
self.local_port = 8080
+ self.output = output_file
- def run_vespa_engine_container(self, disk_folder: str, container_memory: str):
+ def _run_vespa_engine_container(self, disk_folder: str, container_memory: str):
"""
Run a vespa container.
@@ -418,7 +427,7 @@ class VespaDocker(object):
ports={self.local_port: self.local_port, 19112: 19112},
)
- def check_configuration_server(self) -> bool:
+ def _check_configuration_server(self) -> bool:
"""
Check if configuration server is running and ready for deployment
@@ -434,6 +443,25 @@ class VespaDocker(object):
== "HTTP/1.1 200 OK"
)
+ def _create_application_package_files(self, dir_path):
+ Path(os.path.join(dir_path, "application/schemas")).mkdir(
+ parents=True, exist_ok=True
+ )
+ with open(
+ os.path.join(
+ dir_path,
+ "application/schemas/{}.sd".format(
+ self.application_package.schema.name
+ ),
+ ),
+ "w",
+ ) as f:
+ f.write(self.application_package.schema_to_text)
+ with open(os.path.join(dir_path, "application/hosts.xml"), "w") as f:
+ f.write(self.application_package.hosts_to_text)
+ with open(os.path.join(dir_path, "application/services.xml"), "w") as f:
+ f.write(self.application_package.services_to_text)
+
def deploy(self, disk_folder: str, container_memory: str = "4G"):
"""
Deploy the application into a Vespa container.
@@ -444,14 +472,14 @@ class VespaDocker(object):
:return: a Vespa connection instance.
"""
- self.application_package.create_application_package_files(dir_path=disk_folder)
+ self._create_application_package_files(dir_path=disk_folder)
- self.run_vespa_engine_container(
+ self._run_vespa_engine_container(
disk_folder=disk_folder, container_memory=container_memory
)
- while not self.check_configuration_server():
- print("Waiting for configuration server.")
+ while not self._check_configuration_server():
+ print("Waiting for configuration server.", file=self.output)
sleep(5)
deployment = self.container.exec_run(
@@ -468,3 +496,291 @@ class VespaDocker(object):
port=self.local_port,
deployment_message=deployment_message,
)
+
+
+class VespaCloud(object):
+ def __init__(
+ self,
+ tenant: str,
+ application: str,
+ key_location: str,
+ application_package: ApplicationPackage,
+ output_file: IO = sys.stdout,
+ ) -> None:
+ """
+ Deploy application to the Vespa Cloud (cloud.vespa.ai)
+
+ :param tenant: Tenant name registered in the Vespa Cloud.
+ :param application: Application name registered in the Vespa Cloud.
+ :param key_location: Location of the private key used for signing HTTP requests to the Vespa Cloud.
+ :param application_package: ApplicationPackage to be deployed.
+ :param output_file: Output file to write output messages.
+ """
+ self.tenant = tenant
+ self.application = application
+ self.application_package = application_package
+ self.api_key = self._read_private_key(key_location)
+ self.api_public_key_bytes = standard_b64encode(
+ self.api_key.public_key().public_bytes(
+ serialization.Encoding.PEM,
+ serialization.PublicFormat.SubjectPublicKeyInfo,
+ )
+ )
+ self.data_key, self.data_certificate = self._create_certificate_pair()
+ self.private_cert_file_name = "private_cert.txt"
+ self.connection = http.client.HTTPSConnection(
+ "api.vespa-external.aws.oath.cloud", 4443
+ )
+ self.output = output_file
+
+ @staticmethod
+ def _read_private_key(key_location: str) -> ec.EllipticCurvePrivateKey:
+ with open(key_location, "rb") as key_data:
+ key = serialization.load_pem_private_key(
+ key_data.read(), None, default_backend()
+ )
+ if not isinstance(key, ec.EllipticCurvePrivateKey):
+ raise TypeError(
+ "Key at " + key_location + " must be an elliptic curve private key"
+ )
+ return key
+
+ def _write_private_key_and_cert(
+ self, key: ec.EllipticCurvePrivateKey, cert: x509.Certificate, disk_folder: str
+ ) -> None:
+ cert_file = os.path.join(disk_folder, self.private_cert_file_name)
+ with open(cert_file, "w+") as file:
+ file.write(
+ key.private_bytes(
+ serialization.Encoding.PEM,
+ serialization.PrivateFormat.TraditionalOpenSSL,
+ serialization.NoEncryption(),
+ ).decode("UTF-8")
+ )
+ file.write(cert.public_bytes(serialization.Encoding.PEM).decode("UTF-8"))
+
+ @staticmethod
+ def _create_certificate_pair() -> (ec.EllipticCurvePrivateKey, x509.Certificate):
+ key = ec.generate_private_key(ec.SECP384R1, default_backend())
+ name = x509.Name([x509.NameAttribute(x509.NameOID.COMMON_NAME, u"localhost")])
+ certificate = (
+ x509.CertificateBuilder()
+ .subject_name(name)
+ .issuer_name(name)
+ .serial_number(x509.random_serial_number())
+ .not_valid_before(datetime.utcnow() - timedelta(minutes=1))
+ .not_valid_after(datetime.utcnow() + timedelta(days=7))
+ .public_key(key.public_key())
+ .sign(key, hashes.SHA256(), default_backend())
+ )
+ return (key, certificate)
+
+ def _request(
+ self, method: str, path: str, body: BytesIO = BytesIO(), headers={}
+ ) -> dict:
+ digest = hashes.Hash(hashes.SHA256(), default_backend())
+ body.seek(0)
+ digest.update(body.read())
+ content_hash = standard_b64encode(digest.finalize()).decode("UTF-8")
+ timestamp = (
+ datetime.utcnow().isoformat() + "Z"
+ ) # Java's Instant.parse requires the neutral time zone appended
+ url = "https://" + self.connection.host + ":" + str(self.connection.port) + path
+
+ canonical_message = method + "\n" + url + "\n" + timestamp + "\n" + content_hash
+ signature = self.api_key.sign(
+ canonical_message.encode("UTF-8"), ec.ECDSA(hashes.SHA256())
+ )
+
+ headers = {
+ "X-Timestamp": timestamp,
+ "X-Content-Hash": content_hash,
+ "X-Key-Id": self.tenant + ":" + self.application + ":" + "default",
+ "X-Key": self.api_public_key_bytes,
+ "X-Authorization": standard_b64encode(signature),
+ **headers,
+ }
+
+ body.seek(0)
+ self.connection.request(method, path, body, headers)
+ with self.connection.getresponse() as response:
+ parsed = json.load(response)
+ if response.status != 200:
+ raise RuntimeError(
+ "Status code "
+ + str(response.status)
+ + " doing "
+ + method
+ + " at "
+ + url
+ + ":\n"
+ + parsed["message"]
+ )
+ return parsed
+
+ def _get_dev_region(self) -> str:
+ return self._request("GET", "/zone/v1/environment/dev/default")["name"]
+
+ def _get_endpoint(self, instance: str, region: str) -> str:
+ endpoints = self._request(
+ "GET",
+ "/application/v4/tenant/{}/application/{}/instance/{}/environment/dev/region/{}".format(
+ self.tenant, self.application, instance, region
+ ),
+ )["endpoints"]
+ container_url = [
+ endpoint["url"]
+ for endpoint in endpoints
+ if endpoint["cluster"]
+ == "{}_container".format(self.application_package.name)
+ ]
+ if not container_url:
+ raise RuntimeError("No endpoints found for container 'test_app_container'")
+ return container_url[0]
+
+ def _to_application_zip(self) -> BytesIO:
+ buffer = BytesIO()
+ with zipfile.ZipFile(buffer, "a") as zip_archive:
+ zip_archive.writestr(
+ "application/schemas/{}.sd".format(
+ self.application_package.schema.name
+ ),
+ self.application_package.schema_to_text,
+ )
+ zip_archive.writestr(
+ "application/services.xml", self.application_package.services_to_text
+ )
+ zip_archive.writestr(
+ "application/security/clients.pem",
+ self.data_certificate.public_bytes(serialization.Encoding.PEM),
+ )
+
+ return buffer
+
+ def _start_deployment(self, instance: str, job: str, disk_folder: str) -> int:
+ deploy_path = (
+ "/application/v4/tenant/{}/application/{}/instance/{}/deploy/{}".format(
+ self.tenant, self.application, instance, job
+ )
+ )
+
+ application_zip_bytes = self._to_application_zip()
+
+ self._write_private_key_and_cert(
+ self.data_key, self.data_certificate, disk_folder
+ )
+
+ response = self._request(
+ "POST",
+ deploy_path,
+ application_zip_bytes,
+ {"Content-Type": "application/zip"},
+ )
+ print(response["message"], file=self.output)
+ return response["run"]
+
+ def _get_deployment_status(
+ self, instance: str, job: str, run: int, last: int
+ ) -> (str, int):
+
+ update = self._request(
+ "GET",
+ "/application/v4/tenant/{}/application/{}/instance/{}/job/{}/run/{}?after={}".format(
+ self.tenant, self.application, instance, job, run, last
+ ),
+ )
+
+ for step, entries in update["log"].items():
+ for entry in entries:
+ self._print_log_entry(step, entry)
+ last = update.get("lastId", last)
+
+ fail_status_message = {
+ "error": "Unexpected error during deployment; see log for details",
+ "aborted": "Deployment was aborted, probably by a newer deployment",
+ "outOfCapacity": "No capacity left in zone; please contact the Vespa team",
+ "deploymentFailed": "Deployment failed; see log for details",
+ "installationFailed": "Installation failed; see Vespa log for details",
+ "running": "Deployment not completed",
+ "endpointCertificateTimeout": "Endpoint certificate not ready in time; please contact Vespa team",
+ "testFailure": "Unexpected status; tests are not run for manual deployments",
+ }
+
+ if update["active"]:
+ return "active", last
+ else:
+ status = update["status"]
+ if status == "success":
+ return "success", last
+ elif status in fail_status_message.keys():
+ raise RuntimeError(fail_status_message[status])
+ else:
+ raise RuntimeError("Unexpected status: {}".format(status))
+
+ def _follow_deployment(self, instance: str, job: str, run: int) -> None:
+ last = -1
+ while True:
+ try:
+ status, last = self._get_deployment_status(instance, job, run, last)
+ except RuntimeError:
+ raise
+
+ if status == "active":
+ sleep(1)
+ elif status == "success":
+ return
+ else:
+ raise RuntimeError("Unexpected status: {}".format(status))
+
+ def _print_log_entry(self, step: str, entry: dict):
+ timestamp = strftime("%H:%M:%S", gmtime(entry["at"] / 1e3))
+ message = entry["message"].replace("\n", "\n" + " " * 23)
+ if step != "copyVespaLogs" or entry["type"] == "error":
+ print(
+ "{:<7} [{}] {}".format(entry["type"].upper(), timestamp, message),
+ file=self.output,
+ )
+
+ def deploy(self, instance: str, disk_folder: str) -> Vespa:
+ """
+ Deploy the given application package as the given instance in the Vespa Cloud dev environment.
+
+ :param instance: Name of this instance of the application, in the Vespa Cloud.
+ :param disk_folder: Disk folder to save the required Vespa config files.
+
+ :return: a Vespa connection instance.
+ """
+ region = self._get_dev_region()
+ job = "dev-" + region
+ run = self._start_deployment(instance, job, disk_folder)
+ self._follow_deployment(instance, job, run)
+ endpoint_url = self._get_endpoint(instance=instance, region=region)
+ return Vespa(
+ url=endpoint_url,
+ cert=os.path.join(disk_folder, self.private_cert_file_name),
+ )
+
+ def delete(self, instance: str):
+ """
+ Delete the specified instance from the dev environment in the Vespa Cloud.
+ :param instance: Name of the instance to delete.
+ :return:
+ """
+ print(
+ self._request(
+ "DELETE",
+ "/application/v4/tenant/{}/application/{}/instance/{}/environment/dev/region/{}".format(
+ self.tenant, self.application, instance, self._get_dev_region()
+ ),
+ )["message"],
+ file=self.output,
+ )
+
+ def close(self):
+ self.connection.close()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.close()