diff options
author | toby <smorgrav@yahoo-inc.com> | 2017-10-18 14:28:19 +0200 |
---|---|---|
committer | toby <smorgrav@yahoo-inc.com> | 2017-10-18 14:28:19 +0200 |
commit | 79d1aa480db70a00ef74c19f582db2a4d220a96a (patch) | |
tree | 75b6e4c15048cd6282bc862d6f2ab7ed247f2f97 | |
parent | 8f417af81fe1d5c011636eb9eeedf5a3c68775cf (diff) | |
parent | 9de8d3d524f8856f9bed0efc6294ace9dd3e6c08 (diff) |
Merge branch 'master' into smorgrav/cost_npe
56 files changed, 278 insertions, 1309 deletions
diff --git a/config-application-package/src/main/java/com/yahoo/config/model/application/provider/Bundle.java b/config-application-package/src/main/java/com/yahoo/config/model/application/provider/Bundle.java index 213024b95d0..3eb1c89db3e 100644 --- a/config-application-package/src/main/java/com/yahoo/config/model/application/provider/Bundle.java +++ b/config-application-package/src/main/java/com/yahoo/config/model/application/provider/Bundle.java @@ -132,7 +132,7 @@ public class Bundle { defName = nameAndNamespace.first; defNamespace = getNamespace(); if (defNamespace.isEmpty()) - throw new IllegalArgumentException("Config definition '" + defName + "' is missing a namespace"); + throw new IllegalArgumentException("Config definition '" + defName + "' is missing a package (or namespace)"); contents = getContents(); } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ZKTenantApplications.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ZKTenantApplications.java index 9e3df5af715..574bac792dd 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ZKTenantApplications.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ZKTenantApplications.java @@ -164,10 +164,13 @@ public class ZKTenantApplications implements TenantApplications, PathChildrenCac case CHILD_REMOVED: applicationRemoved(ApplicationId.fromSerializedForm(Path.fromString(event.getData().getPath()).getName())); break; + case CHILD_UPDATED: + // do nothing, application just got redeployed + break; default: // We don't know if applications have been added or removed so possibly need to remove some of them // (new applications are not added here) - removeApplications(); + removeApplications(event.getType()); break; } } @@ -181,9 +184,9 @@ public class ZKTenantApplications implements TenantApplications, PathChildrenCac log.log(LogLevel.DEBUG, Tenants.logPre(applicationId) + "Application added: " + applicationId); } - private void removeApplications() { + private void removeApplications(PathChildrenCacheEvent.Type eventType) { ImmutableSet<ApplicationId> allApplications = ImmutableSet.copyOf(listApplications()); - log.log(Level.INFO, "We probably lost events, need to check if applications have been removed, " + + log.log(Level.INFO, "Got " + eventType + " event, need to check if applications have been removed, " + " found these active applications: " + allApplications); reloadHandler.removeApplicationsExcept(allApplications); } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/zookeeper/ZKApplicationPackage.java b/configserver/src/main/java/com/yahoo/vespa/config/server/zookeeper/ZKApplicationPackage.java index 05c301ddba8..e4933dc84c4 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/zookeeper/ZKApplicationPackage.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/zookeeper/ZKApplicationPackage.java @@ -2,8 +2,6 @@ package com.yahoo.vespa.config.server.zookeeper; import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableSet; -import com.yahoo.component.Version; import com.yahoo.config.application.api.ApplicationMetaData; import com.yahoo.config.application.api.ComponentInfo; import com.yahoo.config.application.api.FileRegistry; @@ -12,7 +10,6 @@ import com.yahoo.config.codegen.DefParser; import com.yahoo.config.application.api.ApplicationFile; import com.yahoo.config.application.api.ApplicationPackage; import com.yahoo.config.model.application.provider.*; -import com.yahoo.config.provision.HostSpec; import com.yahoo.config.provision.NodeFlavors; import com.yahoo.config.provision.AllocatedHosts; import com.yahoo.io.IOUtils; @@ -61,37 +58,7 @@ public class ZKApplicationPackage implements ApplicationPackage { private Optional<AllocatedHosts> importAllocatedHosts(String allocatedHostsPath, Optional<NodeFlavors> nodeFlavors) { if ( ! liveApp.exists(allocatedHostsPath)) return Optional.empty(); - Optional<AllocatedHosts> allocatedHosts = readAllocatedHosts(allocatedHostsPath, nodeFlavors); - if ( ! allocatedHosts.isPresent()) { // Read from legacy location. TODO: Remove when 6.143 is in production everywhere - List<String> allocatedHostsByVersionNodes = liveApp.getChildren(allocatedHostsPath); - allocatedHosts = merge(readAllocatedHostsByVersion(allocatedHostsByVersionNodes, nodeFlavors)); - } - return allocatedHosts; - } - - private Map<Version, AllocatedHosts> readAllocatedHostsByVersion(List<String> allocatedHostsByVersionNodes, - Optional<NodeFlavors> nodeFlavors) { - Map<Version, AllocatedHosts> allocatedHostsByVersion = new HashMap<>(); - allocatedHostsByVersionNodes.stream() - .forEach(versionStr -> { - Version version = Version.fromString(versionStr); - Optional<AllocatedHosts> allocatedHosts = readAllocatedHosts(Joiner.on("/").join(allocatedHostsNode, versionStr), - nodeFlavors); - allocatedHosts.ifPresent(info -> allocatedHostsByVersion.put(version, info)); - }); - return allocatedHostsByVersion; - } - - private Optional<AllocatedHosts> merge(Map<Version, AllocatedHosts> allocatedHostsByVersion) { - // Merge the allocated hosts in any order. This is wrong but preserves current behavior (modulo order differences) - if (allocatedHostsByVersion.isEmpty()) return Optional.empty(); - - Map<String, HostSpec> merged = new HashMap<>(); - for (Map.Entry<Version, AllocatedHosts> entry : allocatedHostsByVersion.entrySet()) { - for (HostSpec host : entry.getValue().getHosts()) - merged.put(host.hostname(), host); - } - return Optional.of(AllocatedHosts.withHosts(ImmutableSet.copyOf(merged.values()))); + return Optional.of(readAllocatedHosts(allocatedHostsPath, nodeFlavors)); } /** @@ -99,11 +66,9 @@ public class ZKApplicationPackage implements ApplicationPackage { * * @return the allocated hosts at this node or empty if there is no data at this path */ - private Optional<AllocatedHosts> readAllocatedHosts(String allocatedHostsPath, Optional<NodeFlavors> nodeFlavors) { + private AllocatedHosts readAllocatedHosts(String allocatedHostsPath, Optional<NodeFlavors> nodeFlavors) { try { - byte[] data = liveApp.getBytes(allocatedHostsPath); - if (data.length == 0) return Optional.empty(); // TODO: Remove this line (and make return non-optional) when 6.143 is in production everywhere - return Optional.of(AllocatedHosts.fromJson(data, nodeFlavors)); + return AllocatedHosts.fromJson(liveApp.getBytes(allocatedHostsPath), nodeFlavors); } catch (Exception e) { throw new RuntimeException("Unable to read allocated hosts", e); } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/zookeeper/ZKApplicationPackageTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/zookeeper/ZKApplicationPackageTest.java index adf26dbfa32..07430a66c89 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/zookeeper/ZKApplicationPackageTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/zookeeper/ZKApplicationPackageTest.java @@ -77,7 +77,7 @@ public class ZKApplicationPackageTest extends TestWithCurator { String metaData = "{\"deploy\":{\"user\":\"foo\",\"from\":\"bar\",\"timestamp\":1},\"application\":{\"name\":\"foo\",\"checksum\":\"abc\",\"generation\":4,\"previousActiveGeneration\":3}}"; zk.putData("/0", ConfigCurator.META_ZK_PATH, metaData); zk.putData("/0/" + ZKApplicationPackage.fileRegistryNode + "/3.0.0", "dummyfiles"); - zk.putData("/0/" + ZKApplicationPackage.allocatedHostsNode + "/3.0.0", ALLOCATED_HOSTS.toJson()); + zk.putData("/0/" + ZKApplicationPackage.allocatedHostsNode, ALLOCATED_HOSTS.toJson()); } private static class MockNodeFlavors extends NodeFlavors{ diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/TenantController.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/TenantController.java index 4f61c811a9b..a4bddf86cbb 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/TenantController.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/TenantController.java @@ -140,7 +140,7 @@ public class TenantController { if (updatedTenant.isAthensTenant() && ! token.isPresent()) throw new IllegalArgumentException("Could not update " + updatedTenant + ": No NToken provided"); - updateAthensDomain(updatedTenant, token); + updateAthenzDomain(updatedTenant, token); db.updateTenant(updatedTenant); log.info("Updated " + updatedTenant); } catch (PersistenceException e) { @@ -148,7 +148,7 @@ public class TenantController { } } - private void updateAthensDomain(Tenant updatedTenant, Optional<NToken> token) { + private void updateAthenzDomain(Tenant updatedTenant, Optional<NToken> token) { Tenant existingTenant = tenant(updatedTenant.getId()).get(); if ( ! existingTenant.isAthensTenant()) return; @@ -192,7 +192,7 @@ public class TenantController { } } - public Tenant migrateTenantToAthens(TenantId tenantId, + public Tenant migrateTenantToAthenz(TenantId tenantId, AthenzDomain tenantDomain, PropertyId propertyId, Property property, diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/athenz/impl/ZmsClientImpl.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/athenz/impl/ZmsClientImpl.java index cf2f7c798c6..110e06b767c 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/athenz/impl/ZmsClientImpl.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/athenz/impl/ZmsClientImpl.java @@ -121,7 +121,7 @@ public class ZmsClientImpl implements ZmsClient { DomainList domainList = zmsClient.getDomainList( /*limit*/null, /*skip*/null, prefix, /*depth*/null, /*domain*/null, /*productId*/ null, /*modifiedSince*/null); - return toAthensDomains(domainList.getNames()); + return toAthenzDomains(domainList.getNames()); }); } @@ -139,7 +139,7 @@ public class ZmsClientImpl implements ZmsClient { log("getServiceIdentity(domain=%s, service=%s)", service.getDomain().id(), service.getServiceName()); return getOrThrow(() -> { ServiceIdentity serviceIdentity = zmsClient.getServiceIdentity(service.getDomain().id(), service.getServiceName()); - return toAthensPublicKeys(serviceIdentity.getPublicKeys()); + return toAthenzPublicKeys(serviceIdentity.getPublicKeys()); }); } @@ -153,11 +153,11 @@ public class ZmsClientImpl implements ZmsClient { .collect(toList()); } - private static List<AthenzDomain> toAthensDomains(List<String> domains) { + private static List<AthenzDomain> toAthenzDomains(List<String> domains) { return domains.stream().map(AthenzDomain::new).collect(toList()); } - private static List<AthenzPublicKey> toAthensPublicKeys(List<PublicKeyEntry> publicKeys) { + private static List<AthenzPublicKey> toAthenzPublicKeys(List<PublicKeyEntry> publicKeys) { return publicKeys.stream() .map(entry -> fromYbase64EncodedKey(entry.getKey(), entry.getId())) .collect(toList()); @@ -192,7 +192,7 @@ public class ZmsClientImpl implements ZmsClient { } private static void logWarning(ZMSClientException e) { - log.warning("Error from Athens: " + e.getMessage()); + log.warning("Error from Athenz: " + e.getMessage()); } private String resourceStringPrefix(AthenzDomain tenantDomain) { diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ClusterInfoMaintainer.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ClusterInfoMaintainer.java index c807a7f0586..baf60b612b0 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ClusterInfoMaintainer.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ClusterInfoMaintainer.java @@ -18,6 +18,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -30,6 +31,8 @@ import java.util.stream.Collectors; */ public class ClusterInfoMaintainer extends Maintainer { + private static final Logger log = Logger.getLogger(ClusterInfoMaintainer.class.getName()); + private final Controller controller; ClusterInfoMaintainer(Controller controller, Duration duration, JobControl jobControl) { @@ -53,7 +56,7 @@ public class ClusterInfoMaintainer extends Maintainer { for (String id : clusters.keySet()) { List<NodeList.Node> clusterNodes = clusters.get(id); - //Assume they are all equal and use first node as a representatitve for the cluster + // Assume they are all equal and use first node as a representative for the cluster NodeList.Node node = clusterNodes.get(0); // Extract flavor info @@ -73,7 +76,7 @@ public class ClusterInfoMaintainer extends Maintainer { // Add to map List<String> hostnames = clusterNodes.stream().map(node1 -> node1.hostname).collect(Collectors.toList()); ClusterInfo inf = new ClusterInfo(node.flavor, node.cost, cpu, mem, disk, - ClusterSpec.Type.from(node.membership.clusterType), hostnames); + ClusterSpec.Type.from(node.membership.clusterType), hostnames); infoMap.put(new ClusterSpec.Id(id), inf); } @@ -82,7 +85,6 @@ public class ClusterInfoMaintainer extends Maintainer { @Override protected void maintain() { - for (Application application : controller().applications().asList()) { try (Lock lock = controller().applications().lock(application.id())) { for (Deployment deployment : application.deployments().values()) { @@ -92,11 +94,13 @@ public class ClusterInfoMaintainer extends Maintainer { Map<ClusterSpec.Id, ClusterInfo> clusterInfo = getClusterInfo(nodes, deployment.zone()); Application app = application.with(deployment.withClusterInfo(clusterInfo)); controller.applications().store(app, lock); - } catch (IOException ioe) { - Logger.getLogger(ClusterInfoMaintainer.class.getName()).fine(ioe.getMessage()); + } + catch (IOException | IllegalArgumentException e) { + log.log(Level.WARNING, "Failing getting cluster info of for " + deploymentId, e); } } } } } + } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java index 99530557981..c50f1464be7 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java @@ -160,7 +160,7 @@ public class ApplicationApiHandler extends LoggingRequestHandler { if (path.matches("/application/v4/user")) return authenticatedUser(request); if (path.matches("/application/v4/tenant")) return tenants(request); if (path.matches("/application/v4/tenant-pipeline")) return tenantPipelines(); - if (path.matches("/application/v4/athensDomain")) return athensDomains(request); + if (path.matches("/application/v4/athensDomain")) return athenzDomains(request); if (path.matches("/application/v4/property")) return properties(); if (path.matches("/application/v4/cookiefreshness")) return cookieFreshness(request); if (path.matches("/application/v4/tenant/{tenant}")) return tenant(path.get("tenant"), request); @@ -269,12 +269,12 @@ public class ApplicationApiHandler extends LoggingRequestHandler { return new SlimeJsonResponse(slime); } - private HttpResponse athensDomains(HttpRequest request) { + private HttpResponse athenzDomains(HttpRequest request) { Slime slime = new Slime(); Cursor response = slime.setObject(); Cursor array = response.setArray("data"); - for (AthenzDomain athensDomain : controller.getDomainList(request.getProperty("prefix"))) { - array.addString(athensDomain.id()); + for (AthenzDomain athenzDomain : controller.getDomainList(request.getProperty("prefix"))) { + array.addString(athenzDomain.id()); } return new SlimeJsonResponse(slime); } @@ -638,7 +638,7 @@ public class ApplicationApiHandler extends LoggingRequestHandler { if (tenant.isOpsDbTenant()) throwIfNotSuperUserOrPartOfOpsDbGroup(new UserGroup(mandatory("userGroup", requestData).asString()), request); if (tenant.isAthensTenant()) - throwIfNotAthensDomainAdmin(new AthenzDomain(mandatory("athensDomain", requestData).asString()), request); + throwIfNotAthenzDomainAdmin(new AthenzDomain(mandatory("athensDomain", requestData).asString()), request); controller.tenants().addTenant(tenant, authorizer.getNToken(request)); return new SlimeJsonResponse(toSlime(tenant, request, true)); @@ -652,11 +652,11 @@ public class ApplicationApiHandler extends LoggingRequestHandler { PropertyId propertyId = new PropertyId(mandatory("propertyId", requestData).asString()); authorizer.throwIfUnauthorized(tenantid, request); - throwIfNotAthensDomainAdmin(tenantDomain, request); + throwIfNotAthenzDomainAdmin(tenantDomain, request); NToken nToken = authorizer.getNToken(request) .orElseThrow(() -> new BadRequestException("The NToken for a domain admin is required to migrate tenant to Athens")); - Tenant tenant = controller.tenants().migrateTenantToAthens(tenantid, tenantDomain, propertyId, property, nToken); + Tenant tenant = controller.tenants().migrateTenantToAthenz(tenantid, tenantDomain, propertyId, property, nToken); return new SlimeJsonResponse(toSlime(tenant, request, true)); } @@ -769,6 +769,9 @@ public class ApplicationApiHandler extends LoggingRequestHandler { tenant, applicationId); } else { // In case of host-based principal + // TODO What about other user type principals like Bouncer? + log.log(LogLevel.WARNING, + "Using deprecated DeployAuthorizer.throwIfUnauthorizedForDeploy. Principal=" + principal); UserId userId = new UserId(principal.getName()); deployAuthorizer.throwIfUnauthorizedForDeploy( Environment.from(environment), @@ -959,11 +962,11 @@ public class ApplicationApiHandler extends LoggingRequestHandler { } } - private void throwIfNotAthensDomainAdmin(AthenzDomain tenantDomain, HttpRequest request) { + private void throwIfNotAthenzDomainAdmin(AthenzDomain tenantDomain, HttpRequest request) { UserId userId = authorizer.getUserId(request); - if ( ! authorizer.isAthensDomainAdmin(userId, tenantDomain)) { + if ( ! authorizer.isAthenzDomainAdmin(userId, tenantDomain)) { throw new ForbiddenException( - String.format("The user '%s' is not admin in Athens domain '%s'", userId.id(), tenantDomain.id())); + String.format("The user '%s' is not admin in Athenz domain '%s'", userId.id(), tenantDomain.id())); } } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/Authorizer.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/Authorizer.java index cbd39b201c1..93dc2541385 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/Authorizer.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/Authorizer.java @@ -92,7 +92,7 @@ public class Authorizer { } public boolean isSuperUser(HttpRequest request) { - // TODO Check membership of admin role in Vespa's Athens domain + // TODO Check membership of admin role in Vespa's Athenz domain return isMemberOfVespaBouncerGroup(request) || isScrewdriverPrincipal(getPrincipal(request)); } @@ -114,7 +114,7 @@ public class Authorizer { private boolean isTenantAdmin(UserId userId, Tenant tenant) { switch (tenant.tenantType()) { case ATHENS: - return isAthensTenantAdmin(userId, tenant.getAthensDomain().get()); + return isAthenzTenantAdmin(userId, tenant.getAthensDomain().get()); case OPSDB: return isGroupMember(userId, tenant.getUserGroup().get()); case USER: @@ -123,12 +123,12 @@ public class Authorizer { throw new IllegalArgumentException("Unknown tenant type: " + tenant.tenantType()); } - private boolean isAthensTenantAdmin(UserId userId, AthenzDomain tenantDomain) { + private boolean isAthenzTenantAdmin(UserId userId, AthenzDomain tenantDomain) { return athenzClientFactory.createZmsClientWithServicePrincipal() .hasTenantAdminAccess(AthenzUtils.createPrincipal(userId), tenantDomain); } - public boolean isAthensDomainAdmin(UserId userId, AthenzDomain tenantDomain) { + public boolean isAthenzDomainAdmin(UserId userId, AthenzDomain tenantDomain) { return athenzClientFactory.createZmsClientWithServicePrincipal() .isDomainAdmin(AthenzUtils.createPrincipal(userId), tenantDomain); } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/DeployAuthorizer.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/DeployAuthorizer.java index 209f17464a7..7cf19629774 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/DeployAuthorizer.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/DeployAuthorizer.java @@ -43,12 +43,12 @@ public class DeployAuthorizer { Environment environment, Tenant tenant, ApplicationId applicationId) { - if (athensCredentialsRequired(environment, tenant, applicationId, principal)) - checkAthensCredentials(principal, tenant, applicationId); + if (athenzCredentialsRequired(environment, tenant, applicationId, principal)) + checkAthenzCredentials(principal, tenant, applicationId); } // TODO: inline when deployment via ssh is removed - private boolean athensCredentialsRequired(Environment environment, Tenant tenant, ApplicationId applicationId, Principal principal) { + private boolean athenzCredentialsRequired(Environment environment, Tenant tenant, ApplicationId applicationId, Principal principal) { if (!environmentRequiresAuthorization(environment)) return false; if (! isScrewdriverPrincipal(principal)) @@ -61,13 +61,13 @@ public class DeployAuthorizer { // TODO: inline when deployment via ssh is removed - private void checkAthensCredentials(Principal principal, Tenant tenant, ApplicationId applicationId) { + private void checkAthenzCredentials(Principal principal, Tenant tenant, ApplicationId applicationId) { AthenzDomain domain = tenant.getAthensDomain().get(); if (! (principal instanceof AthenzPrincipal)) throw loggedForbiddenException("Principal '%s' is not authenticated.", principal.getName()); AthenzPrincipal athensPrincipal = (AthenzPrincipal)principal; - if ( ! hasDeployAccessToAthensApplication(athensPrincipal, domain, applicationId)) + if ( ! hasDeployAccessToAthenzApplication(athensPrincipal, domain, applicationId)) throw loggedForbiddenException( "Screwdriver principal '%1$s' does not have deploy access to '%2$s'. " + "Either the application has not been created at " + zoneRegistry.getDashboardUri() + " or " + @@ -90,18 +90,17 @@ public class DeployAuthorizer { Tenant tenant, ApplicationId applicationId, Optional<ScrewdriverId> optionalScrewdriverId) { - Principal principal = new UnauthenticatedUserPrincipal(userId.id()); - if (athensCredentialsRequired(environment, tenant, applicationId, principal)) { + if (athenzCredentialsRequired(environment, tenant, applicationId, principal)) { ScrewdriverId screwdriverId = optionalScrewdriverId.orElseThrow( () -> loggedForbiddenException("Screwdriver id must be provided when deploying from Screwdriver.")); principal = AthenzUtils.createPrincipal(screwdriverId); - checkAthensCredentials(principal, tenant, applicationId); + checkAthenzCredentials(principal, tenant, applicationId); } } - private boolean hasDeployAccessToAthensApplication(AthenzPrincipal principal, AthenzDomain domain, ApplicationId applicationId) { + private boolean hasDeployAccessToAthenzApplication(AthenzPrincipal principal, AthenzDomain domain, ApplicationId applicationId) { try { return athenzClientFactory.createZmsClientWithServicePrincipal() .hasApplicationAccess( @@ -111,7 +110,7 @@ public class DeployAuthorizer { new com.yahoo.vespa.hosted.controller.api.identifiers.ApplicationId(applicationId.application().value())); } catch (ZmsException e) { throw loggedForbiddenException( - "Failed to authorize deployment through Athens. If this problem persists, " + + "Failed to authorize deployment through Athenz. If this problem persists, " + "please create ticket at yo/vespa-support. (" + e.getMessage() + ")"); } } diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java index d39f72ec1b8..e36645175a7 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java @@ -381,7 +381,7 @@ public class ControllerTest { // Migrate tenant to Athens NToken nToken = TestIdentities.userNToken; - tester.controller().tenants().migrateTenantToAthens( + tester.controller().tenants().migrateTenantToAthenz( tenantId, athensDomain, new PropertyId("1567"), new Property("vespa_dev.no"), nToken); // Verify that tenant is migrated diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiTest.java index ef8a3809b25..1ac5dfeb58a 100644 --- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiTest.java +++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiTest.java @@ -529,7 +529,7 @@ public class ApplicationApiTest extends ControllerContainerTest { "{\"athensDomain\":\"domain1\", \"property\":\"property1\"}", Request.Method.POST, "domain1", unauthorizedUser), - "{\"error-code\":\"FORBIDDEN\",\"message\":\"The user 'othertenant' is not admin in Athens domain 'domain1'\"}", + "{\"error-code\":\"FORBIDDEN\",\"message\":\"The user 'othertenant' is not admin in Athenz domain 'domain1'\"}", 403); // (Create it with the right tenant id) diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java index c401fc16a2f..e1e501d3e1b 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java @@ -291,8 +291,6 @@ public class DocumentProtocol implements Protocol { putRoutingPolicyFactory("MessageType", new RoutingPolicyFactories.MessageTypePolicyFactory(cfg)); putRoutingPolicyFactory("RoundRobin", new RoutingPolicyFactories.RoundRobinPolicyFactory()); putRoutingPolicyFactory("LoadBalancer", new RoutingPolicyFactories.LoadBalancerPolicyFactory()); - putRoutingPolicyFactory("SearchColumn", new RoutingPolicyFactories.SearchColumnPolicyFactory()); - putRoutingPolicyFactory("SearchRow", new RoutingPolicyFactories.SearchRowPolicyFactory()); putRoutingPolicyFactory("Storage", new RoutingPolicyFactories.StoragePolicyFactory()); putRoutingPolicyFactory("SubsetService", new RoutingPolicyFactories.SubsetServicePolicyFactory()); diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java index acd73a21a8a..6c13b7468c7 100755 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java @@ -116,26 +116,6 @@ public abstract class RoutingPolicyFactories { } } - static class SearchColumnPolicyFactory implements RoutingPolicyFactory { - public DocumentProtocolRoutingPolicy createPolicy(String param) { - return new SearchColumnPolicy(param); - } - - - public void destroy() { - } - } - - static class SearchRowPolicyFactory implements RoutingPolicyFactory { - public DocumentProtocolRoutingPolicy createPolicy(String param) { - return new SearchRowPolicy(param); - } - - - public void destroy() { - } - } - static class SubsetServicePolicyFactory implements RoutingPolicyFactory { public DocumentProtocolRoutingPolicy createPolicy(String param) { return new SubsetServicePolicy(param); diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SearchColumnPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SearchColumnPolicy.java deleted file mode 100644 index aabb6407d14..00000000000 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SearchColumnPolicy.java +++ /dev/null @@ -1,183 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol; - -import com.yahoo.document.BucketId; -import com.yahoo.document.BucketIdFactory; -import com.yahoo.document.DocumentId; -import com.yahoo.log.LogLevel; -import com.yahoo.messagebus.EmptyReply; -import com.yahoo.messagebus.ErrorCode; -import com.yahoo.messagebus.Message; -import com.yahoo.messagebus.Reply; -import com.yahoo.messagebus.metrics.MetricSet; -import com.yahoo.messagebus.routing.Route; -import com.yahoo.messagebus.routing.RoutingContext; -import com.yahoo.messagebus.routing.RoutingNodeIterator; -import com.yahoo.vdslib.BucketDistribution; - -import java.util.*; -import java.util.logging.Logger; - -/** - * <p>This policy implements the logic to select recipients for a single search column. It has 2 different modes of - * operation;</p> - * - * <ol> - * <li>If the "maxbadparts" parameter is 0, select recipient based on document id hash and use - * shared merge logic. Do not allow any out-of-service replies.</li> - * <li>Else do best-effort validation of system - * state. This means; - * <ol> - * <li>if the message is sending to all recipients (typicall start- and - * end-of-feed), allow at most "maxbadparts" out-of-service replies,</li> - * <li>else always allow out-of-service reply by masking it with an empty - * reply.</li> - * </ol> - * </li> - * </ol> - * <p>For systems that allow bad parts, one will not know whether or not feeding - * was a success until the RTX attempts to set the new index live, because it is - * only the RTX that is now able to verify that the service level requirements - * are met. Feeding will still break if a message that was supposed to be sent - * to all recipients receives more than "maxbadparts" out-of-service replies, - * according to (2.a) above.</p> - * - * @author Simon Thoresen - */ -public class SearchColumnPolicy implements DocumentProtocolRoutingPolicy { - - private static Logger log = Logger.getLogger(SearchColumnPolicy.class.getName()); - private BucketIdFactory factory = new BucketIdFactory(); - private Map<Integer, BucketDistribution> distributions = new HashMap<Integer, BucketDistribution>(); - private int maxOOS = 0; // The maximum OUT_OF_SERVICE replies to hide. - - public static final int DEFAULT_NUM_BUCKET_BITS = 16; - - /** - * Constructs a new policy object for the given parameter string. The string can be null or empty, which is a - * request to not allow any bad columns. - * - * @param param The maximum number of allowed bad columns. - */ - public SearchColumnPolicy(String param) { - if (param != null && param.length() > 0) { - try { - maxOOS = Integer.parseInt(param); - } catch (NumberFormatException e) { - log.log(LogLevel.WARNING, "Parameter '" + param + "' could not be parsed as an integer.", e); - } - if (maxOOS < 0) { - log.log(LogLevel.WARNING, "Ignoring a request to set the maximum number of OOS replies to " + maxOOS + - " because it makes no sense. This routing policy will not allow any recipient" + - " to be out of service."); - } - } - } - - @Override - public void select(RoutingContext context) { - List<Route> recipients = context.getMatchedRecipients(); - if (recipients == null || recipients.size() == 0) { - return; - } - DocumentId id = null; - BucketId bucketId = null; - Message msg = context.getMessage(); - switch (msg.getType()) { - - case DocumentProtocol.MESSAGE_PUTDOCUMENT: - id = ((PutDocumentMessage)msg).getDocumentPut().getDocument().getId(); - break; - - case DocumentProtocol.MESSAGE_GETDOCUMENT: - id = ((GetDocumentMessage)msg).getDocumentId(); - break; - - case DocumentProtocol.MESSAGE_REMOVEDOCUMENT: - id = ((RemoveDocumentMessage)msg).getDocumentId(); - break; - - case DocumentProtocol.MESSAGE_UPDATEDOCUMENT: - id = ((UpdateDocumentMessage)msg).getDocumentUpdate().getId(); - break; - - case DocumentProtocol.MESSAGE_BATCHDOCUMENTUPDATE: - bucketId = ((BatchDocumentUpdateMessage)msg).getBucketId(); - break; - - case DocumentProtocol.MESSAGE_GETBUCKETSTATE: - bucketId = ((GetBucketStateMessage)msg).getBucketId(); - break; - - default: - throw new UnsupportedOperationException("Message type '" + msg.getType() + "' not supported."); - } - if (bucketId == null && id != null) { - bucketId = factory.getBucketId(id); - } - int recipient = getRecipient(bucketId, recipients.size()); - context.addChild(recipients.get(recipient)); - context.setSelectOnRetry(true); - if (maxOOS > 0) { - context.addConsumableError(ErrorCode.SERVICE_OOS); - } - } - - @Override - public void merge(RoutingContext context) { - if (maxOOS > 0) { - if (context.getNumChildren() > 1) { - Set<Integer> oosReplies = new HashSet<Integer>(); - int idx = 0; - for (RoutingNodeIterator it = context.getChildIterator(); - it.isValid(); it.next()) - { - Reply ref = it.getReplyRef(); - if (ref.hasErrors() && DocumentProtocol.hasOnlyErrorsOfType(ref, ErrorCode.SERVICE_OOS)) { - oosReplies.add(idx); - } - ++idx; - } - if (oosReplies.size() <= maxOOS) { - DocumentProtocol.merge(context, oosReplies); - return; // may the rtx be with you - } - } else { - Reply ref = context.getChildIterator().getReplyRef(); - if (ref.hasErrors() && DocumentProtocol.hasOnlyErrorsOfType(ref, ErrorCode.SERVICE_OOS)) { - context.setReply(new EmptyReply()); - return; // god help us all - } - } - } - DocumentProtocol.merge(context); - } - - /** - * Returns the recipient index for the given bucket id. This updates the shared internal distribution map, so it - * needs to be synchronized. - * - * @param bucketId The bucket whose recipient to return. - * @param numRecipients The number of recipients being distributed to. - * @return The recipient to use. - */ - private synchronized int getRecipient(BucketId bucketId, int numRecipients) { - BucketDistribution distribution = distributions.get(numRecipients); - if (distribution == null) { - distribution = new BucketDistribution(1, DEFAULT_NUM_BUCKET_BITS); - distribution.setNumColumns(numRecipients); - distributions.put(numRecipients, distribution); - } - return distribution.getColumn(bucketId); - } - - @Override - public void destroy() { - // empty - } - - @Override - public MetricSet getMetrics() { - return null; - } -} diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SearchRowPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SearchRowPolicy.java deleted file mode 100755 index 4d94107eae7..00000000000 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SearchRowPolicy.java +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.documentapi.messagebus.protocol; - -import com.yahoo.log.LogLevel; -import com.yahoo.messagebus.ErrorCode; -import com.yahoo.messagebus.Reply; -import com.yahoo.messagebus.metrics.MetricSet; -import com.yahoo.messagebus.routing.RoutingContext; -import com.yahoo.messagebus.routing.RoutingNodeIterator; - -import java.util.HashSet; -import java.util.Set; -import java.util.logging.Logger; - -/** - * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> - */ -public class SearchRowPolicy implements DocumentProtocolRoutingPolicy { - - private static Logger log = Logger.getLogger(SearchRowPolicy.class.getName()); - private int minOk = 0; // Hide OUT_OF_SERVICE as long as this number of replies are something else. - - /** - * Creates a search row policy that wraps the underlying search group policy in case the parameter is something - * other than an empty string. - * - * @param param The number of minimum non-OOS replies that this policy requires. - */ - public SearchRowPolicy(String param) { - if (param != null && param.length() > 0) { - try { - minOk = Integer.parseInt(param); - } - catch (NumberFormatException e) { - log.log(LogLevel.WARNING, "Parameter '" + param + "' could not be parsed as an integer.", e); - } - if (minOk <= 0) { - log.log(LogLevel.WARNING, "Ignoring a request to set the minimum number of OK replies to " + minOk + " " + - "because it makes no sense. This routing policy will not allow any recipient " + - "to be out of service."); - } - } - } - - @Override - public void select(RoutingContext context) { - context.addChildren(context.getMatchedRecipients()); - context.setSelectOnRetry(false); - if (minOk > 0) { - context.addConsumableError(ErrorCode.SERVICE_OOS); - } - } - - @Override - public void merge(RoutingContext context) { - if (minOk > 0) { - Set<Integer> oosReplies = new HashSet<Integer>(); - int idx = 0; - for (RoutingNodeIterator it = context.getChildIterator(); - it.isValid(); it.next()) - { - Reply ref = it.getReplyRef(); - if (ref.hasErrors() && DocumentProtocol.hasOnlyErrorsOfType(ref, ErrorCode.SERVICE_OOS)) { - oosReplies.add(idx); - } - ++idx; - } - if (context.getNumChildren() - oosReplies.size() >= minOk) { - DocumentProtocol.merge(context, oosReplies); - return; - } - } - DocumentProtocol.merge(context); - } - - @Override - public void destroy() { - // empty - } - - @Override - public MetricSet getMetrics() { - return null; - } -} diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/PolicyTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/PolicyTestCase.java index ff237d46b90..fba2edf5cfd 100755 --- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/PolicyTestCase.java +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/PolicyTestCase.java @@ -12,8 +12,6 @@ import com.yahoo.messagebus.Error; import com.yahoo.messagebus.network.rpc.test.TestServer; import com.yahoo.messagebus.routing.*; import com.yahoo.messagebus.test.Receptor; -import com.yahoo.vdslib.DocumentList; -import com.yahoo.vdslib.Entry; import org.junit.Before; import org.junit.Test; @@ -60,12 +58,6 @@ public class PolicyTestCase { policy = new DocumentProtocol(manager).createPolicy("RoundRobin", null); assertTrue(policy instanceof RoundRobinPolicy); - policy = new DocumentProtocol(manager).createPolicy("SearchRow", null); - assertTrue(policy instanceof SearchRowPolicy); - - policy = new DocumentProtocol(manager).createPolicy("SearchColumn", null); - assertTrue(policy instanceof SearchColumnPolicy); - policy = new DocumentProtocol(manager).createPolicy("SubsetService", null); assertTrue(policy instanceof SubsetServicePolicy); @@ -330,125 +322,6 @@ public class PolicyTestCase { } @Test - public void testSearchRow() { - PolicyTestFrame frame = new PolicyTestFrame(manager); - frame.setMessage(new PutDocumentMessage(new DocumentPut(new Document(manager.getDocumentType("testdoc"), - new DocumentId("doc:scheme:"))))); - frame.setHop(new HopSpec("test", "[SearchRow]").addRecipient("foo")); - frame.assertMergeOneReply("foo"); - frame.setHop(new HopSpec("test", "[SearchRow]").addRecipient("foo").addRecipient("bar")); - frame.assertMergeTwoReplies("foo", "bar"); - - frame.setHop(new HopSpec("test", "[SearchRow:1]").addRecipient("foo")); - Map<String, Integer> replies = new HashMap<>(); - replies.put("foo", ErrorCode.SERVICE_OOS); - frame.assertMergeError(replies, Arrays.asList(ErrorCode.SERVICE_OOS)); - - frame.setHop(new HopSpec("test", "[SearchRow:1]").addRecipient("foo").addRecipient("bar")); - replies.put("foo", ErrorCode.SERVICE_OOS); - replies.put("bar", ErrorCode.NONE); - frame.assertMergeOk(replies, Arrays.asList("bar")); - - replies.put("foo", ErrorCode.SERVICE_OOS); - replies.put("bar", ErrorCode.SERVICE_OOS); - frame.assertMergeError(replies, Arrays.asList(ErrorCode.SERVICE_OOS, ErrorCode.SERVICE_OOS)); - - frame.setHop(new HopSpec("test", "[SearchRow:1]").addRecipient("foo").addRecipient("bar").addRecipient("baz")); - replies.put("foo", ErrorCode.SERVICE_OOS); - replies.put("bar", ErrorCode.NONE); - replies.put("baz", ErrorCode.NONE); - frame.assertMergeOk(replies, Arrays.asList("bar", "baz")); - - replies.put("foo", ErrorCode.SERVICE_OOS); - replies.put("bar", ErrorCode.SERVICE_OOS); - replies.put("baz", ErrorCode.NONE); - frame.assertMergeOk(replies, Arrays.asList("baz")); - - replies.put("foo", ErrorCode.SERVICE_OOS); - replies.put("bar", ErrorCode.SERVICE_OOS); - replies.put("baz", ErrorCode.SERVICE_OOS); - frame.assertMergeError(replies, - Arrays.asList(ErrorCode.SERVICE_OOS, ErrorCode.SERVICE_OOS, ErrorCode.SERVICE_OOS)); - - frame.setHop(new HopSpec("test", "[SearchRow:2]").addRecipient("foo").addRecipient("bar").addRecipient("baz")); - replies.put("foo", ErrorCode.SERVICE_OOS); - replies.put("bar", ErrorCode.NONE); - replies.put("baz", ErrorCode.NONE); - frame.assertMergeOk(replies, Arrays.asList("bar", "baz")); - - replies.put("foo", ErrorCode.SERVICE_OOS); - replies.put("bar", ErrorCode.SERVICE_OOS); - replies.put("baz", ErrorCode.NONE); - frame.assertMergeError(replies, Arrays.asList(ErrorCode.SERVICE_OOS, ErrorCode.SERVICE_OOS)); - - replies.put("foo", ErrorCode.SERVICE_OOS); - replies.put("bar", ErrorCode.SERVICE_OOS); - replies.put("baz", ErrorCode.SERVICE_OOS); - frame.assertMergeError(replies, - Arrays.asList(ErrorCode.SERVICE_OOS, ErrorCode.SERVICE_OOS, ErrorCode.SERVICE_OOS)); - - frame.destroy(); - } - - @Test - public void testSearchRowMerge() { - PolicyTestFrame frame = new PolicyTestFrame(manager); - frame.setHop(new HopSpec("test", "[SearchRow]").addRecipient("foo")); - tryWasFound(frame, 1, 0x0, false); - tryWasFound(frame, 1, 0x1, true); - - frame.setHop(new HopSpec("test", "[SearchRow]").addRecipient("foo").addRecipient("bar")); - tryWasFound(frame, 2, 0x0, false); - tryWasFound(frame, 2, 0x1, true); - tryWasFound(frame, 2, 0x2, true); - tryWasFound(frame, 2, 0x3, true); - - frame.setHop(new HopSpec("test", "[SearchRow]").addRecipient("foo").addRecipient("bar").addRecipient("baz")); - tryWasFound(frame, 3, 0x0, false); - tryWasFound(frame, 3, 0x1, true); - tryWasFound(frame, 3, 0x2, true); - tryWasFound(frame, 3, 0x3, true); - tryWasFound(frame, 3, 0x4, true); - tryWasFound(frame, 3, 0x5, true); - tryWasFound(frame, 3, 0x6, true); - tryWasFound(frame, 3, 0x7, true); - frame.destroy(); - } - - private void tryWasFound(PolicyTestFrame frame, int expectedRecipients, - int foundMask, boolean expectedFound) - { - { - frame.setMessage(new RemoveDocumentMessage(new DocumentId("doc:scheme:69"))); - List<RoutingNode> selected = frame.select(expectedRecipients); - for (int i = 0, len = selected.size(); i < len; ++i) { - RemoveDocumentReply reply = new RemoveDocumentReply(); - reply.setWasFound(((1 << i) & foundMask) != 0); - selected.get(i).handleReply(reply); - } - Reply reply = frame.getReceptor().getReply(TIMEOUT); - assertNotNull(reply); - assertEquals(DocumentProtocol.REPLY_REMOVEDOCUMENT, reply.getType()); - assertEquals(expectedFound, ((RemoveDocumentReply)reply).wasFound()); - } - { - DocumentUpdate upd = new DocumentUpdate(manager.getDocumentType("testdoc"), - new DocumentId("doc:scheme:")); - frame.setMessage(new UpdateDocumentMessage(upd)); - List<RoutingNode> selected = frame.select(expectedRecipients); - for (int i = 0, len = selected.size(); i < len; ++i) { - UpdateDocumentReply reply = new UpdateDocumentReply(); - reply.setWasFound(((1 << i) & foundMask) != 0); - selected.get(i).handleReply(reply); - } - Reply reply = frame.getReceptor().getReply(TIMEOUT); - assertNotNull(reply); - assertEquals(DocumentProtocol.REPLY_UPDATEDOCUMENT, reply.getType()); - assertEquals(expectedFound, ((UpdateDocumentReply)reply).wasFound()); - } - } - - @Test public void multipleGetRepliesAreMergedToFoundDocument() { PolicyTestFrame frame = new PolicyTestFrame(manager); frame.setHop(new HopSpec("test", getDocumentRouteSelectorRawConfig()) @@ -483,74 +356,6 @@ public class PolicyTestCase { "route[1].feed \"myfeed\"\n]"; } - @Test - public void testSearchColumn() { - PolicyTestFrame frame = new PolicyTestFrame(manager); - frame.setHop(new HopSpec("test", "[SearchColumn]") - .addRecipient("c0").addRecipient("c1") - .addRecipient("c2").addRecipient("c3")); - - // Test hash distribution. - assertDistribution(frame, "doc:ns:3", "c0"); - assertDistribution(frame, "doc:ns:18", "c1"); - assertDistribution(frame, "doc:ns:0", "c2"); - assertDistribution(frame, "doc:ns:4", "c3"); - - assertDistribution(frame, "userdoc:ns:49152:0", "c0"); - assertDistribution(frame, "userdoc:ns:49152:1", "c0"); - assertDistribution(frame, "userdoc:ns:16384:2", "c1"); - assertDistribution(frame, "userdoc:ns:16384:3", "c1"); - assertDistribution(frame, "userdoc:ns:5461:4", "c2"); - assertDistribution(frame, "userdoc:ns:5461:5", "c2"); - assertDistribution(frame, "userdoc:ns:0:6", "c3"); - assertDistribution(frame, "userdoc:ns:0:7", "c3"); - - assertDistribution(frame, "groupdoc:ns:0:0", "c0"); - assertDistribution(frame, "groupdoc:ns:0:1", "c0"); - assertDistribution(frame, "groupdoc:ns:4:2", "c1"); - assertDistribution(frame, "groupdoc:ns:4:3", "c1"); - assertDistribution(frame, "groupdoc:ns:2:4", "c2"); - assertDistribution(frame, "groupdoc:ns:2:5", "c2"); - assertDistribution(frame, "groupdoc:ns:7:6", "c3"); - assertDistribution(frame, "groupdoc:ns:7:7", "c3"); - - // Test routing based on message type. - Message put = new PutDocumentMessage(new DocumentPut(new Document(manager.getDocumentType("testdoc"), - new DocumentId("doc:scheme:")))); - frame.setHop(new HopSpec("test", "[SearchColumn]").addRecipient("c0").addRecipient("c1")); - frame.setMessage(put); - frame.assertMergeOneReply("c0"); - - // Test allowed bad parts. - frame.setHop(new HopSpec("test", "[SearchColumn:1]").addRecipient("c0")); - frame.setMessage(put); - Map<String, Integer> replies = new HashMap<>(); - replies.put("c0", ErrorCode.SERVICE_OOS); - frame.assertMergeOk(replies, null); - - replies.put("c0", ErrorCode.SERVICE_OOS); - frame.assertMergeOk(replies, null); - - frame.setHop(new HopSpec("test", "[SearchColumn:1]").addRecipient("c0").addRecipient("c1")); - frame.setMessage(put); - replies.put("c0", ErrorCode.SERVICE_OOS); - frame.assertMergeOk(replies, null); - - frame.setHop(new HopSpec("test", "[SearchColumn:1]").addRecipient("c0").addRecipient("c1").addRecipient("c2")); - frame.setMessage(put); - replies.clear(); - replies.put("c0", ErrorCode.SERVICE_OOS); - frame.assertMergeOk(replies, null); - - frame.setHop(new HopSpec("test", "[SearchColumn:2]").addRecipient("c0").addRecipient("c1").addRecipient("c2")); - frame.setMessage(put); - replies.clear(); - replies.put("c0", ErrorCode.SERVICE_OOS); - frame.assertMergeOk(replies, null); - - frame.destroy(); - } - private void assertDistribution(PolicyTestFrame frame, String id, String expected) { frame.setMessage(new PutDocumentMessage(new DocumentPut(new Document(manager.getDocumentType("testdoc"), new DocumentId(id))))); diff --git a/documentapi/src/tests/messagebus/messagebus_test.cpp b/documentapi/src/tests/messagebus/messagebus_test.cpp index d8920b0577b..919dc6b6570 100644 --- a/documentapi/src/tests/messagebus/messagebus_test.cpp +++ b/documentapi/src/tests/messagebus/messagebus_test.cpp @@ -92,13 +92,7 @@ void Test::testProtocol() { DocumentProtocol protocol(set, _repo); EXPECT_TRUE(protocol.getName() == "document"); - IRoutingPolicy::UP policy = protocol.createPolicy(string("SearchRow"),string("")); - EXPECT_TRUE(policy.get() != NULL); - - policy = protocol.createPolicy(string("SearchColumn"),string("")); - EXPECT_TRUE(policy.get() != NULL); - - policy = protocol.createPolicy(string("DocumentRouteSelector"), string("file:documentrouteselectorpolicy.cfg")); + IRoutingPolicy::UP policy = protocol.createPolicy(string("DocumentRouteSelector"), string("file:documentrouteselectorpolicy.cfg")); EXPECT_TRUE(policy.get() != NULL); policy = protocol.createPolicy(string(""),string("")); diff --git a/documentapi/src/tests/policies/policies_test.cpp b/documentapi/src/tests/policies/policies_test.cpp index 9d38247ebfd..58db3079631 100644 --- a/documentapi/src/tests/policies/policies_test.cpp +++ b/documentapi/src/tests/policies/policies_test.cpp @@ -10,8 +10,6 @@ #include <vespa/documentapi/messagebus/policies/loadbalancerpolicy.h> #include <vespa/documentapi/messagebus/policies/localservicepolicy.h> #include <vespa/documentapi/messagebus/policies/roundrobinpolicy.h> -#include <vespa/documentapi/messagebus/policies/searchcolumnpolicy.h> -#include <vespa/documentapi/messagebus/policies/searchrowpolicy.h> #include <vespa/documentapi/messagebus/policies/storagepolicy.h> #include <vespa/documentapi/messagebus/policies/subsetservicepolicy.h> #include <vespa/messagebus/emptyreply.h> @@ -39,6 +37,8 @@ using document::readDocumenttypesConfig; using slobrok::api::IMirrorAPI; using namespace documentapi; using vespalib::make_string; +using std::make_unique; +using std::make_shared; class Test : public vespalib::TestApp { private: @@ -48,16 +48,12 @@ private: private: bool trySelect(TestFrame &frame, uint32_t numSelects, const std::vector<string> &expected); - bool tryDistribution(TestFrame &frame, const string &id, const string &expected); - void tryWasFound(TestFrame &frame, uint32_t expectedRecipients, uint32_t foundMask, bool expectedFound); - void setupExternPolicy(TestFrame &frame, mbus::Slobrok &slobrok, const string &pattern, - int32_t numEntries = -1); + void setupExternPolicy(TestFrame &frame, mbus::Slobrok &slobrok, const string &pattern, int32_t numEntries = -1); StoragePolicy &setupStoragePolicy(TestFrame &frame, const string ¶m, const string &pattern = "", int32_t numEntries = -1); bool isErrorPolicy(const string &name, const string ¶m); void assertMirrorReady(const IMirrorAPI &mirror); - void assertMirrorContains(const IMirrorAPI &mirror, const string &pattern, - uint32_t numEntries); + void assertMirrorContains(const IMirrorAPI &mirror, const string &pattern, uint32_t numEntries); mbus::Message::UP newPutDocumentMessage(const string &documentId); public: @@ -75,9 +71,6 @@ public: void testProtocol(); void testRoundRobin(); void testRoundRobinCache(); - void testSearchColumn(); - void testSearchRow(); - void testSearchRowMerge(); void multipleGetRepliesAreMergedToFoundDocument(); void testSubsetService(); void testSubsetServiceCache(); @@ -117,9 +110,6 @@ Test::Main() { testLocalServiceCache(); TEST_FLUSH(); testRoundRobin(); TEST_FLUSH(); testRoundRobinCache(); TEST_FLUSH(); - testSearchColumn(); TEST_FLUSH(); - testSearchRow(); TEST_FLUSH(); - testSearchRowMerge(); TEST_FLUSH(); testSubsetService(); TEST_FLUSH(); testSubsetServiceCache(); TEST_FLUSH(); @@ -144,43 +134,34 @@ Test::testProtocol() mbus::IProtocol::SP protocol(new DocumentProtocol(_loadTypes, _repo)); mbus::IRoutingPolicy::UP policy = protocol->createPolicy("AND", ""); - ASSERT_TRUE(dynamic_cast<ANDPolicy*>(policy.get()) != NULL); + ASSERT_TRUE(dynamic_cast<ANDPolicy*>(policy.get()) != nullptr); policy = protocol->createPolicy("DocumentRouteSelector", "raw:route[0]\n"); - ASSERT_TRUE(dynamic_cast<DocumentRouteSelectorPolicy*>(policy.get()) != NULL); + ASSERT_TRUE(dynamic_cast<DocumentRouteSelectorPolicy*>(policy.get()) != nullptr); policy = protocol->createPolicy("Extern", "foo;bar/baz"); - ASSERT_TRUE(dynamic_cast<ExternPolicy*>(policy.get()) != NULL); + ASSERT_TRUE(dynamic_cast<ExternPolicy*>(policy.get()) != nullptr); policy = protocol->createPolicy("LoadBalancer", "cluster=docproc/cluster.default;" "session=chain.default;syncinit"); - ASSERT_TRUE(dynamic_cast<LoadBalancerPolicy*>(policy.get()) != NULL); + ASSERT_TRUE(dynamic_cast<LoadBalancerPolicy*>(policy.get()) != nullptr); policy = protocol->createPolicy("LocalService", ""); - ASSERT_TRUE(dynamic_cast<LocalServicePolicy*>(policy.get()) != NULL); + ASSERT_TRUE(dynamic_cast<LocalServicePolicy*>(policy.get()) != nullptr); policy = protocol->createPolicy("RoundRobin", ""); - ASSERT_TRUE(dynamic_cast<RoundRobinPolicy*>(policy.get()) != NULL); - - policy = protocol->createPolicy("SearchRow", ""); - ASSERT_TRUE(dynamic_cast<SearchRowPolicy*>(policy.get()) != NULL); - - policy = protocol->createPolicy("SearchColumn", ""); - ASSERT_TRUE(dynamic_cast<SearchColumnPolicy*>(policy.get()) != NULL); + ASSERT_TRUE(dynamic_cast<RoundRobinPolicy*>(policy.get()) != nullptr); policy = protocol->createPolicy("SubsetService", ""); - ASSERT_TRUE(dynamic_cast<SubsetServicePolicy*>(policy.get()) != NULL); + ASSERT_TRUE(dynamic_cast<SubsetServicePolicy*>(policy.get()) != nullptr); } void Test::testAND() { TestFrame frame(_repo); - frame.setMessage(mbus::Message::UP(new PutDocumentMessage( - document::Document::SP( - new document::Document(*_docType, - DocumentId("doc:scheme:")))))); + frame.setMessage(make_unique<PutDocumentMessage>(make_shared<Document>(*_docType, DocumentId("doc:scheme:")))); frame.setHop(mbus::HopSpec("test", "[AND]") .addRecipient("foo") .addRecipient("bar")); @@ -242,7 +223,7 @@ Test::requireThatExternPolicySelectsFromExternSlobrok() lst.insert(leaf[0]->getRoute().toString()); leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); - ASSERT_TRUE(frame.getReceptor().getReply(600).get() != NULL); + ASSERT_TRUE(frame.getReceptor().getReply(600)); } EXPECT_EQUAL(servers.size(), lst.size()); for (uint32_t i = 0; i < servers.size(); ++i) { @@ -267,15 +248,14 @@ mbus::Message::UP Test::newPutDocumentMessage(const string &documentId) { Document::SP doc(new Document(*_docType, DocumentId(documentId))); - return mbus::Message::UP(new PutDocumentMessage(doc)); + return make_unique<PutDocumentMessage>(doc); } void Test::setupExternPolicy(TestFrame &frame, mbus::Slobrok &slobrok, const string &pattern, int32_t numEntries) { - string param = vespalib::make_string("tcp/localhost:%d;%s", - slobrok.port(), pattern.c_str()); + string param = vespalib::make_string("tcp/localhost:%d;%s", slobrok.port(), pattern.c_str()); frame.setHop(mbus::HopSpec("test", vespalib::make_string("[Extern:%s]", param.c_str()))); mbus::MessageBus &mbus = frame.getMessageBus(); const mbus::HopBlueprint *hop = mbus.getRoutingTable(DocumentProtocol::NAME)->getHop("test"); @@ -340,7 +320,7 @@ Test::testExternSend() mbus::DestinationSession::UP ds = dst.mb.createDestinationSession("session", true, dr); // Send message from local node to remote cluster and resolve route there. - mbus::Message::UP msg(new GetDocumentMessage(document::DocumentId("doc:scheme:"), 0)); + mbus::Message::UP msg(new GetDocumentMessage(DocumentId("doc:scheme:"), 0)); msg->getTrace().setLevel(9); msg->setRoute(mbus::Route::parse(vespalib::make_string("[Extern:tcp/localhost:%d;itr/session] default", slobrok.port()))); @@ -376,7 +356,7 @@ Test::testExternMultipleSlobroks() std::make_shared<DocumentProtocol>(_loadTypes, _repo)); mbus::DestinationSession::UP ds = dst.mb.createDestinationSession("session", true, dr); - mbus::Message::UP msg(new GetDocumentMessage(document::DocumentId("doc:scheme:"), 0)); + mbus::Message::UP msg(new GetDocumentMessage(DocumentId("doc:scheme:"), 0)); msg->setRoute(mbus::Route::parse(vespalib::make_string("[Extern:%s;dst/session]", spec.c_str()))); ASSERT_TRUE(ss->send(std::move(msg)).isAccepted()); ASSERT_TRUE((msg = dr.getMessage(600))); @@ -392,7 +372,7 @@ Test::testExternMultipleSlobroks() std::make_shared<DocumentProtocol>(_loadTypes, _repo)); mbus::DestinationSession::UP ds = dst.mb.createDestinationSession("session", true, dr); - mbus::Message::UP msg(new GetDocumentMessage(document::DocumentId("doc:scheme:"), 0)); + mbus::Message::UP msg(new GetDocumentMessage(DocumentId("doc:scheme:"), 0)); msg->setRoute(mbus::Route::parse(vespalib::make_string("[Extern:%s;dst/session]", spec.c_str()))); ASSERT_TRUE(ss->send(std::move(msg)).isAccepted()); ASSERT_TRUE((msg = dr.getMessage(600))); @@ -407,8 +387,7 @@ Test::testLocalService() { // Prepare message. TestFrame frame(_repo, "docproc/cluster.default"); - frame.setMessage(mbus::Message::UP(new PutDocumentMessage(Document::SP( - new Document(*_docType, DocumentId("doc:scheme:")))))); + frame.setMessage(make_unique<PutDocumentMessage>(make_shared<Document>(*_docType, DocumentId("doc:scheme:")))); // Test select with proper address. for (uint32_t i = 0; i < 10; ++i) { @@ -424,7 +403,7 @@ Test::testLocalService() lst.insert(leaf[0]->getRoute().toString()); leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); - ASSERT_TRUE(frame.getReceptor().getReply(600).get() != NULL); + ASSERT_TRUE(frame.getReceptor().getReply(600)); } EXPECT_EQUAL(10u, lst.size()); @@ -437,7 +416,7 @@ Test::testLocalService() lst.insert(leaf[0]->getRoute().toString()); leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); - ASSERT_TRUE(frame.getReceptor().getReply(600).get() != NULL); + ASSERT_TRUE(frame.getReceptor().getReply(600)); } EXPECT_EQUAL(1u, lst.size()); EXPECT_EQUAL("docproc/cluster.default/*/chain.default", *lst.begin()); @@ -452,12 +431,12 @@ Test::testLocalServiceCache() { TestFrame fooFrame(_repo, "docproc/cluster.default"); mbus::HopSpec fooHop("foo", "docproc/cluster.default/[LocalService]/chain.foo"); - fooFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:foo")))); + fooFrame.setMessage(make_unique<GetDocumentMessage>(DocumentId("doc:scheme:foo"))); fooFrame.setHop(fooHop); TestFrame barFrame(fooFrame); mbus::HopSpec barHop("test", "docproc/cluster.default/[LocalService]/chain.bar"); - barFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:bar")))); + barFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(DocumentId("doc:scheme:bar")))); barFrame.setHop(barHop); fooFrame.getMessageBus().setupRouting( @@ -480,8 +459,8 @@ Test::testLocalServiceCache() barSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); fooSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); - ASSERT_TRUE(barFrame.getReceptor().getReply(600).get() != NULL); - ASSERT_TRUE(fooFrame.getReceptor().getReply(600).get() != NULL); + ASSERT_TRUE(barFrame.getReceptor().getReply(600)); + ASSERT_TRUE(fooFrame.getReceptor().getReply(600)); } void @@ -489,9 +468,7 @@ Test::testRoundRobin() { // Prepare message. TestFrame frame(_repo, "docproc/cluster.default"); - frame.setMessage(mbus::Message::UP(new PutDocumentMessage(Document::SP( - new Document(*_docType, - DocumentId("doc:scheme:")))))); + frame.setMessage(make_unique<PutDocumentMessage>(make_shared<Document>(*_docType, DocumentId("doc:scheme:")))); // Test select with proper address. for (uint32_t i = 0; i < 10; ++i) { @@ -530,13 +507,13 @@ Test::testRoundRobinCache() TestFrame fooFrame(_repo, "docproc/cluster.default"); mbus::HopSpec fooHop("foo", "[RoundRobin]"); fooHop.addRecipient("docproc/cluster.default/0/chain.foo"); - fooFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:foo")))); + fooFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(DocumentId("doc:scheme:foo")))); fooFrame.setHop(fooHop); TestFrame barFrame(fooFrame); mbus::HopSpec barHop("bar", "[RoundRobin]"); barHop.addRecipient("docproc/cluster.default/0/chain.bar"); - barFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:bar")))); + barFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(DocumentId("doc:scheme:bar")))); barFrame.setHop(barHop); fooFrame.getMessageBus().setupRouting( @@ -559,155 +536,8 @@ Test::testRoundRobinCache() barSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); fooSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); - ASSERT_TRUE(barFrame.getReceptor().getReply(600).get() != NULL); - ASSERT_TRUE(fooFrame.getReceptor().getReply(600).get() != NULL); -} - -void -Test::testSearchRow() -{ - TestFrame frame(_repo); - frame.setMessage(mbus::Message::UP(new PutDocumentMessage(Document::SP( - new Document(*_docType, - DocumentId("doc:scheme:")))))); - frame.setHop(mbus::HopSpec("test", "[SearchRow]") - .addRecipient("foo")); - EXPECT_TRUE(frame.testMergeOneReply("foo")); - frame.setHop(mbus::HopSpec("test", "[SearchRow]") - .addRecipient("foo") - .addRecipient("bar")); - EXPECT_TRUE(frame.testMergeTwoReplies("foo", "bar")); - - frame.setHop(mbus::HopSpec("test", "[SearchRow:1]") - .addRecipient("foo")); - TestFrame::ReplyMap replies; - replies["foo"] = mbus::ErrorCode::SERVICE_OOS; - EXPECT_TRUE(frame.testMergeError(replies, UIntList().add(mbus::ErrorCode::SERVICE_OOS))); - - frame.setHop(mbus::HopSpec("test", "[SearchRow:1]") - .addRecipient("foo") - .addRecipient("bar")); - replies["foo"] = mbus::ErrorCode::SERVICE_OOS; - replies["bar"] = mbus::ErrorCode::NONE; - EXPECT_TRUE(frame.testMergeOk(replies, StringList().add("bar"))); - - replies["foo"] = mbus::ErrorCode::SERVICE_OOS; - replies["bar"] = mbus::ErrorCode::SERVICE_OOS; - EXPECT_TRUE(frame.testMergeError(replies, UIntList() - .add(mbus::ErrorCode::SERVICE_OOS) - .add(mbus::ErrorCode::SERVICE_OOS))); - - frame.setHop(mbus::HopSpec("test", "[SearchRow:1]") - .addRecipient("foo") - .addRecipient("bar") - .addRecipient("baz")); - replies["foo"] = mbus::ErrorCode::SERVICE_OOS; - replies["bar"] = mbus::ErrorCode::NONE; - replies["baz"] = mbus::ErrorCode::NONE; - EXPECT_TRUE(frame.testMergeOk(replies, StringList().add("bar").add("baz"))); - - replies["foo"] = mbus::ErrorCode::SERVICE_OOS; - replies["bar"] = mbus::ErrorCode::SERVICE_OOS; - replies["baz"] = mbus::ErrorCode::NONE; - EXPECT_TRUE(frame.testMergeOk(replies, StringList().add("baz"))); - - replies["foo"] = mbus::ErrorCode::SERVICE_OOS; - replies["bar"] = mbus::ErrorCode::SERVICE_OOS; - replies["baz"] = mbus::ErrorCode::SERVICE_OOS; - EXPECT_TRUE(frame.testMergeError(replies, UIntList() - .add(mbus::ErrorCode::SERVICE_OOS) - .add(mbus::ErrorCode::SERVICE_OOS) - .add(mbus::ErrorCode::SERVICE_OOS))); - - frame.setHop(mbus::HopSpec("test", "[SearchRow:2]") - .addRecipient("foo") - .addRecipient("bar") - .addRecipient("baz")); - replies["foo"] = mbus::ErrorCode::SERVICE_OOS; - replies["bar"] = mbus::ErrorCode::NONE; - replies["baz"] = mbus::ErrorCode::NONE; - EXPECT_TRUE(frame.testMergeOk(replies, StringList().add("bar").add("baz"))); - - replies["foo"] = mbus::ErrorCode::SERVICE_OOS; - replies["bar"] = mbus::ErrorCode::SERVICE_OOS; - replies["baz"] = mbus::ErrorCode::NONE; - EXPECT_TRUE(frame.testMergeError(replies, UIntList() - .add(mbus::ErrorCode::SERVICE_OOS) - .add(mbus::ErrorCode::SERVICE_OOS))); - - replies["foo"] = mbus::ErrorCode::SERVICE_OOS; - replies["bar"] = mbus::ErrorCode::SERVICE_OOS; - replies["baz"] = mbus::ErrorCode::SERVICE_OOS; - EXPECT_TRUE(frame.testMergeError(replies, UIntList() - .add(mbus::ErrorCode::SERVICE_OOS) - .add(mbus::ErrorCode::SERVICE_OOS) - .add(mbus::ErrorCode::SERVICE_OOS))); -} - -void -Test::testSearchRowMerge() -{ - TestFrame frame(_repo); - frame.setHop(mbus::HopSpec("test", "[SearchRow]") - .addRecipient("foo")); - tryWasFound(frame, 1, 0x0, false); - tryWasFound(frame, 1, 0x1, true); - - frame.setHop(mbus::HopSpec("test", "[SearchRow]") - .addRecipient("foo") - .addRecipient("bar")); - tryWasFound(frame, 2, 0x0, false); - tryWasFound(frame, 2, 0x1, true); - tryWasFound(frame, 2, 0x2, true); - tryWasFound(frame, 2, 0x3, true); - - frame.setHop(mbus::HopSpec("test", "[SearchRow]") - .addRecipient("foo") - .addRecipient("bar") - .addRecipient("baz")); - tryWasFound(frame, 3, 0x0, false); - tryWasFound(frame, 3, 0x1, true); - tryWasFound(frame, 3, 0x2, true); - tryWasFound(frame, 3, 0x3, true); - tryWasFound(frame, 3, 0x4, true); - tryWasFound(frame, 3, 0x5, true); - tryWasFound(frame, 3, 0x6, true); - tryWasFound(frame, 3, 0x7, true); -} - -void -Test::tryWasFound(TestFrame &frame, uint32_t expectedRecipients, - uint32_t foundMask, bool expectedFound) -{ - { - frame.setMessage(mbus::Message::UP(new RemoveDocumentMessage(DocumentId("doc:scheme:69")))); - std::vector<mbus::RoutingNode*> selected; - EXPECT_TRUE(frame.select(selected, expectedRecipients)); - for (uint32_t i = 0, len = selected.size(); i < len; ++i) { - mbus::Reply::UP reply(new RemoveDocumentReply()); - static_cast<RemoveDocumentReply&>(*reply).setWasFound((1 << i) & foundMask); - selected[i]->handleReply(std::move(reply)); - } - mbus::Reply::UP reply = frame.getReceptor().getReply(600); - EXPECT_TRUE(reply.get() != NULL); - EXPECT_EQUAL((uint32_t)DocumentProtocol::REPLY_REMOVEDOCUMENT, reply->getType()); - EXPECT_EQUAL(expectedFound, static_cast<RemoveDocumentReply&>(*reply).wasFound()); - } - { - DocumentUpdate::SP upd(new DocumentUpdate(*_docType, DocumentId("doc:scheme:"))); - frame.setMessage(mbus::Message::UP(new UpdateDocumentMessage(upd))); - std::vector<mbus::RoutingNode*> selected; - EXPECT_TRUE(frame.select(selected, expectedRecipients)); - for (uint32_t i = 0, len = selected.size(); i < len; ++i) { - mbus::Reply::UP reply(new UpdateDocumentReply()); - static_cast<UpdateDocumentReply&>(*reply).setWasFound((1 << i) & foundMask); - selected[i]->handleReply(std::move(reply)); - } - mbus::Reply::UP reply = frame.getReceptor().getReply(600); - EXPECT_TRUE(reply.get() != NULL); - EXPECT_EQUAL((uint32_t)DocumentProtocol::REPLY_UPDATEDOCUMENT, reply->getType()); - EXPECT_EQUAL(expectedFound, static_cast<UpdateDocumentReply&>(*reply).wasFound()); - } + ASSERT_TRUE(barFrame.getReceptor().getReply(600)); + ASSERT_TRUE(fooFrame.getReceptor().getReply(600)); } void @@ -724,11 +554,11 @@ Test::multipleGetRepliesAreMergedToFoundDocument() "route[1].feed \"myfeed\"\n]") .addRecipient("foo") .addRecipient("bar")); - frame.setMessage(mbus::Message::UP(new GetDocumentMessage(DocumentId("doc:scheme:yarn")))); + frame.setMessage(make_unique<GetDocumentMessage>(DocumentId("doc:scheme:yarn"))); std::vector<mbus::RoutingNode*> selected; EXPECT_TRUE(frame.select(selected, 2)); for (uint32_t i = 0, len = selected.size(); i < len; ++i) { - document::Document::SP doc; + Document::SP doc; if (i == 0) { doc.reset(new Document(*_docType, DocumentId("doc:scheme:yarn"))); doc->setLastModified(123456ULL); @@ -737,62 +567,12 @@ Test::multipleGetRepliesAreMergedToFoundDocument() selected[i]->handleReply(std::move(reply)); } mbus::Reply::UP reply = frame.getReceptor().getReply(600); - EXPECT_TRUE(reply.get() != NULL); - EXPECT_EQUAL(static_cast<uint32_t>(DocumentProtocol::REPLY_GETDOCUMENT), - reply->getType()); + EXPECT_TRUE(reply); + EXPECT_EQUAL(static_cast<uint32_t>(DocumentProtocol::REPLY_GETDOCUMENT), reply->getType()); EXPECT_EQUAL(123456ULL, static_cast<GetDocumentReply&>(*reply).getLastModified()); } void -Test::testSearchColumn() -{ - TestFrame frame(_repo); - frame.setHop(mbus::HopSpec("test", "[SearchColumn]") - .addRecipient("c0") - .addRecipient("c1") - .addRecipient("c2") - .addRecipient("c3")); - - // Test hash distribution. - EXPECT_TRUE(tryDistribution(frame, "doc:ns:3", "c0")); - EXPECT_TRUE(tryDistribution(frame, "doc:ns:18", "c1")); - EXPECT_TRUE(tryDistribution(frame, "doc:ns:0", "c2")); - EXPECT_TRUE(tryDistribution(frame, "doc:ns:4", "c3")); - - EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:49152:0", "c0")); - EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:49152:1", "c0")); - EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:16384:2", "c1")); - EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:16384:3", "c1")); - EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:5461:4", "c2")); - EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:5461:5", "c2")); - EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:0:6", "c3")); - EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:0:7", "c3")); - - EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:0:0", "c0")); - EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:0:1", "c0")); - EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:4:2", "c1")); - EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:4:3", "c1")); - EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:2:4", "c2")); - EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:2:5", "c2")); - EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:7:6", "c3")); - EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:7:7", "c3")); - - // Test routing based on message type. - mbus::Message::UP put(new PutDocumentMessage(Document::SP( - new Document(*_docType, - DocumentId("doc:scheme:"))))); -} - -bool -Test::tryDistribution(TestFrame &frame, const string &id, const string &expected) -{ - Document::SP doc(new Document(*_docType, DocumentId(id))); - mbus::Message::UP msg(new PutDocumentMessage(doc)); - frame.setMessage(std::move(msg)); - return frame.testSelect(StringList().add(expected)); -} - -void Test::testDocumentRouteSelector() { // Test policy with usage safeguard. @@ -804,13 +584,13 @@ Test::testDocumentRouteSelector() "route[0].feed \"baz\"\n"; { DocumentProtocol protocol(_loadTypes, _repo, okConfig); - EXPECT_TRUE(dynamic_cast<DocumentRouteSelectorPolicy*>(protocol.createPolicy("DocumentRouteSelector", "").get()) != NULL); - EXPECT_TRUE(dynamic_cast<ErrorPolicy*>(protocol.createPolicy("DocumentRouteSelector", errConfig).get()) != NULL); + EXPECT_TRUE(dynamic_cast<DocumentRouteSelectorPolicy*>(protocol.createPolicy("DocumentRouteSelector", "").get()) != nullptr); + EXPECT_TRUE(dynamic_cast<ErrorPolicy*>(protocol.createPolicy("DocumentRouteSelector", errConfig).get()) != nullptr); } { DocumentProtocol protocol(_loadTypes, _repo, errConfig); - EXPECT_TRUE(dynamic_cast<ErrorPolicy*>(protocol.createPolicy("DocumentRouteSelector", "").get()) != NULL); - EXPECT_TRUE(dynamic_cast<DocumentRouteSelectorPolicy*>(protocol.createPolicy("DocumentRouteSelector", okConfig).get()) != NULL); + EXPECT_TRUE(dynamic_cast<ErrorPolicy*>(protocol.createPolicy("DocumentRouteSelector", "").get()) != nullptr); + EXPECT_TRUE(dynamic_cast<DocumentRouteSelectorPolicy*>(protocol.createPolicy("DocumentRouteSelector", okConfig).get()) != nullptr); } // Test policy with proper config. @@ -826,19 +606,17 @@ Test::testDocumentRouteSelector() .addRecipient("foo") .addRecipient("bar")); - frame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:"), 0))); + frame.setMessage(make_unique<GetDocumentMessage>(DocumentId("doc:scheme:"), 0)); EXPECT_TRUE(frame.testSelect(StringList().add("foo").add("bar"))); - mbus::Message::UP put(new PutDocumentMessage(Document::SP( - new Document(*_docType, - DocumentId("doc:scheme:"))))); + mbus::Message::UP put = make_unique<PutDocumentMessage>(make_shared<Document>(*_docType, DocumentId("doc:scheme:"))); frame.setMessage(std::move(put)); EXPECT_TRUE(frame.testSelect( StringList().add("foo"))); { vdslib::OperationList opList; - document::DocumentId id("doc:scheme:"); + DocumentId id("doc:scheme:"); Document::UP doc(new Document(*_docType, id)); opList.addPut(std::move(doc)); @@ -849,24 +627,20 @@ Test::testDocumentRouteSelector() { vdslib::OperationList opList; - document::DocumentId id("doc:scheme:"); + DocumentId id("doc:scheme:"); Document::UP doc(new Document(*_repo->getDocumentType("other"), id)); opList.addPut(std::move(doc)); document::BucketIdFactory factory; - put = frame.setMessage(MultiOperationMessage::create(_repo, - factory.getBucketId(id), opList)); + put = frame.setMessage(MultiOperationMessage::create(_repo, factory.getBucketId(id), opList)); EXPECT_TRUE(frame.testSelect(StringList().add("bar"))); } - frame.setMessage(mbus::Message::UP(new RemoveDocumentMessage(document::DocumentId("doc:scheme:")))); + frame.setMessage(mbus::Message::UP(new RemoveDocumentMessage(DocumentId("doc:scheme:")))); EXPECT_TRUE(frame.testSelect(StringList().add("foo").add("bar"))); - frame.setMessage(mbus::Message::UP(new UpdateDocumentMessage( - document::DocumentUpdate::SP( - new document::DocumentUpdate( - *_docType, - DocumentId("doc:scheme:")))))); + frame.setMessage(make_unique<UpdateDocumentMessage>( + make_shared<DocumentUpdate>(*_docType, DocumentId("doc:scheme:")))); EXPECT_TRUE(frame.testSelect(StringList().add("foo"))); frame.setMessage(std::move(put)); @@ -884,22 +658,17 @@ Test::testDocumentRouteSelectorIgnore() "route[0].feed \"myfeed\"\n]") .addRecipient("docproc/cluster.foo")); - frame.setMessage(mbus::Message::UP(new PutDocumentMessage( - document::Document::SP( - new document::Document(*_docType, - DocumentId("id:yarn:testdoc:n=1234:fluff")))))); + frame.setMessage(make_unique<PutDocumentMessage>( + make_shared<Document>(*_docType, DocumentId("id:yarn:testdoc:n=1234:fluff")))); std::vector<mbus::RoutingNode*> leaf; ASSERT_TRUE(frame.select(leaf, 0)); mbus::Reply::UP reply = frame.getReceptor().getReply(600); - ASSERT_TRUE(reply.get() != NULL); + ASSERT_TRUE(reply); EXPECT_EQUAL(uint32_t(DocumentProtocol::REPLY_DOCUMENTIGNORED), reply->getType()); EXPECT_EQUAL(0u, reply->getNumErrors()); - frame.setMessage(mbus::Message::UP(new UpdateDocumentMessage( - document::DocumentUpdate::SP( - new document::DocumentUpdate( - *_docType, - DocumentId("doc:scheme:")))))); + frame.setMessage(make_unique<UpdateDocumentMessage>( + make_shared<DocumentUpdate>(*_docType, DocumentId("doc:scheme:")))); EXPECT_TRUE(frame.testSelect(StringList().add("docproc/cluster.foo"))); } @@ -999,7 +768,7 @@ Test::requireThatStoragePolicyIsRandomWithoutState() StoragePolicy &policy = setupStoragePolicy( frame, param, "storage/cluster.mycluster/distributor/*/default", 5); - ASSERT_TRUE(policy.getSystemState() == NULL); + ASSERT_TRUE(policy.getSystemState() == nullptr); std::set<string> lst; for (uint32_t i = 0; i < 666; i++) { @@ -1022,10 +791,8 @@ Test::setupStoragePolicy(TestFrame &frame, const string ¶m, mbus::MessageBus &mbus = frame.getMessageBus(); const mbus::HopBlueprint *hop = mbus.getRoutingTable(DocumentProtocol::NAME)->getHop("test"); const mbus::PolicyDirective dir = static_cast<mbus::PolicyDirective&>(*hop->getDirective(0)); - StoragePolicy &policy = static_cast<StoragePolicy&>(*mbus.getRoutingPolicy( - DocumentProtocol::NAME, - dir.getName(), - dir.getParam())); + StoragePolicy &policy = static_cast<StoragePolicy&>(*mbus.getRoutingPolicy(DocumentProtocol::NAME, + dir.getName(), dir.getParam())); policy.initSynchronous(); assertMirrorReady(*policy.getMirror()); if (numEntries >= 0) { @@ -1046,7 +813,7 @@ Test::requireThatStoragePolicyIsTargetedWithState() mbus::TestServer *srv = new mbus::TestServer( mbus::Identity(vespalib::make_string("storage/cluster.mycluster/distributor/%d", i)), mbus::RoutingSpec(), slobrok, - mbus::IProtocol::SP(new DocumentProtocol(_loadTypes, _repo))); + make_shared<DocumentProtocol>(_loadTypes, _repo)); servers.push_back(srv); srv->net.registerSession("default"); } @@ -1056,12 +823,12 @@ Test::requireThatStoragePolicyIsTargetedWithState() StoragePolicy &policy = setupStoragePolicy( frame, param, "storage/cluster.mycluster/distributor/*/default", 5); - ASSERT_TRUE(policy.getSystemState() == NULL); + ASSERT_TRUE(policy.getSystemState() == nullptr); { std::vector<mbus::RoutingNode*> leaf; ASSERT_TRUE(frame.select(leaf, 1)); leaf[0]->handleReply(mbus::Reply::UP(new WrongDistributionReply("distributor:5 storage:5"))); - ASSERT_TRUE(policy.getSystemState() != NULL); + ASSERT_TRUE(policy.getSystemState() != nullptr); EXPECT_EQUAL(policy.getSystemState()->toString(), "distributor:5 storage:5"); } std::set<string> lst; @@ -1086,7 +853,7 @@ Test::requireThatStoragePolicyCombinesSystemAndSlobrokState() mbus::Slobrok slobrok; mbus::TestServer server(mbus::Identity("storage/cluster.mycluster/distributor/0"), mbus::RoutingSpec(), slobrok, - mbus::IProtocol::SP(new DocumentProtocol(_loadTypes, _repo))); + make_shared<DocumentProtocol>(_loadTypes, _repo)); server.net.registerSession("default"); string param = vespalib::make_string( @@ -1095,12 +862,12 @@ Test::requireThatStoragePolicyCombinesSystemAndSlobrokState() StoragePolicy &policy = setupStoragePolicy( frame, param, "storage/cluster.mycluster/distributor/*/default", 1); - ASSERT_TRUE(policy.getSystemState() == NULL); + ASSERT_TRUE(policy.getSystemState() == nullptr); { std::vector<mbus::RoutingNode*> leaf; ASSERT_TRUE(frame.select(leaf, 1)); leaf[0]->handleReply(mbus::Reply::UP(new WrongDistributionReply("distributor:99 storage:99"))); - ASSERT_TRUE(policy.getSystemState() != NULL); + ASSERT_TRUE(policy.getSystemState() != nullptr); EXPECT_EQUAL(policy.getSystemState()->toString(), "distributor:99 storage:99"); } for (int i = 0; i < 666; i++) { @@ -1113,9 +880,7 @@ Test::testSubsetService() { // Prepare message. TestFrame frame(_repo, "docproc/cluster.default"); - frame.setMessage(mbus::Message::UP(new PutDocumentMessage(Document::SP( - new Document(*_docType, - DocumentId("doc:scheme:")))))); + frame.setMessage(make_unique<PutDocumentMessage>(make_shared<Document>(*_docType, DocumentId("doc:scheme:")))); // Test requerying for adding nodes. frame.setHop(mbus::HopSpec("test", "docproc/cluster.default/[SubsetService:2]/chain.default")); @@ -1128,7 +893,7 @@ Test::testSubsetService() ASSERT_TRUE(frame.select(leaf, 1)); lst.insert(leaf[0]->getRoute().toString()); leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); - ASSERT_TRUE(frame.getReceptor().getReply(600).get() != NULL); + ASSERT_TRUE(frame.getReceptor().getReply(600)); } ASSERT_TRUE(lst.size() > 1); // must have requeried @@ -1147,7 +912,7 @@ Test::testSubsetService() prev = next; leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply())); - ASSERT_TRUE(frame.getReceptor().getReply(600).get() != NULL); + ASSERT_TRUE(frame.getReceptor().getReply(600)); } // Test requerying for dropping nodes. @@ -1164,7 +929,7 @@ Test::testSubsetService() mbus::Reply::UP reply(new mbus::EmptyReply()); reply->addError(mbus::Error(mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE, route)); leaf[0]->handleReply(std::move(reply)); - ASSERT_TRUE(frame.getReceptor().getReply(600).get() != NULL); + ASSERT_TRUE(frame.getReceptor().getReply(600)); } EXPECT_EQUAL(10u, lst.size()); @@ -1178,12 +943,12 @@ Test::testSubsetServiceCache() { TestFrame fooFrame(_repo, "docproc/cluster.default"); mbus::HopSpec fooHop("foo", "docproc/cluster.default/[SubsetService:2]/chain.foo"); - fooFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:foo")))); + fooFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(DocumentId("doc:scheme:foo")))); fooFrame.setHop(fooHop); TestFrame barFrame(fooFrame); mbus::HopSpec barHop("bar", "docproc/cluster.default/[SubsetService:2]/chain.bar"); - barFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:bar")))); + barFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(DocumentId("doc:scheme:bar")))); barFrame.setHop(barHop); fooFrame.getMessageBus().setupRouting( diff --git a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp index 95115d1c3c1..a1837b6bcd0 100644 --- a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp @@ -41,8 +41,6 @@ DocumentProtocol::DocumentProtocol(const LoadTypeSet& loadTypes, putRoutingPolicyFactory("Extern", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::ExternPolicyFactory())); putRoutingPolicyFactory("LocalService", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::LocalServicePolicyFactory())); putRoutingPolicyFactory("RoundRobin", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::RoundRobinPolicyFactory())); - putRoutingPolicyFactory("SearchColumn", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::SearchColumnPolicyFactory())); - putRoutingPolicyFactory("SearchRow", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::SearchRowPolicyFactory())); putRoutingPolicyFactory("Storage", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::StoragePolicyFactory())); putRoutingPolicyFactory("SubsetService", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::SubsetServicePolicyFactory())); putRoutingPolicyFactory("LoadBalancer", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::LoadBalancerPolicyFactory())); @@ -134,8 +132,7 @@ DocumentProtocol::encode(const vespalib::Version &version, const mbus::Routable std::ostringstream message; document::StringUtil::printAsHex( message, blob.data(), blob.size()); - LOG(spam, "Encoded message of protocol %s type %u using version " - "%s serialization:\n%s", + LOG(spam, "Encoded message of protocol %s type %u using version %s serialization:\n%s", routable.getProtocol().c_str(), routable.getType(), version.toString().c_str(), message.str().c_str()); } diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt b/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt index f1a691bc46d..26d51e702e9 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt +++ b/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt @@ -11,8 +11,6 @@ vespa_add_library(documentapi_documentapipolicies OBJECT externpolicy.cpp localservicepolicy.cpp roundrobinpolicy.cpp - searchcolumnpolicy.cpp - searchrowpolicy.cpp subsetservicepolicy.cpp loadbalancer.cpp loadbalancerpolicy.cpp diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.cpp deleted file mode 100644 index 38610aca551..00000000000 --- a/documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.cpp +++ /dev/null @@ -1,137 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "searchcolumnpolicy.h" -#include <vespa/documentapi/messagebus/documentprotocol.h> -#include <vespa/documentapi/messagebus/messages/getdocumentmessage.h> -#include <vespa/documentapi/messagebus/messages/putdocumentmessage.h> -#include <vespa/documentapi/messagebus/messages/removedocumentmessage.h> -#include <vespa/documentapi/messagebus/messages/updatedocumentmessage.h> -#include <vespa/documentapi/messagebus/messages/batchdocumentupdatemessage.h> -#include <vespa/documentapi/messagebus/messages/multioperationmessage.h> -#include <vespa/messagebus/emptyreply.h> -#include <vespa/vdslib/state/clusterstate.h> -#include <vespa/vespalib/util/hashmap.h> -#include <vespa/log/log.h> -LOG_SETUP(".searchcolumnpolicy"); - -namespace documentapi { - -SearchColumnPolicy::SearchColumnPolicy(const string ¶m) : - _lock(), - _factory(), - _distributions(), - _maxOOS(0) -{ - if (param.length() > 0) { - int maxOOS = atoi(param.c_str()); - if (maxOOS >= 0) { - _maxOOS = (uint32_t)maxOOS; - } else { - LOG(warning, - "Ignoring a request to set the maximum number of OOS replies to %d because it makes no " - "sense. This routing policy will not allow any recipient to be out of service.", maxOOS); - } - } -} - -SearchColumnPolicy::~SearchColumnPolicy() -{ - // empty -} - -void -SearchColumnPolicy::select(mbus::RoutingContext &context) -{ - std::vector<mbus::Route> recipients; - context.getMatchedRecipients(recipients); - if (recipients.empty()) { - return; - } - const document::DocumentId *id = NULL; - document::BucketId bucketId; - - const mbus::Message &msg = context.getMessage(); - switch(msg.getType()) { - case DocumentProtocol::MESSAGE_PUTDOCUMENT: - id = &static_cast<const PutDocumentMessage&>(msg).getDocument().getId(); - break; - - case DocumentProtocol::MESSAGE_GETDOCUMENT: - id = &static_cast<const GetDocumentMessage&>(msg).getDocumentId(); - break; - - case DocumentProtocol::MESSAGE_REMOVEDOCUMENT: - id = &static_cast<const RemoveDocumentMessage&>(msg).getDocumentId(); - break; - - case DocumentProtocol::MESSAGE_UPDATEDOCUMENT: - id = &static_cast<const UpdateDocumentMessage&>(msg).getDocumentUpdate().getId(); - break; - - case DocumentProtocol::MESSAGE_MULTIOPERATION: - bucketId = (static_cast<const MultiOperationMessage&>(msg)).getBucketId(); - break; - - case DocumentProtocol::MESSAGE_BATCHDOCUMENTUPDATE: - bucketId = (static_cast<const BatchDocumentUpdateMessage&>(msg)).getBucketId(); - break; - - default: - LOG(error, "Message type '%d' not supported.", msg.getType()); - return; - } - if (bucketId.getRawId() == 0) { - bucketId = _factory.getBucketId(*id); - } - uint32_t recipient = getRecipient(bucketId, recipients.size()); - context.addChild(recipients[recipient]); - context.setSelectOnRetry(true); - if (_maxOOS > 0) { - context.addConsumableError(mbus::ErrorCode::SERVICE_OOS); - } -} - -void -SearchColumnPolicy::merge(mbus::RoutingContext &context) -{ - if (_maxOOS > 0) { - if (context.getNumChildren() > 1) { - std::set<uint32_t> oosReplies; - uint32_t idx = 0; - for (mbus::RoutingNodeIterator it = context.getChildIterator(); - it.isValid(); it.next()) - { - const mbus::Reply &ref = it.getReplyRef(); - if (ref.hasErrors() && DocumentProtocol::hasOnlyErrorsOfType(ref, mbus::ErrorCode::SERVICE_OOS)) { - oosReplies.insert(idx); - } - ++idx; - } - if (oosReplies.size() <= _maxOOS) { - DocumentProtocol::merge(context, oosReplies); - return; // may the rtx be with you - } - } else { - const mbus::Reply &ref = context.getChildIterator().getReplyRef(); - if (ref.hasErrors() && DocumentProtocol::hasOnlyErrorsOfType(ref, mbus::ErrorCode::SERVICE_OOS)) { - context.setReply(mbus::Reply::UP(new mbus::EmptyReply())); - return; // god help us all - } - } - } - DocumentProtocol::merge(context); -} - -uint32_t -SearchColumnPolicy::getRecipient(const document::BucketId &bucketId, uint32_t numRecipients) -{ - vespalib::LockGuard guard(_lock); - DistributionCache::iterator it = _distributions.find(numRecipients); - if (it == _distributions.end()) { - it = _distributions.insert(DistributionCache::value_type(numRecipients, vdslib::BucketDistribution(1, 16u))).first; - it->second.setNumColumns(numRecipients); - } - return it->second.getColumn(bucketId); -} - -} diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.h deleted file mode 100644 index a17824249c4..00000000000 --- a/documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.h +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <vespa/documentapi/common.h> -#include <vespa/messagebus/routing/iroutingpolicy.h> -#include <vespa/vdslib/bucketdistribution.h> -#include <vespa/document/bucket/bucketidfactory.h> -#include <vespa/vespalib/util/sync.h> -#include <map> - -namespace documentapi { - -/** - * This policy implements the logic to select recipients for a single search column. - * - * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a> - * @version $Id$ - */ -class SearchColumnPolicy : public mbus::IRoutingPolicy { -private: - typedef std::map<uint32_t, vdslib::BucketDistribution> DistributionCache; - - vespalib::Lock _lock; - document::BucketIdFactory _factory; - DistributionCache _distributions; - uint32_t _maxOOS; - - /** - * Returns the recipient index for the given bucket id. This updates the shared internal distribution map, so it - * needs to be synchronized. - * - * @param bucketId The bucket whose recipient to return. - * @param numRecipients The number of recipients being distributed to. - * @return The recipient to use. - */ - uint32_t getRecipient(const document::BucketId &bucketId, uint32_t numRecipients); - -public: - /** - * Constructs a new policy object for the given parameter string. The string can be null or empty, which is a - * request to not allow any bad columns. - * - * @param param The maximum number of allowed bad columns. - */ - SearchColumnPolicy(const string ¶m); - ~SearchColumnPolicy(); - - void select(mbus::RoutingContext &context) override; - void merge(mbus::RoutingContext &context) override; -}; - -} diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.cpp deleted file mode 100644 index 20f8398f1e6..00000000000 --- a/documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.cpp +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "searchrowpolicy.h" -#include <vespa/documentapi/messagebus/documentprotocol.h> - -#include <vespa/log/log.h> -LOG_SETUP(".searchrowpolicy"); - -namespace documentapi { - -SearchRowPolicy::SearchRowPolicy(const string ¶m) : - _minOk(0) -{ - if (param.length() > 0) { - int minOk = atoi(param.c_str()); - if (minOk > 0) { - _minOk = (uint32_t)minOk; - } else { - LOG(warning, - "Ignoring a request to set the minimum number of OK replies to %d because it makes no sense. " - "This routing policy will not allow any recipient to be out of service.", minOk); - } - } -} - -SearchRowPolicy::~SearchRowPolicy() {} - -void -SearchRowPolicy::select(mbus::RoutingContext &context) -{ - std::vector<mbus::Route> recipients; - context.getMatchedRecipients(recipients); - context.addChildren(recipients); - context.setSelectOnRetry(false); - if (_minOk > 0) { - context.addConsumableError(mbus::ErrorCode::SERVICE_OOS); - } -} - -void -SearchRowPolicy::merge(mbus::RoutingContext &context) -{ - if (_minOk > 0) { - std::set<uint32_t> oosReplies; - uint32_t idx = 0; - for (mbus::RoutingNodeIterator it = context.getChildIterator(); - it.isValid(); it.next()) - { - const mbus::Reply &ref = it.getReplyRef(); - if (ref.hasErrors() && DocumentProtocol::hasOnlyErrorsOfType(ref, mbus::ErrorCode::SERVICE_OOS)) { - oosReplies.insert(idx); - } - ++idx; - } - if (context.getNumChildren() - oosReplies.size() >= _minOk) { - DocumentProtocol::merge(context, oosReplies); - return; - } - } - DocumentProtocol::merge(context); -} - -} diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.h deleted file mode 100644 index 2ca987ba4f7..00000000000 --- a/documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.h +++ /dev/null @@ -1,30 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <vespa/documentapi/common.h> -#include <vespa/messagebus/routing/iroutingpolicy.h> - -namespace documentapi { - -class SearchRowPolicy : public mbus::IRoutingPolicy { -private: - SearchRowPolicy(const SearchRowPolicy &); - SearchRowPolicy &operator=(const SearchRowPolicy &); - -public: - /** - * Creates a search row policy that wraps the underlying search group policy in case the parameter is something - * other than an empty string. - * - * @param param The number of minimum non-OOS replies that this policy requires. - */ - SearchRowPolicy(const string ¶m); - ~SearchRowPolicy(); - - void select(mbus::RoutingContext &context) override; - void merge(mbus::RoutingContext &context) override; -private: - uint32_t _minOk; // Hide OUT_OF_SERVICE as long as this number of replies are something else. -}; - -} diff --git a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp index 741083009f5..2c244c63046 100644 --- a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp @@ -6,8 +6,6 @@ #include <vespa/documentapi/messagebus/policies/externpolicy.h> #include <vespa/documentapi/messagebus/policies/localservicepolicy.h> #include <vespa/documentapi/messagebus/policies/roundrobinpolicy.h> -#include <vespa/documentapi/messagebus/policies/searchcolumnpolicy.h> -#include <vespa/documentapi/messagebus/policies/searchrowpolicy.h> #include <vespa/documentapi/messagebus/policies/subsetservicepolicy.h> #include <vespa/documentapi/messagebus/policies/storagepolicy.h> #include <vespa/documentapi/messagebus/policies/contentpolicy.h> @@ -107,18 +105,6 @@ RoutingPolicyFactories::RoundRobinPolicyFactory::createPolicy(const string ¶ } mbus::IRoutingPolicy::UP -RoutingPolicyFactories::SearchColumnPolicyFactory::createPolicy(const string ¶m) const -{ - return mbus::IRoutingPolicy::UP(new SearchColumnPolicy(param)); -} - -mbus::IRoutingPolicy::UP -RoutingPolicyFactories::SearchRowPolicyFactory::createPolicy(const string ¶m) const -{ - return mbus::IRoutingPolicy::UP(new SearchRowPolicy(param)); -} - -mbus::IRoutingPolicy::UP RoutingPolicyFactories::SubsetServicePolicyFactory::createPolicy(const string ¶m) const { return mbus::IRoutingPolicy::UP(new SubsetServicePolicy(param)); diff --git a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h index 95a65008a9b..003768aedda 100644 --- a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h +++ b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h @@ -52,14 +52,6 @@ public: public: mbus::IRoutingPolicy::UP createPolicy(const string ¶m) const override; }; - class SearchColumnPolicyFactory : public IRoutingPolicyFactory { - public: - mbus::IRoutingPolicy::UP createPolicy(const string ¶m) const override; - }; - class SearchRowPolicyFactory : public IRoutingPolicyFactory { - public: - mbus::IRoutingPolicy::UP createPolicy(const string ¶m) const override; - }; class SubsetServicePolicyFactory : public IRoutingPolicyFactory { public: mbus::IRoutingPolicy::UP createPolicy(const string ¶m) const override; diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java index 91b269081bf..adbe07df8d0 100644 --- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java +++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java @@ -34,8 +34,6 @@ public class StatusCodes { return Response.Status.BAD_REQUEST; case ErrorCode.NO_SERVICES_FOR_ROUTE: return Response.Status.NOT_FOUND; - case ErrorCode.SERVICE_OOS: - return Response.Status.SERVICE_UNAVAILABLE; case ErrorCode.ENCODE_ERROR: return Response.Status.BAD_REQUEST; case ErrorCode.NETWORK_ERROR: diff --git a/messagebus/src/main/java/com/yahoo/messagebus/ErrorCode.java b/messagebus/src/main/java/com/yahoo/messagebus/ErrorCode.java index 7c645ec4e1b..ee8a4cc2b55 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/ErrorCode.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/ErrorCode.java @@ -50,7 +50,9 @@ public final class ErrorCode { /** No services found for the message route. */ public static final int NO_SERVICES_FOR_ROUTE = FATAL_ERROR + 3; - /** The selected service was out of service. */ + /** The selected service was out of service. + */ + @Deprecated // Unused and will be removed public static final int SERVICE_OOS = FATAL_ERROR + 4; /** An error occured while encoding the message. */ diff --git a/messagebus/src/vespa/messagebus/errorcode.cpp b/messagebus/src/vespa/messagebus/errorcode.cpp index 67330580f7a..af9089b987e 100644 --- a/messagebus/src/vespa/messagebus/errorcode.cpp +++ b/messagebus/src/vespa/messagebus/errorcode.cpp @@ -30,7 +30,6 @@ ErrorCode::getName(uint32_t errorCode) case SEND_QUEUE_CLOSED : return "SEND_QUEUE_CLOSED"; case SEND_QUEUE_FULL : return "SEND_QUEUE_FULL"; case SEQUENCE_ERROR : return "SEQUENCE_ERROR"; - case SERVICE_OOS : return "SERVICE_OOS"; case SESSION_BUSY : return "SESSION_BUSY"; case TIMEOUT : return "TIMEOUT"; case TRANSIENT_ERROR : return "TRANSIENT_ERROR"; diff --git a/messagebus/src/vespa/messagebus/errorcode.h b/messagebus/src/vespa/messagebus/errorcode.h index 2cdc6952a67..f170e29ca8f 100644 --- a/messagebus/src/vespa/messagebus/errorcode.h +++ b/messagebus/src/vespa/messagebus/errorcode.h @@ -72,7 +72,8 @@ public: NO_SERVICES_FOR_ROUTE = FATAL_ERROR + 3, // The selected service was out of service. - SERVICE_OOS = FATAL_ERROR + 4, + // Unused..... + // SERVICE_OOS = FATAL_ERROR + 4, // An error occured while encoding the message. ENCODE_ERROR = FATAL_ERROR + 5, diff --git a/messagebus_test/src/tests/errorcodes/DumpCodes.java b/messagebus_test/src/tests/errorcodes/DumpCodes.java index 01003876fa3..96c7fc57a3e 100644 --- a/messagebus_test/src/tests/errorcodes/DumpCodes.java +++ b/messagebus_test/src/tests/errorcodes/DumpCodes.java @@ -24,7 +24,6 @@ public class DumpCodes { dump("SEND_QUEUE_CLOSED", ErrorCode.SEND_QUEUE_CLOSED); dump("ILLEGAL_ROUTE", ErrorCode.ILLEGAL_ROUTE); dump("NO_SERVICES_FOR_ROUTE", ErrorCode.NO_SERVICES_FOR_ROUTE); - dump("SERVICE_OOS", ErrorCode.SERVICE_OOS); dump("ENCODE_ERROR", ErrorCode.ENCODE_ERROR); dump("NETWORK_ERROR", ErrorCode.NETWORK_ERROR); dump("UNKNOWN_PROTOCOL", ErrorCode.UNKNOWN_PROTOCOL); diff --git a/messagebus_test/src/tests/errorcodes/dumpcodes.cpp b/messagebus_test/src/tests/errorcodes/dumpcodes.cpp index fdc8892b7c7..ecc5c9bc42c 100644 --- a/messagebus_test/src/tests/errorcodes/dumpcodes.cpp +++ b/messagebus_test/src/tests/errorcodes/dumpcodes.cpp @@ -36,7 +36,6 @@ App::Main() dump("SEND_QUEUE_CLOSED", ErrorCode::SEND_QUEUE_CLOSED); dump("ILLEGAL_ROUTE", ErrorCode::ILLEGAL_ROUTE); dump("NO_SERVICES_FOR_ROUTE", ErrorCode::NO_SERVICES_FOR_ROUTE); - dump("SERVICE_OOS", ErrorCode::SERVICE_OOS); dump("ENCODE_ERROR", ErrorCode::ENCODE_ERROR); dump("NETWORK_ERROR", ErrorCode::NETWORK_ERROR); dump("UNKNOWN_PROTOCOL", ErrorCode::UNKNOWN_PROTOCOL); diff --git a/messagebus_test/src/tests/errorcodes/ref-dump.txt b/messagebus_test/src/tests/errorcodes/ref-dump.txt index b8038816897..70a10de7b82 100644 --- a/messagebus_test/src/tests/errorcodes/ref-dump.txt +++ b/messagebus_test/src/tests/errorcodes/ref-dump.txt @@ -10,7 +10,6 @@ first unused TRANSIENT_ERROR => 100008 => "UNKNOWN(100008)" SEND_QUEUE_CLOSED => 200001 => "SEND_QUEUE_CLOSED" ILLEGAL_ROUTE => 200002 => "ILLEGAL_ROUTE" NO_SERVICES_FOR_ROUTE => 200003 => "NO_SERVICES_FOR_ROUTE" -SERVICE_OOS => 200004 => "SERVICE_OOS" ENCODE_ERROR => 200005 => "ENCODE_ERROR" NETWORK_ERROR => 200006 => "NETWORK_ERROR" UNKNOWN_PROTOCOL => 200007 => "UNKNOWN_PROTOCOL" diff --git a/searchcore/src/tests/proton/docsummary/docsummary.cpp b/searchcore/src/tests/proton/docsummary/docsummary.cpp index 3b199d266a8..bce3fb7267c 100644 --- a/searchcore/src/tests/proton/docsummary/docsummary.cpp +++ b/searchcore/src/tests/proton/docsummary/docsummary.cpp @@ -21,9 +21,8 @@ #include <vespa/searchcore/proton/server/memoryconfigstore.h> #include <vespa/searchcore/proton/server/searchview.h> #include <vespa/searchcore/proton/server/summaryadapter.h> -#include <vespa/searchlib/common/idestructorcallback.h> +#include <vespa/searchlib/common/gatecallback.h> #include <vespa/searchlib/common/transport.h> -#include <vespa/searchlib/docstore/logdocumentstore.h> #include <vespa/searchlib/engine/docsumapi.h> #include <vespa/searchlib/index/docbuilder.h> #include <vespa/searchlib/index/dummyfileheadercontext.h> @@ -254,7 +253,7 @@ public: op.setSerialNum(serialNum); op.setDbDocumentId(dbdId); op.setPrevDbDocumentId(prevDbdId); - _ddb->getFeedHandler().storeOperation(op); + _ddb->getFeedHandler().storeOperation(op, std::make_shared<search::IgnoreCallback>()); SearchView *sv(dynamic_cast<SearchView *>(_ddb->getReadySubDB()->getSearchView().get())); if (sv != NULL) { // cf. FeedView::putAttributes() diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp index 823c31dd1c2..b8ffc41d3cd 100644 --- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp @@ -370,7 +370,7 @@ struct MyTlsWriter : TlsWriter { bool erase_return; MyTlsWriter() : store_count(0), erase_count(0), erase_return(true) {} - void storeOperation(const FeedOperation &) override { ++store_count; } + void storeOperation(const FeedOperation &, DoneCallback) override { ++store_count; } bool erase(SerialNum) override { ++erase_count; return erase_return; } SerialNum sync(SerialNum syncTo) override { diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp index 55f71da9687..56bd99c90f6 100644 --- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp +++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp @@ -138,7 +138,7 @@ struct MyStorer : public IOperationStorer : _moveCnt(0), _compactCnt(0) {} - virtual void storeOperation(FeedOperation &op) override { + void storeOperation(const FeedOperation &op, DoneCallback) override { if (op.getType() == FeedOperation::MOVE) { ++ _moveCnt; } else if (op.getType() == FeedOperation::COMPACT_LID_SPACE) { diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp index 559dbb240a8..f20ad01bcf6 100644 --- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp +++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp @@ -26,7 +26,7 @@ #include <vespa/searchcore/proton/test/test.h> #include <vespa/searchlib/attribute/attributecontext.h> #include <vespa/searchlib/attribute/attributeguard.h> -#include <vespa/searchlib/common/idestructorcallback.h> +#include <vespa/searchlib/common/gatecallback.h> #include <vespa/searchlib/common/idocumentmetastore.h> #include <vespa/searchlib/index/docbuilder.h> #include <vespa/vespalib/data/slime/slime.h> @@ -232,7 +232,7 @@ public: } // Implements IOperationStorer - virtual void storeOperation(FeedOperation &op) override; + virtual void storeOperation(const FeedOperation &op, DoneCallback) override; uint32_t getHeartBeats() { return _heartBeats; @@ -781,7 +781,6 @@ MyFeedHandler::isExecutorThread() void MyFeedHandler::handleMove(MoveOperation &op, IDestructorCallback::SP moveDoneCtx) { - (void) moveDoneCtx; assert(isExecutorThread()); assert(op.getValidPrevDbdId()); _subDBs[op.getSubDbId()]->prepareMove(op); @@ -792,7 +791,7 @@ MyFeedHandler::handleMove(MoveOperation &op, IDestructorCallback::SP moveDoneCtx assert(op.getPrevSubDbId() != 1u); assert(op.getSubDbId() < _subDBs.size()); assert(op.getPrevSubDbId() < _subDBs.size()); - storeOperation(op); + storeOperation(op, std::move(moveDoneCtx)); _subDBs[op.getSubDbId()]->handleMove(op); _subDBs[op.getPrevSubDbId()]->handleMove(op); } @@ -803,7 +802,7 @@ MyFeedHandler::performPruneRemovedDocuments(PruneRemovedDocumentsOperation &op) { assert(isExecutorThread()); if (op.getLidsToRemove()->getNumLids() != 0u) { - storeOperation(op); + storeOperation(op, std::make_shared<search::IgnoreCallback>()); // magic number. _subDBs[1u]->handlePruneRemovedDocuments(op); } @@ -826,9 +825,9 @@ MyFeedHandler::setSubDBs(const std::vector<MyDocumentSubDB *> &subDBs) void -MyFeedHandler::storeOperation(FeedOperation &op) +MyFeedHandler::storeOperation(const FeedOperation &op, DoneCallback) { - op.setSerialNum(incSerialNum()); + const_cast<FeedOperation &>(op).setSerialNum(incSerialNum()); } @@ -1011,22 +1010,16 @@ MaintenanceControllerFixture::performForwardMaintenanceConfig() void -MaintenanceControllerFixture::insertDocs(const test::UserDocuments &docs, - MyDocumentSubDB &subDb) +MaintenanceControllerFixture::insertDocs(const test::UserDocuments &docs, MyDocumentSubDB &subDb) { - for (test::UserDocuments::Iterator itr = docs.begin(); - itr != docs.end(); - ++itr) { + for (auto itr = docs.begin(); itr != docs.end(); ++itr) { const test::BucketDocuments &bucketDocs = itr->second; for (size_t i = 0; i < bucketDocs.getDocs().size(); ++i) { const test::Document &testDoc = bucketDocs.getDocs()[i]; - PutOperation op(testDoc.getBucket(), - testDoc.getTimestamp(), - testDoc.getDoc()); - op.setDbDocumentId(DbDocumentId(subDb.getSubDBId(), - testDoc.getLid())); - _fh.storeOperation(op); + PutOperation op(testDoc.getBucket(), testDoc.getTimestamp(), testDoc.getDoc()); + op.setDbDocumentId(DbDocumentId(subDb.getSubDBId(), testDoc.getLid())); + _fh.storeOperation(op, std::make_shared<search::IgnoreCallback>()); subDb.handlePut(op); } } @@ -1038,18 +1031,13 @@ MaintenanceControllerFixture::removeDocs(const test::UserDocuments &docs, Timestamp timestamp) { - for (test::UserDocuments::Iterator itr = docs.begin(); - itr != docs.end(); - ++itr) { + for (auto itr = docs.begin(); itr != docs.end(); ++itr) { const test::BucketDocuments &bucketDocs = itr->second; for (size_t i = 0; i < bucketDocs.getDocs().size(); ++i) { const test::Document &testDoc = bucketDocs.getDocs()[i]; - RemoveOperation op(testDoc.getBucket(), - timestamp, - testDoc.getDoc()->getId()); - op.setDbDocumentId(DbDocumentId(_removed.getSubDBId(), - testDoc.getLid())); - _fh.storeOperation(op); + RemoveOperation op(testDoc.getBucket(), timestamp, testDoc.getDoc()->getId()); + op.setDbDocumentId(DbDocumentId(_removed.getSubDBId(), testDoc.getLid())); + _fh.storeOperation(op, std::make_shared<search::IgnoreCallback>()); _removed.handleRemove(op); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index 4198803d1fe..5babacfc4b6 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -307,7 +307,7 @@ DocumentDB::enterReprocessState() if (!runner.empty()) { runner.run(); NoopOperation op; - _feedHandler.storeOperation(op); + _feedHandler.storeOperationSync(op); sync(op.getSerialNum()); _subDBs.pruneRemovedFields(op.getSerialNum()); } @@ -397,15 +397,14 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum _config_store->saveConfig(*configSnapshot, serialNum); // save entry in transaction log NewConfigOperation op(serialNum, *_config_store); - _feedHandler.storeOperation(op); + _feedHandler.storeOperationSync(op); sync(op.getSerialNum()); } bool hasVisibilityDelayChanged = false; { bool elidedConfigSave = equalReplayConfig && tlsReplayDone; // Flush changes to attributes and memory index, cf. visibilityDelay - _feedView.get()->forceCommit(elidedConfigSave ? serialNum : - serialNum - 1); + _feedView.get()->forceCommit(elidedConfigSave ? serialNum : serialNum - 1); _writeService.sync(); fastos::TimeStamp visibilityDelay = configSnapshot->getMaintenanceConfigSP()->getVisibilityDelay(); hasVisibilityDelayChanged = (visibilityDelay != _visibility.getVisibilityDelay()); @@ -585,7 +584,7 @@ DocumentDB::saveInitialConfig(const DocumentDBConfig &configSnapshot) // pruned at once anyway. // save noop entry in transaction log NoopOperation op; - _feedHandler.storeOperation(op); + _feedHandler.storeOperationSync(op); sync(op.getSerialNum()); // Wipe everything in transaction log before initial config. try { @@ -609,7 +608,7 @@ DocumentDB::resumeSaveConfig() SerialNum confSerial = _feedHandler.incSerialNum(); // resume operation, i.e. save config entry in transaction log NewConfigOperation op(confSerial, *_config_store); - _feedHandler.storeOperation(op); + _feedHandler.storeOperationSync(op); sync(op.getSerialNum()); } @@ -776,7 +775,7 @@ DocumentDB::enterRedoReprocessState() runner.run(); _subDBs.onReprocessDone(_feedHandler.getSerialNum()); NoopOperation op; - _feedHandler.storeOperation(op); + _feedHandler.storeOperationSync(op); sync(op.getSerialNum()); _subDBs.pruneRemovedFields(op.getSerialNum()); } diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index b01ba43cb49..bc1715e3c04 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -12,7 +12,7 @@ #include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h> #include <vespa/searchcore/proton/persistenceengine/transport_latch.h> #include <vespa/searchcorespi/index/ithreadingservice.h> -#include <vespa/searchlib/common/idestructorcallback.h> +#include <vespa/searchlib/common/gatecallback.h> #include <vespa/vespalib/util/exceptions.h> #include <unistd.h> @@ -34,6 +34,8 @@ using vespalib::makeLambdaTask; using vespalib::make_string; using vespalib::MonitorGuard; using vespalib::LockGuard; +using std::make_unique; +using std::make_shared; namespace proton { @@ -46,8 +48,8 @@ ignoreOperation(const DocumentOperation &op) { } // namespace -void FeedHandler::TlsMgrWriter::storeOperation(const FeedOperation &op) { - TlcProxy(_tls_mgr.getDomainName(), *_tlsDirectWriter).storeOperation(op); +void FeedHandler::TlsMgrWriter::storeOperation(const FeedOperation &op, DoneCallback onDone) { + TlcProxy(_tls_mgr.getDomainName(), *_tlsDirectWriter).storeOperation(op, std::move(onDone)); } bool FeedHandler::TlsMgrWriter::erase(SerialNum oldest_to_keep) { return _tls_mgr.getSession()->erase(oldest_to_keep); @@ -72,7 +74,6 @@ FeedHandler::TlsMgrWriter::sync(SerialNum syncTo) LOG(spam, "Tls sync incomplete, reached %" PRIu64 ", retrying", syncedTo); } throw IllegalStateException(make_string("Failed to sync TLS to token %" PRIu64 ".", syncTo)); - return 0; } void @@ -90,13 +91,13 @@ void FeedHandler::performPut(FeedToken token, PutOperation &op) { LOG(debug, "performPut(): ignoreOperation: docId(%s), timestamp(%" PRIu64 "), prevTimestamp(%" PRIu64 ")", op.getDocument()->getId().toString().c_str(), (uint64_t)op.getTimestamp(), (uint64_t)op.getPrevTimestamp()); if (token) { - token->setResult(std::make_unique<Result>(), false); + token->setResult(make_unique<Result>(), false); } return; } - storeOperation(op); + storeOperation(op, token); if (token) { - token->setResult(std::make_unique<Result>(), false); + token->setResult(make_unique<Result>(), false); } _activeFeedView->handlePut(std::move(token), op); } @@ -112,7 +113,7 @@ FeedHandler::performUpdate(FeedToken token, UpdateOperation &op) createNonExistingDocument(std::move(token), op); } else { if (token) { - token->setResult(std::make_unique<UpdateResult>(Timestamp(0)), false); + token->setResult(make_unique<UpdateResult>(Timestamp(0)), false); } } } @@ -121,9 +122,9 @@ FeedHandler::performUpdate(FeedToken token, UpdateOperation &op) void FeedHandler::performInternalUpdate(FeedToken token, UpdateOperation &op) { - storeOperation(op); + storeOperation(op, token); if (token) { - token->setResult(ResultUP(new UpdateResult(op.getPrevTimestamp())), true); + token->setResult(make_unique<UpdateResult>(op.getPrevTimestamp()), true); } _activeFeedView->handleUpdate(std::move(token), op); } @@ -132,14 +133,14 @@ FeedHandler::performInternalUpdate(FeedToken token, UpdateOperation &op) void FeedHandler::createNonExistingDocument(FeedToken token, const UpdateOperation &op) { - Document::SP doc(new Document(op.getUpdate()->getType(), op.getUpdate()->getId())); + auto doc = make_shared<Document>(op.getUpdate()->getType(), op.getUpdate()->getId()); doc->setRepo(*_activeFeedView->getDocumentTypeRepo()); op.getUpdate()->applyTo(*doc); PutOperation putOp(op.getBucketId(), op.getTimestamp(), doc); _activeFeedView->preparePut(putOp); - storeOperation(putOp); + storeOperation(putOp, token); if (token) { - token->setResult(ResultUP(new UpdateResult(putOp.getTimestamp())), true); + token->setResult(make_unique<UpdateResult>(putOp.getTimestamp()), true); } TransportLatch latch(1); _activeFeedView->handlePut(feedtoken::make(latch), putOp); @@ -153,29 +154,29 @@ void FeedHandler::performRemove(FeedToken token, RemoveOperation &op) { LOG(debug, "performRemove(): ignoreOperation: docId(%s), timestamp(%" PRIu64 "), prevTimestamp(%" PRIu64 ")", op.getDocumentId().toString().c_str(), (uint64_t)op.getTimestamp(), (uint64_t)op.getPrevTimestamp()); if (token) { - token->setResult(ResultUP(new RemoveResult(false)), false); + token->setResult(make_unique<RemoveResult>(false), false); } return; } if (op.getPrevDbDocumentId().valid()) { assert(op.getValidNewOrPrevDbdId()); assert(op.notMovingLidInSameSubDb()); - storeOperation(op); + storeOperation(op, token); if (token) { bool documentWasFound = !op.getPrevMarkedAsRemoved(); - token->setResult(ResultUP(new RemoveResult(documentWasFound)), documentWasFound); + token->setResult(make_unique<RemoveResult>(documentWasFound), documentWasFound); } _activeFeedView->handleRemove(std::move(token), op); } else if (op.hasDocType()) { assert(op.getDocType() == _docTypeName.getName()); - storeOperation(op); + storeOperation(op, token); if (token) { - token->setResult(ResultUP(new RemoveResult(false)), false); + token->setResult(make_unique<RemoveResult>(false), false); } _activeFeedView->handleRemove(std::move(token), op); } else { if (token) { - token->setResult(ResultUP(new RemoveResult(false)), false); + token->setResult(make_unique<RemoveResult>(false), false); } } } @@ -186,20 +187,16 @@ FeedHandler::performGarbageCollect(FeedToken token) (void) token; } - void FeedHandler::performCreateBucket(FeedToken token, CreateBucketOperation &op) { - (void) token; - storeOperation(op); + storeOperation(op, token); _bucketDBHandler->handleCreateBucket(op.getBucketId()); } - void FeedHandler::performDeleteBucket(FeedToken token, DeleteBucketOperation &op) { - (void) token; _activeFeedView->prepareDeleteBucket(op); - storeOperation(op); + storeOperation(op, token); // Delete documents in bucket _activeFeedView->handleDeleteBucket(op); // Delete bucket itself, should no longer have documents. @@ -207,21 +204,16 @@ void FeedHandler::performDeleteBucket(FeedToken token, DeleteBucketOperation &op } - void FeedHandler::performSplit(FeedToken token, SplitBucketOperation &op) { - (void) token; - storeOperation(op); + storeOperation(op, token); _bucketDBHandler->handleSplit(op.getSerialNum(), op.getSource(), op.getTarget1(), op.getTarget2()); } - void FeedHandler::performJoin(FeedToken token, JoinBucketsOperation &op) { - (void) token; - storeOperation(op); + storeOperation(op, token); _bucketDBHandler->handleJoin(op.getSerialNum(), op.getSource1(), op.getSource2(), op.getTarget()); } - void FeedHandler::performSync() { @@ -342,7 +334,7 @@ FeedHandler::FeedHandler(IThreadingService &writeService, _prunedSerialNum(0), _delayedPrune(false), _feedLock(), - _feedState(std::make_shared<InitState>(getDocTypeName())), + _feedState(make_shared<InitState>(getDocTypeName())), _activeFeedView(nullptr), _bucketDBHandler(nullptr), _syncLock(), @@ -381,7 +373,7 @@ FeedHandler::replayTransactionLog(SerialNum flushedIndexMgrSerial, SerialNum flu (void) newestFlushedSerial; assert(_activeFeedView); assert(_bucketDBHandler); - FeedState::SP state = std::make_shared<ReplayTransactionLogState> + FeedState::SP state = make_shared<ReplayTransactionLogState> (getDocTypeName(), _activeFeedView, *_bucketDBHandler, _replayConfig, config_store); changeFeedState(state); // Resurrected attribute vector might cause oldestFlushedSerial to @@ -404,7 +396,7 @@ FeedHandler::flushDone(SerialNum flushedSerial) } void FeedHandler::changeToNormalFeedState() { - changeFeedState(FeedState::SP(new NormalState(*this))); + changeFeedState(make_shared<NormalState>(*this)); } bool @@ -412,18 +404,28 @@ FeedHandler::isDoingReplay() const { return _tlsMgr.isDoingReplay(); } -bool FeedHandler::getTransactionLogReplayDone() const { +bool +FeedHandler::getTransactionLogReplayDone() const { return _tlsMgr.getReplayDone(); } -void FeedHandler::storeOperation(FeedOperation &op) { +void +FeedHandler::storeOperation(const FeedOperation &op, TlsWriter::DoneCallback onDone) { if (!op.getSerialNum()) { - op.setSerialNum(incSerialNum()); + const_cast<FeedOperation &>(op).setSerialNum(incSerialNum()); } - _tlsWriter.storeOperation(op); + _tlsWriter.storeOperation(op, std::move(onDone)); } -void FeedHandler::tlsPrune(SerialNum oldest_to_keep) { +void +FeedHandler::storeOperationSync(const FeedOperation &op) { + vespalib::Gate gate; + storeOperation(op, make_shared<search::GateCallback>(gate)); + gate.await(); +} + +void +FeedHandler::tlsPrune(SerialNum oldest_to_keep) { if (!_tlsWriter.erase(oldest_to_keep)) { throw IllegalStateException(make_string("Failed to prune TLS to token %" PRIu64 ".", oldest_to_keep)); } @@ -445,7 +447,7 @@ void feedOperationRejected(FeedToken & token, const vespalib::string &opType, co if (token) { auto message = make_string("%s operation rejected for document '%s' of type '%s': '%s'", opType.c_str(), docId.c_str(), docTypeName.toString().c_str(), rejectMessage.c_str()); - token->setResult(ResultUP(new ResultType(Result::RESOURCE_EXHAUSTED, message)), false); + token->setResult(make_unique<ResultType>(Result::RESOURCE_EXHAUSTED, message), false); token->fail(); } } @@ -533,7 +535,7 @@ FeedHandler::handleMove(MoveOperation &op, std::shared_ptr<search::IDestructorCa assert(op.getValidDbdId()); assert(op.getValidPrevDbdId()); assert(op.getSubDbId() != op.getPrevSubDbId()); - storeOperation(op); + storeOperation(op, moveDoneCtx); _activeFeedView->handleMove(op, std::move(moveDoneCtx)); } @@ -558,7 +560,7 @@ FeedHandler::receive(const Packet &packet) // (by fnet thread). Called via DocumentDB::recoverPacket() when // recovering from another node. FeedState::SP state = getFeedState(); - PacketWrapper::SP wrap(new PacketWrapper(packet, _tlsReplayProgress.get())); + auto wrap = make_shared<PacketWrapper>(packet, _tlsReplayProgress.get()); state->receive(wrap, _writeService.master()); wrap->gate.await(); return wrap->result; @@ -577,7 +579,7 @@ performPruneRemovedDocuments(PruneRemovedDocumentsOperation &pruneOp) { const LidVectorContext::SP lids_to_remove = pruneOp.getLidsToRemove(); if (lids_to_remove && lids_to_remove->getNumLids() != 0) { - storeOperation(pruneOp); + storeOperationSync(pruneOp); _activeFeedView->handlePruneRemovedDocuments(pruneOp); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index d717346883a..8c28fcdc1ea 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -65,7 +65,7 @@ private: _tls_mgr(tls_mgr), _tlsDirectWriter(tlsDirectWriter) { } - void storeOperation(const FeedOperation &op) override; + void storeOperation(const FeedOperation &op, DoneCallback onDone) override; bool erase(SerialNum oldest_to_keep) override; SerialNum sync(SerialNum syncTo) override; }; @@ -234,7 +234,8 @@ public: void eof() override; void performPruneRemovedDocuments(PruneRemovedDocumentsOperation &pruneOp) override; void syncTls(SerialNum syncTo); - void storeOperation(FeedOperation &op) override; + void storeOperation(const FeedOperation &op, DoneCallback onDone) override; + void storeOperationSync(const FeedOperation & op); void considerDelayedPrune(); }; diff --git a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h index 4e5958cd9e2..760250844a8 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h @@ -2,6 +2,8 @@ #pragma once +#include <vespa/searchlib/transactionlog/common.h> + namespace proton { class FeedOperation; @@ -11,12 +13,13 @@ class FeedOperation; */ struct IOperationStorer { - virtual ~IOperationStorer() {} + using DoneCallback = search::transactionlog::Writer::DoneCallback; + virtual ~IOperationStorer() = default; /** * Assign serial number to (if not set) and store the given operation. */ - virtual void storeOperation(FeedOperation &op) = 0; + virtual void storeOperation(const FeedOperation &op, DoneCallback onDone) = 0; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp index d6c1a032cea..2ae8d826ebc 100644 --- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp @@ -5,7 +5,8 @@ #include "imaintenancejobrunner.h" #include "lid_space_compaction_job.h" #include <vespa/searchcore/proton/common/eventlogger.h> -#include <vespa/searchlib/common/idestructorcallback.h> +#include <vespa/searchlib/common/gatecallback.h> +#include <vespa/vespalib/util/sync.h> #include <cassert> #include <vespa/log/log.h> @@ -55,8 +56,9 @@ LidSpaceCompactionJob::scanDocuments(const LidUsageStats &stats) return true; } else { MoveOperation::UP op = _handler.createMoveOperation(document, stats.getLowestFreeLid()); - _opStorer.storeOperation(*op); - _handler.handleMove(*op, _moveOpsLimiter->beginOperation()); + search::IDestructorCallback::SP context = _moveOpsLimiter->beginOperation(); + _opStorer.storeOperation(*op, context); + _handler.handleMove(*op, std::move(context)); if (isBlocked(BlockedReason::OUTSTANDING_OPS)) { return true; } @@ -79,7 +81,9 @@ LidSpaceCompactionJob::compactLidSpace(const LidUsageStats &stats) { uint32_t wantedLidLimit = stats.getHighestUsedLid() + 1; CompactLidSpaceOperation op(_handler.getSubDbId(), wantedLidLimit); - _opStorer.storeOperation(op); + vespalib::Gate gate; + _opStorer.storeOperation(op, std::make_shared<search::GateCallback>(gate)); + gate.await(); _handler.handleCompactLidSpace(op); EventLogger::lidSpaceCompactionComplete(_handler.getName(), wantedLidLimit); _shouldCompactLidSpace = false; diff --git a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp index 215650b6664..bfc59dee35e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp @@ -11,24 +11,24 @@ using search::transactionlog::Packet; namespace proton { -void TlcProxy::commit(search::SerialNum serialNum, search::transactionlog::Type type, const vespalib::nbostream &buf) +void TlcProxy::commit(search::SerialNum serialNum, search::transactionlog::Type type, + const vespalib::nbostream &buf, DoneCallback onDone) { Packet::Entry entry(serialNum, type, vespalib::ConstBufferRef(buf.c_str(), buf.size())); Packet packet; packet.add(entry); packet.close(); - _tlsDirectWriter.commit(_domain, packet); - + _tlsDirectWriter.commit(_domain, packet, std::move(onDone)); } void -TlcProxy::storeOperation(const FeedOperation &op) +TlcProxy::storeOperation(const FeedOperation &op, DoneCallback onDone) { nbostream stream; op.serialize(stream); LOG(debug, "storeOperation(): serialNum(%" PRIu64 "), type(%u), size(%zu)", op.getSerialNum(), (uint32_t)op.getType(), stream.size()); - commit(op.getSerialNum(), (uint32_t)op.getType(), stream); + commit(op.getSerialNum(), (uint32_t)op.getType(), stream, std::move(onDone)); } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h index 8e4feb2f354..2dc6501731e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h +++ b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h @@ -8,18 +8,20 @@ namespace proton { class FeedOperation; class TlcProxy { - vespalib::string _domain; - search::transactionlog::Writer & _tlsDirectWriter; + using DoneCallback = search::transactionlog::Writer::DoneCallback; + using Writer = search::transactionlog::Writer; + vespalib::string _domain; + Writer & _tlsDirectWriter; - void commit(search::SerialNum serialNum, search::transactionlog::Type type, const vespalib::nbostream &buf); + void commit(search::SerialNum serialNum, search::transactionlog::Type type, + const vespalib::nbostream &buf, DoneCallback onDone); public: typedef std::unique_ptr<TlcProxy> UP; - TlcProxy(const vespalib::string & domain, search::transactionlog::Writer & writer) + TlcProxy(const vespalib::string & domain, Writer & writer) : _domain(domain), _tlsDirectWriter(writer) {} - void storeOperation(const FeedOperation &op); + void storeOperation(const FeedOperation &op, DoneCallback onDone); }; } // namespace proton - diff --git a/searchcore/src/vespa/searchcore/proton/server/tlswriter.h b/searchcore/src/vespa/searchcore/proton/server/tlswriter.h index 0956c0ae011..5d51580c0ad 100644 --- a/searchcore/src/vespa/searchcore/proton/server/tlswriter.h +++ b/searchcore/src/vespa/searchcore/proton/server/tlswriter.h @@ -2,19 +2,17 @@ #pragma once +#include "i_operation_storer.h" #include <vespa/searchlib/common/serialnum.h> namespace proton { -class FeedOperation; - /** * Interface for writing to the TransactionLogServer. */ -struct TlsWriter { - virtual ~TlsWriter() {} +struct TlsWriter : public IOperationStorer { + virtual ~TlsWriter() = default; - virtual void storeOperation(const FeedOperation &op) = 0; virtual bool erase(search::SerialNum oldest_to_keep) = 0; virtual search::SerialNum sync(search::SerialNum syncTo) = 0; }; diff --git a/searchlib/src/vespa/searchlib/common/CMakeLists.txt b/searchlib/src/vespa/searchlib/common/CMakeLists.txt index b1f71303449..f9db738528c 100644 --- a/searchlib/src/vespa/searchlib/common/CMakeLists.txt +++ b/searchlib/src/vespa/searchlib/common/CMakeLists.txt @@ -13,6 +13,7 @@ vespa_add_library(searchlib_common OBJECT featureset.cpp fileheadercontext.cpp foregroundtaskexecutor.cpp + gatecallback.cpp growablebitvector.cpp indexmetainfo.cpp location.cpp diff --git a/searchlib/src/vespa/searchlib/common/gatecallback.cpp b/searchlib/src/vespa/searchlib/common/gatecallback.cpp new file mode 100644 index 00000000000..a853909be71 --- /dev/null +++ b/searchlib/src/vespa/searchlib/common/gatecallback.cpp @@ -0,0 +1,12 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "gatecallback.h" +#include <vespa/vespalib/util/sync.h> + +namespace search { + +GateCallback::~GateCallback() { + _gate.countDown(); +} + +} diff --git a/searchlib/src/vespa/searchlib/common/gatecallback.h b/searchlib/src/vespa/searchlib/common/gatecallback.h new file mode 100644 index 00000000000..1e85d796089 --- /dev/null +++ b/searchlib/src/vespa/searchlib/common/gatecallback.h @@ -0,0 +1,24 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "idestructorcallback.h" + +namespace vespalib { class Gate; } + +namespace search { + +class GateCallback : public IDestructorCallback { +public: + GateCallback(vespalib::Gate & gate) : _gate(gate) {} + ~GateCallback() override; +private: + vespalib::Gate & _gate; +}; + +class IgnoreCallback : public IDestructorCallback { +public: + IgnoreCallback() { } + ~IgnoreCallback() override = default; +}; + +} // namespace search diff --git a/searchlib/src/vespa/searchlib/common/idestructorcallback.h b/searchlib/src/vespa/searchlib/common/idestructorcallback.h index 4c42f68f0e4..77adba7a4cc 100644 --- a/searchlib/src/vespa/searchlib/common/idestructorcallback.h +++ b/searchlib/src/vespa/searchlib/common/idestructorcallback.h @@ -3,8 +3,7 @@ #include <memory> -namespace search -{ +namespace search { /** * Interface for class that performs a callback when instance is @@ -17,7 +16,7 @@ class IDestructorCallback { public: using SP = std::shared_ptr<IDestructorCallback>; - virtual ~IDestructorCallback() { } + virtual ~IDestructorCallback() = default; }; } // namespace search diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.h b/searchlib/src/vespa/searchlib/transactionlog/common.h index 65ef8f363c0..db8b9727daa 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/common.h +++ b/searchlib/src/vespa/searchlib/transactionlog/common.h @@ -2,6 +2,7 @@ #pragma once #include <vespa/searchlib/common/serialnum.h> +#include <vespa/searchlib/common/idestructorcallback.h> #include <vespa/vespalib/objects/nbostream.h> #include <vespa/vespalib/util/buffer.h> @@ -90,8 +91,9 @@ int makeDirectory(const char * dir); class Writer { public: + using DoneCallback = std::shared_ptr<IDestructorCallback>; virtual ~Writer() { } - virtual void commit(const vespalib::string & domainName, const Packet & packet) = 0; + virtual void commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) = 0; }; } diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp index ca17457bdb9..e793aafd38f 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp @@ -453,8 +453,9 @@ void TransLogServer::domainStatus(FRT_RPCRequest *req) } } -void TransLogServer::commit(const vespalib::string & domainName, const Packet & packet) +void TransLogServer::commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) { + (void) done; Domain::SP domain(findDomain(domainName)); if (domain) { domain->commit(packet); diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h index 92832786059..c12e37dd1c8 100644 --- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h +++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h @@ -31,7 +31,7 @@ public: virtual ~TransLogServer(); DomainStats getDomainStats() const; - void commit(const vespalib::string & domainName, const Packet & packet) override; + void commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) override; class Session diff --git a/storageapi/src/vespa/storageapi/messageapi/returncode.cpp b/storageapi/src/vespa/storageapi/messageapi/returncode.cpp index 9825aea47bc..79a768541b9 100644 --- a/storageapi/src/vespa/storageapi/messageapi/returncode.cpp +++ b/storageapi/src/vespa/storageapi/messageapi/returncode.cpp @@ -93,7 +93,6 @@ ReturnCode::isNodeDownOrNetwork() const case mbus::ErrorCode::UNKNOWN_SESSION: case mbus::ErrorCode::HANDSHAKE_FAILED: case mbus::ErrorCode::NO_SERVICES_FOR_ROUTE: - case mbus::ErrorCode::SERVICE_OOS: case mbus::ErrorCode::NETWORK_ERROR: case mbus::ErrorCode::UNKNOWN_PROTOCOL: case Protocol::ERROR_NODE_NOT_READY: diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java index 7cea767f9ba..0a9fe72552c 100644 --- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java +++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java @@ -131,7 +131,6 @@ public class OperationProcessor { exceptionMessage.contains("SEND_QUEUE_CLOSED") || exceptionMessage.contains("ILLEGAL_ROUTE") || exceptionMessage.contains("NO_SERVICES_FOR_ROUTE") || - exceptionMessage.contains("SERVICE_OOS") || exceptionMessage.contains("NETWORK_ERROR") || exceptionMessage.contains("SEQUENCE_ERROR") || exceptionMessage.contains("NETWORK_SHUTDOWN") || diff --git a/vespalib/src/vespa/vespalib/util/sync.h b/vespalib/src/vespa/vespalib/util/sync.h index f961c280174..86e0a227c72 100644 --- a/vespalib/src/vespa/vespalib/util/sync.h +++ b/vespalib/src/vespa/vespalib/util/sync.h @@ -710,7 +710,7 @@ public: /** * Empty. Needs to be virtual to reduce compiler warnings. **/ - virtual ~CountDownLatch() {} + virtual ~CountDownLatch() = default; }; |