diff options
129 files changed, 1783 insertions, 1504 deletions
diff --git a/config-application-package/src/main/java/com/yahoo/config/model/application/provider/FileReferenceCreator.java b/config-application-package/src/main/java/com/yahoo/config/model/application/provider/FileReferenceCreator.java deleted file mode 100644 index 7e1d247281c..00000000000 --- a/config-application-package/src/main/java/com/yahoo/config/model/application/provider/FileReferenceCreator.java +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.config.model.application.provider; - -import com.yahoo.config.FileReference; - -import java.lang.reflect.Constructor; - -/** - * Convenience for creating a {@link com.yahoo.config.FileReference}. - * - * @author gjoranv - */ -public class FileReferenceCreator { - - public static FileReference create(String stringVal) { - try { - Constructor<FileReference> ctor = FileReference.class.getDeclaredConstructor(String.class); - ctor.setAccessible(true); - return ctor.newInstance(stringVal); - } catch (Exception e) { - throw new RuntimeException("Could not create a new " + FileReference.class.getName() + - ". This should never happen!", e); - } - } - -} diff --git a/config-application-package/src/main/java/com/yahoo/config/model/application/provider/MockFileRegistry.java b/config-application-package/src/main/java/com/yahoo/config/model/application/provider/MockFileRegistry.java index ca0b37d8cc3..d635fe90ded 100644 --- a/config-application-package/src/main/java/com/yahoo/config/model/application/provider/MockFileRegistry.java +++ b/config-application-package/src/main/java/com/yahoo/config/model/application/provider/MockFileRegistry.java @@ -17,7 +17,7 @@ import java.util.Set; public class MockFileRegistry implements FileRegistry { public FileReference addFile(String relativePath) { - return FileReferenceCreator.create("0123456789abcdef"); + return new FileReference("0123456789abcdef"); } @Override @@ -25,8 +25,8 @@ public class MockFileRegistry implements FileRegistry { return "localhost.fortestingpurposesonly"; } - public static final Entry entry1 = new Entry("component/path1", FileReferenceCreator.create("1234")); - public static final Entry entry2 = new Entry("component/path2", FileReferenceCreator.create("56789")); + public static final Entry entry1 = new Entry("component/path1", new FileReference("1234")); + public static final Entry entry2 = new Entry("component/path2", new FileReference("56789")); public List<Entry> export() { List<Entry> result = new ArrayList<>(); diff --git a/config-application-package/src/main/java/com/yahoo/config/model/application/provider/PreGeneratedFileRegistry.java b/config-application-package/src/main/java/com/yahoo/config/model/application/provider/PreGeneratedFileRegistry.java index 29e83e00305..0b0b799f47f 100644 --- a/config-application-package/src/main/java/com/yahoo/config/model/application/provider/PreGeneratedFileRegistry.java +++ b/config-application-package/src/main/java/com/yahoo/config/model/application/provider/PreGeneratedFileRegistry.java @@ -70,7 +70,7 @@ public class PreGeneratedFileRegistry implements FileRegistry { } public FileReference addFile(String relativePath) { - return FileReferenceCreator.create(path2Hash.get(relativePath)); + return new FileReference(path2Hash.get(relativePath)); } @Override @@ -86,7 +86,7 @@ public class PreGeneratedFileRegistry implements FileRegistry { public List<Entry> export() { List<Entry> entries = new ArrayList<>(); for (Map.Entry<String, String> entry : path2Hash.entrySet()) { - entries.add(new Entry(entry.getKey(), FileReferenceCreator.create(entry.getValue()))); + entries.add(new Entry(entry.getKey(), new FileReference(entry.getValue()))); } return entries; } diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/FileDistributionOptions.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/FileDistributionOptions.java index b58406e1935..0160978773d 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/admin/FileDistributionOptions.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/FileDistributionOptions.java @@ -7,28 +7,39 @@ import com.yahoo.cloud.config.filedistribution.FiledistributorConfig; /** * Options for controlling the behavior of the file distribution services. + * * @author tonytv */ public class FileDistributionOptions implements FiledistributorConfig.Producer { + public static FileDistributionOptions defaultOptions() { return new FileDistributionOptions(); } - private FileDistributionOptions() {} + private FileDistributionOptions() { + } + + private BinaryScaledAmount uploadBitRate = new BinaryScaledAmount(); + private BinaryScaledAmount downloadBitRate = new BinaryScaledAmount(); + private boolean disabled = false; - private BinaryScaledAmount uploadbitrate = new BinaryScaledAmount(); - private BinaryScaledAmount downloadbitrate = new BinaryScaledAmount(); - //Called through reflection - public void downloadbitrate(BinaryScaledAmount amount) { + public void downloadBitRate(BinaryScaledAmount amount) { ensureNonNegative(amount); - downloadbitrate = amount; + downloadBitRate = amount; } - //Called through reflection - public void uploadbitrate(BinaryScaledAmount amount) { + public void uploadBitRate(BinaryScaledAmount amount) { ensureNonNegative(amount); - uploadbitrate = amount; + uploadBitRate = amount; + } + + public void disabled(boolean value) { + disabled = value; + } + + public boolean disabled() { + return disabled; } private void ensureNonNegative(BinaryScaledAmount amount) { @@ -38,12 +49,12 @@ public class FileDistributionOptions implements FiledistributorConfig.Producer { private int byteRate(BinaryScaledAmount bitRate) { BinaryScaledAmount byteRate = bitRate.divide(8); - return (int)byteRate.as(BinaryPrefix.unit); + return (int) byteRate.as(BinaryPrefix.unit); } @Override public void getConfig(FiledistributorConfig.Builder builder) { - builder.maxuploadspeed((double)byteRate(uploadbitrate)); - builder.maxdownloadspeed((double)byteRate(downloadbitrate)); + builder.maxuploadspeed((double) byteRate(uploadBitRate)); + builder.maxdownloadspeed((double) byteRate(downloadBitRate)); } } diff --git a/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomFileDistributionOptionsBuilder.java b/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomFileDistributionOptionsBuilder.java index 55c3f569900..8a5d6846a64 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomFileDistributionOptionsBuilder.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/DomFileDistributionOptionsBuilder.java @@ -6,47 +6,42 @@ import com.yahoo.text.XML; import com.yahoo.vespa.model.admin.FileDistributionOptions; import org.w3c.dom.Element; +import java.util.Optional; + /** - * Builds a file distribution options. - * @author tonytv + * Builds file distribution options. + * + * @author Tony Vaagenes + * @author hmusum */ public class DomFileDistributionOptionsBuilder { + private static void throwExceptionForElementInFileDistribution(String subElement, String reason) { throw new RuntimeException("In element '" + subElement + "' contained in 'filedistribution': " + reason); } - private static void callSetter(FileDistributionOptions options, String name, BinaryScaledAmount amount) { - try { - options.getClass().getMethod(name, BinaryScaledAmountParser.class).invoke(options, amount); - } catch (IllegalArgumentException e) { - throwExceptionForElementInFileDistribution(name, e.getMessage()); - } - catch (Exception e) { - if (e instanceof RuntimeException) - throw (RuntimeException)e; - else - throw new RuntimeException(e); - } - } - - private static void setIfPresent(FileDistributionOptions options, String name, Element fileDistributionElement) { + private Optional<BinaryScaledAmount> getAmount(String name, Element fileDistributionElement) { + Element optionElement = XML.getChild(fileDistributionElement, name); try { - Element optionElement = XML.getChild(fileDistributionElement, name); if (optionElement != null) { String valueString = XML.getValue(optionElement); - BinaryScaledAmount amount = BinaryScaledAmountParser.parse(valueString); - callSetter(options, name, amount); + return Optional.of(BinaryScaledAmountParser.parse(valueString)); } } catch (NumberFormatException e) { throwExceptionForElementInFileDistribution(name, "Expected a valid number. (Message = " + e.getMessage() + ")."); } + return Optional.empty(); } public FileDistributionOptions build(Element fileDistributionElement) { FileDistributionOptions options = FileDistributionOptions.defaultOptions(); if (fileDistributionElement != null) { - setIfPresent(options, "uploadbitrate", fileDistributionElement); - setIfPresent(options, "downloadbitrate", fileDistributionElement); + getAmount("uploadbitrate", fileDistributionElement).ifPresent(options::uploadBitRate); + getAmount("downloadbitrate", fileDistributionElement).ifPresent(options::downloadBitRate); + Element disable = XML.getChild(fileDistributionElement, "disabled"); + if (disable != null) { + options.disabled(Boolean.valueOf(XML.getValue(disable))); + } } return options; } diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/Identity.java b/config-model/src/main/java/com/yahoo/vespa/model/container/Identity.java index 81762f95a90..d3038a32bfe 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/container/Identity.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/container/Identity.java @@ -5,6 +5,9 @@ import com.yahoo.container.core.identity.IdentityConfig; import com.yahoo.container.jdisc.athenz.impl.AthenzIdentityProviderImpl; import com.yahoo.vespa.model.container.component.SimpleComponent; +import java.net.URI; +import java.util.Optional; + /** * @author mortent */ @@ -13,9 +16,9 @@ public class Identity extends SimpleComponent implements IdentityConfig.Producer private final String domain; private final String service; - private final String loadBalancerAddress; + private final URI loadBalancerAddress; - public Identity(String domain, String service, String loadBalancerAddress) { + public Identity(String domain, String service, URI loadBalancerAddress) { super(CLASS); this.domain = domain; this.service = service; @@ -26,6 +29,13 @@ public class Identity extends SimpleComponent implements IdentityConfig.Producer public void getConfig(IdentityConfig.Builder builder) { builder.domain(domain); builder.service(service); - builder.loadBalancerAddress(loadBalancerAddress); + // Load balancer address might not have been set + // Current interpretation of loadbalancer address is: hostname. + // Config should be renamed or send the uri + builder.loadBalancerAddress( + Optional.ofNullable(loadBalancerAddress) + .map(URI::getHost) + .orElse("") + ); } } diff --git a/config-model/src/main/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilder.java b/config-model/src/main/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilder.java index 32f2a59a881..f8d69d1b0ac 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilder.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/container/xml/ContainerModelBuilder.java @@ -7,7 +7,6 @@ import com.yahoo.config.application.Xml; import com.yahoo.config.application.api.ApplicationPackage; import com.yahoo.config.application.api.DeployLogger; import com.yahoo.config.model.ConfigModelContext; -import com.yahoo.config.model.api.ConfigServerSpec; import com.yahoo.config.model.application.provider.IncludeDirs; import com.yahoo.config.model.builder.xml.ConfigModelBuilder; import com.yahoo.config.model.builder.xml.ConfigModelId; @@ -18,7 +17,6 @@ import com.yahoo.config.provision.ClusterSpec; import com.yahoo.config.provision.Environment; import com.yahoo.config.provision.NodeType; import com.yahoo.container.jdisc.config.MetricDefaultsConfig; -import com.yahoo.log.LogLevel; import com.yahoo.search.rendering.RendererRegistry; import com.yahoo.text.XML; import com.yahoo.vespa.defaults.Defaults; @@ -168,8 +166,7 @@ public class ContainerModelBuilder extends ConfigModelBuilder<ContainerModel> { // Athenz copper argos // NOTE: Must be done after addNodes() - addIdentity(spec, cluster, context.getDeployState().getProperties().configServerSpecs(), - context.getDeployState().getProperties().loadBalancerAddress()); + addIdentity(spec, cluster, context.getDeployState().getProperties().loadBalancerAddress()); //TODO: overview handler, see DomQrserverClusterBuilder } @@ -697,23 +694,13 @@ public class ContainerModelBuilder extends ConfigModelBuilder<ContainerModel> { } } - private void addIdentity(Element element, ContainerCluster cluster, List<ConfigServerSpec> configServerSpecs, URI loadBalancerAddress) { + private void addIdentity(Element element, ContainerCluster cluster, URI loadBalancerAddress) { Element identityElement = XML.getChild(element, "identity"); if(identityElement != null) { String domain = XML.getValue(XML.getChild(identityElement, "domain")); String service = XML.getValue(XML.getChild(identityElement, "service")); - // TODO: Remove after verifying that this is propagated correctly - logger.log(LogLevel.INFO, String.format("loadBalancerAddress: %s", loadBalancerAddress)); - - // TODO: Inject the load balancer address. For now only add first configserver - // TODO: The loadBalancerAddress is a URI, not specific host. - // TODO: Either rename loadBalancerAddress -> loadBalancerName (or similar) or - // TODO: make consumers of it use URI. - String cfgHostName = configServerSpecs.stream().findFirst().map(ConfigServerSpec::getHostName) - .orElse(""); // How to test this? - - Identity identity = new Identity(domain.trim(), service.trim(), cfgHostName); + Identity identity = new Identity(domain.trim(), service.trim(), loadBalancerAddress); cluster.addComponent(identity); cluster.getContainers().forEach(container -> { diff --git a/config-model/src/main/java/com/yahoo/vespa/model/filedistribution/FileDistributor.java b/config-model/src/main/java/com/yahoo/vespa/model/filedistribution/FileDistributor.java index 8860f5c2249..f6cc9203d00 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/filedistribution/FileDistributor.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/filedistribution/FileDistributor.java @@ -110,11 +110,5 @@ public class FileDistributor { public void reloadDeployFileDistributor(FileDistribution dbHandler) { dbHandler.reloadDeployFileDistributor(); } - - private Set<String> union(Set<String> hosts, String... additionalHosts) { - Set<String> result = new HashSet<>(hosts); - result.addAll(asList(additionalHosts)); - return result; - } } diff --git a/config-model/src/main/java/com/yahoo/vespa/model/filedistribution/FileDistributorService.java b/config-model/src/main/java/com/yahoo/vespa/model/filedistribution/FileDistributorService.java index 6c2da51f3e3..dd9e057e2fa 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/filedistribution/FileDistributorService.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/filedistribution/FileDistributorService.java @@ -54,8 +54,10 @@ public class FileDistributorService extends AbstractService implements @Override public String getStartupCommand() { - return "exec $ROOT/sbin/vespa-filedistributor" - + " --configid " + getConfigId(); + // If disabled config proxy should act as file distributor, so don't start this service + return (fileDistributionOptions.disabled()) + ? null + : "exec $ROOT/sbin/vespa-filedistributor" + " --configid " + getConfigId(); } @Override @@ -88,7 +90,9 @@ public class FileDistributorService extends AbstractService implements @Override public void getConfig(FiledistributorrpcConfig.Builder builder) { - builder.connectionspec("tcp/" + getHostName() + ":" + getRelativePort(0)); + // If disabled config proxy should act as file distributor, so use config proxy port + int port = (fileDistributionOptions.disabled()) ? 19090 : getRelativePort(0); + builder.connectionspec("tcp/" + getHostName() + ":" + port); } @Override diff --git a/config-model/src/main/resources/schema/admin.rnc b/config-model/src/main/resources/schema/admin.rnc index cdf26807bcb..a88109663eb 100644 --- a/config-model/src/main/resources/schema/admin.rnc +++ b/config-model/src/main/resources/schema/admin.rnc @@ -83,7 +83,8 @@ LogServer = element logserver { FileDistribution = element filedistribution { element uploadbitrate { xsd:string { pattern = "\d+(\.\d*)?\s*[kmgKMG]?" } }? & - element downloadbitrate { xsd:string { pattern = "\d+(\.\d*)?\s*[kmgKMG]?" } }? + element downloadbitrate { xsd:string { pattern = "\d+(\.\d*)?\s*[kmgKMG]?" } }? & + element disabled { xsd:boolean }? # Nov. 2017: Temporary, should not be documented } Metrics = element metrics { diff --git a/config-model/src/test/java/com/yahoo/vespa/model/admin/AdminTestCase.java b/config-model/src/test/java/com/yahoo/vespa/model/admin/AdminTestCase.java index b5375ccbce4..946624a1cdb 100644 --- a/config-model/src/test/java/com/yahoo/vespa/model/admin/AdminTestCase.java +++ b/config-model/src/test/java/com/yahoo/vespa/model/admin/AdminTestCase.java @@ -275,4 +275,36 @@ public class AdminTestCase { // 1 logforwarder on each host assertTrue(configIds.toString(), configIds.contains("admin/logforwarder.0")); } + + @Test + public void disableFiledistributorService() throws Exception { + String hosts = "<hosts>" + + " <host name=\"localhost\">" + + " <alias>node0</alias>" + + " </host>" + + "</hosts>"; + + String services = "<services>" + + " <admin version='2.0'>" + + " <adminserver hostalias='node0' />" + + " <filedistribution>" + + " <disabled>true</disabled>" + + " </filedistribution>" + + " </admin>" + + "</services>"; + + VespaModel vespaModel = new VespaModelCreatorWithMockPkg(hosts, services).create(); + String localhost = HostName.getLocalhost(); + String localhostConfigId = "hosts/" + localhost; + + // Verify services in the sentinel config + SentinelConfig.Builder b = new SentinelConfig.Builder(); + vespaModel.getConfig(b, localhostConfigId); + SentinelConfig sentinelConfig = new SentinelConfig(b); + assertThat(sentinelConfig.service().size(), is(3)); + assertThat(sentinelConfig.service(0).name(), is("logserver")); + assertThat(sentinelConfig.service(1).name(), is("slobrok")); + assertThat(sentinelConfig.service(2).name(), is("logd")); + // No filedistributor service + } } diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java index ae0360fecf2..fe5976e9c1f 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServer.java @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.proxy; -import com.yahoo.config.FileReference; import com.yahoo.jrt.*; import com.yahoo.log.LogLevel; import com.yahoo.vespa.config.*; @@ -10,16 +9,10 @@ import com.yahoo.vespa.config.protocol.JRTConfigRequestFactory; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; import com.yahoo.vespa.config.protocol.JRTServerConfigRequestV3; -import java.io.File; import java.util.Arrays; import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.logging.Logger; -import java.util.stream.Collectors; -import java.util.stream.Stream; /** @@ -34,14 +27,14 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer private static final int TRACELEVEL = 6; private final Spec spec; - private final Supervisor supervisor = new Supervisor(new Transport()); + private final Supervisor supervisor; private final ProxyServer proxyServer; - ConfigProxyRpcServer(ProxyServer proxyServer, Spec spec) { + ConfigProxyRpcServer(ProxyServer proxyServer, Supervisor supervisor, Spec spec) { this.proxyServer = proxyServer; this.spec = spec; + this.supervisor = supervisor; declareConfigMethods(); - declareFileDistributionMethods(); } public void run() { @@ -109,40 +102,6 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer .returnDesc(0, "ret", "Empty string or error message")); } - private void declareFileDistributionMethods() { - // Legacy method, needs to be the same name as used in filedistributor - supervisor.addMethod(new Method("waitFor", "s", "s", - this, "getFile") - .methodDesc("get path to file reference") - .paramDesc(0, "file reference", "file reference") - .returnDesc(0, "path", "path to file")); - supervisor.addMethod(new Method("filedistribution.getFile", "s", "s", - this, "getFile") - .methodDesc("get path to file reference") - .paramDesc(0, "file reference", "file reference") - .returnDesc(0, "path", "path to file")); - supervisor.addMethod(new Method("filedistribution.getActiveFileReferencesStatus", "", "SD", - this, "getActiveFileReferencesStatus") - .methodDesc("download status for file references") - .returnDesc(0, "file references", "array of file references") - .returnDesc(1, "download status", "percentage downloaded of each file reference in above array")); - supervisor.addMethod(new Method("filedistribution.setFileReferencesToDownload", "S", "i", - this, "setFileReferencesToDownload") - .methodDesc("set which file references to download") - .paramDesc(0, "file references", "file reference to download") - .returnDesc(0, "ret", "0 if success, 1 otherwise")); - supervisor.addMethod(new Method("filedistribution.receiveFile", "ssxlis", "i", // TODO Temporary method to get started with testing - this, "receiveFile") - .methodDesc("receive file reference content") - .paramDesc(0, "file references", "file reference to download") - .paramDesc(1, "filename", "filename") - .paramDesc(2, "content", "array of bytes") - .paramDesc(3, "hash", "xx64hash of the file content") - .paramDesc(4, "errorcode", "Error code. 0 if none") - .paramDesc(5, "error-description", "Error description.") - .returnDesc(0, "ret", "0 if success, 1 otherwise")); - } - //---------------- RPC methods ------------------------------------ /** @@ -249,75 +208,6 @@ public class ConfigProxyRpcServer implements Runnable, TargetWatcher, RpcServer req.returnValues().add(new StringValue(memoryCache.dumpCacheToDisk(req.parameters().get(0).asString(), memoryCache))); } - // TODO: Duplicate of code in FileAcquirereImpl. Find out where to put it. What about C++ code using this RPC call? - private static final int baseErrorCode = 0x10000; - private static final int baseFileProviderErrorCode = baseErrorCode + 0x1000; - - private static final int fileReferenceDoesNotExists = baseFileProviderErrorCode; - private static final int fileReferenceRemoved = fileReferenceDoesNotExists + 1; - private static final int fileReferenceInternalError = fileReferenceRemoved + 1; - - @SuppressWarnings({"UnusedDeclaration"}) - public final void getFile(Request req) { - req.detach(); - FileReference fileReference = new FileReference(req.parameters().get(0).asString()); - log.log(LogLevel.DEBUG, "getFile() called for file reference '" + fileReference.value() + "'"); - Optional<File> pathToFile = proxyServer.fileDownloader().getFile(fileReference); - try { - if (pathToFile.isPresent()) { - req.returnValues().add(new StringValue(pathToFile.get().getAbsolutePath())); - log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' available at " + pathToFile.get()); - } else { - log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found, returning error"); - req.setError(fileReferenceDoesNotExists, "File reference '" + fileReference.value() + "' not found"); - } - } catch (Throwable e) { - log.log(LogLevel.WARNING, "File reference '" + fileReference.value() + "' got exeption: " + e.getMessage()); - req.setError(fileReferenceInternalError, "File reference '" + fileReference.value() + "' removed"); - } - req.returnRequest(); - } - - @SuppressWarnings({"UnusedDeclaration"}) - public final void getActiveFileReferencesStatus(Request req) { - Map<FileReference, Double> downloadStatus = proxyServer.fileDownloader().downloadStatus(); - - String[] fileRefArray = new String[downloadStatus.keySet().size()]; - fileRefArray = downloadStatus.keySet().stream() - .map(FileReference::value) - .collect(Collectors.toList()) - .toArray(fileRefArray); - - double[] downloadStatusArray = new double[downloadStatus.values().size()]; - int i = 0; - for (Double d : downloadStatus.values()) { - downloadStatusArray[i++] = d; - } - - req.returnValues().add(new StringArray(fileRefArray)); - req.returnValues().add(new DoubleArray(downloadStatusArray)); - } - - @SuppressWarnings({"UnusedDeclaration"}) - public final void setFileReferencesToDownload(Request req) { - String[] fileReferenceStrings = req.parameters().get(0).asStringArray(); - List<FileReference> fileReferences = Stream.of(fileReferenceStrings) - .map(FileReference::new) - .collect(Collectors.toList()); - proxyServer.fileDownloader().queueForDownload(fileReferences); - - req.returnValues().add(new Int32Value(0)); - } - - @SuppressWarnings({"UnusedDeclaration"}) - public final void receiveFile(Request req) { - FileReference fileReference = new FileReference(req.parameters().get(0).asString()); - String filename = req.parameters().get(1).asString(); - byte[] content = req.parameters().get(2).asData(); - proxyServer.fileDownloader().receiveFile(fileReference, filename, content); - req.returnValues().add(new Int32Value(0)); - } - //---------------------------------------------------- private boolean isProtocolVersionSupported(JRTServerConfigRequest request) { diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java index 5668852311f..173d2b8a43a 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/ProxyServer.java @@ -5,12 +5,15 @@ import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.config.subscription.ConfigSourceSet; import com.yahoo.jrt.Spec; +import com.yahoo.jrt.Supervisor; +import com.yahoo.jrt.Transport; import com.yahoo.log.LogLevel; import com.yahoo.log.LogSetup; import com.yahoo.log.event.Event; import com.yahoo.system.CatchSigTerm; import com.yahoo.vespa.config.*; import com.yahoo.vespa.config.protocol.JRTServerConfigRequest; +import com.yahoo.vespa.config.proxy.filedistribution.FileDistributionRpcServer; import com.yahoo.vespa.config.proxy.filedistribution.FileDownloader; import java.util.List; @@ -40,6 +43,7 @@ public class ProxyServer implements Runnable { // Scheduled executor that periodically checks for requests that have timed out and response should be returned to clients private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new DaemonThreadFactory()); + private final Supervisor supervisor = new Supervisor(new Transport()); private final ClientUpdater clientUpdater; private ScheduledFuture<?> delayedResponseScheduler; @@ -84,6 +88,7 @@ public class ProxyServer implements Runnable { clientUpdater = new ClientUpdater(rpcServer, statistics, delayedResponses); this.configClient = createClient(clientUpdater, delayedResponses, source, timingValues, memoryCache, configClient); this.fileDownloader = new FileDownloader(new JRTConnectionPool(source)); + new FileDistributionRpcServer(supervisor, fileDownloader); } static ProxyServer createTestServer(ConfigSourceSet source) { @@ -162,7 +167,7 @@ public class ProxyServer implements Runnable { } private ConfigProxyRpcServer createRpcServer(Spec spec) { - return (spec == null) ? null : new ConfigProxyRpcServer(this, spec); // TODO: Try to avoid first argument being 'this' + return (spec == null) ? null : new ConfigProxyRpcServer(this, supervisor, spec); // TODO: Try to avoid first argument being 'this' } private RpcConfigSourceClient createRpcClient() { diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java new file mode 100644 index 00000000000..46b1ffc721e --- /dev/null +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileDistributionRpcServer.java @@ -0,0 +1,155 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.proxy.filedistribution; + +import com.yahoo.config.FileReference; +import com.yahoo.jrt.DoubleArray; +import com.yahoo.jrt.Int32Value; +import com.yahoo.jrt.Method; +import com.yahoo.jrt.Request; +import com.yahoo.jrt.StringArray; +import com.yahoo.jrt.StringValue; +import com.yahoo.jrt.Supervisor; +import com.yahoo.log.LogLevel; + +import java.io.File; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * An RPC server that handles file distribution requests. + * + * @author hmusum + */ +public class FileDistributionRpcServer { + + private final static Logger log = Logger.getLogger(FileDistributionRpcServer.class.getName()); + + private final Supervisor supervisor; + private final FileDownloader downloader; + + public FileDistributionRpcServer(Supervisor supervisor, FileDownloader downloader) { + this.supervisor = supervisor; + this.downloader = downloader; + declareFileDistributionMethods(); + } + + private void declareFileDistributionMethods() { + // Legacy method, needs to be the same name as used in filedistributor + supervisor.addMethod(new Method("waitFor", "s", "s", + this, "getFile") + .methodDesc("get path to file reference") + .paramDesc(0, "file reference", "file reference") + .returnDesc(0, "path", "path to file")); + supervisor.addMethod(new Method("filedistribution.getFile", "s", "s", + this, "getFile") + .methodDesc("get path to file reference") + .paramDesc(0, "file reference", "file reference") + .returnDesc(0, "path", "path to file")); + supervisor.addMethod(new Method("filedistribution.getActiveFileReferencesStatus", "", "SD", + this, "getActiveFileReferencesStatus") + .methodDesc("download status for file references") + .returnDesc(0, "file references", "array of file references") + .returnDesc(1, "download status", "percentage downloaded of each file reference in above array")); + supervisor.addMethod(new Method("filedistribution.setFileReferencesToDownload", "S", "i", + this, "setFileReferencesToDownload") + .methodDesc("set which file references to download") + .paramDesc(0, "file references", "file reference to download") + .returnDesc(0, "ret", "0 if success, 1 otherwise")); + supervisor.addMethod(new Method("filedistribution.receiveFile", "ssxlis", "i", // TODO Temporary method to get started with testing + this, "receiveFile") + .methodDesc("receive file reference content") + .paramDesc(0, "file references", "file reference to download") + .paramDesc(1, "filename", "filename") + .paramDesc(2, "content", "array of bytes") + .paramDesc(3, "hash", "xx64hash of the file content") + .paramDesc(4, "errorcode", "Error code. 0 if none") + .paramDesc(5, "error-description", "Error description.") + .returnDesc(0, "ret", "0 if success, 1 otherwise")); + } + + //---------------- RPC methods ------------------------------------ + // TODO: Duplicate of code in FileAcquirereImpl. Find out where to put it. What about C++ code using this RPC call? + private static final int baseErrorCode = 0x10000; + private static final int baseFileProviderErrorCode = baseErrorCode + 0x1000; + + private static final int fileReferenceDoesNotExists = baseFileProviderErrorCode; + private static final int fileReferenceRemoved = fileReferenceDoesNotExists + 1; + private static final int fileReferenceInternalError = fileReferenceRemoved + 1; + + @SuppressWarnings({"UnusedDeclaration"}) + public final void getFile(Request req) { + req.detach(); + FileReference fileReference = new FileReference(req.parameters().get(0).asString()); + log.log(LogLevel.DEBUG, "getFile() called for file reference '" + fileReference.value() + "'"); + Optional<File> pathToFile = downloader.getFile(fileReference); + try { + if (pathToFile.isPresent()) { + req.returnValues().add(new StringValue(pathToFile.get().getAbsolutePath())); + log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' available at " + pathToFile.get()); + } else { + log.log(LogLevel.INFO, "File reference '" + fileReference.value() + "' not found, returning error"); + req.setError(fileReferenceDoesNotExists, "File reference '" + fileReference.value() + "' not found"); + } + } catch (Throwable e) { + log.log(LogLevel.WARNING, "File reference '" + fileReference.value() + "' got exeption: " + e.getMessage()); + req.setError(fileReferenceInternalError, "File reference '" + fileReference.value() + "' removed"); + } + req.returnRequest(); + } + + @SuppressWarnings({"UnusedDeclaration"}) + public final void getActiveFileReferencesStatus(Request req) { + Map<FileReference, Double> downloadStatus = downloader.downloadStatus(); + + String[] fileRefArray = new String[downloadStatus.keySet().size()]; + fileRefArray = downloadStatus.keySet().stream() + .map(FileReference::value) + .collect(Collectors.toList()) + .toArray(fileRefArray); + + double[] downloadStatusArray = new double[downloadStatus.values().size()]; + int i = 0; + for (Double d : downloadStatus.values()) { + downloadStatusArray[i++] = d; + } + + req.returnValues().add(new StringArray(fileRefArray)); + req.returnValues().add(new DoubleArray(downloadStatusArray)); + } + + @SuppressWarnings({"UnusedDeclaration"}) + public final void setFileReferencesToDownload(Request req) { + String[] fileReferenceStrings = req.parameters().get(0).asStringArray(); + List<FileReference> fileReferences = Stream.of(fileReferenceStrings) + .map(FileReference::new) + .collect(Collectors.toList()); + downloader.queueForDownload(fileReferences); + + req.returnValues().add(new Int32Value(0)); + } + + @SuppressWarnings({"UnusedDeclaration"}) + public final void receiveFile(Request req) { + FileReference fileReference = new FileReference(req.parameters().get(0).asString()); + String filename = req.parameters().get(1).asString(); + byte[] content = req.parameters().get(2).asData(); + long xxhash = req.parameters().get(3).asInt64(); + int errorCode = req.parameters().get(4).asInt32(); + String errorDescription = req.parameters().get(5).asString(); + + if (errorCode == 0) { + // TODO: Remove when system test works + log.log(LogLevel.INFO, "Receiving file reference '" + fileReference.value() + "'"); + downloader.receiveFile(fileReference, filename, content); + req.returnValues().add(new Int32Value(0)); + } else { + log.log(LogLevel.WARNING, "Receiving file reference '" + fileReference.value() + "' failed: " + errorDescription); + req.returnValues().add(new Int32Value(1)); + // TODO: Add error description return value here too? + } + } +} diff --git a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java index 917374740f1..4b32ffab2b7 100644 --- a/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java +++ b/config-proxy/src/main/java/com/yahoo/vespa/config/proxy/filedistribution/FileReferenceDownloader.java @@ -118,7 +118,8 @@ class FileReferenceDownloader { } private synchronized void completedDownloading(FileReference fileReference, File file) { - downloads.get(fileReference).future().set(Optional.of(file)); + if (downloads.containsKey(fileReference)) + downloads.get(fileReference).future().set(Optional.of(file)); downloadStatus.put(fileReference, 100.0); } @@ -162,7 +163,7 @@ class FileReferenceDownloader { return false; } else if (request.returnValues().size() == 0) { return false; - } else if (!request.checkReturnTypes("i")) { + } else if (!request.checkReturnTypes("is")) { // TODO: Do not hard-code return type log.log(LogLevel.WARNING, "Invalid return types for response: " + request.errorMessage()); return false; } @@ -180,4 +181,5 @@ class FileReferenceDownloader { Map<FileReference, Double> downloadStatus() { return ImmutableMap.copyOf(downloadStatus); } + } diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java index f9b334a6f87..4a9d2acb4c5 100644 --- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/ConfigProxyRpcServerTest.java @@ -5,6 +5,8 @@ import com.yahoo.config.subscription.ConfigSourceSet; import com.yahoo.jrt.Request; import com.yahoo.jrt.Spec; import com.yahoo.jrt.StringValue; +import com.yahoo.jrt.Supervisor; +import com.yahoo.jrt.Transport; import com.yahoo.vespa.config.RawConfig; import org.junit.After; import org.junit.Before; @@ -28,7 +30,7 @@ public class ConfigProxyRpcServerTest { @Before public void setup() { proxyServer = ProxyServer.createTestServer(new ConfigSourceSet(address)); - rpcServer = new ConfigProxyRpcServer(proxyServer, null); + rpcServer = new ConfigProxyRpcServer(proxyServer, new Supervisor(new Transport()), null); } @After @@ -40,7 +42,7 @@ public class ConfigProxyRpcServerTest { public void basic() { ProxyServer proxy = ProxyServer.createTestServer(new MockConfigSource(new MockClientUpdater())); Spec spec = new Spec("localhost", 12345); - ConfigProxyRpcServer server = new ConfigProxyRpcServer(proxy, spec); + ConfigProxyRpcServer server = new ConfigProxyRpcServer(proxy, new Supervisor(new Transport()), spec); assertThat(server.getSpec(), is(spec)); } diff --git a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java index 18d49e9a224..cad3d2d0330 100644 --- a/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java +++ b/config-proxy/src/test/java/com/yahoo/vespa/config/proxy/filedistribution/FileDownloaderTest.java @@ -5,6 +5,9 @@ import com.yahoo.io.IOUtils; import com.yahoo.jrt.Int32Value; import com.yahoo.jrt.Request; import com.yahoo.jrt.RequestWaiter; +import com.yahoo.jrt.StringValue; +import com.yahoo.jrt.Supervisor; +import com.yahoo.jrt.Transport; import com.yahoo.text.Utf8; import com.yahoo.vespa.config.Connection; import com.yahoo.vespa.config.ConnectionPool; @@ -18,9 +21,7 @@ import java.time.Duration; import java.util.Arrays; import java.util.LinkedHashSet; import java.util.List; -import java.util.Map; import java.util.Optional; -import java.util.Set; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -38,6 +39,7 @@ public class FileDownloaderTest { File downloadDir = Files.createTempDirectory("filedistribution").toFile(); connection = new MockConnection(); fileDownloader = new FileDownloader(connection, downloadDir, Duration.ofMillis(3000)); + FileDistributionRpcServer rpcServer = new FileDistributionRpcServer(new Supervisor(new Transport()), fileDownloader); } catch (IOException e) { e.printStackTrace(); fail(e.getMessage()); @@ -53,8 +55,8 @@ public class FileDownloaderTest { String fileReferenceString = "foo"; String filename = "foo.jar"; - File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReferenceString); FileReference fileReference = new FileReference(fileReferenceString); + File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference); writeFileReference(downloadDir, fileReferenceString, filename); // Check that we get correct path and content when asking for file reference @@ -74,7 +76,7 @@ public class FileDownloaderTest { connection.setResponseHandler(new MockConnection.UnknownFileReferenceResponseHandler()); FileReference fileReference = new FileReference("bar"); - File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference.value()); + File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference); assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(fileReference).isPresent()); // Verify download status when unable to download @@ -85,7 +87,7 @@ public class FileDownloaderTest { // fileReference does not exist on disk, needs to be downloaded) FileReference fileReference = new FileReference("fileReference"); - File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference.value()); + File fileReferenceFullPath = fileReferenceFullPath(downloadDir, fileReference); assertFalse(fileReferenceFullPath.getAbsolutePath(), fileDownloader.getFile(fileReference).isPresent()); // Verify download status @@ -123,13 +125,24 @@ public class FileDownloaderTest { assertDownloadStatus(fileDownloader, bar, 0.0); } + @Test + public void receiveFile() throws IOException { + File downloadDir = Files.createTempDirectory("filedistribution").toFile(); + FileDownloader fileDownloader = new FileDownloader(null, downloadDir, Duration.ofMillis(200)); + FileReference foo = new FileReference("foo"); + String filename = "foo.jar"; + fileDownloader.receiveFile(foo, filename, Utf8.toBytes("content")); + File downloadedFile = new File(fileReferenceFullPath(downloadDir, foo), filename); + assertEquals("content", IOUtils.readFile(downloadedFile)); + } + private void writeFileReference(File dir, String fileReferenceString, String fileName) throws IOException { File file = new File(new File(dir, fileReferenceString), fileName); IOUtils.writeFile(file, "content", false); } - private File fileReferenceFullPath(File dir, String fileReferenceString) { - return new File(dir, fileReferenceString); + private File fileReferenceFullPath(File dir, FileReference fileReference) { + return new File(dir, fileReference.value()); } private void assertDownloadStatus(FileDownloader fileDownloader, FileReference fileReference, double expectedDownloadStatus) { @@ -204,8 +217,10 @@ public class FileDownloaderTest { @Override public void request(Request request) { - if (request.methodName().equals("filedistribution.serveFile")) + if (request.methodName().equals("filedistribution.serveFile")) { request.returnValues().add(new Int32Value(0)); + request.returnValues().add(new StringValue("OK")); + } } } @@ -213,8 +228,10 @@ public class FileDownloaderTest { @Override public void request(Request request) { - if (request.methodName().equals("filedistribution.serveFile")) + if (request.methodName().equals("filedistribution.serveFile")) { request.returnValues().add(new Int32Value(1)); + request.returnValues().add(new StringValue("Internal error")); + } } } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/AddFileInterface.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/AddFileInterface.java new file mode 100644 index 00000000000..61c376a7256 --- /dev/null +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/AddFileInterface.java @@ -0,0 +1,9 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.server.filedistribution; + +import com.yahoo.config.FileReference; + +public interface AddFileInterface { + FileReference addFile(String relativePath); + FileReference addFile(String relativePath, FileReference reference); +} diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/ApplicationFileManager.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/ApplicationFileManager.java new file mode 100644 index 00000000000..79c541d7b1a --- /dev/null +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/ApplicationFileManager.java @@ -0,0 +1,27 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.server.filedistribution; + +import com.yahoo.config.FileReference; +import java.io.File; + +public class ApplicationFileManager implements AddFileInterface { + private final File applicationDir; + private final FileDirectory master; + + ApplicationFileManager(File applicationDir, FileDirectory master) { + this.applicationDir = applicationDir; + this.master = master; + } + + @Override + public FileReference addFile(String relativePath, FileReference reference) { + // TODO Wire in when verified in system test + // return master.addFile(new File(applicationDir, relativePath), reference); + return reference; + } + + @Override + public FileReference addFile(String relativePath) { + return master.addFile(new File(applicationDir, relativePath)); + } +} diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/CombinedLegacyDistribution.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/CombinedLegacyDistribution.java new file mode 100644 index 00000000000..588f2d1d63f --- /dev/null +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/CombinedLegacyDistribution.java @@ -0,0 +1,30 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.server.filedistribution; + +import com.yahoo.config.FileReference; +import com.yahoo.config.model.api.FileDistribution; + +import java.util.Collection; +import java.util.Set; + +public class CombinedLegacyDistribution implements FileDistribution { + private final FileDistribution legacy; + + CombinedLegacyDistribution(FileDBHandler legacy) { + this.legacy = legacy; + } + @Override + public void sendDeployedFiles(String hostName, Set<FileReference> fileReferences) { + legacy.sendDeployedFiles(hostName, fileReferences); + } + + @Override + public void reloadDeployFileDistributor() { + legacy.reloadDeployFileDistributor(); + } + + @Override + public void removeDeploymentsThatHaveDifferentApplicationId(Collection<String> targetHostnames) { + legacy.removeDeploymentsThatHaveDifferentApplicationId(targetHostnames); + } +} diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/CombinedLegacyRegistry.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/CombinedLegacyRegistry.java new file mode 100644 index 00000000000..8f2cb194bbd --- /dev/null +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/CombinedLegacyRegistry.java @@ -0,0 +1,32 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.server.filedistribution; + +import com.yahoo.config.FileReference; +import com.yahoo.config.application.api.FileRegistry; + +import java.util.List; + +public class CombinedLegacyRegistry implements FileRegistry { + private final FileDBRegistry legacy; + private final FileDBRegistry future; + + CombinedLegacyRegistry(FileDBRegistry legacy, FileDBRegistry future) { + this.legacy = legacy; + this.future = future; + } + @Override + public FileReference addFile(String relativePath) { + FileReference reference = legacy.addFile(relativePath); + return future.addFile(relativePath, reference); + } + + @Override + public String fileSourceHost() { + return future.fileSourceHost(); + } + + @Override + public List<Entry> export() { + return future.export(); + } +} diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDBRegistry.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDBRegistry.java index d921d8d4f8d..1a76454fbed 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDBRegistry.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDBRegistry.java @@ -4,29 +4,41 @@ package com.yahoo.vespa.config.server.filedistribution; import com.yahoo.config.FileReference; import com.yahoo.config.application.api.FileRegistry; import com.yahoo.net.HostName; -import com.yahoo.vespa.filedistribution.FileDistributionManager; -import com.yahoo.config.model.application.provider.FileReferenceCreator; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; /** * @author tonytv */ public class FileDBRegistry implements FileRegistry { - private final FileDistributionManager manager; + private final AddFileInterface manager; private List<Entry> entries = new ArrayList<>(); private final Map<String, FileReference> fileReferenceCache = new HashMap<>(); - public FileDBRegistry(FileDistributionManager manager) { + public FileDBRegistry(AddFileInterface manager) { this.manager = manager; } + public synchronized FileReference addFile(String relativePath, FileReference reference) { + Optional<FileReference> cachedReference = Optional.ofNullable(fileReferenceCache.get(relativePath)); + return cachedReference.orElseGet(() -> { + FileReference newRef = manager.addFile(relativePath, reference); + entries.add(new Entry(relativePath, newRef)); + fileReferenceCache.put(relativePath, newRef); + return newRef; + }); + } + @Override public synchronized FileReference addFile(String relativePath) { Optional<FileReference> cachedReference = Optional.ofNullable(fileReferenceCache.get(relativePath)); return cachedReference.orElseGet(() -> { - FileReference newRef = FileReferenceCreator.create(manager.addFile(relativePath)); + FileReference newRef = manager.addFile(relativePath); entries.add(new Entry(relativePath, newRef)); fileReferenceCache.put(relativePath, newRef); return newRef; diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java new file mode 100644 index 00000000000..b0042b15470 --- /dev/null +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java @@ -0,0 +1,118 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +package com.yahoo.vespa.config.server.filedistribution; + +import com.yahoo.config.FileReference; +import com.yahoo.config.model.api.FileDistribution; +import com.yahoo.io.IOUtils; +import com.yahoo.log.LogLevel; +import com.yahoo.text.Utf8; +import net.jpountz.xxhash.XXHash64; +import net.jpountz.xxhash.XXHashFactory; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.logging.Logger; + +public class FileDirectory { + private static final Logger log = Logger.getLogger(FileDirectory.class.getName()); + private final File root; + + public FileDirectory() { + this(FileDistribution.getDefaultFileDBPath()); + } + + public FileDirectory(File rootDir) { + root = rootDir; + try { + ensureRootExist(); + } catch (IllegalArgumentException e) { + log.warning("Failed creating directory in constructor, will retry on demand : " + e.toString()); + } + } + + private void ensureRootExist() { + if (! root.exists()) { + if ( ! root.mkdir()) { + throw new IllegalArgumentException("Failed creating root dir '" + root.getAbsolutePath() + "'."); + } + } else if (!root.isDirectory()) { + throw new IllegalArgumentException("'" + root.getAbsolutePath() + "' is not a directory"); + } + } + + static private class Filter implements FilenameFilter { + @Override + public boolean accept(File dir, String name) { + return !".".equals(name) && !"..".equals(name) ; + } + } + + String getPath(FileReference ref) { + return root.getAbsolutePath() + "/" + ref.value(); + } + + File getFile(FileReference reference) { + ensureRootExist(); + File dir = new File(getPath(reference)); + if (!dir.exists()) { + throw new IllegalArgumentException("File reference '" + reference.toString() + "' with absolute path '" + dir.getAbsolutePath() + "' does not exist."); + } + if (!dir.isDirectory()) { + throw new IllegalArgumentException("File reference '" + reference.toString() + "' with absolute path '" + dir.getAbsolutePath() + "' is not a directory."); + } + File [] files = dir.listFiles(new Filter()); + if (files.length != 1) { + StringBuilder msg = new StringBuilder(); + for (File f: files) { + msg.append(f.getName()).append("\n"); + } + throw new IllegalArgumentException("File reference '" + reference.toString() + "' with absolute path '" + dir.getAbsolutePath() + " does not contain exactly one file, but [" + msg.toString() + "]"); + } + return files[0]; + } + + private Long computeReference(File file) throws IOException { + byte [] wholeFile = IOUtils.readFileBytes(file); + XXHash64 hasher = XXHashFactory.fastestInstance().hash64(); + return hasher.hash(ByteBuffer.wrap(wholeFile), hasher.hash(ByteBuffer.wrap(Utf8.toBytes(file.getName())), 0)); + } + + public FileReference addFile(File source) { + try { + Long hash = computeReference(source); + FileReference reference = new FileReference(Long.toHexString(hash)); + return addFile(source, reference); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + } + + public FileReference addFile(File source, FileReference reference) { + ensureRootExist(); + try { + File destinationDir = new File(root, reference.value()); + if (!destinationDir.exists()) { + destinationDir.mkdir(); + Path tempDestinationDir = Files.createTempDirectory(root.toPath(), "writing"); + File destination = new File(tempDestinationDir.toFile(), source.getName()); + IOUtils.copy(source, destination); + if (!destinationDir.exists()) { + if ( ! tempDestinationDir.toFile().renameTo(destinationDir)) { + log.warning("Failed moving '" + tempDestinationDir.toFile().getAbsolutePath() + "' to '" + destination.getAbsolutePath() + "'."); + } + } else { + IOUtils.copyDirectory(tempDestinationDir.toFile(), destinationDir, 1); + } + IOUtils.recursiveDeleteDir(tempDestinationDir.toFile()); + } + return reference; + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + } +} diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionProvider.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionProvider.java index 36b0138ad36..59c3a54897d 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionProvider.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionProvider.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.server.filedistribution; +import com.yahoo.config.FileReference; import com.yahoo.config.model.api.FileDistribution; import com.yahoo.config.application.api.FileRegistry; import com.yahoo.vespa.filedistribution.FileDistributionManager; @@ -19,13 +20,32 @@ public class FileDistributionProvider { private final FileRegistry fileRegistry; private final FileDistribution fileDistribution; - public FileDistributionProvider(File applicationDir, String zooKeepersSpec, String applicationId, Lock fileDistributionLock) { + static private class ManagerWrapper implements AddFileInterface { + private final FileDistributionManager manager; + ManagerWrapper(FileDistributionManager manager) { + this.manager = manager; + } + @Override + public FileReference addFile(String relativePath) { + return new FileReference(manager.addFile(relativePath)); + } + + @Override + public FileReference addFile(String relativePath, FileReference reference) { + throw new IllegalStateException("addFile with external reference is not possible with legacy filedistribution."); + } + } + + public FileDistributionProvider(File applicationDir, String zooKeepersSpec, + String applicationId, Lock fileDistributionLock) + { ensureDirExists(FileDistribution.getDefaultFileDBPath()); final FileDistributionManager manager = new FileDistributionManager( FileDistribution.getDefaultFileDBPath(), applicationDir, zooKeepersSpec, applicationId, fileDistributionLock); - this.fileDistribution = new FileDBHandler(manager); - this.fileRegistry = new FileDBRegistry(manager); + this.fileDistribution = new CombinedLegacyDistribution(new FileDBHandler(manager)); + this.fileRegistry = new CombinedLegacyRegistry(new FileDBRegistry(new ManagerWrapper(manager)), + new FileDBRegistry(new ApplicationFileManager(applicationDir, new FileDirectory()))); } public FileDistributionProvider(FileRegistry fileRegistry, FileDistribution fileDistribution) { diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java index 1c77ee66d0c..a504cd120ee 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java @@ -1,3 +1,4 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.server.filedistribution; import com.google.inject.Inject; @@ -6,7 +7,6 @@ import com.yahoo.config.model.api.FileDistribution; import com.yahoo.io.IOUtils; import java.io.File; -import java.io.FilenameFilter; import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -14,7 +14,7 @@ import java.util.logging.Logger; public class FileServer { private static final Logger log = Logger.getLogger(FileServer.class.getName()); - private final String rootDir; + private final FileDirectory root; private final ExecutorService executor; public static class ReplayStatus { @@ -33,46 +33,17 @@ public class FileServer { void receive(FileReference reference, String filename, byte [] content, ReplayStatus status); } - private String getPath(FileReference ref) { - return rootDir + "/" + ref.value(); - } - - static private class Filter implements FilenameFilter { - @Override - public boolean accept(File dir, String name) { - return !".".equals(name) && !"..".equals(name) ; - } - } - private File getFile(FileReference reference) { - File dir = new File(getPath(reference)); - if (!dir.exists()) { - throw new IllegalArgumentException("File reference '" + reference.toString() + "' with absolute path '" + dir.getAbsolutePath() + "' does not exist."); - } - if (!dir.isDirectory()) { - throw new IllegalArgumentException("File reference '" + reference.toString() + "' with absolute path '" + dir.getAbsolutePath() + "' is not a directory."); - } - File [] files = dir.listFiles(new Filter()); - if (files.length != 1) { - StringBuilder msg = new StringBuilder(); - for (File f: files) { - msg.append(f.getName()).append("\n"); - } - throw new IllegalArgumentException("File reference '" + reference.toString() + "' with absolute path '" + dir.getAbsolutePath() + " does not contain exactly one file, but [" + msg.toString() + "]"); - } - return files[0]; - } - @Inject public FileServer() { - this(FileDistribution.getDefaultFileDBRoot()); + this(FileDistribution.getDefaultFileDBPath()); } - public FileServer(String rootDir) { + public FileServer(File rootDir) { this(rootDir, Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); } - public FileServer(String rootDir, ExecutorService executor) { - this.rootDir = rootDir; + public FileServer(File rootDir, ExecutorService executor) { + this.root = new FileDirectory(rootDir); this.executor = executor; } public boolean hasFile(String fileName) { @@ -80,7 +51,7 @@ public class FileServer { } public boolean hasFile(FileReference reference) { try { - return getFile(reference).exists(); + return root.getFile(reference).exists(); } catch (IllegalArgumentException e) { log.warning("Failed locating file reference '" + reference + "' with error " + e.toString()); } @@ -88,7 +59,7 @@ public class FileServer { } public boolean startFileServing(String fileName, Receiver target) { FileReference reference = new FileReference(fileName); - File file = getFile(reference); + File file = root.getFile(reference); if (file.exists()) { executor.execute(() -> serveFile(reference, target)); @@ -97,8 +68,9 @@ public class FileServer { } private void serveFile(FileReference reference, Receiver target) { - - File file = getFile(reference); + File file = root.getFile(reference); + // TODO remove once verified in system tests. + log.info("Start serving reference '" + reference.value() + "' with file '" + file.getAbsolutePath() + "'"); byte [] blob = new byte [0]; boolean success = false; String errorDescription = "OK"; @@ -111,5 +83,7 @@ public class FileServer { } target.receive(reference, file.getName(), blob, new ReplayStatus(success ? 0 : 1, success ? "OK" : errorDescription)); + // TODO remove once verified in system tests. + log.info("Done serving reference '" + reference.toString() + "' with file '" + file.getAbsolutePath() + "'"); } } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/http/Utils.java b/configserver/src/main/java/com/yahoo/vespa/config/server/http/Utils.java index 873a24b5f05..e5bf8e22020 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/http/Utils.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/http/Utils.java @@ -31,9 +31,10 @@ public class Utils { com.yahoo.jdisc.http.HttpRequest jDiscRequest = req.getJDiscRequest(); BindingMatch<?> bm = jDiscRequest.getBindingMatch(); if (bm == null) { + UriPattern pattern = new UriPattern(uriPattern); bm = new BindingMatch<>( - new UriPattern(uriPattern).match(URI.create(jDiscRequest.getUri().toString())), - new Object()); + pattern.match(URI.create(jDiscRequest.getUri().toString())), + new Object(), pattern); } return bm; } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/HttpConfigRequests.java b/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/HttpConfigRequests.java index db92f53aacd..59270afd397 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/HttpConfigRequests.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/HttpConfigRequests.java @@ -39,7 +39,7 @@ public class HttpConfigRequests { UriPattern fullAppIdPattern = new UriPattern(pattern); URI uri = req.getUri(); Match match = fullAppIdPattern.match(uri); - if (match!=null) return new BindingMatch<>(match, new Object()); + if (match!=null) return new BindingMatch<>(match, new Object(), fullAppIdPattern); } throw new IllegalArgumentException("Illegal url for config request: " + req.getUri()); } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java index 662da63d198..7c5adb3b932 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java @@ -450,6 +450,10 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { fileBlob.parameters().add(new Int32Value(status.getCode())); fileBlob.parameters().add(new StringValue(status.getDescription())); target.invokeSync(fileBlob, 600); + if (fileBlob.isError()) { + log.warning("Failed delivering reference '" + reference.value() + "' with file '" + filename + "' to " + + target.toString() + " with error : '" + fileBlob.errorMessage() + "'."); + } } } @@ -458,6 +462,8 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { String fileReference = request.parameters().get(0).asString(); FileApiErrorCodes result; try { + // TODO remove once verified in system tests. + log.info("Received request for reference '" + fileReference + "'"); result = fileServer.hasFile(fileReference) ? FileApiErrorCodes.OK : FileApiErrorCodes.NOT_FOUND; diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/Tenants.java b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/Tenants.java index c67ad0675b2..528a30e0191 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/Tenants.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/Tenants.java @@ -159,7 +159,7 @@ public class Tenants implements ConnectionStateListener, PathChildrenCacheListen Map<TenantName, Tenant> current = new LinkedHashMap<>(tenants); for (Map.Entry<TenantName, Tenant> entry : current.entrySet()) { TenantName tenant = entry.getKey(); - if (!newTenants.contains(tenant)) { + if (!newTenants.contains(tenant) && !DEFAULT_TENANT.equals(tenant)) { notifyRemovedTenant(tenant); entry.getValue().close(); tenants.remove(tenant); @@ -257,7 +257,7 @@ public class Tenants implements ConnectionStateListener, PathChildrenCacheListen * @return this Tenants instance */ public synchronized Tenants deleteTenant(TenantName name) { - if (name.equals(TenantName.defaultName())) + if (name.equals(DEFAULT_TENANT)) throw new IllegalArgumentException("Deleting 'default' tenant is not allowed"); Tenant tenant = tenants.get(name); tenant.delete(); @@ -275,7 +275,7 @@ public class Tenants implements ConnectionStateListener, PathChildrenCacheListen * @return the log string */ public static String logPre(ApplicationId app) { - if (TenantName.defaultName().equals(app.tenant())) return ""; + if (DEFAULT_TENANT.equals(app.tenant())) return ""; StringBuilder ret = new StringBuilder() .append(logPre(app.tenant())) .append("app:"+app.application().value()) diff --git a/configserver/src/main/sh/vespa-configserver-remove-state b/configserver/src/main/sh/vespa-configserver-remove-state index 404f0f89a53..c781fcb0c0d 100755 --- a/configserver/src/main/sh/vespa-configserver-remove-state +++ b/configserver/src/main/sh/vespa-configserver-remove-state @@ -75,14 +75,12 @@ usage() { sudo="sudo" ask=true remove_zookeeper_dir=true -remove_applications_dir=true remove_tenants_dir=true confirmed=true zookeeper_dir=var/zookeeper -applications_dir=var/db/vespa/config_server/serverdb/applications tenants_dir=var/db/vespa/config_server/serverdb/tenants -if [ -w $applications_dir ] && [ -w $zookeeper_dir ]; then +if [ -w $zookeeper_dir ]; then sudo="" fi @@ -123,9 +121,8 @@ confirm() { } garbage_collect_dirs() { - find $zookeeper_dir $applications_dir -type d -depth 2>/dev/null | while read dir; do + find $zookeeper_dir $tenants_dir -type d -depth 2>/dev/null | while read dir; do [ "$dir" = "$zookeeper_dir" ] && continue - [ "$dir" = "$applications_dir" ] && continue $sudo rmdir "$dir" 2>/dev/null done } @@ -148,10 +145,6 @@ if $remove_zookeeper_dir && [ -d $zookeeper_dir ]; then confirm_and_clean_dir $zookeeper_dir fi -if $remove_applications_dir && [ -d $applications_dir ]; then - confirm_and_clean_dir $applications_dir -fi - if $remove_tenants_dir && [ -d $tenants_dir ]; then confirm_and_clean_dir $tenants_dir fi diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/InjectedGlobalComponentRegistryTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/InjectedGlobalComponentRegistryTest.java index acda60049ab..dec9dd991de 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/InjectedGlobalComponentRegistryTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/InjectedGlobalComponentRegistryTest.java @@ -63,7 +63,7 @@ public class InjectedGlobalComponentRegistryTest { serverDB = new ConfigServerDB(configserverConfig); sessionPreparer = new SessionTest.MockSessionPreparer(); rpcServer = new RpcServer(configserverConfig, null, Metrics.createTestMetrics(), - new HostRegistries(), new ConfigRequestHostLivenessTracker(), new FileServer(FileDistribution.getDefaultFileDBRoot())); + new HostRegistries(), new ConfigRequestHostLivenessTracker(), new FileServer(FileDistribution.getDefaultFileDBPath())); generationCounter = new SuperModelGenerationCounter(curator); defRepo = new StaticConfigDefinitionRepo(); permanentApplicationPackage = new PermanentApplicationPackage(configserverConfig); diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/deploy/DeployTester.java b/configserver/src/test/java/com/yahoo/vespa/config/server/deploy/DeployTester.java index dea468eb0be..d9a0db7e811 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/deploy/DeployTester.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/deploy/DeployTester.java @@ -69,26 +69,19 @@ public class DeployTester { } public DeployTester(String appPath, List<ModelFactory> modelFactories) { - this(appPath, modelFactories, new ConfigserverConfig(new ConfigserverConfig.Builder() - .configServerDBDir(Files.createTempDir() - .getAbsolutePath()) - .configDefinitionsDir(Files.createTempDir() - .getAbsolutePath())), + this(appPath, modelFactories, + new ConfigserverConfig(new ConfigserverConfig.Builder() + .configServerDBDir(Files.createTempDir().getAbsolutePath()) + .configDefinitionsDir(Files.createTempDir().getAbsolutePath())), Clock.systemUTC()); } public DeployTester(String appPath, ConfigserverConfig configserverConfig) { - this(appPath, - Collections.singletonList(createModelFactory(Clock.systemUTC())), - configserverConfig, - Clock.systemUTC()); + this(appPath, Collections.singletonList(createModelFactory(Clock.systemUTC())), configserverConfig, Clock.systemUTC()); } public DeployTester(String appPath, ConfigserverConfig configserverConfig, Clock clock) { - this(appPath, - Collections.singletonList(createModelFactory(clock)), - configserverConfig, - clock); + this(appPath, Collections.singletonList(createModelFactory(clock)), configserverConfig, clock); } public DeployTester(String appPath, List<ModelFactory> modelFactories, ConfigserverConfig configserverConfig) { @@ -106,12 +99,12 @@ public class DeployTester { catch (Exception e) { throw new IllegalArgumentException(e); } - applicationRepository = new ApplicationRepository(tenants, - createHostProvisioner(), - clock); + applicationRepository = new ApplicationRepository(tenants, createHostProvisioner(), clock); } - public Tenant tenant() { return tenants.defaultTenant(); } + public Tenant tenant() { + return tenants.defaultTenant(); + } /** Create a model factory for the version of this source*/ public static ModelFactory createModelFactory(Clock clock) { @@ -137,6 +130,7 @@ public class DeployTester { * Do the initial "deploy" with the existing API-less code as the deploy API doesn't support first deploys yet. */ public ApplicationId deployApp(String appName, String vespaVersion, Instant now) { + Tenant tenant = tenant(); LocalSession session = tenant.getSessionFactory().createSession(testApp, appName, new TimeoutBudget(clock, Duration.ofSeconds(60))); ApplicationId id = ApplicationId.from(tenant.getName(), ApplicationName.from(appName), InstanceName.defaultName()); diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java index 0c2ace38389..4913798e5ad 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java @@ -1,3 +1,4 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.server.filedistribution; import com.yahoo.config.FileReference; @@ -17,7 +18,7 @@ import static org.junit.Assert.assertFalse; public class FileServerTest { - FileServer fs = new FileServer("."); + FileServer fs = new FileServer(new File(".")); List<File> created = new LinkedList<>(); private void createCleanDir(String name) throws IOException{ diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/MockRpc.java b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/MockRpc.java index b094a741f34..4c2a4b56751 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/MockRpc.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/MockRpc.java @@ -39,7 +39,7 @@ public class MockRpc extends RpcServer { public MockRpc(int port, boolean createDefaultTenant, boolean pretendToHaveLoadedAnyApplication) { super(createConfig(port), null, Metrics.createTestMetrics(), - new HostRegistries(), new ConfigRequestHostLivenessTracker(), new FileServer(FileDistribution.getDefaultFileDBRoot())); + new HostRegistries(), new ConfigRequestHostLivenessTracker(), new FileServer(FileDistribution.getDefaultFileDBPath())); if (createDefaultTenant) { onTenantCreate(TenantName.from("default"), new MockTenantProvider(pretendToHaveLoadedAnyApplication)); } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/TestWithRpc.java b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/TestWithRpc.java index 933cb770dd1..12dc584f055 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/TestWithRpc.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/TestWithRpc.java @@ -90,7 +90,7 @@ public class TestWithRpc { emptyNodeFlavors(), generationCounter)), Metrics.createTestMetrics(), new HostRegistries(), - hostLivenessTracker, new FileServer(FileDistribution.getDefaultFileDBRoot())); + hostLivenessTracker, new FileServer(FileDistribution.getDefaultFileDBPath())); rpcServer.onTenantCreate(TenantName.from("default"), tenantProvider); t = new Thread(rpcServer); t.start(); diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedRequestHandler.java b/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedRequestHandler.java index 7142449f3d4..f6e1c1d6d5a 100644 --- a/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedRequestHandler.java +++ b/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedRequestHandler.java @@ -2,10 +2,13 @@ package com.yahoo.container.jdisc; import com.google.inject.Inject; +import com.yahoo.concurrent.CopyOnWriteHashMap; import com.yahoo.jdisc.Metric; import com.yahoo.jdisc.Request; import com.yahoo.jdisc.ResourceReference; import com.yahoo.jdisc.Response; +import com.yahoo.jdisc.application.BindingMatch; +import com.yahoo.jdisc.application.UriPattern; import com.yahoo.jdisc.handler.AbstractRequestHandler; import com.yahoo.jdisc.handler.BufferedContentChannel; import com.yahoo.jdisc.handler.ContentChannel; @@ -22,6 +25,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; +import static java.util.Collections.singletonMap; import javax.annotation.concurrent.GuardedBy; @@ -75,6 +79,21 @@ public abstract class ThreadedRequestHandler extends AbstractRequestHandler { this.allowAsyncResponse = allowAsyncResponse; } + private Map<String, Metric.Context> handlerContexts = new CopyOnWriteHashMap<>(); + private Metric.Context contextFor(BindingMatch match) { + if (match == null) return null; + UriPattern matched = match.matched(); + if (matched == null) return null; + String name = matched.toString(); + Metric.Context context = handlerContexts.get(name); + if (context == null) { + Map<String, String> dimensions = singletonMap("handler", name); + context = this.metric.createContext(dimensions); + handlerContexts.put(name, context); + } + return context; + } + /** * Handles a request by assigning a worker thread to it. * @@ -82,6 +101,7 @@ public abstract class ThreadedRequestHandler extends AbstractRequestHandler { */ @Override public final ContentChannel handleRequest(Request request, ResponseHandler responseHandler) { + metric.add("container.handled.requests", 1, contextFor(request.getBindingMatch())); if (request.getTimeout(TimeUnit.SECONDS) == null) { Duration timeout = getTimeout(); if (timeout != null) { @@ -173,7 +193,10 @@ public abstract class ThreadedRequestHandler extends AbstractRequestHandler { @Override public ContentChannel handleResponse(Response response) { if ( tryHasResponded()) throw new IllegalStateException("Response already handled"); - return responseHandler.handleResponse(response); + ContentChannel cc = responseHandler.handleResponse(response); + long millis = request.timeElapsed(TimeUnit.MILLISECONDS); + metric.set("container.handled.latency", millis, contextFor(request.getBindingMatch())); + return cc; } private boolean tryHasResponded() { diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/Application.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/Application.java index 9ad03522179..d26ff8ad65d 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/Application.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/Application.java @@ -172,12 +172,6 @@ public class Application { return "application '" + id + "'"; } - /** Returns true if there is no current change to deploy - i.e deploying is empty or completely deployed */ - public boolean deployingCompleted() { - if ( ! deploying.isPresent()) return true; - return deploymentJobs().isDeployed(deploying.get()); - } - /** Returns true if there is a current change which is blocked from being deployed to production at this instant */ public boolean deployingBlocked(Instant instant) { if ( ! deploying.isPresent()) return false; diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java index 5677eb2c08d..f5bf77cd6d3 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/ApplicationController.java @@ -322,7 +322,6 @@ public class ApplicationController { application = application.with(application.deploymentJobs() .withTriggering(jobType.get(), application.deploying(), - triggering.id(), version, Optional.of(revision), triggering.reason(), diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/Controller.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/Controller.java index 660013daa62..ddf5b989b07 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/Controller.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/Controller.java @@ -204,7 +204,7 @@ public class Controller extends AbstractComponent { } // TODO: Model the response properly - // TODO: What is this + // TODO: What is this -- I believe it fetches, and purges, errors from some log server public JsonNode grabLog(DeploymentId deploymentId) { return configServerClient.grabLog(deploymentId); } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/LockedApplication.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/LockedApplication.java index 2b78bac48af..56b76260f14 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/LockedApplication.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/LockedApplication.java @@ -61,13 +61,11 @@ public class LockedApplication extends Application { deploying(), hasOutstandingChange()), lock); } - // TODO: runId is always -1. Don't pass it, and pretend it's there. - public LockedApplication withJobTriggering(long runId, DeploymentJobs.JobType type, Optional<Change> change, + public LockedApplication withJobTriggering(DeploymentJobs.JobType type, Optional<Change> change, String reason, Instant triggerTime, Controller controller) { return new LockedApplication(new Application(id(), deploymentSpec(), validationOverrides(), deployments(), deploymentJobs().withTriggering(type, change, - runId, determineTriggerVersion(type, controller), determineTriggerRevision(type, controller), reason, diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/ApplicationList.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/ApplicationList.java index 45746b23908..bf5529c210f 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/ApplicationList.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/ApplicationList.java @@ -119,8 +119,8 @@ public class ApplicationList { } /** Returns the subset of applications which started failing after the given instant */ - public ApplicationList startedFailingAfter(Instant instant) { - return listOf(list.stream().filter(application -> application.deploymentJobs().failingSince().isAfter(instant))); + public ApplicationList startedFailingOnVersionAfter(Version version, Instant instant) { + return listOf(list.stream().filter(application -> JobList.from(application).firstFailing().on(version).firstFailing().after(instant).anyMatch())); } /** Returns the subset of applications which has the given upgrade policy */ diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/DeploymentJobs.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/DeploymentJobs.java index c532344f3a3..b5733703fd4 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/DeploymentJobs.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/DeploymentJobs.java @@ -68,7 +68,6 @@ public class DeploymentJobs { public DeploymentJobs withTriggering(JobType jobType, Optional<Change> change, - long runId, Version version, Optional<ApplicationRevision> revision, String reason, @@ -76,8 +75,7 @@ public class DeploymentJobs { Map<JobType, JobStatus> status = new LinkedHashMap<>(this.status); status.compute(jobType, (type, job) -> { if (job == null) job = JobStatus.initial(jobType); - return job.withTriggering(runId, - version, + return job.withTriggering( version, revision, change.isPresent() && change.get() instanceof Change.VersionChange, reason, @@ -133,13 +131,6 @@ public class DeploymentJobs { return true; // other environments do not have any preconditions } - /** Returns whether the given change has been deployed completely */ - public boolean isDeployed(Change change) { - return status.values().stream() - .filter(status -> status.type().isProduction()) - .allMatch(status -> isSuccessful(change, status.type())); - } - /** Returns whether job has completed successfully */ public boolean isSuccessful(Change change, JobType jobType) { return Optional.ofNullable(jobStatus().get(jobType)) @@ -147,15 +138,6 @@ public class DeploymentJobs { .filter(status -> status.lastCompletedWas(change)) .isPresent(); } - - /** Returns the oldest failingSince time of the jobs of this, or null if none are failing */ - public Instant failingSince() { - return JobList.from(jobStatus().values()) - .failing() - .mapToList(job -> job.firstFailing().get().at()) - .stream() - .min(Comparator.naturalOrder()).orElse(null); - } /** * Returns the id of the Screwdriver project running these deployment jobs diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/JobStatus.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/JobStatus.java index 9b8377fb087..ceb04d88026 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/JobStatus.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/JobStatus.java @@ -55,9 +55,9 @@ public class JobStatus { return new JobStatus(type, Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()); } - public JobStatus withTriggering(long runId, Version version, Optional<ApplicationRevision> revision, + public JobStatus withTriggering(Version version, Optional<ApplicationRevision> revision, boolean upgrade, String reason, Instant triggerTime) { - return new JobStatus(type, jobError, Optional.of(new JobRun(runId, version, revision, upgrade, reason, triggerTime)), + return new JobStatus(type, jobError, Optional.of(new JobRun(-1, version, revision, upgrade, reason, triggerTime)), lastCompleted, firstFailing, lastSuccess); } @@ -102,7 +102,9 @@ public class JobStatus { public DeploymentJobs.JobType type() { return type; } /** Returns true unless this job last completed with a failure */ - public boolean isSuccess() { return ! jobError.isPresent(); } + public boolean isSuccess() { + return lastCompleted().isPresent() && ! jobError.isPresent(); + } /** Returns true if last triggered is newer than last completed and was started after timeoutLimit */ public boolean isRunning(Instant timeoutLimit) { diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentOrder.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentOrder.java index 6e81f08a4a4..1f7a61599b3 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentOrder.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentOrder.java @@ -93,18 +93,6 @@ public class DeploymentOrder { return job == JobType.component; } - /** Returns whether the given job is last in a deployment */ - public boolean isLast(JobType job, Application application) { - List<DeploymentSpec.Step> deploymentSteps = deploymentSteps(application); - if (deploymentSteps.isEmpty()) { // Deployment spec not yet available - return false; - } - DeploymentSpec.Step lastStep = deploymentSteps.get(deploymentSteps.size() - 1); - Optional<DeploymentSpec.Step> step = fromJob(job, application); - // Step may not exist for all jobs, e.g. component - return step.map(s -> s.equals(lastStep)).orElse(false); - } - /** Returns jobs for given deployment spec, in the order they are declared */ public List<JobType> jobsFrom(DeploymentSpec deploymentSpec) { return deploymentSpec.steps().stream() @@ -120,7 +108,6 @@ public class DeploymentOrder { .collect(collectingAndThen(toList(), Collections::unmodifiableList)); } - // TODO: These sorts should throw when not all items to sort are listed. /** Returns deployments sorted according to declared zones */ public List<Deployment> sortBy(List<DeploymentSpec.DeclaredZone> zones, Collection<Deployment> deployments) { List<Zone> productionZones = zones.stream() diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTrigger.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTrigger.java index 473c279e28e..06b62b6bcee 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTrigger.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/deployment/DeploymentTrigger.java @@ -95,8 +95,7 @@ public class DeploymentTrigger { return; } } - // TODO: Should rather fix deployingCompleted() (let it check that all declared zones have the change). - else if (order.isLast(report.jobType(), application) && application.deployingCompleted()) { + else if (deploymentComplete(application)) { // change completed application = application.withDeploying(Optional.empty()); } @@ -109,7 +108,7 @@ public class DeploymentTrigger { else if (isCapacityConstrained(report.jobType()) && shouldRetryOnOutOfCapacity(application, report.jobType())) application = trigger(report.jobType(), application, true, "Retrying on out of capacity"); - else if (shouldRetryNow(application)) + else if (shouldRetryNow(application, report.jobType())) application = trigger(report.jobType(), application, false, "Immediate retry on failure"); @@ -117,6 +116,14 @@ public class DeploymentTrigger { } } + /** Returns whether all production zones listed in deployment spec last were successful on the currently deploying change. */ + private boolean deploymentComplete(LockedApplication application) { + if ( ! application.deploying().isPresent()) return true; + return order.jobsFrom(application.deploymentSpec()).stream() + .filter(JobType::isProduction) + .allMatch(jobType -> application.deploymentJobs().isSuccessful(application.deploying().get(), jobType)); + } + /** * Find jobs that can and should run but are currently not. */ @@ -138,6 +145,7 @@ public class DeploymentTrigger { List<JobType> jobs = order.jobsFrom(application.deploymentSpec()); // Should the first step be triggered? + // TODO: How can the first job not be systemTest (second ccondition)? if ( ! jobs.isEmpty() && jobs.get(0).equals(JobType.systemTest) && application.deploying().get() instanceof Change.VersionChange) { Version target = ((Change.VersionChange)application.deploying().get()).version(); @@ -177,7 +185,7 @@ public class DeploymentTrigger { Change change = application.deploying().get(); if ( ! previous.lastSuccess().isPresent() && - ! productionJobHasSucceededFor(previous, change)) return false; + ! productionUpgradeHasSucceededFor(previous, change)) return false; if (change instanceof Change.VersionChange) { Version targetVersion = ((Change.VersionChange)change).version(); @@ -208,7 +216,6 @@ public class DeploymentTrigger { if ( ! application.deploying().isPresent()) return; // No ongoing change, no need to retry // Retry first failing job - // TODO: Use JobList, requires JobList to sort according to deploymentSpec. for (JobType jobType : order.jobsFrom(application.deploymentSpec())) { JobStatus jobStatus = application.deploymentJobs().jobStatus().get(jobType); if (isFailing(application.deploying().get(), jobStatus)) { @@ -298,6 +305,7 @@ public class DeploymentTrigger { private boolean isFailing(Change change, JobStatus status) { return status != null && ! status.isSuccess() + && status.lastCompleted().isPresent() && status.lastCompleted().get().lastCompletedWas(change); } @@ -329,9 +337,10 @@ public class DeploymentTrigger { return false; } - /** Retry immediately only if this just started failing. Otherwise retry periodically */ - private boolean shouldRetryNow(Application application) { - return application.deploymentJobs().failingSince().isAfter(clock.instant().minus(Duration.ofSeconds(10))); + /** Retry immediately only if this job just started failing. Otherwise retry periodically */ + private boolean shouldRetryNow(Application application, JobType jobType) { + JobStatus jobStatus = application.deploymentJobs().jobStatus().get(jobType); + return (jobStatus != null && jobStatus.firstFailing().get().at().isAfter(clock.instant().minus(Duration.ofSeconds(10)))); } /** Decide whether to retry due to capacity restrictions */ @@ -364,18 +373,17 @@ public class DeploymentTrigger { * * @param jobType the type of the job to trigger, or null to trigger nothing * @param application the application to trigger the job for - * @param first whether to trigger the job before other jobs + * @param first whether to put the job at the front of the build system queue (or the back) * @param reason describes why the job is triggered * @return the application in the triggered state, which *must* be stored by the caller */ - // TODO: Improve explanation for first parameter. private LockedApplication trigger(JobType jobType, LockedApplication application, boolean first, String reason) { - if (isRunningProductionJob(application)) return application; + if (jobType.isProduction() && isRunningProductionJob(application)) return application; return triggerAllowParallel(jobType, application, first, false, reason); } private LockedApplication trigger(List<JobType> jobs, LockedApplication application, String reason) { - if (isRunningProductionJob(application)) return application; + if (jobs.stream().anyMatch(JobType::isProduction) && isRunningProductionJob(application)) return application; for (JobType job : jobs) application = triggerAllowParallel(job, application, false, false, reason); return application; @@ -407,8 +415,7 @@ public class DeploymentTrigger { application.deploying().map(d -> "deploying " + d).orElse("restarted deployment"), reason)); buildSystem.addJob(application.id(), jobType, first); - return application.withJobTriggering(-1, jobType, application.deploying(), reason, clock.instant(), - controller); + return application.withJobTriggering(jobType, application.deploying(), reason, clock.instant(), controller); } /** Returns true if the given proposed job triggering should be effected */ @@ -441,7 +448,7 @@ public class DeploymentTrigger { * When upgrading it is ok to trigger the next job even if the previous failed if the previous has earlier succeeded * on the version we are currently upgrading to */ - private boolean productionJobHasSucceededFor(JobStatus jobStatus, Change change) { + private boolean productionUpgradeHasSucceededFor(JobStatus jobStatus, Change change) { if ( ! (change instanceof Change.VersionChange) ) return false; if ( ! isProduction(jobStatus.type())) return false; Optional<JobStatus.JobRun> lastSuccess = jobStatus.lastSuccess(); @@ -475,7 +482,6 @@ public class DeploymentTrigger { if ( ! application.deploying().isPresent()) return true; if ( application.deploying().get() instanceof Change.ApplicationChange) return true; // more changes are ok - // TODO: Don't these two below allow concurrent App and Version changes? if ( application.deploymentJobs().hasFailures()) return true; // allow changes to fix upgrade problems if ( application.isBlocked(clock.instant())) return true; // allow testing changes while upgrade blocked (debatable) // Otherwise, the application is currently upgrading, without failures, and we should wait with the revision. diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/ApplicationSerializer.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/ApplicationSerializer.java index 04394f08e64..130d6f92d59 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/ApplicationSerializer.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/ApplicationSerializer.java @@ -385,7 +385,7 @@ public class ApplicationSerializer { private Optional<JobStatus.JobRun> jobRunFromSlime(Inspector object) { if ( ! object.valid()) return Optional.empty(); - return Optional.of(new JobStatus.JobRun(optionalLong(object.field(jobRunIdField)).orElse(-1L), // TODO: Make non-optional after November 2017 + return Optional.of(new JobStatus.JobRun(optionalLong(object.field(jobRunIdField)).orElse(-1L), // TODO: Make non-optional after November 2017 -- what about lastTriggered? new Version(object.field(versionField).asString()), applicationRevisionFromSlime(object.field(revisionField)), object.field(upgradeField).asBool(), diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/versions/VespaVersion.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/versions/VespaVersion.java index de42a9fba4e..4bcee5782ee 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/versions/VespaVersion.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/versions/VespaVersion.java @@ -58,7 +58,7 @@ public class VespaVersion implements Comparable<VespaVersion> { return Confidence.broken; // 'broken' if 4 non-canary was broken by this, and that is at least 10% of all - if (nonCanaryApplicationsBroken(failingOnThis, productionOnThis, releasedAt, controller.curator())) + if (nonCanaryApplicationsBroken(statistics.version(), failingOnThis, productionOnThis, releasedAt, controller.curator())) return Confidence.broken; // 'low' unless all canary applications are upgraded @@ -142,11 +142,12 @@ public class VespaVersion implements Comparable<VespaVersion> { } - private static boolean nonCanaryApplicationsBroken(ApplicationList failingOnThis, + private static boolean nonCanaryApplicationsBroken(Version version, + ApplicationList failingOnThis, ApplicationList productionOnThis, Instant releasedAt, CuratorDb curator) { - ApplicationList failingNonCanaries = failingOnThis.without(UpgradePolicy.canary).startedFailingAfter(releasedAt); + ApplicationList failingNonCanaries = failingOnThis.without(UpgradePolicy.canary).startedFailingOnVersionAfter(version, releasedAt); ApplicationList productionNonCanaries = productionOnThis.without(UpgradePolicy.canary); if (productionNonCanaries.size() + failingNonCanaries.size() == 0 || curator.readIgnoreConfidence()) return false; diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java index b607241c18c..1574801b77b 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java @@ -31,7 +31,6 @@ import com.yahoo.vespa.hosted.controller.application.Change; import com.yahoo.vespa.hosted.controller.application.DeploymentJobs; import com.yahoo.vespa.hosted.controller.application.DeploymentJobs.JobError; import com.yahoo.vespa.hosted.controller.application.DeploymentJobs.JobType; -import com.yahoo.vespa.hosted.controller.application.JobList; import com.yahoo.vespa.hosted.controller.application.JobStatus; import com.yahoo.vespa.hosted.controller.athenz.NToken; import com.yahoo.vespa.hosted.controller.athenz.mock.AthenzDbMock; @@ -80,6 +79,12 @@ public class ControllerTest { .region("corp-us-east-1") .build(); + private static final ApplicationPackage applicationPackage2 = new ApplicationPackageBuilder() + .environment(Environment.prod) + .region("corp-us-east-1") + .region("us-west-1") + .build(); + @Test public void testDeployment() { // Setup system @@ -104,12 +109,12 @@ public class ControllerTest { Optional<ApplicationRevision> revision = ((Change.ApplicationChange)tester.controller().applications().require(app1.id()).deploying().get()).revision(); assertTrue("Revision has been set during deployment", revision.isPresent()); assertStatus(JobStatus.initial(stagingTest) - .withTriggering(-1, version1, revision, false, "", tester.clock().instant().minus(Duration.ofMillis(1))) + .withTriggering(version1, revision, false, "", tester.clock().instant().minus(Duration.ofMillis(1))) .withCompletion(42, Optional.empty(), tester.clock().instant(), tester.controller()), app1.id(), tester.controller()); // Causes first deployment job to be triggered assertStatus(JobStatus.initial(productionCorpUsEast1) - .withTriggering(-1, version1, revision, false, "", tester.clock().instant()), app1.id(), tester.controller()); + .withTriggering(version1, revision, false, "", tester.clock().instant()), app1.id(), tester.controller()); tester.clock().advance(Duration.ofSeconds(1)); // production job (failing) @@ -117,9 +122,9 @@ public class ControllerTest { assertEquals(4, applications.require(app1.id()).deploymentJobs().jobStatus().size()); JobStatus expectedJobStatus = JobStatus.initial(productionCorpUsEast1) - .withTriggering(-1, version1, revision, false, "", tester.clock().instant()) // Triggered first without revision info + .withTriggering(version1, revision, false, "", tester.clock().instant()) // Triggered first without revision info .withCompletion(42, Optional.of(JobError.unknown), tester.clock().instant(), tester.controller()) - .withTriggering(-1, version1, revision, false, "", tester.clock().instant()); // Re-triggering (due to failure) has revision info + .withTriggering(version1, revision, false, "", tester.clock().instant()); // Re-triggering (due to failure) has revision info assertStatus(expectedJobStatus, app1.id(), tester.controller()); @@ -142,20 +147,20 @@ public class ControllerTest { tester.notifyJobCompletion(component, app1, true); tester.deployAndNotify(app1, applicationPackage, true, false, systemTest); assertStatus(JobStatus.initial(systemTest) - .withTriggering(-1, version1, revision, false, "", tester.clock().instant().minus(Duration.ofMillis(1))) + .withTriggering(version1, revision, false, "", tester.clock().instant().minus(Duration.ofMillis(1))) .withCompletion(42, Optional.empty(), tester.clock().instant(), tester.controller()), app1.id(), tester.controller()); tester.deployAndNotify(app1, applicationPackage, true, stagingTest); // production job succeeding now tester.deployAndNotify(app1, applicationPackage, true, productionCorpUsEast1); expectedJobStatus = expectedJobStatus - .withTriggering(-1, version1, revision, false, "", tester.clock().instant().minus(Duration.ofMillis(1))) + .withTriggering(version1, revision, false, "", tester.clock().instant().minus(Duration.ofMillis(1))) .withCompletion(42, Optional.empty(), tester.clock().instant(), tester.controller()); assertStatus(expectedJobStatus, app1.id(), tester.controller()); // causes triggering of next production job assertStatus(JobStatus.initial(productionUsEast3) - .withTriggering(-1, version1, revision, false, "", tester.clock().instant()), + .withTriggering(version1, revision, false, "", tester.clock().instant()), app1.id(), tester.controller()); tester.deployAndNotify(app1, applicationPackage, true, productionUsEast3); @@ -448,12 +453,13 @@ public class ControllerTest { tester.notifyJobCompletion(stagingTest, app1, Optional.of(JobError.outOfCapacity)); assertTrue("No jobs queued", buildSystem.jobs().isEmpty()); - // app2 and app3: New change triggers staging-test jobs + // app2 and app3: New change triggers system-test jobs + // Provide a changed application package, too, or the deployment is a no-op. tester.notifyJobCompletion(component, app2, true); - tester.deployAndNotify(app2, applicationPackage, true, systemTest); + tester.deployAndNotify(app2, applicationPackage2, true, systemTest); tester.notifyJobCompletion(component, app3, true); - tester.deployAndNotify(app3, applicationPackage, true, systemTest); + tester.deployAndNotify(app3, applicationPackage2, true, systemTest); assertEquals(2, buildSystem.jobs().size()); diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentIssueReporterTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentIssueReporterTest.java index b5ea8e0a36f..41cf7a331bb 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentIssueReporterTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentIssueReporterTest.java @@ -135,9 +135,7 @@ public class DeploymentIssueReporterTest { // app3 now has a new failure past max failure age; see that a new issue is filed. tester.notifyJobCompletion(component, app3, true); - tester.deployAndNotify(app3, applicationPackage, true, systemTest); - tester.deployAndNotify(app3, applicationPackage, true, stagingTest); - tester.deployAndNotify(app3, applicationPackage, false, productionCorpUsEast1); + tester.deployAndNotify(app3, applicationPackage, false, systemTest); tester.clock().advance(maxInactivity.plus(maxFailureAge)); reporter.maintain(); diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/UpgraderTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/UpgraderTest.java index 6ef48bac7fa..e92d5400a3d 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/UpgraderTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/UpgraderTest.java @@ -95,7 +95,10 @@ public class UpgraderTest { assertEquals("New system version: Should upgrade Canaries", 2, tester.buildSystem().jobs().size()); tester.completeUpgradeWithError(canary0, version, "canary", DeploymentJobs.JobType.stagingTest); assertEquals("Other Canary was cancelled", 2, tester.buildSystem().jobs().size()); - // TODO: Cancelled? + // TODO: Cancelled would mean it was triggerd, removed from the build system, but never reported in. + // Thus, the expected number of jobs should be 1, above: the retrying canary0. + // Further, canary1 should be retried after the timeout period of 12 hours, but verifying this is + // not possible when jobs are consumed form the build system on notification, rather than on deploy. tester.updateVersionStatus(version); assertEquals(VespaVersion.Confidence.broken, tester.controller().versionStatus().systemVersion().get().confidence()); @@ -149,10 +152,6 @@ public class UpgraderTest { // Deploy application change tester.deployCompletely("default0"); - // Let maintainer trigger version change, and deploy it, too - tester.upgrader().maintain(); - tester.deployCompletely("default0"); - tester.updateVersionStatus(version); assertEquals(VespaVersion.Confidence.high, tester.controller().versionStatus().systemVersion().get().confidence()); tester.upgrader().maintain(); diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/persistence/ApplicationSerializerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/persistence/ApplicationSerializerTest.java index c408f2ee214..daccab8efbf 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/persistence/ApplicationSerializerTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/persistence/ApplicationSerializerTest.java @@ -69,10 +69,10 @@ public class ApplicationSerializerTest { List<JobStatus> statusList = new ArrayList<>(); statusList.add(JobStatus.initial(DeploymentJobs.JobType.systemTest) - .withTriggering(37, Version.fromString("5.6.7"), Optional.empty(), true, "Test", Instant.ofEpochMilli(7)) + .withTriggering(Version.fromString("5.6.7"), Optional.empty(), true, "Test", Instant.ofEpochMilli(7)) .withCompletion(30, Optional.empty(), Instant.ofEpochMilli(8), tester.controller())); statusList.add(JobStatus.initial(DeploymentJobs.JobType.stagingTest) - .withTriggering(12, Version.fromString("5.6.6"), Optional.empty(), true, "Test 2", Instant.ofEpochMilli(5)) + .withTriggering(Version.fromString("5.6.6"), Optional.empty(), true, "Test 2", Instant.ofEpochMilli(5)) .withCompletion(11, Optional.of(JobError.unknown), Instant.ofEpochMilli(6), tester.controller())); DeploymentJobs deploymentJobs = new DeploymentJobs(projectId, statusList, Optional.empty()); @@ -105,7 +105,6 @@ public class ApplicationSerializerTest { serialized.deploymentJobs().jobStatus().get(DeploymentJobs.JobType.systemTest)); assertEquals( original.deploymentJobs().jobStatus().get(DeploymentJobs.JobType.stagingTest), serialized.deploymentJobs().jobStatus().get(DeploymentJobs.JobType.stagingTest)); - assertEquals(original.deploymentJobs().failingSince(), serialized.deploymentJobs().failingSince()); assertEquals(original.hasOutstandingChange(), serialized.hasOutstandingChange()); diff --git a/eval/src/tests/eval/interpreted_function/interpreted_function_test.cpp b/eval/src/tests/eval/interpreted_function/interpreted_function_test.cpp index 0f62e69b081..76f776df552 100644 --- a/eval/src/tests/eval/interpreted_function/interpreted_function_test.cpp +++ b/eval/src/tests/eval/interpreted_function/interpreted_function_test.cpp @@ -2,6 +2,7 @@ #include <vespa/vespalib/testkit/test_kit.h> #include <vespa/eval/eval/function.h> #include <vespa/eval/eval/tensor_spec.h> +#include <vespa/eval/eval/operation.h> #include <vespa/eval/eval/interpreted_function.h> #include <vespa/eval/eval/test/eval_spec.h> #include <vespa/eval/eval/basic_nodes.h> diff --git a/eval/src/tests/eval/tensor_function/tensor_function_test.cpp b/eval/src/tests/eval/tensor_function/tensor_function_test.cpp index 150ec8a25db..681a4dabc19 100644 --- a/eval/src/tests/eval/tensor_function/tensor_function_test.cpp +++ b/eval/src/tests/eval/tensor_function/tensor_function_test.cpp @@ -12,26 +12,26 @@ using namespace vespalib; using namespace vespalib::eval; using namespace vespalib::eval::tensor_function; -struct EvalCtx : TensorFunction::Input { +struct EvalCtx { const TensorEngine &engine; Stash stash; ErrorValue error; - std::map<size_t, Value::UP> tensors; + std::vector<Value::UP> tensors; + std::vector<Value::CREF> params; EvalCtx(const TensorEngine &engine_in) : engine(engine_in), stash(), error(), tensors() {} ~EvalCtx() {} - void add_tensor(Value::UP tensor, size_t id) { - tensors.emplace(id, std::move(tensor)); + size_t add_tensor(Value::UP tensor) { + size_t id = params.size(); + params.emplace_back(*tensor); + tensors.push_back(std::move(tensor)); + return id; } - const Value &get_tensor(size_t id) const override { - if (tensors.count(id) == 0) { - return error; - } - return *tensors.find(id)->second; + const Value &eval(const TensorFunction &fun) { + return fun.eval(params, stash); } - const Value &eval(const TensorFunction &fun) { return fun.eval(*this, stash); } - TensorFunction::UP compile(tensor_function::Node_UP expr) const { - return engine.compile(std::move(expr)); + const TensorFunction &compile(const tensor_function::Node &expr) { + return engine.compile(expr, stash); } Value::UP make_tensor_inject() { return engine.from_spec( @@ -110,54 +110,56 @@ void verify_equal(const Value &expect, const Value &value) { TEST("require that tensor injection works") { EvalCtx ctx(SimpleTensorEngine::ref()); - ctx.add_tensor(ctx.make_tensor_inject(), 1); + size_t a_id = ctx.add_tensor(ctx.make_tensor_inject()); Value::UP expect = ctx.make_tensor_inject(); - auto fun = inject(ValueType::from_spec("tensor(x[2],y[2])"), 1); - EXPECT_EQUAL(expect->type(), fun->result_type); - auto prog = ctx.compile(std::move(fun)); - TEST_DO(verify_equal(*expect, ctx.eval(*prog))); + const auto &fun = inject(ValueType::from_spec("tensor(x[2],y[2])"), a_id, ctx.stash); + EXPECT_EQUAL(expect->type(), fun.result_type); + const auto &prog = ctx.compile(fun); + TEST_DO(verify_equal(*expect, ctx.eval(prog))); } TEST("require that partial tensor reduction works") { EvalCtx ctx(SimpleTensorEngine::ref()); - ctx.add_tensor(ctx.make_tensor_reduce_input(), 1); + size_t a_id = ctx.add_tensor(ctx.make_tensor_reduce_input()); Value::UP expect = ctx.make_tensor_reduce_y_output(); - auto fun = reduce(inject(ValueType::from_spec("tensor(x[3],y[2])"), 1), Aggr::SUM, {"y"}); - EXPECT_EQUAL(expect->type(), fun->result_type); - auto prog = ctx.compile(std::move(fun)); - TEST_DO(verify_equal(*expect, ctx.eval(*prog))); + const auto &fun = reduce(inject(ValueType::from_spec("tensor(x[3],y[2])"), a_id, ctx.stash), Aggr::SUM, {"y"}, ctx.stash); + EXPECT_EQUAL(expect->type(), fun.result_type); + const auto &prog = ctx.compile(fun); + TEST_DO(verify_equal(*expect, ctx.eval(prog))); } TEST("require that full tensor reduction works") { EvalCtx ctx(SimpleTensorEngine::ref()); - ctx.add_tensor(ctx.make_tensor_reduce_input(), 1); - auto fun = reduce(inject(ValueType::from_spec("tensor(x[3],y[2])"), 1), Aggr::SUM, {}); - EXPECT_EQUAL(ValueType::from_spec("double"), fun->result_type); - auto prog = ctx.compile(std::move(fun)); - EXPECT_EQUAL(21.0, ctx.eval(*prog).as_double()); + size_t a_id = ctx.add_tensor(ctx.make_tensor_reduce_input()); + const auto &fun = reduce(inject(ValueType::from_spec("tensor(x[3],y[2])"), a_id, ctx.stash), Aggr::SUM, {}, ctx.stash); + EXPECT_EQUAL(ValueType::from_spec("double"), fun.result_type); + const auto &prog = ctx.compile(fun); + const Value &result = ctx.eval(prog); + EXPECT_TRUE(result.is_double()); + EXPECT_EQUAL(21.0, result.as_double()); } TEST("require that tensor map works") { EvalCtx ctx(SimpleTensorEngine::ref()); - ctx.add_tensor(ctx.make_tensor_map_input(), 1); + size_t a_id = ctx.add_tensor(ctx.make_tensor_map_input()); Value::UP expect = ctx.make_tensor_map_output(); - auto fun = map(inject(ValueType::from_spec("tensor(x{},y{})"), 1), operation::Neg::f); - EXPECT_EQUAL(expect->type(), fun->result_type); - auto prog = ctx.compile(std::move(fun)); - TEST_DO(verify_equal(*expect, ctx.eval(*prog))); + const auto &fun = map(inject(ValueType::from_spec("tensor(x{},y{})"), a_id, ctx.stash), operation::Neg::f, ctx.stash); + EXPECT_EQUAL(expect->type(), fun.result_type); + const auto &prog = ctx.compile(fun); + TEST_DO(verify_equal(*expect, ctx.eval(prog))); } TEST("require that tensor join works") { EvalCtx ctx(SimpleTensorEngine::ref()); - ctx.add_tensor(ctx.make_tensor_apply_lhs(), 1); - ctx.add_tensor(ctx.make_tensor_apply_rhs(), 2); + size_t a_id = ctx.add_tensor(ctx.make_tensor_apply_lhs()); + size_t b_id = ctx.add_tensor(ctx.make_tensor_apply_rhs()); Value::UP expect = ctx.make_tensor_apply_output(); - auto fun = join(inject(ValueType::from_spec("tensor(x{},y{})"), 1), - inject(ValueType::from_spec("tensor(y{},z{})"), 2), - operation::Mul::f); - EXPECT_EQUAL(expect->type(), fun->result_type); - auto prog = ctx.compile(std::move(fun)); - TEST_DO(verify_equal(*expect, ctx.eval(*prog))); + const auto &fun = join(inject(ValueType::from_spec("tensor(x{},y{})"), a_id, ctx.stash), + inject(ValueType::from_spec("tensor(y{},z{})"), b_id, ctx.stash), + operation::Mul::f, ctx.stash); + EXPECT_EQUAL(expect->type(), fun.result_type); + const auto &prog = ctx.compile(fun); + TEST_DO(verify_equal(*expect, ctx.eval(prog))); } TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/eval/src/tests/tensor/dense_dot_product_function/dense_dot_product_function_test.cpp b/eval/src/tests/tensor/dense_dot_product_function/dense_dot_product_function_test.cpp index f272166fede..ca77997bac7 100644 --- a/eval/src/tests/tensor/dense_dot_product_function/dense_dot_product_function_test.cpp +++ b/eval/src/tests/tensor/dense_dot_product_function/dense_dot_product_function_test.cpp @@ -50,13 +50,14 @@ asDenseTensor(const tensor::Tensor &tensor) return dynamic_cast<const DenseTensor &>(tensor); } -class FunctionInput : public TensorFunction::Input +class FunctionInput { private: tensor::Tensor::UP _lhsTensor; tensor::Tensor::UP _rhsTensor; const DenseTensor &_lhsDenseTensor; const DenseTensor &_rhsDenseTensor; + std::vector<Value::CREF> _params; public: FunctionInput(size_t lhsNumCells, size_t rhsNumCells) @@ -64,14 +65,11 @@ public: _rhsTensor(makeTensor(rhsNumCells, 5.0)), _lhsDenseTensor(asDenseTensor(*_lhsTensor)), _rhsDenseTensor(asDenseTensor(*_rhsTensor)) - {} - virtual const Value &get_tensor(size_t id) const override { - if (id == 0) { - return *_lhsTensor; - } else { - return *_rhsTensor; - } + { + _params.emplace_back(_lhsDenseTensor); + _params.emplace_back(_rhsDenseTensor); } + ConstArrayRef<Value::CREF> get() const { return _params; } double expectedDotProduct() const { return calcDotProduct(_lhsDenseTensor, _rhsDenseTensor); } @@ -85,11 +83,11 @@ struct Fixture ~Fixture(); double eval() const { Stash stash; - const Value &result = function.eval(input, stash); + const Value &result = function.eval(input.get(), stash); ASSERT_TRUE(result.is_double()); LOG(info, "eval(): (%s) * (%s) = %f", - input.get_tensor(0).type().to_spec().c_str(), - input.get_tensor(1).type().to_spec().c_str(), + input.get()[0].get().type().to_spec().c_str(), + input.get()[1].get().type().to_spec().c_str(), result.as_double()); return result.as_double(); } diff --git a/eval/src/tests/tensor/dense_tensor_function_compiler/dense_tensor_function_compiler_test.cpp b/eval/src/tests/tensor/dense_tensor_function_compiler/dense_tensor_function_compiler_test.cpp index 6dcfc0791e7..63829650cc5 100644 --- a/eval/src/tests/tensor/dense_tensor_function_compiler/dense_tensor_function_compiler_test.cpp +++ b/eval/src/tests/tensor/dense_tensor_function_compiler/dense_tensor_function_compiler_test.cpp @@ -3,32 +3,36 @@ #include <vespa/vespalib/testkit/test_kit.h> #include <vespa/eval/tensor/dense/dense_dot_product_function.h> #include <vespa/eval/tensor/dense/dense_tensor_function_compiler.h> +#include <vespa/eval/eval/operation.h> using namespace vespalib::eval; using namespace vespalib::eval::operation; using namespace vespalib::eval::tensor_function; using namespace vespalib::tensor; +using vespalib::Stash; template <typename T> const T *as(const TensorFunction &function) { return dynamic_cast<const T *>(&function); } -TensorFunction::UP +const TensorFunction & compileDotProduct(const vespalib::string &lhsType, - const vespalib::string &rhsType) + const vespalib::string &rhsType, + Stash &stash) { - Node_UP reduceNode = reduce(join(inject(ValueType::from_spec(lhsType), 1), - inject(ValueType::from_spec(rhsType), 3), - Mul::f), - Aggr::SUM, {}); - return DenseTensorFunctionCompiler::compile(std::move(reduceNode)); + const Node &reduceNode = reduce(join(inject(ValueType::from_spec(lhsType), 1, stash), + inject(ValueType::from_spec(rhsType), 3, stash), + Mul::f, stash), + Aggr::SUM, {}, stash); + return DenseTensorFunctionCompiler::compile(reduceNode, stash); } void assertCompiledDotProduct(const vespalib::string &lhsType, const vespalib::string &rhsType) { - TensorFunction::UP func = compileDotProduct(lhsType, rhsType); - const DenseDotProductFunction *dotProduct = as<DenseDotProductFunction>(*func); + Stash stash; + const TensorFunction &func = compileDotProduct(lhsType, rhsType, stash); + const DenseDotProductFunction *dotProduct = as<DenseDotProductFunction>(func); ASSERT_TRUE(dotProduct); EXPECT_EQUAL(1u, dotProduct->lhsTensorId()); EXPECT_EQUAL(3u, dotProduct->rhsTensorId()); @@ -38,8 +42,9 @@ void assertNotCompiledDotProduct(const vespalib::string &lhsType, const vespalib::string &rhsType) { - TensorFunction::UP func = compileDotProduct(lhsType, rhsType); - const Reduce *reduce = as<Reduce>(*func); + Stash stash; + const TensorFunction &func = compileDotProduct(lhsType, rhsType, stash); + const Reduce *reduce = as<Reduce>(func); EXPECT_TRUE(reduce); } diff --git a/eval/src/vespa/eval/eval/interpreted_function.cpp b/eval/src/vespa/eval/eval/interpreted_function.cpp index 7e80a699107..59d6753e142 100644 --- a/eval/src/vespa/eval/eval/interpreted_function.cpp +++ b/eval/src/vespa/eval/eval/interpreted_function.cpp @@ -5,6 +5,7 @@ #include "node_traverser.h" #include "check_type.h" #include "tensor_spec.h" +#include "operation.h" #include <vespa/vespalib/util/classname.h> #include <vespa/eval/eval/llvm/compile_cache.h> #include <vespa/vespalib/util/benchmark_timer.h> @@ -118,32 +119,19 @@ const T &undef_cref() { } struct TensorFunctionArgArgMeta { - TensorFunction::UP function; + const TensorFunction &function; size_t param1; size_t param2; - TensorFunctionArgArgMeta(TensorFunction::UP function_in, size_t param1_in, size_t param2_in) - : function(std::move(function_in)), param1(param1_in), param2(param2_in) {} -}; - -struct ArgArgInput : TensorFunction::Input { - const TensorFunctionArgArgMeta &meta; - State &state; - ArgArgInput(const TensorFunctionArgArgMeta &meta_in, State &state_in) - : meta(meta_in), state(state_in) {} - const Value &get_tensor(size_t id) const override { - if (id == 0) { - return state.params->resolve(meta.param1, state.stash); - } else if (id == 1) { - return state.params->resolve(meta.param2, state.stash); - } - return undef_cref<Value>(); - } + TensorFunctionArgArgMeta(const TensorFunction &function_in, size_t param1_in, size_t param2_in) + : function(function_in), param1(param1_in), param2(param2_in) {} }; void op_tensor_function_arg_arg(State &state, uint64_t param) { const TensorFunctionArgArgMeta &meta = unwrap_param<TensorFunctionArgArgMeta>(param); - ArgArgInput input(meta, state); - state.stack.push_back(meta.function->eval(input, state.stash)); + Value::CREF params[2] = + {state.params->resolve(meta.param1, state.stash), + state.params->resolve(meta.param2, state.stash)}; + state.stack.push_back(meta.function.eval(ConstArrayRef<Value::CREF>(params, 2), state.stash)); } //----------------------------------------------------------------------------- @@ -279,12 +267,12 @@ struct ProgramBuilder : public NodeVisitor, public NodeTraverser { program.pop_back(); // load auto a = as<Symbol>(node.get_child(0).get_child(0)); auto b = as<Symbol>(node.get_child(0).get_child(1)); - auto ir = tensor_function::reduce(tensor_function::join( - tensor_function::inject(types.get_type(*a), 0), - tensor_function::inject(types.get_type(*b), 1), - operation::Mul::f), node.aggr(), node.dimensions()); - auto fun = tensor_engine.compile(std::move(ir)); - const auto &meta = stash.create<TensorFunctionArgArgMeta>(std::move(fun), a->id(), b->id()); + const auto &ir = tensor_function::reduce(tensor_function::join( + tensor_function::inject(types.get_type(*a), 0, stash), + tensor_function::inject(types.get_type(*b), 1, stash), + operation::Mul::f, stash), node.aggr(), node.dimensions(), stash); + const auto &fun = tensor_engine.compile(ir, stash); + const auto &meta = stash.create<TensorFunctionArgArgMeta>(fun, a->id(), b->id()); program.emplace_back(op_tensor_function_arg_arg, wrap_param<TensorFunctionArgArgMeta>(meta)); } else { ReduceParams ¶ms = stash.create<ReduceParams>(node.aggr(), node.dimensions()); diff --git a/eval/src/vespa/eval/eval/tensor_engine.h b/eval/src/vespa/eval/eval/tensor_engine.h index 357b9c8f82f..02a7f0c655a 100644 --- a/eval/src/vespa/eval/eval/tensor_engine.h +++ b/eval/src/vespa/eval/eval/tensor_engine.h @@ -34,21 +34,21 @@ struct TensorEngine { using Aggr = eval::Aggr; using Tensor = eval::Tensor; + using TensorFunction = eval::TensorFunction; using TensorSpec = eval::TensorSpec; using Value = eval::Value; using ValueType = eval::ValueType; using join_fun_t = double (*)(double, double); using map_fun_t = double (*)(double); - virtual TensorFunction::UP compile(tensor_function::Node_UP expr) const { return std::move(expr); } - - // havardpe: new API, WIP virtual TensorSpec to_spec(const Value &value) const = 0; virtual Value::UP from_spec(const TensorSpec &spec) const = 0; virtual void encode(const Value &value, nbostream &output) const = 0; virtual Value::UP decode(nbostream &input) const = 0; + virtual const TensorFunction &compile(const tensor_function::Node &expr, Stash &) const { return expr; } + virtual const Value &map(const Value &a, map_fun_t function, Stash &stash) const = 0; virtual const Value &join(const Value &a, const Value &b, join_fun_t function, Stash &stash) const = 0; virtual const Value &reduce(const Value &a, Aggr aggr, const std::vector<vespalib::string> &dimensions, Stash &stash) const = 0; diff --git a/eval/src/vespa/eval/eval/tensor_function.cpp b/eval/src/vespa/eval/eval/tensor_function.cpp index ef91d9e10a9..9cd7c7fc9c2 100644 --- a/eval/src/vespa/eval/eval/tensor_function.cpp +++ b/eval/src/vespa/eval/eval/tensor_function.cpp @@ -20,63 +20,58 @@ const TensorEngine &infer_engine(const std::initializer_list<Value::CREF> &value return SimpleTensorEngine::ref(); } -void Inject::accept(TensorFunctionVisitor &visitor) const { visitor.visit(*this); } -void Reduce::accept(TensorFunctionVisitor &visitor) const { visitor.visit(*this); } -void Map ::accept(TensorFunctionVisitor &visitor) const { visitor.visit(*this); } -void Join ::accept(TensorFunctionVisitor &visitor) const { visitor.visit(*this); } - //----------------------------------------------------------------------------- const Value & -Inject::eval(const Input &input, Stash &) const +Inject::eval(ConstArrayRef<Value::CREF> params, Stash &) const { - return input.get_tensor(tensor_id); + return params[tensor_id]; } const Value & -Reduce::eval(const Input &input, Stash &stash) const +Reduce::eval(ConstArrayRef<Value::CREF> params, Stash &stash) const { - const Value &a = tensor->eval(input, stash); + const Value &a = tensor.eval(params, stash); const TensorEngine &engine = infer_engine({a}); return engine.reduce(a, aggr, dimensions, stash); } const Value & -Map::eval(const Input &input, Stash &stash) const +Map::eval(ConstArrayRef<Value::CREF> params, Stash &stash) const { - const Value &a = tensor->eval(input, stash); + const Value &a = tensor.eval(params, stash); const TensorEngine &engine = infer_engine({a}); return engine.map(a, function, stash); } const Value & -Join::eval(const Input &input, Stash &stash) const +Join::eval(ConstArrayRef<Value::CREF> params, Stash &stash) const { - const Value &a = lhs_tensor->eval(input, stash); - const Value &b = rhs_tensor->eval(input, stash); + const Value &a = lhs_tensor.eval(params, stash); + const Value &b = rhs_tensor.eval(params, stash); const TensorEngine &engine = infer_engine({a,b}); return engine.join(a, b, function, stash); } //----------------------------------------------------------------------------- -Node_UP inject(const ValueType &type, size_t tensor_id) { - return std::make_unique<Inject>(type, tensor_id); +const Node &inject(const ValueType &type, size_t tensor_id, Stash &stash) { + return stash.create<Inject>(type, tensor_id); } -Node_UP reduce(Node_UP tensor, Aggr aggr, const std::vector<vespalib::string> &dimensions) { - ValueType result_type = tensor->result_type.reduce(dimensions); - return std::make_unique<Reduce>(result_type, std::move(tensor), aggr, dimensions); +const Node &reduce(const Node &tensor, Aggr aggr, const std::vector<vespalib::string> &dimensions, Stash &stash) { + ValueType result_type = tensor.result_type.reduce(dimensions); + return stash.create<Reduce>(result_type, tensor, aggr, dimensions); } -Node_UP map(Node_UP tensor, map_fun_t function) { - ValueType result_type = tensor->result_type; - return std::make_unique<Map>(result_type, std::move(tensor), function); +const Node &map(const Node &tensor, map_fun_t function, Stash &stash) { + ValueType result_type = tensor.result_type; + return stash.create<Map>(result_type, tensor, function); } -Node_UP join(Node_UP lhs_tensor, Node_UP rhs_tensor, join_fun_t function) { - ValueType result_type = ValueType::join(lhs_tensor->result_type, rhs_tensor->result_type); - return std::make_unique<Join>(result_type, std::move(lhs_tensor), std::move(rhs_tensor), function); +const Node &join(const Node &lhs_tensor, const Node &rhs_tensor, join_fun_t function, Stash &stash) { + ValueType result_type = ValueType::join(lhs_tensor.result_type, rhs_tensor.result_type); + return stash.create<Join>(result_type, lhs_tensor, rhs_tensor, function); } } // namespace vespalib::eval::tensor_function diff --git a/eval/src/vespa/eval/eval/tensor_function.h b/eval/src/vespa/eval/eval/tensor_function.h index cff21b7b9aa..359cabc18a0 100644 --- a/eval/src/vespa/eval/eval/tensor_function.h +++ b/eval/src/vespa/eval/eval/tensor_function.h @@ -5,8 +5,9 @@ #include <memory> #include <vector> #include <vespa/vespalib/stllike/string.h> +#include <vespa/vespalib/util/arrayref.h> #include "value_type.h" -#include "operation.h" +#include "value.h" #include "aggr.h" namespace vespalib { @@ -15,7 +16,6 @@ class Stash; namespace eval { -class Value; class Tensor; //----------------------------------------------------------------------------- @@ -29,28 +29,19 @@ class Tensor; **/ struct TensorFunction { - typedef std::unique_ptr<TensorFunction> UP; - /** - * Interface used to obtain input to a tensor function. - **/ - struct Input { - virtual const Value &get_tensor(size_t id) const = 0; - virtual ~Input() {} - }; - - /** - * Evaluate this tensor function based on the given input. The - * given stash can be used to store temporary objects that need to - * be kept alive for the return value to be valid. The return - * value must conform to the result type indicated by the - * intermediate representation describing this tensor function. + * Evaluate this tensor function based on the given + * parameters. The given stash can be used to store temporary + * objects that need to be kept alive for the return value to be + * valid. The return value must conform to the result type + * indicated by the intermediate representation describing this + * tensor function. * * @return result of evaluating this tensor function - * @param input external stuff needed to evaluate this function + * @param params external values needed to evaluate this function + * @param stash heterogeneous object store **/ - virtual const Value &eval(const Input &input, Stash &stash) const = 0; - + virtual const Value &eval(ConstArrayRef<Value::CREF> params, Stash &stash) const = 0; virtual ~TensorFunction() {} }; @@ -78,15 +69,13 @@ using join_fun_t = double (*)(double, double); **/ struct Node : public TensorFunction { - ValueType result_type; + const ValueType result_type; Node(const ValueType &result_type_in) : result_type(result_type_in) {} - virtual void accept(TensorFunctionVisitor &visitor) const = 0; Node(const Node &) = delete; Node &operator=(const Node &) = delete; Node(Node &&) = delete; Node &operator=(Node &&) = delete; }; -using Node_UP = std::unique_ptr<Node>; /** * Simple typecasting utility. @@ -95,68 +84,53 @@ template <typename T> const T *as(const Node &node) { return dynamic_cast<const T *>(&node); } struct Inject : Node { - size_t tensor_id; + const size_t tensor_id; Inject(const ValueType &result_type_in, size_t tensor_id_in) : Node(result_type_in), tensor_id(tensor_id_in) {} - void accept(TensorFunctionVisitor &visitor) const override; - const Value &eval(const Input &input, Stash &) const override; + const Value &eval(ConstArrayRef<Value::CREF> params, Stash &) const override; }; struct Reduce : Node { - Node_UP tensor; - Aggr aggr; - std::vector<vespalib::string> dimensions; + const Node &tensor; + const Aggr aggr; + const std::vector<vespalib::string> dimensions; Reduce(const ValueType &result_type_in, - Node_UP tensor_in, + const Node &tensor_in, Aggr aggr_in, const std::vector<vespalib::string> &dimensions_in) - : Node(result_type_in), tensor(std::move(tensor_in)), aggr(aggr_in), dimensions(dimensions_in) {} - void accept(TensorFunctionVisitor &visitor) const override; - const Value &eval(const Input &input, Stash &stash) const override; + : Node(result_type_in), tensor(tensor_in), aggr(aggr_in), dimensions(dimensions_in) {} + const Value &eval(ConstArrayRef<Value::CREF> params, Stash &stash) const override; }; struct Map : Node { - Node_UP tensor; - map_fun_t function; + const Node &tensor; + const map_fun_t function; Map(const ValueType &result_type_in, - Node_UP tensor_in, + const Node &tensor_in, map_fun_t function_in) - : Node(result_type_in), tensor(std::move(tensor_in)), function(function_in) {} - void accept(TensorFunctionVisitor &visitor) const override; - const Value &eval(const Input &input, Stash &stash) const override; + : Node(result_type_in), tensor(tensor_in), function(function_in) {} + const Value &eval(ConstArrayRef<Value::CREF> params, Stash &stash) const override; }; struct Join : Node { - Node_UP lhs_tensor; - Node_UP rhs_tensor; - join_fun_t function; + const Node &lhs_tensor; + const Node &rhs_tensor; + const join_fun_t function; Join(const ValueType &result_type_in, - Node_UP lhs_tensor_in, - Node_UP rhs_tensor_in, + const Node &lhs_tensor_in, + const Node &rhs_tensor_in, join_fun_t function_in) - : Node(result_type_in), lhs_tensor(std::move(lhs_tensor_in)), - rhs_tensor(std::move(rhs_tensor_in)), function(function_in) {} - void accept(TensorFunctionVisitor &visitor) const override; - const Value &eval(const Input &input, Stash &stash) const override; + : Node(result_type_in), lhs_tensor(lhs_tensor_in), + rhs_tensor(rhs_tensor_in), function(function_in) {} + const Value &eval(ConstArrayRef<Value::CREF> params, Stash &stash) const override; }; -Node_UP inject(const ValueType &type, size_t tensor_id); -Node_UP reduce(Node_UP tensor, Aggr aggr, const std::vector<vespalib::string> &dimensions); -Node_UP map(Node_UP tensor, map_fun_t function); -Node_UP join(Node_UP lhs_tensor, Node_UP rhs_tensor, join_fun_t function); +const Node &inject(const ValueType &type, size_t tensor_id, Stash &stash); +const Node &reduce(const Node &tensor, Aggr aggr, const std::vector<vespalib::string> &dimensions, Stash &stash); +const Node &map(const Node &tensor, map_fun_t function, Stash &stash); +const Node &join(const Node &lhs_tensor, const Node &rhs_tensor, join_fun_t function, Stash &stash); } // namespace vespalib::eval::tensor_function - -struct TensorFunctionVisitor { - virtual void visit(const tensor_function::Inject &) = 0; - virtual void visit(const tensor_function::Reduce &) = 0; - virtual void visit(const tensor_function::Map &) = 0; - virtual void visit(const tensor_function::Join &) = 0; - virtual ~TensorFunctionVisitor() {} -}; - -//----------------------------------------------------------------------------- - } // namespace vespalib::eval } // namespace vespalib diff --git a/eval/src/vespa/eval/eval/test/tensor_conformance.cpp b/eval/src/vespa/eval/eval/test/tensor_conformance.cpp index c45e2df3432..aa90cd566d5 100644 --- a/eval/src/vespa/eval/eval/test/tensor_conformance.cpp +++ b/eval/src/vespa/eval/eval/test/tensor_conformance.cpp @@ -238,20 +238,28 @@ const size_t tensor_id_a = 11; const size_t tensor_id_b = 12; // input used when evaluating in retained mode -struct Input : TensorFunction::Input { +struct Input { std::vector<Value::UP> tensors; + std::vector<Value::CREF> params; + ~Input() {} + void pad_params() { + for (size_t i = 0; i < tensor_id_a; ++i) { + params.push_back(ErrorValue::instance); + } + } Input(Value::UP a) : tensors() { + pad_params(); tensors.push_back(std::move(a)); + params.emplace_back(*tensors.back()); } Input(Value::UP a, Value::UP b) : tensors() { + pad_params(); tensors.push_back(std::move(a)); + params.emplace_back(*tensors.back()); tensors.push_back(std::move(b)); + params.emplace_back(*tensors.back()); } - const Value &get_tensor(size_t id) const override { - size_t offset = (id - tensor_id_a); - ASSERT_GREATER(tensors.size(), offset); - return *tensors[offset]; - } + ConstArrayRef<Value::CREF> get() const { return params; } }; // evaluate tensor reduce operation using tensor engine retained api @@ -264,11 +272,11 @@ struct RetainedReduce : Eval { Result eval(const TensorEngine &engine, const TensorSpec &a) const override { Stash stash; auto a_type = ValueType::from_spec(a.type()); - auto ir = tensor_function::reduce(tensor_function::inject(a_type, tensor_id_a), aggr, dimensions); - ValueType expect_type = ir->result_type; - auto fun = engine.compile(std::move(ir)); + const auto &ir = tensor_function::reduce(tensor_function::inject(a_type, tensor_id_a, stash), aggr, dimensions, stash); + ValueType expect_type = ir.result_type; + const auto &fun = engine.compile(ir, stash); Input input(engine.from_spec(a)); - return Result(engine, check_type(fun->eval(input, stash), expect_type)); + return Result(engine, check_type(fun.eval(input.get(), stash), expect_type)); } }; @@ -279,11 +287,11 @@ struct RetainedMap : Eval { Result eval(const TensorEngine &engine, const TensorSpec &a) const override { Stash stash; auto a_type = ValueType::from_spec(a.type()); - auto ir = tensor_function::map(tensor_function::inject(a_type, tensor_id_a), function); - ValueType expect_type = ir->result_type; - auto fun = engine.compile(std::move(ir)); + const auto &ir = tensor_function::map(tensor_function::inject(a_type, tensor_id_a, stash), function, stash); + ValueType expect_type = ir.result_type; + const auto &fun = engine.compile(ir, stash); Input input(engine.from_spec(a)); - return Result(engine, check_type(fun->eval(input, stash), expect_type)); + return Result(engine, check_type(fun.eval(input.get(), stash), expect_type)); } }; @@ -295,13 +303,13 @@ struct RetainedJoin : Eval { Stash stash; auto a_type = ValueType::from_spec(a.type()); auto b_type = ValueType::from_spec(b.type()); - auto ir = tensor_function::join(tensor_function::inject(a_type, tensor_id_a), - tensor_function::inject(b_type, tensor_id_b), - function); - ValueType expect_type = ir->result_type; - auto fun = engine.compile(std::move(ir)); + const auto &ir = tensor_function::join(tensor_function::inject(a_type, tensor_id_a, stash), + tensor_function::inject(b_type, tensor_id_b, stash), + function, stash); + ValueType expect_type = ir.result_type; + const auto &fun = engine.compile(ir, stash); Input input(engine.from_spec(a), engine.from_spec(b)); - return Result(engine, check_type(fun->eval(input, stash), expect_type)); + return Result(engine, check_type(fun.eval(input.get(), stash), expect_type)); } }; diff --git a/eval/src/vespa/eval/tensor/default_tensor_engine.cpp b/eval/src/vespa/eval/tensor/default_tensor_engine.cpp index 9fcdfe36eba..2506e6fcf0e 100644 --- a/eval/src/vespa/eval/tensor/default_tensor_engine.cpp +++ b/eval/src/vespa/eval/tensor/default_tensor_engine.cpp @@ -11,6 +11,7 @@ #include <vespa/eval/eval/value.h> #include <vespa/eval/eval/tensor_spec.h> #include <vespa/eval/eval/simple_tensor_engine.h> +#include <vespa/eval/eval/operation.h> #include <cassert> @@ -21,6 +22,7 @@ using eval::Aggr; using eval::Aggregator; using eval::DoubleValue; using eval::ErrorValue; +using eval::TensorFunction; using eval::TensorSpec; using eval::Value; using eval::ValueType; @@ -93,12 +95,6 @@ const Value &fallback_reduce(const Value &a, eval::Aggr aggr, const std::vector< const DefaultTensorEngine DefaultTensorEngine::_engine; -eval::TensorFunction::UP -DefaultTensorEngine::compile(eval::tensor_function::Node_UP expr) const -{ - return DenseTensorFunctionCompiler::compile(std::move(expr)); -} - TensorSpec DefaultTensorEngine::to_spec(const Value &value) const { @@ -206,6 +202,14 @@ DefaultTensorEngine::decode(nbostream &input) const //----------------------------------------------------------------------------- +const TensorFunction & +DefaultTensorEngine::compile(const eval::tensor_function::Node &expr, Stash &stash) const +{ + return DenseTensorFunctionCompiler::compile(expr, stash); +} + +//----------------------------------------------------------------------------- + const Value & DefaultTensorEngine::map(const Value &a, map_fun_t function, Stash &stash) const { diff --git a/eval/src/vespa/eval/tensor/default_tensor_engine.h b/eval/src/vespa/eval/tensor/default_tensor_engine.h index 86ee4459902..1cef4ba2d35 100644 --- a/eval/src/vespa/eval/tensor/default_tensor_engine.h +++ b/eval/src/vespa/eval/tensor/default_tensor_engine.h @@ -19,14 +19,14 @@ private: public: static const TensorEngine &ref() { return _engine; }; - virtual eval::TensorFunction::UP compile(eval::tensor_function::Node_UP expr) const override; - TensorSpec to_spec(const Value &value) const override; Value::UP from_spec(const TensorSpec &spec) const override; void encode(const Value &value, nbostream &output) const override; Value::UP decode(nbostream &input) const override; + const TensorFunction &compile(const eval::tensor_function::Node &expr, Stash &stash) const override; + const Value &map(const Value &a, map_fun_t function, Stash &stash) const override; const Value &join(const Value &a, const Value &b, join_fun_t function, Stash &stash) const override; const Value &reduce(const Value &a, Aggr aggr, const std::vector<vespalib::string> &dimensions, Stash &stash) const override; diff --git a/eval/src/vespa/eval/tensor/dense/dense_dot_product_function.cpp b/eval/src/vespa/eval/tensor/dense/dense_dot_product_function.cpp index 530eaed9aa9..705496714fa 100644 --- a/eval/src/vespa/eval/tensor/dense/dense_dot_product_function.cpp +++ b/eval/src/vespa/eval/tensor/dense/dense_dot_product_function.cpp @@ -31,10 +31,10 @@ getCellsRef(const eval::Value &value) } const eval::Value & -DenseDotProductFunction::eval(const Input &input, Stash &stash) const +DenseDotProductFunction::eval(ConstArrayRef<eval::Value::CREF> params, Stash &stash) const { - DenseTensorView::CellsRef lhsCells = getCellsRef(input.get_tensor(_lhsTensorId)); - DenseTensorView::CellsRef rhsCells = getCellsRef(input.get_tensor(_rhsTensorId)); + DenseTensorView::CellsRef lhsCells = getCellsRef(params[_lhsTensorId]); + DenseTensorView::CellsRef rhsCells = getCellsRef(params[_rhsTensorId]); size_t numCells = std::min(lhsCells.size(), rhsCells.size()); double result = _hwAccelerator->dotProduct(lhsCells.cbegin(), rhsCells.cbegin(), numCells); return stash.create<eval::DoubleValue>(result); diff --git a/eval/src/vespa/eval/tensor/dense/dense_dot_product_function.h b/eval/src/vespa/eval/tensor/dense/dense_dot_product_function.h index 905939cc781..8ad57d69524 100644 --- a/eval/src/vespa/eval/tensor/dense/dense_dot_product_function.h +++ b/eval/src/vespa/eval/tensor/dense/dense_dot_product_function.h @@ -24,7 +24,7 @@ public: DenseDotProductFunction(size_t lhsTensorId_, size_t rhsTensorId_); size_t lhsTensorId() const { return _lhsTensorId; } size_t rhsTensorId() const { return _rhsTensorId; } - virtual const eval::Value &eval(const Input &input, Stash &stash) const override; + const eval::Value &eval(ConstArrayRef<eval::Value::CREF> params, Stash &stash) const override; }; } // namespace tensor diff --git a/eval/src/vespa/eval/tensor/dense/dense_tensor_function_compiler.cpp b/eval/src/vespa/eval/tensor/dense/dense_tensor_function_compiler.cpp index a22307c25ad..e9ee7d30692 100644 --- a/eval/src/vespa/eval/tensor/dense/dense_tensor_function_compiler.cpp +++ b/eval/src/vespa/eval/tensor/dense/dense_tensor_function_compiler.cpp @@ -2,6 +2,7 @@ #include "dense_dot_product_function.h" #include "dense_tensor_function_compiler.h" +#include <vespa/eval/eval/operation.h> #include <vespa/vespalib/test/insertion_operators.h> #include <iostream> @@ -36,30 +37,30 @@ isCompatibleTensorsForDotProduct(const ValueType &lhsType, const ValueType &rhsT struct DotProductFunctionCompiler { - static TensorFunction::UP compile(Node_UP expr) { - const Reduce *reduce = as<Reduce>(*expr); + static const TensorFunction &compile(const Node &expr, Stash &stash) { + const Reduce *reduce = as<Reduce>(expr); if (reduce && (reduce->aggr == Aggr::SUM) && willReduceAllDimensions(reduce->dimensions)) { - const Join *join = as<Join>(*reduce->tensor); + const Join *join = as<Join>(reduce->tensor); if (join && (join->function == Mul::f)) { - const Inject *lhsTensor = as<Inject>(*join->lhs_tensor); - const Inject *rhsTensor = as<Inject>(*join->rhs_tensor); + const Inject *lhsTensor = as<Inject>(join->lhs_tensor); + const Inject *rhsTensor = as<Inject>(join->rhs_tensor); if (lhsTensor && rhsTensor && isCompatibleTensorsForDotProduct(lhsTensor->result_type, rhsTensor->result_type)) { - return std::make_unique<DenseDotProductFunction>(lhsTensor->tensor_id, rhsTensor->tensor_id); + return stash.create<DenseDotProductFunction>(lhsTensor->tensor_id, rhsTensor->tensor_id); } } } - return std::move(expr); + return expr; } }; } -TensorFunction::UP -DenseTensorFunctionCompiler::compile(Node_UP expr) +const TensorFunction & +DenseTensorFunctionCompiler::compile(const eval::tensor_function::Node &expr, Stash &stash) { - return DotProductFunctionCompiler::compile(std::move(expr)); + return DotProductFunctionCompiler::compile(expr, stash); } } // namespace tensor diff --git a/eval/src/vespa/eval/tensor/dense/dense_tensor_function_compiler.h b/eval/src/vespa/eval/tensor/dense/dense_tensor_function_compiler.h index ef940bf38f9..d5ba4e4f7a7 100644 --- a/eval/src/vespa/eval/tensor/dense/dense_tensor_function_compiler.h +++ b/eval/src/vespa/eval/tensor/dense/dense_tensor_function_compiler.h @@ -5,6 +5,9 @@ #include <vespa/eval/eval/tensor_function.h> namespace vespalib { + +class Stash; + namespace tensor { /** @@ -13,7 +16,7 @@ namespace tensor { */ struct DenseTensorFunctionCompiler { - static eval::TensorFunction::UP compile(eval::tensor_function::Node_UP expr); + static const eval::TensorFunction &compile(const eval::tensor_function::Node &expr, Stash &stash); }; } // namespace tensor diff --git a/fastlib/src/vespa/fastlib/net/httpserver.cpp b/fastlib/src/vespa/fastlib/net/httpserver.cpp index a9bba95a8ff..0d1b75ec7fe 100644 --- a/fastlib/src/vespa/fastlib/net/httpserver.cpp +++ b/fastlib/src/vespa/fastlib/net/httpserver.cpp @@ -367,7 +367,7 @@ int Fast_HTTPServer::Start(void) int retCode = FASTLIB_SUCCESS; { - std::unique_lock<std::mutex> runningGuard(_runningMutex); + std::lock_guard<std::mutex> runningGuard(_runningMutex); if (!_isRunning) { // Try listening retCode = Listen(); @@ -391,7 +391,7 @@ int Fast_HTTPServer::Start(void) void Fast_HTTPServer::Stop(void) { { - std::unique_lock<std::mutex> runningGuard(_runningMutex); + std::lock_guard<std::mutex> runningGuard(_runningMutex); _stopSignalled = true; if (_acceptThread) { _acceptThread->SetBreakFlag(); @@ -407,7 +407,7 @@ Fast_HTTPServer::Stop(void) { bool Fast_HTTPServer::StopSignalled(void) { bool retVal; - std::unique_lock<std::mutex> runningGuard(_runningMutex); + std::lock_guard<std::mutex> runningGuard(_runningMutex); retVal = _stopSignalled; return retVal; } @@ -458,7 +458,7 @@ void Fast_HTTPServer::Run(FastOS_ThreadInterface *thisThread, void *params) Fast_Socket *mySocket; { - std::unique_lock<std::mutex> runningGuard(_runningMutex); + std::lock_guard<std::mutex> runningGuard(_runningMutex); _isRunning = true; _stopSignalled = false; } @@ -516,7 +516,7 @@ void Fast_HTTPServer::Run(FastOS_ThreadInterface *thisThread, void *params) _serverSocket.SetSocketEvent(NULL); } - std::unique_lock<std::mutex> runningGuard(_runningMutex); + std::lock_guard<std::mutex> runningGuard(_runningMutex); _isRunning = false; } @@ -1040,7 +1040,7 @@ void Fast_HTTPServer::HandleFileRequest(const string & url, Fast_HTTPConnection& void Fast_HTTPServer::SetBaseDir(const char *baseDir) { - std::unique_lock<std::mutex> runningGuard(_runningMutex); + std::lock_guard<std::mutex> runningGuard(_runningMutex); if (!_isRunning) { _baseDir = baseDir; @@ -1178,14 +1178,14 @@ void Fast_HTTPServer::OutputNotFound(Fast_HTTPConnection& conn, void Fast_HTTPServer::AddConnection(Fast_HTTPConnection* connection) { - std::unique_lock<std::mutex> connectionGuard(_connectionLock); + std::lock_guard<std::mutex> connectionGuard(_connectionLock); _connections.Insert(connection); } void Fast_HTTPServer::RemoveConnection(Fast_HTTPConnection* connection) { - std::unique_lock<std::mutex> connectionGuard(_connectionLock); + std::lock_guard<std::mutex> connectionGuard(_connectionLock); _connections.RemoveElement(connection); _connectionCond.notify_one(); } diff --git a/fastlib/src/vespa/fastlib/text/normwordfolder.cpp b/fastlib/src/vespa/fastlib/text/normwordfolder.cpp index f383ff85df5..ca1f260515f 100644 --- a/fastlib/src/vespa/fastlib/text/normwordfolder.cpp +++ b/fastlib/src/vespa/fastlib/text/normwordfolder.cpp @@ -29,7 +29,7 @@ Fast_NormalizeWordFolder::Setup(uint32_t flags) { // Only allow setting these when not initialized or initializing... { - std::unique_lock<std::mutex> initGuard(_initMutex); + std::lock_guard<std::mutex> initGuard(_initMutex); _doAccentRemoval = (DO_ACCENT_REMOVAL & flags) != 0; // _doSmallToNormalKana = (DO_SMALL_TO_NORMAL_KANA & flags) != 0; // _doKatakanaToHiragana = (DO_KATAKANA_TO_HIRAGANA & flags) != 0; @@ -48,7 +48,7 @@ Fast_NormalizeWordFolder::Initialize() { unsigned int i; if (!_isInitialized) { - std::unique_lock<std::mutex> initGuard(_initMutex); + std::lock_guard<std::mutex> initGuard(_initMutex); if (!_isInitialized) { for (i = 0; i < 128; i++) diff --git a/fnet/src/vespa/fnet/connection.cpp b/fnet/src/vespa/fnet/connection.cpp index f2a4ab3a23d..ff6c1ab0b1d 100644 --- a/fnet/src/vespa/fnet/connection.cpp +++ b/fnet/src/vespa/fnet/connection.cpp @@ -45,7 +45,7 @@ public: void SyncPacket::Free() { - std::unique_lock<std::mutex> guard(_lock); + std::lock_guard<std::mutex> guard(_lock); _done = true; if (_waiting) { _cond.notify_one(); @@ -560,7 +560,7 @@ FNET_Connection::OpenChannel() uint32_t chid; { - std::unique_lock<std::mutex> guard(_ioc_lock); + std::lock_guard<std::mutex> guard(_ioc_lock); chid = GetNextID(); AddRef_NoLock(); } @@ -652,7 +652,7 @@ FNET_Connection::PostPacket(FNET_Packet *packet, uint32_t chid) uint32_t FNET_Connection::GetQueueLen() { - std::unique_lock<std::mutex> guard(_ioc_lock); + std::lock_guard<std::mutex> guard(_ioc_lock); return _queue.GetPacketCnt_NoLock() + _myQueue.GetPacketCnt_NoLock(); } @@ -710,7 +710,7 @@ FNET_Connection::HandleReadEvent() bool FNET_Connection::writePendingAfterConnect() { - std::unique_lock<std::mutex> guard(_ioc_lock); + std::lock_guard<std::mutex> guard(_ioc_lock); _state = FNET_CONNECTED; // SetState(FNET_CONNECTED) LOG(debug, "Connection(%s): State transition: %s -> %s", GetSpec(), GetStateString(FNET_CONNECTING), GetStateString(FNET_CONNECTED)); diff --git a/fnet/src/vespa/fnet/frt/invoker.cpp b/fnet/src/vespa/fnet/frt/invoker.cpp index 5afc355c68f..ce4daa988de 100644 --- a/fnet/src/vespa/fnet/frt/invoker.cpp +++ b/fnet/src/vespa/fnet/frt/invoker.cpp @@ -32,7 +32,7 @@ void FRT_SingleReqWait::RequestDone(FRT_RPCRequest *req) { (void) req; - std::unique_lock<std::mutex> guard(_lock); + std::lock_guard<std::mutex> guard(_lock); _done = true; if (_waiting) { _cond.notify_one(); diff --git a/fnet/src/vespa/fnet/iocomponent.cpp b/fnet/src/vespa/fnet/iocomponent.cpp index ec51c1f080e..8276da57e2e 100644 --- a/fnet/src/vespa/fnet/iocomponent.cpp +++ b/fnet/src/vespa/fnet/iocomponent.cpp @@ -48,7 +48,7 @@ FNET_IOComponent::UpdateTimeOut() { void FNET_IOComponent::AddRef() { - std::unique_lock<std::mutex> guard(_ioc_lock); + std::lock_guard<std::mutex> guard(_ioc_lock); assert(_ioc_refcnt > 0); _ioc_refcnt++; } @@ -66,7 +66,7 @@ void FNET_IOComponent::SubRef() { { - std::unique_lock<std::mutex> guard(_ioc_lock); + std::lock_guard<std::mutex> guard(_ioc_lock); assert(_ioc_refcnt > 0); if (--_ioc_refcnt > 0) { return; diff --git a/fnet/src/vespa/fnet/packetqueue.cpp b/fnet/src/vespa/fnet/packetqueue.cpp index 4fdb5842a9d..4331819d3f5 100644 --- a/fnet/src/vespa/fnet/packetqueue.cpp +++ b/fnet/src/vespa/fnet/packetqueue.cpp @@ -192,7 +192,7 @@ void FNET_PacketQueue::QueuePacket(FNET_Packet *packet, FNET_Context context) { assert(packet != nullptr); - std::unique_lock<std::mutex> guard(_lock); + std::lock_guard<std::mutex> guard(_lock); EnsureFree(); _buf[_in_pos]._packet = packet; // insert packet ref. _buf[_in_pos]._context = context; @@ -257,7 +257,7 @@ FNET_PacketQueue::DequeuePacket(uint32_t maxwait, FNET_Context *context) void FNET_PacketQueue::Print(uint32_t indent) { - std::unique_lock<std::mutex> guard(_lock); + std::lock_guard<std::mutex> guard(_lock); uint32_t i = _out_pos; uint32_t cnt = _bufused; diff --git a/fnet/src/vespa/fnet/scheduler.cpp b/fnet/src/vespa/fnet/scheduler.cpp index ef67407cb44..6c8340c1ff8 100644 --- a/fnet/src/vespa/fnet/scheduler.cpp +++ b/fnet/src/vespa/fnet/scheduler.cpp @@ -40,7 +40,7 @@ FNET_Scheduler::~FNET_Scheduler() bool empty = true; std::stringstream dump; { - std::unique_lock<std::mutex> guard(_lock); + std::lock_guard<std::mutex> guard(_lock); dump << "FNET_Scheduler {" << std::endl; dump << " [slot=" << _currSlot << "][iter=" << _currIter << "]" << std::endl; for (int i = 0; i <= NUM_SLOTS; i++) { @@ -70,7 +70,7 @@ FNET_Scheduler::Schedule(FNET_Task *task, double seconds) { uint32_t ticks = 1 + (uint32_t) (seconds * (1000 / SLOT_TICK) + 0.5); - std::unique_lock<std::mutex> guard(_lock); + std::lock_guard<std::mutex> guard(_lock); if (!task->_killed) { if (IsActive(task)) LinkOut(task); @@ -84,7 +84,7 @@ FNET_Scheduler::Schedule(FNET_Task *task, double seconds) void FNET_Scheduler::ScheduleNow(FNET_Task *task) { - std::unique_lock<std::mutex> guard(_lock); + std::lock_guard<std::mutex> guard(_lock); if (!task->_killed) { if (IsActive(task)) LinkOut(task); @@ -119,7 +119,7 @@ FNET_Scheduler::Kill(FNET_Task *task) void FNET_Scheduler::Print(FILE *dst) { - std::unique_lock<std::mutex> guard(_lock); + std::lock_guard<std::mutex> guard(_lock); fprintf(dst, "FNET_Scheduler {\n"); fprintf(dst, " [slot=%d][iter=%d]\n", _currSlot, _currIter); for (int i = 0; i <= NUM_SLOTS; i++) { diff --git a/fnet/src/vespa/fnet/transport_thread.cpp b/fnet/src/vespa/fnet/transport_thread.cpp index 61fcc26f1c6..443c90f1af4 100644 --- a/fnet/src/vespa/fnet/transport_thread.cpp +++ b/fnet/src/vespa/fnet/transport_thread.cpp @@ -173,7 +173,7 @@ FNET_TransportThread::UpdateStats() comp->FlushDirectWriteStats(); } { - std::unique_lock<std::mutex> guard(_lock); + std::lock_guard<std::mutex> guard(_lock); _stats.Update(&_counters, ms / 1000.0); } _counters.Clear(); @@ -238,7 +238,7 @@ FNET_TransportThread::FNET_TransportThread(FNET_Transport &owner_in) FNET_TransportThread::~FNET_TransportThread() { { - std::unique_lock<std::mutex> guard(_lock); + std::lock_guard<std::mutex> guard(_lock); _deleted = true; } if (_started && !_finished) { @@ -379,7 +379,7 @@ FNET_TransportThread::ShutDown(bool waitFinished) { bool wasEmpty = false; { - std::unique_lock<std::mutex> guard(_lock); + std::lock_guard<std::mutex> guard(_lock); if (!_shutdown) { _shutdown = true; wasEmpty = _queue.IsEmpty_NoLock(); @@ -413,7 +413,7 @@ FNET_TransportThread::InitEventLoop() bool wasStarted; bool wasDeleted; { - std::unique_lock<std::mutex> guard(_lock); + std::lock_guard<std::mutex> guard(_lock); wasStarted = _started; wasDeleted = _deleted; if (!_started && !_deleted) { @@ -440,7 +440,7 @@ void FNET_TransportThread::handle_wakeup() { { - std::unique_lock<std::mutex> guard(_lock); + std::lock_guard<std::mutex> guard(_lock); CountEvent(_queue.FlushPackets_NoLock(&_myQueue)); } @@ -590,7 +590,7 @@ FNET_TransportThread::EventLoopIteration() // flush event queue { - std::unique_lock<std::mutex> guard(_lock); + std::lock_guard<std::mutex> guard(_lock); _queue.FlushPackets_NoLock(&_myQueue); } @@ -623,7 +623,7 @@ FNET_TransportThread::EventLoopIteration() _myQueue.IsEmpty_NoLock()); { - std::unique_lock<std::mutex> guard(_lock); + std::lock_guard<std::mutex> guard(_lock); _finished = true; if (_waitFinished) { _cond.notify_all(); diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/Request.java b/jdisc_core/src/main/java/com/yahoo/jdisc/Request.java index 489a4c3dc10..7a76c588fb4 100644 --- a/jdisc_core/src/main/java/com/yahoo/jdisc/Request.java +++ b/jdisc_core/src/main/java/com/yahoo/jdisc/Request.java @@ -287,7 +287,7 @@ public class Request extends AbstractResource { } /** - * <p>Returns the allocated number of milliseconds that this Request is allowed to exist. If no timeout has been set + * <p>Returns the allocated number of time units that this Request is allowed to exist. If no timeout has been set * for this Request, this method returns <em>null</em>.</p> * * @param unit The unit to return the timeout in. @@ -306,7 +306,7 @@ public class Request extends AbstractResource { * <em>null</em>.</p> * * @param unit The unit to return the time in. - * @return The number of milliseconds left until this Request times out, or <em>null</em>. + * @return The number of time units left until this Request times out, or <em>null</em>. */ public Long timeRemaining(TimeUnit unit) { if (timeout == null) { @@ -316,6 +316,16 @@ public class Request extends AbstractResource { } /** + * <p>Returns the time that this Request has existed so far. + * + * @param unit The unit to return the time in. + * @return The number of time units elapsed since this Request was created. + */ + public long timeElapsed(TimeUnit unit) { + return unit.convert(container().currentTimeMillis() - creationTime, TimeUnit.MILLISECONDS); + } + + /** * <p>Returns the time at which this Request was created. This is whatever value was returned by {@link * Timer#currentTimeMillis()} when constructing this.</p> * diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/application/BindingMatch.java b/jdisc_core/src/main/java/com/yahoo/jdisc/application/BindingMatch.java index 5d4974f2dc4..7318b1b38ae 100644 --- a/jdisc_core/src/main/java/com/yahoo/jdisc/application/BindingMatch.java +++ b/jdisc_core/src/main/java/com/yahoo/jdisc/application/BindingMatch.java @@ -15,6 +15,7 @@ public class BindingMatch<T> { private final UriPattern.Match match; private final T target; + private final UriPattern matched; /** * <p>Constructs a new instance of this class.</p> @@ -22,12 +23,27 @@ public class BindingMatch<T> { * @param match The match information for this instance. * @param target The target of this match. * @throws NullPointerException If any argument is null. + * @deprecated use BindingMatch(UriPattern.Match match, T target, UriPattern matched) */ + @Deprecated public BindingMatch(UriPattern.Match match, T target) { + this(match, target, null); + } + + /** + * <p>Constructs a new instance of this class.</p> + * + * @param match The match information for this instance. + * @param target The target of this match. + * @param matched The matched URI pattern + * @throws NullPointerException If any argument is null. + */ + public BindingMatch(UriPattern.Match match, T target, UriPattern matched) { Objects.requireNonNull(match, "match"); Objects.requireNonNull(target, "target"); this.match = match; this.target = target; + this.matched = matched; } /** @@ -61,4 +77,14 @@ public class BindingMatch<T> { public T target() { return target; } + + /** + * <p>Returns the URI pattern that was matched.</p> + * + * @return The matched pattern. + */ + public UriPattern matched() { + return matched; + } + } diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/application/BindingSet.java b/jdisc_core/src/main/java/com/yahoo/jdisc/application/BindingSet.java index 7a21e204dd3..1e25846f63c 100644 --- a/jdisc_core/src/main/java/com/yahoo/jdisc/application/BindingSet.java +++ b/jdisc_core/src/main/java/com/yahoo/jdisc/application/BindingSet.java @@ -38,9 +38,10 @@ public class BindingSet<T> implements Iterable<Map.Entry<UriPattern, T>> { */ public BindingMatch<T> match(URI uri) { for (Map.Entry<UriPattern, T> entry : bindings) { - UriPattern.Match match = entry.getKey().match(uri); + UriPattern pattern = entry.getKey(); + UriPattern.Match match = pattern.match(uri); if (match != null) { - return new BindingMatch<>(match, entry.getValue()); + return new BindingMatch<>(match, entry.getValue(), pattern); } } return null; diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/test/ServerProviderConformanceTest.java b/jdisc_core/src/main/java/com/yahoo/jdisc/test/ServerProviderConformanceTest.java index d2b58090665..e7039e85e5e 100644 --- a/jdisc_core/src/main/java/com/yahoo/jdisc/test/ServerProviderConformanceTest.java +++ b/jdisc_core/src/main/java/com/yahoo/jdisc/test/ServerProviderConformanceTest.java @@ -24,7 +24,6 @@ import java.io.StringWriter; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.Modifier; -import java.net.URI; import java.nio.ByteBuffer; import java.util.HashSet; import java.util.Set; @@ -35,14 +34,18 @@ import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.logging.Logger; import java.util.stream.Stream; /** - * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a> + * @author Simon Thoresen Hult */ @SuppressWarnings("UnusedDeclaration") @Beta public abstract class ServerProviderConformanceTest { + + private static final Logger log = Logger.getLogger(ServerProviderConformanceTest.class.getName()); + private static final int NUM_RUNS_EACH_TEST = 10; /** @@ -2790,7 +2793,7 @@ public abstract class ServerProviderConformanceTest { serverProvider.release(); for (int i = 0; i < NUM_RUNS_EACH_TEST; ++i) { - System.out.println("Test run #" + i); + log.fine("Test run #" + i); requestHandler.reset(adapter.newResponseContent()); final U client = adapter.newClient(serverProvider); final boolean withRequestContent = requestType == RequestType.WITH_CONTENT; diff --git a/jdisc_core/src/test/java/com/yahoo/jdisc/RequestTestCase.java b/jdisc_core/src/test/java/com/yahoo/jdisc/RequestTestCase.java index e00bdae153d..cd5e07f1224 100644 --- a/jdisc_core/src/test/java/com/yahoo/jdisc/RequestTestCase.java +++ b/jdisc_core/src/test/java/com/yahoo/jdisc/RequestTestCase.java @@ -297,9 +297,10 @@ public class RequestTestCase { public RequestHandler resolveHandler(Request request) { this.asServer = request.isServerRequest(); RequestHandler requestHandler = new MyRequestHandler(); - request.setBindingMatch(new BindingMatch<>( - new UriPattern("http://*/*").match(request.getUri()), - requestHandler)); + UriPattern pattern = new UriPattern("http://*/*"); + request.setBindingMatch(new BindingMatch<>(pattern.match(request.getUri()), + requestHandler, + pattern)); return requestHandler; } diff --git a/jdisc_core/src/test/java/com/yahoo/jdisc/application/BindingMatchTestCase.java b/jdisc_core/src/test/java/com/yahoo/jdisc/application/BindingMatchTestCase.java index 582a1c6685e..21a3ae08c49 100644 --- a/jdisc_core/src/test/java/com/yahoo/jdisc/application/BindingMatchTestCase.java +++ b/jdisc_core/src/test/java/com/yahoo/jdisc/application/BindingMatchTestCase.java @@ -18,32 +18,36 @@ public class BindingMatchTestCase { @Test public void requireThatAccessorsWork() { Object obj = new Object(); + UriPattern pattern = new UriPattern("http://*/*"); BindingMatch<Object> match = new BindingMatch<>( - new UriPattern("http://*/*").match(URI.create("http://localhost:69/status.html")), - obj); + pattern.match(URI.create("http://localhost:69/status.html")), + obj, pattern); assertSame(obj, match.target()); assertEquals(3, match.groupCount()); assertEquals("localhost", match.group(0)); assertEquals("69", match.group(1)); assertEquals("status.html", match.group(2)); + assertEquals(pattern, match.matched()); } @Test public void requireThatConstructorArgumentsCanNotBeNull() { try { - new BindingMatch<>(null, null); + new BindingMatch<>(null, null, null); fail(); } catch (NullPointerException e) { } try { - new BindingMatch<>(new UriPattern("http://*/*").match(URI.create("http://localhost/")), null); + UriPattern pattern = new UriPattern("http://*/*"); + new BindingMatch<>(pattern.match(URI.create("http://localhost/")), null, pattern); fail(); } catch (NullPointerException e) { } try { - new BindingMatch<>(null, new Object()); + UriPattern pattern = new UriPattern("http://*/*"); + new BindingMatch<>(null, new Object(), pattern); fail(); } catch (NullPointerException e) { diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java index 96180f48229..7ec51f35b74 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java +++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java @@ -1,55 +1,36 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jdisc.http.server.jetty; -import com.google.common.base.Preconditions; import com.google.inject.Inject; import com.yahoo.jdisc.Metric; import com.yahoo.jdisc.http.ConnectorConfig; import com.yahoo.jdisc.http.ConnectorConfig.Ssl; import com.yahoo.jdisc.http.ConnectorConfig.Ssl.PemKeyStore; import com.yahoo.jdisc.http.SecretStore; -import com.yahoo.jdisc.http.ssl.ReaderForPath; -import com.yahoo.jdisc.http.ssl.SslKeyStore; import com.yahoo.jdisc.http.ssl.pem.PemSslKeyStore; import org.eclipse.jetty.http.HttpVersion; -import org.eclipse.jetty.server.ConnectionFactory; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.SecureRequestCustomizer; import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.ServerConnectionStatistics; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.SslConnectionFactory; import org.eclipse.jetty.util.ssl.SslContextFactory; -import javax.servlet.ServletRequest; import java.io.IOException; -import java.io.Reader; -import java.lang.reflect.Field; -import java.net.Socket; -import java.net.SocketException; -import java.nio.channels.Channels; -import java.nio.channels.FileChannel; +import java.io.UncheckedIOException; import java.nio.channels.ServerSocketChannel; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.security.KeyStore; -import java.util.Map; -import java.util.Optional; -import java.util.TreeMap; -import java.util.function.Supplier; -import java.util.logging.Level; import java.util.logging.Logger; -import static com.google.common.io.Closeables.closeQuietly; import static com.yahoo.jdisc.http.ConnectorConfig.Ssl.KeyStoreType.Enum.JKS; import static com.yahoo.jdisc.http.ConnectorConfig.Ssl.KeyStoreType.Enum.PEM; -import static com.yahoo.jdisc.http.server.jetty.Exceptions.throwUnchecked; /** * @author Einar M R Rosenvinge + * @author bjorncs */ public class ConnectorFactory { @@ -71,12 +52,30 @@ public class ConnectorFactory { ConnectorConfig.Ssl ssl = config.ssl(); if (ssl.keyStoreType() == JKS) { - if (! ssl.pemKeyStore().keyPath().isEmpty() || ! ssl.pemKeyStore().certificatePath().isEmpty()) + if (!ssl.pemKeyStore().keyPath().isEmpty() || ! ssl.pemKeyStore().certificatePath().isEmpty()) { throw new IllegalArgumentException("pemKeyStore attributes can not be set when keyStoreType is JKS."); + } + if (ssl.keyDbKey().isEmpty()) { + throw new IllegalArgumentException("Missing password for JKS keystore"); + } } if (ssl.keyStoreType() == PEM) { - if (! ssl.keyStorePath().isEmpty()) + if (! ssl.keyStorePath().isEmpty()) { throw new IllegalArgumentException("keyStorePath can not be set when keyStoreType is PEM"); + } + if (!ssl.keyDbKey().isEmpty()) { + // TODO Make an error once there are separate passwords for truststore and keystore + log.warning("Encrypted PEM key stores are not supported. Password is only applied to truststore"); + } + if (ssl.pemKeyStore().certificatePath().isEmpty()) { + throw new IllegalArgumentException("Missing certificate path."); + } + if (ssl.pemKeyStore().keyPath().isEmpty()) { + throw new IllegalArgumentException("Missing key path."); + } + } + if (!ssl.trustStorePath().isEmpty() && ssl.useTrustStorePassword() && ssl.keyDbKey().isEmpty()) { + throw new IllegalArgumentException("Missing password for JKS truststore"); } } @@ -84,11 +83,11 @@ public class ConnectorFactory { return connectorConfig; } - public ServerConnector createConnector(final Metric metric, final Server server, final ServerSocketChannel ch, Map<Path, FileChannel> keyStoreChannels) { + public ServerConnector createConnector(final Metric metric, final Server server, final ServerSocketChannel ch) { ServerConnector connector; if (connectorConfig.ssl().enabled()) { connector = new JDiscServerConnector(connectorConfig, metric, server, ch, - newSslConnectionFactory(keyStoreChannels), + newSslConnectionFactory(), newHttpConnectionFactory()); } else { connector = new JDiscServerConnector(connectorConfig, metric, server, ch, @@ -125,7 +124,7 @@ public class ConnectorFactory { } //TODO: does not support loading non-yahoo readable JKS key stores. - private SslConnectionFactory newSslConnectionFactory(Map<Path, FileChannel> keyStoreChannels) { + private SslConnectionFactory newSslConnectionFactory() { Ssl sslConfig = connectorConfig.ssl(); SslContextFactory factory = new SslContextFactory(); @@ -172,25 +171,24 @@ public class ConnectorFactory { factory.setIncludeCipherSuites(ciphs); } - Optional<String> keyDbPassword = secret(sslConfig.keyDbKey()); + String keyDbPassword = sslConfig.keyDbKey(); switch (sslConfig.keyStoreType()) { case PEM: - factory.setKeyStore(getKeyStore(sslConfig.pemKeyStore(), keyStoreChannels)); - if (keyDbPassword.isPresent()) - log.warning("Encrypted PEM key stores are not supported."); + factory.setKeyStore(createPemKeyStore(sslConfig.pemKeyStore())); break; case JKS: factory.setKeyStorePath(sslConfig.keyStorePath()); factory.setKeyStoreType(sslConfig.keyStoreType().toString()); - factory.setKeyStorePassword(keyDbPassword.orElseThrow(passwordRequiredForJKSKeyStore("key"))); + factory.setKeyStorePassword(secretStore.getSecret(keyDbPassword)); break; } if (!sslConfig.trustStorePath().isEmpty()) { factory.setTrustStorePath(sslConfig.trustStorePath()); factory.setTrustStoreType(sslConfig.trustStoreType().toString()); - if (sslConfig.useTrustStorePassword()) - factory.setTrustStorePassword(keyDbPassword.orElseThrow(passwordRequiredForJKSKeyStore("trust"))); + if (sslConfig.useTrustStorePassword()) { + factory.setTrustStorePassword(secretStore.getSecret(keyDbPassword)); + } } factory.setKeyManagerFactoryAlgorithm(sslConfig.sslKeyManagerFactoryAlgorithm()); @@ -198,162 +196,17 @@ public class ConnectorFactory { return new SslConnectionFactory(factory, HttpVersion.HTTP_1_1.asString()); } - /** Returns the secret password with the given name, or empty if the password name is null or empty */ - private Optional<String> secret(String keyname) { - return Optional.of(keyname).filter(key -> !key.isEmpty()).map(secretStore::getSecret); - } - - @SuppressWarnings("ThrowableInstanceNeverThrown") - private Supplier<RuntimeException> passwordRequiredForJKSKeyStore(String type) { - return () -> new RuntimeException(String.format("Password is required for JKS %s store", type)); - } - - private KeyStore getKeyStore(PemKeyStore pemKeyStore, Map<Path, FileChannel> keyStoreChannels) { - Preconditions.checkArgument(!pemKeyStore.certificatePath().isEmpty(), "Missing certificate path."); - Preconditions.checkArgument(!pemKeyStore.keyPath().isEmpty(), "Missing key path."); - - class KeyStoreReaderForPath implements AutoCloseable { - private final Optional<FileChannel> channel; - public final ReaderForPath readerForPath; - - - KeyStoreReaderForPath(String pathString) { - Path path = Paths.get(pathString); - channel = Optional.ofNullable(keyStoreChannels.get(path)); - readerForPath = new ReaderForPath(channel.map(this::getReader).orElseGet(() -> getReader(path)), path); - } - - private Reader getReader(FileChannel channel) { - try { - channel.position(0); - return Channels.newReader(channel, StandardCharsets.UTF_8.newDecoder(), -1); - } catch (IOException e) { - throw throwUnchecked(e); - } - - } - - private Reader getReader(Path path) { - try { - return Files.newBufferedReader(path); - } catch (IOException e) { - throw new RuntimeException("Failed opening " + path, e); - } - } - - @Override - public void close() { - //channels are reused - if (!channel.isPresent()) { - closeQuietly(readerForPath.reader); - } - } - } - - try (KeyStoreReaderForPath certificateReader = new KeyStoreReaderForPath(pemKeyStore.certificatePath()); - KeyStoreReaderForPath keyReader = new KeyStoreReaderForPath(pemKeyStore.keyPath())) { - SslKeyStore keyStore = new PemSslKeyStore( - new com.yahoo.jdisc.http.ssl.pem.PemKeyStore.KeyStoreLoadParameter( - certificateReader.readerForPath, keyReader.readerForPath)); - return keyStore.loadJavaKeyStore(); + private static KeyStore createPemKeyStore(PemKeyStore pemKeyStore) { + try { + Path certificatePath = Paths.get(pemKeyStore.certificatePath()); + Path keyPath = Paths.get(pemKeyStore.keyPath()); + return new PemSslKeyStore(certificatePath, keyPath) + .loadJavaKeyStore(); + } catch (IOException e) { + throw new UncheckedIOException(e); } catch (Exception e) { throw new RuntimeException("Failed setting up key store for " + pemKeyStore.keyPath() + ", " + pemKeyStore.certificatePath(), e); } } - public static class JDiscServerConnector extends ServerConnector { - public static final String REQUEST_ATTRIBUTE = JDiscServerConnector.class.getName(); - private final static Logger log = Logger.getLogger(JDiscServerConnector.class.getName()); - private final Metric.Context metricCtx; - private final ServerConnectionStatistics statistics; - private final boolean tcpKeepAlive; - private final boolean tcpNoDelay; - private final ServerSocketChannel channelOpenedByActivator; - - private JDiscServerConnector(ConnectorConfig config, Metric metric, Server server, - ServerSocketChannel channelOpenedByActivator, ConnectionFactory... factories) { - super(server, factories); - this.channelOpenedByActivator = channelOpenedByActivator; - this.tcpKeepAlive = config.tcpKeepAliveEnabled(); - this.tcpNoDelay = config.tcpNoDelay(); - this.metricCtx = createMetricContext(config, metric); - - this.statistics = new ServerConnectionStatistics(); - addBean(statistics); - } - - private Metric.Context createMetricContext(ConnectorConfig config, Metric metric) { - Map<String, Object> props = new TreeMap<>(); - props.put(JettyHttpServer.Metrics.NAME_DIMENSION, config.name()); - props.put(JettyHttpServer.Metrics.PORT_DIMENSION, config.listenPort()); - return metric.createContext(props); - } - - @Override - protected void configure(final Socket socket) { - super.configure(socket); - try { - socket.setKeepAlive(tcpKeepAlive); - socket.setTcpNoDelay(tcpNoDelay); - } catch (SocketException ignored) { - } - } - - @Override - public void open() throws IOException { - if (channelOpenedByActivator == null) { - log.log(Level.INFO, "No channel set by activator, opening channel ourselves."); - try { - super.open(); - } catch (RuntimeException e) { - log.log(Level.SEVERE, "failed org.eclipse.jetty.server.Server open() with port "+getPort()); - throw e; - } - return; - } - log.log(Level.INFO, "Using channel set by activator: " + channelOpenedByActivator); - - channelOpenedByActivator.socket().setReuseAddress(getReuseAddress()); - int localPort = channelOpenedByActivator.socket().getLocalPort(); - try { - uglySetLocalPort(localPort); - } catch (NoSuchFieldException | IllegalAccessException e) { - throw new RuntimeException("Could not set local port.", e); - } - if (localPort <= 0) { - throw new IOException("Server channel not bound"); - } - addBean(channelOpenedByActivator); - channelOpenedByActivator.configureBlocking(true); - addBean(channelOpenedByActivator); - - try { - uglySetChannel(channelOpenedByActivator); - } catch (NoSuchFieldException | IllegalAccessException e) { - throw new RuntimeException("Could not set server channel.", e); - } - } - - private void uglySetLocalPort(int localPort) throws NoSuchFieldException, IllegalAccessException { - Field localPortField = ServerConnector.class.getDeclaredField("_localPort"); - localPortField.setAccessible(true); - localPortField.set(this, localPort); - } - - private void uglySetChannel(ServerSocketChannel channelOpenedByActivator) throws NoSuchFieldException, - IllegalAccessException { - Field acceptChannelField = ServerConnector.class.getDeclaredField("_acceptChannel"); - acceptChannelField.setAccessible(true); - acceptChannelField.set(this, channelOpenedByActivator); - } - - public ServerConnectionStatistics getStatistics() { return statistics; } - - public Metric.Context getMetricContext() { return metricCtx; } - - public static JDiscServerConnector fromRequest(ServletRequest request) { - return (JDiscServerConnector)request.getAttribute(REQUEST_ATTRIBUTE); - } - } - } diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscHttpServlet.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscHttpServlet.java index 543cf8ab43e..27f72c7b4bf 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscHttpServlet.java +++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscHttpServlet.java @@ -20,7 +20,6 @@ import java.util.logging.Level; import java.util.logging.Logger; import static com.yahoo.jdisc.http.core.HttpServletRequestUtils.getConnection; -import static com.yahoo.jdisc.http.server.jetty.ConnectorFactory.JDiscServerConnector; /** * @author Simon Thoresen Hult diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java new file mode 100644 index 00000000000..8dd50074c32 --- /dev/null +++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java @@ -0,0 +1,122 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jdisc.http.server.jetty; + +import com.yahoo.jdisc.Metric; +import com.yahoo.jdisc.http.ConnectorConfig; +import org.eclipse.jetty.server.ConnectionFactory; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnectionStatistics; +import org.eclipse.jetty.server.ServerConnector; + +import javax.servlet.ServletRequest; +import java.io.IOException; +import java.lang.reflect.Field; +import java.net.Socket; +import java.net.SocketException; +import java.nio.channels.ServerSocketChannel; +import java.util.Map; +import java.util.TreeMap; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * @author bjorncs + */ +class JDiscServerConnector extends ServerConnector { + public static final String REQUEST_ATTRIBUTE = JDiscServerConnector.class.getName(); + private final static Logger log = Logger.getLogger(JDiscServerConnector.class.getName()); + private final Metric.Context metricCtx; + private final ServerConnectionStatistics statistics; + private final boolean tcpKeepAlive; + private final boolean tcpNoDelay; + private final ServerSocketChannel channelOpenedByActivator; + + JDiscServerConnector(ConnectorConfig config, Metric metric, Server server, + ServerSocketChannel channelOpenedByActivator, ConnectionFactory... factories) { + super(server, factories); + this.channelOpenedByActivator = channelOpenedByActivator; + this.tcpKeepAlive = config.tcpKeepAliveEnabled(); + this.tcpNoDelay = config.tcpNoDelay(); + this.metricCtx = createMetricContext(config, metric); + + this.statistics = new ServerConnectionStatistics(); + addBean(statistics); + } + + private Metric.Context createMetricContext(ConnectorConfig config, Metric metric) { + Map<String, Object> props = new TreeMap<>(); + props.put(JettyHttpServer.Metrics.NAME_DIMENSION, config.name()); + props.put(JettyHttpServer.Metrics.PORT_DIMENSION, config.listenPort()); + return metric.createContext(props); + } + + @Override + protected void configure(final Socket socket) { + super.configure(socket); + try { + socket.setKeepAlive(tcpKeepAlive); + socket.setTcpNoDelay(tcpNoDelay); + } catch (SocketException ignored) { + } + } + + @Override + public void open() throws IOException { + if (channelOpenedByActivator == null) { + log.log(Level.INFO, "No channel set by activator, opening channel ourselves."); + try { + super.open(); + } catch (RuntimeException e) { + log.log(Level.SEVERE, "failed org.eclipse.jetty.server.Server open() with port " + getPort()); + throw e; + } + return; + } + log.log(Level.INFO, "Using channel set by activator: " + channelOpenedByActivator); + + channelOpenedByActivator.socket().setReuseAddress(getReuseAddress()); + int localPort = channelOpenedByActivator.socket().getLocalPort(); + try { + uglySetLocalPort(localPort); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException("Could not set local port.", e); + } + if (localPort <= 0) { + throw new IOException("Server channel not bound"); + } + addBean(channelOpenedByActivator); + channelOpenedByActivator.configureBlocking(true); + addBean(channelOpenedByActivator); + + try { + uglySetChannel(channelOpenedByActivator); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException("Could not set server channel.", e); + } + } + + private void uglySetLocalPort(int localPort) throws NoSuchFieldException, IllegalAccessException { + Field localPortField = ServerConnector.class.getDeclaredField("_localPort"); + localPortField.setAccessible(true); + localPortField.set(this, localPort); + } + + private void uglySetChannel(ServerSocketChannel channelOpenedByActivator) throws NoSuchFieldException, + IllegalAccessException { + Field acceptChannelField = ServerConnector.class.getDeclaredField("_acceptChannel"); + acceptChannelField.setAccessible(true); + acceptChannelField.set(this, channelOpenedByActivator); + } + + public ServerConnectionStatistics getStatistics() { + return statistics; + } + + public Metric.Context getMetricContext() { + return metricCtx; + } + + public static JDiscServerConnector fromRequest(ServletRequest request) { + return (JDiscServerConnector) request.getAttribute(REQUEST_ATTRIBUTE); + } +} diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java index 7feca14ef29..7bff685e780 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java +++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java @@ -44,15 +44,11 @@ import javax.servlet.DispatcherType; import java.lang.management.ManagementFactory; import java.net.BindException; import java.net.MalformedURLException; -import java.nio.channels.FileChannel; import java.nio.channels.ServerSocketChannel; -import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.EnumSet; import java.util.List; -import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -62,9 +58,6 @@ import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; -import static com.yahoo.jdisc.http.server.jetty.ConnectorFactory.JDiscServerConnector; -import static com.yahoo.jdisc.http.server.jetty.Exceptions.throwUnchecked; - /** * @author Simon Thoresen Hult * @author bjorncs @@ -147,11 +140,9 @@ public class JettyHttpServer extends AbstractServerProvider { setupJmx(server, serverConfig); ((QueuedThreadPool)server.getThreadPool()).setMaxThreads(serverConfig.maxWorkerThreads()); - Map<Path, FileChannel> keyStoreChannels = getKeyStoreFileChannels(osgiFramework.bundleContext()); - for (ConnectorFactory connectorFactory : connectorFactories.allComponents()) { ServerSocketChannel preBoundChannel = getChannelFromServiceLayer(connectorFactory.getConnectorConfig().listenPort(), osgiFramework.bundleContext()); - server.addConnector(connectorFactory.createConnector(metric, server, preBoundChannel, keyStoreChannels)); + server.addConnector(connectorFactory.createConnector(metric, server, preBoundChannel)); listenedPorts.add(connectorFactory.getConnectorConfig().listenPort()); } @@ -257,43 +248,6 @@ public class JettyHttpServer extends AbstractServerProvider { return "/" + servletPathsConfig.servlets(id.stringValue()).path(); } - // Ugly trick to get generic type literal. - @SuppressWarnings("unchecked") - private static final Class<Map<?, ?>> mapClass = (Class<Map<?, ?>>) (Object) Map.class; - - private Map<Path, FileChannel> getKeyStoreFileChannels(BundleContext bundleContext) { - try { - Collection<ServiceReference<Map<?, ?>>> serviceReferences = bundleContext.getServiceReferences(mapClass, - "(role=com.yahoo.container.standalone.StandaloneContainerActivator.KeyStoreFileChannels)"); - - if (serviceReferences == null || serviceReferences.isEmpty()) - return Collections.emptyMap(); - - if (serviceReferences.size() != 1) - throw new IllegalStateException("Multiple KeyStoreFileChannels registered"); - - return getKeyStoreFileChannels(bundleContext, serviceReferences.iterator().next()); - } catch (InvalidSyntaxException e) { - throw throwUnchecked(e); - } - } - - @SuppressWarnings("unchecked") - private Map<Path, FileChannel> getKeyStoreFileChannels(BundleContext bundleContext, ServiceReference<Map<?, ?>> keyStoreFileChannelReference) { - Map<?, ?> fileChannelMap = bundleContext.getService(keyStoreFileChannelReference); - try { - if (fileChannelMap == null) - return Collections.emptyMap(); - - Map<Path, FileChannel> result = (Map<Path, FileChannel>) fileChannelMap; - log.fine("Using file channel for " + result.keySet()); - return result; - } finally { - //if we change this to be anything other than a simple map, we should hold the reference as long as the object is in use. - bundleContext.ungetService(keyStoreFileChannelReference); - } - } - private ServletContextHandler createServletContextHandler() { ServletContextHandler servletContextHandler = new ServletContextHandler(ServletContextHandler.NO_SECURITY | ServletContextHandler.NO_SESSIONS); servletContextHandler.setContextPath("/"); diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/ssl/ReaderForPath.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/ssl/ReaderForPath.java deleted file mode 100644 index b04d91d7403..00000000000 --- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/ssl/ReaderForPath.java +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.jdisc.http.ssl; - -import java.io.Reader; -import java.nio.file.Path; - -/** - * A reader along with the path used to construct it. - * - * @author tonytv - */ -public final class ReaderForPath { - - public final Reader reader; - public final Path path; - - public ReaderForPath(Reader reader, Path path) { - this.reader = reader; - this.path = path; - } - -} diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/ssl/SslKeyStore.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/ssl/SslKeyStore.java index 1201bb08afc..c282c94c1bd 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/ssl/SslKeyStore.java +++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/ssl/SslKeyStore.java @@ -1,29 +1,12 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jdisc.http.ssl; -import java.io.IOException; import java.security.KeyStore; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.cert.CertificateException; -import java.util.Optional; /** * - * @author <a href="mailto:charlesk@yahoo-inc.com">Charles Kim</a> + * @author bjorncs */ -public abstract class SslKeyStore { - - private Optional<String> keyStorePassword = Optional.empty(); - - public Optional<String> getKeyStorePassword() { - return keyStorePassword; - } - - public void setKeyStorePassword(String keyStorePassword) { - this.keyStorePassword = Optional.of(keyStorePassword); - } - - public abstract KeyStore loadJavaKeyStore() throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException; - +public interface SslKeyStore { + KeyStore loadJavaKeyStore() throws Exception; } diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/ssl/jks/JKSKeyStore.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/ssl/jks/JksKeyStore.java index 2ca53b731c3..9cb040fb97d 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/ssl/jks/JKSKeyStore.java +++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/ssl/jks/JksKeyStore.java @@ -13,22 +13,33 @@ import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; /** - * @author tonytv + * @author Tony Vaagenes + * @author bjorncs */ -public class JKSKeyStore extends SslKeyStore { +public class JksKeyStore implements SslKeyStore { - private static final String keyStoreType = "JKS"; + private static final String KEY_STORE_TYPE = "JKS"; private final Path keyStoreFile; + private final String keyStorePassword; - public JKSKeyStore(Path keyStoreFile) { + public JksKeyStore(Path keyStoreFile) { + this(keyStoreFile, null); + } + + public JksKeyStore(Path keyStoreFile, String keyStorePassword) { this.keyStoreFile = keyStoreFile; + this.keyStorePassword = keyStorePassword; + } + + public String getKeyStorePassword() { + return keyStorePassword; } @Override public KeyStore loadJavaKeyStore() throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException { try(InputStream stream = Files.newInputStream(keyStoreFile)) { - KeyStore keystore = KeyStore.getInstance(keyStoreType); - keystore.load(stream, getKeyStorePassword().map(String::toCharArray).orElse(null)); + KeyStore keystore = KeyStore.getInstance(KEY_STORE_TYPE); + keystore.load(stream, keyStorePassword != null ? keyStorePassword.toCharArray() : null); return keystore; } } diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/ssl/pem/PemKeyStore.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/ssl/pem/PemKeyStore.java index 21272f202ea..b52e923662f 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/ssl/pem/PemKeyStore.java +++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/ssl/pem/PemKeyStore.java @@ -2,7 +2,6 @@ package com.yahoo.jdisc.http.ssl.pem; import com.google.common.base.Preconditions; -import com.yahoo.jdisc.http.ssl.ReaderForPath; import org.bouncycastle.asn1.pkcs.PrivateKeyInfo; import org.bouncycastle.cert.X509CertificateHolder; import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter; @@ -16,9 +15,13 @@ import javax.annotation.concurrent.GuardedBy; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.Reader; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.security.Key; +import java.security.KeyStore; import java.security.KeyStore.LoadStoreParameter; -import java.security.KeyStore.ProtectionParameter; import java.security.KeyStoreException; import java.security.KeyStoreSpi; import java.security.NoSuchAlgorithmException; @@ -58,10 +61,6 @@ public class PemKeyStore extends KeyStoreSpi { @GuardedBy("this") private final Map<String, Certificate> aliasToCertificate = new LinkedHashMap<>(); - - public PemKeyStore() {} - - /** * The user is responsible for closing any readers given in the parameter. */ @@ -287,30 +286,51 @@ public class PemKeyStore extends KeyStoreSpi { } } - public static class PemLoadStoreParameter implements LoadStoreParameter { - private PemLoadStoreParameter() {} + // A reader along with the path used to construct it. + private static class ReaderForPath { + final Reader reader; + final Path path; - @Override - public ProtectionParameter getProtectionParameter() { - return null; + private ReaderForPath(Reader reader, Path path) { + this.reader = reader; + this.path = path; + } + + static ReaderForPath of(Path path) { + try { + return new ReaderForPath(Files.newBufferedReader(path), path); + } catch (IOException e) { + throw new UncheckedIOException(e); + } } } - public static final class KeyStoreLoadParameter extends PemLoadStoreParameter { - public final ReaderForPath certificateReader; - public final ReaderForPath keyReader; + static class TrustStoreLoadParameter implements KeyStore.LoadStoreParameter { + final ReaderForPath certificateReader; - public KeyStoreLoadParameter(ReaderForPath certificateReader, ReaderForPath keyReader) { - this.certificateReader = certificateReader; - this.keyReader = keyReader; + TrustStoreLoadParameter(Path certificateReader) { + this.certificateReader = ReaderForPath.of(certificateReader); + } + + @Override + public KeyStore.ProtectionParameter getProtectionParameter() { + return null; } } - public static final class TrustStoreLoadParameter extends PemLoadStoreParameter { - public final ReaderForPath certificateReader; + static class KeyStoreLoadParameter implements KeyStore.LoadStoreParameter { + final ReaderForPath certificateReader; + final ReaderForPath keyReader; + + KeyStoreLoadParameter(Path certificateReader, Path keyReader) { + this.certificateReader = ReaderForPath.of(certificateReader); + this.keyReader = ReaderForPath.of(keyReader); + } - public TrustStoreLoadParameter(ReaderForPath certificateReader) { - this.certificateReader = certificateReader; + @Override + public KeyStore.ProtectionParameter getProtectionParameter() { + return null; } } + } diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/ssl/pem/PemSslKeyStore.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/ssl/pem/PemSslKeyStore.java index bbb8232f78e..9f0a635f7c1 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/ssl/pem/PemSslKeyStore.java +++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/ssl/pem/PemSslKeyStore.java @@ -3,11 +3,12 @@ package com.yahoo.jdisc.http.ssl.pem; import com.yahoo.jdisc.http.ssl.SslKeyStore; import com.yahoo.jdisc.http.ssl.pem.PemKeyStore.KeyStoreLoadParameter; -import com.yahoo.jdisc.http.ssl.pem.PemKeyStore.PemLoadStoreParameter; import com.yahoo.jdisc.http.ssl.pem.PemKeyStore.TrustStoreLoadParameter; import java.io.IOException; +import java.nio.file.Path; import java.security.KeyStore; +import java.security.KeyStore.LoadStoreParameter; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.security.Provider; @@ -18,34 +19,32 @@ import java.security.cert.CertificateException; * Responsible for creating pem key stores. * * @author Tony Vaagenes + * @author bjorncs */ -public class PemSslKeyStore extends SslKeyStore { +public class PemSslKeyStore implements SslKeyStore { static { Security.addProvider(new PemKeyStoreProvider()); } - private static final String keyStoreType = "PEM"; - private final PemLoadStoreParameter loadParameter; + private static final String KEY_STORE_TYPE = "PEM"; + + private final LoadStoreParameter loadParameter; private KeyStore keyStore; - public PemSslKeyStore(KeyStoreLoadParameter loadParameter) { - this.loadParameter = loadParameter; + public PemSslKeyStore(Path certificatePath, Path keyPath) { + this.loadParameter = new KeyStoreLoadParameter(certificatePath, keyPath); } - public PemSslKeyStore(TrustStoreLoadParameter loadParameter) { - this.loadParameter = loadParameter; + public PemSslKeyStore(Path certificatePath) { + this.loadParameter = new TrustStoreLoadParameter(certificatePath); } @Override public KeyStore loadJavaKeyStore() throws KeyStoreException, IOException, NoSuchAlgorithmException, CertificateException { - if (getKeyStorePassword().isPresent()) { - throw new UnsupportedOperationException("PEM key store with password is currently not supported. Please file a feature request."); - } - //cached since Reader(in loadParameter) can only be used one time. if (keyStore == null) { - keyStore = KeyStore.getInstance(keyStoreType); + keyStore = KeyStore.getInstance(KEY_STORE_TYPE); keyStore.load(loadParameter); } return keyStore; @@ -61,6 +60,6 @@ public class PemSslKeyStore extends SslKeyStore { super(NAME, VERSION, DESCRIPTION); putService(new Service(this, "KeyStore", "PEM", PemKeyStore. class.getName(), PemKeyStore.aliases, PemKeyStore.attributes)); } - } + } diff --git a/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.connector.def b/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.connector.def index 8d709cb8ab1..59893753bea 100644 --- a/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.connector.def +++ b/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.connector.def @@ -45,8 +45,9 @@ ssl.enabled bool default=false # The name of the key to the password to the key store if in the secret store, if JKS is used. # Must be empty with PEM -# By default this is also used to look up the password to the trust store. +# By default this is also used to look up the password to the trust store. ssl.keyDbKey string default="" +# TODO Rename keyDbKey to keyStorePassword after introducing custom services.xml syntax # Names of protocols to exclude. ssl.excludeProtocol[].name string @@ -74,9 +75,12 @@ ssl.trustStoreType enum { JKS } default=JKS # JKS only - the path to the truststore. ssl.trustStorePath string default="" +# TODO Add separate config for truststore password + # Whether we should use keyDbKey as password to the trust store (true, default), # or use no password with the trust store (false) ssl.useTrustStorePassword bool default=true +# TODO Fix broken semantics with truststore and keystore password in Vespa 7 / Vespa 8 # The algorithm name used by the KeyManagerFactory. ssl.sslKeyManagerFactoryAlgorithm string default="SunX509" diff --git a/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/SslContextFactory.java b/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/SslContextFactory.java index e71bd190a37..5dd5dca1667 100644 --- a/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/SslContextFactory.java +++ b/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/SslContextFactory.java @@ -1,16 +1,11 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jdisc.http; -import com.yahoo.jdisc.http.ssl.SslKeyStore; +import com.yahoo.jdisc.http.ssl.jks.JksKeyStore; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManagerFactory; -import java.io.IOException; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.UnrecoverableKeyException; -import java.security.cert.CertificateException; import java.util.logging.Level; import java.util.logging.Logger; @@ -32,16 +27,16 @@ public class SslContextFactory { return this.sslContext; } - public static SslContextFactory newInstanceFromTrustStore(SslKeyStore trustStore) { + public static SslContextFactory newInstanceFromTrustStore(JksKeyStore trustStore) { return newInstance(DEFAULT_ALGORITHM, DEFAULT_PROTOCOL, null, trustStore); } - public static SslContextFactory newInstance(SslKeyStore trustStore, SslKeyStore keyStore) { + public static SslContextFactory newInstance(JksKeyStore trustStore, JksKeyStore keyStore) { return newInstance(DEFAULT_ALGORITHM, DEFAULT_PROTOCOL, keyStore, trustStore); } public static SslContextFactory newInstance(String sslAlgorithm, String sslProtocol, - SslKeyStore keyStore, SslKeyStore trustStore) { + JksKeyStore keyStore, JksKeyStore trustStore) { log.fine("Configuring SSLContext..."); log.fine("Using " + sslAlgorithm + " algorithm."); try { @@ -60,15 +55,14 @@ public class SslContextFactory { /** * Used for the key store, which contains the SSL cert and private key. */ - public static javax.net.ssl.KeyManager[] getKeyManagers(SslKeyStore keyStore, - String sslAlgorithm) - throws NoSuchAlgorithmException, CertificateException, IOException, UnrecoverableKeyException, - KeyStoreException { + public static javax.net.ssl.KeyManager[] getKeyManagers(JksKeyStore keyStore, + String sslAlgorithm) throws Exception { KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(sslAlgorithm); + String keyStorePassword = keyStore.getKeyStorePassword(); keyManagerFactory.init( keyStore.loadJavaKeyStore(), - keyStore.getKeyStorePassword().map(String::toCharArray).orElse(null)); + keyStorePassword != null ? keyStorePassword.toCharArray() : null); log.fine("KeyManagerFactory initialized with keystore"); return keyManagerFactory.getKeyManagers(); } @@ -77,9 +71,9 @@ public class SslContextFactory { * Used for the trust store, which contains certificates from other parties that you expect to communicate with, * or from Certificate Authorities that you trust to identify other parties. */ - public static javax.net.ssl.TrustManager[] getTrustManagers(SslKeyStore trustStore, + public static javax.net.ssl.TrustManager[] getTrustManagers(JksKeyStore trustStore, String sslAlgorithm) - throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException { + throws Exception { TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(sslAlgorithm); trustManagerFactory.init(trustStore.loadJavaKeyStore()); diff --git a/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactoryTest.java b/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactoryTest.java index 25457c0c6c6..fceec51231a 100644 --- a/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactoryTest.java +++ b/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactoryTest.java @@ -1,33 +1,20 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jdisc.http.server.jetty; -import com.google.common.collect.ImmutableMap; import com.yahoo.jdisc.Metric; import com.yahoo.jdisc.http.ConnectorConfig; import com.yahoo.jdisc.http.SecretStore; -import com.yahoo.jdisc.http.ssl.ReaderForPath; -import com.yahoo.jdisc.http.SslContextFactory; -import com.yahoo.jdisc.http.ssl.SslKeyStore; -import com.yahoo.jdisc.http.ssl.pem.PemKeyStore; -import com.yahoo.jdisc.http.ssl.pem.PemSslKeyStore; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.handler.AbstractHandler; import org.testng.annotations.Test; -import javax.net.ssl.SSLContext; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.channels.FileChannel; import java.nio.channels.ServerSocketChannel; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardOpenOption; -import java.util.Collections; import java.util.Map; import static com.yahoo.jdisc.http.ConnectorConfig.Ssl; @@ -72,8 +59,8 @@ public class ConnectorFactoryTest { try { ConnectorFactory factory = new ConnectorFactory(new ConnectorConfig(new ConnectorConfig.Builder()), new ThrowingSecretStore()); - ConnectorFactory.JDiscServerConnector connector = - (ConnectorFactory.JDiscServerConnector)factory.createConnector(new DummyMetric(), server, null, Collections.emptyMap()); + JDiscServerConnector connector = + (JDiscServerConnector)factory.createConnector(new DummyMetric(), server, null); server.addConnector(connector); server.setHandler(new HelloWorldHandler()); server.start(); @@ -99,7 +86,7 @@ public class ConnectorFactoryTest { serverChannel.socket().bind(new InetSocketAddress(0)); ConnectorFactory factory = new ConnectorFactory(new ConnectorConfig(new ConnectorConfig.Builder()), new ThrowingSecretStore()); - ConnectorFactory.JDiscServerConnector connector = (ConnectorFactory.JDiscServerConnector) factory.createConnector(new DummyMetric(), server, serverChannel, Collections.emptyMap()); + JDiscServerConnector connector = (JDiscServerConnector) factory.createConnector(new DummyMetric(), server, serverChannel); server.addConnector(connector); server.setHandler(new HelloWorldHandler()); server.start(); @@ -117,63 +104,6 @@ public class ConnectorFactoryTest { } } - @Test - public void pre_bound_keystore_file_channels_are_used() throws Exception { - Path pemKeyStoreDirectory = Paths.get("src/test/resources/pem/"); - - Path certificateFile = pemKeyStoreDirectory.resolve("test.crt"); - Path privateKeyFile = pemKeyStoreDirectory.resolve("test.key"); - - Server server = new Server(); - try { - ServerSocketChannel serverChannel = ServerSocketChannel.open(); - serverChannel.socket().bind(new InetSocketAddress(0)); - - String fakeCertificatePath = "ensure-certificate-path-is-not-used-to-open-the-file"; - String fakeKeyPath = "ensure-key-path-is-not-used-to-open-the-file"; - - ConnectorConfig.Builder builder = new ConnectorConfig.Builder(); - builder.ssl( - new Ssl.Builder(). - enabled(true). - keyStoreType(PEM). - pemKeyStore(new Ssl.PemKeyStore.Builder(). - certificatePath(fakeCertificatePath). - keyPath(fakeKeyPath))); - - FileChannel certificateChannel = FileChannel.open(certificateFile, StandardOpenOption.READ); - FileChannel privateKeyChannel = FileChannel.open(privateKeyFile, StandardOpenOption.READ); - - Map<Path, FileChannel> keyStoreChannels = ImmutableMap.<Path, FileChannel>builder(). - put(Paths.get(fakeCertificatePath), certificateChannel). - put(Paths.get(fakeKeyPath), privateKeyChannel). - build(); - - - ConnectorFactory factory = new ConnectorFactory(new ConnectorConfig(builder), new ThrowingSecretStore()); - ConnectorFactory.JDiscServerConnector connector = (ConnectorFactory.JDiscServerConnector) factory.createConnector(new DummyMetric(), server, serverChannel, keyStoreChannels); - server.addConnector(connector); - server.setHandler(new HelloWorldHandler()); - server.start(); - - SslKeyStore trustStore = new PemSslKeyStore( - new PemKeyStore.TrustStoreLoadParameter( - new ReaderForPath(Files.newBufferedReader(certificateFile), certificateFile))); - - SSLContext clientSslContext = SslContextFactory.newInstanceFromTrustStore(trustStore).getServerSSLContext(); - SimpleHttpClient client = new SimpleHttpClient(clientSslContext, connector.getLocalPort(), false); - SimpleHttpClient.RequestExecutor ex = client.newGet("/ignored"); - SimpleHttpClient.ResponseValidator val = ex.execute(); - val.expectContent(equalTo("Hello world")); - } finally { - try { - server.stop(); - } catch (Exception e) { - //ignore - } - } - } - private static class HelloWorldHandler extends AbstractHandler { @Override public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { diff --git a/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerConformanceTest.java b/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerConformanceTest.java index d588ace8268..cc7095dadda 100644 --- a/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerConformanceTest.java +++ b/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/HttpServerConformanceTest.java @@ -24,6 +24,7 @@ import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeMatcher; import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import java.net.URI; @@ -33,6 +34,8 @@ import java.util.Collections; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.logging.Level; +import java.util.logging.Logger; import java.util.regex.Pattern; import static com.yahoo.jdisc.Response.Status.INTERNAL_SERVER_ERROR; @@ -49,13 +52,34 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; /** - * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a> + * @author Simon Thoresen Hult */ public class HttpServerConformanceTest extends ServerProviderConformanceTest { + private static final Logger log = Logger.getLogger(HttpServerConformanceTest.class.getName()); + private static final String REQUEST_CONTENT = "myRequestContent"; private static final String RESPONSE_CONTENT = "myResponseContent"; + @SuppressWarnings("LoggerInitializedWithForeignClass") + private static Logger httpRequestDispatchLogger = Logger.getLogger(HttpRequestDispatch.class.getName()); + private static Level httpRequestDispatchLoggerOriginalLevel; + + /* + * Reduce logging of every stack trace for {@link ServerProviderConformanceTest.ConformanceException} thrown. + * This makes the log more readable and the test faster as well. + */ + @BeforeClass + public static void reduceExcessiveLogging() { + httpRequestDispatchLoggerOriginalLevel = httpRequestDispatchLogger.getLevel(); + httpRequestDispatchLogger.setLevel(Level.SEVERE); + } + + @AfterClass + public static void restoreExcessiveLogging() { + httpRequestDispatchLogger.setLevel(httpRequestDispatchLoggerOriginalLevel); + } + @AfterClass public static void reportDiagnostics() { System.out.println( @@ -784,7 +808,7 @@ public class HttpServerConformanceTest extends ServerProviderConformanceTest { post.setProtocolVersion(client.requestVersion); request = post; } - System.out.println("executorService:" + log.fine(() -> "executorService:" + " .isShutDown()=" + executorService.isShutdown() + " .isTerminated()=" + executorService.isTerminated()); return executorService.submit(() -> client.delegate.execute(request)); diff --git a/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/TestDriver.java b/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/TestDriver.java index 8ddcd7f03ac..525cde9d8b3 100644 --- a/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/TestDriver.java +++ b/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/TestDriver.java @@ -6,9 +6,8 @@ import com.google.inject.Module; import com.yahoo.jdisc.application.ContainerBuilder; import com.yahoo.jdisc.handler.RequestHandler; import com.yahoo.jdisc.http.ConnectorConfig; -import com.yahoo.jdisc.http.ssl.jks.JKSKeyStore; import com.yahoo.jdisc.http.SslContextFactory; -import com.yahoo.jdisc.http.ssl.SslKeyStore; +import com.yahoo.jdisc.http.ssl.jks.JksKeyStore; import javax.net.ssl.SSLContext; import java.io.IOException; @@ -76,8 +75,9 @@ public class TestDriver { ConnectorConfig.Ssl sslConfig = builder.getInstance(ConnectorConfig.class).ssl(); if (!sslConfig.enabled()) return null; - SslKeyStore keyStore = new JKSKeyStore(Paths.get(sslConfig.keyStorePath())); - keyStore.setKeyStorePassword(builder.getInstance(Key.get(String.class, named("keyStorePassword")))); + JksKeyStore keyStore = new JksKeyStore( + Paths.get(sslConfig.keyStorePath()), + builder.getInstance(Key.get(String.class, named("keyStorePassword")))); return SslContextFactory.newInstanceFromTrustStore(keyStore).getServerSSLContext(); } @@ -39,7 +39,7 @@ </snapshots> <id>bintray-yahoo-maven</id> <name>bintray</name> - <url>http://yahoo.bintray.com/maven</url> + <url>https://yahoo.bintray.com/maven</url> </repository> </repositories> diff --git a/searchcore/src/tests/proton/flushengine/prepare_restart_flush_strategy/prepare_restart_flush_strategy_test.cpp b/searchcore/src/tests/proton/flushengine/prepare_restart_flush_strategy/prepare_restart_flush_strategy_test.cpp index 428cbf60996..6bc7e1b5556 100644 --- a/searchcore/src/tests/proton/flushengine/prepare_restart_flush_strategy/prepare_restart_flush_strategy_test.cpp +++ b/searchcore/src/tests/proton/flushengine/prepare_restart_flush_strategy/prepare_restart_flush_strategy_test.cpp @@ -15,7 +15,7 @@ using SimpleFlushHandler = test::DummyFlushHandler; using FlushCandidatesList = std::vector<FlushTargetCandidates>; using Config = PrepareRestartFlushStrategy::Config; -const Config DEFAULT_CFG(2.0, 4.0); +const Config DEFAULT_CFG(2.0, 0.0, 4.0); struct SimpleFlushTarget : public test::DummyFlushTarget { @@ -107,7 +107,7 @@ public: : _sortedFlushContexts(&sortedFlushContexts), _numCandidates(sortedFlushContexts.size()), _tlsStats(1000, 11, 110), - _cfg(DEFAULT_CFG) + _cfg(2.0, 3.0, 4.0) {} CandidatesBuilder &flushContexts(const FlushContext::List &sortedFlushContexts) { _sortedFlushContexts = &sortedFlushContexts; @@ -140,28 +140,35 @@ struct CandidatesFixture CandidatesFixture() : emptyContexts(), builder(emptyContexts) {} }; +void +assertCosts(double tlsReplayBytesCost, double tlsReplayOperationsCost, double flushTargetsWriteCost, const FlushTargetCandidates &candidates) +{ + EXPECT_EQUAL(tlsReplayBytesCost, candidates.getTlsReplayCost().bytesCost); + EXPECT_EQUAL(tlsReplayOperationsCost, candidates.getTlsReplayCost().operationsCost); + EXPECT_EQUAL(flushTargetsWriteCost, candidates.getFlushTargetsWriteCost()); + EXPECT_EQUAL(tlsReplayBytesCost + tlsReplayOperationsCost + flushTargetsWriteCost, candidates.getTotalCost()); +} + TEST_F("require that tls replay cost is correct for 100% replay", CandidatesFixture) { - EXPECT_EQUAL(2000, f.builder.replayEnd(110).build().getTlsReplayCost()); + TEST_DO(assertCosts(1000 * 2, 100 * 3, 0, f.builder.replayEnd(110).build())); } TEST_F("require that tls replay cost is correct for 75% replay", CandidatesFixture) { FlushContext::List contexts = ContextsBuilder().add("target1", 10, 0).add("target2", 35, 0).build(); - EXPECT_EQUAL(1500, f.builder.flushContexts(contexts).numCandidates(1).replayEnd(110). - build().getTlsReplayCost()); + TEST_DO(assertCosts(750 * 2, 75 * 3, 0, f.builder.flushContexts(contexts).numCandidates(1).replayEnd(110).build())); } TEST_F("require that tls replay cost is correct for 25% replay", CandidatesFixture) { FlushContext::List contexts = ContextsBuilder().add("target1", 10, 0).add("target2", 85, 0).build(); - EXPECT_EQUAL(500, f.builder.flushContexts(contexts).numCandidates(1).replayEnd(110). - build().getTlsReplayCost()); + TEST_DO(assertCosts(250 * 2, 25 * 3, 0, f.builder.flushContexts(contexts).numCandidates(1).replayEnd(110).build())); } TEST_F("require that tls replay cost is correct for zero operations to replay", CandidatesFixture) { - EXPECT_EQUAL(0, f.builder.replayEnd(10).build().getTlsReplayCost()); + TEST_DO(assertCosts(0, 0, 0, f.builder.replayEnd(10).build())); } TEST_F("require that flush cost is correct for zero flush targets", CandidatesFixture) @@ -172,7 +179,7 @@ TEST_F("require that flush cost is correct for zero flush targets", CandidatesFi TEST_F("require that flush cost is sum of flush targets", CandidatesFixture) { FlushContext::List contexts = ContextsBuilder().add("target1", 20, 1000).add("target2", 30, 2000).build(); - EXPECT_EQUAL(12000, f.builder.flushContexts(contexts).build().getFlushTargetsWriteCost()); + TEST_DO(assertCosts(0, 0, 1000 * 4 + 2000 * 4, f.builder.flushContexts(contexts).build())); } @@ -227,7 +234,7 @@ assertFlushContexts(const vespalib::string &expected, const FlushContext::List & * - handler1: serial numbers 10 -> 110, 1000 bytes * - handler2: serial numbers 10 -> 110, 2000 bytes * - * The cost config is: tlsReplayCost=2.0, flushTargetsWriteCost=4.0. + * The cost config is: tlsReplayByteCost=2.0, tlsReplayOperationCost=0.0, flushTargetsWriteCost=4.0. * The cost of replaying the complete TLS is then: * - handler1: 1000*2.0 = 2000 * - handler2: 2000*2.0 = 4000 diff --git a/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp b/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp index 6c682ea33e9..b9059338f27 100644 --- a/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp +++ b/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp @@ -170,12 +170,12 @@ struct ProtonConfigOwner : public proton::IProtonConfigurer return getConfigured(); } virtual void reconfigure(std::shared_ptr<ProtonConfigSnapshot> cfg) override { - std::unique_lock<std::mutex> guard(_mutex); + std::lock_guard<std::mutex> guard(_mutex); _config.set(cfg); _configured = true; } bool getConfigured() const { - std::unique_lock<std::mutex> guard(_mutex); + std::lock_guard<std::mutex> guard(_mutex); return _configured; } BootstrapConfig::SP getBootstrapConfig() { diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def index eab889ea28c..c6c810ae72f 100644 --- a/searchcore/src/vespa/searchcore/config/proton.def +++ b/searchcore/src/vespa/searchcore/config/proton.def @@ -82,17 +82,31 @@ flush.memory.conservative.disklimitfactor double default=0.5 ## watermark indicating when to go back from conservative to normal mode for the flush strategy. flush.memory.conservative.lowwatermarkfactor double default=0.9 -## The cost of doing replay when replaying the transaction log. +## The cost of replaying a byte when replaying the transaction log. ## -## The number of bytes to replay * replaycost gives an estimate of the -## total cost of replaying the transaction log. +## The estimate of the total cost of replaying the transaction log: +## (number of bytes to replay) * replaycost + (number of operations to replay) * replayoperationcost ## ## The prepare for restart flush strategy will choose a set of components to flush ## such that the cost of flushing these + the cost of replaying the transaction log ## is as low as possible. -flush.preparerestart.replaycost double default=4.0 +flush.preparerestart.replaycost double default=2.0 -## The cost of doing writes when flushing components to disk. +## The cost of replaying an operation when replaying the transaction log. +## +## The estimate of the total cost of replaying the transaction log: +## (number of bytes to replay) * replaycost + (number of operations to replay) * replayoperationcost +## +## The default value is chosen based on the following example: +## Assume we can replay 9 MB/s and this corresponds to 24000 ops/s. +## replayoperationcost = (bytes to replay) * replaycost / (operations to replay) = 9 MB * 2.0 / 24000 = 750 +## +## The prepare for restart flush strategy will choose a set of components to flush +## such that the cost of flushing these + the cost of replaying the transaction log +## is as low as possible. +flush.preparerestart.replayoperationcost double default=750.0 + +## The cost of writing a byte when flushing components to disk. ## ## The number of bytes to write (for a set of flushed components) * writecost ## gives an estimate of the total cost of flushing this set of components. diff --git a/searchcore/src/vespa/searchcore/fdispatch/common/search.cpp b/searchcore/src/vespa/searchcore/fdispatch/common/search.cpp index 7685ddcc328..7b060e793f6 100644 --- a/searchcore/src/vespa/searchcore/fdispatch/common/search.cpp +++ b/searchcore/src/vespa/searchcore/fdispatch/common/search.cpp @@ -165,7 +165,7 @@ void FastS_SyncSearchAdapter::DoneQuery(FastS_ISearch *, FastS_SearchContext) { - std::unique_lock<std::mutex> guard(_lock); + std::lock_guard<std::mutex> guard(_lock); _queryDone = true; if (_waitQuery) { _cond.notify_one(); @@ -177,7 +177,7 @@ void FastS_SyncSearchAdapter::DoneDocsums(FastS_ISearch *, FastS_SearchContext) { - std::unique_lock<std::mutex> guard(_lock); + std::lock_guard<std::mutex> guard(_lock); _docsumsDone = true; if (_waitDocsums) { _cond.notify_one(); diff --git a/searchcore/src/vespa/searchcore/fdispatch/search/engine_base.cpp b/searchcore/src/vespa/searchcore/fdispatch/search/engine_base.cpp index 83312a41875..24668db6024 100644 --- a/searchcore/src/vespa/searchcore/fdispatch/search/engine_base.cpp +++ b/searchcore/src/vespa/searchcore/fdispatch/search/engine_base.cpp @@ -112,7 +112,7 @@ void FastS_EngineBase::SlowQuery(double limit, double secs, bool silent) { { - std::unique_lock<std::mutex> engineGuard(_lock); + std::lock_guard<std::mutex> engineGuard(_lock); _stats._slowQueryCnt++; _stats._slowQuerySecs += secs; } @@ -127,7 +127,7 @@ void FastS_EngineBase::SlowDocsum(double limit, double secs) { { - std::unique_lock<std::mutex> engineGuard(_lock); + std::lock_guard<std::mutex> engineGuard(_lock); _stats._slowDocsumCnt++; _stats._slowDocsumSecs += secs; } @@ -173,7 +173,7 @@ FastS_EngineBase::SampleQueueLens() double queueLen; double activecnt; - std::unique_lock<std::mutex> engineGuard(_lock); + std::lock_guard<std::mutex> engineGuard(_lock); if (_stats._queueLenSampleCnt > 0) queueLen = (double) _stats._queueLenSampleAcc / (double) _stats._queueLenSampleCnt; else @@ -217,7 +217,7 @@ FastS_EngineBase::MarkBad(uint32_t badness) bool worse = false; { - std::unique_lock<std::mutex> engineGuard(_lock); + std::lock_guard<std::mutex> engineGuard(_lock); if (badness > _badness) { _badness = badness; worse = true; diff --git a/searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.cpp b/searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.cpp index 0cfbdc8b69a..85599b9e897 100644 --- a/searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.cpp +++ b/searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.cpp @@ -1077,7 +1077,7 @@ FastS_FNET_Search::Search(uint32_t searchOffset, // allow FNET responses while requests are being sent { - std::unique_lock<std::mutex> searchGuard(_lock); + std::lock_guard<std::mutex> searchGuard(_lock); ++_pendingQueries; // add Elephant query node to avoid early query done ++_queryNodes; // add Elephant query node to avoid early query done _FNET_mode = FNET_QUERY; @@ -1102,7 +1102,7 @@ FastS_FNET_Search::Search(uint32_t searchOffset, // finalize setup and check if query is still in progress bool done; { - std::unique_lock<std::mutex> searchGuard(_lock); + std::lock_guard<std::mutex> searchGuard(_lock); assert(_queryNodes >= _pendingQueries); for (uint32_t i: send_failed) { // conditional revert of state for failed nodes @@ -1398,7 +1398,7 @@ FastS_FNET_Search::GetDocsums(const FastS_hitresult *hits, uint32_t hitcnt) ConnectDocsumNodes(ignoreRow); bool done; { - std::unique_lock<std::mutex> searchGuard(_lock); + std::lock_guard<std::mutex> searchGuard(_lock); // patch in engine dependent features and send docsum requests diff --git a/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.cpp b/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.cpp index 6fddfae2ab0..4b272a615a6 100644 --- a/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.cpp +++ b/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.cpp @@ -125,7 +125,7 @@ FastS_NodeManager::CheckTempFail() _checkTempFailScheduled = false; tempfail = false; { - std::unique_lock<std::mutex> mangerGuard(_managerLock); + std::lock_guard<std::mutex> mangerGuard(_managerLock); FastS_DataSetCollection *dsc = PeekDataSetCollection(); for (unsigned int i = 0; i < dsc->GetMaxNumDataSets(); i++) { FastS_DataSetBase *ds; @@ -166,7 +166,7 @@ uint32_t FastS_NodeManager::SetPartMap(const PartitionsConfig& partmap, unsigned int waitms) { - std::unique_lock<std::mutex> configGuard(_configLock); + std::lock_guard<std::mutex> configGuard(_configLock); FastS_DataSetCollDesc *configDesc = new FastS_DataSetCollDesc(); if (!configDesc->ReadConfig(partmap)) { LOG(error, "NodeManager::SetPartMap: Failed to load configuration"); @@ -275,7 +275,7 @@ FastS_NodeManager::SetDataSetCollection(FastS_DataSetCollection *dsc) } else { { - std::unique_lock<std::mutex> managerGuard(_managerLock); + std::lock_guard<std::mutex> managerGuard(_managerLock); _gencnt++; gencnt = _gencnt; @@ -304,7 +304,7 @@ FastS_NodeManager::GetDataSetCollection() { FastS_DataSetCollection *ret; - std::unique_lock<std::mutex> managerGuard(_managerLock); + std::lock_guard<std::mutex> managerGuard(_managerLock); ret = _datasetCollection; FastS_assert(ret != NULL); ret->addRef(); @@ -320,8 +320,8 @@ FastS_NodeManager::ShutdownConfig() FastS_DataSetCollection *old_dsc; { - std::unique_lock<std::mutex> configGuard(_configLock); - std::unique_lock<std::mutex> managerGuard(_managerLock); + std::lock_guard<std::mutex> configGuard(_configLock); + std::lock_guard<std::mutex> managerGuard(_managerLock); _shutdown = true; // disallow SetPartMap dsc = _datasetCollection; _datasetCollection = new FastS_DataSetCollection(_appCtx); @@ -347,7 +347,7 @@ FastS_NodeManager::GetTotalPartitions() uint32_t ret; ret = 0; - std::unique_lock<std::mutex> managerGuard(_managerLock); + std::lock_guard<std::mutex> managerGuard(_managerLock); FastS_DataSetCollection *dsc = PeekDataSetCollection(); for (unsigned int i = 0; i < dsc->GetMaxNumDataSets(); i++) { FastS_DataSetBase *ds; @@ -429,7 +429,7 @@ FastS_NodeManager::CheckEvents(FastS_TimeKeeper *timeKeeper) FastS_DataSetCollection *tmp; { - std::unique_lock<std::mutex> managerGuard(_managerLock); + std::lock_guard<std::mutex> managerGuard(_managerLock); old_dsc = _oldDSCList; } diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_directory.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_directory.cpp index eac994bb339..f775f4443f8 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_directory.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_directory.cpp @@ -56,7 +56,7 @@ AttributeDirectory::getDirName() const { std::shared_ptr<AttributeDiskLayout> diskLayout; { - std::unique_lock<std::mutex> guard(_mutex); + std::lock_guard<std::mutex> guard(_mutex); assert(!_diskLayout.expired()); diskLayout = _diskLayout.lock(); } @@ -204,7 +204,7 @@ void AttributeDirectory::detach() { assert(empty()); - std::unique_lock<std::mutex> guard(_mutex); + std::lock_guard<std::mutex> guard(_mutex); _diskLayout.reset(); } @@ -238,7 +238,7 @@ AttributeDirectory::tryGetWriter() bool AttributeDirectory::empty() const { - std::unique_lock<std::mutex> guard(_mutex); + std::lock_guard<std::mutex> guard(_mutex); return _snapInfo.snapshots().empty(); } diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attributedisklayout.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attributedisklayout.cpp index 1fcffa92cce..bb2f99d077b 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attributedisklayout.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attributedisklayout.cpp @@ -59,7 +59,7 @@ AttributeDiskLayout::getAttributeDir(const vespalib::string &name) std::shared_ptr<AttributeDirectory> AttributeDiskLayout::createAttributeDir(const vespalib::string &name) { - std::unique_lock<std::shared_timed_mutex> guard(_mutex); + std::lock_guard<std::shared_timed_mutex> guard(_mutex); auto itr = _dirs.find(name); if (itr == _dirs.end()) { auto dir = std::make_shared<AttributeDirectory>(shared_from_this(), name); @@ -81,7 +81,7 @@ AttributeDiskLayout::removeAttributeDir(const vespalib::string &name, search::Se writer->invalidateOldSnapshots(serialNum); writer->removeInvalidSnapshots(); if (writer->removeDiskDir()) { - std::unique_lock<std::shared_timed_mutex> guard(_mutex); + std::lock_guard<std::shared_timed_mutex> guard(_mutex); auto itr = _dirs.find(name); assert(itr != _dirs.end()); assert(dir.get() == itr->second.get()); @@ -89,7 +89,7 @@ AttributeDiskLayout::removeAttributeDir(const vespalib::string &name, search::Se writer->detach(); } } else { - std::unique_lock<std::shared_timed_mutex> guard(_mutex); + std::lock_guard<std::shared_timed_mutex> guard(_mutex); auto itr = _dirs.find(name); if (itr != _dirs.end()) { assert(dir.get() != itr->second.get()); diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flush_target_candidates.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flush_target_candidates.cpp index da54b909759..0051c209ef9 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flush_target_candidates.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flush_target_candidates.cpp @@ -8,6 +8,7 @@ namespace proton { using search::SerialNum; using Config = PrepareRestartFlushStrategy::Config; +using TlsReplayCost = FlushTargetCandidates::TlsReplayCost; namespace { @@ -25,7 +26,7 @@ calculateReplayStartSerial(const FlushContext::List &sortedFlushContexts, return sortedFlushContexts[numCandidates]->getTarget()->getFlushedSerialNum() + 1; } -double +TlsReplayCost calculateTlsReplayCost(const flushengine::TlsStats &tlsStats, const Config &cfg, SerialNum replayStartSerial) @@ -33,13 +34,13 @@ calculateTlsReplayCost(const flushengine::TlsStats &tlsStats, SerialNum replayEndSerial = tlsStats.getLastSerial(); SerialNum numTotalOperations = replayEndSerial - tlsStats.getFirstSerial() + 1; if (numTotalOperations == 0) { - return 0; + return TlsReplayCost(0.0, 0.0); } double numBytesPerOperation = (double)tlsStats.getNumBytes() / (double)numTotalOperations; SerialNum numOperationsToReplay = replayEndSerial + 1 - replayStartSerial; double numBytesToReplay = numBytesPerOperation * numOperationsToReplay; - return numBytesToReplay * cfg.tlsReplayCost; + return TlsReplayCost((numBytesToReplay * cfg.tlsReplayByteCost), (numOperationsToReplay * cfg.tlsReplayOperationCost)); } double diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flush_target_candidates.h b/searchcore/src/vespa/searchcore/proton/flushengine/flush_target_candidates.h index 5498d8c46a8..ea09989de31 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flush_target_candidates.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flush_target_candidates.h @@ -16,10 +16,20 @@ namespace flushengine { class TlsStats; } */ class FlushTargetCandidates { +public: + struct TlsReplayCost { + double bytesCost; + double operationsCost; + TlsReplayCost(double bytesCost_, double operationsCost_) + : bytesCost(bytesCost_), + operationsCost(operationsCost_) + {} + double totalCost() const { return bytesCost + operationsCost; } + }; private: const FlushContext::List *_sortedFlushContexts; // NOTE: ownership is handled outside size_t _numCandidates; - double _tlsReplayCost; + TlsReplayCost _tlsReplayCost; double _flushTargetsWriteCost; using Config = PrepareRestartFlushStrategy::Config; @@ -32,9 +42,9 @@ public: const flushengine::TlsStats &tlsStats, const Config &cfg); - double getTlsReplayCost() const { return _tlsReplayCost; } + TlsReplayCost getTlsReplayCost() const { return _tlsReplayCost; } double getFlushTargetsWriteCost() const { return _flushTargetsWriteCost; } - double getTotalCost() const { return getTlsReplayCost() + getFlushTargetsWriteCost(); } + double getTotalCost() const { return getTlsReplayCost().totalCost() + getFlushTargetsWriteCost(); } FlushContext::List getCandidates() const; }; diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/prepare_restart_flush_strategy.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/prepare_restart_flush_strategy.cpp index e9df78dbf4f..6cfb8cb6c3d 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/prepare_restart_flush_strategy.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/prepare_restart_flush_strategy.cpp @@ -18,9 +18,11 @@ using Config = PrepareRestartFlushStrategy::Config; using FlushContextsMap = std::map<vespalib::string, FlushContext::List>; using FlushTargetCandidatesList = std::vector<FlushTargetCandidates::UP>; -PrepareRestartFlushStrategy::Config::Config(double tlsReplayCost_, +PrepareRestartFlushStrategy::Config::Config(double tlsReplayByteCost_, + double tlsReplayOperationCost_, double flushTargetWriteCost_) - : tlsReplayCost(tlsReplayCost_), + : tlsReplayByteCost(tlsReplayByteCost_), + tlsReplayOperationCost(tlsReplayOperationCost_), flushTargetWriteCost(flushTargetWriteCost_) { } @@ -108,18 +110,22 @@ findBestTargetsToFlush(const FlushContext::List &unsortedFlushContexts, for (size_t numCandidates = 1; numCandidates <= sortedFlushContexts.size(); ++numCandidates) { FlushTargetCandidates nextSet(sortedFlushContexts, numCandidates, tlsStats, cfg); LOG(debug, "findBestTargetsToFlush(): Created candidate set: " - "flushTargets=[%s], tlsReplayCost=%f, flushTargetsWriteCost=%f, totalCost=%f", + "flushTargets=[%s], tlsReplayBytesCost=%f, tlsReplayOperationsCost=%f, flushTargetsWriteCost=%f, totalCost=%f", toString(nextSet.getCandidates()).c_str(), - nextSet.getTlsReplayCost(), nextSet.getFlushTargetsWriteCost(), + nextSet.getTlsReplayCost().bytesCost, + nextSet.getTlsReplayCost().operationsCost, + nextSet.getFlushTargetsWriteCost(), nextSet.getTotalCost()); if (nextSet.getTotalCost() < bestSet.getTotalCost()) { bestSet = nextSet; } } LOG(info, "findBestTargetsToFlush(): Best candidate set: " - "flushTargets=[%s], tlsReplayCost=%f, flushTargetsWriteCost=%f, totalCost=%f", + "flushTargets=[%s], tlsReplayBytesCost=%f, tlsReplayOperationsCost=%f, flushTargetsWriteCost=%f, totalCost=%f", toString(bestSet.getCandidates()).c_str(), - bestSet.getTlsReplayCost(), bestSet.getFlushTargetsWriteCost(), + bestSet.getTlsReplayCost().bytesCost, + bestSet.getTlsReplayCost().operationsCost, + bestSet.getFlushTargetsWriteCost(), bestSet.getTotalCost()); return bestSet.getCandidates(); } diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/prepare_restart_flush_strategy.h b/searchcore/src/vespa/searchcore/proton/flushengine/prepare_restart_flush_strategy.h index 19a5cf45670..df5c5a9c569 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/prepare_restart_flush_strategy.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/prepare_restart_flush_strategy.h @@ -21,9 +21,12 @@ class PrepareRestartFlushStrategy : public IFlushStrategy public: struct Config { - double tlsReplayCost; + double tlsReplayByteCost; + double tlsReplayOperationCost; double flushTargetWriteCost; - Config(double tlsReplayCost_, double flushTargetWriteCost_); + Config(double tlsReplayByteCost_, + double tlsReplayOperationCost_, + double flushTargetWriteCost_); }; private: diff --git a/searchcore/src/vespa/searchcore/proton/metrics/job_tracker.cpp b/searchcore/src/vespa/searchcore/proton/metrics/job_tracker.cpp index 753c84cd9b6..6d05ce8c57d 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/job_tracker.cpp +++ b/searchcore/src/vespa/searchcore/proton/metrics/job_tracker.cpp @@ -20,14 +20,14 @@ JobTracker::sampleLoad(time_point now, const std::lock_guard<std::mutex> &guard) void JobTracker::start() { - std::unique_lock<std::mutex> guard(_lock); + std::lock_guard<std::mutex> guard(_lock); _sampler.startJob(std::chrono::steady_clock::now()); } void JobTracker::end() { - std::unique_lock<std::mutex> guard(_lock); + std::lock_guard<std::mutex> guard(_lock); _sampler.endJob(std::chrono::steady_clock::now()); } diff --git a/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_registry.cpp b/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_registry.cpp index 75a20f9f8e5..68aaad3b557 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_registry.cpp +++ b/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_registry.cpp @@ -30,7 +30,7 @@ DocumentDBReferenceRegistry::get(vespalib::stringref name) const std::shared_ptr<IDocumentDBReference> DocumentDBReferenceRegistry::tryGet(vespalib::stringref name) const { - std::unique_lock<std::mutex> guard(_lock); + std::lock_guard<std::mutex> guard(_lock); auto itr = _handlers.find(name); if (itr == _handlers.end()) { return std::shared_ptr<IDocumentDBReference>(); diff --git a/searchcore/src/vespa/searchcore/proton/server/pendinglidtracker.cpp b/searchcore/src/vespa/searchcore/proton/server/pendinglidtracker.cpp index 15283c170cc..79bf970aeac 100644 --- a/searchcore/src/vespa/searchcore/proton/server/pendinglidtracker.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/pendinglidtracker.cpp @@ -6,7 +6,6 @@ namespace proton { -using LockGuard = std::unique_lock<std::mutex>; PendingLidTracker::PendingLidTracker() : _mutex(), _cond(), @@ -19,12 +18,12 @@ PendingLidTracker::~PendingLidTracker() { void PendingLidTracker::produce(uint32_t lid) { - LockGuard guard(_mutex); + std::lock_guard<std::mutex> guard(_mutex); _pending[lid]++; } void PendingLidTracker::consume(uint32_t lid) { - LockGuard guard(_mutex); + std::lock_guard<std::mutex> guard(_mutex); auto found = _pending.find(lid); assert (found != _pending.end()); assert (found->second > 0); @@ -38,7 +37,7 @@ PendingLidTracker::consume(uint32_t lid) { void PendingLidTracker::waitForConsumedLid(uint32_t lid) { - LockGuard guard(_mutex); + std::unique_lock<std::mutex> guard(_mutex); while (_pending.find(lid) != _pending.end()) { _cond.wait(guard); } diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index cd016e5cfc4..2794619273c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -655,7 +655,8 @@ PrepareRestartFlushStrategy::Config createPrepareRestartConfig(const ProtonConfig &protonConfig) { return PrepareRestartFlushStrategy::Config(protonConfig.flush.preparerestart.replaycost, - protonConfig.flush.preparerestart.writecost); + protonConfig.flush.preparerestart.replayoperationcost, + protonConfig.flush.preparerestart.writecost); } } diff --git a/searchlib/src/tests/postinglistbm/andstress.cpp b/searchlib/src/tests/postinglistbm/andstress.cpp index 736d53508b4..40f919509e8 100644 --- a/searchlib/src/tests/postinglistbm/andstress.cpp +++ b/searchlib/src/tests/postinglistbm/andstress.cpp @@ -280,7 +280,7 @@ AndStressMaster::Task * AndStressMaster::getTask() { Task *result = NULL; - std::unique_lock<std::mutex> taskGuard(_taskLock); + std::lock_guard<std::mutex> taskGuard(_taskLock); if (_taskIdx < _tasks.size()) { result = &_tasks[_taskIdx]; ++_taskIdx; diff --git a/staging_vespalib/src/vespa/vespalib/util/clock.cpp b/staging_vespalib/src/vespa/vespalib/util/clock.cpp index b19b067afa9..c9768417914 100644 --- a/staging_vespalib/src/vespa/vespalib/util/clock.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/clock.cpp @@ -45,7 +45,7 @@ void Clock::Run(FastOS_ThreadInterface *thread, void *arguments) void Clock::stop(void) { - std::unique_lock<std::mutex> guard(_lock); + std::lock_guard<std::mutex> guard(_lock); _stop = true; _cond.notify_all(); } diff --git a/standalone-container/src/main/java/com/yahoo/container/standalone/StandaloneContainerActivator.java b/standalone-container/src/main/java/com/yahoo/container/standalone/StandaloneContainerActivator.java index d03a08a27db..baf0b41b270 100644 --- a/standalone-container/src/main/java/com/yahoo/container/standalone/StandaloneContainerActivator.java +++ b/standalone-container/src/main/java/com/yahoo/container/standalone/StandaloneContainerActivator.java @@ -19,24 +19,15 @@ import org.osgi.framework.Bundle; import org.osgi.framework.BundleActivator; import org.osgi.framework.BundleContext; -import java.io.FileInputStream; import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.channels.FileChannel; import java.nio.channels.ServerSocketChannel; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Hashtable; import java.util.List; -import java.util.Map; -import java.util.function.Function; import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static java.util.stream.Collectors.toMap; /** * @author Einar M R Rosenvinge @@ -45,69 +36,19 @@ public class StandaloneContainerActivator implements BundleActivator { @Override public void start(BundleContext bundleContext) throws Exception { - Container container = getContainer(); - List<ConnectorConfig> connectorConfigs = getConnectorConfigs(container); - - Stream<Path> keyStorePaths = getKeyStorePaths(connectorConfigs); - Map<Path, FileChannel> fileChannels = openFiles(keyStorePaths); - registerKeyStoreFileChannels(bundleContext, fileChannels); - - for (ConnectorConfig config: connectorConfigs) { + for (ConnectorConfig config: getConnectorConfigs(getContainer())) { ServerSocketChannel socketChannel = bindChannel(config); registerChannels(bundleContext, config.listenPort(), socketChannel); } } - private void registerKeyStoreFileChannels(BundleContext bundleContext, Map<Path, FileChannel> fileChannels) { - Hashtable<String, Object> properties = new Hashtable<>(); - properties.put("role", "com.yahoo.container.standalone.StandaloneContainerActivator.KeyStoreFileChannels"); - //Since Standalone container and jdisc http service don't have a suitable common module for placing a wrapper class for fileChannels, - //we register it with the type map. In the future, we should wrap this. - bundleContext.registerService(Map.class, - Collections.unmodifiableMap(fileChannels), - properties); - } - - private Map<Path, FileChannel> openFiles(Stream<Path> keyStorePaths) { - return keyStorePaths.collect(toMap( - Function.<Path>identity(), - StandaloneContainerActivator::getFileChannel)); - } - - private static FileChannel getFileChannel(Path path) { - try { - FileInputStream inputStream = new FileInputStream(path.toFile()); - //don't close the inputStream, as that will close the underlying channel. - return inputStream.getChannel(); - } catch (IOException e) { - throw new RuntimeException("Failed opening path " + path, e); - } - } - - private Stream<Path> getKeyStorePaths(List<ConnectorConfig> connectorConfigs) { - return connectorConfigs.stream(). - map(ConnectorConfig::ssl). - flatMap(StandaloneContainerActivator::getKeyStorePaths); - } - - private static Stream<Path> getKeyStorePaths(ConnectorConfig.Ssl ssl) { - Stream<String> paths = Stream.of( - ssl.keyStorePath(), - ssl.pemKeyStore().certificatePath(), - ssl.pemKeyStore().keyPath()); - - return paths. - filter(path -> !path.isEmpty()). - map(Paths::get); - } - - void registerChannels(BundleContext bundleContext, int listenPort, ServerSocketChannel boundChannel) { + static void registerChannels(BundleContext bundleContext, int listenPort, ServerSocketChannel boundChannel) { Hashtable<String, Integer> properties = new Hashtable<>(); properties.put("port", listenPort); bundleContext.registerService(ServerSocketChannel.class, boundChannel, properties); } - ServerSocketChannel bindChannel(ConnectorConfig channelInfo) throws IOException { + static ServerSocketChannel bindChannel(ConnectorConfig channelInfo) throws IOException { ServerSocketChannel serverChannel = ServerSocketChannel.open(); InetSocketAddress bindAddress = new InetSocketAddress(channelInfo.listenPort()); serverChannel.socket().bind(bindAddress, channelInfo.acceptQueueSize()); @@ -117,7 +58,7 @@ public class StandaloneContainerActivator implements BundleActivator { @Override public void stop(BundleContext bundleContext) throws Exception { } - Container getContainer(Module... modules) { + static Container getContainer(Module... modules) { Module activatorModule = new ActivatorModule(); List<Module> allModules = new ArrayList<>(); allModules.addAll(Arrays.asList(modules)); @@ -127,7 +68,7 @@ public class StandaloneContainerActivator implements BundleActivator { return app.container(); } - List<ConnectorConfig> getConnectorConfigs(Container container) { + static List<ConnectorConfig> getConnectorConfigs(Container container) { Http http = container.getHttp(); return (http == null) ? diff --git a/standalone-container/src/test/java/com/yahoo/container/standalone/StandaloneContainerActivatorTest.java b/standalone-container/src/test/java/com/yahoo/container/standalone/StandaloneContainerActivatorTest.java index 48c882c78db..8d413ade0f0 100644 --- a/standalone-container/src/test/java/com/yahoo/container/standalone/StandaloneContainerActivatorTest.java +++ b/standalone-container/src/test/java/com/yahoo/container/standalone/StandaloneContainerActivatorTest.java @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.container.standalone; -import com.google.inject.Binder; import com.google.inject.Module; import com.yahoo.io.IOUtils; import com.yahoo.jdisc.http.ConnectorConfig; @@ -18,6 +17,7 @@ import java.nio.file.Path; import java.util.List; import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; import static java.util.stream.Collectors.toList; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.collection.IsEmptyCollection.empty; @@ -29,7 +29,7 @@ import static org.junit.Assert.assertThat; */ public class StandaloneContainerActivatorTest { - private String getJdiscXml(String contents) throws ParserConfigurationException, IOException, SAXException { + private static String getJdiscXml(String contents) throws ParserConfigurationException, IOException, SAXException { return "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n" + "<services>\n" + " <jdisc version=\"1.0\" jetty=\"true\">\n" + @@ -38,7 +38,7 @@ public class StandaloneContainerActivatorTest { "</services>"; } - private void writeApplicationPackage(String servicesXml, Path tmpDir) throws IOException { + private static void writeApplicationPackage(String servicesXml, Path tmpDir) throws IOException { FileWriter fw = new FileWriter(tmpDir.resolve("services.xml").toFile(), false); fw.write(servicesXml); fw.close(); @@ -50,16 +50,16 @@ public class StandaloneContainerActivatorTest { try { writeApplicationPackage(getJdiscXml(""), applicationDir); StandaloneContainerActivator activator = new StandaloneContainerActivator(); - Container container = activator.getContainer(newAppDirBinding(applicationDir)); + Container container = StandaloneContainerActivator.getContainer(newAppDirBinding(applicationDir)); List<Integer> ports = getPorts(activator, container); - assertThat(ports, is(asList(Defaults.getDefaults().vespaWebServicePort()))); + assertThat(ports, is(singletonList(Defaults.getDefaults().vespaWebServicePort()))); } finally { IOUtils.recursiveDeleteDir(applicationDir.toFile()); } } - private List<Integer> getPorts(StandaloneContainerActivator activator, Container container) { - return activator.getConnectorConfigs(container).stream(). + private static List<Integer> getPorts(StandaloneContainerActivator activator, Container container) { + return StandaloneContainerActivator.getConnectorConfigs(container).stream(). map(ConnectorConfig::listenPort). collect(toList()); } @@ -70,7 +70,7 @@ public class StandaloneContainerActivatorTest { try { writeApplicationPackage(getJdiscXml("<http/>"), applicationDir); StandaloneContainerActivator activator = new StandaloneContainerActivator(); - Container container = activator.getContainer(newAppDirBinding(applicationDir)); + Container container = StandaloneContainerActivator.getContainer(newAppDirBinding(applicationDir)); List<Integer> ports = getPorts(activator, container); assertThat(ports, empty()); } finally { @@ -90,7 +90,7 @@ public class StandaloneContainerActivatorTest { "</http>\n"; writeApplicationPackage(getJdiscXml(contents), applicationDir); StandaloneContainerActivator activator = new StandaloneContainerActivator(); - Container container = activator.getContainer(newAppDirBinding(applicationDir)); + Container container = StandaloneContainerActivator.getContainer(newAppDirBinding(applicationDir)); List<Integer> ports = getPorts(activator, container); assertThat(ports, is(asList(123, 456, 789))); } finally { @@ -98,15 +98,10 @@ public class StandaloneContainerActivatorTest { } } - private Module newAppDirBinding(final Path applicationDir) { - return new Module() { - @Override - public void configure(Binder binder) { - binder.bind(Path.class) - .annotatedWith(StandaloneContainerApplication.applicationPathName()) - .toInstance(applicationDir); - } - }; + private static Module newAppDirBinding(final Path applicationDir) { + return binder -> binder.bind(Path.class) + .annotatedWith(StandaloneContainerApplication.applicationPathName()) + .toInstance(applicationDir); } } diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index d1a54c04359..03a51cf6180 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -4,11 +4,14 @@ #include <iomanip> #include <vespa/storageapi/message/persistence.h> #include <vespa/storage/distributor/bucketdbupdater.h> +#include <vespa/storage/distributor/pending_bucket_space_db_transition.h> +#include <vespa/storage/distributor/outdated_nodes_map.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/storageframework/defaultimplementation/clock/realclock.h> #include <vespa/storage/storageutil/distributorstatecache.h> #include <tests/distributor/distributortestutil.h> #include <vespa/document/test/make_document_bucket.h> +#include <vespa/document/test/make_bucket_space.h> #include <vespa/storage/distributor/simpleclusterinformation.h> #include <vespa/storage/distributor/distributor.h> #include <vespa/vespalib/text/stringtokenizer.h> @@ -16,6 +19,7 @@ using namespace storage::api; using namespace storage::lib; using document::test::makeDocumentBucket; +using document::test::makeBucketSpace; namespace storage { namespace distributor { @@ -154,6 +158,7 @@ protected: } public: + using OutdatedNodesMap = dbtransition::OutdatedNodesMap; void setUp() override { createLinks(); }; @@ -536,9 +541,9 @@ public: ClusterInformation::CSP clusterInfo( owner.createClusterInfo(oldClusterState)); - std::unordered_set<uint16_t> outdatedNodes; + OutdatedNodesMap outdatedNodesMap; state = PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, cmd, outdatedNodes, + clock, clusterInfo, sender, owner.getBucketSpaceRepo(), cmd, outdatedNodesMap, api::Timestamp(1)); } @@ -549,9 +554,8 @@ public: ClusterInformation::CSP clusterInfo( owner.createClusterInfo(oldClusterState)); - std::unordered_set<uint16_t> outdatedNodes; state = PendingClusterState::createForDistributionChange( - clock, clusterInfo, sender, api::Timestamp(1)); + clock, clusterInfo, sender, owner.getBucketSpaceRepo(), api::Timestamp(1)); } }; @@ -1475,7 +1479,7 @@ BucketDBUpdaterTest::getSentNodesDistributionChanged( ClusterInformation::CSP clusterInfo(createClusterInfo(oldClusterState)); std::unique_ptr<PendingClusterState> state( PendingClusterState::createForDistributionChange( - clock, clusterInfo, sender, api::Timestamp(1))); + clock, clusterInfo, sender, getBucketSpaceRepo(), api::Timestamp(1))); sortSentMessagesByIndex(sender); @@ -1637,10 +1641,10 @@ BucketDBUpdaterTest::testPendingClusterStateReceive() framework::defaultimplementation::FakeClock clock; ClusterInformation::CSP clusterInfo(createClusterInfo("cluster:d")); - std::unordered_set<uint16_t> outdatedNodes; + OutdatedNodesMap outdatedNodesMap; std::unique_ptr<PendingClusterState> state( PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, cmd, outdatedNodes, + clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap, api::Timestamp(1))); CPPUNIT_ASSERT_EQUAL(3, (int)sender.commands.size()); @@ -1668,7 +1672,8 @@ BucketDBUpdaterTest::testPendingClusterStateReceive() state->done()); } - CPPUNIT_ASSERT_EQUAL(3, (int)state->results().size()); + auto &pendingTransition = state->getPendingBucketSpaceDbTransition(makeBucketSpace()); + CPPUNIT_ASSERT_EQUAL(3, (int)pendingTransition.results().size()); } void @@ -1721,13 +1726,14 @@ parseInputData(const std::string& data, uint16_t node = atoi(tok2[0].c_str()); state.setNodeReplied(node); + auto &pendingTransition = state.getPendingBucketSpaceDbTransition(makeBucketSpace()); vespalib::StringTokenizer tok3(tok2[1], ","); for (uint32_t j = 0; j < tok3.size(); j++) { if (includeBucketInfo) { vespalib::StringTokenizer tok4(tok3[j], "/"); - state.addNodeInfo( + pendingTransition.addNodeInfo( document::BucketId(16, atoi(tok4[0].c_str())), BucketCopy( timestamp, @@ -1739,7 +1745,7 @@ parseInputData(const std::string& data, atoi(tok4[2].c_str()), atoi(tok4[3].c_str())))); } else { - state.addNodeInfo( + pendingTransition.addNodeInfo( document::BucketId(16, atoi(tok3[j].c_str())), BucketCopy(timestamp, node, @@ -1793,7 +1799,7 @@ BucketDBUpdaterTest::mergeBucketLists( framework::MilliSecTimer timer(clock); MessageSenderStub sender; - std::unordered_set<uint16_t> outdatedNodes; + OutdatedNodesMap outdatedNodesMap; { auto cmd(std::make_shared<api::SetSystemStateCommand>(oldState)); @@ -1803,11 +1809,11 @@ BucketDBUpdaterTest::mergeBucketLists( ClusterInformation::CSP clusterInfo(createClusterInfo("cluster:d")); std::unique_ptr<PendingClusterState> state( PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, cmd, outdatedNodes, + clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap, beforeTime)); parseInputData(existingData, beforeTime, *state, includeBucketInfo); - state->mergeInto(getBucketDBUpdater().getDistributorComponent().getBucketDatabase()); + state->mergeIntoBucketDatabases(); } BucketDumper dumper_tmp(true); @@ -1822,12 +1828,11 @@ BucketDBUpdaterTest::mergeBucketLists( ClusterInformation::CSP clusterInfo(createClusterInfo(oldState.toString())); std::unique_ptr<PendingClusterState> state( PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, cmd, outdatedNodes, + clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap, afterTime)); parseInputData(newData, afterTime, *state, includeBucketInfo); - state->mergeInto(getBucketDBUpdater().getDistributorComponent() - .getBucketDatabase()); + state->mergeIntoBucketDatabases(); } BucketDumper dumper(includeBucketInfo); diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp index 5deb31f8579..a0aa8e00070 100644 --- a/storage/src/tests/distributor/distributortestutil.cpp +++ b/storage/src/tests/distributor/distributortestutil.cpp @@ -358,6 +358,16 @@ DistributorTestUtil::getBucketDatabase() const { return _distributor->getDefaultBucketSpace().getBucketDatabase(); } +DistributorBucketSpaceRepo & +DistributorTestUtil::getBucketSpaceRepo() { + return _distributor->getBucketSpaceRepo(); +} + +const DistributorBucketSpaceRepo & +DistributorTestUtil::getBucketSpaceRepo() const { + return _distributor->getBucketSpaceRepo(); +} + const lib::Distribution& DistributorTestUtil::getDistribution() const { return _distributor->getDefaultBucketSpace().getDistribution(); diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h index 4f09c11ac03..19da0483165 100644 --- a/storage/src/tests/distributor/distributortestutil.h +++ b/storage/src/tests/distributor/distributortestutil.h @@ -20,6 +20,7 @@ namespace distributor { class BucketDBUpdater; class Distributor; class DistributorBucketSpace; +class DistributorBucketSpaceRepo; class IdealStateManager; class ExternalOperationHandler; class Operation; @@ -125,6 +126,8 @@ public: DistributorBucketSpace &getDistributorBucketSpace(); BucketDatabase& getBucketDatabase(); const BucketDatabase& getBucketDatabase() const; + DistributorBucketSpaceRepo &getBucketSpaceRepo(); + const DistributorBucketSpaceRepo &getBucketSpaceRepo() const; const lib::Distribution& getDistribution() const; // "End to end" distribution change trigger, which will invoke the bucket diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp index 569136b8b10..b68e53c7136 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp @@ -169,8 +169,9 @@ BucketDBUpdater::storageDistributionChanged( _bucketSpaceComponent.getClock(), std::move(clusterInfo), _sender, + _bucketSpaceComponent.getBucketSpaceRepo(), _bucketSpaceComponent.getUniqueTimestamp()); - _outdatedNodes = _pendingClusterState->getOutdatedNodeSet(); + _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap(); } void @@ -216,10 +217,11 @@ BucketDBUpdater::onSetSystemState( _bucketSpaceComponent.getClock(), std::move(clusterInfo), _sender, + _bucketSpaceComponent.getBucketSpaceRepo(), cmd, - _outdatedNodes, + _outdatedNodesMap, _bucketSpaceComponent.getUniqueTimestamp()); - _outdatedNodes = _pendingClusterState->getOutdatedNodeSet(); + _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap(); if (isPendingClusterStateCompleted()) { processCompletedPendingClusterState(); @@ -486,7 +488,7 @@ BucketDBUpdater::isPendingClusterStateCompleted() const void BucketDBUpdater::processCompletedPendingClusterState() { - _pendingClusterState->mergeInto(_bucketSpaceComponent.getBucketDatabase()); + _pendingClusterState->mergeIntoBucketDatabases(); if (_pendingClusterState->getCommand().get()) { enableCurrentClusterStateInDistributor(); @@ -498,7 +500,7 @@ BucketDBUpdater::processCompletedPendingClusterState() } _pendingClusterState.reset(); - _outdatedNodes.clear(); + _outdatedNodesMap.clear(); sendAllQueuedBucketRechecks(); completeTransitionTimer(); } diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h index 994e207f200..ee07c46754f 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.h +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h @@ -7,6 +7,7 @@ #include "distributormessagesender.h" #include "pendingclusterstate.h" #include "distributor_bucket_space_component.h" +#include "outdated_nodes_map.h" #include <vespa/document/bucket/bucketid.h> #include <vespa/storageapi/messageapi/returncode.h> #include <vespa/storageapi/message/bucket.h> @@ -27,6 +28,8 @@ class BucketDBUpdater : public framework::StatusReporter, public api::MessageHandler { public: + using OutdatedNodes = dbtransition::OutdatedNodes; + using OutdatedNodesMap = dbtransition::OutdatedNodesMap; BucketDBUpdater(Distributor& owner, DistributorBucketSpaceRepo &bucketSpaceRepo, DistributorBucketSpace& bucketSpace, @@ -226,7 +229,7 @@ private: std::list<PendingClusterState::Summary> _history; DistributorMessageSender& _sender; std::set<EnqueuedBucketRecheck> _enqueuedRechecks; - std::unordered_set<uint16_t> _outdatedNodes; + OutdatedNodesMap _outdatedNodesMap; framework::MilliSecTimer _transitionTimer; }; diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index f59b47574ba..3cb3408a951 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -161,6 +161,8 @@ public: DistributorBucketSpace& getDefaultBucketSpace() noexcept; const DistributorBucketSpace& getDefaultBucketSpace() const noexcept; + DistributorBucketSpaceRepo &getBucketSpaceRepo() noexcept { return *_bucketSpaceRepo; } + const DistributorBucketSpaceRepo &getBucketSpaceRepo() const noexcept { return *_bucketSpaceRepo; } private: friend class Distributor_Test; diff --git a/storage/src/vespa/storage/distributor/outdated_nodes.h b/storage/src/vespa/storage/distributor/outdated_nodes.h new file mode 100644 index 00000000000..fddb1806d82 --- /dev/null +++ b/storage/src/vespa/storage/distributor/outdated_nodes.h @@ -0,0 +1,11 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <unordered_set> + +namespace storage::distributor::dbtransition { + +using OutdatedNodes = std::unordered_set<uint16_t>; + +} diff --git a/storage/src/vespa/storage/distributor/outdated_nodes_map.h b/storage/src/vespa/storage/distributor/outdated_nodes_map.h new file mode 100644 index 00000000000..8d08b20732b --- /dev/null +++ b/storage/src/vespa/storage/distributor/outdated_nodes_map.h @@ -0,0 +1,13 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "outdated_nodes.h" +#include <vespa/document/bucket/bucketspace.h> +#include <unordered_map> + +namespace storage::distributor::dbtransition { + +using OutdatedNodesMap = std::unordered_map<document::BucketSpace, OutdatedNodes, document::BucketSpace::hash>; + +} diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp index 36961a0fec0..ed9c8bc222b 100644 --- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp +++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp @@ -3,6 +3,7 @@ #include "pending_bucket_space_db_transition.h" #include "clusterinformation.h" #include "pendingclusterstate.h" +#include "distributor_bucket_space.h" #include <vespa/storage/common/bucketoperationlogger.h> #include <algorithm> @@ -11,7 +12,14 @@ LOG_SETUP(".pendingbucketspacedbtransition"); namespace storage::distributor { +using lib::Node; +using lib::NodeType; +using lib::NodeState; + PendingBucketSpaceDbTransition::PendingBucketSpaceDbTransition(const PendingClusterState &pendingClusterState, + DistributorBucketSpace &distributorBucketSpace, + bool distributionChanged, + const OutdatedNodes &outdatedNodes, std::shared_ptr<const ClusterInformation> clusterInfo, const lib::ClusterState &newClusterState, api::Timestamp creationTimestamp) @@ -20,11 +28,24 @@ PendingBucketSpaceDbTransition::PendingBucketSpaceDbTransition(const PendingClus _removedBuckets(), _missingEntries(), _clusterInfo(std::move(clusterInfo)), - _outdatedNodes(pendingClusterState.getOutdatedNodeSet()), + _outdatedNodes(newClusterState.getNodeCount(NodeType::STORAGE)), + _prevClusterState(_clusterInfo->getClusterState()), _newClusterState(newClusterState), _creationTimestamp(creationTimestamp), - _pendingClusterState(pendingClusterState) + _pendingClusterState(pendingClusterState), + _distributorBucketSpace(distributorBucketSpace), + _distributorIndex(_clusterInfo->getDistributorIndex()), + _bucketOwnershipTransfer(distributionChanged) { + if (distributorChanged()) { + _bucketOwnershipTransfer = true; + } + if (_bucketOwnershipTransfer) { + markAllAvailableNodesAsRequiringRequest(); + } else { + updateSetOfNodesThatAreOutdated(); + addAdditionalNodesToOutdatedSet(outdatedNodes); + } } PendingBucketSpaceDbTransition::~PendingBucketSpaceDbTransition() @@ -67,10 +88,12 @@ PendingBucketSpaceDbTransition::insertInfo(BucketDatabase::Entry& info, const Ra std::vector<BucketCopy> copiesToAddOrUpdate( getCopiesThatAreNewOrAltered(info, range)); + const auto &dist(_distributorBucketSpace.getDistribution()); std::vector<uint16_t> order( - _clusterInfo->getIdealStorageNodesForState( + dist.getIdealStorageNodes( _newClusterState, - _entries[range.first].bucketId)); + _entries[range.first].bucketId, + _clusterInfo->getStorageUpStates())); info->addNodes(copiesToAddOrUpdate, order, TrustedUpdate::DEFER); LOG_BUCKET_OPERATION_NO_LOCK( @@ -192,8 +215,9 @@ PendingBucketSpaceDbTransition::addToBucketDB(BucketDatabase& db, const Range& r } void -PendingBucketSpaceDbTransition::mergeInto(BucketDatabase& db) +PendingBucketSpaceDbTransition::mergeIntoBucketDatabase() { + BucketDatabase &db(_distributorBucketSpace.getBucketDatabase()); std::sort(_entries.begin(), _entries.end()); db.forEach(*this); @@ -224,6 +248,170 @@ PendingBucketSpaceDbTransition::onRequestBucketInfoReply(const api::RequestBucke } } +bool +PendingBucketSpaceDbTransition::distributorChanged() +{ + const auto &oldState(_prevClusterState); + const auto &newState(_newClusterState); + if (newState.getDistributionBitCount() != oldState.getDistributionBitCount()) { + return true; + } + + Node myNode(NodeType::DISTRIBUTOR, _distributorIndex); + if (oldState.getNodeState(myNode).getState() == lib::State::DOWN) { + return true; + } + + uint16_t oldCount = oldState.getNodeCount(NodeType::DISTRIBUTOR); + uint16_t newCount = newState.getNodeCount(NodeType::DISTRIBUTOR); + + uint16_t maxCount = std::max(oldCount, newCount); + + for (uint16_t i = 0; i < maxCount; ++i) { + Node node(NodeType::DISTRIBUTOR, i); + + const lib::State& old(oldState.getNodeState(node).getState()); + const lib::State& nw(newState.getNodeState(node).getState()); + + if (nodeWasUpButNowIsDown(old, nw)) { + if (nodeInSameGroupAsSelf(i) || + nodeNeedsOwnershipTransferFromGroupDown(i, newState)) { + return true; + } + } + } + + return false; +} + +bool +PendingBucketSpaceDbTransition::nodeWasUpButNowIsDown(const lib::State& old, + const lib::State& nw) +{ + return (old.oneOf("uimr") && !nw.oneOf("uimr")); +} + +bool +PendingBucketSpaceDbTransition::nodeInSameGroupAsSelf(uint16_t index) const +{ + const auto &dist(_distributorBucketSpace.getDistribution()); + if (dist.getNodeGraph().getGroupForNode(index) == + dist.getNodeGraph().getGroupForNode(_distributorIndex)) { + LOG(debug, + "Distributor %d state changed, need to request data from all " + "storage nodes", + index); + return true; + } else { + LOG(debug, + "Distributor %d state changed but unrelated to my group.", + index); + return false; + } +} + +bool +PendingBucketSpaceDbTransition::nodeNeedsOwnershipTransferFromGroupDown( + uint16_t nodeIndex, + const lib::ClusterState& state) const +{ + const auto &dist(_distributorBucketSpace.getDistribution()); + if (!dist.distributorAutoOwnershipTransferOnWholeGroupDown()) { + return false; // Not doing anything for downed groups. + } + const lib::Group* group(dist.getNodeGraph().getGroupForNode(nodeIndex)); + // If there is no group information associated with the node (because the + // group has changed or the node has been removed from config), we must + // also invoke ownership transfer of buckets. + if (group == nullptr + || lib::Distribution::allDistributorsDown(*group, state)) + { + LOG(debug, + "Distributor %u state changed and is in a " + "group that now has no distributors remaining", + nodeIndex); + return true; + } + return false; +} + +uint16_t +PendingBucketSpaceDbTransition::newStateStorageNodeCount() const +{ + return _newClusterState.getNodeCount(lib::NodeType::STORAGE); +} + +bool +PendingBucketSpaceDbTransition::storageNodeMayHaveLostData(uint16_t index) +{ + Node node(NodeType::STORAGE, index); + NodeState newState = _newClusterState.getNodeState(node); + NodeState oldState = _prevClusterState.getNodeState(node); + + return (newState.getStartTimestamp() > oldState.getStartTimestamp()); +} + +void +PendingBucketSpaceDbTransition::updateSetOfNodesThatAreOutdated() +{ + const uint16_t nodeCount(newStateStorageNodeCount()); + for (uint16_t index = 0; index < nodeCount; ++index) { + if (storageNodeMayHaveLostData(index) || storageNodeChanged(index)) { + _outdatedNodes.insert(index); + } + } +} + +bool +PendingBucketSpaceDbTransition::storageNodeChanged(uint16_t index) { + Node node(NodeType::STORAGE, index); + NodeState newState = _newClusterState.getNodeState(node); + NodeState oldNodeState = _prevClusterState.getNodeState(node); + + // similarTo() also covers disk states. + if (!(oldNodeState.similarTo(newState))) { + LOG(debug, + "State for storage node %d has changed from '%s' to '%s', " + "updating bucket information", + index, + oldNodeState.toString().c_str(), + newState.toString().c_str()); + return true; + } + + return false; +} + +bool +PendingBucketSpaceDbTransition::storageNodeUpInNewState(uint16_t node) const +{ + return _newClusterState.getNodeState(Node(NodeType::STORAGE, node)) + .getState().oneOf(_clusterInfo->getStorageUpStates()); +} + +void +PendingBucketSpaceDbTransition::markAllAvailableNodesAsRequiringRequest() +{ + const uint16_t nodeCount(newStateStorageNodeCount()); + for (uint16_t i = 0; i < nodeCount; ++i) { + if (storageNodeUpInNewState(i)) { + _outdatedNodes.insert(i); + } + } +} + +void +PendingBucketSpaceDbTransition::addAdditionalNodesToOutdatedSet( + const std::unordered_set<uint16_t>& nodes) +{ + const uint16_t nodeCount(newStateStorageNodeCount()); + for (uint16_t node : nodes) { + if (node < nodeCount) { + _outdatedNodes.insert(node); + } + } +} + void PendingBucketSpaceDbTransition::addNodeInfo(const document::BucketId& id, const BucketCopy& copy) { diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h index 649ce676d88..903f9b762fb 100644 --- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h +++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h @@ -2,16 +2,17 @@ #pragma once #include "pending_bucket_space_db_transition_entry.h" +#include "outdated_nodes.h" #include <vespa/storage/bucketdb/bucketdatabase.h> -#include <unordered_set> namespace storage::api { class RequestBucketInfoReply; } -namespace storage::lib { class ClusterState; } +namespace storage::lib { class ClusterState; class State; } namespace storage::distributor { class ClusterInformation; class PendingClusterState; +class DistributorBucketSpace; /** * Class used by PendingClusterState to track request bucket info @@ -23,6 +24,7 @@ class PendingBucketSpaceDbTransition : public BucketDatabase::MutableEntryProces public: using Entry = dbtransition::Entry; using EntryList = std::vector<Entry>; + using OutdatedNodes = dbtransition::OutdatedNodes; private: using Range = std::pair<uint32_t, uint32_t>; @@ -37,11 +39,15 @@ private: // cluster state was constructed. // May be a superset of _requestedNodes, as some nodes that are outdated // may be down and thus cannot get a request. - const std::unordered_set<uint16_t> _outdatedNodes; + OutdatedNodes _outdatedNodes; + const lib::ClusterState &_prevClusterState; const lib::ClusterState &_newClusterState; const api::Timestamp _creationTimestamp; const PendingClusterState &_pendingClusterState; + DistributorBucketSpace &_distributorBucketSpace; + uint16_t _distributorIndex; + bool _bucketOwnershipTransfer; // BucketDataBase::MutableEntryProcessor API bool process(BucketDatabase::Entry& e) override; @@ -71,19 +77,37 @@ private: bool bucketInfoIteratorPointsToBucket(const document::BucketId& bucketId) const; std::string requestNodesToString(); + bool distributorChanged(); + static bool nodeWasUpButNowIsDown(const lib::State &old, const lib::State &nw); + bool storageNodeUpInNewState(uint16_t node) const; + bool nodeInSameGroupAsSelf(uint16_t index) const; + bool nodeNeedsOwnershipTransferFromGroupDown(uint16_t nodeIndex, const lib::ClusterState& state) const; + uint16_t newStateStorageNodeCount() const; + bool storageNodeMayHaveLostData(uint16_t index); + bool storageNodeChanged(uint16_t index); + void markAllAvailableNodesAsRequiringRequest(); + void addAdditionalNodesToOutdatedSet(const OutdatedNodes &nodes); + void updateSetOfNodesThatAreOutdated(); + public: PendingBucketSpaceDbTransition(const PendingClusterState &pendingClusterState, + DistributorBucketSpace &distributorBucketSpace, + bool distributionChanged, + const OutdatedNodes &outdatedNodes, std::shared_ptr<const ClusterInformation> clusterInfo, const lib::ClusterState &newClusterState, api::Timestamp creationTimestamp); ~PendingBucketSpaceDbTransition(); - // Merges all the results with the given bucket database. - void mergeInto(BucketDatabase& db); + // Merges all the results with the corresponding bucket database. + void mergeIntoBucketDatabase(); // Adds the info from the reply to our list of information. void onRequestBucketInfoReply(const api::RequestBucketInfoReply &reply, uint16_t node); + const OutdatedNodes &getOutdatedNodes() { return _outdatedNodes; } + bool getBucketOwnershipTransfer() const { return _bucketOwnershipTransfer; } + // Methods used by unit tests. const EntryList& results() const { return _entries; } void addNodeInfo(const document::BucketId& id, const BucketCopy& copy); diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp index 1186cc23c5f..daa85822264 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp @@ -3,6 +3,7 @@ #include "pendingclusterstate.h" #include "pending_bucket_space_db_transition.h" #include "bucketdbupdater.h" +#include "distributor_bucket_space_repo.h" #include <vespa/storageframework/defaultimplementation/clock/realclock.h> #include <vespa/storage/common/bucketoperationlogger.h> #include <vespa/vespalib/util/xmlstream.hpp> @@ -23,60 +24,70 @@ PendingClusterState::PendingClusterState( const framework::Clock& clock, const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, + DistributorBucketSpaceRepo &bucketSpaceRepo, const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd, - const std::unordered_set<uint16_t>& outdatedNodes, + const OutdatedNodesMap &outdatedNodesMap, api::Timestamp creationTimestamp) : _cmd(newStateCmd), _requestedNodes(newStateCmd->getSystemState().getNodeCount(lib::NodeType::STORAGE)), - _outdatedNodes(newStateCmd->getSystemState().getNodeCount(lib::NodeType::STORAGE)), _prevClusterState(clusterInfo->getClusterState()), _newClusterState(newStateCmd->getSystemState()), _clock(clock), _clusterInfo(clusterInfo), _creationTimestamp(creationTimestamp), _sender(sender), - _bucketOwnershipTransfer(distributorChanged(_prevClusterState, _newClusterState)), - _pendingTransition() + _bucketSpaceRepo(bucketSpaceRepo), + _bucketOwnershipTransfer(false), + _pendingTransitions() { logConstructionInformation(); - if (hasBucketOwnershipTransfer()) { - markAllAvailableNodesAsRequiringRequest(); - } else { - updateSetOfNodesThatAreOutdated(); - addAdditionalNodesToOutdatedSet(outdatedNodes); - } - _pendingTransition = std::make_unique<PendingBucketSpaceDbTransition>(*this, _clusterInfo, _newClusterState, _creationTimestamp); - if (shouldRequestBucketInfo()) { - requestNodes(); - } + initializeBucketSpaceTransitions(false, outdatedNodesMap); } PendingClusterState::PendingClusterState( const framework::Clock& clock, const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, + DistributorBucketSpaceRepo &bucketSpaceRepo, api::Timestamp creationTimestamp) : _requestedNodes(clusterInfo->getStorageNodeCount()), - _outdatedNodes(clusterInfo->getStorageNodeCount()), _prevClusterState(clusterInfo->getClusterState()), _newClusterState(clusterInfo->getClusterState()), _clock(clock), _clusterInfo(clusterInfo), _creationTimestamp(creationTimestamp), _sender(sender), + _bucketSpaceRepo(bucketSpaceRepo), _bucketOwnershipTransfer(true), - _pendingTransition() + _pendingTransitions() { logConstructionInformation(); - markAllAvailableNodesAsRequiringRequest(); - _pendingTransition = std::make_unique<PendingBucketSpaceDbTransition>(*this, _clusterInfo, _newClusterState, _creationTimestamp); + initializeBucketSpaceTransitions(true, OutdatedNodesMap()); +} + +PendingClusterState::~PendingClusterState() {} + +void +PendingClusterState::initializeBucketSpaceTransitions(bool distributionChanged, const OutdatedNodesMap &outdatedNodesMap) +{ + OutdatedNodes emptyOutdatedNodes; + for (auto &elem : _bucketSpaceRepo) { + auto onItr = outdatedNodesMap.find(elem.first); + const auto &outdatedNodes = (onItr == outdatedNodesMap.end()) ? emptyOutdatedNodes : onItr->second; + auto pendingTransition = + std::make_unique<PendingBucketSpaceDbTransition> + (*this, *elem.second, distributionChanged, outdatedNodes, + _clusterInfo, _newClusterState, _creationTimestamp); + if (pendingTransition->getBucketOwnershipTransfer()) { + _bucketOwnershipTransfer = true; + } + _pendingTransitions.emplace(elem.first, std::move(pendingTransition)); + } if (shouldRequestBucketInfo()) { requestNodes(); } } -PendingClusterState::~PendingClusterState() {} - void PendingClusterState::logConstructionInformation() const { @@ -96,33 +107,14 @@ PendingClusterState::storageNodeUpInNewState(uint16_t node) const .getState().oneOf(_clusterInfo->getStorageUpStates()); } -void -PendingClusterState::markAllAvailableNodesAsRequiringRequest() +PendingClusterState::OutdatedNodesMap +PendingClusterState::getOutdatedNodesMap() const { - const uint16_t nodeCount(newStateStorageNodeCount()); - for (uint16_t i = 0; i < nodeCount; ++i) { - if (storageNodeUpInNewState(i)) { - _outdatedNodes.insert(i); - } - } -} - -void -PendingClusterState::addAdditionalNodesToOutdatedSet( - const std::unordered_set<uint16_t>& nodes) -{ - const uint16_t nodeCount(newStateStorageNodeCount()); - for (uint16_t node : nodes) { - if (node < nodeCount) { - _outdatedNodes.insert(node); - } + OutdatedNodesMap outdatedNodesMap; + for (const auto &elem : _pendingTransitions) { + outdatedNodesMap.emplace(elem.first, elem.second->getOutdatedNodes()); } -} - -std::unordered_set<uint16_t> -PendingClusterState::getOutdatedNodeSet() const -{ - return _outdatedNodes; + return outdatedNodesMap; } uint16_t @@ -160,47 +152,6 @@ PendingClusterState::iAmDown() const return myState.getState() == lib::State::DOWN; } -bool -PendingClusterState::storageNodeMayHaveLostData(uint16_t index) -{ - Node node(NodeType::STORAGE, index); - NodeState newState = _newClusterState.getNodeState(node); - NodeState oldState = _prevClusterState.getNodeState(node); - - return (newState.getStartTimestamp() > oldState.getStartTimestamp()); -} - -void -PendingClusterState::updateSetOfNodesThatAreOutdated() -{ - const uint16_t nodeCount(newStateStorageNodeCount()); - for (uint16_t index = 0; index < nodeCount; ++index) { - if (storageNodeMayHaveLostData(index) || storageNodeChanged(index)) { - _outdatedNodes.insert(index); - } - } -} - -bool -PendingClusterState::storageNodeChanged(uint16_t index) { - Node node(NodeType::STORAGE, index); - NodeState newState = _newClusterState.getNodeState(node); - NodeState oldNodeState = _prevClusterState.getNodeState(node); - - // similarTo() also covers disk states. - if (!(oldNodeState.similarTo(newState))) { - LOG(debug, - "State for storage node %d has changed from '%s' to '%s', " - "updating bucket information", - index, - oldNodeState.toString().c_str(), - newState.toString().c_str()); - return true; - } - - return false; -} - void PendingClusterState::requestNodes() { @@ -215,116 +166,31 @@ PendingClusterState::requestNodes() void PendingClusterState::requestBucketInfoFromStorageNodesWithChangedState() { - for (uint16_t idx : _outdatedNodes) { - if (storageNodeUpInNewState(idx)) { - requestNode(idx); - } - } -} - -bool -PendingClusterState::distributorChanged( - const lib::ClusterState& oldState, - const lib::ClusterState& newState) -{ - if (newState.getDistributionBitCount() != - oldState.getDistributionBitCount()) - { - return true; - } - - Node myNode(NodeType::DISTRIBUTOR, _sender.getDistributorIndex()); - if (oldState.getNodeState(myNode).getState() == - lib::State::DOWN) - { - return true; - } - - uint16_t oldCount = oldState.getNodeCount(NodeType::DISTRIBUTOR); - uint16_t newCount = newState.getNodeCount(NodeType::DISTRIBUTOR); - - uint16_t maxCount = std::max(oldCount, newCount); - - for (uint16_t i = 0; i < maxCount; ++i) { - Node node(NodeType::DISTRIBUTOR, i); - - const lib::State& old(oldState.getNodeState(node).getState()); - const lib::State& nw(newState.getNodeState(node).getState()); - - if (nodeWasUpButNowIsDown(old, nw)) { - if (nodeInSameGroupAsSelf(i) || - nodeNeedsOwnershipTransferFromGroupDown(i, newState)) { - return true; + for (auto &elem : _pendingTransitions) { + const OutdatedNodes &outdatedNodes(elem.second->getOutdatedNodes()); + for (uint16_t idx : outdatedNodes) { + if (storageNodeUpInNewState(idx)) { + requestNode(BucketSpaceAndNode(elem.first, idx)); } } } - - return false; -} - -bool -PendingClusterState::nodeWasUpButNowIsDown(const lib::State& old, - const lib::State& nw) const -{ - return (old.oneOf("uimr") && !nw.oneOf("uimr")); -} - -bool -PendingClusterState::nodeInSameGroupAsSelf(uint16_t index) const -{ - if (_clusterInfo->nodeInSameGroupAsSelf(index)) { - LOG(debug, - "Distributor %d state changed, need to request data from all " - "storage nodes", - index); - return true; - } else { - LOG(debug, - "Distributor %d state changed but unrelated to my group.", - index); - return false; - } -} - -bool -PendingClusterState::nodeNeedsOwnershipTransferFromGroupDown( - uint16_t nodeIndex, - const lib::ClusterState& state) const -{ - const lib::Distribution& dist(_clusterInfo->getDistribution()); - if (!dist.distributorAutoOwnershipTransferOnWholeGroupDown()) { - return false; // Not doing anything for downed groups. - } - const lib::Group* group(dist.getNodeGraph().getGroupForNode(nodeIndex)); - // If there is no group information associated with the node (because the - // group has changed or the node has been removed from config), we must - // also invoke ownership transfer of buckets. - if (group == nullptr - || lib::Distribution::allDistributorsDown(*group, state)) - { - LOG(debug, - "Distributor %u state changed and is in a " - "group that now has no distributors remaining", - nodeIndex); - return true; - } - return false; } void -PendingClusterState::requestNode(uint16_t node) +PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode) { vespalib::string distributionHash(_clusterInfo->getDistributionHash()); LOG(debug, - "Requesting bucket info for node %d with cluster state '%s' " + "Requesting bucket info for bucket space %" PRIu64 " node %d with cluster state '%s' " "and distribution hash '%s'", - node, + bucketSpaceAndNode.bucketSpace.getId(), + bucketSpaceAndNode.node, _newClusterState.toString().c_str(), distributionHash.c_str()); std::shared_ptr<api::RequestBucketInfoCommand> cmd( new api::RequestBucketInfoCommand( - BucketSpace::placeHolder(), + bucketSpaceAndNode.bucketSpace, _sender.getDistributorIndex(), _newClusterState, distributionHash)); @@ -332,9 +198,9 @@ PendingClusterState::requestNode(uint16_t node) cmd->setPriority(api::StorageMessage::HIGH); cmd->setTimeout(INT_MAX); - _sentMessages[cmd->getMsgId()] = node; + _sentMessages.emplace(cmd->getMsgId(), bucketSpaceAndNode); - _sender.sendToNode(NodeType::STORAGE, node, cmd); + _sender.sendToNode(NodeType::STORAGE, bucketSpaceAndNode.node, cmd); } @@ -358,18 +224,20 @@ PendingClusterState::onRequestBucketInfoReply(const std::shared_ptr<api::Request if (iter == _sentMessages.end()) { return false; } - const uint16_t node = iter->second; + const BucketSpaceAndNode bucketSpaceAndNode = iter->second; if (!reply->getResult().success()) { framework::MilliSecTime resendTime(_clock); resendTime += framework::MilliSecTime(100); - _delayedRequests.push_back(std::make_pair(resendTime, node)); + _delayedRequests.emplace_back(resendTime, bucketSpaceAndNode); _sentMessages.erase(iter); return true; } - setNodeReplied(node); - _pendingTransition->onRequestBucketInfoReply(*reply, node); + setNodeReplied(bucketSpaceAndNode.node); + auto transitionIter = _pendingTransitions.find(bucketSpaceAndNode.bucketSpace); + assert(transitionIter != _pendingTransitions.end()); + transitionIter->second->onRequestBucketInfoReply(*reply, bucketSpaceAndNode.node); _sentMessages.erase(iter); return true; @@ -403,9 +271,11 @@ PendingClusterState::requestNodesToString() const } void -PendingClusterState::mergeInto(BucketDatabase& db) +PendingClusterState::mergeIntoBucketDatabases() { - _pendingTransition->mergeInto(db); + for (auto &elem : _pendingTransitions) { + elem.second->mergeIntoBucketDatabase(); + } } void @@ -414,11 +284,9 @@ PendingClusterState::printXml(vespalib::XmlOutputStream& xos) const using namespace vespalib::xml; xos << XmlTag("systemstate_pending") << XmlAttribute("state", _newClusterState); - for (std::map<uint64_t, uint16_t>::const_iterator iter - = _sentMessages.begin(); iter != _sentMessages.end(); ++iter) - { + for (auto &elem : _sentMessages) { xos << XmlTag("pending") - << XmlAttribute("node", iter->second) + << XmlAttribute("node", elem.second.node) << XmlEndTag(); } xos << XmlEndTag(); @@ -432,16 +300,12 @@ PendingClusterState::getSummary() const (_clock.getTimeInMicros().getTime() - _creationTimestamp)); } -const PendingBucketSpaceDbTransition::EntryList & -PendingClusterState::results() const -{ - return _pendingTransition->results(); -} - -void -PendingClusterState::addNodeInfo(const document::BucketId& id, const BucketCopy& copy) +PendingBucketSpaceDbTransition & +PendingClusterState::getPendingBucketSpaceDbTransition(document::BucketSpace bucketSpace) { - _pendingTransition->addNodeInfo(id, copy); + auto transitionIter = _pendingTransitions.find(bucketSpace); + assert(transitionIter != _pendingTransitions.end()); + return *transitionIter->second; } } diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h index b742bd3bf46..7a23d48c9fd 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.h +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h @@ -9,15 +9,15 @@ #include <vespa/storageframework/generic/clock/clock.h> #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vespalib/util/xmlserializable.h> -#include <unordered_set> +#include "outdated_nodes_map.h" +#include <unordered_map> #include <deque> -namespace storage { class BucketDatabase; } - namespace storage::distributor { class DistributorMessageSender; class PendingBucketSpaceDbTransition; +class DistributorBucketSpaceRepo; /** * Class used by BucketDBUpdater to track request bucket info @@ -25,6 +25,8 @@ class PendingBucketSpaceDbTransition; */ class PendingClusterState : public vespalib::XmlSerializable { public: + using OutdatedNodes = dbtransition::OutdatedNodes; + using OutdatedNodesMap = dbtransition::OutdatedNodesMap; struct Summary { Summary(const std::string& prevClusterState, const std::string& newClusterState, uint32_t processingTime); Summary(const Summary &); @@ -42,13 +44,14 @@ public: const framework::Clock& clock, const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, + DistributorBucketSpaceRepo &bucketSpaceRepo, const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd, - const std::unordered_set<uint16_t>& outdatedNodes, + const OutdatedNodesMap &outdatedNodesMap, api::Timestamp creationTimestamp) { return std::unique_ptr<PendingClusterState>( - new PendingClusterState(clock, clusterInfo, sender, newStateCmd, - outdatedNodes, + new PendingClusterState(clock, clusterInfo, sender, bucketSpaceRepo, newStateCmd, + outdatedNodesMap, creationTimestamp)); } @@ -60,10 +63,11 @@ public: const framework::Clock& clock, const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, + DistributorBucketSpaceRepo &bucketSpaceRepo, api::Timestamp creationTimestamp) { return std::unique_ptr<PendingClusterState>( - new PendingClusterState(clock, clusterInfo, sender, creationTimestamp)); + new PendingClusterState(clock, clusterInfo, sender, bucketSpaceRepo, creationTimestamp)); } PendingClusterState(const PendingClusterState &) = delete; @@ -77,8 +81,8 @@ public: bool onRequestBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply>& reply); /** - * Tags the given node as having replied to the - * request bucket info command. + * Tags the given node as having replied to at least one of the + * request bucket info commands. Only used for debug logging. */ void setNodeReplied(uint16_t nodeIdx) { _requestedNodes[nodeIdx] = true; @@ -120,23 +124,15 @@ public: * state was constructed for a distribution config change, this set will * be equal to the set of all available storage nodes. */ - std::unordered_set<uint16_t> getOutdatedNodeSet() const; + OutdatedNodesMap getOutdatedNodesMap() const; /** - * Merges all the results with the given bucket database. + * Merges all the results with the corresponding bucket databases. */ - void mergeInto(BucketDatabase& db); - // Get our list of information. Only used by unit test. - const std::vector<dbtransition::Entry>& results() const; - // Adds info from a node to our list of information. Only used by unit test. - void addNodeInfo(const document::BucketId& id, const BucketCopy& copy); - + void mergeIntoBucketDatabases(); + // Get pending transition for a specific bucket space. Only used by unit test. + PendingBucketSpaceDbTransition &getPendingBucketSpaceDbTransition(document::BucketSpace bucketSpace); - /** - * Returns true if this pending state was due to a distribution bit - * change rather than an actual state change. - */ - bool distributionChange() const { return _distributionChange; } void printXml(vespalib::XmlOutputStream&) const override; Summary getSummary() const; std::string requestNodesToString() const; @@ -150,8 +146,9 @@ private: const framework::Clock&, const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, + DistributorBucketSpaceRepo &bucketSpaceRepo, const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd, - const std::unordered_set<uint16_t>& outdatedNodes, + const OutdatedNodesMap &outdatedNodesMap, api::Timestamp creationTimestamp); /** @@ -162,16 +159,23 @@ private: const framework::Clock&, const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, + DistributorBucketSpaceRepo &bucketSpaceRepo, api::Timestamp creationTimestamp); + struct BucketSpaceAndNode { + document::BucketSpace bucketSpace; + uint16_t node; + BucketSpaceAndNode(document::BucketSpace bucketSpace_, + uint16_t node_) + : bucketSpace(bucketSpace_), + node(node_) + { + } + }; + + void initializeBucketSpaceTransitions(bool distributionChanged, const OutdatedNodesMap &outdatedNodesMap); void logConstructionInformation() const; - void requestNode(uint16_t node); - bool distributorChanged(const lib::ClusterState& oldState, const lib::ClusterState& newState); - bool storageNodeMayHaveLostData(uint16_t index); - bool storageNodeChanged(uint16_t index); - void markAllAvailableNodesAsRequiringRequest(); - void addAdditionalNodesToOutdatedSet(const std::unordered_set<uint16_t>& nodes); - void updateSetOfNodesThatAreOutdated(); + void requestNode(BucketSpaceAndNode bucketSpaceAndNode); void requestNodes(); void requestBucketInfoFromStorageNodesWithChangedState(); @@ -182,24 +186,14 @@ private: bool shouldRequestBucketInfo() const; bool clusterIsDown() const; bool iAmDown() const; - bool nodeInSameGroupAsSelf(uint16_t index) const; - bool nodeNeedsOwnershipTransferFromGroupDown(uint16_t nodeIndex, const lib::ClusterState& state) const; - bool nodeWasUpButNowIsDown(const lib::State& old, const lib::State& nw) const; bool storageNodeUpInNewState(uint16_t node) const; std::shared_ptr<api::SetSystemStateCommand> _cmd; - std::map<uint64_t, uint16_t> _sentMessages; + std::map<uint64_t, BucketSpaceAndNode> _sentMessages; std::vector<bool> _requestedNodes; - std::deque<std::pair<framework::MilliSecTime, uint16_t> > _delayedRequests; - - // Set for all nodes that may have changed state since that previous - // active cluster state, or that were marked as outdated when the pending - // cluster state was constructed. - // May be a superset of _requestedNodes, as some nodes that are outdated - // may be down and thus cannot get a request. - std::unordered_set<uint16_t> _outdatedNodes; + std::deque<std::pair<framework::MilliSecTime, BucketSpaceAndNode> > _delayedRequests; lib::ClusterState _prevClusterState; lib::ClusterState _newClusterState; @@ -209,10 +203,10 @@ private: api::Timestamp _creationTimestamp; DistributorMessageSender& _sender; + DistributorBucketSpaceRepo &_bucketSpaceRepo; - bool _distributionChange; bool _bucketOwnershipTransfer; - std::unique_ptr<PendingBucketSpaceDbTransition> _pendingTransition; + std::unordered_map<document::BucketSpace, std::unique_ptr<PendingBucketSpaceDbTransition>, document::BucketSpace::hash> _pendingTransitions; }; } diff --git a/vespabase/src/rhel-prestart.sh b/vespabase/src/rhel-prestart.sh index 89be9fec2be..fa7b9a1fe49 100755 --- a/vespabase/src/rhel-prestart.sh +++ b/vespabase/src/rhel-prestart.sh @@ -89,12 +89,11 @@ fixdir ${VESPA_USER} wheel 755 libexec/vespa/plugins/qrs fixdir ${VESPA_USER} wheel 755 logs/vespa/configserver fixdir ${VESPA_USER} wheel 755 logs/vespa/qrs fixdir ${VESPA_USER} wheel 755 logs/vespa/search -fixdir ${VESPA_USER} wheel 755 var/cache/vespa/config fixdir ${VESPA_USER} wheel 755 var/db/vespa fixdir ${VESPA_USER} wheel 755 var/db/vespa/tmp fixdir ${VESPA_USER} wheel 755 var/db/vespa/config_server fixdir ${VESPA_USER} wheel 755 var/db/vespa/config_server/serverdb -fixdir ${VESPA_USER} wheel 755 var/db/vespa/config_server/serverdb/applications +fixdir ${VESPA_USER} wheel 755 var/db/vespa/config_server/serverdb/tenants fixdir ${VESPA_USER} wheel 755 var/db/vespa/index fixdir ${VESPA_USER} wheel 755 var/db/vespa/logcontrol fixdir ${VESPA_USER} wheel 755 var/db/vespa/search @@ -102,7 +101,6 @@ fixdir ${VESPA_USER} wheel 755 var/jdisc_core fixdir ${VESPA_USER} wheel 755 var/vespa/bundlecache fixdir ${VESPA_USER} wheel 755 var/vespa/bundlecache/configserver fixdir ${VESPA_USER} wheel 755 var/vespa/cache/config/ -fixdir ${VESPA_USER} wheel 775 libexec/vespa/modelplugins chown -hR ${VESPA_USER} logs/vespa chown -hR ${VESPA_USER} var/db/vespa |