diff options
35 files changed, 1008 insertions, 511 deletions
diff --git a/config-application-package/src/main/java/com/yahoo/config/application/FileSystemWrapper.java b/config-application-package/src/main/java/com/yahoo/config/application/FileSystemWrapper.java new file mode 100644 index 00000000000..8a08c56b3c0 --- /dev/null +++ b/config-application-package/src/main/java/com/yahoo/config/application/FileSystemWrapper.java @@ -0,0 +1,50 @@ +package com.yahoo.config.application; + +import com.yahoo.yolean.function.ThrowingFunction; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Optional; +import java.util.function.Predicate; + +/** + * Wraps a real or virtual file system — essentially a mapping from paths to bytes. + * + * @author jonmv + */ +public class FileSystemWrapper { + + Predicate<Path> existence; + ThrowingFunction<Path, byte[], IOException> reader; + + private FileSystemWrapper(Predicate<Path> existence, ThrowingFunction<Path, byte[], IOException> reader) { + this.existence = existence; + this.reader = reader; + } + + public static FileSystemWrapper ofFiles(Predicate<Path> existence, ThrowingFunction<Path, byte[], IOException> reader) { + return new FileSystemWrapper(existence, reader); + } + + public static FileSystemWrapper getDefault() { + return ofFiles(Files::exists, Files::readAllBytes); + } + + public FileWrapper wrap(Path path) { + return new FileWrapper(path); + } + + + public class FileWrapper { + private final Path path; + private FileWrapper(Path path) { this.path = path; } + + public Path path() { return path; } + public boolean exists() { return existence.test(path); } + public byte[] content() throws IOException { return reader.apply(path); } + public Optional<FileWrapper> parent() { return Optional.ofNullable(path.getParent()).map(path -> wrap(path)); } + public FileWrapper child(String name) { return wrap(path.resolve(name)); } + } + +} diff --git a/config-application-package/src/main/java/com/yahoo/config/application/IncludeProcessor.java 
b/config-application-package/src/main/java/com/yahoo/config/application/IncludeProcessor.java index c2da2a55b48..b09953c738c 100644 --- a/config-application-package/src/main/java/com/yahoo/config/application/IncludeProcessor.java +++ b/config-application-package/src/main/java/com/yahoo/config/application/IncludeProcessor.java @@ -1,7 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.config.application; -import com.yahoo.io.IOUtils; +import com.yahoo.config.application.FileSystemWrapper.FileWrapper; import com.yahoo.text.XML; import org.w3c.dom.Document; import org.w3c.dom.Element; @@ -14,6 +14,9 @@ import java.io.IOException; import java.io.StringReader; import java.io.StringWriter; import java.util.List; +import java.util.NoSuchElementException; + +import static java.nio.charset.StandardCharsets.UTF_8; /** * Handles preprocess:include statements and returns a Document which has all the include statements resolved @@ -23,10 +26,14 @@ import java.util.List; */ class IncludeProcessor implements PreProcessor { - private final File application; + private final FileWrapper application; + public IncludeProcessor(File application) { - this.application = application; + this(FileSystemWrapper.getDefault().wrap(application.toPath())); + } + public IncludeProcessor(FileWrapper application) { + this.application = application; } public Document process(Document input) throws IOException, TransformerException { @@ -35,17 +42,18 @@ class IncludeProcessor implements PreProcessor { return doc; } - private static void includeFile(File currentFolder, Element currentElement) throws IOException { + private static void includeFile(FileWrapper currentFolder, Element currentElement) throws IOException { NodeList list = currentElement.getElementsByTagNameNS(XmlPreProcessor.preprocessNamespaceUri, "include"); while (list.getLength() > 0) { Element elem = (Element) list.item(0); Element parent = 
(Element) elem.getParentNode(); String filename = elem.getAttribute("file"); boolean required = ! elem.hasAttribute("required") || Boolean.parseBoolean(elem.getAttribute("required")); - File file = new File(currentFolder, filename); + FileWrapper file = currentFolder.child(filename); Document subFile = IncludeProcessor.parseIncludeFile(file, parent.getTagName(), required); - includeFile(file.getParentFile(), subFile.getDocumentElement()); + includeFile(file.parent().orElseThrow(() -> new NoSuchElementException(file + " has no parent")), + subFile.getDocumentElement()); //System.out.println("document before merging: " + documentAsString(doc)); IncludeProcessor.mergeInto(parent, XML.getChildren(subFile.getDocumentElement())); @@ -65,12 +73,12 @@ class IncludeProcessor implements PreProcessor { } } - private static Document parseIncludeFile(File file, String parentTagName, boolean required) throws IOException { + private static Document parseIncludeFile(FileWrapper file, String parentTagName, boolean required) throws IOException { StringWriter w = new StringWriter(); final String startTag = "<" + parentTagName + " " + XmlPreProcessor.deployNamespace + "='" + XmlPreProcessor.deployNamespaceUri + "' " + XmlPreProcessor.preprocessNamespace + "='" + XmlPreProcessor.preprocessNamespaceUri + "'>"; w.append(startTag); if (file.exists() || required) { - w.append(IOUtils.readFile(file)); + w.append(new String(file.content(), UTF_8)); } final String endTag = "</" + parentTagName + ">"; w.append(endTag); diff --git a/config-application-package/src/main/java/com/yahoo/config/application/XmlPreProcessor.java b/config-application-package/src/main/java/com/yahoo/config/application/XmlPreProcessor.java index c49a799f2b7..61b67dfb5c6 100644 --- a/config-application-package/src/main/java/com/yahoo/config/application/XmlPreProcessor.java +++ b/config-application-package/src/main/java/com/yahoo/config/application/XmlPreProcessor.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. 
Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.config.application; +import com.yahoo.config.application.FileSystemWrapper.FileWrapper; import com.yahoo.config.provision.Environment; import com.yahoo.config.provision.InstanceName; import com.yahoo.config.provision.RegionName; @@ -31,7 +32,7 @@ public class XmlPreProcessor { final static String preprocessNamespace = "xmlns:preprocess"; final static String preprocessNamespaceUri = "properties"; //TODO - private final File applicationDir; + private final FileWrapper applicationDir; private final Reader xmlInput; private final InstanceName instance; private final Environment environment; @@ -43,6 +44,10 @@ public class XmlPreProcessor { } public XmlPreProcessor(File applicationDir, Reader xmlInput, InstanceName instance, Environment environment, RegionName region) { + this(FileSystemWrapper.getDefault().wrap(applicationDir.toPath()), xmlInput, instance, environment, region); + } + + public XmlPreProcessor(FileWrapper applicationDir, Reader xmlInput, InstanceName instance, Environment environment, RegionName region) { this.applicationDir = applicationDir; this.xmlInput = xmlInput; this.instance = instance; diff --git a/container-search/src/main/java/com/yahoo/search/result/FeatureData.java b/container-search/src/main/java/com/yahoo/search/result/FeatureData.java index f0637473ef5..fd41d4ee10c 100644 --- a/container-search/src/main/java/com/yahoo/search/result/FeatureData.java +++ b/container-search/src/main/java/com/yahoo/search/result/FeatureData.java @@ -74,12 +74,15 @@ public class FeatureData implements Inspectable, JsonProducer { * (that is, if it is a tensor with nonzero rank) */ public Double getDouble(String featureName) { - if (decodedDoubles != null && decodedDoubles.containsKey(featureName)) - return decodedDoubles.get(featureName); - Double value = decodeDouble(featureName); if (decodedDoubles == null) decodedDoubles = new HashMap<>(); - 
decodedDoubles.put(featureName, value); + + Double value = decodedDoubles.get(featureName); + if (value != null) return value; + + value = decodeDouble(featureName); + if (value != null) + decodedDoubles.put(featureName, value); return value; } @@ -99,12 +102,15 @@ public class FeatureData implements Inspectable, JsonProducer { * This will return any feature value: Scalars are returned as a rank 0 tensor. */ public Tensor getTensor(String featureName) { - if (decodedTensors != null && decodedTensors.containsKey(featureName)) - return decodedTensors.get(featureName); - Tensor value = decodeTensor(featureName); if (decodedTensors == null) decodedTensors = new HashMap<>(); - decodedTensors.put(featureName, value); + + Tensor value = decodedTensors.get(featureName); + if (value != null) return value; + + value = decodeTensor(featureName); + if (value != null) + decodedTensors.put(featureName, value); return value; } diff --git a/container-search/src/test/java/com/yahoo/search/result/FeatureDataTestCase.java b/container-search/src/test/java/com/yahoo/search/result/FeatureDataTestCase.java index f9b89b64e5b..3c8e147029c 100644 --- a/container-search/src/test/java/com/yahoo/search/result/FeatureDataTestCase.java +++ b/container-search/src/test/java/com/yahoo/search/result/FeatureDataTestCase.java @@ -11,6 +11,7 @@ import org.junit.Test; import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; /** * @author bratseth @@ -32,14 +33,21 @@ public class FeatureDataTestCase { FeatureData featureData = new FeatureData(new SlimeAdapter(features)); assertEquals("scalar1,scalar2,tensor1,tensor2", featureData.featureNames().stream().sorted().collect(Collectors.joining(","))); + assertNull(featureData.getDouble("nosuch1")); assertEquals(1.5, featureData.getDouble("scalar1"), delta); assertEquals(2.5, featureData.getDouble("scalar2"), delta); assertEquals("Cached lookup", 2.5, featureData.getDouble("scalar2"), delta); 
+ assertNull(featureData.getDouble("nosuch2")); + assertNull(featureData.getDouble("nosuch2")); + + assertNull(featureData.getTensor("nosuch1")); assertEquals(Tensor.from(1.5), featureData.getTensor("scalar1")); assertEquals(Tensor.from(2.5), featureData.getTensor("scalar2")); assertEquals(tensor1, featureData.getTensor("tensor1")); assertEquals(tensor2, featureData.getTensor("tensor2")); assertEquals("Cached lookup", tensor2, featureData.getTensor("tensor2")); + assertNull(featureData.getTensor("nosuch2")); + assertNull(featureData.getTensor("nosuch2")); String expectedJson = "{" + diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/deployment/ApplicationStore.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/deployment/ApplicationStore.java index 28a803fd43d..dd9f8c38802 100644 --- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/deployment/ApplicationStore.java +++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/deployment/ApplicationStore.java @@ -5,9 +5,9 @@ import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.ApplicationName; import com.yahoo.config.provision.TenantName; import com.yahoo.config.provision.zone.ZoneId; +import com.yahoo.vespa.hosted.controller.api.identifiers.DeploymentId; import java.time.Instant; -import java.util.Collection; import java.util.Optional; /** @@ -59,6 +59,12 @@ public interface ApplicationStore { /** Marks the given application as deleted, and eligible for meta data GC at a later time. */ void putMetaTombstone(TenantName tenant, ApplicationName application, Instant now); + /** Stores the given manual deployment meta data with the current time as part of the path. */ + void putMeta(DeploymentId id, Instant now, byte[] metaZip); + + /** Marks the given manual deployment as deleted, and eligible for meta data GC at a later time. 
*/ + void putMetaTombstone(DeploymentId id, Instant now); + /** Prunes meta data such that only what was active at the given instant, and anything newer, is retained. */ void pruneMeta(Instant oldest); diff --git a/controller-server/pom.xml b/controller-server/pom.xml index 158c1b5cd39..0ddc1ecd8be 100644 --- a/controller-server/pom.xml +++ b/controller-server/pom.xml @@ -28,6 +28,13 @@ <dependency> <groupId>com.yahoo.vespa</groupId> + <artifactId>config-application-package</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>com.yahoo.vespa</groupId> <artifactId>container-dev</artifactId> <version>${project.version}</version> <scope>provided</scope> diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java index 83a1895a85d..d8abc77ed06 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java @@ -527,6 +527,11 @@ public class ApplicationController { Optional<Quota> quota = billingController.getQuota(application.tenant(), zone.environment()); + if (zone.environment().isManuallyDeployed()) + controller.applications().applicationStore().putMeta(new DeploymentId(application, zone), + clock.instant(), + applicationPackage.metaDataZip()); + ConfigServer.PreparedApplication preparedApplication = configServer.deploy(new DeploymentData(application, zone, applicationPackage.zippedContent(), platform, endpoints, endpointCertificateMetadata, dockerImageRepo, domain, applicationRoles, quota)); @@ -583,13 +588,6 @@ public class ApplicationController { return application; } - private DeployOptions withVersion(Version version, DeployOptions options) { - return new DeployOptions(options.deployDirectly, - Optional.of(version), 
- options.ignoreValidationErrors, - options.deployCurrentVersion); - } - /** * Deletes the the given application. All known instances of the applications will be deleted. * @@ -720,12 +718,14 @@ public class ApplicationController { * @return the application with the deployment in the given zone removed */ private LockedApplication deactivate(LockedApplication application, InstanceName instanceName, ZoneId zone) { + DeploymentId id = new DeploymentId(application.get().id().instance(instanceName), zone); try { - configServer.deactivate(new DeploymentId(application.get().id().instance(instanceName), zone)); + configServer.deactivate(id); } catch (NotFoundException ignored) { // ok; already gone } finally { controller.routing().policies().refresh(application.get().id().instance(instanceName), application.get().deploymentSpec(), zone); + applicationStore.putMetaTombstone(id, clock.instant()); } return application.with(instanceName, instance -> instance.withoutDeploymentIn(zone)); } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/ApplicationPackage.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/ApplicationPackage.java index b246e10ca82..c29bc3f3f5e 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/ApplicationPackage.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/ApplicationPackage.java @@ -1,12 +1,17 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
package com.yahoo.vespa.hosted.controller.application; -import com.google.common.collect.ImmutableMap; import com.google.common.hash.Hashing; import com.yahoo.component.Version; +import com.yahoo.config.application.FileSystemWrapper; +import com.yahoo.config.application.FileSystemWrapper.FileWrapper; +import com.yahoo.config.application.XmlPreProcessor; import com.yahoo.config.application.api.DeploymentSpec; import com.yahoo.config.application.api.ValidationId; import com.yahoo.config.application.api.ValidationOverrides; +import com.yahoo.config.provision.Environment; +import com.yahoo.config.provision.InstanceName; +import com.yahoo.config.provision.RegionName; import com.yahoo.security.X509CertificateUtils; import com.yahoo.slime.Inspector; import com.yahoo.slime.Slime; @@ -16,10 +21,10 @@ import com.yahoo.yolean.Exceptions; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.io.InputStreamReader; -import java.io.Reader; -import java.io.UncheckedIOException; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.Paths; import java.security.cert.X509Certificate; import java.time.Duration; import java.time.Instant; @@ -32,11 +37,11 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.Stream; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.stream.Collectors.toMap; /** * A representation of the content of an application package. @@ -46,6 +51,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; * This is immutable. 
* * @author bratseth + * @author jonmv */ public class ApplicationPackage { @@ -59,7 +65,7 @@ public class ApplicationPackage { private final byte[] zippedContent; private final DeploymentSpec deploymentSpec; private final ValidationOverrides validationOverrides; - private final Files files; + private final ZipArchiveCache files; private final Optional<Version> compileVersion; private final Optional<Instant> buildTime; private final List<X509Certificate> trustedCertificates; @@ -82,14 +88,14 @@ public class ApplicationPackage { public ApplicationPackage(byte[] zippedContent, boolean requireFiles) { this.zippedContent = Objects.requireNonNull(zippedContent, "The application package content cannot be null"); this.contentHash = Hashing.sha1().hashBytes(zippedContent).toString(); - this.files = Files.extract(Set.of(deploymentFile, validationOverridesFile, servicesFile, buildMetaFile, trustedCertificatesFile), zippedContent); + this.files = new ZipArchiveCache(zippedContent, Set.of(deploymentFile, validationOverridesFile, servicesFile, buildMetaFile, trustedCertificatesFile)); - Optional<DeploymentSpec> deploymentSpec = files.getAsReader(deploymentFile).map(DeploymentSpec::fromXml); + Optional<DeploymentSpec> deploymentSpec = files.get(deploymentFile).map(bytes -> new String(bytes, UTF_8)).map(DeploymentSpec::fromXml); if (requireFiles && deploymentSpec.isEmpty()) throw new IllegalArgumentException("Missing required file '" + deploymentFile + "'"); this.deploymentSpec = deploymentSpec.orElse(DeploymentSpec.empty); - this.validationOverrides = files.getAsReader(validationOverridesFile).map(ValidationOverrides::fromXml).orElse(ValidationOverrides.empty); + this.validationOverrides = files.get(validationOverridesFile).map(bytes -> new String(bytes, UTF_8)).map(ValidationOverrides::fromXml).orElse(ValidationOverrides.empty); Optional<Inspector> buildMetaObject = files.get(buildMetaFile).map(SlimeUtils::jsonToSlime).map(Slime::get); if (requireFiles && 
buildMetaObject.isEmpty()) @@ -151,68 +157,42 @@ public class ApplicationPackage { } } - private static class Files { - - /** Max size of each extracted file */ - private static final int maxSize = 10 * 1024 * 1024; // 10 MiB - - // TODO: Vespa 8: Remove application/ directory support - private static final String applicationDir = "application/"; - - private final ImmutableMap<String, byte[]> files; - - private Files(ImmutableMap<String, byte[]> files) { - this.files = files; - } - - public static Files extract(Set<String> filesToExtract, byte[] zippedContent) { - ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder(); - try (ByteArrayInputStream stream = new ByteArrayInputStream(zippedContent)) { - ZipStreamReader reader = new ZipStreamReader(stream, - (name) -> filesToExtract.contains(withoutLegacyDir(name)), - maxSize); - for (ZipStreamReader.ZipEntryWithContent entry : reader.entries()) { - builder.put(withoutLegacyDir(entry.zipEntry().getName()), entry.content()); - } - } catch (IOException e) { - throw new UncheckedIOException("Exception reading application package", e); - } - return new Files(builder.build()); - } - - - /** Get content of given file name */ - public Optional<byte[]> get(String name) { - return Optional.ofNullable(files.get(name)); - } - - /** Get reader for the content of given file name */ - public Optional<Reader> getAsReader(String name) { - return get(name).map(ByteArrayInputStream::new).map(InputStreamReader::new); - } - - private static String withoutLegacyDir(String name) { - if (name.startsWith(applicationDir)) return name.substring(applicationDir.length()); - return name; - } - - } - /** Creates a valid application package that will remove all application's deployments */ public static ApplicationPackage deploymentRemoval() { return new ApplicationPackage(filesZip(Map.of(validationOverridesFile, allValidationOverrides().xmlForm().getBytes(UTF_8), deploymentFile, DeploymentSpec.empty.xmlForm().getBytes(UTF_8)))); } - 
/** Returns a zip containing only services.xml and deployment.xml files of this. */ + /** Returns a zip containing meta data about deployments of this package by the given job. */ public byte[] metaDataZip() { - return filesZip(Stream.of(deploymentFile, servicesFile) - .filter(name -> files.files.containsKey(name)) - .collect(Collectors.toMap(name -> name, - name -> files.files.get(name)))); + preProcessAndPopulateCache(); + return cacheZip(); + } + + private void preProcessAndPopulateCache() { + FileWrapper servicesXml = files.wrapper().wrap(Paths.get(servicesFile)); + if (servicesXml.exists()) + try { + new XmlPreProcessor(files.wrapper().wrap(Paths.get("./")), + new InputStreamReader(new ByteArrayInputStream(servicesXml.content()), UTF_8), + InstanceName.defaultName(), + Environment.prod, + RegionName.defaultName()) + .run(); // Populates the zip archive cache with files that would be included. + } + catch (Exception e) { + throw new RuntimeException(e); + } } - private static byte[] filesZip(Map<String, byte[]> files) { + private byte[] cacheZip() { + return filesZip(files.cache.entrySet().stream() + .filter(entry -> entry.getValue().isPresent()) + .collect(toMap(entry -> entry.getKey().toString(), + entry -> entry.getValue().get()))); + } + + static byte[] filesZip(Map<String, byte[]> files) { try (ZipBuilder zipBuilder = new ZipBuilder(files.values().stream().mapToInt(bytes -> bytes.length).sum() + 512)) { files.forEach(zipBuilder::add); zipBuilder.close(); @@ -231,4 +211,54 @@ public class ApplicationPackage { return ValidationOverrides.fromXml(validationOverridesContents.toString()); } + + /** Maps normalized paths to cached content read from a zip archive. 
*/ + private static class ZipArchiveCache { + + /** Max size of each extracted file */ + private static final int maxSize = 10 << 20; // 10 Mb + + // TODO: Vespa 8: Remove application/ directory support + private static final String applicationDir = "application/"; + + private static String withoutLegacyDir(String name) { + if (name.startsWith(applicationDir)) return name.substring(applicationDir.length()); + return name; + } + + private final byte[] zip; + private final Map<Path, Optional<byte[]>> cache; + + public ZipArchiveCache(byte[] zip, Collection<String> prePopulated) { + this.zip = zip; + this.cache = new ConcurrentSkipListMap<>(); + this.cache.putAll(read(prePopulated)); + } + + public Optional<byte[]> get(String path) { + return get(Paths.get(path)); + } + + public Optional<byte[]> get(Path path) { + return cache.computeIfAbsent(path.normalize(), read(List.of(path.normalize().toString()))::get); + } + + public FileSystemWrapper wrapper() { + return FileSystemWrapper.ofFiles(path -> get(path).isPresent(), // Assume content asked for will also be read ... 
+ path -> get(path).orElseThrow(() -> new NoSuchFileException(path.toString()))); + } + + private Map<Path, Optional<byte[]>> read(Collection<String> names) { + var entries = new ZipStreamReader(new ByteArrayInputStream(zip), + name -> names.contains(withoutLegacyDir(name)), + maxSize) + .entries().stream() + .collect(toMap(entry -> Paths.get(withoutLegacyDir(entry.zipEntry().getName())).normalize(), + entry -> Optional.of(entry.content()))); + names.stream().map(Paths::get).forEach(path -> entries.putIfAbsent(path.normalize(), Optional.empty())); + return entries; + } + + } + } diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/ApplicationPackageTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/ApplicationPackageTest.java index f7f0c9ce58e..67e5358678a 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/ApplicationPackageTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/ApplicationPackageTest.java @@ -2,18 +2,56 @@ package com.yahoo.vespa.hosted.controller.application; import com.yahoo.config.application.api.DeploymentSpec; import com.yahoo.config.application.api.ValidationId; +import org.junit.Assert; import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.nio.file.NoSuchFileException; import java.time.Instant; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; /** * @author valerijf + * @author jonmv */ public class ApplicationPackageTest { + + private static final String deploymentXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" + + "<deployment version=\"1.0\">\n" + + " <test />\n" + + " <prod>\n" + + " <parallel>\n" + + " <region active=\"true\">us-central-1</region>\n" + + " </parallel>\n" + + " 
</prod>\n" + + "</deployment>\n"; + + private static final String servicesXml = "<services version='1.0' xmlns:deploy=\"vespa\" xmlns:preprocess=\"properties\">\n" + + " <preprocess:include file='jdisc.xml' />\n" + + " <content version='1.0' if='foo' />\n" + + " <content version='1.0' id='foo' deploy:environment='staging prod' deploy:region='us-east-3 us-central-1'>\n" + + " <preprocess:include file='content/content.xml' />\n" + + " </content>\n" + + " <preprocess:include file='not_found.xml' required='false' />\n" + + "</services>\n"; + + private static final String jdiscXml = "<container id='stateless' version='1.0' />\n"; + + private static final String contentXml = "<documents>\n" + + " <document type=\"music.sd\" mode=\"index\" />\n" + + "</documents>\n" + + "<preprocess:include file=\"nodes.xml\" />"; + + private static final String nodesXml = "<nodes>\n" + + " <node hostalias=\"node0\" distribution-key=\"0\" />\n" + + "</nodes>"; + @Test public void test_createEmptyForDeploymentRemoval() { ApplicationPackage app = ApplicationPackage.deploymentRemoval(); @@ -24,4 +62,57 @@ public class ApplicationPackageTest { assertTrue(app.validationOverrides().allows(validationId, Instant.now())); } } + + @Test + public void testMetaData() { + byte[] zip = ApplicationPackage.filesZip(Map.of("services.xml", servicesXml.getBytes(UTF_8), + "jdisc.xml", jdiscXml.getBytes(UTF_8), + "content/content.xml", contentXml.getBytes(UTF_8), + "content/nodes.xml", nodesXml.getBytes(UTF_8), + "gurba", "gurba".getBytes(UTF_8))); + + assertEquals(Map.of("services.xml", servicesXml, + "jdisc.xml", jdiscXml, + "content/content.xml", contentXml, + "content/nodes.xml", nodesXml), + unzip(new ApplicationPackage(zip, false).metaDataZip())); + } + + @Test + public void testMetaDataWithLegacyApplicationDirectory() { + byte[] zip = ApplicationPackage.filesZip(Map.of("application/deployment.xml", deploymentXml.getBytes(UTF_8), + "application/services.xml", servicesXml.getBytes(UTF_8), + 
"application/jdisc.xml", jdiscXml.getBytes(UTF_8), + "application/content/content.xml", contentXml.getBytes(UTF_8), + "application/content/nodes.xml", nodesXml.getBytes(UTF_8), + "application/gurba", "gurba".getBytes(UTF_8))); + + assertEquals(Map.of("deployment.xml", deploymentXml, + "services.xml", servicesXml, + "jdisc.xml", jdiscXml, + "content/content.xml", contentXml, + "content/nodes.xml", nodesXml), + unzip(new ApplicationPackage(zip, false).metaDataZip())); + } + + @Test + public void testMetaDataWithMissingFiles() { + byte[] zip = ApplicationPackage.filesZip(Map.of("services.xml", servicesXml.getBytes(UTF_8))); + + try { + new ApplicationPackage(zip, false).metaDataZip(); + Assert.fail("Should fail on missing include file"); + } + catch (RuntimeException e) { + assertEquals("./jdisc.xml", e.getCause().getMessage()); + } + } + + private static Map<String, String> unzip(byte[] zip) { + return new ZipStreamReader(new ByteArrayInputStream(zip), __ -> true, 1 << 10) + .entries().stream() + .collect(Collectors.toMap(entry -> entry.zipEntry().getName(), + entry -> new String(entry.content(), UTF_8))); + } + } diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ApplicationStoreMock.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ApplicationStoreMock.java index be0cd975190..81bda23146e 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ApplicationStoreMock.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ApplicationStoreMock.java @@ -6,6 +6,7 @@ import com.yahoo.config.provision.ApplicationName; import com.yahoo.config.provision.InstanceName; import com.yahoo.config.provision.TenantName; import com.yahoo.config.provision.zone.ZoneId; +import com.yahoo.vespa.hosted.controller.api.identifiers.DeploymentId; import com.yahoo.vespa.hosted.controller.api.integration.deployment.ApplicationStore; import 
com.yahoo.vespa.hosted.controller.api.integration.deployment.ApplicationVersion; import com.yahoo.vespa.hosted.controller.api.integration.deployment.TesterId; @@ -14,7 +15,6 @@ import java.time.Instant; import java.util.Map; import java.util.NavigableMap; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; @@ -32,6 +32,7 @@ public class ApplicationStoreMock implements ApplicationStore { private final Map<ApplicationId, Map<ApplicationVersion, byte[]>> store = new ConcurrentHashMap<>(); private final Map<ApplicationId, Map<ZoneId, byte[]>> devStore = new ConcurrentHashMap<>(); private final Map<ApplicationId, NavigableMap<Instant, byte[]>> meta = new ConcurrentHashMap<>(); + private final Map<DeploymentId, NavigableMap<Instant, byte[]>> metaManual = new ConcurrentHashMap<>(); private static ApplicationId appId(TenantName tenant, ApplicationName application) { return ApplicationId.from(tenant, application, InstanceName.defaultName()); @@ -120,6 +121,16 @@ public class ApplicationStoreMock implements ApplicationStore { } @Override + public void putMeta(DeploymentId id, Instant now, byte[] metaZip) { + metaManual.computeIfAbsent(id, __ -> new ConcurrentSkipListMap<>()).put(now, metaZip); + } + + @Override + public void putMetaTombstone(DeploymentId id, Instant now) { + putMeta(id, now, tombstone); + } + + @Override public void pruneMeta(Instant oldest) { for (ApplicationId id : meta.keySet()) { Instant activeAtOldest = meta.get(id).lowerKey(oldest); diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp index 18b3a5c5d8e..44058d48d1e 100644 --- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp @@ -49,6 +49,7 @@ using search::index::schema::CollectionType; using 
search::index::schema::DataType; using vespalib::makeLambdaTask; using search::transactionlog::TransLogServer; +using search::transactionlog::DomainConfig; using storage::spi::PartitionId; using storage::spi::RemoveResult; using storage::spi::Result; @@ -459,7 +460,7 @@ struct FeedHandlerFixture FeedHandler handler; FeedHandlerFixture() : _fileHeaderContext(), - tls("mytls", 9016, "mytlsdir", _fileHeaderContext, 0x10000), + tls("mytls", 9016, "mytlsdir", _fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)), tlsSpec("tcp/localhost:9016"), sharedExecutor(1, 0x10000), writeService(sharedExecutor), diff --git a/searchlib/src/tests/features/onnx_feature/.gitattributes b/searchlib/src/tests/features/onnx_feature/.gitattributes new file mode 100644 index 00000000000..62e8ad1e0a0 --- /dev/null +++ b/searchlib/src/tests/features/onnx_feature/.gitattributes @@ -0,0 +1 @@ +/*.onnx binary diff --git a/searchlib/src/tests/features/onnx_feature/onnx_feature_test.cpp b/searchlib/src/tests/features/onnx_feature/onnx_feature_test.cpp index 826984832f6..b49d9c365de 100644 --- a/searchlib/src/tests/features/onnx_feature/onnx_feature_test.cpp +++ b/searchlib/src/tests/features/onnx_feature/onnx_feature_test.cpp @@ -27,6 +27,7 @@ std::string source_dir = get_source_dir(); std::string vespa_dir = source_dir + "/" + "../../../../.."; std::string simple_model = vespa_dir + "/" + "eval/src/tests/tensor/onnx_wrapper/simple.onnx"; std::string dynamic_model = vespa_dir + "/" + "eval/src/tests/tensor/onnx_wrapper/dynamic.onnx"; +std::string strange_names_model = source_dir + "/" + "strange_names.onnx"; uint32_t default_docid = 1; @@ -108,4 +109,16 @@ TEST_F(OnnxFeatureTest, dynamic_onnx_model_can_be_calculated) { EXPECT_EQ(get(3), TensorSpec("tensor<float>(d0[1],d1[1])").add({{"d0",0},{"d1",0}}, 89.0)); } +TEST_F(OnnxFeatureTest, strange_input_and_output_names_are_normalized) { + add_expr("input_0", "tensor<float>(a[2]):[10,20]"); + add_expr("input_1", "tensor<float>(a[2]):[5,10]"); + 
add_onnx("strange_names", strange_names_model); + compile(onnx_feature("strange_names")); + auto expect_add = TensorSpec("tensor<float>(d0[2])").add({{"d0",0}},15).add({{"d0",1}},30); + auto expect_sub = TensorSpec("tensor<float>(d0[2])").add({{"d0",0}},5).add({{"d0",1}},10); + EXPECT_EQ(get(1), expect_add); + EXPECT_EQ(get("onnxModel(strange_names).foo_bar", 1), expect_add); + EXPECT_EQ(get("onnxModel(strange_names)._baz_0", 1), expect_sub); +} + GTEST_MAIN_RUN_ALL_TESTS() diff --git a/searchlib/src/tests/features/onnx_feature/strange_names.onnx b/searchlib/src/tests/features/onnx_feature/strange_names.onnx new file mode 100644 index 00000000000..d61ecbd5540 --- /dev/null +++ b/searchlib/src/tests/features/onnx_feature/strange_names.onnx @@ -0,0 +1,23 @@ +strange_names.py: + +input:0 +input/1foo/bar"Add + +input:0 +input/1-baz:0"Sub
strange_namesZ +input:0 + + +Z +input/1 + + +b +foo/bar + + +b +-baz:0 + + +B
\ No newline at end of file diff --git a/searchlib/src/tests/features/onnx_feature/strange_names.py b/searchlib/src/tests/features/onnx_feature/strange_names.py new file mode 100755 index 00000000000..681da641264 --- /dev/null +++ b/searchlib/src/tests/features/onnx_feature/strange_names.py @@ -0,0 +1,36 @@ +# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +import onnx +from onnx import helper, TensorProto + +INPUT1 = helper.make_tensor_value_info('input:0', TensorProto.FLOAT, [2]) +INPUT2 = helper.make_tensor_value_info('input/1', TensorProto.FLOAT, [2]) + +OUTPUT1 = helper.make_tensor_value_info('foo/bar', TensorProto.FLOAT, [2]) +OUTPUT2 = helper.make_tensor_value_info('-baz:0', TensorProto.FLOAT, [2]) + +nodes = [ + helper.make_node( + 'Add', + ['input:0', 'input/1'], + ['foo/bar'], + ), + helper.make_node( + 'Sub', + ['input:0', 'input/1'], + ['-baz:0'], + ), +] +graph_def = helper.make_graph( + nodes, + 'strange_names', + [ + INPUT1, + INPUT2, + ], + [ + OUTPUT1, + OUTPUT2, + ], +) +model_def = helper.make_model(graph_def, producer_name='strange_names.py') +onnx.save(model_def, 'strange_names.onnx') diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp index 0dced597917..d003bad0582 100644 --- a/searchlib/src/tests/transactionlog/translogclient_test.cpp +++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp @@ -5,7 +5,6 @@ #include <vespa/vespalib/objects/identifiable.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> #include <vespa/fastos/file.h> -#include <map> #include <vespa/log/log.h> LOG_SETUP("translogclient_test"); @@ -14,9 +13,24 @@ using namespace search; using namespace transactionlog; using namespace document; using namespace vespalib; +using namespace std::chrono_literals; using search::index::DummyFileHeaderContext; -vespalib::string myhex(const void * b, size_t sz) +namespace { + 
+bool createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains=0); +TransLogClient::Session::UP openDomainTest(TransLogClient & tls, const vespalib::string & name); +bool fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name); +void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries); +void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize); +uint32_t countFiles(const vespalib::string &dir); +void checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries); +bool visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name); +void createAndFillDomain(const vespalib::string & name, Encoding encoding, size_t preExistingDomains); +void verifyDomain(const vespalib::string & name); + +vespalib::string +myhex(const void * b, size_t sz) { static const char * hextab="0123456789ABCDEF"; const unsigned char * c = static_cast<const unsigned char *>(b); @@ -29,35 +43,6 @@ vespalib::string myhex(const void * b, size_t sz) return s; } -class Test : public vespalib::TestApp -{ -public: - int Main() override; -private: - bool createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains=0); - TransLogClient::Session::UP openDomainTest(TransLogClient & tls, const vespalib::string & name); - bool fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name); - void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries); - void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize); - uint32_t countFiles(const vespalib::string &dir); - void checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries); - bool visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name); - bool partialUpdateTest(); - bool 
testVisitOverGeneratedDomain(); - bool testRemove(); - void createAndFillDomain(const vespalib::string & name, DomainPart::Crc crcMethod, size_t preExistingDomains); - void verifyDomain(const vespalib::string & name); - void testCrcVersions(); - bool testVisitOverPreExistingDomain(); - void testMany(); - void testErase(); - void testSync(); - void testTruncateOnShortRead(); - void testTruncateOnVersionMismatch(); -}; - -TEST_APPHOOK(Test); - class CallBackTest : public TransLogClient::Visitor::Callback { private: @@ -75,7 +60,8 @@ public: bool _eof; }; -RPC::Result CallBackTest::receive(const Packet & p) +RPC::Result +CallBackTest::receive(const Packet & p) { nbostream_longlivedbuf h(p.getHandle().data(), p.getHandle().size()); LOG(info,"CallBackTest::receive (%zu, %zu, %zu)(%s)", h.rp(), h.size(), h.capacity(), myhex(h.peek(), h.size()).c_str()); @@ -101,7 +87,8 @@ public: size_t _value; }; -RPC::Result CallBackManyTest::receive(const Packet & p) +RPC::Result +CallBackManyTest::receive(const Packet & p) { nbostream_longlivedbuf h(p.getHandle().data(), p.getHandle().size()); for(;h.size() > 0; _count++, _value++) { @@ -133,7 +120,8 @@ public: }; -RPC::Result CallBackUpdate::receive(const Packet & packet) +RPC::Result +CallBackUpdate::receive(const Packet & packet) { nbostream_longlivedbuf h(packet.getHandle().data(), packet.getHandle().size()); while (h.size() > 0) { @@ -185,7 +173,8 @@ public: SerialNum _prevSerial; }; -RPC::Result CallBackStatsTest::receive(const Packet & p) +RPC::Result +CallBackStatsTest::receive(const Packet & p) { nbostream_longlivedbuf h(p.getHandle().data(), p.getHandle().size()); for(;h.size() > 0; ++_count) { @@ -221,67 +210,10 @@ public: IMPLEMENT_IDENTIFIABLE(TestIdentifiable, Identifiable); -bool Test::partialUpdateTest() -{ - bool retval(false); - DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000); - TransLogClient tls("tcp/localhost:18377"); - - TransLogClient::Session::UP 
s1 = openDomainTest(tls, "test1"); - TransLogClient::Session & session = *s1; - - TestIdentifiable du; - - nbostream os; - os << du; - - vespalib::ConstBufferRef bb(os.data(), os.size()); - LOG(info, "DU : %s", myhex(bb.c_str(), bb.size()).c_str()); - Packet::Entry e(7, du.getClass().id(), bb); - Packet pa; - pa.add(e); - pa.close(); - ASSERT_TRUE(session.commit(vespalib::ConstBufferRef(pa.getHandle().data(), pa.getHandle().size()))); - - CallBackUpdate ca; - TransLogClient::Visitor::UP visitor = tls.createVisitor("test1", ca); - ASSERT_TRUE(visitor.get()); - ASSERT_TRUE( visitor->visit(5, 7) ); - for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } - ASSERT_TRUE( ca._eof ); - ASSERT_TRUE( ca.map().size() == 1); - ASSERT_TRUE( ca.hasSerial(7) ); - - CallBackUpdate ca1; - TransLogClient::Visitor::UP visitor1 = tls.createVisitor("test1", ca1); - ASSERT_TRUE(visitor1.get()); - ASSERT_TRUE( visitor1->visit(4, 5) ); - for (size_t i(0); ! ca1._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } - ASSERT_TRUE( ca1._eof ); - ASSERT_TRUE( ca1.map().size() == 0); - - CallBackUpdate ca2; - TransLogClient::Visitor::UP visitor2 = tls.createVisitor("test1", ca2); - ASSERT_TRUE(visitor2.get()); - ASSERT_TRUE( visitor2->visit(5, 6) ); - for (size_t i(0); ! ca2._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } - ASSERT_TRUE( ca2._eof ); - ASSERT_TRUE( ca2.map().size() == 0); - - CallBackUpdate ca3; - TransLogClient::Visitor::UP visitor3 = tls.createVisitor("test1", ca3); - ASSERT_TRUE(visitor3.get()); - ASSERT_TRUE( visitor3->visit(5, 1000) ); - for (size_t i(0); ! 
ca3._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } - ASSERT_TRUE( ca3._eof ); - ASSERT_TRUE( ca3.map().size() == 1); - ASSERT_TRUE( ca3.hasSerial(7) ); - - return retval; -} +constexpr size_t DEFAULT_PACKET_SIZE = 0xf000; -bool Test::createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains) +bool +createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains) { bool retval(true); std::vector<vespalib::string> dir; @@ -298,43 +230,40 @@ bool Test::createDomainTest(TransLogClient & tls, const vespalib::string & name, return retval; } -TransLogClient::Session::UP Test::openDomainTest(TransLogClient & tls, const vespalib::string & name) +TransLogClient::Session::UP +openDomainTest(TransLogClient & tls, const vespalib::string & name) { TransLogClient::Session::UP s1 = tls.open(name); ASSERT_TRUE (s1.get() != NULL); return s1; } -bool Test::fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name) +bool +fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name) { bool retval(true); Packet::Entry e1(1, 1, vespalib::ConstBufferRef("Content in buffer A", 20)); Packet::Entry e2(2, 2, vespalib::ConstBufferRef("Content in buffer B", 20)); Packet::Entry e3(3, 1, vespalib::ConstBufferRef("Content in buffer C", 20)); - Packet a; - ASSERT_TRUE (a.add(e1)); - Packet b; - ASSERT_TRUE (b.add(e2)); - ASSERT_TRUE (b.add(e3)); - ASSERT_TRUE (!b.add(e1)); - a.close(); - b.close(); + Packet a(DEFAULT_PACKET_SIZE); + a.add(e1); + Packet b(DEFAULT_PACKET_SIZE); + b.add(e2); + b.add(e3); + EXPECT_FALSE(b.add(e1)); ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(a.getHandle().data(), a.getHandle().size()))); ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(b.getHandle().data(), b.getHandle().size()))); - try { - s1->commit(vespalib::ConstBufferRef(a.getHandle().data(), a.getHandle().size())); - ASSERT_TRUE(false); - } catch (const std::exception & e) { - 
EXPECT_EQUAL(vespalib::string("commit failed with code -2. server says: Exception during commit on " + name + " : Incomming serial number(1) must be bigger than the last one (3)."), e.what()); - } + EXPECT_EXCEPTION(s1->commit(vespalib::ConstBufferRef(a.getHandle().data(), a.getHandle().size())), + std::runtime_error, + "commit failed with code -2. server says: Exception during commit on " + name + " : Incomming serial number(1) must be bigger than the last one (3)."); EXPECT_EQUAL(a.size(), 1u); EXPECT_EQUAL(a.range().from(), 1u); EXPECT_EQUAL(a.range().to(), 1u); EXPECT_EQUAL(b.size(), 2u); EXPECT_EQUAL(b.range().from(), 2u); EXPECT_EQUAL(b.range().to(), 3u); - EXPECT_TRUE(a.merge(b)); + a.merge(b); EXPECT_EQUAL(a.size(), 3u); EXPECT_EQUAL(a.range().from(), 1u); EXPECT_EQUAL(a.range().to(), 3u); @@ -349,52 +278,82 @@ bool Test::fillDomainTest(TransLogClient::Session * s1, const vespalib::string & return retval; } -void Test::fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries) +void +fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries) { size_t value(0); for(size_t i=0; i < numPackets; i++) { - std::unique_ptr<Packet> p(new Packet()); + std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE)); for(size_t j=0; j < numEntries; j++, value++) { Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&value, sizeof(value))); - if ( ! 
p->add(e) ) { - p->close(); + p->add(e); + if (p->sizeBytes() > DEFAULT_PACKET_SIZE){ ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().data(), p->getHandle().size()))); - p.reset(new Packet()); - ASSERT_TRUE(p->add(e)); + p.reset(new Packet(DEFAULT_PACKET_SIZE)); } } - p->close(); ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().data(), p->getHandle().size()))); } } +using Counter = std::atomic<size_t>; + +class CountDone : public IDestructorCallback { +public: + CountDone(Counter & inFlight) : _inFlight(inFlight) { ++_inFlight; } + ~CountDone() override { --_inFlight; } +private: + Counter & _inFlight; +}; + +void +fillDomainTest(TransLogServer & s1, const vespalib::string & domain, size_t numPackets, size_t numEntries) +{ + size_t value(0); + Counter inFlight(0); + for(size_t i=0; i < numPackets; i++) { + std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE)); + for(size_t j=0; j < numEntries; j++, value++) { + Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&value, sizeof(value))); + p->add(e); + if ( p->sizeBytes() > DEFAULT_PACKET_SIZE ) { + s1.commit(domain, *p, std::make_shared<CountDone>(inFlight)); + p.reset(new Packet(DEFAULT_PACKET_SIZE)); + } + } + s1.commit(domain, *p, std::make_shared<CountDone>(inFlight)); + LOG(info, "Inflight %ld", inFlight.load()); + } + while (inFlight.load() != 0) { + std::this_thread::sleep_for(10ms); + LOG(info, "Waiting for inflight %ld to reach zero", inFlight.load()); + } + +} + void -Test::fillDomainTest(TransLogClient::Session * s1, - size_t numPackets, size_t numEntries, - size_t entrySize) +fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize) { size_t value(0); std::vector<char> entryBuffer(entrySize); for(size_t i=0; i < numPackets; i++) { - std::unique_ptr<Packet> p(new Packet()); + std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE)); for(size_t j=0; j < numEntries; j++, value++) { Packet::Entry e(value+1, j+1, 
vespalib::ConstBufferRef((const char *)&entryBuffer[0], entryBuffer.size())); - if ( ! p->add(e) ) { - p->close(); + p->add(e); + if (p->sizeBytes() > DEFAULT_PACKET_SIZE){ ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().data(), p->getHandle().size()))); - p.reset(new Packet()); - ASSERT_TRUE(p->add(e)); + p.reset(new Packet(DEFAULT_PACKET_SIZE)); } } - p->close(); ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().data(), p->getHandle().size()))); } } uint32_t -Test::countFiles(const vespalib::string &dir) +countFiles(const vespalib::string &dir) { uint32_t res = 0; FastOS_DirectoryScan dirScan(dir.c_str()); @@ -408,10 +367,8 @@ Test::countFiles(const vespalib::string &dir) return res; } - void -Test::checkFilledDomainTest(const TransLogClient::Session::UP &s1, - size_t numEntries) +checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries) { SerialNum b(0), e(0); size_t c(0); @@ -421,8 +378,8 @@ Test::checkFilledDomainTest(const TransLogClient::Session::UP &s1, EXPECT_EQUAL(c, numEntries); } - -bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name) +bool +visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name) { bool retval(true); @@ -486,10 +443,31 @@ getMaxSessionRunTime(TransLogServer &tls, const vespalib::string &domain) return tls.getDomainStats()[domain].maxSessionRunTime.count(); } -bool Test::testVisitOverGeneratedDomain() +void createAndFillDomain(const vespalib::string & name, Encoding encoding, size_t preExistingDomains) { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss("test13", 18377, ".", fileHeaderContext, + DomainConfig().setPartSizeLimit(0x1000000).setEncoding(encoding), 4); + TransLogClient tls("tcp/localhost:18377"); + + createDomainTest(tls, name, preExistingDomains); + TransLogClient::Session::UP s1 = 
openDomainTest(tls, name); + fillDomainTest(s1.get(), name); +} + +void verifyDomain(const vespalib::string & name) { + DummyFileHeaderContext fileHeaderContext; + TransLogServer tlss("test13", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); + TransLogClient tls("tcp/localhost:18377"); + TransLogClient::Session::UP s1 = openDomainTest(tls, name); + visitDomainTest(tls, s1.get(), name); +} + +} + +TEST("testVisitOverGeneratedDomain") { + DummyFileHeaderContext fileHeaderContext; + TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls("tcp/localhost:18377"); vespalib::string name("test1"); @@ -501,42 +479,85 @@ bool Test::testVisitOverGeneratedDomain() double maxSessionRunTime = getMaxSessionRunTime(tlss, "test1"); LOG(info, "testVisitOverGeneratedDomain(): maxSessionRunTime=%f", maxSessionRunTime); EXPECT_GREATER(maxSessionRunTime, 0); - return true; } -void Test::createAndFillDomain(const vespalib::string & name, DomainPart::Crc crcMethod, size_t preExistingDomains) -{ +TEST("testVisitOverPreExistingDomain") { + // Depends on Test::testVisitOverGeneratedDomain() DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test13", 18377, ".", fileHeaderContext, 0x10000, 4, crcMethod); + TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls("tcp/localhost:18377"); - createDomainTest(tls, name, preExistingDomains); + vespalib::string name("test1"); TransLogClient::Session::UP s1 = openDomainTest(tls, name); - fillDomainTest(s1.get(), name); + visitDomainTest(tls, s1.get(), name); } -void Test::verifyDomain(const vespalib::string & name) -{ +TEST("partialUpdateTest") { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test13", 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient 
tls("tcp/localhost:18377"); - TransLogClient::Session::UP s1 = openDomainTest(tls, name); - visitDomainTest(tls, s1.get(), name); + + TransLogClient::Session::UP s1 = openDomainTest(tls, "test1"); + TransLogClient::Session & session = *s1; + + TestIdentifiable du; + + nbostream os; + os << du; + + vespalib::ConstBufferRef bb(os.data(), os.size()); + LOG(info, "DU : %s", myhex(bb.c_str(), bb.size()).c_str()); + Packet::Entry e(7, du.getClass().id(), bb); + Packet pa(DEFAULT_PACKET_SIZE); + pa.add(e); + ASSERT_TRUE(session.commit(vespalib::ConstBufferRef(pa.getHandle().data(), pa.getHandle().size()))); + + CallBackUpdate ca; + TransLogClient::Visitor::UP visitor = tls.createVisitor("test1", ca); + ASSERT_TRUE(visitor.get()); + ASSERT_TRUE( visitor->visit(5, 7) ); + for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } + ASSERT_TRUE( ca._eof ); + ASSERT_TRUE( ca.map().size() == 1); + ASSERT_TRUE( ca.hasSerial(7) ); + + CallBackUpdate ca1; + TransLogClient::Visitor::UP visitor1 = tls.createVisitor("test1", ca1); + ASSERT_TRUE(visitor1.get()); + ASSERT_TRUE( visitor1->visit(4, 5) ); + for (size_t i(0); ! ca1._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } + ASSERT_TRUE( ca1._eof ); + ASSERT_TRUE( ca1.map().size() == 0); + + CallBackUpdate ca2; + TransLogClient::Visitor::UP visitor2 = tls.createVisitor("test1", ca2); + ASSERT_TRUE(visitor2.get()); + ASSERT_TRUE( visitor2->visit(5, 6) ); + for (size_t i(0); ! ca2._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } + ASSERT_TRUE( ca2._eof ); + ASSERT_TRUE( ca2.map().size() == 0); + + CallBackUpdate ca3; + TransLogClient::Visitor::UP visitor3 = tls.createVisitor("test1", ca3); + ASSERT_TRUE(visitor3.get()); + ASSERT_TRUE( visitor3->visit(5, 1000) ); + for (size_t i(0); ! 
ca3._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); } + ASSERT_TRUE( ca3._eof ); + ASSERT_TRUE( ca3.map().size() == 1); + ASSERT_TRUE( ca3.hasSerial(7) ); } -void Test::testCrcVersions() -{ - createAndFillDomain("ccitt_crc32", DomainPart::ccitt_crc32, 0); - createAndFillDomain("xxh64", DomainPart::xxh64, 1); +TEST("testCrcVersions") { + createAndFillDomain("ccitt_crc32", Encoding(Encoding::Crc::ccitt_crc32, Encoding::Compression::none), 0); + createAndFillDomain("xxh64", Encoding(Encoding::Crc::xxh64, Encoding::Compression::none), 1); verifyDomain("ccitt_crc32"); verifyDomain("xxh64"); } -bool Test::testRemove() -{ +TEST("testRemove") { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("testremove", 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss("testremove", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls("tcp/localhost:18377"); vespalib::string name("test-delete"); @@ -545,21 +566,6 @@ bool Test::testRemove() fillDomainTest(s1.get(), name); visitDomainTest(tls, s1.get(), name); ASSERT_TRUE(tls.remove(name)); - - return true; -} - -bool Test::testVisitOverPreExistingDomain() -{ - // Depends on Test::testVisitOverGeneratedDomain() - DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000); - TransLogClient tls("tcp/localhost:18377"); - - vespalib::string name("test1"); - TransLogClient::Session::UP s1 = openDomainTest(tls, name); - visitDomainTest(tls, s1.get(), name); - return true; } namespace { @@ -600,18 +606,19 @@ assertStatus(TransLogClient::Session &s, } -void Test::testMany() -{ +TEST("test sending a lot of data") { const unsigned int NUM_PACKETS = 1000; const unsigned int NUM_ENTRIES = 100; const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; + const vespalib::string MANY("many"); { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test8", 18377, ".", fileHeaderContext, 0x80000); + 
TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x80000) + .setChunkAgeLimit(100us)); TransLogClient tls("tcp/localhost:18377"); - createDomainTest(tls, "many", 0); - TransLogClient::Session::UP s1 = openDomainTest(tls, "many"); + createDomainTest(tls, MANY, 0); + TransLogClient::Session::UP s1 = openDomainTest(tls, MANY); fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES); SerialNum b(0), e(0); size_t c(0); @@ -630,7 +637,7 @@ void Test::testMany() } { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test8", 18377, ".", fileHeaderContext, 0x1000000); + TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); TransLogClient::Session::UP s1 = openDomainTest(tls, "many"); @@ -641,7 +648,28 @@ void Test::testMany() EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); CallBackManyTest ca(2); - TransLogClient::Visitor::UP visitor = tls.createVisitor("many", ca); + TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca); + ASSERT_TRUE(visitor.get()); + ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); + for (size_t i(0); ! 
ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } + ASSERT_TRUE( ca._eof ); + EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES); + EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES); + } + { + DummyFileHeaderContext fileHeaderContext; + TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); + TransLogClient tls("tcp/localhost:18377"); + + TransLogClient::Session::UP s1 = openDomainTest(tls, MANY); + SerialNum b(0), e(0); + size_t c(0); + EXPECT_TRUE(s1->status(b, e, c)); + EXPECT_EQUAL(b, 1u); + EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); + EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); + CallBackManyTest ca(2); + TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca); ASSERT_TRUE(visitor.get()); ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } @@ -651,14 +679,67 @@ void Test::testMany() } } -void Test::testErase() -{ +TEST("test sending a lot of data async") { const unsigned int NUM_PACKETS = 1000; const unsigned int NUM_ENTRIES = 100; const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; + const vespalib::string MANY("many-async"); + { + DummyFileHeaderContext fileHeaderContext; + TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x80000) + .setChunkAgeLimit(10ms)); + TransLogClient tls("tcp/localhost:18377"); + createDomainTest(tls, MANY, 1); + TransLogClient::Session::UP s1 = openDomainTest(tls, MANY); + fillDomainTest(tlss, MANY, NUM_PACKETS, NUM_ENTRIES); + SerialNum b(0), e(0); + size_t c(0); + EXPECT_TRUE(s1->status(b, e, c)); + + EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); + EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); + CallBackManyTest ca(2); + TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca); + ASSERT_TRUE(visitor.get()); + ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); + for (size_t i(0); ! 
ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } + ASSERT_TRUE( ca._eof ); + EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES); + EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES); + } { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test12", 18377, ".", fileHeaderContext, 0x80000); + TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); + TransLogClient tls("tcp/localhost:18377"); + + TransLogClient::Session::UP s1 = openDomainTest(tls, MANY); + SerialNum b(0), e(0); + size_t c(0); + EXPECT_TRUE(s1->status(b, e, c)); + EXPECT_EQUAL(b, 1u); + EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES); + EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES); + CallBackManyTest ca(2); + TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca); + ASSERT_TRUE(visitor.get()); + ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) ); + for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); } + ASSERT_TRUE( ca._eof ); + EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES); + EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES); + } +} + + + + +TEST("testErase") { + const unsigned int NUM_PACKETS = 1000; + const unsigned int NUM_ENTRIES = 100; + const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; + { + DummyFileHeaderContext fileHeaderContext; + TransLogServer tlss("test12", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x80000)); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "erase", 0); @@ -667,7 +748,7 @@ void Test::testErase() } { DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test12", 18377, ".", fileHeaderContext, 0x1000000); + TransLogServer tlss("test12", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); TransLogClient::Session::UP s1 = openDomainTest(tls, "erase"); @@ -748,16 +829,13 @@ void Test::testErase() } } - -void -Test::testSync() -{ +TEST("testSync") { const unsigned int 
NUM_PACKETS = 3; const unsigned int NUM_ENTRIES = 4; const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; DummyFileHeaderContext fileHeaderContext; - TransLogServer tlss("test9", 18377, ".", fileHeaderContext, 0x1000000); + TransLogServer tlss("test9", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "sync", 0); @@ -770,10 +848,7 @@ Test::testSync() EXPECT_EQUAL(syncedTo, TOTAL_NUM_ENTRIES); } - -void -Test::testTruncateOnVersionMismatch() -{ +TEST("test truncate on version mismatch") { const unsigned int NUM_PACKETS = 3; const unsigned int NUM_ENTRIES = 4; const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; @@ -782,7 +857,7 @@ Test::testTruncateOnVersionMismatch() size_t countOld(0); DummyFileHeaderContext fileHeaderContext; { - TransLogServer tlss("test11", 18377, ".", fileHeaderContext, 0x1000000); + TransLogServer tlss("test11", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000)); TransLogClient tls("tcp/localhost:18377"); createDomainTest(tls, "sync", 0); @@ -803,7 +878,7 @@ Test::testTruncateOnVersionMismatch() EXPECT_EQUAL(static_cast<ssize_t>(sizeof(tmp)), f.Write2(tmp, sizeof(tmp))); EXPECT_TRUE(f.Close()); { - TransLogServer tlss("test11", 18377, ".", fileHeaderContext, 0x1000000); + TransLogServer tlss("test11", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls("tcp/localhost:18377"); TransLogClient::Session::UP s1 = openDomainTest(tls, "sync"); uint64_t from(0), to(0); @@ -815,9 +890,7 @@ Test::testTruncateOnVersionMismatch() } } -void -Test::testTruncateOnShortRead() -{ +TEST("test truncation after short read") { const unsigned int NUM_PACKETS = 17; const unsigned int NUM_ENTRIES = 1; const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES; @@ -829,7 +902,7 @@ Test::testTruncateOnShortRead() DummyFileHeaderContext fileHeaderContext; { - TransLogServer 
tlss(topdir, 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls(tlsspec); createDomainTest(tls, domain, 0); @@ -845,7 +918,7 @@ Test::testTruncateOnShortRead() EXPECT_EQUAL(2u, countFiles(dir)); } { - TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls(tlsspec); TransLogClient::Session::UP s1 = openDomainTest(tls, domain); checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES); @@ -861,7 +934,7 @@ Test::testTruncateOnShortRead() trfile.Close(); } { - TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000); + TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)); TransLogClient tls(tlsspec); TransLogClient::Session::UP s1 = openDomainTest(tls, domain); checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES - 1); @@ -871,28 +944,4 @@ Test::testTruncateOnShortRead() } } - -int Test::Main() -{ - TEST_INIT("translogclient_test"); - - if (_argc > 0) { - DummyFileHeaderContext::setCreator(_argv[0]); - } - testVisitOverGeneratedDomain(); - testVisitOverPreExistingDomain(); - testMany(); - testErase(); - partialUpdateTest(); - - testRemove(); - - testSync(); - - testTruncateOnShortRead(); - testTruncateOnVersionMismatch(); - - testCrcVersions(); - - TEST_DONE(); -} +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchlib/src/tests/transactionlogstress/translogstress.cpp b/searchlib/src/tests/transactionlogstress/translogstress.cpp index 013ca81dcc9..925f297bf48 100644 --- a/searchlib/src/tests/transactionlogstress/translogstress.cpp +++ b/searchlib/src/tests/transactionlogstress/translogstress.cpp @@ -698,7 +698,7 @@ TransLogStress::Main() // start transaction log server DummyFileHeaderContext fileHeaderContext; - TransLogServer tls("server", 17897, ".", fileHeaderContext, 
_cfg.domainPartSize); + TransLogServer tls("server", 17897, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(_cfg.domainPartSize)); TransLogClient client(tlsSpec); client.create(domain); diff --git a/searchlib/src/vespa/searchlib/config/translogserver.def b/searchlib/src/vespa/searchlib/config/translogserver.def index 74efe3fe68e..f822fc80fc1 100644 --- a/searchlib/src/vespa/searchlib/config/translogserver.def +++ b/searchlib/src/vespa/searchlib/config/translogserver.def @@ -5,7 +5,7 @@ namespace=searchlib listenport int default=13700 restart ## Max file size (50M) -filesizemax int default=50000000 restart +filesizemax int default=50000000 ## Server name to identify server. servername string default="tls" restart @@ -22,3 +22,17 @@ maxthreads int default=4 restart ##Default crc method used crcmethod enum {ccitt_crc32, xxh64} default=xxh64 + +## Control compression type. +compression.type enum {NONE, NONE_MULTI, LZ4, ZSTD} default=LZ4 + +## Control compression level +## LZ4 has normal range 1..9 while ZSTD has range 1..19 +## 9 is a reasonable default for both +compression.level int default=9 + +## How large a chunk can grow in memory before being flushed +chunk.sizelimit int default = 256000 # 256k + +## How long a chunk can reside in memory before being flushed to disk. 
+chunk.agelimit double default = 0.010 # 10 milliseconds diff --git a/searchlib/src/vespa/searchlib/features/onnx_feature.cpp b/searchlib/src/vespa/searchlib/features/onnx_feature.cpp index b24392ce629..698d2309e5a 100644 --- a/searchlib/src/vespa/searchlib/features/onnx_feature.cpp +++ b/searchlib/src/vespa/searchlib/features/onnx_feature.cpp @@ -26,6 +26,25 @@ using vespalib::tensor::Onnx; namespace search::features { +namespace { + +vespalib::string normalize_name(const vespalib::string &name, const char *context) { + vespalib::string result; + for (char c: name) { + if (isalnum(c)) { + result.push_back(c); + } else { + result.push_back('_'); + } + } + if (result != name) { + LOG(warning, "normalized %s name: '%s' -> '%s'", context, name.c_str(), result.c_str()); + } + return result; +} + +} + /** * Feature executor that evaluates an onnx model */ @@ -78,23 +97,25 @@ OnnxBlueprint::setup(const IIndexEnvironment &env, Onnx::WirePlanner planner; for (size_t i = 0; i < _model->inputs().size(); ++i) { const auto &model_input = _model->inputs()[i]; - if (auto maybe_input = defineInput(fmt("rankingExpression(\"%s\")", model_input.name.c_str()), AcceptInput::OBJECT)) { + vespalib::string input_name = normalize_name(model_input.name, "input"); + if (auto maybe_input = defineInput(fmt("rankingExpression(\"%s\")", input_name.c_str()), AcceptInput::OBJECT)) { const FeatureType &feature_input = maybe_input.value(); assert(feature_input.is_object()); if (!planner.bind_input_type(feature_input.type(), model_input)) { - return fail("incompatible type for input '%s': %s -> %s", model_input.name.c_str(), + return fail("incompatible type for input '%s': %s -> %s", input_name.c_str(), feature_input.type().to_spec().c_str(), model_input.type_as_string().c_str()); } } } for (size_t i = 0; i < _model->outputs().size(); ++i) { const auto &model_output = _model->outputs()[i]; + vespalib::string output_name = normalize_name(model_output.name, "output"); ValueType output_type = 
planner.make_output_type(model_output); if (output_type.is_error()) { return fail("unable to make compatible type for output '%s': %s -> error", - model_output.name.c_str(), model_output.type_as_string().c_str()); + output_name.c_str(), model_output.type_as_string().c_str()); } - describeOutput(model_output.name, "output from onnx model", FeatureType::object(output_type)); + describeOutput(output_name, "output from onnx model", FeatureType::object(output_type)); } _wire_info = planner.get_wire_info(*_model); return true; diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.h b/searchlib/src/vespa/searchlib/transactionlog/common.h index db8b9727daa..f3276027078 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/common.h +++ b/searchlib/src/vespa/searchlib/transactionlog/common.h @@ -72,7 +72,6 @@ public: Packet(size_t m=0xf000) : _count(0), _range(), _limit(m), _buf(m) { } Packet(const void * buf, size_t sz); bool add(const Entry & data); - void close() { } void clear() { _buf.clear(); _count = 0; _range.from(0); _range.to(0); } const SerialNumRange & range() const { return _range; } const vespalib::nbostream & getHandle() const { return _buf; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp index 5e7cfc74199..804a558789e 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp @@ -4,11 +4,14 @@ #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/util/closuretask.h> #include <vespa/vespalib/io/fileutil.h> +#include <vespa/vespalib/util/lambdatask.h> #include <vespa/fastos/file.h> #include <algorithm> #include <thread> #include <vespa/log/log.h> +#include <vespa/vespalib/util/threadstackexecutor.h> + LOG_SETUP(".transactionlog.domain"); using vespalib::string; @@ -20,29 +23,43 @@ using vespalib::Monitor; using vespalib::MonitorGuard; using search::common::FileHeaderContext; using 
std::runtime_error; -using namespace std::chrono_literals; +using std::make_shared; namespace search::transactionlog { -Domain::Domain(const string &domainName, const string & baseDir, Executor & commitExecutor, - Executor & sessionExecutor, uint64_t domainPartSize, DomainPart::Crc defaultCrcType, - const FileHeaderContext &fileHeaderContext) : - _defaultCrcType(defaultCrcType), - _commitExecutor(commitExecutor), - _sessionExecutor(sessionExecutor), - _sessionId(1), - _syncMonitor(), - _pendingSync(false), - _name(domainName), - _domainPartSize(domainPartSize), - _parts(), - _lock(), - _sessionLock(), - _sessions(), - _maxSessionRunTime(), - _baseDir(baseDir), - _fileHeaderContext(fileHeaderContext), - _markedDeleted(false) +VESPA_THREAD_STACK_TAG(domain_commit_executor); + +DomainConfig::DomainConfig() + : _encoding(Encoding::Crc::xxh64, Encoding::Compression::none), + _compressionLevel(9), + _partSizeLimit(0x10000000), // 256M + _chunkSizeLimit(0x40000), // 256k + _chunkAgeLimit(10ms) +{ } + +Domain::Domain(const string &domainName, const string & baseDir, FastOS_ThreadPool & threadPool, + Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg, + const FileHeaderContext &fileHeaderContext) + : _config(cfg), + _lastSerial(0), + _threadPool(threadPool), + _singleCommiter(std::make_unique<vespalib::ThreadStackExecutor>(1, 128*1024)), + _commitExecutor(commitExecutor), + _sessionExecutor(sessionExecutor), + _sessionId(1), + _syncMonitor(), + _pendingSync(false), + _name(domainName), + _parts(), + _lock(), + _currentChunkMonitor(), + _sessionLock(), + _sessions(), + _maxSessionRunTime(), + _baseDir(baseDir), + _fileHeaderContext(fileHeaderContext), + _markedDeleted(false), + _self(nullptr) { int retval(0); if ((retval = makeDirectory(_baseDir.c_str())) != 0) { @@ -60,13 +77,22 @@ Domain::Domain(const string &domainName, const string & baseDir, Executor & comm } _sessionExecutor.sync(); if (_parts.empty() || 
_parts.crbegin()->second->isClosed()) { - _parts[lastPart] = std::make_shared<DomainPart>(_name, dir(), lastPart, _defaultCrcType, _fileHeaderContext, false); + _parts[lastPart] = std::make_shared<DomainPart>(_name, dir(), lastPart, _config.getEncoding(), + _config.getCompressionlevel(), _fileHeaderContext, false); vespalib::File::sync(dir()); } + _lastSerial = end(); +} + +Domain & +Domain::setConfig(const DomainConfig & cfg) { + _config = cfg; + return *this; } void Domain::addPart(int64_t partId, bool isLastPart) { - auto dp = std::make_shared<DomainPart>(_name, dir(), partId, _defaultCrcType, _fileHeaderContext, isLastPart); + auto dp = std::make_shared<DomainPart>(_name, dir(), partId, _config.getEncoding(), + _config.getCompressionlevel(), _fileHeaderContext, isLastPart); if (dp->size() == 0) { // Only last domain part is allowed to be truncated down to // empty size. @@ -285,18 +311,21 @@ waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync) } -void Domain::commit(const Packet & packet) +void +Domain::commit(const Packet & packet, Writer::DoneCallback onDone) { + (void) onDone; DomainPart::SP dp(_parts.rbegin()->second); vespalib::nbostream_longlivedbuf is(packet.getHandle().data(), packet.getHandle().size()); Packet::Entry entry; entry.deserialize(is); - if (dp->byteSize() > _domainPartSize) { + if (dp->byteSize() > _config.getPartSizeLimit()) { waitPendingSync(_syncMonitor, _pendingSync); triggerSyncNow(); waitPendingSync(_syncMonitor, _pendingSync); dp->close(); - dp = std::make_shared<DomainPart>(_name, dir(), entry.serial(), _defaultCrcType, _fileHeaderContext, false); + dp = std::make_shared<DomainPart>(_name, dir(), entry.serial(), _config.getEncoding(), + _config.getCompressionlevel(), _fileHeaderContext, false); { LockGuard guard(_lock); _parts[entry.serial()] = dp; diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h index d6f964d5140..cc7f4ac5e4b 100644 --- 
a/searchlib/src/vespa/searchlib/transactionlog/domain.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h @@ -4,20 +4,45 @@ #include "domainpart.h" #include "session.h" #include <vespa/vespalib/util/threadexecutor.h> +#include <vespa/vespalib/util/time.h> +#include <vespa/fastos/thread.h> #include <chrono> namespace search::transactionlog { +class DomainConfig { +public: + using duration = vespalib::duration; + DomainConfig(); + DomainConfig & setEncoding(Encoding v) { _encoding = v; return *this; } + DomainConfig & setPartSizeLimit(size_t v) { _partSizeLimit = v; return *this; } + DomainConfig & setChunkSizeLimit(size_t v) { _chunkSizeLimit = v; return *this; } + DomainConfig & setChunkAgeLimit(vespalib::duration v) { _chunkAgeLimit = v; return *this; } + DomainConfig & setCompressionLevel(uint8_t v) { _compressionLevel = v; return *this; } + Encoding getEncoding() const { return _encoding; } + size_t getPartSizeLimit() const { return _partSizeLimit; } + size_t getChunkSizeLimit() const { return _chunkSizeLimit; } + duration getChunkAgeLimit() const { return _chunkAgeLimit; } + uint8_t getCompressionlevel() const { return _compressionLevel; } +private: + Encoding _encoding; + uint8_t _compressionLevel; + size_t _partSizeLimit; + size_t _chunkSizeLimit; + duration _chunkAgeLimit; +}; + struct PartInfo { SerialNumRange range; size_t numEntries; size_t byteSize; vespalib::string file; - PartInfo(SerialNumRange range_in, size_t numEntries_in, - size_t byteSize_in, - vespalib::stringref file_in) - : range(range_in), numEntries(numEntries_in), byteSize(byteSize_in), - file(file_in) {} + PartInfo(SerialNumRange range_in, size_t numEntries_in, size_t byteSize_in, vespalib::stringref file_in) + : range(range_in), + numEntries(numEntries_in), + byteSize(byteSize_in), + file(file_in) + {} }; struct DomainInfo { @@ -40,17 +65,17 @@ class Domain public: using SP = std::shared_ptr<Domain>; using Executor = vespalib::SyncableThreadExecutor; - Domain(const 
vespalib::string &name, const vespalib::string &baseDir, Executor & commitExecutor, - Executor & sessionExecutor, uint64_t domainPartSize, DomainPart::Crc defaultCrcType, + Domain(const vespalib::string &name, const vespalib::string &baseDir, FastOS_ThreadPool & threadPool, + Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg, const common::FileHeaderContext &fileHeaderContext); - virtual ~Domain(); + ~Domain(); DomainInfo getDomainInfo() const; const vespalib::string & name() const { return _name; } bool erase(SerialNum to); - void commit(const Packet & packet); + void commit(const Packet & packet, Writer::DoneCallback onDone); int visit(const Domain::SP & self, SerialNum from, SerialNum to, std::unique_ptr<Session::Destination> dest); SerialNum begin() const; @@ -77,7 +102,7 @@ public: return _sessionExecutor.execute(std::move(task)); } uint64_t size() const; - + Domain & setConfig(const DomainConfig & cfg); private: SerialNum begin(const vespalib::LockGuard & guard) const; SerialNum end(const vespalib::LockGuard & guard) const; @@ -95,22 +120,26 @@ private: using DomainPartList = std::map<int64_t, DomainPart::SP>; using DurationSeconds = std::chrono::duration<double>; - DomainPart::Crc _defaultCrcType; - Executor & _commitExecutor; - Executor & _sessionExecutor; - std::atomic<int> _sessionId; - vespalib::Monitor _syncMonitor; - bool _pendingSync; - vespalib::string _name; - uint64_t _domainPartSize; - DomainPartList _parts; - vespalib::Lock _lock; - vespalib::Lock _sessionLock; - SessionList _sessions; - DurationSeconds _maxSessionRunTime; - vespalib::string _baseDir; + DomainConfig _config; + SerialNum _lastSerial; + FastOS_ThreadPool & _threadPool; + std::unique_ptr<Executor> _singleCommiter; + Executor & _commitExecutor; + Executor & _sessionExecutor; + std::atomic<int> _sessionId; + vespalib::Monitor _syncMonitor; + bool _pendingSync; + vespalib::string _name; + DomainPartList _parts; + vespalib::Lock _lock; + vespalib::Monitor 
_currentChunkMonitor; + vespalib::Lock _sessionLock; + SessionList _sessions; + DurationSeconds _maxSessionRunTime; + vespalib::string _baseDir; const common::FileHeaderContext &_fileHeaderContext; - bool _markedDeleted; + bool _markedDeleted; + FastOS_ThreadInterface * _self; }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp index 91599f8218a..3829a978ec4 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp @@ -27,30 +27,19 @@ namespace search::transactionlog { namespace { -void -handleSync(FastOS_FileInterface &file) __attribute__ ((noinline)); +constexpr size_t TARGET_PACKET_SIZE = 0x3f000; string -handleWriteError(const char *text, - FastOS_FileInterface &file, - int64_t lastKnownGoodPos, - const Packet::Entry &entry, - int bufLen) __attribute__ ((noinline)); - -bool -handleReadError(const char *text, - FastOS_FileInterface &file, - ssize_t len, - ssize_t rlen, - int64_t lastKnownGoodPos, - bool allowTruncate) __attribute__ ((noinline)); +handleWriteError(const char *text, FastOS_FileInterface &file, int64_t lastKnownGoodPos, + SerialNumRange range, int bufLen) __attribute__ ((noinline)); bool -addPacket(Packet &packet, - const Packet::Entry &e) __attribute__ ((noinline)); +handleReadError(const char *text, FastOS_FileInterface &file, ssize_t len, ssize_t rlen, + int64_t lastKnownGoodPos, bool allowTruncate) __attribute__ ((noinline)); -bool -tailOfFileIsZero(FastOS_FileInterface &file, int64_t lastKnownGoodPos) __attribute__ ((noinline)); +void handleSync(FastOS_FileInterface &file) __attribute__ ((noinline)); +bool addPacket(Packet &packet, const Packet::Entry &e) __attribute__ ((noinline)); +bool tailOfFileIsZero(FastOS_FileInterface &file, int64_t lastKnownGoodPos) __attribute__ ((noinline)); bool addPacket(Packet &packet, const Packet::Entry &e) @@ -72,21 +61,18 @@ 
handleSync(FastOS_FileInterface &file) } string -handleWriteError(const char *text, - FastOS_FileInterface &file, - int64_t lastKnownGoodPos, - const Packet::Entry &entry, - int bufLen) +handleWriteError(const char *text, FastOS_FileInterface &file, int64_t lastKnownGoodPos, + SerialNumRange range, int bufLen) { string last(FastOS_File::getLastErrorString()); - string e(make_string("%s. File '%s' at position %" PRId64 " for entry %" PRIu64 " of length %u. " + string e(make_string("%s. File '%s' at position %" PRId64 " for entries [%" PRIu64 ", %" PRIu64 "] of length %u. " "OS says '%s'. Rewind to last known good position %" PRId64 ".", - text, file.GetFileName(), file.GetPosition(), entry.serial(), bufLen, + text, file.GetFileName(), file.GetPosition(), range.from(), range.to(), bufLen, last.c_str(), lastKnownGoodPos)); LOG(error, "%s", e.c_str()); if ( ! file.SetPosition(lastKnownGoodPos) ) { last = FastOS_File::getLastErrorString(); - throw runtime_error(make_string("Failed setting position %" PRId64 " of file '%s' of size %" PRId64 ": OS says '%s'", + throw runtime_error(make_string("Failed setting position %" PRId64 " of file '%s' of size %" PRId64 " : OS says '%s'", lastKnownGoodPos, file.GetFileName(), file.GetSize(), last.c_str())); } handleSync(file); @@ -118,12 +104,8 @@ tailOfFileIsZero(FastOS_FileInterface &file, int64_t lastKnownGoodPos) } bool -handleReadError(const char *text, - FastOS_FileInterface &file, - ssize_t len, - ssize_t rlen, - int64_t lastKnownGoodPos, - bool allowTruncate) +handleReadError(const char *text, FastOS_FileInterface &file, ssize_t len, ssize_t rlen, + int64_t lastKnownGoodPos, bool allowTruncate) { bool retval(true); if (rlen != -1) { @@ -176,6 +158,20 @@ handleReadError(const char *text, return retval; } +int32_t +calcCrc(Encoding::Crc version, const void * buf, size_t sz) +{ + if (version == Encoding::Crc::xxh64) { + return static_cast<int32_t>(XXH64(buf, sz, 0ll)); + } else if (version == Encoding::Crc::ccitt_crc32) { + 
vespalib::crc_32_type calculator; + calculator.process_bytes(buf, sz); + return calculator.checksum(); + } else { + LOG_ABORT("should not be reached"); + } +} + } int64_t @@ -251,7 +247,6 @@ DomainPart::buildPacketMapping(bool allowTruncate) } } } - packet.close(); if (!packet.empty()) { _packets[firstSerial] = packet; _range.to(lastSerial); @@ -265,22 +260,23 @@ DomainPart::buildPacketMapping(bool allowTruncate) return currPos; } -DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Crc defaultCrc, - const FileHeaderContext &fileHeaderContext, bool allowTruncate) : - _defaultCrc(defaultCrc), - _lock(), - _fileLock(), - _range(s), - _sz(0), - _byteSize(0), - _packets(), - _fileName(make_string("%s/%s-%016" PRIu64, baseDir.c_str(), name.c_str(), s)), - _transLog(std::make_unique<FastOS_File>(_fileName.c_str())), - _skipList(), - _headerLen(0), - _writeLock(), - _writtenSerial(0), - _syncedSerial(0) +DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Encoding encoding, + uint8_t compressionLevel, const FileHeaderContext &fileHeaderContext, bool allowTruncate) + : _encoding(encoding.getCrc(), Encoding::Compression::none), //TODO We do not yet support compression + _compressionLevel(compressionLevel), + _lock(), + _fileLock(), + _range(s), + _sz(0), + _byteSize(0), + _packets(), + _fileName(make_string("%s/%s-%016" PRIu64, baseDir.c_str(), name.c_str(), s)), + _transLog(std::make_unique<FastOS_File>(_fileName.c_str())), + _skipList(), + _headerLen(0), + _writeLock(), + _writtenSerial(0), + _syncedSerial(0) { if (_transLog->OpenReadOnly()) { int64_t currPos = buildPacketMapping(allowTruncate); @@ -519,12 +515,9 @@ DomainPart::visit(SerialNumRange &r, Packet &packet) } } } - newPacket.close(); packet = newPacket; retval = next != _packets.end(); } - } else { - packet.close(); } } else { /// File has been closed must continue from file. 
@@ -568,7 +561,6 @@ DomainPart::visit(FastOS_FileInterface &file, SerialNumRange &r, Packet &packet) } } } - newPacket.close(); packet = newPacket; } @@ -582,19 +574,19 @@ DomainPart::write(FastOS_FileInterface &file, const Packet::Entry &entry) int32_t crc(0); uint32_t len(entry.serializedSize() + sizeof(crc)); nbostream os; - os << static_cast<uint8_t>(_defaultCrc); + os << static_cast<uint8_t>(_encoding.getRaw()); os << len; size_t start(os.size()); entry.serialize(os); size_t end(os.size()); - crc = calcCrc(_defaultCrc, os.data() + start, end - start); + crc = calcCrc(_encoding.getCrc(), os.data() + start, end - start); os << crc; size_t osSize = os.size(); assert(osSize == len + sizeof(len) + sizeof(uint8_t)); LockGuard guard(_writeLock); if ( ! file.CheckedWrite(os.data(), osSize) ) { - throw runtime_error(handleWriteError("Failed writing the entry.", file, lastKnownGoodPos, entry, end - start)); + throw runtime_error(handleWriteError("Failed writing the entry.", file, lastKnownGoodPos, SerialNumRange(entry.serial(), entry.serial()), end - start)); } _writtenSerial = entry.serial(); _byteSize.store(lastKnownGoodPos + osSize, std::memory_order_release); @@ -615,10 +607,10 @@ DomainPart::read(FastOS_FileInterface &file, uint32_t len(0); his >> version >> len; if ((retval = (rlen == sizeof(tmp)))) { - if ( ! (retval = (version == ccitt_crc32) || version == xxh64)) { + if ( ! (retval = (version == Encoding::Crc::ccitt_crc32) || version == Encoding::Crc::xxh64)) { string msg(make_string("Version mismatch. 
Expected 'ccitt_crc32=1' or 'xxh64=2'," - " got %d from '%s' at position %" PRId64, - version, file.GetFileName(), lastKnownGoodPos)); + " got %d from '%s' at position %" PRId64, + version, file.GetFileName(), lastKnownGoodPos)); if ((version == 0) && (len == 0) && tailOfFileIsZero(file, lastKnownGoodPos)) { LOG(warning, "%s", msg.c_str()); return handleReadError("packet version", file, sizeof(tmp), rlen, lastKnownGoodPos, allowTruncate); @@ -638,7 +630,7 @@ DomainPart::read(FastOS_FileInterface &file, entry.deserialize(is); int32_t crc(0); is >> crc; - int32_t crcVerify(calcCrc(static_cast<Crc>(version), buf.get(), len - sizeof(crc))); + int32_t crcVerify(calcCrc(Encoding(version).getCrc(), buf.get(), len - sizeof(crc))); if (crc != crcVerify) { throw runtime_error(make_string("Got bad crc for packet from '%s' (len pos=%" PRId64 ", len=%d) : crcVerify = %d, expected %d", file.GetFileName(), file.GetPosition() - len - sizeof(len), @@ -655,17 +647,4 @@ DomainPart::read(FastOS_FileInterface &file, return retval; } -int32_t DomainPart::calcCrc(Crc version, const void * buf, size_t sz) -{ - if (version == xxh64) { - return static_cast<int32_t>(XXH64(buf, sz, 0ll)); - } else if (version == ccitt_crc32) { - vespalib::crc_32_type calculator; - calculator.process_bytes(buf, sz); - return calculator.checksum(); - } else { - LOG_ABORT("should not be reached"); - } -} - } diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h index 59d0df6df94..f3a53c1e9a9 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h +++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h @@ -2,6 +2,7 @@ #pragma once #include "common.h" +#include "ichunk.h" #include <vespa/vespalib/util/sync.h> #include <vespa/vespalib/util/memory.h> #include <map> @@ -19,13 +20,9 @@ private: DomainPart& operator=(const DomainPart &); public: - enum Crc { - ccitt_crc32=1, - xxh64=2 - }; typedef 
std::shared_ptr<DomainPart> SP; - DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s, Crc defaultCrc, - const common::FileHeaderContext &FileHeaderContext, bool allowTruncate); + DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s, Encoding defaultEncoding, + uint8_t compressionLevel, const common::FileHeaderContext &FileHeaderContext, bool allowTruncate); ~DomainPart(); @@ -55,7 +52,6 @@ private: static bool read(FastOS_FileInterface &file, Packet::Entry &entry, vespalib::alloc::Alloc &buf, bool allowTruncate); void write(FastOS_FileInterface &file, const Packet::Entry &entry); - static int32_t calcCrc(Crc crc, const void * buf, size_t len); void writeHeader(const common::FileHeaderContext &fileHeaderContext); class SkipInfo @@ -77,21 +73,22 @@ private: }; typedef std::vector<SkipInfo> SkipList; typedef std::map<SerialNum, Packet> PacketList; - const Crc _defaultCrc; - vespalib::Lock _lock; - vespalib::Lock _fileLock; - SerialNumRange _range; - size_t _sz; + const Encoding _encoding; + const uint8_t _compressionLevel; + vespalib::Lock _lock; + vespalib::Lock _fileLock; + SerialNumRange _range; + size_t _sz; std::atomic<uint64_t> _byteSize; - PacketList _packets; - vespalib::string _fileName; + PacketList _packets; + vespalib::string _fileName; std::unique_ptr<FastOS_FileInterface> _transLog; - SkipList _skipList; - uint32_t _headerLen; - vespalib::Lock _writeLock; + SkipList _skipList; + uint32_t _headerLen; + vespalib::Lock _writeLock; // Protected by _writeLock - SerialNum _writtenSerial; - SerialNum _syncedSerial; + SerialNum _writtenSerial; + SerialNum _syncedSerial; }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h index 4aa1b0a5a90..5e44815cb1b 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h +++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h @@ -22,7 +22,7 @@ public: lz4 = 2, zstd = 3 
}; - Encoding(uint8_t raw) : _raw(raw) {} + explicit Encoding(uint8_t raw) : _raw(raw) { } Encoding(Crc crc, Compression compression); Crc getCrc() const { return Crc(_raw & 0xf); } Compression getCompression() const { return Compression((_raw >> 4) & 0xf); } diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.h b/searchlib/src/vespa/searchlib/transactionlog/session.h index bf35d83c000..9b8d23371e8 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/session.h +++ b/searchlib/src/vespa/searchlib/transactionlog/session.h @@ -25,7 +25,7 @@ private: public: class Destination { public: - virtual ~Destination() {} + virtual ~Destination() = default; virtual bool send(int32_t id, const vespalib::string & domain, const Packet & packet) = 0; virtual bool sendDone(int32_t id, const vespalib::string & domain) = 0; virtual bool connected() const = 0; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index caef792704a..b98453a7648 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -1,5 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
#include "translogserver.h" +#include <vespa/searchlib/common/gatecallback.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/util/time.h> @@ -78,25 +79,25 @@ SyncHandler::PerformTask() TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, const FileHeaderContext &fileHeaderContext) - : TransLogServer(name, listenPort, baseDir, fileHeaderContext, 0x10000000) + : TransLogServer(name, listenPort, baseDir, fileHeaderContext, + DomainConfig().setEncoding(Encoding(Encoding::xxh64, Encoding::Compression::none)) + .setPartSizeLimit(0x10000000).setChunkSizeLimit(0x40000).setChunkAgeLimit( 100us)) {} TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, - const FileHeaderContext &fileHeaderContext, uint64_t domainPartSize) - : TransLogServer(name, listenPort, baseDir, fileHeaderContext, domainPartSize, 4, DomainPart::Crc::xxh64) + const FileHeaderContext &fileHeaderContext, const DomainConfig & cfg) + : TransLogServer(name, listenPort, baseDir, fileHeaderContext, cfg, 4) {} TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, - const FileHeaderContext &fileHeaderContext, uint64_t domainPartSize, - size_t maxThreads, DomainPart::Crc defaultCrcType) + const FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads) : FRT_Invokable(), _name(name), _baseDir(baseDir), - _domainPartSize(domainPartSize), - _defaultCrcType(defaultCrcType), + _domainConfig(cfg), _commitExecutor(maxThreads, 128*1024), _sessionExecutor(maxThreads, 128*1024), - _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)), + _threadPool(std::make_unique<FastOS_ThreadPool>(1024*120)), _transport(std::make_unique<FNET_Transport>()), _supervisor(std::make_unique<FRT_Supervisor>(_transport.get())), _domains(), @@ -112,8 +113,8 @@ 
TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con domainDir >> domainName; if ( ! domainName.empty()) { try { - auto domain = std::make_shared<Domain>(domainName, dir(), _commitExecutor, _sessionExecutor, - _domainPartSize, _defaultCrcType,_fileHeaderContext); + auto domain = make_shared<Domain>(domainName, dir(), *_threadPool, _commitExecutor, + _sessionExecutor, cfg, _fileHeaderContext); _domains[domain->name()] = domain; } catch (const std::exception & e) { LOG(warning, "Failed creating %s domain on startup. Exception = %s", domainName.c_str(), e.what()); @@ -202,10 +203,21 @@ TransLogServer::run() req->Return(); } } - } while (running() && !(hasPacket && (req == NULL))); + } while (running() && !(hasPacket && (req == nullptr))); LOG(info, "TLS Stopped"); } + +TransLogServer & +TransLogServer::setDomainConfig(const DomainConfig & cfg) { + Guard domainGuard(_lock); + _domainConfig = cfg; + for(auto &domain: _domains) { + domain.second->setConfig(cfg); + } + return *this; +} + DomainStats TransLogServer::getDomainStats() const { @@ -434,8 +446,8 @@ TransLogServer::createDomain(FRT_RPCRequest *req) Domain::SP domain(findDomain(domainName)); if ( !domain ) { try { - domain = std::make_shared<Domain>(domainName, dir(), _commitExecutor, _sessionExecutor, - _domainPartSize, _defaultCrcType, _fileHeaderContext); + domain = std::make_shared<Domain>(domainName, dir(), *_threadPool, _commitExecutor, + _sessionExecutor, _domainConfig, _fileHeaderContext); Guard domainGuard(_lock); _domains[domain->name()] = domain; writeDomainDir(domainGuard, dir(), domainList(), _domains); @@ -544,10 +556,9 @@ TransLogServer::domainStatus(FRT_RPCRequest *req) void TransLogServer::commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) { - (void) done; Domain::SP domain(findDomain(domainName)); if (domain) { - domain->commit(packet); + domain->commit(packet, std::move(done)); } else { throw IllegalArgumentException("Could not find 
domain " + domainName); } @@ -564,7 +575,9 @@ TransLogServer::domainCommit(FRT_RPCRequest *req) if (domain) { Packet packet(params[1]._data._buf, params[1]._data._len); try { - domain->commit(packet); + vespalib::Gate gate; + domain->commit(packet, make_shared<GateCallback>(gate)); + gate.await(); ret.AddInt32(0); ret.AddString("ok"); } catch (const std::exception & e) { diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h index 0d65f36e07d..3f945977386 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h @@ -26,16 +26,15 @@ public: typedef std::shared_ptr<TransLogServer> SP; TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, - const common::FileHeaderContext &fileHeaderContext, - uint64_t domainPartSize, size_t maxThreads, DomainPart::Crc defaultCrc); + const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads); TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, - const common::FileHeaderContext &fileHeaderContext, uint64_t domainPartSize); + const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg); TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir, const common::FileHeaderContext &fileHeaderContext); ~TransLogServer() override; DomainStats getDomainStats() const; - void commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) override; + TransLogServer & setDomainConfig(const DomainConfig & cfg); class Session { @@ -82,8 +81,7 @@ private: vespalib::string _name; vespalib::string _baseDir; - const uint64_t _domainPartSize; - const DomainPart::Crc _defaultCrcType; + DomainConfig _domainConfig; vespalib::ThreadStackExecutor _commitExecutor; vespalib::ThreadStackExecutor 
_sessionExecutor; std::unique_ptr<FastOS_ThreadPool> _threadPool; diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp index b8d21fb7465..d83623661ff 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp @@ -2,6 +2,7 @@ #include "translogserverapp.h" #include <vespa/config/subscription/configuri.h> +#include <vespa/vespalib/util/time.h> #include <vespa/log/log.h> LOG_SETUP(".translogserverapp"); @@ -24,16 +25,59 @@ TransLogServerApp::TransLogServerApp(const config::ConfigUri & tlsConfigUri, namespace { -DomainPart::Crc +Encoding::Crc getCrc(searchlib::TranslogserverConfig::Crcmethod crcType) { switch (crcType) { case searchlib::TranslogserverConfig::Crcmethod::ccitt_crc32: - return DomainPart::ccitt_crc32; + return Encoding::Crc::ccitt_crc32; case searchlib::TranslogserverConfig::Crcmethod::xxh64: - return DomainPart::xxh64; + return Encoding::Crc::xxh64; } - LOG_ABORT("should not be reached"); + assert(false); +} + +Encoding::Compression +getCompression(searchlib::TranslogserverConfig::Compression::Type type) +{ + switch (type) { + case searchlib::TranslogserverConfig::Compression::Type::NONE: + return Encoding::Compression::none; + case searchlib::TranslogserverConfig::Compression::Type::NONE_MULTI: + return Encoding::Compression::none_multi; + case searchlib::TranslogserverConfig::Compression::Type::LZ4: + return Encoding::Compression::lz4; + case searchlib::TranslogserverConfig::Compression::Type::ZSTD: + return Encoding::Compression::zstd; + } + assert(false); +} + +Encoding +getEncoding(const searchlib::TranslogserverConfig & cfg) +{ + return Encoding(getCrc(cfg.crcmethod), getCompression(cfg.compression.type)); +} + +DomainConfig +getDomainConfig(const searchlib::TranslogserverConfig & cfg) { + DomainConfig dcfg; + dcfg.setEncoding(getEncoding(cfg)) + 
.setCompressionLevel(cfg.compression.level) + .setPartSizeLimit(cfg.filesizemax) + .setChunkSizeLimit(cfg.chunk.sizelimit) + .setChunkAgeLimit(vespalib::from_s(cfg.chunk.agelimit)); + return dcfg; +} + +void +logReconfig(const searchlib::TranslogserverConfig & cfg, const DomainConfig & dcfg) { + LOG(config, "configure Transaction Log Server %s at port %d\n" + "DomainConfig {encoding={%d, %d}, compression_level=%d, part_limit=%ld, chunk_limit=%ld age=%1.4f}", + cfg.servername.c_str(), cfg.listenport, + dcfg.getEncoding().getCrc(), dcfg.getEncoding().getCompression(), dcfg.getCompressionlevel(), + dcfg.getPartSizeLimit(), dcfg.getChunkSizeLimit(), vespalib::to_s(dcfg.getChunkAgeLimit()) + ); } } @@ -41,11 +85,12 @@ getCrc(searchlib::TranslogserverConfig::Crcmethod crcType) void TransLogServerApp::start() { - std::shared_ptr<searchlib::TranslogserverConfig> c = _tlsConfig.get(); - auto tls = std::make_shared<TransLogServer>(c->servername, c->listenport, c->basedir, _fileHeaderContext, - c->filesizemax, c->maxthreads, getCrc(c->crcmethod)); std::lock_guard<std::mutex> guard(_lock); - _tls = std::move(tls); + auto c = _tlsConfig.get(); + DomainConfig domainConfig = getDomainConfig(*c); + logReconfig(*c, domainConfig); + _tls = std::make_shared<TransLogServer>(c->servername, c->listenport, c->basedir, _fileHeaderContext, + domainConfig, c->maxthreads); } TransLogServerApp::~TransLogServerApp() @@ -56,9 +101,15 @@ TransLogServerApp::~TransLogServerApp() void TransLogServerApp::configure(std::unique_ptr<searchlib::TranslogserverConfig> cfg) { - LOG(config, "configure Transaction Log Server %s at port %d", cfg->servername.c_str(), cfg->listenport); + + std::lock_guard<std::mutex> guard(_lock); + DomainConfig dcfg = getDomainConfig(*cfg); + logReconfig(*cfg, dcfg); _tlsConfig.set(cfg.release()); _tlsConfig.latch(); + if (_tls) { + _tls->setDomainConfig(dcfg); + } } TransLogServer::SP diff --git a/vespajlib/src/main/java/com/yahoo/tensor/IndexedTensor.java 
b/vespajlib/src/main/java/com/yahoo/tensor/IndexedTensor.java index ad82dd6c3ac..dc17c657db9 100644 --- a/vespajlib/src/main/java/com/yahoo/tensor/IndexedTensor.java +++ b/vespajlib/src/main/java/com/yahoo/tensor/IndexedTensor.java @@ -178,9 +178,7 @@ public abstract class IndexedTensor implements Tensor { @Override public abstract IndexedTensor withType(TensorType type); - public DimensionSizes dimensionSizes() { - return dimensionSizes; - } + public DimensionSizes dimensionSizes() { return dimensionSizes; } @Override public Map<TensorAddress, Double> cells() { diff --git a/vespajlib/src/main/java/com/yahoo/tensor/MixedTensor.java b/vespajlib/src/main/java/com/yahoo/tensor/MixedTensor.java index 1ec4993bf57..f608aead347 100644 --- a/vespajlib/src/main/java/com/yahoo/tensor/MixedTensor.java +++ b/vespajlib/src/main/java/com/yahoo/tensor/MixedTensor.java @@ -152,7 +152,6 @@ public class MixedTensor implements Tensor { return index.denseSubspaceSize(); } - /** * Base class for building mixed tensors. 
*/ diff --git a/vespajlib/src/main/java/com/yahoo/tensor/serialization/TypedBinaryFormat.java b/vespajlib/src/main/java/com/yahoo/tensor/serialization/TypedBinaryFormat.java index bcff4392c9a..5c47572c779 100644 --- a/vespajlib/src/main/java/com/yahoo/tensor/serialization/TypedBinaryFormat.java +++ b/vespajlib/src/main/java/com/yahoo/tensor/serialization/TypedBinaryFormat.java @@ -51,31 +51,41 @@ public class TypedBinaryFormat { } private static BinaryFormat getFormatEncoder(GrowableByteBuffer buffer, Tensor tensor) { - if (tensor instanceof MixedTensor && tensor.type().valueType() == TensorType.Value.DOUBLE) { + boolean hasMappedDimensions = tensor.type().dimensions().stream().anyMatch(d -> d.isMapped()); + boolean hasIndexedDimensions = tensor.type().dimensions().stream().anyMatch(d -> d.isIndexed()); + boolean isMixed = hasMappedDimensions && hasIndexedDimensions; + + // TODO: Encoding as indexed if the implementation is mixed is not yet supported so use mixed format instead + if (tensor instanceof MixedTensor && ! 
isMixed && hasIndexedDimensions) + isMixed = true; + + if (isMixed && tensor.type().valueType() == TensorType.Value.DOUBLE) { encodeFormatType(buffer, MIXED_BINARY_FORMAT_TYPE); return new MixedBinaryFormat(); } - if (tensor instanceof MixedTensor) { + else if (isMixed) { encodeFormatType(buffer, MIXED_BINARY_FORMAT_WITH_CELLTYPE); encodeValueType(buffer, tensor.type().valueType()); return new MixedBinaryFormat(tensor.type().valueType()); } - if (tensor instanceof IndexedTensor && tensor.type().valueType() == TensorType.Value.DOUBLE) { + else if (hasIndexedDimensions && tensor.type().valueType() == TensorType.Value.DOUBLE) { encodeFormatType(buffer, DENSE_BINARY_FORMAT_TYPE); return new DenseBinaryFormat(); } - if (tensor instanceof IndexedTensor) { + else if (hasIndexedDimensions) { encodeFormatType(buffer, DENSE_BINARY_FORMAT_WITH_CELLTYPE); encodeValueType(buffer, tensor.type().valueType()); return new DenseBinaryFormat(tensor.type().valueType()); } - if (tensor.type().valueType() == TensorType.Value.DOUBLE) { + else if (tensor.type().valueType() == TensorType.Value.DOUBLE) { encodeFormatType(buffer, SPARSE_BINARY_FORMAT_TYPE); return new SparseBinaryFormat(); } - encodeFormatType(buffer, SPARSE_BINARY_FORMAT_WITH_CELLTYPE); - encodeValueType(buffer, tensor.type().valueType()); - return new SparseBinaryFormat(tensor.type().valueType()); + else { + encodeFormatType(buffer, SPARSE_BINARY_FORMAT_WITH_CELLTYPE); + encodeValueType(buffer, tensor.type().valueType()); + return new SparseBinaryFormat(tensor.type().valueType()); + } } private static BinaryFormat getFormatDecoder(GrowableByteBuffer buffer) { diff --git a/vespajlib/src/test/java/com/yahoo/tensor/serialization/SerializationTestCase.java b/vespajlib/src/test/java/com/yahoo/tensor/serialization/SerializationTestCase.java index f002637847b..066a63b6d90 100644 --- a/vespajlib/src/test/java/com/yahoo/tensor/serialization/SerializationTestCase.java +++ 
b/vespajlib/src/test/java/com/yahoo/tensor/serialization/SerializationTestCase.java @@ -71,7 +71,8 @@ public class SerializationTestCase { serializedToABinaryRepresentation = true; } } - assertTrue("Tensor did not serialize to one of the given representations", serializedToABinaryRepresentation); + assertTrue("Tensor serialized to one of the given representations", + serializedToABinaryRepresentation); } } } diff --git a/vespajlib/src/test/java/com/yahoo/tensor/serialization/SparseBinaryFormatTestCase.java b/vespajlib/src/test/java/com/yahoo/tensor/serialization/SparseBinaryFormatTestCase.java index 9074579094c..50b71024ddf 100644 --- a/vespajlib/src/test/java/com/yahoo/tensor/serialization/SparseBinaryFormatTestCase.java +++ b/vespajlib/src/test/java/com/yahoo/tensor/serialization/SparseBinaryFormatTestCase.java @@ -2,7 +2,9 @@ package com.yahoo.tensor.serialization; import com.yahoo.io.GrowableByteBuffer; +import com.yahoo.tensor.MixedTensor; import com.yahoo.tensor.Tensor; +import com.yahoo.tensor.TensorAddress; import com.yahoo.tensor.TensorType; import org.junit.Test; @@ -31,6 +33,17 @@ public class SparseBinaryFormatTestCase { } @Test + public void testSerializationFormatIsDecidedByTensorTypeNotImplementationType() { + Tensor sparse = Tensor.Builder.of(TensorType.fromSpec("tensor(x{})")) + .cell(TensorAddress.ofLabels("key1"), 9.1).build(); + Tensor sparseAsMixed = MixedTensor.Builder.of(TensorType.fromSpec("tensor(x{})")) + .cell(TensorAddress.ofLabels("key1"), 9.1).build(); + byte[] sparseEncoded = TypedBinaryFormat.encode(sparse); + byte[] sparseAsMixedEncoded = TypedBinaryFormat.encode(sparseAsMixed); + assertEquals(Arrays.toString(sparseEncoded), Arrays.toString(sparseAsMixedEncoded)); + } + + @Test public void testSerializationToSeparateType() { try { assertSerialization(Tensor.from("tensor(x{},y{}):{{x:0,y:0}:2.0}"), TensorType.fromSpec("tensor(x{})")); @@ -55,7 +68,8 @@ public class SparseBinaryFormatTestCase { @Test public void 
requireThatFloatSerializationFormatDoNotChange() { - byte[] encodedTensor = new byte[] {5, // binary format type + byte[] encodedTensor = new byte[] { + 5, // binary format type 1, // float type 2, // num dimensions 2, (byte)'x', (byte)'y', 1, (byte)'z', // dimensions @@ -63,7 +77,7 @@ public class SparseBinaryFormatTestCase { 2, (byte)'a', (byte)'b', 1, (byte)'e', 64, 0, 0, 0, // cell 0 2, (byte)'c', (byte)'d', 1, (byte)'e', 64, 64, 0, 0}; // cell 1 assertEquals(Arrays.toString(encodedTensor), - Arrays.toString(TypedBinaryFormat.encode(Tensor.from("tensor<float>(xy{},z{}):{{xy:ab,z:e}:2.0,{xy:cd,z:e}:3.0}")))); + Arrays.toString(TypedBinaryFormat.encode(Tensor.from("tensor<float>(xy{},z{}):{{xy:ab,z:e}:2.0,{xy:cd,z:e}:3.0}")))); } @Test |