diff options
58 files changed, 1313 insertions, 520 deletions
diff --git a/athenz-identity-provider-service/pom.xml b/athenz-identity-provider-service/pom.xml index 26e24be526c..260836af892 100644 --- a/athenz-identity-provider-service/pom.xml +++ b/athenz-identity-provider-service/pom.xml @@ -102,6 +102,12 @@ <scope>test</scope> </dependency> <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>testutil</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.4.1</version> diff --git a/athenz-identity-provider-service/src/main/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/AthenzInstanceProviderService.java b/athenz-identity-provider-service/src/main/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/AthenzInstanceProviderService.java index 26a88896fb9..8ac26938633 100644 --- a/athenz-identity-provider-service/src/main/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/AthenzInstanceProviderService.java +++ b/athenz-identity-provider-service/src/main/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/AthenzInstanceProviderService.java @@ -8,6 +8,9 @@ import com.yahoo.config.provision.SystemName; import com.yahoo.config.provision.Zone; import com.yahoo.jdisc.http.SecretStore; import com.yahoo.log.LogLevel; +import com.yahoo.net.HostName; +import com.yahoo.vespa.hosted.athenz.instanceproviderservice.ca.CertificateSigner; +import com.yahoo.vespa.hosted.athenz.instanceproviderservice.ca.CertificateSignerServlet; import com.yahoo.vespa.hosted.athenz.instanceproviderservice.config.AthenzProviderServiceConfig; import com.yahoo.vespa.hosted.athenz.instanceproviderservice.impl.AthenzCertificateClient; import com.yahoo.vespa.hosted.athenz.instanceproviderservice.impl.CertificateClient; @@ -64,6 +67,7 @@ public class AthenzInstanceProviderService extends AbstractComponent { CertificateClient certificateClient, SslContextFactory sslContextFactory) { this(config, scheduler, zone, sslContextFactory, + new CertificateSigner(keyProvider, getZoneConfig(config, zone), HostName.getLocalhost()), new InstanceValidator(keyProvider, superModelProvider), new IdentityDocumentGenerator(config, getZoneConfig(config, zone), nodeRepository, zone, keyProvider), new AthenzCertificateUpdater( @@ -74,13 +78,15 @@ public class AthenzInstanceProviderService extends AbstractComponent { ScheduledExecutorService scheduler, Zone zone, SslContextFactory sslContextFactory, + CertificateSigner certificateSigner, InstanceValidator instanceValidator, IdentityDocumentGenerator identityDocumentGenerator, AthenzCertificateUpdater reloader) { // TODO: Enable for all systems. Currently enabled for CD system only if (SystemName.cd.equals(zone.system())) { this.scheduler = scheduler; - this.jetty = createJettyServer(config, sslContextFactory, instanceValidator, identityDocumentGenerator); + this.jetty = createJettyServer(config, sslContextFactory, + certificateSigner, instanceValidator, identityDocumentGenerator); // TODO Configurable update frequency scheduler.scheduleAtFixedRate(reloader, 0, 1, TimeUnit.DAYS); @@ -97,6 +103,7 @@ public class AthenzInstanceProviderService extends AbstractComponent { private static Server createJettyServer(AthenzProviderServiceConfig config, SslContextFactory sslContextFactory, + CertificateSigner certificateSigner, InstanceValidator instanceValidator, IdentityDocumentGenerator identityDocumentGenerator) { Server server = new Server(); @@ -105,6 +112,10 @@ public class AthenzInstanceProviderService extends AbstractComponent { server.addConnector(connector); ServletHandler handler = new ServletHandler(); + + CertificateSignerServlet certificateSignerServlet = new CertificateSignerServlet(certificateSigner); + handler.addServletWithMapping(new ServletHolder(certificateSignerServlet), config.apiPath() + "/sign"); + InstanceConfirmationServlet instanceConfirmationServlet = new InstanceConfirmationServlet(instanceValidator); handler.addServletWithMapping(new ServletHolder(instanceConfirmationServlet), config.apiPath() + "/instance"); diff --git a/athenz-identity-provider-service/src/main/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/ca/CertificateSigner.java b/athenz-identity-provider-service/src/main/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/ca/CertificateSigner.java new file mode 100644 index 00000000000..2e00695f2f0 --- /dev/null +++ b/athenz-identity-provider-service/src/main/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/ca/CertificateSigner.java @@ -0,0 +1,141 @@ +package com.yahoo.vespa.hosted.athenz.instanceproviderservice.ca; + +import com.google.common.collect.ImmutableList; +import com.yahoo.log.LogLevel; +import com.yahoo.vespa.hosted.athenz.instanceproviderservice.config.AthenzProviderServiceConfig; +import com.yahoo.vespa.hosted.athenz.instanceproviderservice.impl.KeyProvider; +import org.bouncycastle.asn1.ASN1ObjectIdentifier; +import org.bouncycastle.asn1.DERUTF8String; +import org.bouncycastle.asn1.pkcs.PKCSObjectIdentifiers; +import org.bouncycastle.asn1.x500.AttributeTypeAndValue; +import org.bouncycastle.asn1.x500.X500Name; +import org.bouncycastle.asn1.x500.style.BCStyle; +import org.bouncycastle.asn1.x509.BasicConstraints; +import org.bouncycastle.asn1.x509.Extension; +import org.bouncycastle.asn1.x509.Extensions; +import org.bouncycastle.cert.X509v3CertificateBuilder; +import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter; +import org.bouncycastle.cert.jcajce.JcaX509v3CertificateBuilder; +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.bouncycastle.operator.ContentSigner; +import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder; +import org.bouncycastle.pkcs.PKCS10CertificationRequest; +import org.bouncycastle.pkcs.jcajce.JcaPKCS10CertificationRequest; + +import java.math.BigInteger; +import java.security.PrivateKey; +import java.security.Provider; +import java.security.PublicKey; +import java.security.cert.X509Certificate; +import java.time.Clock; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.Enumeration; +import java.util.List; +import java.util.logging.Logger; +import java.util.stream.Collectors; +import java.util.stream.Stream; + + +/** + * Signs Certificate Signing Reqest from tenant nodes. This certificate will be used + * by nodes to authenticate themselves when performing operations against the config + * server, such as updating node-repository or orchestrator. + * + * @author freva + */ +public class CertificateSigner { + + private static final Logger log = Logger.getLogger(CertificateSigner.class.getName()); + + static final String SIGNER_ALGORITHM = "SHA256withRSA"; + static final Duration CERTIFICATE_EXPIRATION = Duration.ofDays(30); + private static final List<ASN1ObjectIdentifier> ILLEGAL_EXTENSIONS = ImmutableList.of( + Extension.basicConstraints, Extension.subjectAlternativeName); + + private final JcaX509CertificateConverter certificateConverter = new JcaX509CertificateConverter(); + private final Provider provider = new BouncyCastleProvider(); + + private final PrivateKey caPrivateKey; + private final X500Name issuer; + private final Clock clock; + + public CertificateSigner(KeyProvider keyProvider, + AthenzProviderServiceConfig.Zones zoneConfig, + String configServerHostname) { + this(keyProvider.getPrivateKey(zoneConfig.secretVersion()), configServerHostname, Clock.systemUTC()); + } + + CertificateSigner(PrivateKey caPrivateKey, String configServerHostname, Clock clock) { + this.caPrivateKey = caPrivateKey; + this.issuer = new X500Name("CN=" + configServerHostname); + this.clock = clock; + } + + /** + * Signs the CSR if: + * <ul> + * <li>Common Name matches {@code remoteHostname}</li> + * <li>CSR does not contain any any of the extensions in {@code ILLEGAL_EXTENSIONS}</li> + * </ul> + */ + X509Certificate generateX509Certificate(PKCS10CertificationRequest certReq, String remoteHostname) { + verifyCertificateCommonName(certReq.getSubject(), remoteHostname); + verifyCertificateExtensions(certReq); + + Date notBefore = Date.from(clock.instant()); + Date notAfter = Date.from(clock.instant().plus(CERTIFICATE_EXPIRATION)); + + try { + PublicKey publicKey = new JcaPKCS10CertificationRequest(certReq).getPublicKey(); + X509v3CertificateBuilder caBuilder = new JcaX509v3CertificateBuilder( + issuer, BigInteger.valueOf(clock.millis()), notBefore, notAfter, certReq.getSubject(), publicKey) + + // Set Basic Constraints to false + .addExtension(Extension.basicConstraints, true, new BasicConstraints(false)); + + ContentSigner caSigner = new JcaContentSignerBuilder(SIGNER_ALGORITHM).build(caPrivateKey); + + return certificateConverter + .setProvider(provider) + .getCertificate(caBuilder.build(caSigner)); + } catch (Exception ex) { + log.log(LogLevel.ERROR, "Failed to generate X509 Certificate", ex); + throw new RuntimeException("Failed to generate X509 Certificate"); + } + } + + static void verifyCertificateCommonName(X500Name subject, String commonName) { + List<AttributeTypeAndValue> attributesAndValues = Arrays.stream(subject.getRDNs()) + .flatMap(rdn -> rdn.isMultiValued() ? + Stream.of(rdn.getTypesAndValues()) : Stream.of(rdn.getFirst())) + .filter(attr -> attr.getType() == BCStyle.CN) + .collect(Collectors.toList()); + + if (attributesAndValues.size() != 1) { + throw new IllegalArgumentException("Only 1 common name should be set"); + } + + String actualCommonName = DERUTF8String.getInstance(attributesAndValues.get(0).getValue()).getString(); + if (! actualCommonName.equals(commonName)) { + throw new IllegalArgumentException("Expected common name to be " + commonName + ", but was " + actualCommonName); + } + } + + @SuppressWarnings("unchecked") + static void verifyCertificateExtensions(PKCS10CertificationRequest request) { + List<String> illegalExt = Arrays + .stream(request.getAttributes(PKCSObjectIdentifiers.pkcs_9_at_extensionRequest)) + .map(attribute -> Extensions.getInstance(attribute.getAttrValues().getObjectAt(0))) + .flatMap(ext -> Collections.list((Enumeration<ASN1ObjectIdentifier>) ext.oids()).stream()) + .filter(ILLEGAL_EXTENSIONS::contains) + .map(ASN1ObjectIdentifier::getId) + .collect(Collectors.toList()); + + if (! illegalExt.isEmpty()) { + throw new IllegalArgumentException("CSR contains illegal extensions: " + String.join(", ", illegalExt)); + } + } +} diff --git a/athenz-identity-provider-service/src/main/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/ca/CertificateSignerServlet.java b/athenz-identity-provider-service/src/main/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/ca/CertificateSignerServlet.java new file mode 100644 index 00000000000..d2ebae394a2 --- /dev/null +++ b/athenz-identity-provider-service/src/main/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/ca/CertificateSignerServlet.java @@ -0,0 +1,50 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.hosted.athenz.instanceproviderservice.ca; + +import com.yahoo.log.LogLevel; +import com.yahoo.vespa.hosted.athenz.instanceproviderservice.ca.model.CertificateSerializedPayload; +import com.yahoo.vespa.hosted.athenz.instanceproviderservice.ca.model.CsrSerializedPayload; +import com.yahoo.vespa.hosted.athenz.instanceproviderservice.impl.Utils; +import org.bouncycastle.pkcs.PKCS10CertificationRequest; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.security.cert.X509Certificate; +import java.util.logging.Logger; + +/** + * @author freva + */ +public class CertificateSignerServlet extends HttpServlet { + + private static final Logger log = Logger.getLogger(CertificateSignerServlet.class.getName()); + + private final CertificateSigner certificateSigner; + + public CertificateSignerServlet(CertificateSigner certificateSigner) { + this.certificateSigner = certificateSigner; + } + + @Override + protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + try { + String remoteHostname = req.getRemoteHost(); + PKCS10CertificationRequest csr = Utils.getMapper().readValue(req.getReader(), CsrSerializedPayload.class).csr; + + log.log(LogLevel.DEBUG, "Certification request from " + remoteHostname + ": " + csr); + + X509Certificate certificate = certificateSigner.generateX509Certificate(csr, remoteHostname); + CertificateSerializedPayload certificateSerializedPayload = new CertificateSerializedPayload(certificate); + + resp.setStatus(HttpServletResponse.SC_OK); + resp.setContentType("application/json"); + resp.getWriter().write(Utils.getMapper().writeValueAsString(certificateSerializedPayload)); + } catch (RuntimeException e) { + log.log(LogLevel.ERROR, e.getMessage(), e); + resp.sendError(HttpServletResponse.SC_BAD_REQUEST, e.getMessage()); + } + } +} diff --git a/athenz-identity-provider-service/src/main/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/ca/model/CertificateSerializedPayload.java b/athenz-identity-provider-service/src/main/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/ca/model/CertificateSerializedPayload.java new file mode 100644 index 00000000000..2fd34741da7 --- /dev/null +++ b/athenz-identity-provider-service/src/main/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/ca/model/CertificateSerializedPayload.java @@ -0,0 +1,68 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.hosted.athenz.instanceproviderservice.ca.model; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.bouncycastle.openssl.jcajce.JcaPEMWriter; +import org.bouncycastle.util.io.pem.PemObject; + +import java.io.IOException; +import java.io.StringWriter; +import java.security.cert.CertificateEncodingException; +import java.security.cert.X509Certificate; + +/** + * Contains PEM formatted signed certificate + * + * @author freva + */ +public class CertificateSerializedPayload { + + @JsonProperty("certificate") @JsonSerialize(using = CertificateSerializer.class) + public final X509Certificate certificate; + + @JsonCreator + public CertificateSerializedPayload(@JsonProperty("certificate") X509Certificate certificate) { + this.certificate = certificate; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + CertificateSerializedPayload that = (CertificateSerializedPayload) o; + + return certificate.equals(that.certificate); + } + + @Override + public int hashCode() { + return certificate.hashCode(); + } + + @Override + public String toString() { + return "CertificateSerializedPayload{" + + "certificate='" + certificate + '\'' + + '}'; + } + + public static class CertificateSerializer extends JsonSerializer<X509Certificate> { + @Override + public void serialize( + X509Certificate certificate, JsonGenerator gen, SerializerProvider serializers) throws IOException { + try (StringWriter stringWriter = new StringWriter(); JcaPEMWriter pemWriter = new JcaPEMWriter(stringWriter)) { + pemWriter.writeObject(new PemObject("CERTIFICATE", certificate.getEncoded())); + pemWriter.flush(); + gen.writeString(stringWriter.toString()); + } catch (CertificateEncodingException e) { + throw new RuntimeException("Failed to encode X509Certificate", e); + } + } + } +} diff --git a/athenz-identity-provider-service/src/main/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/ca/model/CsrSerializedPayload.java b/athenz-identity-provider-service/src/main/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/ca/model/CsrSerializedPayload.java new file mode 100644 index 00000000000..d755fbd02a3 --- /dev/null +++ b/athenz-identity-provider-service/src/main/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/ca/model/CsrSerializedPayload.java @@ -0,0 +1,62 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.hosted.athenz.instanceproviderservice.ca.model; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import org.bouncycastle.openssl.PEMParser; +import org.bouncycastle.pkcs.PKCS10CertificationRequest; + +import java.io.IOException; +import java.io.StringReader; + +/** + * Contains PEM formatted Certificate Signing Request (CSR) + * + * @author freva + */ +public class CsrSerializedPayload { + + @JsonProperty("csr") public final PKCS10CertificationRequest csr; + + @JsonCreator + public CsrSerializedPayload(@JsonProperty("csr") @JsonDeserialize(using = CertificateRequestDeserializer.class) + PKCS10CertificationRequest csr) { + this.csr = csr; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + CsrSerializedPayload that = (CsrSerializedPayload) o; + + return csr.equals(that.csr); + } + + @Override + public int hashCode() { + return csr.hashCode(); + } + + @Override + public String toString() { + return "CsrSerializedPayload{" + + "csr='" + csr + '\'' + + '}'; + } + + public static class CertificateRequestDeserializer extends JsonDeserializer<PKCS10CertificationRequest> { + @Override + public PKCS10CertificationRequest deserialize( + JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException { + try (PEMParser pemParser = new PEMParser(new StringReader(jsonParser.getValueAsString()))) { + return (PKCS10CertificationRequest) pemParser.readObject(); + } + } + } +} diff --git a/athenz-identity-provider-service/src/test/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/AthenzInstanceProviderServiceTest.java b/athenz-identity-provider-service/src/test/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/AthenzInstanceProviderServiceTest.java index bf0746aee7e..c8c3826fc39 100644 --- a/athenz-identity-provider-service/src/test/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/AthenzInstanceProviderServiceTest.java +++ b/athenz-identity-provider-service/src/test/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/AthenzInstanceProviderServiceTest.java @@ -9,6 +9,7 @@ import com.yahoo.config.provision.SystemName; import com.yahoo.config.provision.Zone; import com.yahoo.log.LogLevel; import com.yahoo.vespa.hosted.athenz.instanceproviderservice.AthenzInstanceProviderService.AthenzCertificateUpdater; +import com.yahoo.vespa.hosted.athenz.instanceproviderservice.ca.CertificateSigner; import com.yahoo.vespa.hosted.athenz.instanceproviderservice.config.AthenzProviderServiceConfig; import com.yahoo.vespa.hosted.athenz.instanceproviderservice.impl.CertificateClient; import com.yahoo.vespa.hosted.athenz.instanceproviderservice.impl.IdentityDocumentGenerator; @@ -101,13 +102,16 @@ public class AthenzInstanceProviderServiceTest { ScheduledExecutorService executor = mock(ScheduledExecutorService.class); when(executor.awaitTermination(anyLong(), any())).thenReturn(true); + CertificateSigner certificateSigner = mock(CertificateSigner.class); + InstanceValidator instanceValidator = mock(InstanceValidator.class); when(instanceValidator.isValidInstance(any())).thenReturn(true); IdentityDocumentGenerator identityDocumentGenerator = mock(IdentityDocumentGenerator.class); AthenzInstanceProviderService athenzInstanceProviderService = new AthenzInstanceProviderService( - config, executor, ZONE, sslContextFactory, instanceValidator, identityDocumentGenerator, certificateUpdater); + config, executor, ZONE, sslContextFactory, certificateSigner, instanceValidator, + identityDocumentGenerator, certificateUpdater); try (CloseableHttpClient client = createHttpClient(domain, service)) { assertFalse(getStatus(client)); diff --git a/athenz-identity-provider-service/src/test/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/ca/CertificateSignerTest.java b/athenz-identity-provider-service/src/test/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/ca/CertificateSignerTest.java new file mode 100644 index 00000000000..e691da0b2c3 --- /dev/null +++ b/athenz-identity-provider-service/src/test/java/com/yahoo/vespa/hosted/athenz/instanceproviderservice/ca/CertificateSignerTest.java @@ -0,0 +1,134 @@ +package com.yahoo.vespa.hosted.athenz.instanceproviderservice.ca; + +import com.yahoo.test.ManualClock; +import org.bouncycastle.asn1.pkcs.PKCSObjectIdentifiers; +import org.bouncycastle.asn1.x500.X500Name; +import org.bouncycastle.asn1.x509.Extension; +import org.bouncycastle.asn1.x509.Extensions; +import org.bouncycastle.asn1.x509.ExtensionsGenerator; +import org.bouncycastle.asn1.x509.GeneralName; +import org.bouncycastle.asn1.x509.GeneralNames; +import org.bouncycastle.operator.ContentSigner; +import org.bouncycastle.operator.jcajce.JcaContentSignerBuilder; +import org.bouncycastle.pkcs.PKCS10CertificationRequest; +import org.bouncycastle.pkcs.PKCS10CertificationRequestBuilder; +import org.bouncycastle.pkcs.jcajce.JcaPKCS10CertificationRequestBuilder; +import org.junit.Test; + +import java.math.BigInteger; +import java.security.KeyPair; +import java.security.KeyPairGenerator; +import java.security.cert.X509Certificate; +import java.time.Instant; +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * @author freva + */ +public class CertificateSignerTest { + + private final KeyPair clientKeyPair = getKeyPair(); + + private final long startTime = 1234567890000L; + private final KeyPair caKeyPair = getKeyPair(); + private final String cfgServerHostname = "cfg1.us-north-1.vespa.domain.tld"; + private final ManualClock clock = new ManualClock(Instant.ofEpochMilli(startTime)); + private final CertificateSigner signer = new CertificateSigner(caKeyPair.getPrivate(), cfgServerHostname, clock); + + private final String requestersHostname = "tenant-123.us-north-1.vespa.domain.tld"; + + @Test + public void test_signing() throws Exception { + ExtensionsGenerator extGen = new ExtensionsGenerator(); + String subject = "C=NO,OU=Vespa,CN=" + requestersHostname; + PKCS10CertificationRequest request = makeRequest(subject, extGen.generate()); + + X509Certificate certificate = signer.generateX509Certificate(request, requestersHostname); + assertCertificate(certificate, subject, Collections.singleton(Extension.basicConstraints.getId())); + } + + @Test + public void common_name_test() throws Exception { + CertificateSigner.verifyCertificateCommonName( + new X500Name("CN=" + requestersHostname), requestersHostname); + CertificateSigner.verifyCertificateCommonName( + new X500Name("C=NO,OU=Vespa,CN=" + requestersHostname), requestersHostname); + CertificateSigner.verifyCertificateCommonName( + new X500Name("C=NO+OU=org,CN=" + requestersHostname), requestersHostname); + + assertCertificateCommonNameException("C=NO", "Only 1 common name should be set"); + assertCertificateCommonNameException("C=US+CN=abc123.domain.tld,C=NO+CN=" + requestersHostname, "Only 1 common name should be set"); + assertCertificateCommonNameException("CN=evil.hostname.domain.tld", + "Expected common name to be tenant-123.us-north-1.vespa.domain.tld, but was evil.hostname.domain.tld"); + } + + @Test(expected = IllegalArgumentException.class) + public void extensions_test_subject_alternative_names() throws Exception { + ExtensionsGenerator extGen = new ExtensionsGenerator(); + extGen.addExtension(Extension.subjectAlternativeName, false, new GeneralNames(new GeneralName[] { + new GeneralName(GeneralName.dNSName, "some.other.domain.tld")})); + PKCS10CertificationRequest request = makeRequest("OU=Vespa", extGen.generate()); + + CertificateSigner.verifyCertificateExtensions(request); + } + + @Test + public void extensions_allowed() throws Exception { + ExtensionsGenerator extGen = new ExtensionsGenerator(); + extGen.addExtension(Extension.certificateIssuer, true, new byte[0]); + PKCS10CertificationRequest request = makeRequest("OU=Vespa", extGen.generate()); + + CertificateSigner.verifyCertificateExtensions(request); + } + + private void assertCertificateCommonNameException(String subject, String expectedMessage) { + try { + CertificateSigner.verifyCertificateCommonName(new X500Name(subject), requestersHostname); + fail("Expected to fail"); + } catch (IllegalArgumentException e) { + assertEquals(expectedMessage, e.getMessage()); + } + } + + private void assertCertificate(X509Certificate certificate, String expectedSubjectName, Set<String> expectedExtensions) throws Exception { + assertEquals(3, certificate.getVersion()); + assertEquals(BigInteger.valueOf(startTime), certificate.getSerialNumber()); + assertEquals(startTime, certificate.getNotBefore().getTime()); + assertEquals(startTime + CertificateSigner.CERTIFICATE_EXPIRATION.toMillis(), certificate.getNotAfter().getTime()); + assertEquals(CertificateSigner.SIGNER_ALGORITHM, certificate.getSigAlgName()); + assertEquals(expectedSubjectName, certificate.getSubjectDN().getName()); + assertEquals("CN=" + cfgServerHostname, certificate.getIssuerX500Principal().getName()); + + Set<String> extensions = Stream.of(certificate.getNonCriticalExtensionOIDs(), + certificate.getCriticalExtensionOIDs()) + .flatMap(Collection::stream) + .collect(Collectors.toSet()); + assertEquals(expectedExtensions, extensions); + + certificate.verify(caKeyPair.getPublic()); + } + + private PKCS10CertificationRequest makeRequest(String subject, Extensions extensions) throws Exception { + PKCS10CertificationRequestBuilder builder = new JcaPKCS10CertificationRequestBuilder( + new X500Name(subject), clientKeyPair.getPublic()); + builder.addAttribute(PKCSObjectIdentifiers.pkcs_9_at_extensionRequest, extensions); + + ContentSigner signGen = new JcaContentSignerBuilder(CertificateSigner.SIGNER_ALGORITHM).build(caKeyPair.getPrivate()); + return builder.build(signGen); + } + + private static KeyPair getKeyPair() { + try { + return KeyPairGenerator.getInstance("RSA").genKeyPair(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/config-application-package/src/main/java/com/yahoo/config/model/application/provider/FileReferenceCreator.java b/config-application-package/src/main/java/com/yahoo/config/model/application/provider/FileReferenceCreator.java deleted file mode 100644 index 7e1d247281c..00000000000 --- a/config-application-package/src/main/java/com/yahoo/config/model/application/provider/FileReferenceCreator.java +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.config.model.application.provider; - -import com.yahoo.config.FileReference; - -import java.lang.reflect.Constructor; - -/** - * Convenience for creating a {@link com.yahoo.config.FileReference}. - * - * @author gjoranv - */ -public class FileReferenceCreator { - - public static FileReference create(String stringVal) { - try { - Constructor<FileReference> ctor = FileReference.class.getDeclaredConstructor(String.class); - ctor.setAccessible(true); - return ctor.newInstance(stringVal); - } catch (Exception e) { - throw new RuntimeException("Could not create a new " + FileReference.class.getName() + - ". This should never happen!", e); - } - } - -} diff --git a/config-application-package/src/main/java/com/yahoo/config/model/application/provider/MockFileRegistry.java b/config-application-package/src/main/java/com/yahoo/config/model/application/provider/MockFileRegistry.java index ca0b37d8cc3..d635fe90ded 100644 --- a/config-application-package/src/main/java/com/yahoo/config/model/application/provider/MockFileRegistry.java +++ b/config-application-package/src/main/java/com/yahoo/config/model/application/provider/MockFileRegistry.java @@ -17,7 +17,7 @@ import java.util.Set; public class MockFileRegistry implements FileRegistry { public FileReference addFile(String relativePath) { - return FileReferenceCreator.create("0123456789abcdef"); + return new FileReference("0123456789abcdef"); } @Override @@ -25,8 +25,8 @@ public class MockFileRegistry implements FileRegistry { return "localhost.fortestingpurposesonly"; } - public static final Entry entry1 = new Entry("component/path1", FileReferenceCreator.create("1234")); - public static final Entry entry2 = new Entry("component/path2", FileReferenceCreator.create("56789")); + public static final Entry entry1 = new Entry("component/path1", new FileReference("1234")); + public static final Entry entry2 = new Entry("component/path2", new FileReference("56789")); public List<Entry> export() { List<Entry> result = new ArrayList<>(); diff --git a/config-application-package/src/main/java/com/yahoo/config/model/application/provider/PreGeneratedFileRegistry.java b/config-application-package/src/main/java/com/yahoo/config/model/application/provider/PreGeneratedFileRegistry.java index 29e83e00305..0b0b799f47f 100644 --- a/config-application-package/src/main/java/com/yahoo/config/model/application/provider/PreGeneratedFileRegistry.java +++ b/config-application-package/src/main/java/com/yahoo/config/model/application/provider/PreGeneratedFileRegistry.java @@ -70,7 +70,7 @@ public class PreGeneratedFileRegistry implements FileRegistry { } public FileReference addFile(String relativePath) { - return FileReferenceCreator.create(path2Hash.get(relativePath)); + return new FileReference(path2Hash.get(relativePath)); } @Override @@ -86,7 +86,7 @@ public class PreGeneratedFileRegistry implements FileRegistry { public List<Entry> export() { List<Entry> entries = new ArrayList<>(); for (Map.Entry<String, String> entry : path2Hash.entrySet()) { - entries.add(new Entry(entry.getKey(), FileReferenceCreator.create(entry.getValue()))); + entries.add(new Entry(entry.getKey(), new FileReference(entry.getValue()))); } return entries; } diff --git a/config-model/src/main/java/com/yahoo/vespa/model/filedistribution/FileDistributor.java b/config-model/src/main/java/com/yahoo/vespa/model/filedistribution/FileDistributor.java index 8860f5c2249..f6cc9203d00 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/filedistribution/FileDistributor.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/filedistribution/FileDistributor.java @@ -110,11 +110,5 @@ public class FileDistributor { public void reloadDeployFileDistributor(FileDistribution dbHandler) { dbHandler.reloadDeployFileDistributor(); } - - private Set<String> union(Set<String> hosts, String... additionalHosts) { - Set<String> result = new HashSet<>(hosts); - result.addAll(asList(additionalHosts)); - return result; - } } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/AddFileInterface.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/AddFileInterface.java new file mode 100644 index 00000000000..61c376a7256 --- /dev/null +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/AddFileInterface.java @@ -0,0 +1,9 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.server.filedistribution; + +import com.yahoo.config.FileReference; + +public interface AddFileInterface { + FileReference addFile(String relativePath); + FileReference addFile(String relativePath, FileReference reference); +} diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/ApplicationFileManager.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/ApplicationFileManager.java new file mode 100644 index 00000000000..79c541d7b1a --- /dev/null +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/ApplicationFileManager.java @@ -0,0 +1,27 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.server.filedistribution; + +import com.yahoo.config.FileReference; +import java.io.File; + +public class ApplicationFileManager implements AddFileInterface { + private final File applicationDir; + private final FileDirectory master; + + ApplicationFileManager(File applicationDir, FileDirectory master) { + this.applicationDir = applicationDir; + this.master = master; + } + + @Override + public FileReference addFile(String relativePath, FileReference reference) { + // TODO Wire in when verified in system test + // return master.addFile(new File(applicationDir, relativePath), reference); + return reference; + } + + @Override + public FileReference addFile(String relativePath) { + return master.addFile(new File(applicationDir, relativePath)); + } +} diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/CombinedLegacyDistribution.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/CombinedLegacyDistribution.java new file mode 100644 index 00000000000..588f2d1d63f --- /dev/null +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/CombinedLegacyDistribution.java @@ -0,0 +1,30 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.server.filedistribution; + +import com.yahoo.config.FileReference; +import com.yahoo.config.model.api.FileDistribution; + +import java.util.Collection; +import java.util.Set; + +public class CombinedLegacyDistribution implements FileDistribution { + private final FileDistribution legacy; + + CombinedLegacyDistribution(FileDBHandler legacy) { + this.legacy = legacy; + } + @Override + public void sendDeployedFiles(String hostName, Set<FileReference> fileReferences) { + legacy.sendDeployedFiles(hostName, fileReferences); + } + + @Override + public void reloadDeployFileDistributor() { + legacy.reloadDeployFileDistributor(); + } + + @Override + public void removeDeploymentsThatHaveDifferentApplicationId(Collection<String> targetHostnames) { + legacy.removeDeploymentsThatHaveDifferentApplicationId(targetHostnames); + } +} diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/CombinedLegacyRegistry.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/CombinedLegacyRegistry.java new file mode 100644 index 00000000000..8f2cb194bbd --- /dev/null +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/CombinedLegacyRegistry.java @@ -0,0 +1,32 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.config.server.filedistribution; + +import com.yahoo.config.FileReference; +import com.yahoo.config.application.api.FileRegistry; + +import java.util.List; + +public class CombinedLegacyRegistry implements FileRegistry { + private final FileDBRegistry legacy; + private final FileDBRegistry future; + + CombinedLegacyRegistry(FileDBRegistry legacy, FileDBRegistry future) { + this.legacy = legacy; + this.future = future; + } + @Override + public FileReference addFile(String relativePath) { + FileReference reference = legacy.addFile(relativePath); + return future.addFile(relativePath, reference); + } + + @Override + public String fileSourceHost() { + return future.fileSourceHost(); + } + + @Override + public List<Entry> export() { + return future.export(); + } +} diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDBRegistry.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDBRegistry.java index d921d8d4f8d..1a76454fbed 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDBRegistry.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDBRegistry.java @@ -4,29 +4,41 @@ package com.yahoo.vespa.config.server.filedistribution; import com.yahoo.config.FileReference; import com.yahoo.config.application.api.FileRegistry; import com.yahoo.net.HostName; -import com.yahoo.vespa.filedistribution.FileDistributionManager; -import com.yahoo.config.model.application.provider.FileReferenceCreator; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; /** * @author tonytv */ public class FileDBRegistry implements FileRegistry { - private final FileDistributionManager manager; + private final AddFileInterface manager; private List<Entry> entries = new ArrayList<>(); private final Map<String, FileReference> fileReferenceCache = new HashMap<>(); - public FileDBRegistry(FileDistributionManager manager) { + public FileDBRegistry(AddFileInterface manager) { this.manager = manager; } + public synchronized FileReference addFile(String relativePath, FileReference reference) { + Optional<FileReference> cachedReference = Optional.ofNullable(fileReferenceCache.get(relativePath)); + return cachedReference.orElseGet(() -> { + FileReference newRef = manager.addFile(relativePath, reference); + entries.add(new Entry(relativePath, newRef)); + fileReferenceCache.put(relativePath, newRef); + return newRef; + }); + } + @Override public synchronized FileReference addFile(String relativePath) { Optional<FileReference> cachedReference = Optional.ofNullable(fileReferenceCache.get(relativePath)); return cachedReference.orElseGet(() -> { - FileReference newRef = FileReferenceCreator.create(manager.addFile(relativePath)); + FileReference newRef = manager.addFile(relativePath); entries.add(new Entry(relativePath, newRef)); fileReferenceCache.put(relativePath, newRef); return newRef; diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java new file mode 100644 index 00000000000..b0042b15470 --- /dev/null +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDirectory.java @@ -0,0 +1,118 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +package com.yahoo.vespa.config.server.filedistribution; + +import com.yahoo.config.FileReference; +import com.yahoo.config.model.api.FileDistribution; +import com.yahoo.io.IOUtils; +import com.yahoo.log.LogLevel; +import com.yahoo.text.Utf8; +import net.jpountz.xxhash.XXHash64; +import net.jpountz.xxhash.XXHashFactory; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.logging.Logger; + +public class FileDirectory { + private static final Logger log = Logger.getLogger(FileDirectory.class.getName()); + private final File root; + + public FileDirectory() { + this(FileDistribution.getDefaultFileDBPath()); + } + + public FileDirectory(File rootDir) { + root = rootDir; + try { + ensureRootExist(); + } catch (IllegalArgumentException e) { + log.warning("Failed creating directory in constructor, will retry on demand : " + e.toString()); + } + } + + private void ensureRootExist() { + if (! root.exists()) { + if ( ! root.mkdir()) { + throw new IllegalArgumentException("Failed creating root dir '" + root.getAbsolutePath() + "'."); + } + } else if (!root.isDirectory()) { + throw new IllegalArgumentException("'" + root.getAbsolutePath() + "' is not a directory"); + } + } + + static private class Filter implements FilenameFilter { + @Override + public boolean accept(File dir, String name) { + return !".".equals(name) && !"..".equals(name) ; + } + } + + String getPath(FileReference ref) { + return root.getAbsolutePath() + "/" + ref.value(); + } + + File getFile(FileReference reference) { + ensureRootExist(); + File dir = new File(getPath(reference)); + if (!dir.exists()) { + throw new IllegalArgumentException("File reference '" + reference.toString() + "' with absolute path '" + dir.getAbsolutePath() + "' does not exist."); + } + if (!dir.isDirectory()) { + throw new IllegalArgumentException("File reference '" + reference.toString() + "' with absolute path '" + dir.getAbsolutePath() + "' is not a directory."); + } + File [] files = dir.listFiles(new Filter()); + if (files.length != 1) { + StringBuilder msg = new StringBuilder(); + for (File f: files) { + msg.append(f.getName()).append("\n"); + } + throw new IllegalArgumentException("File reference '" + reference.toString() + "' with absolute path '" + dir.getAbsolutePath() + " does not contain exactly one file, but [" + msg.toString() + "]"); + } + return files[0]; + } + + private Long computeReference(File file) throws IOException { + byte [] wholeFile = IOUtils.readFileBytes(file); + XXHash64 hasher = XXHashFactory.fastestInstance().hash64(); + return hasher.hash(ByteBuffer.wrap(wholeFile), hasher.hash(ByteBuffer.wrap(Utf8.toBytes(file.getName())), 0)); + } + + public FileReference addFile(File source) { + try { + Long hash = computeReference(source); + FileReference reference = new FileReference(Long.toHexString(hash)); + return addFile(source, reference); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + } + + public FileReference addFile(File source, FileReference reference) { + ensureRootExist(); + try { + File destinationDir = new File(root, reference.value()); + if (!destinationDir.exists()) { + destinationDir.mkdir(); + Path tempDestinationDir = Files.createTempDirectory(root.toPath(), "writing"); + File destination = new File(tempDestinationDir.toFile(), source.getName()); + IOUtils.copy(source, destination); + if (!destinationDir.exists()) { + if ( ! tempDestinationDir.toFile().renameTo(destinationDir)) { + log.warning("Failed moving '" + tempDestinationDir.toFile().getAbsolutePath() + "' to '" + destination.getAbsolutePath() + "'."); + } + } else { + IOUtils.copyDirectory(tempDestinationDir.toFile(), destinationDir, 1); + } + IOUtils.recursiveDeleteDir(tempDestinationDir.toFile()); + } + return reference; + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + } +} diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionProvider.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionProvider.java index 36b0138ad36..59c3a54897d 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionProvider.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileDistributionProvider.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.server.filedistribution; +import com.yahoo.config.FileReference; import com.yahoo.config.model.api.FileDistribution; import com.yahoo.config.application.api.FileRegistry; import com.yahoo.vespa.filedistribution.FileDistributionManager; @@ -19,13 +20,32 @@ public class FileDistributionProvider { private final FileRegistry fileRegistry; private final FileDistribution fileDistribution; - public FileDistributionProvider(File applicationDir, String zooKeepersSpec, String applicationId, Lock fileDistributionLock) { + static private class ManagerWrapper implements AddFileInterface { + private final FileDistributionManager manager; + ManagerWrapper(FileDistributionManager manager) { + this.manager = manager; + } + @Override + public FileReference addFile(String relativePath) { + return new FileReference(manager.addFile(relativePath)); + } + + @Override + public FileReference addFile(String relativePath, FileReference reference) { + throw new IllegalStateException("addFile with external reference is not possible with legacy filedistribution."); + } + } + + public FileDistributionProvider(File applicationDir, String zooKeepersSpec, + String applicationId, Lock fileDistributionLock) + { ensureDirExists(FileDistribution.getDefaultFileDBPath()); final FileDistributionManager manager = new FileDistributionManager( FileDistribution.getDefaultFileDBPath(), applicationDir, zooKeepersSpec, applicationId, fileDistributionLock); - this.fileDistribution = new FileDBHandler(manager); - this.fileRegistry = new FileDBRegistry(manager); + this.fileDistribution = new CombinedLegacyDistribution(new FileDBHandler(manager)); + this.fileRegistry = new CombinedLegacyRegistry(new FileDBRegistry(new ManagerWrapper(manager)), + new FileDBRegistry(new ApplicationFileManager(applicationDir, new FileDirectory()))); } public FileDistributionProvider(FileRegistry fileRegistry, FileDistribution fileDistribution) { diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java index 1c77ee66d0c..a504cd120ee 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/filedistribution/FileServer.java @@ -1,3 +1,4 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.server.filedistribution; import com.google.inject.Inject; @@ -6,7 +7,6 @@ import com.yahoo.config.model.api.FileDistribution; import com.yahoo.io.IOUtils; import java.io.File; -import java.io.FilenameFilter; import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -14,7 +14,7 @@ import java.util.logging.Logger; public class FileServer { private static final Logger log = Logger.getLogger(FileServer.class.getName()); - private final String rootDir; + private final FileDirectory root; private final ExecutorService executor; public static class ReplayStatus { @@ -33,46 +33,17 @@ public class FileServer { void receive(FileReference reference, String filename, byte [] content, ReplayStatus status); } - private String getPath(FileReference ref) { - return rootDir + "/" + ref.value(); - } - - static private class Filter implements FilenameFilter { - @Override - public boolean accept(File dir, String name) { - return !".".equals(name) && !"..".equals(name) ; - } - } - private File getFile(FileReference reference) { - File dir = new File(getPath(reference)); - if (!dir.exists()) { - throw new IllegalArgumentException("File reference '" + reference.toString() + "' with absolute path '" + dir.getAbsolutePath() + "' does not exist."); - } - if (!dir.isDirectory()) { - throw new IllegalArgumentException("File reference '" + reference.toString() + "' with absolute path '" + dir.getAbsolutePath() + "' is not a directory."); - } - File [] files = dir.listFiles(new Filter()); - if (files.length != 1) { - StringBuilder msg = new StringBuilder(); - for (File f: files) { - msg.append(f.getName()).append("\n"); - } - throw new IllegalArgumentException("File reference '" + reference.toString() + "' with absolute path '" + dir.getAbsolutePath() + " does not contain exactly one file, but [" + msg.toString() + "]"); - } - return files[0]; - } - @Inject public FileServer() { - this(FileDistribution.getDefaultFileDBRoot()); + this(FileDistribution.getDefaultFileDBPath()); } - public FileServer(String rootDir) { + public FileServer(File rootDir) { this(rootDir, Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); } - public FileServer(String rootDir, ExecutorService executor) { - this.rootDir = rootDir; + public FileServer(File rootDir, ExecutorService executor) { + this.root = new FileDirectory(rootDir); this.executor = executor; } public boolean hasFile(String fileName) { @@ -80,7 +51,7 @@ public class FileServer { } public boolean hasFile(FileReference reference) { try { - return getFile(reference).exists(); + return root.getFile(reference).exists(); } catch (IllegalArgumentException e) { log.warning("Failed locating file reference '" + reference + "' with error " + e.toString()); } @@ -88,7 +59,7 @@ public class FileServer { } public boolean startFileServing(String fileName, Receiver target) { FileReference reference = new FileReference(fileName); - File file = getFile(reference); + File file = root.getFile(reference); if (file.exists()) { executor.execute(() -> serveFile(reference, target)); @@ -97,8 +68,9 @@ public class FileServer { } private void serveFile(FileReference reference, Receiver target) { - - File file = getFile(reference); + File file = root.getFile(reference); + // TODO remove once verified in system tests. + log.info("Start serving reference '" + reference.value() + "' with file '" + file.getAbsolutePath() + "'"); byte [] blob = new byte [0]; boolean success = false; String errorDescription = "OK"; @@ -111,5 +83,7 @@ public class FileServer { } target.receive(reference, file.getName(), blob, new ReplayStatus(success ? 0 : 1, success ? "OK" : errorDescription)); + // TODO remove once verified in system tests. + log.info("Done serving reference '" + reference.toString() + "' with file '" + file.getAbsolutePath() + "'"); } } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java index 662da63d198..7c5adb3b932 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/rpc/RpcServer.java @@ -450,6 +450,10 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { fileBlob.parameters().add(new Int32Value(status.getCode())); fileBlob.parameters().add(new StringValue(status.getDescription())); target.invokeSync(fileBlob, 600); + if (fileBlob.isError()) { + log.warning("Failed delivering reference '" + reference.value() + "' with file '" + filename + "' to " + + target.toString() + " with error : '" + fileBlob.errorMessage() + "'."); + } } } @@ -458,6 +462,8 @@ public class RpcServer implements Runnable, ReloadListener, TenantListener { String fileReference = request.parameters().get(0).asString(); FileApiErrorCodes result; try { + // TODO remove once verified in system tests. + log.info("Received request for reference '" + fileReference + "'"); result = fileServer.hasFile(fileReference) ? FileApiErrorCodes.OK : FileApiErrorCodes.NOT_FOUND; diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/Tenants.java b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/Tenants.java index c67ad0675b2..528a30e0191 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/Tenants.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/tenant/Tenants.java @@ -159,7 +159,7 @@ public class Tenants implements ConnectionStateListener, PathChildrenCacheListen Map<TenantName, Tenant> current = new LinkedHashMap<>(tenants); for (Map.Entry<TenantName, Tenant> entry : current.entrySet()) { TenantName tenant = entry.getKey(); - if (!newTenants.contains(tenant)) { + if (!newTenants.contains(tenant) && !DEFAULT_TENANT.equals(tenant)) { notifyRemovedTenant(tenant); entry.getValue().close(); tenants.remove(tenant); @@ -257,7 +257,7 @@ public class Tenants implements ConnectionStateListener, PathChildrenCacheListen * @return this Tenants instance */ public synchronized Tenants deleteTenant(TenantName name) { - if (name.equals(TenantName.defaultName())) + if (name.equals(DEFAULT_TENANT)) throw new IllegalArgumentException("Deleting 'default' tenant is not allowed"); Tenant tenant = tenants.get(name); tenant.delete(); @@ -275,7 +275,7 @@ public class Tenants implements ConnectionStateListener, PathChildrenCacheListen * @return the log string */ public static String logPre(ApplicationId app) { - if (TenantName.defaultName().equals(app.tenant())) return ""; + if (DEFAULT_TENANT.equals(app.tenant())) return ""; StringBuilder ret = new StringBuilder() .append(logPre(app.tenant())) .append("app:"+app.application().value()) diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/InjectedGlobalComponentRegistryTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/InjectedGlobalComponentRegistryTest.java index acda60049ab..dec9dd991de 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/InjectedGlobalComponentRegistryTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/InjectedGlobalComponentRegistryTest.java @@ -63,7 +63,7 @@ public class InjectedGlobalComponentRegistryTest { serverDB = new ConfigServerDB(configserverConfig); sessionPreparer = new SessionTest.MockSessionPreparer(); rpcServer = new RpcServer(configserverConfig, null, Metrics.createTestMetrics(), - new HostRegistries(), new ConfigRequestHostLivenessTracker(), new FileServer(FileDistribution.getDefaultFileDBRoot())); + new HostRegistries(), new ConfigRequestHostLivenessTracker(), new FileServer(FileDistribution.getDefaultFileDBPath())); generationCounter = new SuperModelGenerationCounter(curator); defRepo = new StaticConfigDefinitionRepo(); permanentApplicationPackage = new PermanentApplicationPackage(configserverConfig); diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/deploy/DeployTester.java b/configserver/src/test/java/com/yahoo/vespa/config/server/deploy/DeployTester.java index dea468eb0be..d9a0db7e811 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/deploy/DeployTester.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/deploy/DeployTester.java @@ -69,26 +69,19 @@ public class DeployTester { } public DeployTester(String appPath, List<ModelFactory> modelFactories) { - this(appPath, modelFactories, new ConfigserverConfig(new ConfigserverConfig.Builder() - .configServerDBDir(Files.createTempDir() - .getAbsolutePath()) - .configDefinitionsDir(Files.createTempDir() - .getAbsolutePath())), + this(appPath, modelFactories, + new ConfigserverConfig(new ConfigserverConfig.Builder() + .configServerDBDir(Files.createTempDir().getAbsolutePath()) + .configDefinitionsDir(Files.createTempDir().getAbsolutePath())), Clock.systemUTC()); } public DeployTester(String appPath, ConfigserverConfig configserverConfig) { - this(appPath, - Collections.singletonList(createModelFactory(Clock.systemUTC())), - configserverConfig, - Clock.systemUTC()); + this(appPath, Collections.singletonList(createModelFactory(Clock.systemUTC())), configserverConfig, Clock.systemUTC()); } public DeployTester(String appPath, ConfigserverConfig configserverConfig, Clock clock) { - this(appPath, - Collections.singletonList(createModelFactory(clock)), - configserverConfig, - clock); + this(appPath, Collections.singletonList(createModelFactory(clock)), configserverConfig, clock); } public DeployTester(String appPath, List<ModelFactory> modelFactories, ConfigserverConfig configserverConfig) { @@ -106,12 +99,12 @@ public class DeployTester { catch (Exception e) { throw new IllegalArgumentException(e); } - applicationRepository = new ApplicationRepository(tenants, - createHostProvisioner(), - clock); + applicationRepository = new ApplicationRepository(tenants, createHostProvisioner(), clock); } - public Tenant tenant() { return tenants.defaultTenant(); } + public Tenant tenant() { + return tenants.defaultTenant(); + } /** Create a model factory for the version of this source*/ public static ModelFactory createModelFactory(Clock clock) { @@ -137,6 +130,7 @@ public class DeployTester { * Do the initial "deploy" with the existing API-less code as the deploy API doesn't support first deploys yet. */ public ApplicationId deployApp(String appName, String vespaVersion, Instant now) { + Tenant tenant = tenant(); LocalSession session = tenant.getSessionFactory().createSession(testApp, appName, new TimeoutBudget(clock, Duration.ofSeconds(60))); ApplicationId id = ApplicationId.from(tenant.getName(), ApplicationName.from(appName), InstanceName.defaultName()); diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java index 0c2ace38389..4913798e5ad 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/filedistribution/FileServerTest.java @@ -1,3 +1,4 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.config.server.filedistribution; import com.yahoo.config.FileReference; @@ -17,7 +18,7 @@ import static org.junit.Assert.assertFalse; public class FileServerTest { - FileServer fs = new FileServer("."); + FileServer fs = new FileServer(new File(".")); List<File> created = new LinkedList<>(); private void createCleanDir(String name) throws IOException{ diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/MockRpc.java b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/MockRpc.java index b094a741f34..4c2a4b56751 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/MockRpc.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/MockRpc.java @@ -39,7 +39,7 @@ public class MockRpc extends RpcServer { public MockRpc(int port, boolean createDefaultTenant, boolean pretendToHaveLoadedAnyApplication) { super(createConfig(port), null, Metrics.createTestMetrics(), - new HostRegistries(), new ConfigRequestHostLivenessTracker(), new FileServer(FileDistribution.getDefaultFileDBRoot())); + new HostRegistries(), new ConfigRequestHostLivenessTracker(), new FileServer(FileDistribution.getDefaultFileDBPath())); if (createDefaultTenant) { onTenantCreate(TenantName.from("default"), new MockTenantProvider(pretendToHaveLoadedAnyApplication)); } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/TestWithRpc.java b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/TestWithRpc.java index 933cb770dd1..12dc584f055 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/TestWithRpc.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/TestWithRpc.java @@ -90,7 +90,7 @@ public class TestWithRpc { emptyNodeFlavors(), generationCounter)), Metrics.createTestMetrics(), new HostRegistries(), - hostLivenessTracker, new FileServer(FileDistribution.getDefaultFileDBRoot())); + hostLivenessTracker, new FileServer(FileDistribution.getDefaultFileDBPath())); rpcServer.onTenantCreate(TenantName.from("default"), tenantProvider); t = new Thread(rpcServer); t.start(); diff --git a/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedRequestHandler.java b/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedRequestHandler.java index 7142449f3d4..f6e1c1d6d5a 100644 --- a/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedRequestHandler.java +++ b/container-core/src/main/java/com/yahoo/container/jdisc/ThreadedRequestHandler.java @@ -2,10 +2,13 @@ package com.yahoo.container.jdisc; import com.google.inject.Inject; +import com.yahoo.concurrent.CopyOnWriteHashMap; import com.yahoo.jdisc.Metric; import com.yahoo.jdisc.Request; import com.yahoo.jdisc.ResourceReference; import com.yahoo.jdisc.Response; +import com.yahoo.jdisc.application.BindingMatch; +import com.yahoo.jdisc.application.UriPattern; import com.yahoo.jdisc.handler.AbstractRequestHandler; import com.yahoo.jdisc.handler.BufferedContentChannel; import com.yahoo.jdisc.handler.ContentChannel; @@ -22,6 +25,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; +import static java.util.Collections.singletonMap; import javax.annotation.concurrent.GuardedBy; @@ -75,6 +79,21 @@ public abstract class ThreadedRequestHandler extends AbstractRequestHandler { this.allowAsyncResponse = allowAsyncResponse; } + private Map<String, Metric.Context> handlerContexts = new CopyOnWriteHashMap<>(); + private Metric.Context contextFor(BindingMatch match) { + if (match == null) return null; + UriPattern matched = match.matched(); + if (matched == null) return null; + String name = matched.toString(); + Metric.Context context = handlerContexts.get(name); + if (context == null) { + Map<String, String> dimensions = singletonMap("handler", name); + context = this.metric.createContext(dimensions); + handlerContexts.put(name, context); + } + return context; + } + /** * Handles a request by assigning a worker thread to it. * @@ -82,6 +101,7 @@ public abstract class ThreadedRequestHandler extends AbstractRequestHandler { */ @Override public final ContentChannel handleRequest(Request request, ResponseHandler responseHandler) { + metric.add("container.handled.requests", 1, contextFor(request.getBindingMatch())); if (request.getTimeout(TimeUnit.SECONDS) == null) { Duration timeout = getTimeout(); if (timeout != null) { @@ -173,7 +193,10 @@ public abstract class ThreadedRequestHandler extends AbstractRequestHandler { @Override public ContentChannel handleResponse(Response response) { if ( tryHasResponded()) throw new IllegalStateException("Response already handled"); - return responseHandler.handleResponse(response); + ContentChannel cc = responseHandler.handleResponse(response); + long millis = request.timeElapsed(TimeUnit.MILLISECONDS); + metric.set("container.handled.latency", millis, contextFor(request.getBindingMatch())); + return cc; } private boolean tryHasResponded() { diff --git a/fastlib/src/vespa/fastlib/net/httpserver.cpp b/fastlib/src/vespa/fastlib/net/httpserver.cpp index a9bba95a8ff..0d1b75ec7fe 100644 --- a/fastlib/src/vespa/fastlib/net/httpserver.cpp +++ b/fastlib/src/vespa/fastlib/net/httpserver.cpp @@ -367,7 +367,7 @@ int Fast_HTTPServer::Start(void) int retCode = FASTLIB_SUCCESS; { - std::unique_lock<std::mutex> runningGuard(_runningMutex); + std::lock_guard<std::mutex> runningGuard(_runningMutex); if (!_isRunning) { // Try listening retCode = Listen(); @@ -391,7 +391,7 @@ int Fast_HTTPServer::Start(void) void Fast_HTTPServer::Stop(void) { { - std::unique_lock<std::mutex> runningGuard(_runningMutex); + std::lock_guard<std::mutex> runningGuard(_runningMutex); _stopSignalled = true; if (_acceptThread) { _acceptThread->SetBreakFlag(); @@ -407,7 +407,7 @@ Fast_HTTPServer::Stop(void) { bool Fast_HTTPServer::StopSignalled(void) { bool retVal; - std::unique_lock<std::mutex> runningGuard(_runningMutex); + std::lock_guard<std::mutex> runningGuard(_runningMutex); retVal = _stopSignalled; return retVal; } @@ -458,7 +458,7 @@ void Fast_HTTPServer::Run(FastOS_ThreadInterface *thisThread, void *params) Fast_Socket *mySocket; { - std::unique_lock<std::mutex> runningGuard(_runningMutex); + std::lock_guard<std::mutex> runningGuard(_runningMutex); _isRunning = true; _stopSignalled = false; } @@ -516,7 +516,7 @@ void Fast_HTTPServer::Run(FastOS_ThreadInterface *thisThread, void *params) _serverSocket.SetSocketEvent(NULL); } - std::unique_lock<std::mutex> runningGuard(_runningMutex); + std::lock_guard<std::mutex> runningGuard(_runningMutex); _isRunning = false; } @@ -1040,7 +1040,7 @@ void Fast_HTTPServer::HandleFileRequest(const string & url, Fast_HTTPConnection& void Fast_HTTPServer::SetBaseDir(const char *baseDir) { - std::unique_lock<std::mutex> runningGuard(_runningMutex); + std::lock_guard<std::mutex> runningGuard(_runningMutex); if (!_isRunning) { _baseDir = baseDir; @@ -1178,14 +1178,14 @@ void Fast_HTTPServer::OutputNotFound(Fast_HTTPConnection& conn, void Fast_HTTPServer::AddConnection(Fast_HTTPConnection* connection) { - std::unique_lock<std::mutex> connectionGuard(_connectionLock); + std::lock_guard<std::mutex> connectionGuard(_connectionLock); _connections.Insert(connection); } void Fast_HTTPServer::RemoveConnection(Fast_HTTPConnection* connection) { - std::unique_lock<std::mutex> connectionGuard(_connectionLock); + std::lock_guard<std::mutex> connectionGuard(_connectionLock); _connections.RemoveElement(connection); _connectionCond.notify_one(); } diff --git a/fastlib/src/vespa/fastlib/text/normwordfolder.cpp b/fastlib/src/vespa/fastlib/text/normwordfolder.cpp index f383ff85df5..ca1f260515f 100644 --- a/fastlib/src/vespa/fastlib/text/normwordfolder.cpp +++ b/fastlib/src/vespa/fastlib/text/normwordfolder.cpp @@ -29,7 +29,7 @@ Fast_NormalizeWordFolder::Setup(uint32_t flags) { // Only allow setting these when not initialized or initializing... { - std::unique_lock<std::mutex> initGuard(_initMutex); + std::lock_guard<std::mutex> initGuard(_initMutex); _doAccentRemoval = (DO_ACCENT_REMOVAL & flags) != 0; // _doSmallToNormalKana = (DO_SMALL_TO_NORMAL_KANA & flags) != 0; // _doKatakanaToHiragana = (DO_KATAKANA_TO_HIRAGANA & flags) != 0; @@ -48,7 +48,7 @@ Fast_NormalizeWordFolder::Initialize() { unsigned int i; if (!_isInitialized) { - std::unique_lock<std::mutex> initGuard(_initMutex); + std::lock_guard<std::mutex> initGuard(_initMutex); if (!_isInitialized) { for (i = 0; i < 128; i++) diff --git a/jdisc_core/src/main/java/com/yahoo/jdisc/Request.java b/jdisc_core/src/main/java/com/yahoo/jdisc/Request.java index 489a4c3dc10..7a76c588fb4 100644 --- a/jdisc_core/src/main/java/com/yahoo/jdisc/Request.java +++ b/jdisc_core/src/main/java/com/yahoo/jdisc/Request.java @@ -287,7 +287,7 @@ public class Request extends AbstractResource { } /** - * <p>Returns the allocated number of milliseconds that this Request is allowed to exist. If no timeout has been set + * <p>Returns the allocated number of time units that this Request is allowed to exist. If no timeout has been set * for this Request, this method returns <em>null</em>.</p> * * @param unit The unit to return the timeout in. @@ -306,7 +306,7 @@ public class Request extends AbstractResource { * <em>null</em>.</p> * * @param unit The unit to return the time in. - * @return The number of milliseconds left until this Request times out, or <em>null</em>. + * @return The number of time units left until this Request times out, or <em>null</em>. */ public Long timeRemaining(TimeUnit unit) { if (timeout == null) { @@ -316,6 +316,16 @@ public class Request extends AbstractResource { } /** + * <p>Returns the time that this Request has existed so far. + * + * @param unit The unit to return the time in. + * @return The number of time units elapsed since this Request was created. + */ + public long timeElapsed(TimeUnit unit) { + return unit.convert(container().currentTimeMillis() - creationTime, TimeUnit.MILLISECONDS); + } + + /** * <p>Returns the time at which this Request was created. This is whatever value was returned by {@link * Timer#currentTimeMillis()} when constructing this.</p> * diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java index 54338c64c1e..7ec51f35b74 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java +++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactory.java @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.jdisc.http.server.jetty; -import com.google.common.base.Preconditions; import com.google.inject.Inject; import com.yahoo.jdisc.Metric; import com.yahoo.jdisc.http.ConnectorConfig; @@ -10,31 +9,20 @@ import com.yahoo.jdisc.http.ConnectorConfig.Ssl.PemKeyStore; import com.yahoo.jdisc.http.SecretStore; import com.yahoo.jdisc.http.ssl.pem.PemSslKeyStore; import org.eclipse.jetty.http.HttpVersion; -import org.eclipse.jetty.server.ConnectionFactory; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.SecureRequestCustomizer; import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.ServerConnectionStatistics; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.SslConnectionFactory; import org.eclipse.jetty.util.ssl.SslContextFactory; -import javax.servlet.ServletRequest; import java.io.IOException; import java.io.UncheckedIOException; -import java.lang.reflect.Field; -import java.net.Socket; -import java.net.SocketException; import java.nio.channels.ServerSocketChannel; import java.nio.file.Path; import java.nio.file.Paths; import java.security.KeyStore; -import java.util.Map; -import java.util.Optional; -import java.util.TreeMap; -import java.util.function.Supplier; -import java.util.logging.Level; import java.util.logging.Logger; import static com.yahoo.jdisc.http.ConnectorConfig.Ssl.KeyStoreType.Enum.JKS; @@ -42,6 +30,7 @@ import static com.yahoo.jdisc.http.ConnectorConfig.Ssl.KeyStoreType.Enum.PEM; /** * @author Einar M R Rosenvinge + * @author bjorncs */ public class ConnectorFactory { @@ -63,12 +52,30 @@ public class ConnectorFactory { ConnectorConfig.Ssl ssl = config.ssl(); if (ssl.keyStoreType() == JKS) { - if (! ssl.pemKeyStore().keyPath().isEmpty() || ! ssl.pemKeyStore().certificatePath().isEmpty()) + if (!ssl.pemKeyStore().keyPath().isEmpty() || ! ssl.pemKeyStore().certificatePath().isEmpty()) { throw new IllegalArgumentException("pemKeyStore attributes can not be set when keyStoreType is JKS."); + } + if (ssl.keyDbKey().isEmpty()) { + throw new IllegalArgumentException("Missing password for JKS keystore"); + } } if (ssl.keyStoreType() == PEM) { - if (! ssl.keyStorePath().isEmpty()) + if (! ssl.keyStorePath().isEmpty()) { throw new IllegalArgumentException("keyStorePath can not be set when keyStoreType is PEM"); + } + if (!ssl.keyDbKey().isEmpty()) { + // TODO Make an error once there are separate passwords for truststore and keystore + log.warning("Encrypted PEM key stores are not supported. Password is only applied to truststore"); + } + if (ssl.pemKeyStore().certificatePath().isEmpty()) { + throw new IllegalArgumentException("Missing certificate path."); + } + if (ssl.pemKeyStore().keyPath().isEmpty()) { + throw new IllegalArgumentException("Missing key path."); + } + } + if (!ssl.trustStorePath().isEmpty() && ssl.useTrustStorePassword() && ssl.keyDbKey().isEmpty()) { + throw new IllegalArgumentException("Missing password for JKS truststore"); } } @@ -164,25 +171,24 @@ public class ConnectorFactory { factory.setIncludeCipherSuites(ciphs); } - Optional<String> keyDbPassword = secret(sslConfig.keyDbKey()); + String keyDbPassword = sslConfig.keyDbKey(); switch (sslConfig.keyStoreType()) { case PEM: - factory.setKeyStore(getKeyStore(sslConfig.pemKeyStore())); - if (keyDbPassword.isPresent()) - log.warning("Encrypted PEM key stores are not supported."); + factory.setKeyStore(createPemKeyStore(sslConfig.pemKeyStore())); break; case JKS: factory.setKeyStorePath(sslConfig.keyStorePath()); factory.setKeyStoreType(sslConfig.keyStoreType().toString()); - factory.setKeyStorePassword(keyDbPassword.orElseThrow(passwordRequiredForJKSKeyStore("key"))); + factory.setKeyStorePassword(secretStore.getSecret(keyDbPassword)); break; } if (!sslConfig.trustStorePath().isEmpty()) { factory.setTrustStorePath(sslConfig.trustStorePath()); factory.setTrustStoreType(sslConfig.trustStoreType().toString()); - if (sslConfig.useTrustStorePassword()) - factory.setTrustStorePassword(keyDbPassword.orElseThrow(passwordRequiredForJKSKeyStore("trust"))); + if (sslConfig.useTrustStorePassword()) { + factory.setTrustStorePassword(secretStore.getSecret(keyDbPassword)); + } } factory.setKeyManagerFactoryAlgorithm(sslConfig.sslKeyManagerFactoryAlgorithm()); @@ -190,19 +196,7 @@ public class ConnectorFactory { return new SslConnectionFactory(factory, HttpVersion.HTTP_1_1.asString()); } - /** Returns the secret password with the given name, or empty if the password name is null or empty */ - private Optional<String> secret(String keyname) { - return Optional.of(keyname).filter(key -> !key.isEmpty()).map(secretStore::getSecret); - } - - @SuppressWarnings("ThrowableInstanceNeverThrown") - private Supplier<RuntimeException> passwordRequiredForJKSKeyStore(String type) { - return () -> new RuntimeException(String.format("Password is required for JKS %s store", type)); - } - - private static KeyStore getKeyStore(PemKeyStore pemKeyStore) { - Preconditions.checkArgument(!pemKeyStore.certificatePath().isEmpty(), "Missing certificate path."); - Preconditions.checkArgument(!pemKeyStore.keyPath().isEmpty(), "Missing key path."); + private static KeyStore createPemKeyStore(PemKeyStore pemKeyStore) { try { Path certificatePath = Paths.get(pemKeyStore.certificatePath()); Path keyPath = Paths.get(pemKeyStore.keyPath()); @@ -215,99 +209,4 @@ public class ConnectorFactory { } } - public static class JDiscServerConnector extends ServerConnector { - public static final String REQUEST_ATTRIBUTE = JDiscServerConnector.class.getName(); - private final static Logger log = Logger.getLogger(JDiscServerConnector.class.getName()); - private final Metric.Context metricCtx; - private final ServerConnectionStatistics statistics; - private final boolean tcpKeepAlive; - private final boolean tcpNoDelay; - private final ServerSocketChannel channelOpenedByActivator; - - private JDiscServerConnector(ConnectorConfig config, Metric metric, Server server, - ServerSocketChannel channelOpenedByActivator, ConnectionFactory... factories) { - super(server, factories); - this.channelOpenedByActivator = channelOpenedByActivator; - this.tcpKeepAlive = config.tcpKeepAliveEnabled(); - this.tcpNoDelay = config.tcpNoDelay(); - this.metricCtx = createMetricContext(config, metric); - - this.statistics = new ServerConnectionStatistics(); - addBean(statistics); - } - - private Metric.Context createMetricContext(ConnectorConfig config, Metric metric) { - Map<String, Object> props = new TreeMap<>(); - props.put(JettyHttpServer.Metrics.NAME_DIMENSION, config.name()); - props.put(JettyHttpServer.Metrics.PORT_DIMENSION, config.listenPort()); - return metric.createContext(props); - } - - @Override - protected void configure(final Socket socket) { - super.configure(socket); - try { - socket.setKeepAlive(tcpKeepAlive); - socket.setTcpNoDelay(tcpNoDelay); - } catch (SocketException ignored) { - } - } - - @Override - public void open() throws IOException { - if (channelOpenedByActivator == null) { - log.log(Level.INFO, "No channel set by activator, opening channel ourselves."); - try { - super.open(); - } catch (RuntimeException e) { - log.log(Level.SEVERE, "failed org.eclipse.jetty.server.Server open() with port "+getPort()); - throw e; - } - return; - } - log.log(Level.INFO, "Using channel set by activator: " + channelOpenedByActivator); - - channelOpenedByActivator.socket().setReuseAddress(getReuseAddress()); - int localPort = channelOpenedByActivator.socket().getLocalPort(); - try { - uglySetLocalPort(localPort); - } catch (NoSuchFieldException | IllegalAccessException e) { - throw new RuntimeException("Could not set local port.", e); - } - if (localPort <= 0) { - throw new IOException("Server channel not bound"); - } - addBean(channelOpenedByActivator); - channelOpenedByActivator.configureBlocking(true); - addBean(channelOpenedByActivator); - - try { - uglySetChannel(channelOpenedByActivator); - } catch (NoSuchFieldException | IllegalAccessException e) { - throw new RuntimeException("Could not set server channel.", e); - } - } - - private void uglySetLocalPort(int localPort) throws NoSuchFieldException, IllegalAccessException { - Field localPortField = ServerConnector.class.getDeclaredField("_localPort"); - localPortField.setAccessible(true); - localPortField.set(this, localPort); - } - - private void uglySetChannel(ServerSocketChannel channelOpenedByActivator) throws NoSuchFieldException, - IllegalAccessException { - Field acceptChannelField = ServerConnector.class.getDeclaredField("_acceptChannel"); - acceptChannelField.setAccessible(true); - acceptChannelField.set(this, channelOpenedByActivator); - } - - public ServerConnectionStatistics getStatistics() { return statistics; } - - public Metric.Context getMetricContext() { return metricCtx; } - - public static JDiscServerConnector fromRequest(ServletRequest request) { - return (JDiscServerConnector)request.getAttribute(REQUEST_ATTRIBUTE); - } - } - } diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscHttpServlet.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscHttpServlet.java index 543cf8ab43e..27f72c7b4bf 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscHttpServlet.java +++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscHttpServlet.java @@ -20,7 +20,6 @@ import java.util.logging.Level; import java.util.logging.Logger; import static com.yahoo.jdisc.http.core.HttpServletRequestUtils.getConnection; -import static com.yahoo.jdisc.http.server.jetty.ConnectorFactory.JDiscServerConnector; /** * @author Simon Thoresen Hult diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java new file mode 100644 index 00000000000..8dd50074c32 --- /dev/null +++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JDiscServerConnector.java @@ -0,0 +1,122 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.jdisc.http.server.jetty; + +import com.yahoo.jdisc.Metric; +import com.yahoo.jdisc.http.ConnectorConfig; +import org.eclipse.jetty.server.ConnectionFactory; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnectionStatistics; +import org.eclipse.jetty.server.ServerConnector; + +import javax.servlet.ServletRequest; +import java.io.IOException; +import java.lang.reflect.Field; +import java.net.Socket; +import java.net.SocketException; +import java.nio.channels.ServerSocketChannel; +import java.util.Map; +import java.util.TreeMap; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * @author bjorncs + */ +class JDiscServerConnector extends ServerConnector { + public static final String REQUEST_ATTRIBUTE = JDiscServerConnector.class.getName(); + private final static Logger log = Logger.getLogger(JDiscServerConnector.class.getName()); + private final Metric.Context metricCtx; + private final ServerConnectionStatistics statistics; + private final boolean tcpKeepAlive; + private final boolean tcpNoDelay; + private final ServerSocketChannel channelOpenedByActivator; + + JDiscServerConnector(ConnectorConfig config, Metric metric, Server server, + ServerSocketChannel channelOpenedByActivator, ConnectionFactory... factories) { + super(server, factories); + this.channelOpenedByActivator = channelOpenedByActivator; + this.tcpKeepAlive = config.tcpKeepAliveEnabled(); + this.tcpNoDelay = config.tcpNoDelay(); + this.metricCtx = createMetricContext(config, metric); + + this.statistics = new ServerConnectionStatistics(); + addBean(statistics); + } + + private Metric.Context createMetricContext(ConnectorConfig config, Metric metric) { + Map<String, Object> props = new TreeMap<>(); + props.put(JettyHttpServer.Metrics.NAME_DIMENSION, config.name()); + props.put(JettyHttpServer.Metrics.PORT_DIMENSION, config.listenPort()); + return metric.createContext(props); + } + + @Override + protected void configure(final Socket socket) { + super.configure(socket); + try { + socket.setKeepAlive(tcpKeepAlive); + socket.setTcpNoDelay(tcpNoDelay); + } catch (SocketException ignored) { + } + } + + @Override + public void open() throws IOException { + if (channelOpenedByActivator == null) { + log.log(Level.INFO, "No channel set by activator, opening channel ourselves."); + try { + super.open(); + } catch (RuntimeException e) { + log.log(Level.SEVERE, "failed org.eclipse.jetty.server.Server open() with port " + getPort()); + throw e; + } + return; + } + log.log(Level.INFO, "Using channel set by activator: " + channelOpenedByActivator); + + channelOpenedByActivator.socket().setReuseAddress(getReuseAddress()); + int localPort = channelOpenedByActivator.socket().getLocalPort(); + try { + uglySetLocalPort(localPort); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException("Could not set local port.", e); + } + if (localPort <= 0) { + throw new IOException("Server channel not bound"); + } + addBean(channelOpenedByActivator); + channelOpenedByActivator.configureBlocking(true); + addBean(channelOpenedByActivator); + + try { + uglySetChannel(channelOpenedByActivator); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new RuntimeException("Could not set server channel.", e); + } + } + + private void uglySetLocalPort(int localPort) throws NoSuchFieldException, IllegalAccessException { + Field localPortField = ServerConnector.class.getDeclaredField("_localPort"); + localPortField.setAccessible(true); + localPortField.set(this, localPort); + } + + private void uglySetChannel(ServerSocketChannel channelOpenedByActivator) throws NoSuchFieldException, + IllegalAccessException { + Field acceptChannelField = ServerConnector.class.getDeclaredField("_acceptChannel"); + acceptChannelField.setAccessible(true); + acceptChannelField.set(this, channelOpenedByActivator); + } + + public ServerConnectionStatistics getStatistics() { + return statistics; + } + + public Metric.Context getMetricContext() { + return metricCtx; + } + + public static JDiscServerConnector fromRequest(ServletRequest request) { + return (JDiscServerConnector) request.getAttribute(REQUEST_ATTRIBUTE); + } +} diff --git a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java index aaa213095c6..7bff685e780 100644 --- a/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java +++ b/jdisc_http_service/src/main/java/com/yahoo/jdisc/http/server/jetty/JettyHttpServer.java @@ -58,8 +58,6 @@ import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; -import static com.yahoo.jdisc.http.server.jetty.ConnectorFactory.JDiscServerConnector; - /** * @author Simon Thoresen Hult * @author bjorncs diff --git a/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.connector.def b/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.connector.def index 7a13ec2485f..59893753bea 100644 --- a/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.connector.def +++ b/jdisc_http_service/src/main/resources/configdefinitions/jdisc.http.connector.def @@ -45,8 +45,9 @@ ssl.enabled bool default=false # The name of the key to the password to the key store if in the secret store, if JKS is used. # Must be empty with PEM -# By default this is also used to look up the password to the trust store. +# By default this is also used to look up the password to the trust store. ssl.keyDbKey string default="" +# TODO Rename keyDbKey to keyStorePassword after introducing custom services.xml syntax # Names of protocols to exclude. ssl.excludeProtocol[].name string @@ -74,6 +75,8 @@ ssl.trustStoreType enum { JKS } default=JKS # JKS only - the path to the truststore. ssl.trustStorePath string default="" +# TODO Add separate config for truststore password + # Whether we should use keyDbKey as password to the trust store (true, default), # or use no password with the trust store (false) ssl.useTrustStorePassword bool default=true diff --git a/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactoryTest.java b/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactoryTest.java index 1380abc03f3..fceec51231a 100644 --- a/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactoryTest.java +++ b/jdisc_http_service/src/test/java/com/yahoo/jdisc/http/server/jetty/ConnectorFactoryTest.java @@ -59,8 +59,8 @@ public class ConnectorFactoryTest { try { ConnectorFactory factory = new ConnectorFactory(new ConnectorConfig(new ConnectorConfig.Builder()), new ThrowingSecretStore()); - ConnectorFactory.JDiscServerConnector connector = - (ConnectorFactory.JDiscServerConnector)factory.createConnector(new DummyMetric(), server, null); + JDiscServerConnector connector = + (JDiscServerConnector)factory.createConnector(new DummyMetric(), server, null); server.addConnector(connector); server.setHandler(new HelloWorldHandler()); server.start(); @@ -86,7 +86,7 @@ public class ConnectorFactoryTest { serverChannel.socket().bind(new InetSocketAddress(0)); ConnectorFactory factory = new ConnectorFactory(new ConnectorConfig(new ConnectorConfig.Builder()), new ThrowingSecretStore()); - ConnectorFactory.JDiscServerConnector connector = (ConnectorFactory.JDiscServerConnector) factory.createConnector(new DummyMetric(), server, serverChannel); + JDiscServerConnector connector = (JDiscServerConnector) factory.createConnector(new DummyMetric(), server, serverChannel); server.addConnector(connector); server.setHandler(new HelloWorldHandler()); server.start(); diff --git a/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp b/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp index 6c682ea33e9..b9059338f27 100644 --- a/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp +++ b/searchcore/src/tests/proton/proton_config_fetcher/proton_config_fetcher_test.cpp @@ -170,12 +170,12 @@ struct ProtonConfigOwner : public proton::IProtonConfigurer return getConfigured(); } virtual void reconfigure(std::shared_ptr<ProtonConfigSnapshot> cfg) override { - std::unique_lock<std::mutex> guard(_mutex); + std::lock_guard<std::mutex> guard(_mutex); _config.set(cfg); _configured = true; } bool getConfigured() const { - std::unique_lock<std::mutex> guard(_mutex); + std::lock_guard<std::mutex> guard(_mutex); return _configured; } BootstrapConfig::SP getBootstrapConfig() { diff --git a/searchcore/src/vespa/searchcore/fdispatch/common/search.cpp b/searchcore/src/vespa/searchcore/fdispatch/common/search.cpp index 7685ddcc328..7b060e793f6 100644 --- a/searchcore/src/vespa/searchcore/fdispatch/common/search.cpp +++ b/searchcore/src/vespa/searchcore/fdispatch/common/search.cpp @@ -165,7 +165,7 @@ void FastS_SyncSearchAdapter::DoneQuery(FastS_ISearch *, FastS_SearchContext) { - std::unique_lock<std::mutex> guard(_lock); + std::lock_guard<std::mutex> guard(_lock); _queryDone = true; if (_waitQuery) { _cond.notify_one(); @@ -177,7 +177,7 @@ void FastS_SyncSearchAdapter::DoneDocsums(FastS_ISearch *, FastS_SearchContext) { - std::unique_lock<std::mutex> guard(_lock); + std::lock_guard<std::mutex> guard(_lock); _docsumsDone = true; if (_waitDocsums) { _cond.notify_one(); diff --git a/searchcore/src/vespa/searchcore/fdispatch/search/engine_base.cpp b/searchcore/src/vespa/searchcore/fdispatch/search/engine_base.cpp index 83312a41875..24668db6024 100644 --- a/searchcore/src/vespa/searchcore/fdispatch/search/engine_base.cpp +++ b/searchcore/src/vespa/searchcore/fdispatch/search/engine_base.cpp @@ -112,7 +112,7 @@ void FastS_EngineBase::SlowQuery(double limit, double secs, bool silent) { { - std::unique_lock<std::mutex> engineGuard(_lock); + std::lock_guard<std::mutex> engineGuard(_lock); _stats._slowQueryCnt++; _stats._slowQuerySecs += secs; } @@ -127,7 +127,7 @@ void FastS_EngineBase::SlowDocsum(double limit, double secs) { { - std::unique_lock<std::mutex> engineGuard(_lock); + std::lock_guard<std::mutex> engineGuard(_lock); _stats._slowDocsumCnt++; _stats._slowDocsumSecs += secs; } @@ -173,7 +173,7 @@ FastS_EngineBase::SampleQueueLens() double queueLen; double activecnt; - std::unique_lock<std::mutex> engineGuard(_lock); + std::lock_guard<std::mutex> engineGuard(_lock); if (_stats._queueLenSampleCnt > 0) queueLen = (double) _stats._queueLenSampleAcc / (double) _stats._queueLenSampleCnt; else @@ -217,7 +217,7 @@ FastS_EngineBase::MarkBad(uint32_t badness) bool worse = false; { - std::unique_lock<std::mutex> engineGuard(_lock); + std::lock_guard<std::mutex> engineGuard(_lock); if (badness > _badness) { _badness = badness; worse = true; diff --git a/searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.cpp b/searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.cpp index 0cfbdc8b69a..85599b9e897 100644 --- a/searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.cpp +++ b/searchcore/src/vespa/searchcore/fdispatch/search/fnet_search.cpp @@ -1077,7 +1077,7 @@ FastS_FNET_Search::Search(uint32_t searchOffset, // allow FNET responses while requests are being sent { - std::unique_lock<std::mutex> searchGuard(_lock); + std::lock_guard<std::mutex> searchGuard(_lock); ++_pendingQueries; // add Elephant query node to avoid early query done ++_queryNodes; // add Elephant query node to avoid early query done _FNET_mode = FNET_QUERY; @@ -1102,7 +1102,7 @@ FastS_FNET_Search::Search(uint32_t searchOffset, // finalize setup and check if query is still in progress bool done; { - std::unique_lock<std::mutex> searchGuard(_lock); + std::lock_guard<std::mutex> searchGuard(_lock); assert(_queryNodes >= _pendingQueries); for (uint32_t i: send_failed) { // conditional revert of state for failed nodes @@ -1398,7 +1398,7 @@ FastS_FNET_Search::GetDocsums(const FastS_hitresult *hits, uint32_t hitcnt) ConnectDocsumNodes(ignoreRow); bool done; { - std::unique_lock<std::mutex> searchGuard(_lock); + std::lock_guard<std::mutex> searchGuard(_lock); // patch in engine dependent features and send docsum requests diff --git a/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.cpp b/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.cpp index 6fddfae2ab0..4b272a615a6 100644 --- a/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.cpp +++ b/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.cpp @@ -125,7 +125,7 @@ FastS_NodeManager::CheckTempFail() _checkTempFailScheduled = false; tempfail = false; { - std::unique_lock<std::mutex> mangerGuard(_managerLock); + std::lock_guard<std::mutex> mangerGuard(_managerLock); FastS_DataSetCollection *dsc = PeekDataSetCollection(); for (unsigned int i = 0; i < dsc->GetMaxNumDataSets(); i++) { FastS_DataSetBase *ds; @@ -166,7 +166,7 @@ uint32_t FastS_NodeManager::SetPartMap(const PartitionsConfig& partmap, unsigned int waitms) { - std::unique_lock<std::mutex> configGuard(_configLock); + std::lock_guard<std::mutex> configGuard(_configLock); FastS_DataSetCollDesc *configDesc = new FastS_DataSetCollDesc(); if (!configDesc->ReadConfig(partmap)) { LOG(error, "NodeManager::SetPartMap: Failed to load configuration"); @@ -275,7 +275,7 @@ FastS_NodeManager::SetDataSetCollection(FastS_DataSetCollection *dsc) } else { { - std::unique_lock<std::mutex> managerGuard(_managerLock); + std::lock_guard<std::mutex> managerGuard(_managerLock); _gencnt++; gencnt = _gencnt; @@ -304,7 +304,7 @@ FastS_NodeManager::GetDataSetCollection() { FastS_DataSetCollection *ret; - std::unique_lock<std::mutex> managerGuard(_managerLock); + std::lock_guard<std::mutex> managerGuard(_managerLock); ret = _datasetCollection; FastS_assert(ret != NULL); ret->addRef(); @@ -320,8 +320,8 @@ FastS_NodeManager::ShutdownConfig() FastS_DataSetCollection *old_dsc; { - std::unique_lock<std::mutex> configGuard(_configLock); - std::unique_lock<std::mutex> managerGuard(_managerLock); + std::lock_guard<std::mutex> configGuard(_configLock); + std::lock_guard<std::mutex> managerGuard(_managerLock); _shutdown = true; // disallow SetPartMap dsc = _datasetCollection; _datasetCollection = new FastS_DataSetCollection(_appCtx); @@ -347,7 +347,7 @@ FastS_NodeManager::GetTotalPartitions() uint32_t ret; ret = 0; - std::unique_lock<std::mutex> managerGuard(_managerLock); + std::lock_guard<std::mutex> managerGuard(_managerLock); FastS_DataSetCollection *dsc = PeekDataSetCollection(); for (unsigned int i = 0; i < dsc->GetMaxNumDataSets(); i++) { FastS_DataSetBase *ds; @@ -429,7 +429,7 @@ FastS_NodeManager::CheckEvents(FastS_TimeKeeper *timeKeeper) FastS_DataSetCollection *tmp; { - std::unique_lock<std::mutex> managerGuard(_managerLock); + std::lock_guard<std::mutex> managerGuard(_managerLock); old_dsc = _oldDSCList; } diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_directory.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_directory.cpp index eac994bb339..f775f4443f8 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_directory.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_directory.cpp @@ -56,7 +56,7 @@ AttributeDirectory::getDirName() const { std::shared_ptr<AttributeDiskLayout> diskLayout; { - std::unique_lock<std::mutex> guard(_mutex); + std::lock_guard<std::mutex> guard(_mutex); assert(!_diskLayout.expired()); diskLayout = _diskLayout.lock(); } @@ -204,7 +204,7 @@ void AttributeDirectory::detach() { assert(empty()); - std::unique_lock<std::mutex> guard(_mutex); + std::lock_guard<std::mutex> guard(_mutex); _diskLayout.reset(); } @@ -238,7 +238,7 @@ AttributeDirectory::tryGetWriter() bool AttributeDirectory::empty() const { - std::unique_lock<std::mutex> guard(_mutex); + std::lock_guard<std::mutex> guard(_mutex); return _snapInfo.snapshots().empty(); } diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attributedisklayout.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attributedisklayout.cpp index 1fcffa92cce..bb2f99d077b 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attributedisklayout.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attributedisklayout.cpp @@ -59,7 +59,7 @@ AttributeDiskLayout::getAttributeDir(const vespalib::string &name) std::shared_ptr<AttributeDirectory> AttributeDiskLayout::createAttributeDir(const vespalib::string &name) { - std::unique_lock<std::shared_timed_mutex> guard(_mutex); + std::lock_guard<std::shared_timed_mutex> guard(_mutex); auto itr = _dirs.find(name); if (itr == _dirs.end()) { auto dir = std::make_shared<AttributeDirectory>(shared_from_this(), name); @@ -81,7 +81,7 @@ AttributeDiskLayout::removeAttributeDir(const vespalib::string &name, search::Se writer->invalidateOldSnapshots(serialNum); writer->removeInvalidSnapshots(); if (writer->removeDiskDir()) { - std::unique_lock<std::shared_timed_mutex> guard(_mutex); + std::lock_guard<std::shared_timed_mutex> guard(_mutex); auto itr = _dirs.find(name); assert(itr != _dirs.end()); assert(dir.get() == itr->second.get()); @@ -89,7 +89,7 @@ AttributeDiskLayout::removeAttributeDir(const vespalib::string &name, search::Se writer->detach(); } } else { - std::unique_lock<std::shared_timed_mutex> guard(_mutex); + std::lock_guard<std::shared_timed_mutex> guard(_mutex); auto itr = _dirs.find(name); if (itr != _dirs.end()) { assert(dir.get() != itr->second.get()); diff --git a/searchcore/src/vespa/searchcore/proton/metrics/job_tracker.cpp b/searchcore/src/vespa/searchcore/proton/metrics/job_tracker.cpp index 753c84cd9b6..6d05ce8c57d 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/job_tracker.cpp +++ b/searchcore/src/vespa/searchcore/proton/metrics/job_tracker.cpp @@ -20,14 +20,14 @@ JobTracker::sampleLoad(time_point now, const std::lock_guard<std::mutex> &guard) void JobTracker::start() { - std::unique_lock<std::mutex> guard(_lock); + std::lock_guard<std::mutex> guard(_lock); _sampler.startJob(std::chrono::steady_clock::now()); } void JobTracker::end() { - std::unique_lock<std::mutex> guard(_lock); + std::lock_guard<std::mutex> guard(_lock); _sampler.endJob(std::chrono::steady_clock::now()); } diff --git a/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_registry.cpp b/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_registry.cpp index 75a20f9f8e5..68aaad3b557 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_registry.cpp +++ b/searchcore/src/vespa/searchcore/proton/reference/document_db_reference_registry.cpp @@ -30,7 +30,7 @@ DocumentDBReferenceRegistry::get(vespalib::stringref name) const std::shared_ptr<IDocumentDBReference> DocumentDBReferenceRegistry::tryGet(vespalib::stringref name) const { - std::unique_lock<std::mutex> guard(_lock); + std::lock_guard<std::mutex> guard(_lock); auto itr = _handlers.find(name); if (itr == _handlers.end()) { return std::shared_ptr<IDocumentDBReference>(); diff --git a/searchcore/src/vespa/searchcore/proton/server/pendinglidtracker.cpp b/searchcore/src/vespa/searchcore/proton/server/pendinglidtracker.cpp index 15283c170cc..79bf970aeac 100644 --- a/searchcore/src/vespa/searchcore/proton/server/pendinglidtracker.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/pendinglidtracker.cpp @@ -6,7 +6,6 @@ namespace proton { -using LockGuard = std::unique_lock<std::mutex>; PendingLidTracker::PendingLidTracker() : _mutex(), _cond(), @@ -19,12 +18,12 @@ PendingLidTracker::~PendingLidTracker() { void PendingLidTracker::produce(uint32_t lid) { - LockGuard guard(_mutex); + std::lock_guard<std::mutex> guard(_mutex); _pending[lid]++; } void PendingLidTracker::consume(uint32_t lid) { - LockGuard guard(_mutex); + std::lock_guard<std::mutex> guard(_mutex); auto found = _pending.find(lid); assert (found != _pending.end()); assert (found->second > 0); @@ -38,7 +37,7 @@ PendingLidTracker::consume(uint32_t lid) { void PendingLidTracker::waitForConsumedLid(uint32_t lid) { - LockGuard guard(_mutex); + std::unique_lock<std::mutex> guard(_mutex); while (_pending.find(lid) != _pending.end()) { _cond.wait(guard); } diff --git a/searchlib/src/tests/postinglistbm/andstress.cpp b/searchlib/src/tests/postinglistbm/andstress.cpp index 736d53508b4..40f919509e8 100644 --- a/searchlib/src/tests/postinglistbm/andstress.cpp +++ b/searchlib/src/tests/postinglistbm/andstress.cpp @@ -280,7 +280,7 @@ AndStressMaster::Task * AndStressMaster::getTask() { Task *result = NULL; - std::unique_lock<std::mutex> taskGuard(_taskLock); + std::lock_guard<std::mutex> taskGuard(_taskLock); if (_taskIdx < _tasks.size()) { result = &_tasks[_taskIdx]; ++_taskIdx; diff --git a/staging_vespalib/src/vespa/vespalib/util/clock.cpp b/staging_vespalib/src/vespa/vespalib/util/clock.cpp index b19b067afa9..c9768417914 100644 --- a/staging_vespalib/src/vespa/vespalib/util/clock.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/clock.cpp @@ -45,7 +45,7 @@ void Clock::Run(FastOS_ThreadInterface *thread, void *arguments) void Clock::stop(void) { - std::unique_lock<std::mutex> guard(_lock); + std::lock_guard<std::mutex> guard(_lock); _stop = true; _cond.notify_all(); } diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index 363065be65c..03a51cf6180 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -5,6 +5,7 @@ #include <vespa/storageapi/message/persistence.h> #include <vespa/storage/distributor/bucketdbupdater.h> #include <vespa/storage/distributor/pending_bucket_space_db_transition.h> +#include <vespa/storage/distributor/outdated_nodes_map.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/storageframework/defaultimplementation/clock/realclock.h> #include <vespa/storage/storageutil/distributorstatecache.h> @@ -157,6 +158,7 @@ protected: } public: + using OutdatedNodesMap = dbtransition::OutdatedNodesMap; void setUp() override { createLinks(); }; @@ -539,9 +541,9 @@ public: ClusterInformation::CSP clusterInfo( owner.createClusterInfo(oldClusterState)); - std::unordered_set<uint16_t> outdatedNodes; + OutdatedNodesMap outdatedNodesMap; state = PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, owner.getBucketSpaceRepo(), cmd, outdatedNodes, + clock, clusterInfo, sender, owner.getBucketSpaceRepo(), cmd, outdatedNodesMap, api::Timestamp(1)); } @@ -552,7 +554,6 @@ public: ClusterInformation::CSP clusterInfo( owner.createClusterInfo(oldClusterState)); - std::unordered_set<uint16_t> outdatedNodes; state = PendingClusterState::createForDistributionChange( clock, clusterInfo, sender, owner.getBucketSpaceRepo(), api::Timestamp(1)); } @@ -1640,10 +1641,10 @@ BucketDBUpdaterTest::testPendingClusterStateReceive() framework::defaultimplementation::FakeClock clock; ClusterInformation::CSP clusterInfo(createClusterInfo("cluster:d")); - std::unordered_set<uint16_t> outdatedNodes; + OutdatedNodesMap outdatedNodesMap; std::unique_ptr<PendingClusterState> state( PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodes, + clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap, api::Timestamp(1))); CPPUNIT_ASSERT_EQUAL(3, (int)sender.commands.size()); @@ -1798,7 +1799,7 @@ BucketDBUpdaterTest::mergeBucketLists( framework::MilliSecTimer timer(clock); MessageSenderStub sender; - std::unordered_set<uint16_t> outdatedNodes; + OutdatedNodesMap outdatedNodesMap; { auto cmd(std::make_shared<api::SetSystemStateCommand>(oldState)); @@ -1808,7 +1809,7 @@ BucketDBUpdaterTest::mergeBucketLists( ClusterInformation::CSP clusterInfo(createClusterInfo("cluster:d")); std::unique_ptr<PendingClusterState> state( PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodes, + clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap, beforeTime)); parseInputData(existingData, beforeTime, *state, includeBucketInfo); @@ -1827,7 +1828,7 @@ BucketDBUpdaterTest::mergeBucketLists( ClusterInformation::CSP clusterInfo(createClusterInfo(oldState.toString())); std::unique_ptr<PendingClusterState> state( PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodes, + clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap, afterTime)); parseInputData(newData, afterTime, *state, includeBucketInfo); diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp index 50431bda37e..b68e53c7136 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp @@ -171,7 +171,7 @@ BucketDBUpdater::storageDistributionChanged( _sender, _bucketSpaceComponent.getBucketSpaceRepo(), _bucketSpaceComponent.getUniqueTimestamp()); - _outdatedNodes = _pendingClusterState->getOutdatedNodeSet(); + _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap(); } void @@ -219,9 +219,9 @@ BucketDBUpdater::onSetSystemState( _sender, _bucketSpaceComponent.getBucketSpaceRepo(), cmd, - _outdatedNodes, + _outdatedNodesMap, _bucketSpaceComponent.getUniqueTimestamp()); - _outdatedNodes = _pendingClusterState->getOutdatedNodeSet(); + _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap(); if (isPendingClusterStateCompleted()) { processCompletedPendingClusterState(); @@ -500,7 +500,7 @@ BucketDBUpdater::processCompletedPendingClusterState() } _pendingClusterState.reset(); - _outdatedNodes.clear(); + _outdatedNodesMap.clear(); sendAllQueuedBucketRechecks(); completeTransitionTimer(); } diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h index 994e207f200..ee07c46754f 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.h +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h @@ -7,6 +7,7 @@ #include "distributormessagesender.h" #include "pendingclusterstate.h" #include "distributor_bucket_space_component.h" +#include "outdated_nodes_map.h" #include <vespa/document/bucket/bucketid.h> #include <vespa/storageapi/messageapi/returncode.h> #include <vespa/storageapi/message/bucket.h> @@ -27,6 +28,8 @@ class BucketDBUpdater : public framework::StatusReporter, public api::MessageHandler { public: + using OutdatedNodes = dbtransition::OutdatedNodes; + using OutdatedNodesMap = dbtransition::OutdatedNodesMap; BucketDBUpdater(Distributor& owner, DistributorBucketSpaceRepo &bucketSpaceRepo, DistributorBucketSpace& bucketSpace, @@ -226,7 +229,7 @@ private: std::list<PendingClusterState::Summary> _history; DistributorMessageSender& _sender; std::set<EnqueuedBucketRecheck> _enqueuedRechecks; - std::unordered_set<uint16_t> _outdatedNodes; + OutdatedNodesMap _outdatedNodesMap; framework::MilliSecTimer _transitionTimer; }; diff --git a/storage/src/vespa/storage/distributor/outdated_nodes.h b/storage/src/vespa/storage/distributor/outdated_nodes.h new file mode 100644 index 00000000000..fddb1806d82 --- /dev/null +++ b/storage/src/vespa/storage/distributor/outdated_nodes.h @@ -0,0 +1,11 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <unordered_set> + +namespace storage::distributor::dbtransition { + +using OutdatedNodes = std::unordered_set<uint16_t>; + +} diff --git a/storage/src/vespa/storage/distributor/outdated_nodes_map.h b/storage/src/vespa/storage/distributor/outdated_nodes_map.h new file mode 100644 index 00000000000..8d08b20732b --- /dev/null +++ b/storage/src/vespa/storage/distributor/outdated_nodes_map.h @@ -0,0 +1,13 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "outdated_nodes.h" +#include <vespa/document/bucket/bucketspace.h> +#include <unordered_map> + +namespace storage::distributor::dbtransition { + +using OutdatedNodesMap = std::unordered_map<document::BucketSpace, OutdatedNodes, document::BucketSpace::hash>; + +} diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp index 36961a0fec0..ed9c8bc222b 100644 --- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp +++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.cpp @@ -3,6 +3,7 @@ #include "pending_bucket_space_db_transition.h" #include "clusterinformation.h" #include "pendingclusterstate.h" +#include "distributor_bucket_space.h" #include <vespa/storage/common/bucketoperationlogger.h> #include <algorithm> @@ -11,7 +12,14 @@ LOG_SETUP(".pendingbucketspacedbtransition"); namespace storage::distributor { +using lib::Node; +using lib::NodeType; +using lib::NodeState; + PendingBucketSpaceDbTransition::PendingBucketSpaceDbTransition(const PendingClusterState &pendingClusterState, + DistributorBucketSpace &distributorBucketSpace, + bool distributionChanged, + const OutdatedNodes &outdatedNodes, std::shared_ptr<const ClusterInformation> clusterInfo, const lib::ClusterState &newClusterState, api::Timestamp creationTimestamp) @@ -20,11 +28,24 @@ PendingBucketSpaceDbTransition::PendingBucketSpaceDbTransition(const PendingClus _removedBuckets(), _missingEntries(), _clusterInfo(std::move(clusterInfo)), - _outdatedNodes(pendingClusterState.getOutdatedNodeSet()), + _outdatedNodes(newClusterState.getNodeCount(NodeType::STORAGE)), + _prevClusterState(_clusterInfo->getClusterState()), _newClusterState(newClusterState), _creationTimestamp(creationTimestamp), - _pendingClusterState(pendingClusterState) + _pendingClusterState(pendingClusterState), + _distributorBucketSpace(distributorBucketSpace), + _distributorIndex(_clusterInfo->getDistributorIndex()), + _bucketOwnershipTransfer(distributionChanged) { + if (distributorChanged()) { + _bucketOwnershipTransfer = true; + } + if (_bucketOwnershipTransfer) { + markAllAvailableNodesAsRequiringRequest(); + } else { + updateSetOfNodesThatAreOutdated(); + addAdditionalNodesToOutdatedSet(outdatedNodes); + } } PendingBucketSpaceDbTransition::~PendingBucketSpaceDbTransition() @@ -67,10 +88,12 @@ PendingBucketSpaceDbTransition::insertInfo(BucketDatabase::Entry& info, const Ra std::vector<BucketCopy> copiesToAddOrUpdate( getCopiesThatAreNewOrAltered(info, range)); + const auto &dist(_distributorBucketSpace.getDistribution()); std::vector<uint16_t> order( - _clusterInfo->getIdealStorageNodesForState( + dist.getIdealStorageNodes( _newClusterState, - _entries[range.first].bucketId)); + _entries[range.first].bucketId, + _clusterInfo->getStorageUpStates())); info->addNodes(copiesToAddOrUpdate, order, TrustedUpdate::DEFER); LOG_BUCKET_OPERATION_NO_LOCK( @@ -192,8 +215,9 @@ PendingBucketSpaceDbTransition::addToBucketDB(BucketDatabase& db, const Range& r } void -PendingBucketSpaceDbTransition::mergeInto(BucketDatabase& db) +PendingBucketSpaceDbTransition::mergeIntoBucketDatabase() { + BucketDatabase &db(_distributorBucketSpace.getBucketDatabase()); std::sort(_entries.begin(), _entries.end()); db.forEach(*this); @@ -224,6 +248,170 @@ PendingBucketSpaceDbTransition::onRequestBucketInfoReply(const api::RequestBucke } } +bool +PendingBucketSpaceDbTransition::distributorChanged() +{ + const auto &oldState(_prevClusterState); + const auto &newState(_newClusterState); + if (newState.getDistributionBitCount() != oldState.getDistributionBitCount()) { + return true; + } + + Node myNode(NodeType::DISTRIBUTOR, _distributorIndex); + if (oldState.getNodeState(myNode).getState() == lib::State::DOWN) { + return true; + } + + uint16_t oldCount = oldState.getNodeCount(NodeType::DISTRIBUTOR); + uint16_t newCount = newState.getNodeCount(NodeType::DISTRIBUTOR); + + uint16_t maxCount = std::max(oldCount, newCount); + + for (uint16_t i = 0; i < maxCount; ++i) { + Node node(NodeType::DISTRIBUTOR, i); + + const lib::State& old(oldState.getNodeState(node).getState()); + const lib::State& nw(newState.getNodeState(node).getState()); + + if (nodeWasUpButNowIsDown(old, nw)) { + if (nodeInSameGroupAsSelf(i) || + nodeNeedsOwnershipTransferFromGroupDown(i, newState)) { + return true; + } + } + } + + return false; +} + +bool +PendingBucketSpaceDbTransition::nodeWasUpButNowIsDown(const lib::State& old, + const lib::State& nw) +{ + return (old.oneOf("uimr") && !nw.oneOf("uimr")); +} + +bool +PendingBucketSpaceDbTransition::nodeInSameGroupAsSelf(uint16_t index) const +{ + const auto &dist(_distributorBucketSpace.getDistribution()); + if (dist.getNodeGraph().getGroupForNode(index) == + dist.getNodeGraph().getGroupForNode(_distributorIndex)) { + LOG(debug, + "Distributor %d state changed, need to request data from all " + "storage nodes", + index); + return true; + } else { + LOG(debug, + "Distributor %d state changed but unrelated to my group.", + index); + return false; + } +} + +bool +PendingBucketSpaceDbTransition::nodeNeedsOwnershipTransferFromGroupDown( + uint16_t nodeIndex, + const lib::ClusterState& state) const +{ + const auto &dist(_distributorBucketSpace.getDistribution()); + if (!dist.distributorAutoOwnershipTransferOnWholeGroupDown()) { + return false; // Not doing anything for downed groups. + } + const lib::Group* group(dist.getNodeGraph().getGroupForNode(nodeIndex)); + // If there is no group information associated with the node (because the + // group has changed or the node has been removed from config), we must + // also invoke ownership transfer of buckets. + if (group == nullptr + || lib::Distribution::allDistributorsDown(*group, state)) + { + LOG(debug, + "Distributor %u state changed and is in a " + "group that now has no distributors remaining", + nodeIndex); + return true; + } + return false; +} + +uint16_t +PendingBucketSpaceDbTransition::newStateStorageNodeCount() const +{ + return _newClusterState.getNodeCount(lib::NodeType::STORAGE); +} + +bool +PendingBucketSpaceDbTransition::storageNodeMayHaveLostData(uint16_t index) +{ + Node node(NodeType::STORAGE, index); + NodeState newState = _newClusterState.getNodeState(node); + NodeState oldState = _prevClusterState.getNodeState(node); + + return (newState.getStartTimestamp() > oldState.getStartTimestamp()); +} + +void +PendingBucketSpaceDbTransition::updateSetOfNodesThatAreOutdated() +{ + const uint16_t nodeCount(newStateStorageNodeCount()); + for (uint16_t index = 0; index < nodeCount; ++index) { + if (storageNodeMayHaveLostData(index) || storageNodeChanged(index)) { + _outdatedNodes.insert(index); + } + } +} + +bool +PendingBucketSpaceDbTransition::storageNodeChanged(uint16_t index) { + Node node(NodeType::STORAGE, index); + NodeState newState = _newClusterState.getNodeState(node); + NodeState oldNodeState = _prevClusterState.getNodeState(node); + + // similarTo() also covers disk states. + if (!(oldNodeState.similarTo(newState))) { + LOG(debug, + "State for storage node %d has changed from '%s' to '%s', " + "updating bucket information", + index, + oldNodeState.toString().c_str(), + newState.toString().c_str()); + return true; + } + + return false; +} + +bool +PendingBucketSpaceDbTransition::storageNodeUpInNewState(uint16_t node) const +{ + return _newClusterState.getNodeState(Node(NodeType::STORAGE, node)) + .getState().oneOf(_clusterInfo->getStorageUpStates()); +} + +void +PendingBucketSpaceDbTransition::markAllAvailableNodesAsRequiringRequest() +{ + const uint16_t nodeCount(newStateStorageNodeCount()); + for (uint16_t i = 0; i < nodeCount; ++i) { + if (storageNodeUpInNewState(i)) { + _outdatedNodes.insert(i); + } + } +} + +void +PendingBucketSpaceDbTransition::addAdditionalNodesToOutdatedSet( + const std::unordered_set<uint16_t>& nodes) +{ + const uint16_t nodeCount(newStateStorageNodeCount()); + for (uint16_t node : nodes) { + if (node < nodeCount) { + _outdatedNodes.insert(node); + } + } +} + void PendingBucketSpaceDbTransition::addNodeInfo(const document::BucketId& id, const BucketCopy& copy) { diff --git a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h index 649ce676d88..903f9b762fb 100644 --- a/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h +++ b/storage/src/vespa/storage/distributor/pending_bucket_space_db_transition.h @@ -2,16 +2,17 @@ #pragma once #include "pending_bucket_space_db_transition_entry.h" +#include "outdated_nodes.h" #include <vespa/storage/bucketdb/bucketdatabase.h> -#include <unordered_set> namespace storage::api { class RequestBucketInfoReply; } -namespace storage::lib { class ClusterState; } +namespace storage::lib { class ClusterState; class State; } namespace storage::distributor { class ClusterInformation; class PendingClusterState; +class DistributorBucketSpace; /** * Class used by PendingClusterState to track request bucket info @@ -23,6 +24,7 @@ class PendingBucketSpaceDbTransition : public BucketDatabase::MutableEntryProces public: using Entry = dbtransition::Entry; using EntryList = std::vector<Entry>; + using OutdatedNodes = dbtransition::OutdatedNodes; private: using Range = std::pair<uint32_t, uint32_t>; @@ -37,11 +39,15 @@ private: // cluster state was constructed. // May be a superset of _requestedNodes, as some nodes that are outdated // may be down and thus cannot get a request. - const std::unordered_set<uint16_t> _outdatedNodes; + OutdatedNodes _outdatedNodes; + const lib::ClusterState &_prevClusterState; const lib::ClusterState &_newClusterState; const api::Timestamp _creationTimestamp; const PendingClusterState &_pendingClusterState; + DistributorBucketSpace &_distributorBucketSpace; + uint16_t _distributorIndex; + bool _bucketOwnershipTransfer; // BucketDataBase::MutableEntryProcessor API bool process(BucketDatabase::Entry& e) override; @@ -71,19 +77,37 @@ private: bool bucketInfoIteratorPointsToBucket(const document::BucketId& bucketId) const; std::string requestNodesToString(); + bool distributorChanged(); + static bool nodeWasUpButNowIsDown(const lib::State &old, const lib::State &nw); + bool storageNodeUpInNewState(uint16_t node) const; + bool nodeInSameGroupAsSelf(uint16_t index) const; + bool nodeNeedsOwnershipTransferFromGroupDown(uint16_t nodeIndex, const lib::ClusterState& state) const; + uint16_t newStateStorageNodeCount() const; + bool storageNodeMayHaveLostData(uint16_t index); + bool storageNodeChanged(uint16_t index); + void markAllAvailableNodesAsRequiringRequest(); + void addAdditionalNodesToOutdatedSet(const OutdatedNodes &nodes); + void updateSetOfNodesThatAreOutdated(); + public: PendingBucketSpaceDbTransition(const PendingClusterState &pendingClusterState, + DistributorBucketSpace &distributorBucketSpace, + bool distributionChanged, + const OutdatedNodes &outdatedNodes, std::shared_ptr<const ClusterInformation> clusterInfo, const lib::ClusterState &newClusterState, api::Timestamp creationTimestamp); ~PendingBucketSpaceDbTransition(); - // Merges all the results with the given bucket database. - void mergeInto(BucketDatabase& db); + // Merges all the results with the corresponding bucket database. + void mergeIntoBucketDatabase(); // Adds the info from the reply to our list of information. void onRequestBucketInfoReply(const api::RequestBucketInfoReply &reply, uint16_t node); + const OutdatedNodes &getOutdatedNodes() { return _outdatedNodes; } + bool getBucketOwnershipTransfer() const { return _bucketOwnershipTransfer; } + // Methods used by unit tests. const EntryList& results() const { return _entries; } void addNodeInfo(const document::BucketId& id, const BucketCopy& copy); diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp index e8f9442c76f..daa85822264 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp @@ -26,11 +26,10 @@ PendingClusterState::PendingClusterState( DistributorMessageSender& sender, DistributorBucketSpaceRepo &bucketSpaceRepo, const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd, - const std::unordered_set<uint16_t>& outdatedNodes, + const OutdatedNodesMap &outdatedNodesMap, api::Timestamp creationTimestamp) : _cmd(newStateCmd), _requestedNodes(newStateCmd->getSystemState().getNodeCount(lib::NodeType::STORAGE)), - _outdatedNodes(newStateCmd->getSystemState().getNodeCount(lib::NodeType::STORAGE)), _prevClusterState(clusterInfo->getClusterState()), _newClusterState(newStateCmd->getSystemState()), _clock(clock), @@ -38,17 +37,11 @@ PendingClusterState::PendingClusterState( _creationTimestamp(creationTimestamp), _sender(sender), _bucketSpaceRepo(bucketSpaceRepo), - _bucketOwnershipTransfer(distributorChanged(_prevClusterState, _newClusterState)), + _bucketOwnershipTransfer(false), _pendingTransitions() { logConstructionInformation(); - if (hasBucketOwnershipTransfer()) { - markAllAvailableNodesAsRequiringRequest(); - } else { - updateSetOfNodesThatAreOutdated(); - addAdditionalNodesToOutdatedSet(outdatedNodes); - } - initializeBucketSpaceTransitions(); + initializeBucketSpaceTransitions(false, outdatedNodesMap); } PendingClusterState::PendingClusterState( @@ -58,7 +51,6 @@ PendingClusterState::PendingClusterState( DistributorBucketSpaceRepo &bucketSpaceRepo, api::Timestamp creationTimestamp) : _requestedNodes(clusterInfo->getStorageNodeCount()), - _outdatedNodes(clusterInfo->getStorageNodeCount()), _prevClusterState(clusterInfo->getClusterState()), _newClusterState(clusterInfo->getClusterState()), _clock(clock), @@ -70,17 +62,26 @@ PendingClusterState::PendingClusterState( _pendingTransitions() { logConstructionInformation(); - markAllAvailableNodesAsRequiringRequest(); - initializeBucketSpaceTransitions(); + initializeBucketSpaceTransitions(true, OutdatedNodesMap()); } PendingClusterState::~PendingClusterState() {} void -PendingClusterState::initializeBucketSpaceTransitions() +PendingClusterState::initializeBucketSpaceTransitions(bool distributionChanged, const OutdatedNodesMap &outdatedNodesMap) { + OutdatedNodes emptyOutdatedNodes; for (auto &elem : _bucketSpaceRepo) { - _pendingTransitions.emplace(elem.first, std::make_unique<PendingBucketSpaceDbTransition>(*this, _clusterInfo, _newClusterState, _creationTimestamp)); + auto onItr = outdatedNodesMap.find(elem.first); + const auto &outdatedNodes = (onItr == outdatedNodesMap.end()) ? emptyOutdatedNodes : onItr->second; + auto pendingTransition = + std::make_unique<PendingBucketSpaceDbTransition> + (*this, *elem.second, distributionChanged, outdatedNodes, + _clusterInfo, _newClusterState, _creationTimestamp); + if (pendingTransition->getBucketOwnershipTransfer()) { + _bucketOwnershipTransfer = true; + } + _pendingTransitions.emplace(elem.first, std::move(pendingTransition)); } if (shouldRequestBucketInfo()) { requestNodes(); @@ -106,33 +107,14 @@ PendingClusterState::storageNodeUpInNewState(uint16_t node) const .getState().oneOf(_clusterInfo->getStorageUpStates()); } -void -PendingClusterState::markAllAvailableNodesAsRequiringRequest() -{ - const uint16_t nodeCount(newStateStorageNodeCount()); - for (uint16_t i = 0; i < nodeCount; ++i) { - if (storageNodeUpInNewState(i)) { - _outdatedNodes.insert(i); - } - } -} - -void -PendingClusterState::addAdditionalNodesToOutdatedSet( - const std::unordered_set<uint16_t>& nodes) +PendingClusterState::OutdatedNodesMap +PendingClusterState::getOutdatedNodesMap() const { - const uint16_t nodeCount(newStateStorageNodeCount()); - for (uint16_t node : nodes) { - if (node < nodeCount) { - _outdatedNodes.insert(node); - } + OutdatedNodesMap outdatedNodesMap; + for (const auto &elem : _pendingTransitions) { + outdatedNodesMap.emplace(elem.first, elem.second->getOutdatedNodes()); } -} - -std::unordered_set<uint16_t> -PendingClusterState::getOutdatedNodeSet() const -{ - return _outdatedNodes; + return outdatedNodesMap; } uint16_t @@ -170,47 +152,6 @@ PendingClusterState::iAmDown() const return myState.getState() == lib::State::DOWN; } -bool -PendingClusterState::storageNodeMayHaveLostData(uint16_t index) -{ - Node node(NodeType::STORAGE, index); - NodeState newState = _newClusterState.getNodeState(node); - NodeState oldState = _prevClusterState.getNodeState(node); - - return (newState.getStartTimestamp() > oldState.getStartTimestamp()); -} - -void -PendingClusterState::updateSetOfNodesThatAreOutdated() -{ - const uint16_t nodeCount(newStateStorageNodeCount()); - for (uint16_t index = 0; index < nodeCount; ++index) { - if (storageNodeMayHaveLostData(index) || storageNodeChanged(index)) { - _outdatedNodes.insert(index); - } - } -} - -bool -PendingClusterState::storageNodeChanged(uint16_t index) { - Node node(NodeType::STORAGE, index); - NodeState newState = _newClusterState.getNodeState(node); - NodeState oldNodeState = _prevClusterState.getNodeState(node); - - // similarTo() also covers disk states. - if (!(oldNodeState.similarTo(newState))) { - LOG(debug, - "State for storage node %d has changed from '%s' to '%s', " - "updating bucket information", - index, - oldNodeState.toString().c_str(), - newState.toString().c_str()); - return true; - } - - return false; -} - void PendingClusterState::requestNodes() { @@ -225,104 +166,16 @@ PendingClusterState::requestNodes() void PendingClusterState::requestBucketInfoFromStorageNodesWithChangedState() { - for (uint16_t idx : _outdatedNodes) { - if (storageNodeUpInNewState(idx)) { - for (auto &elem : _bucketSpaceRepo) { + for (auto &elem : _pendingTransitions) { + const OutdatedNodes &outdatedNodes(elem.second->getOutdatedNodes()); + for (uint16_t idx : outdatedNodes) { + if (storageNodeUpInNewState(idx)) { requestNode(BucketSpaceAndNode(elem.first, idx)); } } } } -bool -PendingClusterState::distributorChanged( - const lib::ClusterState& oldState, - const lib::ClusterState& newState) -{ - if (newState.getDistributionBitCount() != - oldState.getDistributionBitCount()) - { - return true; - } - - Node myNode(NodeType::DISTRIBUTOR, _sender.getDistributorIndex()); - if (oldState.getNodeState(myNode).getState() == - lib::State::DOWN) - { - return true; - } - - uint16_t oldCount = oldState.getNodeCount(NodeType::DISTRIBUTOR); - uint16_t newCount = newState.getNodeCount(NodeType::DISTRIBUTOR); - - uint16_t maxCount = std::max(oldCount, newCount); - - for (uint16_t i = 0; i < maxCount; ++i) { - Node node(NodeType::DISTRIBUTOR, i); - - const lib::State& old(oldState.getNodeState(node).getState()); - const lib::State& nw(newState.getNodeState(node).getState()); - - if (nodeWasUpButNowIsDown(old, nw)) { - if (nodeInSameGroupAsSelf(i) || - nodeNeedsOwnershipTransferFromGroupDown(i, newState)) { - return true; - } - } - } - - return false; -} - -bool -PendingClusterState::nodeWasUpButNowIsDown(const lib::State& old, - const lib::State& nw) const -{ - return (old.oneOf("uimr") && !nw.oneOf("uimr")); -} - -bool -PendingClusterState::nodeInSameGroupAsSelf(uint16_t index) const -{ - if (_clusterInfo->nodeInSameGroupAsSelf(index)) { - LOG(debug, - "Distributor %d state changed, need to request data from all " - "storage nodes", - index); - return true; - } else { - LOG(debug, - "Distributor %d state changed but unrelated to my group.", - index); - return false; - } -} - -bool -PendingClusterState::nodeNeedsOwnershipTransferFromGroupDown( - uint16_t nodeIndex, - const lib::ClusterState& state) const -{ - const lib::Distribution& dist(_clusterInfo->getDistribution()); - if (!dist.distributorAutoOwnershipTransferOnWholeGroupDown()) { - return false; // Not doing anything for downed groups. - } - const lib::Group* group(dist.getNodeGraph().getGroupForNode(nodeIndex)); - // If there is no group information associated with the node (because the - // group has changed or the node has been removed from config), we must - // also invoke ownership transfer of buckets. - if (group == nullptr - || lib::Distribution::allDistributorsDown(*group, state)) - { - LOG(debug, - "Distributor %u state changed and is in a " - "group that now has no distributors remaining", - nodeIndex); - return true; - } - return false; -} - void PendingClusterState::requestNode(BucketSpaceAndNode bucketSpaceAndNode) { @@ -420,8 +273,8 @@ PendingClusterState::requestNodesToString() const void PendingClusterState::mergeIntoBucketDatabases() { - for (auto &elem : _bucketSpaceRepo) { - _pendingTransitions[elem.first]->mergeInto(elem.second->getBucketDatabase()); + for (auto &elem : _pendingTransitions) { + elem.second->mergeIntoBucketDatabase(); } } diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h index ac5c4dc35ea..7a23d48c9fd 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.h +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h @@ -9,7 +9,7 @@ #include <vespa/storageframework/generic/clock/clock.h> #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vespalib/util/xmlserializable.h> -#include <unordered_set> +#include "outdated_nodes_map.h" #include <unordered_map> #include <deque> @@ -25,6 +25,8 @@ class DistributorBucketSpaceRepo; */ class PendingClusterState : public vespalib::XmlSerializable { public: + using OutdatedNodes = dbtransition::OutdatedNodes; + using OutdatedNodesMap = dbtransition::OutdatedNodesMap; struct Summary { Summary(const std::string& prevClusterState, const std::string& newClusterState, uint32_t processingTime); Summary(const Summary &); @@ -44,12 +46,12 @@ public: DistributorMessageSender& sender, DistributorBucketSpaceRepo &bucketSpaceRepo, const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd, - const std::unordered_set<uint16_t>& outdatedNodes, + const OutdatedNodesMap &outdatedNodesMap, api::Timestamp creationTimestamp) { return std::unique_ptr<PendingClusterState>( new PendingClusterState(clock, clusterInfo, sender, bucketSpaceRepo, newStateCmd, - outdatedNodes, + outdatedNodesMap, creationTimestamp)); } @@ -122,7 +124,7 @@ public: * state was constructed for a distribution config change, this set will * be equal to the set of all available storage nodes. */ - std::unordered_set<uint16_t> getOutdatedNodeSet() const; + OutdatedNodesMap getOutdatedNodesMap() const; /** * Merges all the results with the corresponding bucket databases. @@ -131,11 +133,6 @@ public: // Get pending transition for a specific bucket space. Only used by unit test. PendingBucketSpaceDbTransition &getPendingBucketSpaceDbTransition(document::BucketSpace bucketSpace); - /** - * Returns true if this pending state was due to a distribution bit - * change rather than an actual state change. - */ - bool distributionChange() const { return _distributionChange; } void printXml(vespalib::XmlOutputStream&) const override; Summary getSummary() const; std::string requestNodesToString() const; @@ -151,7 +148,7 @@ private: DistributorMessageSender& sender, DistributorBucketSpaceRepo &bucketSpaceRepo, const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd, - const std::unordered_set<uint16_t>& outdatedNodes, + const OutdatedNodesMap &outdatedNodesMap, api::Timestamp creationTimestamp); /** @@ -176,15 +173,9 @@ private: } }; - void initializeBucketSpaceTransitions(); + void initializeBucketSpaceTransitions(bool distributionChanged, const OutdatedNodesMap &outdatedNodesMap); void logConstructionInformation() const; void requestNode(BucketSpaceAndNode bucketSpaceAndNode); - bool distributorChanged(const lib::ClusterState& oldState, const lib::ClusterState& newState); - bool storageNodeMayHaveLostData(uint16_t index); - bool storageNodeChanged(uint16_t index); - void markAllAvailableNodesAsRequiringRequest(); - void addAdditionalNodesToOutdatedSet(const std::unordered_set<uint16_t>& nodes); - void updateSetOfNodesThatAreOutdated(); void requestNodes(); void requestBucketInfoFromStorageNodesWithChangedState(); @@ -195,9 +186,6 @@ private: bool shouldRequestBucketInfo() const; bool clusterIsDown() const; bool iAmDown() const; - bool nodeInSameGroupAsSelf(uint16_t index) const; - bool nodeNeedsOwnershipTransferFromGroupDown(uint16_t nodeIndex, const lib::ClusterState& state) const; - bool nodeWasUpButNowIsDown(const lib::State& old, const lib::State& nw) const; bool storageNodeUpInNewState(uint16_t node) const; @@ -207,13 +195,6 @@ private: std::vector<bool> _requestedNodes; std::deque<std::pair<framework::MilliSecTime, BucketSpaceAndNode> > _delayedRequests; - // Set for all nodes that may have changed state since that previous - // active cluster state, or that were marked as outdated when the pending - // cluster state was constructed. - // May be a superset of _requestedNodes, as some nodes that are outdated - // may be down and thus cannot get a request. - std::unordered_set<uint16_t> _outdatedNodes; - lib::ClusterState _prevClusterState; lib::ClusterState _newClusterState; @@ -224,7 +205,6 @@ private: DistributorMessageSender& _sender; DistributorBucketSpaceRepo &_bucketSpaceRepo; - bool _distributionChange; bool _bucketOwnershipTransfer; std::unordered_map<document::BucketSpace, std::unique_ptr<PendingBucketSpaceDbTransition>, document::BucketSpace::hash> _pendingTransitions; }; |