summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--config-application-package/src/main/java/com/yahoo/config/application/FileSystemWrapper.java50
-rw-r--r--config-application-package/src/main/java/com/yahoo/config/application/IncludeProcessor.java24
-rw-r--r--config-application-package/src/main/java/com/yahoo/config/application/XmlPreProcessor.java7
-rw-r--r--container-search/src/main/java/com/yahoo/search/result/FeatureData.java22
-rw-r--r--container-search/src/test/java/com/yahoo/search/result/FeatureDataTestCase.java8
-rw-r--r--controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/deployment/ApplicationStore.java8
-rw-r--r--controller-server/pom.xml7
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java16
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/ApplicationPackage.java156
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/ApplicationPackageTest.java91
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ApplicationStoreMock.java13
-rw-r--r--searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp3
-rw-r--r--searchlib/src/tests/features/onnx_feature/.gitattributes1
-rw-r--r--searchlib/src/tests/features/onnx_feature/onnx_feature_test.cpp13
-rw-r--r--searchlib/src/tests/features/onnx_feature/strange_names.onnx23
-rwxr-xr-xsearchlib/src/tests/features/onnx_feature/strange_names.py36
-rw-r--r--searchlib/src/tests/transactionlog/translogclient_test.cpp493
-rw-r--r--searchlib/src/tests/transactionlogstress/translogstress.cpp2
-rw-r--r--searchlib/src/vespa/searchlib/config/translogserver.def16
-rw-r--r--searchlib/src/vespa/searchlib/features/onnx_feature.cpp29
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/common.h1
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.cpp79
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domain.h79
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp127
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/domainpart.h35
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/ichunk.h2
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/session.h2
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp45
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.h10
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp69
-rw-r--r--vespajlib/src/main/java/com/yahoo/tensor/IndexedTensor.java4
-rw-r--r--vespajlib/src/main/java/com/yahoo/tensor/MixedTensor.java1
-rw-r--r--vespajlib/src/main/java/com/yahoo/tensor/serialization/TypedBinaryFormat.java26
-rw-r--r--vespajlib/src/test/java/com/yahoo/tensor/serialization/SerializationTestCase.java3
-rw-r--r--vespajlib/src/test/java/com/yahoo/tensor/serialization/SparseBinaryFormatTestCase.java18
35 files changed, 1008 insertions, 511 deletions
diff --git a/config-application-package/src/main/java/com/yahoo/config/application/FileSystemWrapper.java b/config-application-package/src/main/java/com/yahoo/config/application/FileSystemWrapper.java
new file mode 100644
index 00000000000..8a08c56b3c0
--- /dev/null
+++ b/config-application-package/src/main/java/com/yahoo/config/application/FileSystemWrapper.java
@@ -0,0 +1,50 @@
+package com.yahoo.config.application;
+
+import com.yahoo.yolean.function.ThrowingFunction;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
+import java.util.function.Predicate;
+
+/**
+ * Wraps a real or virtual file system — essentially a mapping from paths to bytes.
+ *
+ * @author jonmv
+ */
+public class FileSystemWrapper {
+
+ Predicate<Path> existence;
+ ThrowingFunction<Path, byte[], IOException> reader;
+
+ private FileSystemWrapper(Predicate<Path> existence, ThrowingFunction<Path, byte[], IOException> reader) {
+ this.existence = existence;
+ this.reader = reader;
+ }
+
+ public static FileSystemWrapper ofFiles(Predicate<Path> existence, ThrowingFunction<Path, byte[], IOException> reader) {
+ return new FileSystemWrapper(existence, reader);
+ }
+
+ public static FileSystemWrapper getDefault() {
+ return ofFiles(Files::exists, Files::readAllBytes);
+ }
+
+ public FileWrapper wrap(Path path) {
+ return new FileWrapper(path);
+ }
+
+
+ public class FileWrapper {
+ private final Path path;
+ private FileWrapper(Path path) { this.path = path; }
+
+ public Path path() { return path; }
+ public boolean exists() { return existence.test(path); }
+ public byte[] content() throws IOException { return reader.apply(path); }
+ public Optional<FileWrapper> parent() { return Optional.ofNullable(path.getParent()).map(path -> wrap(path)); }
+ public FileWrapper child(String name) { return wrap(path.resolve(name)); }
+ }
+
+}
diff --git a/config-application-package/src/main/java/com/yahoo/config/application/IncludeProcessor.java b/config-application-package/src/main/java/com/yahoo/config/application/IncludeProcessor.java
index c2da2a55b48..b09953c738c 100644
--- a/config-application-package/src/main/java/com/yahoo/config/application/IncludeProcessor.java
+++ b/config-application-package/src/main/java/com/yahoo/config/application/IncludeProcessor.java
@@ -1,7 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.config.application;
-import com.yahoo.io.IOUtils;
+import com.yahoo.config.application.FileSystemWrapper.FileWrapper;
import com.yahoo.text.XML;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
@@ -14,6 +14,9 @@ import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.List;
+import java.util.NoSuchElementException;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
/**
* Handles preprocess:include statements and returns a Document which has all the include statements resolved
@@ -23,10 +26,14 @@ import java.util.List;
*/
class IncludeProcessor implements PreProcessor {
- private final File application;
+ private final FileWrapper application;
+
public IncludeProcessor(File application) {
- this.application = application;
+ this(FileSystemWrapper.getDefault().wrap(application.toPath()));
+ }
+ public IncludeProcessor(FileWrapper application) {
+ this.application = application;
}
public Document process(Document input) throws IOException, TransformerException {
@@ -35,17 +42,18 @@ class IncludeProcessor implements PreProcessor {
return doc;
}
- private static void includeFile(File currentFolder, Element currentElement) throws IOException {
+ private static void includeFile(FileWrapper currentFolder, Element currentElement) throws IOException {
NodeList list = currentElement.getElementsByTagNameNS(XmlPreProcessor.preprocessNamespaceUri, "include");
while (list.getLength() > 0) {
Element elem = (Element) list.item(0);
Element parent = (Element) elem.getParentNode();
String filename = elem.getAttribute("file");
boolean required = ! elem.hasAttribute("required") || Boolean.parseBoolean(elem.getAttribute("required"));
- File file = new File(currentFolder, filename);
+ FileWrapper file = currentFolder.child(filename);
Document subFile = IncludeProcessor.parseIncludeFile(file, parent.getTagName(), required);
- includeFile(file.getParentFile(), subFile.getDocumentElement());
+ includeFile(file.parent().orElseThrow(() -> new NoSuchElementException(file + " has no parent")),
+ subFile.getDocumentElement());
//System.out.println("document before merging: " + documentAsString(doc));
IncludeProcessor.mergeInto(parent, XML.getChildren(subFile.getDocumentElement()));
@@ -65,12 +73,12 @@ class IncludeProcessor implements PreProcessor {
}
}
- private static Document parseIncludeFile(File file, String parentTagName, boolean required) throws IOException {
+ private static Document parseIncludeFile(FileWrapper file, String parentTagName, boolean required) throws IOException {
StringWriter w = new StringWriter();
final String startTag = "<" + parentTagName + " " + XmlPreProcessor.deployNamespace + "='" + XmlPreProcessor.deployNamespaceUri + "' " + XmlPreProcessor.preprocessNamespace + "='" + XmlPreProcessor.preprocessNamespaceUri + "'>";
w.append(startTag);
if (file.exists() || required) {
- w.append(IOUtils.readFile(file));
+ w.append(new String(file.content(), UTF_8));
}
final String endTag = "</" + parentTagName + ">";
w.append(endTag);
diff --git a/config-application-package/src/main/java/com/yahoo/config/application/XmlPreProcessor.java b/config-application-package/src/main/java/com/yahoo/config/application/XmlPreProcessor.java
index c49a799f2b7..61b67dfb5c6 100644
--- a/config-application-package/src/main/java/com/yahoo/config/application/XmlPreProcessor.java
+++ b/config-application-package/src/main/java/com/yahoo/config/application/XmlPreProcessor.java
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.config.application;
+import com.yahoo.config.application.FileSystemWrapper.FileWrapper;
import com.yahoo.config.provision.Environment;
import com.yahoo.config.provision.InstanceName;
import com.yahoo.config.provision.RegionName;
@@ -31,7 +32,7 @@ public class XmlPreProcessor {
final static String preprocessNamespace = "xmlns:preprocess";
final static String preprocessNamespaceUri = "properties"; //TODO
- private final File applicationDir;
+ private final FileWrapper applicationDir;
private final Reader xmlInput;
private final InstanceName instance;
private final Environment environment;
@@ -43,6 +44,10 @@ public class XmlPreProcessor {
}
public XmlPreProcessor(File applicationDir, Reader xmlInput, InstanceName instance, Environment environment, RegionName region) {
+ this(FileSystemWrapper.getDefault().wrap(applicationDir.toPath()), xmlInput, instance, environment, region);
+ }
+
+ public XmlPreProcessor(FileWrapper applicationDir, Reader xmlInput, InstanceName instance, Environment environment, RegionName region) {
this.applicationDir = applicationDir;
this.xmlInput = xmlInput;
this.instance = instance;
diff --git a/container-search/src/main/java/com/yahoo/search/result/FeatureData.java b/container-search/src/main/java/com/yahoo/search/result/FeatureData.java
index f0637473ef5..fd41d4ee10c 100644
--- a/container-search/src/main/java/com/yahoo/search/result/FeatureData.java
+++ b/container-search/src/main/java/com/yahoo/search/result/FeatureData.java
@@ -74,12 +74,15 @@ public class FeatureData implements Inspectable, JsonProducer {
* (that is, if it is a tensor with nonzero rank)
*/
public Double getDouble(String featureName) {
- if (decodedDoubles != null && decodedDoubles.containsKey(featureName))
- return decodedDoubles.get(featureName);
- Double value = decodeDouble(featureName);
if (decodedDoubles == null)
decodedDoubles = new HashMap<>();
- decodedDoubles.put(featureName, value);
+
+ Double value = decodedDoubles.get(featureName);
+ if (value != null) return value;
+
+ value = decodeDouble(featureName);
+ if (value != null)
+ decodedDoubles.put(featureName, value);
return value;
}
@@ -99,12 +102,15 @@ public class FeatureData implements Inspectable, JsonProducer {
* This will return any feature value: Scalars are returned as a rank 0 tensor.
*/
public Tensor getTensor(String featureName) {
- if (decodedTensors != null && decodedTensors.containsKey(featureName))
- return decodedTensors.get(featureName);
- Tensor value = decodeTensor(featureName);
if (decodedTensors == null)
decodedTensors = new HashMap<>();
- decodedTensors.put(featureName, value);
+
+ Tensor value = decodedTensors.get(featureName);
+ if (value != null) return value;
+
+ value = decodeTensor(featureName);
+ if (value != null)
+ decodedTensors.put(featureName, value);
return value;
}
diff --git a/container-search/src/test/java/com/yahoo/search/result/FeatureDataTestCase.java b/container-search/src/test/java/com/yahoo/search/result/FeatureDataTestCase.java
index f9b89b64e5b..3c8e147029c 100644
--- a/container-search/src/test/java/com/yahoo/search/result/FeatureDataTestCase.java
+++ b/container-search/src/test/java/com/yahoo/search/result/FeatureDataTestCase.java
@@ -11,6 +11,7 @@ import org.junit.Test;
import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
/**
* @author bratseth
@@ -32,14 +33,21 @@ public class FeatureDataTestCase {
FeatureData featureData = new FeatureData(new SlimeAdapter(features));
assertEquals("scalar1,scalar2,tensor1,tensor2",
featureData.featureNames().stream().sorted().collect(Collectors.joining(",")));
+ assertNull(featureData.getDouble("nosuch1"));
assertEquals(1.5, featureData.getDouble("scalar1"), delta);
assertEquals(2.5, featureData.getDouble("scalar2"), delta);
assertEquals("Cached lookup", 2.5, featureData.getDouble("scalar2"), delta);
+ assertNull(featureData.getDouble("nosuch2"));
+ assertNull(featureData.getDouble("nosuch2"));
+
+ assertNull(featureData.getTensor("nosuch1"));
assertEquals(Tensor.from(1.5), featureData.getTensor("scalar1"));
assertEquals(Tensor.from(2.5), featureData.getTensor("scalar2"));
assertEquals(tensor1, featureData.getTensor("tensor1"));
assertEquals(tensor2, featureData.getTensor("tensor2"));
assertEquals("Cached lookup", tensor2, featureData.getTensor("tensor2"));
+ assertNull(featureData.getTensor("nosuch2"));
+ assertNull(featureData.getTensor("nosuch2"));
String expectedJson =
"{" +
diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/deployment/ApplicationStore.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/deployment/ApplicationStore.java
index 28a803fd43d..dd9f8c38802 100644
--- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/deployment/ApplicationStore.java
+++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/integration/deployment/ApplicationStore.java
@@ -5,9 +5,9 @@ import com.yahoo.config.provision.ApplicationId;
import com.yahoo.config.provision.ApplicationName;
import com.yahoo.config.provision.TenantName;
import com.yahoo.config.provision.zone.ZoneId;
+import com.yahoo.vespa.hosted.controller.api.identifiers.DeploymentId;
import java.time.Instant;
-import java.util.Collection;
import java.util.Optional;
/**
@@ -59,6 +59,12 @@ public interface ApplicationStore {
/** Marks the given application as deleted, and eligible for meta data GC at a later time. */
void putMetaTombstone(TenantName tenant, ApplicationName application, Instant now);
+ /** Stores the given manual deployment meta data with the current time as part of the path. */
+ void putMeta(DeploymentId id, Instant now, byte[] metaZip);
+
+ /** Marks the given manual deployment as deleted, and eligible for meta data GC at a later time. */
+ void putMetaTombstone(DeploymentId id, Instant now);
+
/** Prunes meta data such that only what was active at the given instant, and anything newer, is retained. */
void pruneMeta(Instant oldest);
diff --git a/controller-server/pom.xml b/controller-server/pom.xml
index 158c1b5cd39..0ddc1ecd8be 100644
--- a/controller-server/pom.xml
+++ b/controller-server/pom.xml
@@ -28,6 +28,13 @@
<dependency>
<groupId>com.yahoo.vespa</groupId>
+ <artifactId>config-application-package</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
<artifactId>container-dev</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java
index 83a1895a85d..d8abc77ed06 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java
@@ -527,6 +527,11 @@ public class ApplicationController {
Optional<Quota> quota = billingController.getQuota(application.tenant(), zone.environment());
+ if (zone.environment().isManuallyDeployed())
+ controller.applications().applicationStore().putMeta(new DeploymentId(application, zone),
+ clock.instant(),
+ applicationPackage.metaDataZip());
+
ConfigServer.PreparedApplication preparedApplication =
configServer.deploy(new DeploymentData(application, zone, applicationPackage.zippedContent(), platform,
endpoints, endpointCertificateMetadata, dockerImageRepo, domain, applicationRoles, quota));
@@ -583,13 +588,6 @@ public class ApplicationController {
return application;
}
- private DeployOptions withVersion(Version version, DeployOptions options) {
- return new DeployOptions(options.deployDirectly,
- Optional.of(version),
- options.ignoreValidationErrors,
- options.deployCurrentVersion);
- }
-
/**
* Deletes the the given application. All known instances of the applications will be deleted.
*
@@ -720,12 +718,14 @@ public class ApplicationController {
* @return the application with the deployment in the given zone removed
*/
private LockedApplication deactivate(LockedApplication application, InstanceName instanceName, ZoneId zone) {
+ DeploymentId id = new DeploymentId(application.get().id().instance(instanceName), zone);
try {
- configServer.deactivate(new DeploymentId(application.get().id().instance(instanceName), zone));
+ configServer.deactivate(id);
} catch (NotFoundException ignored) {
// ok; already gone
} finally {
controller.routing().policies().refresh(application.get().id().instance(instanceName), application.get().deploymentSpec(), zone);
+ applicationStore.putMetaTombstone(id, clock.instant());
}
return application.with(instanceName, instance -> instance.withoutDeploymentIn(zone));
}
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/ApplicationPackage.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/ApplicationPackage.java
index b246e10ca82..c29bc3f3f5e 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/ApplicationPackage.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/ApplicationPackage.java
@@ -1,12 +1,17 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.hosted.controller.application;
-import com.google.common.collect.ImmutableMap;
import com.google.common.hash.Hashing;
import com.yahoo.component.Version;
+import com.yahoo.config.application.FileSystemWrapper;
+import com.yahoo.config.application.FileSystemWrapper.FileWrapper;
+import com.yahoo.config.application.XmlPreProcessor;
import com.yahoo.config.application.api.DeploymentSpec;
import com.yahoo.config.application.api.ValidationId;
import com.yahoo.config.application.api.ValidationOverrides;
+import com.yahoo.config.provision.Environment;
+import com.yahoo.config.provision.InstanceName;
+import com.yahoo.config.provision.RegionName;
import com.yahoo.security.X509CertificateUtils;
import com.yahoo.slime.Inspector;
import com.yahoo.slime.Slime;
@@ -16,10 +21,10 @@ import com.yahoo.yolean.Exceptions;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
-import java.io.IOException;
import java.io.InputStreamReader;
-import java.io.Reader;
-import java.io.UncheckedIOException;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.time.Instant;
@@ -32,11 +37,11 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.toMap;
/**
* A representation of the content of an application package.
@@ -46,6 +51,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
* This is immutable.
*
* @author bratseth
+ * @author jonmv
*/
public class ApplicationPackage {
@@ -59,7 +65,7 @@ public class ApplicationPackage {
private final byte[] zippedContent;
private final DeploymentSpec deploymentSpec;
private final ValidationOverrides validationOverrides;
- private final Files files;
+ private final ZipArchiveCache files;
private final Optional<Version> compileVersion;
private final Optional<Instant> buildTime;
private final List<X509Certificate> trustedCertificates;
@@ -82,14 +88,14 @@ public class ApplicationPackage {
public ApplicationPackage(byte[] zippedContent, boolean requireFiles) {
this.zippedContent = Objects.requireNonNull(zippedContent, "The application package content cannot be null");
this.contentHash = Hashing.sha1().hashBytes(zippedContent).toString();
- this.files = Files.extract(Set.of(deploymentFile, validationOverridesFile, servicesFile, buildMetaFile, trustedCertificatesFile), zippedContent);
+ this.files = new ZipArchiveCache(zippedContent, Set.of(deploymentFile, validationOverridesFile, servicesFile, buildMetaFile, trustedCertificatesFile));
- Optional<DeploymentSpec> deploymentSpec = files.getAsReader(deploymentFile).map(DeploymentSpec::fromXml);
+ Optional<DeploymentSpec> deploymentSpec = files.get(deploymentFile).map(bytes -> new String(bytes, UTF_8)).map(DeploymentSpec::fromXml);
if (requireFiles && deploymentSpec.isEmpty())
throw new IllegalArgumentException("Missing required file '" + deploymentFile + "'");
this.deploymentSpec = deploymentSpec.orElse(DeploymentSpec.empty);
- this.validationOverrides = files.getAsReader(validationOverridesFile).map(ValidationOverrides::fromXml).orElse(ValidationOverrides.empty);
+ this.validationOverrides = files.get(validationOverridesFile).map(bytes -> new String(bytes, UTF_8)).map(ValidationOverrides::fromXml).orElse(ValidationOverrides.empty);
Optional<Inspector> buildMetaObject = files.get(buildMetaFile).map(SlimeUtils::jsonToSlime).map(Slime::get);
if (requireFiles && buildMetaObject.isEmpty())
@@ -151,68 +157,42 @@ public class ApplicationPackage {
}
}
- private static class Files {
-
- /** Max size of each extracted file */
- private static final int maxSize = 10 * 1024 * 1024; // 10 MiB
-
- // TODO: Vespa 8: Remove application/ directory support
- private static final String applicationDir = "application/";
-
- private final ImmutableMap<String, byte[]> files;
-
- private Files(ImmutableMap<String, byte[]> files) {
- this.files = files;
- }
-
- public static Files extract(Set<String> filesToExtract, byte[] zippedContent) {
- ImmutableMap.Builder<String, byte[]> builder = ImmutableMap.builder();
- try (ByteArrayInputStream stream = new ByteArrayInputStream(zippedContent)) {
- ZipStreamReader reader = new ZipStreamReader(stream,
- (name) -> filesToExtract.contains(withoutLegacyDir(name)),
- maxSize);
- for (ZipStreamReader.ZipEntryWithContent entry : reader.entries()) {
- builder.put(withoutLegacyDir(entry.zipEntry().getName()), entry.content());
- }
- } catch (IOException e) {
- throw new UncheckedIOException("Exception reading application package", e);
- }
- return new Files(builder.build());
- }
-
-
- /** Get content of given file name */
- public Optional<byte[]> get(String name) {
- return Optional.ofNullable(files.get(name));
- }
-
- /** Get reader for the content of given file name */
- public Optional<Reader> getAsReader(String name) {
- return get(name).map(ByteArrayInputStream::new).map(InputStreamReader::new);
- }
-
- private static String withoutLegacyDir(String name) {
- if (name.startsWith(applicationDir)) return name.substring(applicationDir.length());
- return name;
- }
-
- }
-
/** Creates a valid application package that will remove all application's deployments */
public static ApplicationPackage deploymentRemoval() {
return new ApplicationPackage(filesZip(Map.of(validationOverridesFile, allValidationOverrides().xmlForm().getBytes(UTF_8),
deploymentFile, DeploymentSpec.empty.xmlForm().getBytes(UTF_8))));
}
- /** Returns a zip containing only services.xml and deployment.xml files of this. */
+ /** Returns a zip containing meta data about deployments of this package by the given job. */
public byte[] metaDataZip() {
- return filesZip(Stream.of(deploymentFile, servicesFile)
- .filter(name -> files.files.containsKey(name))
- .collect(Collectors.toMap(name -> name,
- name -> files.files.get(name))));
+ preProcessAndPopulateCache();
+ return cacheZip();
+ }
+
+ private void preProcessAndPopulateCache() {
+ FileWrapper servicesXml = files.wrapper().wrap(Paths.get(servicesFile));
+ if (servicesXml.exists())
+ try {
+ new XmlPreProcessor(files.wrapper().wrap(Paths.get("./")),
+ new InputStreamReader(new ByteArrayInputStream(servicesXml.content()), UTF_8),
+ InstanceName.defaultName(),
+ Environment.prod,
+ RegionName.defaultName())
+ .run(); // Populates the zip archive cache with files that would be included.
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
- private static byte[] filesZip(Map<String, byte[]> files) {
+ private byte[] cacheZip() {
+ return filesZip(files.cache.entrySet().stream()
+ .filter(entry -> entry.getValue().isPresent())
+ .collect(toMap(entry -> entry.getKey().toString(),
+ entry -> entry.getValue().get())));
+ }
+
+ static byte[] filesZip(Map<String, byte[]> files) {
try (ZipBuilder zipBuilder = new ZipBuilder(files.values().stream().mapToInt(bytes -> bytes.length).sum() + 512)) {
files.forEach(zipBuilder::add);
zipBuilder.close();
@@ -231,4 +211,54 @@ public class ApplicationPackage {
return ValidationOverrides.fromXml(validationOverridesContents.toString());
}
+
+ /** Maps normalized paths to cached content read from a zip archive. */
+ private static class ZipArchiveCache {
+
+ /** Max size of each extracted file */
+ private static final int maxSize = 10 << 20; // 10 Mb
+
+ // TODO: Vespa 8: Remove application/ directory support
+ private static final String applicationDir = "application/";
+
+ private static String withoutLegacyDir(String name) {
+ if (name.startsWith(applicationDir)) return name.substring(applicationDir.length());
+ return name;
+ }
+
+ private final byte[] zip;
+ private final Map<Path, Optional<byte[]>> cache;
+
+ public ZipArchiveCache(byte[] zip, Collection<String> prePopulated) {
+ this.zip = zip;
+ this.cache = new ConcurrentSkipListMap<>();
+ this.cache.putAll(read(prePopulated));
+ }
+
+ public Optional<byte[]> get(String path) {
+ return get(Paths.get(path));
+ }
+
+ public Optional<byte[]> get(Path path) {
+ return cache.computeIfAbsent(path.normalize(), read(List.of(path.normalize().toString()))::get);
+ }
+
+ public FileSystemWrapper wrapper() {
+ return FileSystemWrapper.ofFiles(path -> get(path).isPresent(), // Assume content asked for will also be read ...
+ path -> get(path).orElseThrow(() -> new NoSuchFileException(path.toString())));
+ }
+
+ private Map<Path, Optional<byte[]>> read(Collection<String> names) {
+ var entries = new ZipStreamReader(new ByteArrayInputStream(zip),
+ name -> names.contains(withoutLegacyDir(name)),
+ maxSize)
+ .entries().stream()
+ .collect(toMap(entry -> Paths.get(withoutLegacyDir(entry.zipEntry().getName())).normalize(),
+ entry -> Optional.of(entry.content())));
+ names.stream().map(Paths::get).forEach(path -> entries.putIfAbsent(path.normalize(), Optional.empty()));
+ return entries;
+ }
+
+ }
+
}
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/ApplicationPackageTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/ApplicationPackageTest.java
index f7f0c9ce58e..67e5358678a 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/ApplicationPackageTest.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/ApplicationPackageTest.java
@@ -2,18 +2,56 @@ package com.yahoo.vespa.hosted.controller.application;
import com.yahoo.config.application.api.DeploymentSpec;
import com.yahoo.config.application.api.ValidationId;
+import org.junit.Assert;
import org.junit.Test;
+import java.io.ByteArrayInputStream;
+import java.nio.file.NoSuchFileException;
import java.time.Instant;
import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* @author valerijf
+ * @author jonmv
*/
public class ApplicationPackageTest {
+
+ private static final String deploymentXml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
+ "<deployment version=\"1.0\">\n" +
+ " <test />\n" +
+ " <prod>\n" +
+ " <parallel>\n" +
+ " <region active=\"true\">us-central-1</region>\n" +
+ " </parallel>\n" +
+ " </prod>\n" +
+ "</deployment>\n";
+
+ private static final String servicesXml = "<services version='1.0' xmlns:deploy=\"vespa\" xmlns:preprocess=\"properties\">\n" +
+ " <preprocess:include file='jdisc.xml' />\n" +
+ " <content version='1.0' if='foo' />\n" +
+ " <content version='1.0' id='foo' deploy:environment='staging prod' deploy:region='us-east-3 us-central-1'>\n" +
+ " <preprocess:include file='content/content.xml' />\n" +
+ " </content>\n" +
+ " <preprocess:include file='not_found.xml' required='false' />\n" +
+ "</services>\n";
+
+ private static final String jdiscXml = "<container id='stateless' version='1.0' />\n";
+
+ private static final String contentXml = "<documents>\n" +
+ " <document type=\"music.sd\" mode=\"index\" />\n" +
+ "</documents>\n" +
+ "<preprocess:include file=\"nodes.xml\" />";
+
+ private static final String nodesXml = "<nodes>\n" +
+ " <node hostalias=\"node0\" distribution-key=\"0\" />\n" +
+ "</nodes>";
+
@Test
public void test_createEmptyForDeploymentRemoval() {
ApplicationPackage app = ApplicationPackage.deploymentRemoval();
@@ -24,4 +62,57 @@ public class ApplicationPackageTest {
assertTrue(app.validationOverrides().allows(validationId, Instant.now()));
}
}
+
+ @Test
+ public void testMetaData() {
+ byte[] zip = ApplicationPackage.filesZip(Map.of("services.xml", servicesXml.getBytes(UTF_8),
+ "jdisc.xml", jdiscXml.getBytes(UTF_8),
+ "content/content.xml", contentXml.getBytes(UTF_8),
+ "content/nodes.xml", nodesXml.getBytes(UTF_8),
+ "gurba", "gurba".getBytes(UTF_8)));
+
+ assertEquals(Map.of("services.xml", servicesXml,
+ "jdisc.xml", jdiscXml,
+ "content/content.xml", contentXml,
+ "content/nodes.xml", nodesXml),
+ unzip(new ApplicationPackage(zip, false).metaDataZip()));
+ }
+
+ @Test
+ public void testMetaDataWithLegacyApplicationDirectory() {
+ byte[] zip = ApplicationPackage.filesZip(Map.of("application/deployment.xml", deploymentXml.getBytes(UTF_8),
+ "application/services.xml", servicesXml.getBytes(UTF_8),
+ "application/jdisc.xml", jdiscXml.getBytes(UTF_8),
+ "application/content/content.xml", contentXml.getBytes(UTF_8),
+ "application/content/nodes.xml", nodesXml.getBytes(UTF_8),
+ "application/gurba", "gurba".getBytes(UTF_8)));
+
+ assertEquals(Map.of("deployment.xml", deploymentXml,
+ "services.xml", servicesXml,
+ "jdisc.xml", jdiscXml,
+ "content/content.xml", contentXml,
+ "content/nodes.xml", nodesXml),
+ unzip(new ApplicationPackage(zip, false).metaDataZip()));
+ }
+
+ @Test
+ public void testMetaDataWithMissingFiles() {
+ byte[] zip = ApplicationPackage.filesZip(Map.of("services.xml", servicesXml.getBytes(UTF_8)));
+
+ try {
+ new ApplicationPackage(zip, false).metaDataZip();
+ Assert.fail("Should fail on missing include file");
+ }
+ catch (RuntimeException e) {
+ assertEquals("./jdisc.xml", e.getCause().getMessage());
+ }
+ }
+
+ private static Map<String, String> unzip(byte[] zip) {
+ return new ZipStreamReader(new ByteArrayInputStream(zip), __ -> true, 1 << 10)
+ .entries().stream()
+ .collect(Collectors.toMap(entry -> entry.zipEntry().getName(),
+ entry -> new String(entry.content(), UTF_8)));
+ }
+
}
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ApplicationStoreMock.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ApplicationStoreMock.java
index be0cd975190..81bda23146e 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ApplicationStoreMock.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/integration/ApplicationStoreMock.java
@@ -6,6 +6,7 @@ import com.yahoo.config.provision.ApplicationName;
import com.yahoo.config.provision.InstanceName;
import com.yahoo.config.provision.TenantName;
import com.yahoo.config.provision.zone.ZoneId;
+import com.yahoo.vespa.hosted.controller.api.identifiers.DeploymentId;
import com.yahoo.vespa.hosted.controller.api.integration.deployment.ApplicationStore;
import com.yahoo.vespa.hosted.controller.api.integration.deployment.ApplicationVersion;
import com.yahoo.vespa.hosted.controller.api.integration.deployment.TesterId;
@@ -14,7 +15,6 @@ import java.time.Instant;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -32,6 +32,7 @@ public class ApplicationStoreMock implements ApplicationStore {
private final Map<ApplicationId, Map<ApplicationVersion, byte[]>> store = new ConcurrentHashMap<>();
private final Map<ApplicationId, Map<ZoneId, byte[]>> devStore = new ConcurrentHashMap<>();
private final Map<ApplicationId, NavigableMap<Instant, byte[]>> meta = new ConcurrentHashMap<>();
+ private final Map<DeploymentId, NavigableMap<Instant, byte[]>> metaManual = new ConcurrentHashMap<>();
private static ApplicationId appId(TenantName tenant, ApplicationName application) {
return ApplicationId.from(tenant, application, InstanceName.defaultName());
@@ -120,6 +121,16 @@ public class ApplicationStoreMock implements ApplicationStore {
}
@Override
+ public void putMeta(DeploymentId id, Instant now, byte[] metaZip) {
+ metaManual.computeIfAbsent(id, __ -> new ConcurrentSkipListMap<>()).put(now, metaZip);
+ }
+
+ @Override
+ public void putMetaTombstone(DeploymentId id, Instant now) {
+ putMeta(id, now, tombstone);
+ }
+
+ @Override
public void pruneMeta(Instant oldest) {
for (ApplicationId id : meta.keySet()) {
Instant activeAtOldest = meta.get(id).lowerKey(oldest);
diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
index 18b3a5c5d8e..44058d48d1e 100644
--- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
@@ -49,6 +49,7 @@ using search::index::schema::CollectionType;
using search::index::schema::DataType;
using vespalib::makeLambdaTask;
using search::transactionlog::TransLogServer;
+using search::transactionlog::DomainConfig;
using storage::spi::PartitionId;
using storage::spi::RemoveResult;
using storage::spi::Result;
@@ -459,7 +460,7 @@ struct FeedHandlerFixture
FeedHandler handler;
FeedHandlerFixture()
: _fileHeaderContext(),
- tls("mytls", 9016, "mytlsdir", _fileHeaderContext, 0x10000),
+ tls("mytls", 9016, "mytlsdir", _fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000)),
tlsSpec("tcp/localhost:9016"),
sharedExecutor(1, 0x10000),
writeService(sharedExecutor),
diff --git a/searchlib/src/tests/features/onnx_feature/.gitattributes b/searchlib/src/tests/features/onnx_feature/.gitattributes
new file mode 100644
index 00000000000..62e8ad1e0a0
--- /dev/null
+++ b/searchlib/src/tests/features/onnx_feature/.gitattributes
@@ -0,0 +1 @@
+/*.onnx binary
diff --git a/searchlib/src/tests/features/onnx_feature/onnx_feature_test.cpp b/searchlib/src/tests/features/onnx_feature/onnx_feature_test.cpp
index 826984832f6..b49d9c365de 100644
--- a/searchlib/src/tests/features/onnx_feature/onnx_feature_test.cpp
+++ b/searchlib/src/tests/features/onnx_feature/onnx_feature_test.cpp
@@ -27,6 +27,7 @@ std::string source_dir = get_source_dir();
std::string vespa_dir = source_dir + "/" + "../../../../..";
std::string simple_model = vespa_dir + "/" + "eval/src/tests/tensor/onnx_wrapper/simple.onnx";
std::string dynamic_model = vespa_dir + "/" + "eval/src/tests/tensor/onnx_wrapper/dynamic.onnx";
+std::string strange_names_model = source_dir + "/" + "strange_names.onnx";
uint32_t default_docid = 1;
@@ -108,4 +109,16 @@ TEST_F(OnnxFeatureTest, dynamic_onnx_model_can_be_calculated) {
EXPECT_EQ(get(3), TensorSpec("tensor<float>(d0[1],d1[1])").add({{"d0",0},{"d1",0}}, 89.0));
}
+TEST_F(OnnxFeatureTest, strange_input_and_output_names_are_normalized) {
+ add_expr("input_0", "tensor<float>(a[2]):[10,20]");
+ add_expr("input_1", "tensor<float>(a[2]):[5,10]");
+ add_onnx("strange_names", strange_names_model);
+ compile(onnx_feature("strange_names"));
+ auto expect_add = TensorSpec("tensor<float>(d0[2])").add({{"d0",0}},15).add({{"d0",1}},30);
+ auto expect_sub = TensorSpec("tensor<float>(d0[2])").add({{"d0",0}},5).add({{"d0",1}},10);
+ EXPECT_EQ(get(1), expect_add);
+ EXPECT_EQ(get("onnxModel(strange_names).foo_bar", 1), expect_add);
+ EXPECT_EQ(get("onnxModel(strange_names)._baz_0", 1), expect_sub);
+}
+
GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/searchlib/src/tests/features/onnx_feature/strange_names.onnx b/searchlib/src/tests/features/onnx_feature/strange_names.onnx
new file mode 100644
index 00000000000..d61ecbd5540
--- /dev/null
+++ b/searchlib/src/tests/features/onnx_feature/strange_names.onnx
@@ -0,0 +1,23 @@
+strange_names.py:­
+
+input:0
+input/1foo/bar"Add
+
+input:0
+input/1-baz:0"Sub strange_namesZ
+input:0
+
+
+Z
+input/1
+
+
+b
+foo/bar
+
+
+b
+-baz:0
+
+
+B \ No newline at end of file
diff --git a/searchlib/src/tests/features/onnx_feature/strange_names.py b/searchlib/src/tests/features/onnx_feature/strange_names.py
new file mode 100755
index 00000000000..681da641264
--- /dev/null
+++ b/searchlib/src/tests/features/onnx_feature/strange_names.py
@@ -0,0 +1,36 @@
+# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+import onnx
+from onnx import helper, TensorProto
+
+INPUT1 = helper.make_tensor_value_info('input:0', TensorProto.FLOAT, [2])
+INPUT2 = helper.make_tensor_value_info('input/1', TensorProto.FLOAT, [2])
+
+OUTPUT1 = helper.make_tensor_value_info('foo/bar', TensorProto.FLOAT, [2])
+OUTPUT2 = helper.make_tensor_value_info('-baz:0', TensorProto.FLOAT, [2])
+
+nodes = [
+ helper.make_node(
+ 'Add',
+ ['input:0', 'input/1'],
+ ['foo/bar'],
+ ),
+ helper.make_node(
+ 'Sub',
+ ['input:0', 'input/1'],
+ ['-baz:0'],
+ ),
+]
+graph_def = helper.make_graph(
+ nodes,
+ 'strange_names',
+ [
+ INPUT1,
+ INPUT2,
+ ],
+ [
+ OUTPUT1,
+ OUTPUT2,
+ ],
+)
+model_def = helper.make_model(graph_def, producer_name='strange_names.py')
+onnx.save(model_def, 'strange_names.onnx')
diff --git a/searchlib/src/tests/transactionlog/translogclient_test.cpp b/searchlib/src/tests/transactionlog/translogclient_test.cpp
index 0dced597917..d003bad0582 100644
--- a/searchlib/src/tests/transactionlog/translogclient_test.cpp
+++ b/searchlib/src/tests/transactionlog/translogclient_test.cpp
@@ -5,7 +5,6 @@
#include <vespa/vespalib/objects/identifiable.h>
#include <vespa/searchlib/index/dummyfileheadercontext.h>
#include <vespa/fastos/file.h>
-#include <map>
#include <vespa/log/log.h>
LOG_SETUP("translogclient_test");
@@ -14,9 +13,24 @@ using namespace search;
using namespace transactionlog;
using namespace document;
using namespace vespalib;
+using namespace std::chrono_literals;
using search::index::DummyFileHeaderContext;
-vespalib::string myhex(const void * b, size_t sz)
+namespace {
+
+bool createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains=0);
+TransLogClient::Session::UP openDomainTest(TransLogClient & tls, const vespalib::string & name);
+bool fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name);
+void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries);
+void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize);
+uint32_t countFiles(const vespalib::string &dir);
+void checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries);
+bool visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name);
+void createAndFillDomain(const vespalib::string & name, Encoding encoding, size_t preExistingDomains);
+void verifyDomain(const vespalib::string & name);
+
+vespalib::string
+myhex(const void * b, size_t sz)
{
static const char * hextab="0123456789ABCDEF";
const unsigned char * c = static_cast<const unsigned char *>(b);
@@ -29,35 +43,6 @@ vespalib::string myhex(const void * b, size_t sz)
return s;
}
-class Test : public vespalib::TestApp
-{
-public:
- int Main() override;
-private:
- bool createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains=0);
- TransLogClient::Session::UP openDomainTest(TransLogClient & tls, const vespalib::string & name);
- bool fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name);
- void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries);
- void fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize);
- uint32_t countFiles(const vespalib::string &dir);
- void checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries);
- bool visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name);
- bool partialUpdateTest();
- bool testVisitOverGeneratedDomain();
- bool testRemove();
- void createAndFillDomain(const vespalib::string & name, DomainPart::Crc crcMethod, size_t preExistingDomains);
- void verifyDomain(const vespalib::string & name);
- void testCrcVersions();
- bool testVisitOverPreExistingDomain();
- void testMany();
- void testErase();
- void testSync();
- void testTruncateOnShortRead();
- void testTruncateOnVersionMismatch();
-};
-
-TEST_APPHOOK(Test);
-
class CallBackTest : public TransLogClient::Visitor::Callback
{
private:
@@ -75,7 +60,8 @@ public:
bool _eof;
};
-RPC::Result CallBackTest::receive(const Packet & p)
+RPC::Result
+CallBackTest::receive(const Packet & p)
{
nbostream_longlivedbuf h(p.getHandle().data(), p.getHandle().size());
LOG(info,"CallBackTest::receive (%zu, %zu, %zu)(%s)", h.rp(), h.size(), h.capacity(), myhex(h.peek(), h.size()).c_str());
@@ -101,7 +87,8 @@ public:
size_t _value;
};
-RPC::Result CallBackManyTest::receive(const Packet & p)
+RPC::Result
+CallBackManyTest::receive(const Packet & p)
{
nbostream_longlivedbuf h(p.getHandle().data(), p.getHandle().size());
for(;h.size() > 0; _count++, _value++) {
@@ -133,7 +120,8 @@ public:
};
-RPC::Result CallBackUpdate::receive(const Packet & packet)
+RPC::Result
+CallBackUpdate::receive(const Packet & packet)
{
nbostream_longlivedbuf h(packet.getHandle().data(), packet.getHandle().size());
while (h.size() > 0) {
@@ -185,7 +173,8 @@ public:
SerialNum _prevSerial;
};
-RPC::Result CallBackStatsTest::receive(const Packet & p)
+RPC::Result
+CallBackStatsTest::receive(const Packet & p)
{
nbostream_longlivedbuf h(p.getHandle().data(), p.getHandle().size());
for(;h.size() > 0; ++_count) {
@@ -221,67 +210,10 @@ public:
IMPLEMENT_IDENTIFIABLE(TestIdentifiable, Identifiable);
-bool Test::partialUpdateTest()
-{
- bool retval(false);
- DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000);
- TransLogClient tls("tcp/localhost:18377");
-
- TransLogClient::Session::UP s1 = openDomainTest(tls, "test1");
- TransLogClient::Session & session = *s1;
-
- TestIdentifiable du;
-
- nbostream os;
- os << du;
-
- vespalib::ConstBufferRef bb(os.data(), os.size());
- LOG(info, "DU : %s", myhex(bb.c_str(), bb.size()).c_str());
- Packet::Entry e(7, du.getClass().id(), bb);
- Packet pa;
- pa.add(e);
- pa.close();
- ASSERT_TRUE(session.commit(vespalib::ConstBufferRef(pa.getHandle().data(), pa.getHandle().size())));
-
- CallBackUpdate ca;
- TransLogClient::Visitor::UP visitor = tls.createVisitor("test1", ca);
- ASSERT_TRUE(visitor.get());
- ASSERT_TRUE( visitor->visit(5, 7) );
- for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
- ASSERT_TRUE( ca._eof );
- ASSERT_TRUE( ca.map().size() == 1);
- ASSERT_TRUE( ca.hasSerial(7) );
-
- CallBackUpdate ca1;
- TransLogClient::Visitor::UP visitor1 = tls.createVisitor("test1", ca1);
- ASSERT_TRUE(visitor1.get());
- ASSERT_TRUE( visitor1->visit(4, 5) );
- for (size_t i(0); ! ca1._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
- ASSERT_TRUE( ca1._eof );
- ASSERT_TRUE( ca1.map().size() == 0);
-
- CallBackUpdate ca2;
- TransLogClient::Visitor::UP visitor2 = tls.createVisitor("test1", ca2);
- ASSERT_TRUE(visitor2.get());
- ASSERT_TRUE( visitor2->visit(5, 6) );
- for (size_t i(0); ! ca2._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
- ASSERT_TRUE( ca2._eof );
- ASSERT_TRUE( ca2.map().size() == 0);
-
- CallBackUpdate ca3;
- TransLogClient::Visitor::UP visitor3 = tls.createVisitor("test1", ca3);
- ASSERT_TRUE(visitor3.get());
- ASSERT_TRUE( visitor3->visit(5, 1000) );
- for (size_t i(0); ! ca3._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
- ASSERT_TRUE( ca3._eof );
- ASSERT_TRUE( ca3.map().size() == 1);
- ASSERT_TRUE( ca3.hasSerial(7) );
-
- return retval;
-}
+constexpr size_t DEFAULT_PACKET_SIZE = 0xf000;
-bool Test::createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains)
+bool
+createDomainTest(TransLogClient & tls, const vespalib::string & name, size_t preExistingDomains)
{
bool retval(true);
std::vector<vespalib::string> dir;
@@ -298,43 +230,40 @@ bool Test::createDomainTest(TransLogClient & tls, const vespalib::string & name,
return retval;
}
-TransLogClient::Session::UP Test::openDomainTest(TransLogClient & tls, const vespalib::string & name)
+TransLogClient::Session::UP
+openDomainTest(TransLogClient & tls, const vespalib::string & name)
{
TransLogClient::Session::UP s1 = tls.open(name);
ASSERT_TRUE (s1.get() != NULL);
return s1;
}
-bool Test::fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name)
+bool
+fillDomainTest(TransLogClient::Session * s1, const vespalib::string & name)
{
bool retval(true);
Packet::Entry e1(1, 1, vespalib::ConstBufferRef("Content in buffer A", 20));
Packet::Entry e2(2, 2, vespalib::ConstBufferRef("Content in buffer B", 20));
Packet::Entry e3(3, 1, vespalib::ConstBufferRef("Content in buffer C", 20));
- Packet a;
- ASSERT_TRUE (a.add(e1));
- Packet b;
- ASSERT_TRUE (b.add(e2));
- ASSERT_TRUE (b.add(e3));
- ASSERT_TRUE (!b.add(e1));
- a.close();
- b.close();
+ Packet a(DEFAULT_PACKET_SIZE);
+ a.add(e1);
+ Packet b(DEFAULT_PACKET_SIZE);
+ b.add(e2);
+ b.add(e3);
+ EXPECT_FALSE(b.add(e1));
ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(a.getHandle().data(), a.getHandle().size())));
ASSERT_TRUE (s1->commit(vespalib::ConstBufferRef(b.getHandle().data(), b.getHandle().size())));
- try {
- s1->commit(vespalib::ConstBufferRef(a.getHandle().data(), a.getHandle().size()));
- ASSERT_TRUE(false);
- } catch (const std::exception & e) {
- EXPECT_EQUAL(vespalib::string("commit failed with code -2. server says: Exception during commit on " + name + " : Incomming serial number(1) must be bigger than the last one (3)."), e.what());
- }
+ EXPECT_EXCEPTION(s1->commit(vespalib::ConstBufferRef(a.getHandle().data(), a.getHandle().size())),
+ std::runtime_error,
+ "commit failed with code -2. server says: Exception during commit on " + name + " : Incomming serial number(1) must be bigger than the last one (3).");
EXPECT_EQUAL(a.size(), 1u);
EXPECT_EQUAL(a.range().from(), 1u);
EXPECT_EQUAL(a.range().to(), 1u);
EXPECT_EQUAL(b.size(), 2u);
EXPECT_EQUAL(b.range().from(), 2u);
EXPECT_EQUAL(b.range().to(), 3u);
- EXPECT_TRUE(a.merge(b));
+ a.merge(b);
EXPECT_EQUAL(a.size(), 3u);
EXPECT_EQUAL(a.range().from(), 1u);
EXPECT_EQUAL(a.range().to(), 3u);
@@ -349,52 +278,82 @@ bool Test::fillDomainTest(TransLogClient::Session * s1, const vespalib::string &
return retval;
}
-void Test::fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries)
+void
+fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries)
{
size_t value(0);
for(size_t i=0; i < numPackets; i++) {
- std::unique_ptr<Packet> p(new Packet());
+ std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE));
for(size_t j=0; j < numEntries; j++, value++) {
Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&value, sizeof(value)));
- if ( ! p->add(e) ) {
- p->close();
+ p->add(e);
+ if (p->sizeBytes() > DEFAULT_PACKET_SIZE){
ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().data(), p->getHandle().size())));
- p.reset(new Packet());
- ASSERT_TRUE(p->add(e));
+ p.reset(new Packet(DEFAULT_PACKET_SIZE));
}
}
- p->close();
ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().data(), p->getHandle().size())));
}
}
+using Counter = std::atomic<size_t>;
+
+class CountDone : public IDestructorCallback {
+public:
+ CountDone(Counter & inFlight) : _inFlight(inFlight) { ++_inFlight; }
+ ~CountDone() override { --_inFlight; }
+private:
+ Counter & _inFlight;
+};
+
+void
+fillDomainTest(TransLogServer & s1, const vespalib::string & domain, size_t numPackets, size_t numEntries)
+{
+ size_t value(0);
+ Counter inFlight(0);
+ for(size_t i=0; i < numPackets; i++) {
+ std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE));
+ for(size_t j=0; j < numEntries; j++, value++) {
+ Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&value, sizeof(value)));
+ p->add(e);
+ if ( p->sizeBytes() > DEFAULT_PACKET_SIZE ) {
+ s1.commit(domain, *p, std::make_shared<CountDone>(inFlight));
+ p.reset(new Packet(DEFAULT_PACKET_SIZE));
+ }
+ }
+ s1.commit(domain, *p, std::make_shared<CountDone>(inFlight));
+ LOG(info, "Inflight %ld", inFlight.load());
+ }
+ while (inFlight.load() != 0) {
+ std::this_thread::sleep_for(10ms);
+ LOG(info, "Waiting for inflight %ld to reach zero", inFlight.load());
+ }
+
+}
+
void
-Test::fillDomainTest(TransLogClient::Session * s1,
- size_t numPackets, size_t numEntries,
- size_t entrySize)
+fillDomainTest(TransLogClient::Session * s1, size_t numPackets, size_t numEntries, size_t entrySize)
{
size_t value(0);
std::vector<char> entryBuffer(entrySize);
for(size_t i=0; i < numPackets; i++) {
- std::unique_ptr<Packet> p(new Packet());
+ std::unique_ptr<Packet> p(new Packet(DEFAULT_PACKET_SIZE));
for(size_t j=0; j < numEntries; j++, value++) {
Packet::Entry e(value+1, j+1, vespalib::ConstBufferRef((const char *)&entryBuffer[0], entryBuffer.size()));
- if ( ! p->add(e) ) {
- p->close();
+ p->add(e);
+ if (p->sizeBytes() > DEFAULT_PACKET_SIZE){
ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().data(), p->getHandle().size())));
- p.reset(new Packet());
- ASSERT_TRUE(p->add(e));
+ p.reset(new Packet(DEFAULT_PACKET_SIZE));
}
}
- p->close();
ASSERT_TRUE(s1->commit(vespalib::ConstBufferRef(p->getHandle().data(), p->getHandle().size())));
}
}
uint32_t
-Test::countFiles(const vespalib::string &dir)
+countFiles(const vespalib::string &dir)
{
uint32_t res = 0;
FastOS_DirectoryScan dirScan(dir.c_str());
@@ -408,10 +367,8 @@ Test::countFiles(const vespalib::string &dir)
return res;
}
-
void
-Test::checkFilledDomainTest(const TransLogClient::Session::UP &s1,
- size_t numEntries)
+checkFilledDomainTest(const TransLogClient::Session::UP &s1, size_t numEntries)
{
SerialNum b(0), e(0);
size_t c(0);
@@ -421,8 +378,8 @@ Test::checkFilledDomainTest(const TransLogClient::Session::UP &s1,
EXPECT_EQUAL(c, numEntries);
}
-
-bool Test::visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name)
+bool
+visitDomainTest(TransLogClient & tls, TransLogClient::Session * s1, const vespalib::string & name)
{
bool retval(true);
@@ -486,10 +443,31 @@ getMaxSessionRunTime(TransLogServer &tls, const vespalib::string &domain)
return tls.getDomainStats()[domain].maxSessionRunTime.count();
}
-bool Test::testVisitOverGeneratedDomain()
+void createAndFillDomain(const vespalib::string & name, Encoding encoding, size_t preExistingDomains)
{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000);
+ TransLogServer tlss("test13", 18377, ".", fileHeaderContext,
+ DomainConfig().setPartSizeLimit(0x1000000).setEncoding(encoding), 4);
+ TransLogClient tls("tcp/localhost:18377");
+
+ createDomainTest(tls, name, preExistingDomains);
+ TransLogClient::Session::UP s1 = openDomainTest(tls, name);
+ fillDomainTest(s1.get(), name);
+}
+
+void verifyDomain(const vespalib::string & name) {
+ DummyFileHeaderContext fileHeaderContext;
+ TransLogServer tlss("test13", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
+ TransLogClient tls("tcp/localhost:18377");
+ TransLogClient::Session::UP s1 = openDomainTest(tls, name);
+ visitDomainTest(tls, s1.get(), name);
+}
+
+}
+
+TEST("testVisitOverGeneratedDomain") {
+ DummyFileHeaderContext fileHeaderContext;
+ TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
TransLogClient tls("tcp/localhost:18377");
vespalib::string name("test1");
@@ -501,42 +479,85 @@ bool Test::testVisitOverGeneratedDomain()
double maxSessionRunTime = getMaxSessionRunTime(tlss, "test1");
LOG(info, "testVisitOverGeneratedDomain(): maxSessionRunTime=%f", maxSessionRunTime);
EXPECT_GREATER(maxSessionRunTime, 0);
- return true;
}
-void Test::createAndFillDomain(const vespalib::string & name, DomainPart::Crc crcMethod, size_t preExistingDomains)
-{
+TEST("testVisitOverPreExistingDomain") {
+ // Depends on Test::testVisitOverGeneratedDomain()
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test13", 18377, ".", fileHeaderContext, 0x10000, 4, crcMethod);
+ TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
TransLogClient tls("tcp/localhost:18377");
- createDomainTest(tls, name, preExistingDomains);
+ vespalib::string name("test1");
TransLogClient::Session::UP s1 = openDomainTest(tls, name);
- fillDomainTest(s1.get(), name);
+ visitDomainTest(tls, s1.get(), name);
}
-void Test::verifyDomain(const vespalib::string & name)
-{
+TEST("partialUpdateTest") {
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test13", 18377, ".", fileHeaderContext, 0x10000);
+ TransLogServer tlss("test7", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
TransLogClient tls("tcp/localhost:18377");
- TransLogClient::Session::UP s1 = openDomainTest(tls, name);
- visitDomainTest(tls, s1.get(), name);
+
+ TransLogClient::Session::UP s1 = openDomainTest(tls, "test1");
+ TransLogClient::Session & session = *s1;
+
+ TestIdentifiable du;
+
+ nbostream os;
+ os << du;
+
+ vespalib::ConstBufferRef bb(os.data(), os.size());
+ LOG(info, "DU : %s", myhex(bb.c_str(), bb.size()).c_str());
+ Packet::Entry e(7, du.getClass().id(), bb);
+ Packet pa(DEFAULT_PACKET_SIZE);
+ pa.add(e);
+ ASSERT_TRUE(session.commit(vespalib::ConstBufferRef(pa.getHandle().data(), pa.getHandle().size())));
+
+ CallBackUpdate ca;
+ TransLogClient::Visitor::UP visitor = tls.createVisitor("test1", ca);
+ ASSERT_TRUE(visitor.get());
+ ASSERT_TRUE( visitor->visit(5, 7) );
+ for (size_t i(0); ! ca._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
+ ASSERT_TRUE( ca._eof );
+ ASSERT_TRUE( ca.map().size() == 1);
+ ASSERT_TRUE( ca.hasSerial(7) );
+
+ CallBackUpdate ca1;
+ TransLogClient::Visitor::UP visitor1 = tls.createVisitor("test1", ca1);
+ ASSERT_TRUE(visitor1.get());
+ ASSERT_TRUE( visitor1->visit(4, 5) );
+ for (size_t i(0); ! ca1._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
+ ASSERT_TRUE( ca1._eof );
+ ASSERT_TRUE( ca1.map().size() == 0);
+
+ CallBackUpdate ca2;
+ TransLogClient::Visitor::UP visitor2 = tls.createVisitor("test1", ca2);
+ ASSERT_TRUE(visitor2.get());
+ ASSERT_TRUE( visitor2->visit(5, 6) );
+ for (size_t i(0); ! ca2._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
+ ASSERT_TRUE( ca2._eof );
+ ASSERT_TRUE( ca2.map().size() == 0);
+
+ CallBackUpdate ca3;
+ TransLogClient::Visitor::UP visitor3 = tls.createVisitor("test1", ca3);
+ ASSERT_TRUE(visitor3.get());
+ ASSERT_TRUE( visitor3->visit(5, 1000) );
+ for (size_t i(0); ! ca3._eof && (i < 1000); i++ ) { std::this_thread::sleep_for(10ms); }
+ ASSERT_TRUE( ca3._eof );
+ ASSERT_TRUE( ca3.map().size() == 1);
+ ASSERT_TRUE( ca3.hasSerial(7) );
}
-void Test::testCrcVersions()
-{
- createAndFillDomain("ccitt_crc32", DomainPart::ccitt_crc32, 0);
- createAndFillDomain("xxh64", DomainPart::xxh64, 1);
+TEST("testCrcVersions") {
+ createAndFillDomain("ccitt_crc32", Encoding(Encoding::Crc::ccitt_crc32, Encoding::Compression::none), 0);
+ createAndFillDomain("xxh64", Encoding(Encoding::Crc::xxh64, Encoding::Compression::none), 1);
verifyDomain("ccitt_crc32");
verifyDomain("xxh64");
}
-bool Test::testRemove()
-{
+TEST("testRemove") {
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("testremove", 18377, ".", fileHeaderContext, 0x10000);
+ TransLogServer tlss("testremove", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
TransLogClient tls("tcp/localhost:18377");
vespalib::string name("test-delete");
@@ -545,21 +566,6 @@ bool Test::testRemove()
fillDomainTest(s1.get(), name);
visitDomainTest(tls, s1.get(), name);
ASSERT_TRUE(tls.remove(name));
-
- return true;
-}
-
-bool Test::testVisitOverPreExistingDomain()
-{
- // Depends on Test::testVisitOverGeneratedDomain()
- DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test7", 18377, ".", fileHeaderContext, 0x10000);
- TransLogClient tls("tcp/localhost:18377");
-
- vespalib::string name("test1");
- TransLogClient::Session::UP s1 = openDomainTest(tls, name);
- visitDomainTest(tls, s1.get(), name);
- return true;
}
namespace {
@@ -600,18 +606,19 @@ assertStatus(TransLogClient::Session &s,
}
-void Test::testMany()
-{
+TEST("test sending a lot of data") {
const unsigned int NUM_PACKETS = 1000;
const unsigned int NUM_ENTRIES = 100;
const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES;
+ const vespalib::string MANY("many");
{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test8", 18377, ".", fileHeaderContext, 0x80000);
+ TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x80000)
+ .setChunkAgeLimit(100us));
TransLogClient tls("tcp/localhost:18377");
- createDomainTest(tls, "many", 0);
- TransLogClient::Session::UP s1 = openDomainTest(tls, "many");
+ createDomainTest(tls, MANY, 0);
+ TransLogClient::Session::UP s1 = openDomainTest(tls, MANY);
fillDomainTest(s1.get(), NUM_PACKETS, NUM_ENTRIES);
SerialNum b(0), e(0);
size_t c(0);
@@ -630,7 +637,7 @@ void Test::testMany()
}
{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test8", 18377, ".", fileHeaderContext, 0x1000000);
+ TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
TransLogClient tls("tcp/localhost:18377");
TransLogClient::Session::UP s1 = openDomainTest(tls, "many");
@@ -641,7 +648,28 @@ void Test::testMany()
EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES);
EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES);
CallBackManyTest ca(2);
- TransLogClient::Visitor::UP visitor = tls.createVisitor("many", ca);
+ TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca);
+ ASSERT_TRUE(visitor.get());
+ ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
+ for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
+ ASSERT_TRUE( ca._eof );
+ EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES);
+ EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES);
+ }
+ {
+ DummyFileHeaderContext fileHeaderContext;
+ TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
+ TransLogClient tls("tcp/localhost:18377");
+
+ TransLogClient::Session::UP s1 = openDomainTest(tls, MANY);
+ SerialNum b(0), e(0);
+ size_t c(0);
+ EXPECT_TRUE(s1->status(b, e, c));
+ EXPECT_EQUAL(b, 1u);
+ EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES);
+ EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES);
+ CallBackManyTest ca(2);
+ TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca);
ASSERT_TRUE(visitor.get());
ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
@@ -651,14 +679,67 @@ void Test::testMany()
}
}
-void Test::testErase()
-{
+TEST("test sending a lot of data async") {
const unsigned int NUM_PACKETS = 1000;
const unsigned int NUM_ENTRIES = 100;
const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES;
+ const vespalib::string MANY("many-async");
+ {
+ DummyFileHeaderContext fileHeaderContext;
+ TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x80000)
+ .setChunkAgeLimit(10ms));
+ TransLogClient tls("tcp/localhost:18377");
+ createDomainTest(tls, MANY, 1);
+ TransLogClient::Session::UP s1 = openDomainTest(tls, MANY);
+ fillDomainTest(tlss, MANY, NUM_PACKETS, NUM_ENTRIES);
+ SerialNum b(0), e(0);
+ size_t c(0);
+ EXPECT_TRUE(s1->status(b, e, c));
+
+ EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES);
+ EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES);
+ CallBackManyTest ca(2);
+ TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca);
+ ASSERT_TRUE(visitor.get());
+ ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
+ for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
+ ASSERT_TRUE( ca._eof );
+ EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES);
+ EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES);
+ }
{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test12", 18377, ".", fileHeaderContext, 0x80000);
+ TransLogServer tlss("test8", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
+ TransLogClient tls("tcp/localhost:18377");
+
+ TransLogClient::Session::UP s1 = openDomainTest(tls, MANY);
+ SerialNum b(0), e(0);
+ size_t c(0);
+ EXPECT_TRUE(s1->status(b, e, c));
+ EXPECT_EQUAL(b, 1u);
+ EXPECT_EQUAL(e, TOTAL_NUM_ENTRIES);
+ EXPECT_EQUAL(c, TOTAL_NUM_ENTRIES);
+ CallBackManyTest ca(2);
+ TransLogClient::Visitor::UP visitor = tls.createVisitor(MANY, ca);
+ ASSERT_TRUE(visitor.get());
+ ASSERT_TRUE( visitor->visit(2, TOTAL_NUM_ENTRIES) );
+ for (size_t i(0); ! ca._eof && (i < 60000); i++ ) { std::this_thread::sleep_for(10ms); }
+ ASSERT_TRUE( ca._eof );
+ EXPECT_EQUAL(ca._count, TOTAL_NUM_ENTRIES);
+ EXPECT_EQUAL(ca._value, TOTAL_NUM_ENTRIES);
+ }
+}
+
+
+
+
+TEST("testErase") {
+ const unsigned int NUM_PACKETS = 1000;
+ const unsigned int NUM_ENTRIES = 100;
+ const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES;
+ {
+ DummyFileHeaderContext fileHeaderContext;
+ TransLogServer tlss("test12", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x80000));
TransLogClient tls("tcp/localhost:18377");
createDomainTest(tls, "erase", 0);
@@ -667,7 +748,7 @@ void Test::testErase()
}
{
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test12", 18377, ".", fileHeaderContext, 0x1000000);
+ TransLogServer tlss("test12", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
TransLogClient tls("tcp/localhost:18377");
TransLogClient::Session::UP s1 = openDomainTest(tls, "erase");
@@ -748,16 +829,13 @@ void Test::testErase()
}
}
-
-void
-Test::testSync()
-{
+TEST("testSync") {
const unsigned int NUM_PACKETS = 3;
const unsigned int NUM_ENTRIES = 4;
const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES;
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tlss("test9", 18377, ".", fileHeaderContext, 0x1000000);
+ TransLogServer tlss("test9", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
TransLogClient tls("tcp/localhost:18377");
createDomainTest(tls, "sync", 0);
@@ -770,10 +848,7 @@ Test::testSync()
EXPECT_EQUAL(syncedTo, TOTAL_NUM_ENTRIES);
}
-
-void
-Test::testTruncateOnVersionMismatch()
-{
+TEST("test truncate on version mismatch") {
const unsigned int NUM_PACKETS = 3;
const unsigned int NUM_ENTRIES = 4;
const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES;
@@ -782,7 +857,7 @@ Test::testTruncateOnVersionMismatch()
size_t countOld(0);
DummyFileHeaderContext fileHeaderContext;
{
- TransLogServer tlss("test11", 18377, ".", fileHeaderContext, 0x1000000);
+ TransLogServer tlss("test11", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x1000000));
TransLogClient tls("tcp/localhost:18377");
createDomainTest(tls, "sync", 0);
@@ -803,7 +878,7 @@ Test::testTruncateOnVersionMismatch()
EXPECT_EQUAL(static_cast<ssize_t>(sizeof(tmp)), f.Write2(tmp, sizeof(tmp)));
EXPECT_TRUE(f.Close());
{
- TransLogServer tlss("test11", 18377, ".", fileHeaderContext, 0x1000000);
+ TransLogServer tlss("test11", 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
TransLogClient tls("tcp/localhost:18377");
TransLogClient::Session::UP s1 = openDomainTest(tls, "sync");
uint64_t from(0), to(0);
@@ -815,9 +890,7 @@ Test::testTruncateOnVersionMismatch()
}
}
-void
-Test::testTruncateOnShortRead()
-{
+TEST("test truncation after short read") {
const unsigned int NUM_PACKETS = 17;
const unsigned int NUM_ENTRIES = 1;
const unsigned int TOTAL_NUM_ENTRIES = NUM_PACKETS * NUM_ENTRIES;
@@ -829,7 +902,7 @@ Test::testTruncateOnShortRead()
DummyFileHeaderContext fileHeaderContext;
{
- TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000);
+ TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
TransLogClient tls(tlsspec);
createDomainTest(tls, domain, 0);
@@ -845,7 +918,7 @@ Test::testTruncateOnShortRead()
EXPECT_EQUAL(2u, countFiles(dir));
}
{
- TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000);
+ TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
TransLogClient tls(tlsspec);
TransLogClient::Session::UP s1 = openDomainTest(tls, domain);
checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES);
@@ -861,7 +934,7 @@ Test::testTruncateOnShortRead()
trfile.Close();
}
{
- TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, 0x10000);
+ TransLogServer tlss(topdir, 18377, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(0x10000));
TransLogClient tls(tlsspec);
TransLogClient::Session::UP s1 = openDomainTest(tls, domain);
checkFilledDomainTest(s1, TOTAL_NUM_ENTRIES - 1);
@@ -871,28 +944,4 @@ Test::testTruncateOnShortRead()
}
}
-
-int Test::Main()
-{
- TEST_INIT("translogclient_test");
-
- if (_argc > 0) {
- DummyFileHeaderContext::setCreator(_argv[0]);
- }
- testVisitOverGeneratedDomain();
- testVisitOverPreExistingDomain();
- testMany();
- testErase();
- partialUpdateTest();
-
- testRemove();
-
- testSync();
-
- testTruncateOnShortRead();
- testTruncateOnVersionMismatch();
-
- testCrcVersions();
-
- TEST_DONE();
-}
+TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/searchlib/src/tests/transactionlogstress/translogstress.cpp b/searchlib/src/tests/transactionlogstress/translogstress.cpp
index 013ca81dcc9..925f297bf48 100644
--- a/searchlib/src/tests/transactionlogstress/translogstress.cpp
+++ b/searchlib/src/tests/transactionlogstress/translogstress.cpp
@@ -698,7 +698,7 @@ TransLogStress::Main()
// start transaction log server
DummyFileHeaderContext fileHeaderContext;
- TransLogServer tls("server", 17897, ".", fileHeaderContext, _cfg.domainPartSize);
+ TransLogServer tls("server", 17897, ".", fileHeaderContext, DomainConfig().setPartSizeLimit(_cfg.domainPartSize));
TransLogClient client(tlsSpec);
client.create(domain);
diff --git a/searchlib/src/vespa/searchlib/config/translogserver.def b/searchlib/src/vespa/searchlib/config/translogserver.def
index 74efe3fe68e..f822fc80fc1 100644
--- a/searchlib/src/vespa/searchlib/config/translogserver.def
+++ b/searchlib/src/vespa/searchlib/config/translogserver.def
@@ -5,7 +5,7 @@ namespace=searchlib
listenport int default=13700 restart
## Max file size (50M)
-filesizemax int default=50000000 restart
+filesizemax int default=50000000
## Server name to identify server.
servername string default="tls" restart
@@ -22,3 +22,17 @@ maxthreads int default=4 restart
##Default crc method used
crcmethod enum {ccitt_crc32, xxh64} default=xxh64
+
+## Control compression type.
+compression.type enum {NONE, NONE_MULTI, LZ4, ZSTD} default=LZ4
+
+## Control compression level
+## LZ4 has normal range 1..9 while ZSTD has range 1..19
+## 9 is a reasonable default for both
+compression.level int default=9
+
+## How large a chunk can grow in memory before beeing flushed
+chunk.sizelimit int default = 256000 # 256k
+
+## How long a chunk can reside in memory befor ebeeing flushed to disk.
+chunk.agelimit double default = 0.010 # 10 milliseconds
diff --git a/searchlib/src/vespa/searchlib/features/onnx_feature.cpp b/searchlib/src/vespa/searchlib/features/onnx_feature.cpp
index b24392ce629..698d2309e5a 100644
--- a/searchlib/src/vespa/searchlib/features/onnx_feature.cpp
+++ b/searchlib/src/vespa/searchlib/features/onnx_feature.cpp
@@ -26,6 +26,25 @@ using vespalib::tensor::Onnx;
namespace search::features {
+namespace {
+
+vespalib::string normalize_name(const vespalib::string &name, const char *context) {
+ vespalib::string result;
+ for (char c: name) {
+ if (isalnum(c)) {
+ result.push_back(c);
+ } else {
+ result.push_back('_');
+ }
+ }
+ if (result != name) {
+ LOG(warning, "normalized %s name: '%s' -> '%s'", context, name.c_str(), result.c_str());
+ }
+ return result;
+}
+
+}
+
/**
* Feature executor that evaluates an onnx model
*/
@@ -78,23 +97,25 @@ OnnxBlueprint::setup(const IIndexEnvironment &env,
Onnx::WirePlanner planner;
for (size_t i = 0; i < _model->inputs().size(); ++i) {
const auto &model_input = _model->inputs()[i];
- if (auto maybe_input = defineInput(fmt("rankingExpression(\"%s\")", model_input.name.c_str()), AcceptInput::OBJECT)) {
+ vespalib::string input_name = normalize_name(model_input.name, "input");
+ if (auto maybe_input = defineInput(fmt("rankingExpression(\"%s\")", input_name.c_str()), AcceptInput::OBJECT)) {
const FeatureType &feature_input = maybe_input.value();
assert(feature_input.is_object());
if (!planner.bind_input_type(feature_input.type(), model_input)) {
- return fail("incompatible type for input '%s': %s -> %s", model_input.name.c_str(),
+ return fail("incompatible type for input '%s': %s -> %s", input_name.c_str(),
feature_input.type().to_spec().c_str(), model_input.type_as_string().c_str());
}
}
}
for (size_t i = 0; i < _model->outputs().size(); ++i) {
const auto &model_output = _model->outputs()[i];
+ vespalib::string output_name = normalize_name(model_output.name, "output");
ValueType output_type = planner.make_output_type(model_output);
if (output_type.is_error()) {
return fail("unable to make compatible type for output '%s': %s -> error",
- model_output.name.c_str(), model_output.type_as_string().c_str());
+ output_name.c_str(), model_output.type_as_string().c_str());
}
- describeOutput(model_output.name, "output from onnx model", FeatureType::object(output_type));
+ describeOutput(output_name, "output from onnx model", FeatureType::object(output_type));
}
_wire_info = planner.get_wire_info(*_model);
return true;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.h b/searchlib/src/vespa/searchlib/transactionlog/common.h
index db8b9727daa..f3276027078 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/common.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/common.h
@@ -72,7 +72,6 @@ public:
Packet(size_t m=0xf000) : _count(0), _range(), _limit(m), _buf(m) { }
Packet(const void * buf, size_t sz);
bool add(const Entry & data);
- void close() { }
void clear() { _buf.clear(); _count = 0; _range.from(0); _range.to(0); }
const SerialNumRange & range() const { return _range; }
const vespalib::nbostream & getHandle() const { return _buf; }
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
index 5e7cfc74199..804a558789e 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.cpp
@@ -4,11 +4,14 @@
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/util/closuretask.h>
#include <vespa/vespalib/io/fileutil.h>
+#include <vespa/vespalib/util/lambdatask.h>
#include <vespa/fastos/file.h>
#include <algorithm>
#include <thread>
#include <vespa/log/log.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
+
LOG_SETUP(".transactionlog.domain");
using vespalib::string;
@@ -20,29 +23,43 @@ using vespalib::Monitor;
using vespalib::MonitorGuard;
using search::common::FileHeaderContext;
using std::runtime_error;
-using namespace std::chrono_literals;
+using std::make_shared;
namespace search::transactionlog {
-Domain::Domain(const string &domainName, const string & baseDir, Executor & commitExecutor,
- Executor & sessionExecutor, uint64_t domainPartSize, DomainPart::Crc defaultCrcType,
- const FileHeaderContext &fileHeaderContext) :
- _defaultCrcType(defaultCrcType),
- _commitExecutor(commitExecutor),
- _sessionExecutor(sessionExecutor),
- _sessionId(1),
- _syncMonitor(),
- _pendingSync(false),
- _name(domainName),
- _domainPartSize(domainPartSize),
- _parts(),
- _lock(),
- _sessionLock(),
- _sessions(),
- _maxSessionRunTime(),
- _baseDir(baseDir),
- _fileHeaderContext(fileHeaderContext),
- _markedDeleted(false)
+VESPA_THREAD_STACK_TAG(domain_commit_executor);
+
+DomainConfig::DomainConfig()
+ : _encoding(Encoding::Crc::xxh64, Encoding::Compression::none),
+ _compressionLevel(9),
+ _partSizeLimit(0x10000000), // 256M
+ _chunkSizeLimit(0x40000), // 256k
+ _chunkAgeLimit(10ms)
+{ }
+
+Domain::Domain(const string &domainName, const string & baseDir, FastOS_ThreadPool & threadPool,
+ Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg,
+ const FileHeaderContext &fileHeaderContext)
+ : _config(cfg),
+ _lastSerial(0),
+ _threadPool(threadPool),
+ _singleCommiter(std::make_unique<vespalib::ThreadStackExecutor>(1, 128*1024)),
+ _commitExecutor(commitExecutor),
+ _sessionExecutor(sessionExecutor),
+ _sessionId(1),
+ _syncMonitor(),
+ _pendingSync(false),
+ _name(domainName),
+ _parts(),
+ _lock(),
+ _currentChunkMonitor(),
+ _sessionLock(),
+ _sessions(),
+ _maxSessionRunTime(),
+ _baseDir(baseDir),
+ _fileHeaderContext(fileHeaderContext),
+ _markedDeleted(false),
+ _self(nullptr)
{
int retval(0);
if ((retval = makeDirectory(_baseDir.c_str())) != 0) {
@@ -60,13 +77,22 @@ Domain::Domain(const string &domainName, const string & baseDir, Executor & comm
}
_sessionExecutor.sync();
if (_parts.empty() || _parts.crbegin()->second->isClosed()) {
- _parts[lastPart] = std::make_shared<DomainPart>(_name, dir(), lastPart, _defaultCrcType, _fileHeaderContext, false);
+ _parts[lastPart] = std::make_shared<DomainPart>(_name, dir(), lastPart, _config.getEncoding(),
+ _config.getCompressionlevel(), _fileHeaderContext, false);
vespalib::File::sync(dir());
}
+ _lastSerial = end();
+}
+
+Domain &
+Domain::setConfig(const DomainConfig & cfg) {
+ _config = cfg;
+ return *this;
}
void Domain::addPart(int64_t partId, bool isLastPart) {
- auto dp = std::make_shared<DomainPart>(_name, dir(), partId, _defaultCrcType, _fileHeaderContext, isLastPart);
+ auto dp = std::make_shared<DomainPart>(_name, dir(), partId, _config.getEncoding(),
+ _config.getCompressionlevel(), _fileHeaderContext, isLastPart);
if (dp->size() == 0) {
// Only last domain part is allowed to be truncated down to
// empty size.
@@ -285,18 +311,21 @@ waitPendingSync(vespalib::Monitor &syncMonitor, bool &pendingSync)
}
-void Domain::commit(const Packet & packet)
+void
+Domain::commit(const Packet & packet, Writer::DoneCallback onDone)
{
+ (void) onDone;
DomainPart::SP dp(_parts.rbegin()->second);
vespalib::nbostream_longlivedbuf is(packet.getHandle().data(), packet.getHandle().size());
Packet::Entry entry;
entry.deserialize(is);
- if (dp->byteSize() > _domainPartSize) {
+ if (dp->byteSize() > _config.getPartSizeLimit()) {
waitPendingSync(_syncMonitor, _pendingSync);
triggerSyncNow();
waitPendingSync(_syncMonitor, _pendingSync);
dp->close();
- dp = std::make_shared<DomainPart>(_name, dir(), entry.serial(), _defaultCrcType, _fileHeaderContext, false);
+ dp = std::make_shared<DomainPart>(_name, dir(), entry.serial(), _config.getEncoding(),
+ _config.getCompressionlevel(), _fileHeaderContext, false);
{
LockGuard guard(_lock);
_parts[entry.serial()] = dp;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domain.h b/searchlib/src/vespa/searchlib/transactionlog/domain.h
index d6f964d5140..cc7f4ac5e4b 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domain.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domain.h
@@ -4,20 +4,45 @@
#include "domainpart.h"
#include "session.h"
#include <vespa/vespalib/util/threadexecutor.h>
+#include <vespa/vespalib/util/time.h>
+#include <vespa/fastos/thread.h>
#include <chrono>
namespace search::transactionlog {
+class DomainConfig {
+public:
+ using duration = vespalib::duration;
+ DomainConfig();
+ DomainConfig & setEncoding(Encoding v) { _encoding = v; return *this; }
+ DomainConfig & setPartSizeLimit(size_t v) { _partSizeLimit = v; return *this; }
+ DomainConfig & setChunkSizeLimit(size_t v) { _chunkSizeLimit = v; return *this; }
+ DomainConfig & setChunkAgeLimit(vespalib::duration v) { _chunkAgeLimit = v; return *this; }
+ DomainConfig & setCompressionLevel(uint8_t v) { _compressionLevel = v; return *this; }
+ Encoding getEncoding() const { return _encoding; }
+ size_t getPartSizeLimit() const { return _partSizeLimit; }
+ size_t getChunkSizeLimit() const { return _chunkSizeLimit; }
+ duration getChunkAgeLimit() const { return _chunkAgeLimit; }
+ uint8_t getCompressionlevel() const { return _compressionLevel; }
+private:
+ Encoding _encoding;
+ uint8_t _compressionLevel;
+ size_t _partSizeLimit;
+ size_t _chunkSizeLimit;
+ duration _chunkAgeLimit;
+};
+
struct PartInfo {
SerialNumRange range;
size_t numEntries;
size_t byteSize;
vespalib::string file;
- PartInfo(SerialNumRange range_in, size_t numEntries_in,
- size_t byteSize_in,
- vespalib::stringref file_in)
- : range(range_in), numEntries(numEntries_in), byteSize(byteSize_in),
- file(file_in) {}
+ PartInfo(SerialNumRange range_in, size_t numEntries_in, size_t byteSize_in, vespalib::stringref file_in)
+ : range(range_in),
+ numEntries(numEntries_in),
+ byteSize(byteSize_in),
+ file(file_in)
+ {}
};
struct DomainInfo {
@@ -40,17 +65,17 @@ class Domain
public:
using SP = std::shared_ptr<Domain>;
using Executor = vespalib::SyncableThreadExecutor;
- Domain(const vespalib::string &name, const vespalib::string &baseDir, Executor & commitExecutor,
- Executor & sessionExecutor, uint64_t domainPartSize, DomainPart::Crc defaultCrcType,
+ Domain(const vespalib::string &name, const vespalib::string &baseDir, FastOS_ThreadPool & threadPool,
+ Executor & commitExecutor, Executor & sessionExecutor, const DomainConfig & cfg,
const common::FileHeaderContext &fileHeaderContext);
- virtual ~Domain();
+ ~Domain();
DomainInfo getDomainInfo() const;
const vespalib::string & name() const { return _name; }
bool erase(SerialNum to);
- void commit(const Packet & packet);
+ void commit(const Packet & packet, Writer::DoneCallback onDone);
int visit(const Domain::SP & self, SerialNum from, SerialNum to, std::unique_ptr<Session::Destination> dest);
SerialNum begin() const;
@@ -77,7 +102,7 @@ public:
return _sessionExecutor.execute(std::move(task));
}
uint64_t size() const;
-
+ Domain & setConfig(const DomainConfig & cfg);
private:
SerialNum begin(const vespalib::LockGuard & guard) const;
SerialNum end(const vespalib::LockGuard & guard) const;
@@ -95,22 +120,26 @@ private:
using DomainPartList = std::map<int64_t, DomainPart::SP>;
using DurationSeconds = std::chrono::duration<double>;
- DomainPart::Crc _defaultCrcType;
- Executor & _commitExecutor;
- Executor & _sessionExecutor;
- std::atomic<int> _sessionId;
- vespalib::Monitor _syncMonitor;
- bool _pendingSync;
- vespalib::string _name;
- uint64_t _domainPartSize;
- DomainPartList _parts;
- vespalib::Lock _lock;
- vespalib::Lock _sessionLock;
- SessionList _sessions;
- DurationSeconds _maxSessionRunTime;
- vespalib::string _baseDir;
+ DomainConfig _config;
+ SerialNum _lastSerial;
+ FastOS_ThreadPool & _threadPool;
+ std::unique_ptr<Executor> _singleCommiter;
+ Executor & _commitExecutor;
+ Executor & _sessionExecutor;
+ std::atomic<int> _sessionId;
+ vespalib::Monitor _syncMonitor;
+ bool _pendingSync;
+ vespalib::string _name;
+ DomainPartList _parts;
+ vespalib::Lock _lock;
+ vespalib::Monitor _currentChunkMonitor;
+ vespalib::Lock _sessionLock;
+ SessionList _sessions;
+ DurationSeconds _maxSessionRunTime;
+ vespalib::string _baseDir;
const common::FileHeaderContext &_fileHeaderContext;
- bool _markedDeleted;
+ bool _markedDeleted;
+ FastOS_ThreadInterface * _self;
};
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
index 91599f8218a..3829a978ec4 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.cpp
@@ -27,30 +27,19 @@ namespace search::transactionlog {
namespace {
-void
-handleSync(FastOS_FileInterface &file) __attribute__ ((noinline));
+constexpr size_t TARGET_PACKET_SIZE = 0x3f000;
string
-handleWriteError(const char *text,
- FastOS_FileInterface &file,
- int64_t lastKnownGoodPos,
- const Packet::Entry &entry,
- int bufLen) __attribute__ ((noinline));
-
-bool
-handleReadError(const char *text,
- FastOS_FileInterface &file,
- ssize_t len,
- ssize_t rlen,
- int64_t lastKnownGoodPos,
- bool allowTruncate) __attribute__ ((noinline));
+handleWriteError(const char *text, FastOS_FileInterface &file, int64_t lastKnownGoodPos,
+ SerialNumRange range, int bufLen) __attribute__ ((noinline));
bool
-addPacket(Packet &packet,
- const Packet::Entry &e) __attribute__ ((noinline));
+handleReadError(const char *text, FastOS_FileInterface &file, ssize_t len, ssize_t rlen,
+ int64_t lastKnownGoodPos, bool allowTruncate) __attribute__ ((noinline));
-bool
-tailOfFileIsZero(FastOS_FileInterface &file, int64_t lastKnownGoodPos) __attribute__ ((noinline));
+void handleSync(FastOS_FileInterface &file) __attribute__ ((noinline));
+bool addPacket(Packet &packet, const Packet::Entry &e) __attribute__ ((noinline));
+bool tailOfFileIsZero(FastOS_FileInterface &file, int64_t lastKnownGoodPos) __attribute__ ((noinline));
bool
addPacket(Packet &packet, const Packet::Entry &e)
@@ -72,21 +61,18 @@ handleSync(FastOS_FileInterface &file)
}
string
-handleWriteError(const char *text,
- FastOS_FileInterface &file,
- int64_t lastKnownGoodPos,
- const Packet::Entry &entry,
- int bufLen)
+handleWriteError(const char *text, FastOS_FileInterface &file, int64_t lastKnownGoodPos,
+ SerialNumRange range, int bufLen)
{
string last(FastOS_File::getLastErrorString());
- string e(make_string("%s. File '%s' at position %" PRId64 " for entry %" PRIu64 " of length %u. "
+ string e(make_string("%s. File '%s' at position %" PRId64 " for entries [%" PRIu64 ", %" PRIu64 "] of length %u. "
"OS says '%s'. Rewind to last known good position %" PRId64 ".",
- text, file.GetFileName(), file.GetPosition(), entry.serial(), bufLen,
+ text, file.GetFileName(), file.GetPosition(), range.from(), range.to(), bufLen,
last.c_str(), lastKnownGoodPos));
LOG(error, "%s", e.c_str());
if ( ! file.SetPosition(lastKnownGoodPos) ) {
last = FastOS_File::getLastErrorString();
- throw runtime_error(make_string("Failed setting position %" PRId64 " of file '%s' of size %" PRId64 ": OS says '%s'",
+ throw runtime_error(make_string("Failed setting position %" PRId64 " of file '%s' of size %" PRId64 " : OS says '%s'",
lastKnownGoodPos, file.GetFileName(), file.GetSize(), last.c_str()));
}
handleSync(file);
@@ -118,12 +104,8 @@ tailOfFileIsZero(FastOS_FileInterface &file, int64_t lastKnownGoodPos)
}
bool
-handleReadError(const char *text,
- FastOS_FileInterface &file,
- ssize_t len,
- ssize_t rlen,
- int64_t lastKnownGoodPos,
- bool allowTruncate)
+handleReadError(const char *text, FastOS_FileInterface &file, ssize_t len, ssize_t rlen,
+ int64_t lastKnownGoodPos, bool allowTruncate)
{
bool retval(true);
if (rlen != -1) {
@@ -176,6 +158,20 @@ handleReadError(const char *text,
return retval;
}
+int32_t
+calcCrc(Encoding::Crc version, const void * buf, size_t sz)
+{
+ if (version == Encoding::Crc::xxh64) {
+ return static_cast<int32_t>(XXH64(buf, sz, 0ll));
+ } else if (version == Encoding::Crc::ccitt_crc32) {
+ vespalib::crc_32_type calculator;
+ calculator.process_bytes(buf, sz);
+ return calculator.checksum();
+ } else {
+ LOG_ABORT("should not be reached");
+ }
+}
+
}
int64_t
@@ -251,7 +247,6 @@ DomainPart::buildPacketMapping(bool allowTruncate)
}
}
}
- packet.close();
if (!packet.empty()) {
_packets[firstSerial] = packet;
_range.to(lastSerial);
@@ -265,22 +260,23 @@ DomainPart::buildPacketMapping(bool allowTruncate)
return currPos;
}
-DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Crc defaultCrc,
- const FileHeaderContext &fileHeaderContext, bool allowTruncate) :
- _defaultCrc(defaultCrc),
- _lock(),
- _fileLock(),
- _range(s),
- _sz(0),
- _byteSize(0),
- _packets(),
- _fileName(make_string("%s/%s-%016" PRIu64, baseDir.c_str(), name.c_str(), s)),
- _transLog(std::make_unique<FastOS_File>(_fileName.c_str())),
- _skipList(),
- _headerLen(0),
- _writeLock(),
- _writtenSerial(0),
- _syncedSerial(0)
+DomainPart::DomainPart(const string & name, const string & baseDir, SerialNum s, Encoding encoding,
+ uint8_t compressionLevel, const FileHeaderContext &fileHeaderContext, bool allowTruncate)
+ : _encoding(encoding.getCrc(), Encoding::Compression::none), //TODO We do not yet support compression
+ _compressionLevel(compressionLevel),
+ _lock(),
+ _fileLock(),
+ _range(s),
+ _sz(0),
+ _byteSize(0),
+ _packets(),
+ _fileName(make_string("%s/%s-%016" PRIu64, baseDir.c_str(), name.c_str(), s)),
+ _transLog(std::make_unique<FastOS_File>(_fileName.c_str())),
+ _skipList(),
+ _headerLen(0),
+ _writeLock(),
+ _writtenSerial(0),
+ _syncedSerial(0)
{
if (_transLog->OpenReadOnly()) {
int64_t currPos = buildPacketMapping(allowTruncate);
@@ -519,12 +515,9 @@ DomainPart::visit(SerialNumRange &r, Packet &packet)
}
}
}
- newPacket.close();
packet = newPacket;
retval = next != _packets.end();
}
- } else {
- packet.close();
}
} else {
/// File has been closed must continue from file.
@@ -568,7 +561,6 @@ DomainPart::visit(FastOS_FileInterface &file, SerialNumRange &r, Packet &packet)
}
}
}
- newPacket.close();
packet = newPacket;
}
@@ -582,19 +574,19 @@ DomainPart::write(FastOS_FileInterface &file, const Packet::Entry &entry)
int32_t crc(0);
uint32_t len(entry.serializedSize() + sizeof(crc));
nbostream os;
- os << static_cast<uint8_t>(_defaultCrc);
+ os << static_cast<uint8_t>(_encoding.getRaw());
os << len;
size_t start(os.size());
entry.serialize(os);
size_t end(os.size());
- crc = calcCrc(_defaultCrc, os.data() + start, end - start);
+ crc = calcCrc(_encoding.getCrc(), os.data() + start, end - start);
os << crc;
size_t osSize = os.size();
assert(osSize == len + sizeof(len) + sizeof(uint8_t));
LockGuard guard(_writeLock);
if ( ! file.CheckedWrite(os.data(), osSize) ) {
- throw runtime_error(handleWriteError("Failed writing the entry.", file, lastKnownGoodPos, entry, end - start));
+ throw runtime_error(handleWriteError("Failed writing the entry.", file, lastKnownGoodPos, SerialNumRange(entry.serial(), entry.serial()), end - start));
}
_writtenSerial = entry.serial();
_byteSize.store(lastKnownGoodPos + osSize, std::memory_order_release);
@@ -615,10 +607,10 @@ DomainPart::read(FastOS_FileInterface &file,
uint32_t len(0);
his >> version >> len;
if ((retval = (rlen == sizeof(tmp)))) {
- if ( ! (retval = (version == ccitt_crc32) || version == xxh64)) {
+ if ( ! (retval = (version == Encoding::Crc::ccitt_crc32) || version == Encoding::Crc::xxh64)) {
string msg(make_string("Version mismatch. Expected 'ccitt_crc32=1' or 'xxh64=2',"
- " got %d from '%s' at position %" PRId64,
- version, file.GetFileName(), lastKnownGoodPos));
+ " got %d from '%s' at position %" PRId64,
+ version, file.GetFileName(), lastKnownGoodPos));
if ((version == 0) && (len == 0) && tailOfFileIsZero(file, lastKnownGoodPos)) {
LOG(warning, "%s", msg.c_str());
return handleReadError("packet version", file, sizeof(tmp), rlen, lastKnownGoodPos, allowTruncate);
@@ -638,7 +630,7 @@ DomainPart::read(FastOS_FileInterface &file,
entry.deserialize(is);
int32_t crc(0);
is >> crc;
- int32_t crcVerify(calcCrc(static_cast<Crc>(version), buf.get(), len - sizeof(crc)));
+ int32_t crcVerify(calcCrc(Encoding(version).getCrc(), buf.get(), len - sizeof(crc)));
if (crc != crcVerify) {
throw runtime_error(make_string("Got bad crc for packet from '%s' (len pos=%" PRId64 ", len=%d) : crcVerify = %d, expected %d",
file.GetFileName(), file.GetPosition() - len - sizeof(len),
@@ -655,17 +647,4 @@ DomainPart::read(FastOS_FileInterface &file,
return retval;
}
-int32_t DomainPart::calcCrc(Crc version, const void * buf, size_t sz)
-{
- if (version == xxh64) {
- return static_cast<int32_t>(XXH64(buf, sz, 0ll));
- } else if (version == ccitt_crc32) {
- vespalib::crc_32_type calculator;
- calculator.process_bytes(buf, sz);
- return calculator.checksum();
- } else {
- LOG_ABORT("should not be reached");
- }
-}
-
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h
index 59d0df6df94..f3a53c1e9a9 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/domainpart.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/domainpart.h
@@ -2,6 +2,7 @@
#pragma once
#include "common.h"
+#include "ichunk.h"
#include <vespa/vespalib/util/sync.h>
#include <vespa/vespalib/util/memory.h>
#include <map>
@@ -19,13 +20,9 @@ private:
DomainPart& operator=(const DomainPart &);
public:
- enum Crc {
- ccitt_crc32=1,
- xxh64=2
- };
typedef std::shared_ptr<DomainPart> SP;
- DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s, Crc defaultCrc,
- const common::FileHeaderContext &FileHeaderContext, bool allowTruncate);
+ DomainPart(const vespalib::string &name, const vespalib::string &baseDir, SerialNum s, Encoding defaultEncoding,
+ uint8_t compressionLevel, const common::FileHeaderContext &FileHeaderContext, bool allowTruncate);
~DomainPart();
@@ -55,7 +52,6 @@ private:
static bool read(FastOS_FileInterface &file, Packet::Entry &entry, vespalib::alloc::Alloc &buf, bool allowTruncate);
void write(FastOS_FileInterface &file, const Packet::Entry &entry);
- static int32_t calcCrc(Crc crc, const void * buf, size_t len);
void writeHeader(const common::FileHeaderContext &fileHeaderContext);
class SkipInfo
@@ -77,21 +73,22 @@ private:
};
typedef std::vector<SkipInfo> SkipList;
typedef std::map<SerialNum, Packet> PacketList;
- const Crc _defaultCrc;
- vespalib::Lock _lock;
- vespalib::Lock _fileLock;
- SerialNumRange _range;
- size_t _sz;
+ const Encoding _encoding;
+ const uint8_t _compressionLevel;
+ vespalib::Lock _lock;
+ vespalib::Lock _fileLock;
+ SerialNumRange _range;
+ size_t _sz;
std::atomic<uint64_t> _byteSize;
- PacketList _packets;
- vespalib::string _fileName;
+ PacketList _packets;
+ vespalib::string _fileName;
std::unique_ptr<FastOS_FileInterface> _transLog;
- SkipList _skipList;
- uint32_t _headerLen;
- vespalib::Lock _writeLock;
+ SkipList _skipList;
+ uint32_t _headerLen;
+ vespalib::Lock _writeLock;
// Protected by _writeLock
- SerialNum _writtenSerial;
- SerialNum _syncedSerial;
+ SerialNum _writtenSerial;
+ SerialNum _syncedSerial;
};
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
index 4aa1b0a5a90..5e44815cb1b 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/ichunk.h
@@ -22,7 +22,7 @@ public:
lz4 = 2,
zstd = 3
};
- Encoding(uint8_t raw) : _raw(raw) {}
+ explicit Encoding(uint8_t raw) : _raw(raw) { }
Encoding(Crc crc, Compression compression);
Crc getCrc() const { return Crc(_raw & 0xf); }
Compression getCompression() const { return Compression((_raw >> 4) & 0xf); }
diff --git a/searchlib/src/vespa/searchlib/transactionlog/session.h b/searchlib/src/vespa/searchlib/transactionlog/session.h
index bf35d83c000..9b8d23371e8 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/session.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/session.h
@@ -25,7 +25,7 @@ private:
public:
class Destination {
public:
- virtual ~Destination() {}
+ virtual ~Destination() = default;
virtual bool send(int32_t id, const vespalib::string & domain, const Packet & packet) = 0;
virtual bool sendDone(int32_t id, const vespalib::string & domain) = 0;
virtual bool connected() const = 0;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index caef792704a..b98453a7648 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -1,5 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "translogserver.h"
+#include <vespa/searchlib/common/gatecallback.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/vespalib/util/time.h>
@@ -78,25 +79,25 @@ SyncHandler::PerformTask()
TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir,
const FileHeaderContext &fileHeaderContext)
- : TransLogServer(name, listenPort, baseDir, fileHeaderContext, 0x10000000)
+ : TransLogServer(name, listenPort, baseDir, fileHeaderContext,
+ DomainConfig().setEncoding(Encoding(Encoding::xxh64, Encoding::Compression::none))
+ .setPartSizeLimit(0x10000000).setChunkSizeLimit(0x40000).setChunkAgeLimit( 100us))
{}
TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir,
- const FileHeaderContext &fileHeaderContext, uint64_t domainPartSize)
- : TransLogServer(name, listenPort, baseDir, fileHeaderContext, domainPartSize, 4, DomainPart::Crc::xxh64)
+ const FileHeaderContext &fileHeaderContext, const DomainConfig & cfg)
+ : TransLogServer(name, listenPort, baseDir, fileHeaderContext, cfg, 4)
{}
TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir,
- const FileHeaderContext &fileHeaderContext, uint64_t domainPartSize,
- size_t maxThreads, DomainPart::Crc defaultCrcType)
+ const FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads)
: FRT_Invokable(),
_name(name),
_baseDir(baseDir),
- _domainPartSize(domainPartSize),
- _defaultCrcType(defaultCrcType),
+ _domainConfig(cfg),
_commitExecutor(maxThreads, 128*1024),
_sessionExecutor(maxThreads, 128*1024),
- _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)),
+ _threadPool(std::make_unique<FastOS_ThreadPool>(1024*120)),
_transport(std::make_unique<FNET_Transport>()),
_supervisor(std::make_unique<FRT_Supervisor>(_transport.get())),
_domains(),
@@ -112,8 +113,8 @@ TransLogServer::TransLogServer(const vespalib::string &name, int listenPort, con
domainDir >> domainName;
if ( ! domainName.empty()) {
try {
- auto domain = std::make_shared<Domain>(domainName, dir(), _commitExecutor, _sessionExecutor,
- _domainPartSize, _defaultCrcType,_fileHeaderContext);
+ auto domain = make_shared<Domain>(domainName, dir(), *_threadPool, _commitExecutor,
+ _sessionExecutor, cfg, _fileHeaderContext);
_domains[domain->name()] = domain;
} catch (const std::exception & e) {
LOG(warning, "Failed creating %s domain on startup. Exception = %s", domainName.c_str(), e.what());
@@ -202,10 +203,21 @@ TransLogServer::run()
req->Return();
}
}
- } while (running() && !(hasPacket && (req == NULL)));
+ } while (running() && !(hasPacket && (req == nullptr)));
LOG(info, "TLS Stopped");
}
+
+TransLogServer &
+TransLogServer::setDomainConfig(const DomainConfig & cfg) {
+ Guard domainGuard(_lock);
+ _domainConfig = cfg;
+ for(auto &domain: _domains) {
+ domain.second->setConfig(cfg);
+ }
+ return *this;
+}
+
DomainStats
TransLogServer::getDomainStats() const
{
@@ -434,8 +446,8 @@ TransLogServer::createDomain(FRT_RPCRequest *req)
Domain::SP domain(findDomain(domainName));
if ( !domain ) {
try {
- domain = std::make_shared<Domain>(domainName, dir(), _commitExecutor, _sessionExecutor,
- _domainPartSize, _defaultCrcType, _fileHeaderContext);
+ domain = std::make_shared<Domain>(domainName, dir(), *_threadPool, _commitExecutor,
+ _sessionExecutor, _domainConfig, _fileHeaderContext);
Guard domainGuard(_lock);
_domains[domain->name()] = domain;
writeDomainDir(domainGuard, dir(), domainList(), _domains);
@@ -544,10 +556,9 @@ TransLogServer::domainStatus(FRT_RPCRequest *req)
void
TransLogServer::commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done)
{
- (void) done;
Domain::SP domain(findDomain(domainName));
if (domain) {
- domain->commit(packet);
+ domain->commit(packet, std::move(done));
} else {
throw IllegalArgumentException("Could not find domain " + domainName);
}
@@ -564,7 +575,9 @@ TransLogServer::domainCommit(FRT_RPCRequest *req)
if (domain) {
Packet packet(params[1]._data._buf, params[1]._data._len);
try {
- domain->commit(packet);
+ vespalib::Gate gate;
+ domain->commit(packet, make_shared<GateCallback>(gate));
+ gate.await();
ret.AddInt32(0);
ret.AddString("ok");
} catch (const std::exception & e) {
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
index 0d65f36e07d..3f945977386 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
@@ -26,16 +26,15 @@ public:
typedef std::shared_ptr<TransLogServer> SP;
TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir,
- const common::FileHeaderContext &fileHeaderContext,
- uint64_t domainPartSize, size_t maxThreads, DomainPart::Crc defaultCrc);
+ const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg, size_t maxThreads);
TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir,
- const common::FileHeaderContext &fileHeaderContext, uint64_t domainPartSize);
+ const common::FileHeaderContext &fileHeaderContext, const DomainConfig & cfg);
TransLogServer(const vespalib::string &name, int listenPort, const vespalib::string &baseDir,
const common::FileHeaderContext &fileHeaderContext);
~TransLogServer() override;
DomainStats getDomainStats() const;
-
void commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) override;
+ TransLogServer & setDomainConfig(const DomainConfig & cfg);
class Session
{
@@ -82,8 +81,7 @@ private:
vespalib::string _name;
vespalib::string _baseDir;
- const uint64_t _domainPartSize;
- const DomainPart::Crc _defaultCrcType;
+ DomainConfig _domainConfig;
vespalib::ThreadStackExecutor _commitExecutor;
vespalib::ThreadStackExecutor _sessionExecutor;
std::unique_ptr<FastOS_ThreadPool> _threadPool;
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp
index b8d21fb7465..d83623661ff 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserverapp.cpp
@@ -2,6 +2,7 @@
#include "translogserverapp.h"
#include <vespa/config/subscription/configuri.h>
+#include <vespa/vespalib/util/time.h>
#include <vespa/log/log.h>
LOG_SETUP(".translogserverapp");
@@ -24,16 +25,59 @@ TransLogServerApp::TransLogServerApp(const config::ConfigUri & tlsConfigUri,
namespace {
-DomainPart::Crc
+Encoding::Crc
getCrc(searchlib::TranslogserverConfig::Crcmethod crcType)
{
switch (crcType) {
case searchlib::TranslogserverConfig::Crcmethod::ccitt_crc32:
- return DomainPart::ccitt_crc32;
+ return Encoding::Crc::ccitt_crc32;
case searchlib::TranslogserverConfig::Crcmethod::xxh64:
- return DomainPart::xxh64;
+ return Encoding::Crc::xxh64;
}
- LOG_ABORT("should not be reached");
+ assert(false);
+}
+
+Encoding::Compression
+getCompression(searchlib::TranslogserverConfig::Compression::Type type)
+{
+ switch (type) {
+ case searchlib::TranslogserverConfig::Compression::Type::NONE:
+ return Encoding::Compression::none;
+ case searchlib::TranslogserverConfig::Compression::Type::NONE_MULTI:
+ return Encoding::Compression::none_multi;
+ case searchlib::TranslogserverConfig::Compression::Type::LZ4:
+ return Encoding::Compression::lz4;
+ case searchlib::TranslogserverConfig::Compression::Type::ZSTD:
+ return Encoding::Compression::zstd;
+ }
+ assert(false);
+}
+
+Encoding
+getEncoding(const searchlib::TranslogserverConfig & cfg)
+{
+ return Encoding(getCrc(cfg.crcmethod), getCompression(cfg.compression.type));
+}
+
+DomainConfig
+getDomainConfig(const searchlib::TranslogserverConfig & cfg) {
+ DomainConfig dcfg;
+ dcfg.setEncoding(getEncoding(cfg))
+ .setCompressionLevel(cfg.compression.level)
+ .setPartSizeLimit(cfg.filesizemax)
+ .setChunkSizeLimit(cfg.chunk.sizelimit)
+ .setChunkAgeLimit(vespalib::from_s(cfg.chunk.agelimit));
+ return dcfg;
+}
+
+void
+logReconfig(const searchlib::TranslogserverConfig & cfg, const DomainConfig & dcfg) {
+ LOG(config, "configure Transaction Log Server %s at port %d\n"
+ "DomainConfig {encoding={%d, %d}, compression_level=%d, part_limit=%ld, chunk_limit=%ld age=%1.4f}",
+ cfg.servername.c_str(), cfg.listenport,
+ dcfg.getEncoding().getCrc(), dcfg.getEncoding().getCompression(), dcfg.getCompressionlevel(),
+ dcfg.getPartSizeLimit(), dcfg.getChunkSizeLimit(), vespalib::to_s(dcfg.getChunkAgeLimit())
+ );
}
}
@@ -41,11 +85,12 @@ getCrc(searchlib::TranslogserverConfig::Crcmethod crcType)
void
TransLogServerApp::start()
{
- std::shared_ptr<searchlib::TranslogserverConfig> c = _tlsConfig.get();
- auto tls = std::make_shared<TransLogServer>(c->servername, c->listenport, c->basedir, _fileHeaderContext,
- c->filesizemax, c->maxthreads, getCrc(c->crcmethod));
std::lock_guard<std::mutex> guard(_lock);
- _tls = std::move(tls);
+ auto c = _tlsConfig.get();
+ DomainConfig domainConfig = getDomainConfig(*c);
+ logReconfig(*c, domainConfig);
+ _tls = std::make_shared<TransLogServer>(c->servername, c->listenport, c->basedir, _fileHeaderContext,
+ domainConfig, c->maxthreads);
}
TransLogServerApp::~TransLogServerApp()
@@ -56,9 +101,15 @@ TransLogServerApp::~TransLogServerApp()
void
TransLogServerApp::configure(std::unique_ptr<searchlib::TranslogserverConfig> cfg)
{
- LOG(config, "configure Transaction Log Server %s at port %d", cfg->servername.c_str(), cfg->listenport);
+
+ std::lock_guard<std::mutex> guard(_lock);
+ DomainConfig dcfg = getDomainConfig(*cfg);
+ logReconfig(*cfg, dcfg);
_tlsConfig.set(cfg.release());
_tlsConfig.latch();
+ if (_tls) {
+ _tls->setDomainConfig(dcfg);
+ }
}
TransLogServer::SP
diff --git a/vespajlib/src/main/java/com/yahoo/tensor/IndexedTensor.java b/vespajlib/src/main/java/com/yahoo/tensor/IndexedTensor.java
index ad82dd6c3ac..dc17c657db9 100644
--- a/vespajlib/src/main/java/com/yahoo/tensor/IndexedTensor.java
+++ b/vespajlib/src/main/java/com/yahoo/tensor/IndexedTensor.java
@@ -178,9 +178,7 @@ public abstract class IndexedTensor implements Tensor {
@Override
public abstract IndexedTensor withType(TensorType type);
- public DimensionSizes dimensionSizes() {
- return dimensionSizes;
- }
+ public DimensionSizes dimensionSizes() { return dimensionSizes; }
@Override
public Map<TensorAddress, Double> cells() {
diff --git a/vespajlib/src/main/java/com/yahoo/tensor/MixedTensor.java b/vespajlib/src/main/java/com/yahoo/tensor/MixedTensor.java
index 1ec4993bf57..f608aead347 100644
--- a/vespajlib/src/main/java/com/yahoo/tensor/MixedTensor.java
+++ b/vespajlib/src/main/java/com/yahoo/tensor/MixedTensor.java
@@ -152,7 +152,6 @@ public class MixedTensor implements Tensor {
return index.denseSubspaceSize();
}
-
/**
* Base class for building mixed tensors.
*/
diff --git a/vespajlib/src/main/java/com/yahoo/tensor/serialization/TypedBinaryFormat.java b/vespajlib/src/main/java/com/yahoo/tensor/serialization/TypedBinaryFormat.java
index bcff4392c9a..5c47572c779 100644
--- a/vespajlib/src/main/java/com/yahoo/tensor/serialization/TypedBinaryFormat.java
+++ b/vespajlib/src/main/java/com/yahoo/tensor/serialization/TypedBinaryFormat.java
@@ -51,31 +51,41 @@ public class TypedBinaryFormat {
}
private static BinaryFormat getFormatEncoder(GrowableByteBuffer buffer, Tensor tensor) {
- if (tensor instanceof MixedTensor && tensor.type().valueType() == TensorType.Value.DOUBLE) {
+ boolean hasMappedDimensions = tensor.type().dimensions().stream().anyMatch(d -> d.isMapped());
+ boolean hasIndexedDimensions = tensor.type().dimensions().stream().anyMatch(d -> d.isIndexed());
+ boolean isMixed = hasMappedDimensions && hasIndexedDimensions;
+
+ // TODO: Encoding as indexed if the implementation is mixed is not yet supported so use mixed format instead
+ if (tensor instanceof MixedTensor && ! isMixed && hasIndexedDimensions)
+ isMixed = true;
+
+ if (isMixed && tensor.type().valueType() == TensorType.Value.DOUBLE) {
encodeFormatType(buffer, MIXED_BINARY_FORMAT_TYPE);
return new MixedBinaryFormat();
}
- if (tensor instanceof MixedTensor) {
+ else if (isMixed) {
encodeFormatType(buffer, MIXED_BINARY_FORMAT_WITH_CELLTYPE);
encodeValueType(buffer, tensor.type().valueType());
return new MixedBinaryFormat(tensor.type().valueType());
}
- if (tensor instanceof IndexedTensor && tensor.type().valueType() == TensorType.Value.DOUBLE) {
+ else if (hasIndexedDimensions && tensor.type().valueType() == TensorType.Value.DOUBLE) {
encodeFormatType(buffer, DENSE_BINARY_FORMAT_TYPE);
return new DenseBinaryFormat();
}
- if (tensor instanceof IndexedTensor) {
+ else if (hasIndexedDimensions) {
encodeFormatType(buffer, DENSE_BINARY_FORMAT_WITH_CELLTYPE);
encodeValueType(buffer, tensor.type().valueType());
return new DenseBinaryFormat(tensor.type().valueType());
}
- if (tensor.type().valueType() == TensorType.Value.DOUBLE) {
+ else if (tensor.type().valueType() == TensorType.Value.DOUBLE) {
encodeFormatType(buffer, SPARSE_BINARY_FORMAT_TYPE);
return new SparseBinaryFormat();
}
- encodeFormatType(buffer, SPARSE_BINARY_FORMAT_WITH_CELLTYPE);
- encodeValueType(buffer, tensor.type().valueType());
- return new SparseBinaryFormat(tensor.type().valueType());
+ else {
+ encodeFormatType(buffer, SPARSE_BINARY_FORMAT_WITH_CELLTYPE);
+ encodeValueType(buffer, tensor.type().valueType());
+ return new SparseBinaryFormat(tensor.type().valueType());
+ }
}
private static BinaryFormat getFormatDecoder(GrowableByteBuffer buffer) {
diff --git a/vespajlib/src/test/java/com/yahoo/tensor/serialization/SerializationTestCase.java b/vespajlib/src/test/java/com/yahoo/tensor/serialization/SerializationTestCase.java
index f002637847b..066a63b6d90 100644
--- a/vespajlib/src/test/java/com/yahoo/tensor/serialization/SerializationTestCase.java
+++ b/vespajlib/src/test/java/com/yahoo/tensor/serialization/SerializationTestCase.java
@@ -71,7 +71,8 @@ public class SerializationTestCase {
serializedToABinaryRepresentation = true;
}
}
- assertTrue("Tensor did not serialize to one of the given representations", serializedToABinaryRepresentation);
+ assertTrue("Tensor serialized to one of the given representations",
+ serializedToABinaryRepresentation);
}
}
}
diff --git a/vespajlib/src/test/java/com/yahoo/tensor/serialization/SparseBinaryFormatTestCase.java b/vespajlib/src/test/java/com/yahoo/tensor/serialization/SparseBinaryFormatTestCase.java
index 9074579094c..50b71024ddf 100644
--- a/vespajlib/src/test/java/com/yahoo/tensor/serialization/SparseBinaryFormatTestCase.java
+++ b/vespajlib/src/test/java/com/yahoo/tensor/serialization/SparseBinaryFormatTestCase.java
@@ -2,7 +2,9 @@
package com.yahoo.tensor.serialization;
import com.yahoo.io.GrowableByteBuffer;
+import com.yahoo.tensor.MixedTensor;
import com.yahoo.tensor.Tensor;
+import com.yahoo.tensor.TensorAddress;
import com.yahoo.tensor.TensorType;
import org.junit.Test;
@@ -31,6 +33,17 @@ public class SparseBinaryFormatTestCase {
}
@Test
+ public void testSerializationFormatIsDecidedByTensorTypeNotImplementationType() {
+ Tensor sparse = Tensor.Builder.of(TensorType.fromSpec("tensor(x{})"))
+ .cell(TensorAddress.ofLabels("key1"), 9.1).build();
+ Tensor sparseAsMixed = MixedTensor.Builder.of(TensorType.fromSpec("tensor(x{})"))
+ .cell(TensorAddress.ofLabels("key1"), 9.1).build();
+ byte[] sparseEncoded = TypedBinaryFormat.encode(sparse);
+ byte[] sparseAsMixedEncoded = TypedBinaryFormat.encode(sparseAsMixed);
+ assertEquals(Arrays.toString(sparseEncoded), Arrays.toString(sparseAsMixedEncoded));
+ }
+
+ @Test
public void testSerializationToSeparateType() {
try {
assertSerialization(Tensor.from("tensor(x{},y{}):{{x:0,y:0}:2.0}"), TensorType.fromSpec("tensor(x{})"));
@@ -55,7 +68,8 @@ public class SparseBinaryFormatTestCase {
@Test
public void requireThatFloatSerializationFormatDoNotChange() {
- byte[] encodedTensor = new byte[] {5, // binary format type
+ byte[] encodedTensor = new byte[] {
+ 5, // binary format type
1, // float type
2, // num dimensions
2, (byte)'x', (byte)'y', 1, (byte)'z', // dimensions
@@ -63,7 +77,7 @@ public class SparseBinaryFormatTestCase {
2, (byte)'a', (byte)'b', 1, (byte)'e', 64, 0, 0, 0, // cell 0
2, (byte)'c', (byte)'d', 1, (byte)'e', 64, 64, 0, 0}; // cell 1
assertEquals(Arrays.toString(encodedTensor),
- Arrays.toString(TypedBinaryFormat.encode(Tensor.from("tensor<float>(xy{},z{}):{{xy:ab,z:e}:2.0,{xy:cd,z:e}:3.0}"))));
+ Arrays.toString(TypedBinaryFormat.encode(Tensor.from("tensor<float>(xy{},z{}):{{xy:ab,z:e}:2.0,{xy:cd,z:e}:3.0}"))));
}
@Test