summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortoby <smorgrav@yahoo-inc.com>2017-10-18 14:28:19 +0200
committertoby <smorgrav@yahoo-inc.com>2017-10-18 14:28:19 +0200
commit79d1aa480db70a00ef74c19f582db2a4d220a96a (patch)
tree75b6e4c15048cd6282bc862d6f2ab7ed247f2f97
parent8f417af81fe1d5c011636eb9eeedf5a3c68775cf (diff)
parent9de8d3d524f8856f9bed0efc6294ace9dd3e6c08 (diff)
Merge branch 'master' into smorgrav/cost_npe
-rw-r--r--config-application-package/src/main/java/com/yahoo/config/model/application/provider/Bundle.java2
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/application/ZKTenantApplications.java9
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/zookeeper/ZKApplicationPackage.java41
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/zookeeper/ZKApplicationPackageTest.java2
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/TenantController.java6
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/athenz/impl/ZmsClientImpl.java10
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ClusterInfoMaintainer.java14
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java23
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/Authorizer.java8
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/DeployAuthorizer.java19
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java2
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiTest.java2
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java2
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java20
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SearchColumnPolicy.java183
-rwxr-xr-xdocumentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SearchRowPolicy.java85
-rwxr-xr-xdocumentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/PolicyTestCase.java195
-rw-r--r--documentapi/src/tests/messagebus/messagebus_test.cpp8
-rw-r--r--documentapi/src/tests/policies/policies_test.cpp367
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp5
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt2
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.cpp137
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.h52
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.cpp63
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.h30
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp14
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h8
-rw-r--r--jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java2
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/ErrorCode.java4
-rw-r--r--messagebus/src/vespa/messagebus/errorcode.cpp1
-rw-r--r--messagebus/src/vespa/messagebus/errorcode.h3
-rw-r--r--messagebus_test/src/tests/errorcodes/DumpCodes.java1
-rw-r--r--messagebus_test/src/tests/errorcodes/dumpcodes.cpp1
-rw-r--r--messagebus_test/src/tests/errorcodes/ref-dump.txt1
-rw-r--r--searchcore/src/tests/proton/docsummary/docsummary.cpp5
-rw-r--r--searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp2
-rw-r--r--searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp2
-rw-r--r--searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp42
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.cpp13
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp90
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h7
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp12
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/tlcproxy.h14
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/tlswriter.h8
-rw-r--r--searchlib/src/vespa/searchlib/common/CMakeLists.txt1
-rw-r--r--searchlib/src/vespa/searchlib/common/gatecallback.cpp12
-rw-r--r--searchlib/src/vespa/searchlib/common/gatecallback.h24
-rw-r--r--searchlib/src/vespa/searchlib/common/idestructorcallback.h5
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/common.h4
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp3
-rw-r--r--searchlib/src/vespa/searchlib/transactionlog/translogserver.h2
-rw-r--r--storageapi/src/vespa/storageapi/messageapi/returncode.cpp1
-rw-r--r--vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java1
-rw-r--r--vespalib/src/vespa/vespalib/util/sync.h2
56 files changed, 278 insertions, 1309 deletions
diff --git a/config-application-package/src/main/java/com/yahoo/config/model/application/provider/Bundle.java b/config-application-package/src/main/java/com/yahoo/config/model/application/provider/Bundle.java
index 213024b95d0..3eb1c89db3e 100644
--- a/config-application-package/src/main/java/com/yahoo/config/model/application/provider/Bundle.java
+++ b/config-application-package/src/main/java/com/yahoo/config/model/application/provider/Bundle.java
@@ -132,7 +132,7 @@ public class Bundle {
defName = nameAndNamespace.first;
defNamespace = getNamespace();
if (defNamespace.isEmpty())
- throw new IllegalArgumentException("Config definition '" + defName + "' is missing a namespace");
+ throw new IllegalArgumentException("Config definition '" + defName + "' is missing a package (or namespace)");
contents = getContents();
}
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ZKTenantApplications.java b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ZKTenantApplications.java
index 9e3df5af715..574bac792dd 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/application/ZKTenantApplications.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/application/ZKTenantApplications.java
@@ -164,10 +164,13 @@ public class ZKTenantApplications implements TenantApplications, PathChildrenCac
case CHILD_REMOVED:
applicationRemoved(ApplicationId.fromSerializedForm(Path.fromString(event.getData().getPath()).getName()));
break;
+ case CHILD_UPDATED:
+ // do nothing, application just got redeployed
+ break;
default:
// We don't know if applications have been added or removed so possibly need to remove some of them
// (new applications are not added here)
- removeApplications();
+ removeApplications(event.getType());
break;
}
}
@@ -181,9 +184,9 @@ public class ZKTenantApplications implements TenantApplications, PathChildrenCac
log.log(LogLevel.DEBUG, Tenants.logPre(applicationId) + "Application added: " + applicationId);
}
- private void removeApplications() {
+ private void removeApplications(PathChildrenCacheEvent.Type eventType) {
ImmutableSet<ApplicationId> allApplications = ImmutableSet.copyOf(listApplications());
- log.log(Level.INFO, "We probably lost events, need to check if applications have been removed, " +
+ log.log(Level.INFO, "Got " + eventType + " event, need to check if applications have been removed, " +
" found these active applications: " + allApplications);
reloadHandler.removeApplicationsExcept(allApplications);
}
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/zookeeper/ZKApplicationPackage.java b/configserver/src/main/java/com/yahoo/vespa/config/server/zookeeper/ZKApplicationPackage.java
index 05c301ddba8..e4933dc84c4 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/zookeeper/ZKApplicationPackage.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/zookeeper/ZKApplicationPackage.java
@@ -2,8 +2,6 @@
package com.yahoo.vespa.config.server.zookeeper;
import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableSet;
-import com.yahoo.component.Version;
import com.yahoo.config.application.api.ApplicationMetaData;
import com.yahoo.config.application.api.ComponentInfo;
import com.yahoo.config.application.api.FileRegistry;
@@ -12,7 +10,6 @@ import com.yahoo.config.codegen.DefParser;
import com.yahoo.config.application.api.ApplicationFile;
import com.yahoo.config.application.api.ApplicationPackage;
import com.yahoo.config.model.application.provider.*;
-import com.yahoo.config.provision.HostSpec;
import com.yahoo.config.provision.NodeFlavors;
import com.yahoo.config.provision.AllocatedHosts;
import com.yahoo.io.IOUtils;
@@ -61,37 +58,7 @@ public class ZKApplicationPackage implements ApplicationPackage {
private Optional<AllocatedHosts> importAllocatedHosts(String allocatedHostsPath, Optional<NodeFlavors> nodeFlavors) {
if ( ! liveApp.exists(allocatedHostsPath)) return Optional.empty();
- Optional<AllocatedHosts> allocatedHosts = readAllocatedHosts(allocatedHostsPath, nodeFlavors);
- if ( ! allocatedHosts.isPresent()) { // Read from legacy location. TODO: Remove when 6.143 is in production everywhere
- List<String> allocatedHostsByVersionNodes = liveApp.getChildren(allocatedHostsPath);
- allocatedHosts = merge(readAllocatedHostsByVersion(allocatedHostsByVersionNodes, nodeFlavors));
- }
- return allocatedHosts;
- }
-
- private Map<Version, AllocatedHosts> readAllocatedHostsByVersion(List<String> allocatedHostsByVersionNodes,
- Optional<NodeFlavors> nodeFlavors) {
- Map<Version, AllocatedHosts> allocatedHostsByVersion = new HashMap<>();
- allocatedHostsByVersionNodes.stream()
- .forEach(versionStr -> {
- Version version = Version.fromString(versionStr);
- Optional<AllocatedHosts> allocatedHosts = readAllocatedHosts(Joiner.on("/").join(allocatedHostsNode, versionStr),
- nodeFlavors);
- allocatedHosts.ifPresent(info -> allocatedHostsByVersion.put(version, info));
- });
- return allocatedHostsByVersion;
- }
-
- private Optional<AllocatedHosts> merge(Map<Version, AllocatedHosts> allocatedHostsByVersion) {
- // Merge the allocated hosts in any order. This is wrong but preserves current behavior (modulo order differences)
- if (allocatedHostsByVersion.isEmpty()) return Optional.empty();
-
- Map<String, HostSpec> merged = new HashMap<>();
- for (Map.Entry<Version, AllocatedHosts> entry : allocatedHostsByVersion.entrySet()) {
- for (HostSpec host : entry.getValue().getHosts())
- merged.put(host.hostname(), host);
- }
- return Optional.of(AllocatedHosts.withHosts(ImmutableSet.copyOf(merged.values())));
+ return Optional.of(readAllocatedHosts(allocatedHostsPath, nodeFlavors));
}
/**
@@ -99,11 +66,9 @@ public class ZKApplicationPackage implements ApplicationPackage {
*
* @return the allocated hosts at this node or empty if there is no data at this path
*/
- private Optional<AllocatedHosts> readAllocatedHosts(String allocatedHostsPath, Optional<NodeFlavors> nodeFlavors) {
+ private AllocatedHosts readAllocatedHosts(String allocatedHostsPath, Optional<NodeFlavors> nodeFlavors) {
try {
- byte[] data = liveApp.getBytes(allocatedHostsPath);
- if (data.length == 0) return Optional.empty(); // TODO: Remove this line (and make return non-optional) when 6.143 is in production everywhere
- return Optional.of(AllocatedHosts.fromJson(data, nodeFlavors));
+ return AllocatedHosts.fromJson(liveApp.getBytes(allocatedHostsPath), nodeFlavors);
} catch (Exception e) {
throw new RuntimeException("Unable to read allocated hosts", e);
}
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/zookeeper/ZKApplicationPackageTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/zookeeper/ZKApplicationPackageTest.java
index adf26dbfa32..07430a66c89 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/zookeeper/ZKApplicationPackageTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/zookeeper/ZKApplicationPackageTest.java
@@ -77,7 +77,7 @@ public class ZKApplicationPackageTest extends TestWithCurator {
String metaData = "{\"deploy\":{\"user\":\"foo\",\"from\":\"bar\",\"timestamp\":1},\"application\":{\"name\":\"foo\",\"checksum\":\"abc\",\"generation\":4,\"previousActiveGeneration\":3}}";
zk.putData("/0", ConfigCurator.META_ZK_PATH, metaData);
zk.putData("/0/" + ZKApplicationPackage.fileRegistryNode + "/3.0.0", "dummyfiles");
- zk.putData("/0/" + ZKApplicationPackage.allocatedHostsNode + "/3.0.0", ALLOCATED_HOSTS.toJson());
+ zk.putData("/0/" + ZKApplicationPackage.allocatedHostsNode, ALLOCATED_HOSTS.toJson());
}
private static class MockNodeFlavors extends NodeFlavors{
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/TenantController.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/TenantController.java
index 4f61c811a9b..a4bddf86cbb 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/TenantController.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/TenantController.java
@@ -140,7 +140,7 @@ public class TenantController {
if (updatedTenant.isAthensTenant() && ! token.isPresent())
throw new IllegalArgumentException("Could not update " + updatedTenant + ": No NToken provided");
- updateAthensDomain(updatedTenant, token);
+ updateAthenzDomain(updatedTenant, token);
db.updateTenant(updatedTenant);
log.info("Updated " + updatedTenant);
} catch (PersistenceException e) {
@@ -148,7 +148,7 @@ public class TenantController {
}
}
- private void updateAthensDomain(Tenant updatedTenant, Optional<NToken> token) {
+ private void updateAthenzDomain(Tenant updatedTenant, Optional<NToken> token) {
Tenant existingTenant = tenant(updatedTenant.getId()).get();
if ( ! existingTenant.isAthensTenant()) return;
@@ -192,7 +192,7 @@ public class TenantController {
}
}
- public Tenant migrateTenantToAthens(TenantId tenantId,
+ public Tenant migrateTenantToAthenz(TenantId tenantId,
AthenzDomain tenantDomain,
PropertyId propertyId,
Property property,
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/athenz/impl/ZmsClientImpl.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/athenz/impl/ZmsClientImpl.java
index cf2f7c798c6..110e06b767c 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/athenz/impl/ZmsClientImpl.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/athenz/impl/ZmsClientImpl.java
@@ -121,7 +121,7 @@ public class ZmsClientImpl implements ZmsClient {
DomainList domainList = zmsClient.getDomainList(
/*limit*/null, /*skip*/null, prefix, /*depth*/null, /*domain*/null,
/*productId*/ null, /*modifiedSince*/null);
- return toAthensDomains(domainList.getNames());
+ return toAthenzDomains(domainList.getNames());
});
}
@@ -139,7 +139,7 @@ public class ZmsClientImpl implements ZmsClient {
log("getServiceIdentity(domain=%s, service=%s)", service.getDomain().id(), service.getServiceName());
return getOrThrow(() -> {
ServiceIdentity serviceIdentity = zmsClient.getServiceIdentity(service.getDomain().id(), service.getServiceName());
- return toAthensPublicKeys(serviceIdentity.getPublicKeys());
+ return toAthenzPublicKeys(serviceIdentity.getPublicKeys());
});
}
@@ -153,11 +153,11 @@ public class ZmsClientImpl implements ZmsClient {
.collect(toList());
}
- private static List<AthenzDomain> toAthensDomains(List<String> domains) {
+ private static List<AthenzDomain> toAthenzDomains(List<String> domains) {
return domains.stream().map(AthenzDomain::new).collect(toList());
}
- private static List<AthenzPublicKey> toAthensPublicKeys(List<PublicKeyEntry> publicKeys) {
+ private static List<AthenzPublicKey> toAthenzPublicKeys(List<PublicKeyEntry> publicKeys) {
return publicKeys.stream()
.map(entry -> fromYbase64EncodedKey(entry.getKey(), entry.getId()))
.collect(toList());
@@ -192,7 +192,7 @@ public class ZmsClientImpl implements ZmsClient {
}
private static void logWarning(ZMSClientException e) {
- log.warning("Error from Athens: " + e.getMessage());
+ log.warning("Error from Athenz: " + e.getMessage());
}
private String resourceStringPrefix(AthenzDomain tenantDomain) {
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ClusterInfoMaintainer.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ClusterInfoMaintainer.java
index c807a7f0586..baf60b612b0 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ClusterInfoMaintainer.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ClusterInfoMaintainer.java
@@ -18,6 +18,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
@@ -30,6 +31,8 @@ import java.util.stream.Collectors;
*/
public class ClusterInfoMaintainer extends Maintainer {
+ private static final Logger log = Logger.getLogger(ClusterInfoMaintainer.class.getName());
+
private final Controller controller;
ClusterInfoMaintainer(Controller controller, Duration duration, JobControl jobControl) {
@@ -53,7 +56,7 @@ public class ClusterInfoMaintainer extends Maintainer {
for (String id : clusters.keySet()) {
List<NodeList.Node> clusterNodes = clusters.get(id);
- //Assume they are all equal and use first node as a representatitve for the cluster
+ // Assume they are all equal and use first node as a representative for the cluster
NodeList.Node node = clusterNodes.get(0);
// Extract flavor info
@@ -73,7 +76,7 @@ public class ClusterInfoMaintainer extends Maintainer {
// Add to map
List<String> hostnames = clusterNodes.stream().map(node1 -> node1.hostname).collect(Collectors.toList());
ClusterInfo inf = new ClusterInfo(node.flavor, node.cost, cpu, mem, disk,
- ClusterSpec.Type.from(node.membership.clusterType), hostnames);
+ ClusterSpec.Type.from(node.membership.clusterType), hostnames);
infoMap.put(new ClusterSpec.Id(id), inf);
}
@@ -82,7 +85,6 @@ public class ClusterInfoMaintainer extends Maintainer {
@Override
protected void maintain() {
-
for (Application application : controller().applications().asList()) {
try (Lock lock = controller().applications().lock(application.id())) {
for (Deployment deployment : application.deployments().values()) {
@@ -92,11 +94,13 @@ public class ClusterInfoMaintainer extends Maintainer {
Map<ClusterSpec.Id, ClusterInfo> clusterInfo = getClusterInfo(nodes, deployment.zone());
Application app = application.with(deployment.withClusterInfo(clusterInfo));
controller.applications().store(app, lock);
- } catch (IOException ioe) {
- Logger.getLogger(ClusterInfoMaintainer.class.getName()).fine(ioe.getMessage());
+ }
+ catch (IOException | IllegalArgumentException e) {
+ log.log(Level.WARNING, "Failing getting cluster info of for " + deploymentId, e);
}
}
}
}
}
+
}
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java
index 99530557981..c50f1464be7 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java
@@ -160,7 +160,7 @@ public class ApplicationApiHandler extends LoggingRequestHandler {
if (path.matches("/application/v4/user")) return authenticatedUser(request);
if (path.matches("/application/v4/tenant")) return tenants(request);
if (path.matches("/application/v4/tenant-pipeline")) return tenantPipelines();
- if (path.matches("/application/v4/athensDomain")) return athensDomains(request);
+ if (path.matches("/application/v4/athensDomain")) return athenzDomains(request);
if (path.matches("/application/v4/property")) return properties();
if (path.matches("/application/v4/cookiefreshness")) return cookieFreshness(request);
if (path.matches("/application/v4/tenant/{tenant}")) return tenant(path.get("tenant"), request);
@@ -269,12 +269,12 @@ public class ApplicationApiHandler extends LoggingRequestHandler {
return new SlimeJsonResponse(slime);
}
- private HttpResponse athensDomains(HttpRequest request) {
+ private HttpResponse athenzDomains(HttpRequest request) {
Slime slime = new Slime();
Cursor response = slime.setObject();
Cursor array = response.setArray("data");
- for (AthenzDomain athensDomain : controller.getDomainList(request.getProperty("prefix"))) {
- array.addString(athensDomain.id());
+ for (AthenzDomain athenzDomain : controller.getDomainList(request.getProperty("prefix"))) {
+ array.addString(athenzDomain.id());
}
return new SlimeJsonResponse(slime);
}
@@ -638,7 +638,7 @@ public class ApplicationApiHandler extends LoggingRequestHandler {
if (tenant.isOpsDbTenant())
throwIfNotSuperUserOrPartOfOpsDbGroup(new UserGroup(mandatory("userGroup", requestData).asString()), request);
if (tenant.isAthensTenant())
- throwIfNotAthensDomainAdmin(new AthenzDomain(mandatory("athensDomain", requestData).asString()), request);
+ throwIfNotAthenzDomainAdmin(new AthenzDomain(mandatory("athensDomain", requestData).asString()), request);
controller.tenants().addTenant(tenant, authorizer.getNToken(request));
return new SlimeJsonResponse(toSlime(tenant, request, true));
@@ -652,11 +652,11 @@ public class ApplicationApiHandler extends LoggingRequestHandler {
PropertyId propertyId = new PropertyId(mandatory("propertyId", requestData).asString());
authorizer.throwIfUnauthorized(tenantid, request);
- throwIfNotAthensDomainAdmin(tenantDomain, request);
+ throwIfNotAthenzDomainAdmin(tenantDomain, request);
NToken nToken = authorizer.getNToken(request)
.orElseThrow(() ->
new BadRequestException("The NToken for a domain admin is required to migrate tenant to Athens"));
- Tenant tenant = controller.tenants().migrateTenantToAthens(tenantid, tenantDomain, propertyId, property, nToken);
+ Tenant tenant = controller.tenants().migrateTenantToAthenz(tenantid, tenantDomain, propertyId, property, nToken);
return new SlimeJsonResponse(toSlime(tenant, request, true));
}
@@ -769,6 +769,9 @@ public class ApplicationApiHandler extends LoggingRequestHandler {
tenant,
applicationId);
} else { // In case of host-based principal
+ // TODO What about other user type principals like Bouncer?
+ log.log(LogLevel.WARNING,
+ "Using deprecated DeployAuthorizer.throwIfUnauthorizedForDeploy. Principal=" + principal);
UserId userId = new UserId(principal.getName());
deployAuthorizer.throwIfUnauthorizedForDeploy(
Environment.from(environment),
@@ -959,11 +962,11 @@ public class ApplicationApiHandler extends LoggingRequestHandler {
}
}
- private void throwIfNotAthensDomainAdmin(AthenzDomain tenantDomain, HttpRequest request) {
+ private void throwIfNotAthenzDomainAdmin(AthenzDomain tenantDomain, HttpRequest request) {
UserId userId = authorizer.getUserId(request);
- if ( ! authorizer.isAthensDomainAdmin(userId, tenantDomain)) {
+ if ( ! authorizer.isAthenzDomainAdmin(userId, tenantDomain)) {
throw new ForbiddenException(
- String.format("The user '%s' is not admin in Athens domain '%s'", userId.id(), tenantDomain.id()));
+ String.format("The user '%s' is not admin in Athenz domain '%s'", userId.id(), tenantDomain.id()));
}
}
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/Authorizer.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/Authorizer.java
index cbd39b201c1..93dc2541385 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/Authorizer.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/Authorizer.java
@@ -92,7 +92,7 @@ public class Authorizer {
}
public boolean isSuperUser(HttpRequest request) {
- // TODO Check membership of admin role in Vespa's Athens domain
+ // TODO Check membership of admin role in Vespa's Athenz domain
return isMemberOfVespaBouncerGroup(request) || isScrewdriverPrincipal(getPrincipal(request));
}
@@ -114,7 +114,7 @@ public class Authorizer {
private boolean isTenantAdmin(UserId userId, Tenant tenant) {
switch (tenant.tenantType()) {
case ATHENS:
- return isAthensTenantAdmin(userId, tenant.getAthensDomain().get());
+ return isAthenzTenantAdmin(userId, tenant.getAthensDomain().get());
case OPSDB:
return isGroupMember(userId, tenant.getUserGroup().get());
case USER:
@@ -123,12 +123,12 @@ public class Authorizer {
throw new IllegalArgumentException("Unknown tenant type: " + tenant.tenantType());
}
- private boolean isAthensTenantAdmin(UserId userId, AthenzDomain tenantDomain) {
+ private boolean isAthenzTenantAdmin(UserId userId, AthenzDomain tenantDomain) {
return athenzClientFactory.createZmsClientWithServicePrincipal()
.hasTenantAdminAccess(AthenzUtils.createPrincipal(userId), tenantDomain);
}
- public boolean isAthensDomainAdmin(UserId userId, AthenzDomain tenantDomain) {
+ public boolean isAthenzDomainAdmin(UserId userId, AthenzDomain tenantDomain) {
return athenzClientFactory.createZmsClientWithServicePrincipal()
.isDomainAdmin(AthenzUtils.createPrincipal(userId), tenantDomain);
}
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/DeployAuthorizer.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/DeployAuthorizer.java
index 209f17464a7..7cf19629774 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/DeployAuthorizer.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/DeployAuthorizer.java
@@ -43,12 +43,12 @@ public class DeployAuthorizer {
Environment environment,
Tenant tenant,
ApplicationId applicationId) {
- if (athensCredentialsRequired(environment, tenant, applicationId, principal))
- checkAthensCredentials(principal, tenant, applicationId);
+ if (athenzCredentialsRequired(environment, tenant, applicationId, principal))
+ checkAthenzCredentials(principal, tenant, applicationId);
}
// TODO: inline when deployment via ssh is removed
- private boolean athensCredentialsRequired(Environment environment, Tenant tenant, ApplicationId applicationId, Principal principal) {
+ private boolean athenzCredentialsRequired(Environment environment, Tenant tenant, ApplicationId applicationId, Principal principal) {
if (!environmentRequiresAuthorization(environment)) return false;
if (! isScrewdriverPrincipal(principal))
@@ -61,13 +61,13 @@ public class DeployAuthorizer {
// TODO: inline when deployment via ssh is removed
- private void checkAthensCredentials(Principal principal, Tenant tenant, ApplicationId applicationId) {
+ private void checkAthenzCredentials(Principal principal, Tenant tenant, ApplicationId applicationId) {
AthenzDomain domain = tenant.getAthensDomain().get();
if (! (principal instanceof AthenzPrincipal))
throw loggedForbiddenException("Principal '%s' is not authenticated.", principal.getName());
AthenzPrincipal athensPrincipal = (AthenzPrincipal)principal;
- if ( ! hasDeployAccessToAthensApplication(athensPrincipal, domain, applicationId))
+ if ( ! hasDeployAccessToAthenzApplication(athensPrincipal, domain, applicationId))
throw loggedForbiddenException(
"Screwdriver principal '%1$s' does not have deploy access to '%2$s'. " +
"Either the application has not been created at " + zoneRegistry.getDashboardUri() + " or " +
@@ -90,18 +90,17 @@ public class DeployAuthorizer {
Tenant tenant,
ApplicationId applicationId,
Optional<ScrewdriverId> optionalScrewdriverId) {
-
Principal principal = new UnauthenticatedUserPrincipal(userId.id());
- if (athensCredentialsRequired(environment, tenant, applicationId, principal)) {
+ if (athenzCredentialsRequired(environment, tenant, applicationId, principal)) {
ScrewdriverId screwdriverId = optionalScrewdriverId.orElseThrow(
() -> loggedForbiddenException("Screwdriver id must be provided when deploying from Screwdriver."));
principal = AthenzUtils.createPrincipal(screwdriverId);
- checkAthensCredentials(principal, tenant, applicationId);
+ checkAthenzCredentials(principal, tenant, applicationId);
}
}
- private boolean hasDeployAccessToAthensApplication(AthenzPrincipal principal, AthenzDomain domain, ApplicationId applicationId) {
+ private boolean hasDeployAccessToAthenzApplication(AthenzPrincipal principal, AthenzDomain domain, ApplicationId applicationId) {
try {
return athenzClientFactory.createZmsClientWithServicePrincipal()
.hasApplicationAccess(
@@ -111,7 +110,7 @@ public class DeployAuthorizer {
new com.yahoo.vespa.hosted.controller.api.identifiers.ApplicationId(applicationId.application().value()));
} catch (ZmsException e) {
throw loggedForbiddenException(
- "Failed to authorize deployment through Athens. If this problem persists, " +
+ "Failed to authorize deployment through Athenz. If this problem persists, " +
"please create ticket at yo/vespa-support. (" + e.getMessage() + ")");
}
}
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java
index d39f72ec1b8..e36645175a7 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/ControllerTest.java
@@ -381,7 +381,7 @@ public class ControllerTest {
// Migrate tenant to Athens
NToken nToken = TestIdentities.userNToken;
- tester.controller().tenants().migrateTenantToAthens(
+ tester.controller().tenants().migrateTenantToAthenz(
tenantId, athensDomain, new PropertyId("1567"), new Property("vespa_dev.no"), nToken);
// Verify that tenant is migrated
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiTest.java
index ef8a3809b25..1ac5dfeb58a 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiTest.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiTest.java
@@ -529,7 +529,7 @@ public class ApplicationApiTest extends ControllerContainerTest {
"{\"athensDomain\":\"domain1\", \"property\":\"property1\"}",
Request.Method.POST,
"domain1", unauthorizedUser),
- "{\"error-code\":\"FORBIDDEN\",\"message\":\"The user 'othertenant' is not admin in Athens domain 'domain1'\"}",
+ "{\"error-code\":\"FORBIDDEN\",\"message\":\"The user 'othertenant' is not admin in Athenz domain 'domain1'\"}",
403);
// (Create it with the right tenant id)
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java
index c401fc16a2f..e1e501d3e1b 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/DocumentProtocol.java
@@ -291,8 +291,6 @@ public class DocumentProtocol implements Protocol {
putRoutingPolicyFactory("MessageType", new RoutingPolicyFactories.MessageTypePolicyFactory(cfg));
putRoutingPolicyFactory("RoundRobin", new RoutingPolicyFactories.RoundRobinPolicyFactory());
putRoutingPolicyFactory("LoadBalancer", new RoutingPolicyFactories.LoadBalancerPolicyFactory());
- putRoutingPolicyFactory("SearchColumn", new RoutingPolicyFactories.SearchColumnPolicyFactory());
- putRoutingPolicyFactory("SearchRow", new RoutingPolicyFactories.SearchRowPolicyFactory());
putRoutingPolicyFactory("Storage", new RoutingPolicyFactories.StoragePolicyFactory());
putRoutingPolicyFactory("SubsetService", new RoutingPolicyFactories.SubsetServicePolicyFactory());
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java
index acd73a21a8a..6c13b7468c7 100755
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/RoutingPolicyFactories.java
@@ -116,26 +116,6 @@ public abstract class RoutingPolicyFactories {
}
}
- static class SearchColumnPolicyFactory implements RoutingPolicyFactory {
- public DocumentProtocolRoutingPolicy createPolicy(String param) {
- return new SearchColumnPolicy(param);
- }
-
-
- public void destroy() {
- }
- }
-
- static class SearchRowPolicyFactory implements RoutingPolicyFactory {
- public DocumentProtocolRoutingPolicy createPolicy(String param) {
- return new SearchRowPolicy(param);
- }
-
-
- public void destroy() {
- }
- }
-
static class SubsetServicePolicyFactory implements RoutingPolicyFactory {
public DocumentProtocolRoutingPolicy createPolicy(String param) {
return new SubsetServicePolicy(param);
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SearchColumnPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SearchColumnPolicy.java
deleted file mode 100644
index aabb6407d14..00000000000
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SearchColumnPolicy.java
+++ /dev/null
@@ -1,183 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.document.BucketId;
-import com.yahoo.document.BucketIdFactory;
-import com.yahoo.document.DocumentId;
-import com.yahoo.log.LogLevel;
-import com.yahoo.messagebus.EmptyReply;
-import com.yahoo.messagebus.ErrorCode;
-import com.yahoo.messagebus.Message;
-import com.yahoo.messagebus.Reply;
-import com.yahoo.messagebus.metrics.MetricSet;
-import com.yahoo.messagebus.routing.Route;
-import com.yahoo.messagebus.routing.RoutingContext;
-import com.yahoo.messagebus.routing.RoutingNodeIterator;
-import com.yahoo.vdslib.BucketDistribution;
-
-import java.util.*;
-import java.util.logging.Logger;
-
-/**
- * <p>This policy implements the logic to select recipients for a single search column. It has 2 different modes of
- * operation;</p>
- *
- * <ol>
- * <li>If the "maxbadparts" parameter is 0, select recipient based on document id hash and use
- * shared merge logic. Do not allow any out-of-service replies.</li>
- * <li>Else do best-effort validation of system
- * state. This means;
- * <ol>
- * <li>if the message is sending to all recipients (typicall start- and
- * end-of-feed), allow at most "maxbadparts" out-of-service replies,</li>
- * <li>else always allow out-of-service reply by masking it with an empty
- * reply.</li>
- * </ol>
- * </li>
- * </ol>
- * <p>For systems that allow bad parts, one will not know whether or not feeding
- * was a success until the RTX attempts to set the new index live, because it is
- * only the RTX that is now able to verify that the service level requirements
- * are met. Feeding will still break if a message that was supposed to be sent
- * to all recipients receives more than "maxbadparts" out-of-service replies,
- * according to (2.a) above.</p>
- *
- * @author Simon Thoresen
- */
-public class SearchColumnPolicy implements DocumentProtocolRoutingPolicy {
-
- private static Logger log = Logger.getLogger(SearchColumnPolicy.class.getName());
- private BucketIdFactory factory = new BucketIdFactory();
- private Map<Integer, BucketDistribution> distributions = new HashMap<Integer, BucketDistribution>();
- private int maxOOS = 0; // The maximum OUT_OF_SERVICE replies to hide.
-
- public static final int DEFAULT_NUM_BUCKET_BITS = 16;
-
- /**
- * Constructs a new policy object for the given parameter string. The string can be null or empty, which is a
- * request to not allow any bad columns.
- *
- * @param param The maximum number of allowed bad columns.
- */
- public SearchColumnPolicy(String param) {
- if (param != null && param.length() > 0) {
- try {
- maxOOS = Integer.parseInt(param);
- } catch (NumberFormatException e) {
- log.log(LogLevel.WARNING, "Parameter '" + param + "' could not be parsed as an integer.", e);
- }
- if (maxOOS < 0) {
- log.log(LogLevel.WARNING, "Ignoring a request to set the maximum number of OOS replies to " + maxOOS +
- " because it makes no sense. This routing policy will not allow any recipient" +
- " to be out of service.");
- }
- }
- }
-
- @Override
- public void select(RoutingContext context) {
- List<Route> recipients = context.getMatchedRecipients();
- if (recipients == null || recipients.size() == 0) {
- return;
- }
- DocumentId id = null;
- BucketId bucketId = null;
- Message msg = context.getMessage();
- switch (msg.getType()) {
-
- case DocumentProtocol.MESSAGE_PUTDOCUMENT:
- id = ((PutDocumentMessage)msg).getDocumentPut().getDocument().getId();
- break;
-
- case DocumentProtocol.MESSAGE_GETDOCUMENT:
- id = ((GetDocumentMessage)msg).getDocumentId();
- break;
-
- case DocumentProtocol.MESSAGE_REMOVEDOCUMENT:
- id = ((RemoveDocumentMessage)msg).getDocumentId();
- break;
-
- case DocumentProtocol.MESSAGE_UPDATEDOCUMENT:
- id = ((UpdateDocumentMessage)msg).getDocumentUpdate().getId();
- break;
-
- case DocumentProtocol.MESSAGE_BATCHDOCUMENTUPDATE:
- bucketId = ((BatchDocumentUpdateMessage)msg).getBucketId();
- break;
-
- case DocumentProtocol.MESSAGE_GETBUCKETSTATE:
- bucketId = ((GetBucketStateMessage)msg).getBucketId();
- break;
-
- default:
- throw new UnsupportedOperationException("Message type '" + msg.getType() + "' not supported.");
- }
- if (bucketId == null && id != null) {
- bucketId = factory.getBucketId(id);
- }
- int recipient = getRecipient(bucketId, recipients.size());
- context.addChild(recipients.get(recipient));
- context.setSelectOnRetry(true);
- if (maxOOS > 0) {
- context.addConsumableError(ErrorCode.SERVICE_OOS);
- }
- }
-
- @Override
- public void merge(RoutingContext context) {
- if (maxOOS > 0) {
- if (context.getNumChildren() > 1) {
- Set<Integer> oosReplies = new HashSet<Integer>();
- int idx = 0;
- for (RoutingNodeIterator it = context.getChildIterator();
- it.isValid(); it.next())
- {
- Reply ref = it.getReplyRef();
- if (ref.hasErrors() && DocumentProtocol.hasOnlyErrorsOfType(ref, ErrorCode.SERVICE_OOS)) {
- oosReplies.add(idx);
- }
- ++idx;
- }
- if (oosReplies.size() <= maxOOS) {
- DocumentProtocol.merge(context, oosReplies);
- return; // may the rtx be with you
- }
- } else {
- Reply ref = context.getChildIterator().getReplyRef();
- if (ref.hasErrors() && DocumentProtocol.hasOnlyErrorsOfType(ref, ErrorCode.SERVICE_OOS)) {
- context.setReply(new EmptyReply());
- return; // god help us all
- }
- }
- }
- DocumentProtocol.merge(context);
- }
-
- /**
- * Returns the recipient index for the given bucket id. This updates the shared internal distribution map, so it
- * needs to be synchronized.
- *
- * @param bucketId The bucket whose recipient to return.
- * @param numRecipients The number of recipients being distributed to.
- * @return The recipient to use.
- */
- private synchronized int getRecipient(BucketId bucketId, int numRecipients) {
- BucketDistribution distribution = distributions.get(numRecipients);
- if (distribution == null) {
- distribution = new BucketDistribution(1, DEFAULT_NUM_BUCKET_BITS);
- distribution.setNumColumns(numRecipients);
- distributions.put(numRecipients, distribution);
- }
- return distribution.getColumn(bucketId);
- }
-
- @Override
- public void destroy() {
- // empty
- }
-
- @Override
- public MetricSet getMetrics() {
- return null;
- }
-}
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SearchRowPolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SearchRowPolicy.java
deleted file mode 100755
index 4d94107eae7..00000000000
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/SearchRowPolicy.java
+++ /dev/null
@@ -1,85 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.documentapi.messagebus.protocol;
-
-import com.yahoo.log.LogLevel;
-import com.yahoo.messagebus.ErrorCode;
-import com.yahoo.messagebus.Reply;
-import com.yahoo.messagebus.metrics.MetricSet;
-import com.yahoo.messagebus.routing.RoutingContext;
-import com.yahoo.messagebus.routing.RoutingNodeIterator;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.logging.Logger;
-
-/**
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- */
-public class SearchRowPolicy implements DocumentProtocolRoutingPolicy {
-
- private static Logger log = Logger.getLogger(SearchRowPolicy.class.getName());
- private int minOk = 0; // Hide OUT_OF_SERVICE as long as this number of replies are something else.
-
- /**
- * Creates a search row policy that wraps the underlying search group policy in case the parameter is something
- * other than an empty string.
- *
- * @param param The number of minimum non-OOS replies that this policy requires.
- */
- public SearchRowPolicy(String param) {
- if (param != null && param.length() > 0) {
- try {
- minOk = Integer.parseInt(param);
- }
- catch (NumberFormatException e) {
- log.log(LogLevel.WARNING, "Parameter '" + param + "' could not be parsed as an integer.", e);
- }
- if (minOk <= 0) {
- log.log(LogLevel.WARNING, "Ignoring a request to set the minimum number of OK replies to " + minOk + " " +
- "because it makes no sense. This routing policy will not allow any recipient " +
- "to be out of service.");
- }
- }
- }
-
- @Override
- public void select(RoutingContext context) {
- context.addChildren(context.getMatchedRecipients());
- context.setSelectOnRetry(false);
- if (minOk > 0) {
- context.addConsumableError(ErrorCode.SERVICE_OOS);
- }
- }
-
- @Override
- public void merge(RoutingContext context) {
- if (minOk > 0) {
- Set<Integer> oosReplies = new HashSet<Integer>();
- int idx = 0;
- for (RoutingNodeIterator it = context.getChildIterator();
- it.isValid(); it.next())
- {
- Reply ref = it.getReplyRef();
- if (ref.hasErrors() && DocumentProtocol.hasOnlyErrorsOfType(ref, ErrorCode.SERVICE_OOS)) {
- oosReplies.add(idx);
- }
- ++idx;
- }
- if (context.getNumChildren() - oosReplies.size() >= minOk) {
- DocumentProtocol.merge(context, oosReplies);
- return;
- }
- }
- DocumentProtocol.merge(context);
- }
-
- @Override
- public void destroy() {
- // empty
- }
-
- @Override
- public MetricSet getMetrics() {
- return null;
- }
-}
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/PolicyTestCase.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/PolicyTestCase.java
index ff237d46b90..fba2edf5cfd 100755
--- a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/PolicyTestCase.java
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/test/PolicyTestCase.java
@@ -12,8 +12,6 @@ import com.yahoo.messagebus.Error;
import com.yahoo.messagebus.network.rpc.test.TestServer;
import com.yahoo.messagebus.routing.*;
import com.yahoo.messagebus.test.Receptor;
-import com.yahoo.vdslib.DocumentList;
-import com.yahoo.vdslib.Entry;
import org.junit.Before;
import org.junit.Test;
@@ -60,12 +58,6 @@ public class PolicyTestCase {
policy = new DocumentProtocol(manager).createPolicy("RoundRobin", null);
assertTrue(policy instanceof RoundRobinPolicy);
- policy = new DocumentProtocol(manager).createPolicy("SearchRow", null);
- assertTrue(policy instanceof SearchRowPolicy);
-
- policy = new DocumentProtocol(manager).createPolicy("SearchColumn", null);
- assertTrue(policy instanceof SearchColumnPolicy);
-
policy = new DocumentProtocol(manager).createPolicy("SubsetService", null);
assertTrue(policy instanceof SubsetServicePolicy);
@@ -330,125 +322,6 @@ public class PolicyTestCase {
}
@Test
- public void testSearchRow() {
- PolicyTestFrame frame = new PolicyTestFrame(manager);
- frame.setMessage(new PutDocumentMessage(new DocumentPut(new Document(manager.getDocumentType("testdoc"),
- new DocumentId("doc:scheme:")))));
- frame.setHop(new HopSpec("test", "[SearchRow]").addRecipient("foo"));
- frame.assertMergeOneReply("foo");
- frame.setHop(new HopSpec("test", "[SearchRow]").addRecipient("foo").addRecipient("bar"));
- frame.assertMergeTwoReplies("foo", "bar");
-
- frame.setHop(new HopSpec("test", "[SearchRow:1]").addRecipient("foo"));
- Map<String, Integer> replies = new HashMap<>();
- replies.put("foo", ErrorCode.SERVICE_OOS);
- frame.assertMergeError(replies, Arrays.asList(ErrorCode.SERVICE_OOS));
-
- frame.setHop(new HopSpec("test", "[SearchRow:1]").addRecipient("foo").addRecipient("bar"));
- replies.put("foo", ErrorCode.SERVICE_OOS);
- replies.put("bar", ErrorCode.NONE);
- frame.assertMergeOk(replies, Arrays.asList("bar"));
-
- replies.put("foo", ErrorCode.SERVICE_OOS);
- replies.put("bar", ErrorCode.SERVICE_OOS);
- frame.assertMergeError(replies, Arrays.asList(ErrorCode.SERVICE_OOS, ErrorCode.SERVICE_OOS));
-
- frame.setHop(new HopSpec("test", "[SearchRow:1]").addRecipient("foo").addRecipient("bar").addRecipient("baz"));
- replies.put("foo", ErrorCode.SERVICE_OOS);
- replies.put("bar", ErrorCode.NONE);
- replies.put("baz", ErrorCode.NONE);
- frame.assertMergeOk(replies, Arrays.asList("bar", "baz"));
-
- replies.put("foo", ErrorCode.SERVICE_OOS);
- replies.put("bar", ErrorCode.SERVICE_OOS);
- replies.put("baz", ErrorCode.NONE);
- frame.assertMergeOk(replies, Arrays.asList("baz"));
-
- replies.put("foo", ErrorCode.SERVICE_OOS);
- replies.put("bar", ErrorCode.SERVICE_OOS);
- replies.put("baz", ErrorCode.SERVICE_OOS);
- frame.assertMergeError(replies,
- Arrays.asList(ErrorCode.SERVICE_OOS, ErrorCode.SERVICE_OOS, ErrorCode.SERVICE_OOS));
-
- frame.setHop(new HopSpec("test", "[SearchRow:2]").addRecipient("foo").addRecipient("bar").addRecipient("baz"));
- replies.put("foo", ErrorCode.SERVICE_OOS);
- replies.put("bar", ErrorCode.NONE);
- replies.put("baz", ErrorCode.NONE);
- frame.assertMergeOk(replies, Arrays.asList("bar", "baz"));
-
- replies.put("foo", ErrorCode.SERVICE_OOS);
- replies.put("bar", ErrorCode.SERVICE_OOS);
- replies.put("baz", ErrorCode.NONE);
- frame.assertMergeError(replies, Arrays.asList(ErrorCode.SERVICE_OOS, ErrorCode.SERVICE_OOS));
-
- replies.put("foo", ErrorCode.SERVICE_OOS);
- replies.put("bar", ErrorCode.SERVICE_OOS);
- replies.put("baz", ErrorCode.SERVICE_OOS);
- frame.assertMergeError(replies,
- Arrays.asList(ErrorCode.SERVICE_OOS, ErrorCode.SERVICE_OOS, ErrorCode.SERVICE_OOS));
-
- frame.destroy();
- }
-
- @Test
- public void testSearchRowMerge() {
- PolicyTestFrame frame = new PolicyTestFrame(manager);
- frame.setHop(new HopSpec("test", "[SearchRow]").addRecipient("foo"));
- tryWasFound(frame, 1, 0x0, false);
- tryWasFound(frame, 1, 0x1, true);
-
- frame.setHop(new HopSpec("test", "[SearchRow]").addRecipient("foo").addRecipient("bar"));
- tryWasFound(frame, 2, 0x0, false);
- tryWasFound(frame, 2, 0x1, true);
- tryWasFound(frame, 2, 0x2, true);
- tryWasFound(frame, 2, 0x3, true);
-
- frame.setHop(new HopSpec("test", "[SearchRow]").addRecipient("foo").addRecipient("bar").addRecipient("baz"));
- tryWasFound(frame, 3, 0x0, false);
- tryWasFound(frame, 3, 0x1, true);
- tryWasFound(frame, 3, 0x2, true);
- tryWasFound(frame, 3, 0x3, true);
- tryWasFound(frame, 3, 0x4, true);
- tryWasFound(frame, 3, 0x5, true);
- tryWasFound(frame, 3, 0x6, true);
- tryWasFound(frame, 3, 0x7, true);
- frame.destroy();
- }
-
- private void tryWasFound(PolicyTestFrame frame, int expectedRecipients,
- int foundMask, boolean expectedFound)
- {
- {
- frame.setMessage(new RemoveDocumentMessage(new DocumentId("doc:scheme:69")));
- List<RoutingNode> selected = frame.select(expectedRecipients);
- for (int i = 0, len = selected.size(); i < len; ++i) {
- RemoveDocumentReply reply = new RemoveDocumentReply();
- reply.setWasFound(((1 << i) & foundMask) != 0);
- selected.get(i).handleReply(reply);
- }
- Reply reply = frame.getReceptor().getReply(TIMEOUT);
- assertNotNull(reply);
- assertEquals(DocumentProtocol.REPLY_REMOVEDOCUMENT, reply.getType());
- assertEquals(expectedFound, ((RemoveDocumentReply)reply).wasFound());
- }
- {
- DocumentUpdate upd = new DocumentUpdate(manager.getDocumentType("testdoc"),
- new DocumentId("doc:scheme:"));
- frame.setMessage(new UpdateDocumentMessage(upd));
- List<RoutingNode> selected = frame.select(expectedRecipients);
- for (int i = 0, len = selected.size(); i < len; ++i) {
- UpdateDocumentReply reply = new UpdateDocumentReply();
- reply.setWasFound(((1 << i) & foundMask) != 0);
- selected.get(i).handleReply(reply);
- }
- Reply reply = frame.getReceptor().getReply(TIMEOUT);
- assertNotNull(reply);
- assertEquals(DocumentProtocol.REPLY_UPDATEDOCUMENT, reply.getType());
- assertEquals(expectedFound, ((UpdateDocumentReply)reply).wasFound());
- }
- }
-
- @Test
public void multipleGetRepliesAreMergedToFoundDocument() {
PolicyTestFrame frame = new PolicyTestFrame(manager);
frame.setHop(new HopSpec("test", getDocumentRouteSelectorRawConfig())
@@ -483,74 +356,6 @@ public class PolicyTestCase {
"route[1].feed \"myfeed\"\n]";
}
- @Test
- public void testSearchColumn() {
- PolicyTestFrame frame = new PolicyTestFrame(manager);
- frame.setHop(new HopSpec("test", "[SearchColumn]")
- .addRecipient("c0").addRecipient("c1")
- .addRecipient("c2").addRecipient("c3"));
-
- // Test hash distribution.
- assertDistribution(frame, "doc:ns:3", "c0");
- assertDistribution(frame, "doc:ns:18", "c1");
- assertDistribution(frame, "doc:ns:0", "c2");
- assertDistribution(frame, "doc:ns:4", "c3");
-
- assertDistribution(frame, "userdoc:ns:49152:0", "c0");
- assertDistribution(frame, "userdoc:ns:49152:1", "c0");
- assertDistribution(frame, "userdoc:ns:16384:2", "c1");
- assertDistribution(frame, "userdoc:ns:16384:3", "c1");
- assertDistribution(frame, "userdoc:ns:5461:4", "c2");
- assertDistribution(frame, "userdoc:ns:5461:5", "c2");
- assertDistribution(frame, "userdoc:ns:0:6", "c3");
- assertDistribution(frame, "userdoc:ns:0:7", "c3");
-
- assertDistribution(frame, "groupdoc:ns:0:0", "c0");
- assertDistribution(frame, "groupdoc:ns:0:1", "c0");
- assertDistribution(frame, "groupdoc:ns:4:2", "c1");
- assertDistribution(frame, "groupdoc:ns:4:3", "c1");
- assertDistribution(frame, "groupdoc:ns:2:4", "c2");
- assertDistribution(frame, "groupdoc:ns:2:5", "c2");
- assertDistribution(frame, "groupdoc:ns:7:6", "c3");
- assertDistribution(frame, "groupdoc:ns:7:7", "c3");
-
- // Test routing based on message type.
- Message put = new PutDocumentMessage(new DocumentPut(new Document(manager.getDocumentType("testdoc"),
- new DocumentId("doc:scheme:"))));
- frame.setHop(new HopSpec("test", "[SearchColumn]").addRecipient("c0").addRecipient("c1"));
- frame.setMessage(put);
- frame.assertMergeOneReply("c0");
-
- // Test allowed bad parts.
- frame.setHop(new HopSpec("test", "[SearchColumn:1]").addRecipient("c0"));
- frame.setMessage(put);
- Map<String, Integer> replies = new HashMap<>();
- replies.put("c0", ErrorCode.SERVICE_OOS);
- frame.assertMergeOk(replies, null);
-
- replies.put("c0", ErrorCode.SERVICE_OOS);
- frame.assertMergeOk(replies, null);
-
- frame.setHop(new HopSpec("test", "[SearchColumn:1]").addRecipient("c0").addRecipient("c1"));
- frame.setMessage(put);
- replies.put("c0", ErrorCode.SERVICE_OOS);
- frame.assertMergeOk(replies, null);
-
- frame.setHop(new HopSpec("test", "[SearchColumn:1]").addRecipient("c0").addRecipient("c1").addRecipient("c2"));
- frame.setMessage(put);
- replies.clear();
- replies.put("c0", ErrorCode.SERVICE_OOS);
- frame.assertMergeOk(replies, null);
-
- frame.setHop(new HopSpec("test", "[SearchColumn:2]").addRecipient("c0").addRecipient("c1").addRecipient("c2"));
- frame.setMessage(put);
- replies.clear();
- replies.put("c0", ErrorCode.SERVICE_OOS);
- frame.assertMergeOk(replies, null);
-
- frame.destroy();
- }
-
private void assertDistribution(PolicyTestFrame frame, String id, String expected) {
frame.setMessage(new PutDocumentMessage(new DocumentPut(new Document(manager.getDocumentType("testdoc"),
new DocumentId(id)))));
diff --git a/documentapi/src/tests/messagebus/messagebus_test.cpp b/documentapi/src/tests/messagebus/messagebus_test.cpp
index d8920b0577b..919dc6b6570 100644
--- a/documentapi/src/tests/messagebus/messagebus_test.cpp
+++ b/documentapi/src/tests/messagebus/messagebus_test.cpp
@@ -92,13 +92,7 @@ void Test::testProtocol() {
DocumentProtocol protocol(set, _repo);
EXPECT_TRUE(protocol.getName() == "document");
- IRoutingPolicy::UP policy = protocol.createPolicy(string("SearchRow"),string(""));
- EXPECT_TRUE(policy.get() != NULL);
-
- policy = protocol.createPolicy(string("SearchColumn"),string(""));
- EXPECT_TRUE(policy.get() != NULL);
-
- policy = protocol.createPolicy(string("DocumentRouteSelector"), string("file:documentrouteselectorpolicy.cfg"));
+ IRoutingPolicy::UP policy = protocol.createPolicy(string("DocumentRouteSelector"), string("file:documentrouteselectorpolicy.cfg"));
EXPECT_TRUE(policy.get() != NULL);
policy = protocol.createPolicy(string(""),string(""));
diff --git a/documentapi/src/tests/policies/policies_test.cpp b/documentapi/src/tests/policies/policies_test.cpp
index 9d38247ebfd..58db3079631 100644
--- a/documentapi/src/tests/policies/policies_test.cpp
+++ b/documentapi/src/tests/policies/policies_test.cpp
@@ -10,8 +10,6 @@
#include <vespa/documentapi/messagebus/policies/loadbalancerpolicy.h>
#include <vespa/documentapi/messagebus/policies/localservicepolicy.h>
#include <vespa/documentapi/messagebus/policies/roundrobinpolicy.h>
-#include <vespa/documentapi/messagebus/policies/searchcolumnpolicy.h>
-#include <vespa/documentapi/messagebus/policies/searchrowpolicy.h>
#include <vespa/documentapi/messagebus/policies/storagepolicy.h>
#include <vespa/documentapi/messagebus/policies/subsetservicepolicy.h>
#include <vespa/messagebus/emptyreply.h>
@@ -39,6 +37,8 @@ using document::readDocumenttypesConfig;
using slobrok::api::IMirrorAPI;
using namespace documentapi;
using vespalib::make_string;
+using std::make_unique;
+using std::make_shared;
class Test : public vespalib::TestApp {
private:
@@ -48,16 +48,12 @@ private:
private:
bool trySelect(TestFrame &frame, uint32_t numSelects, const std::vector<string> &expected);
- bool tryDistribution(TestFrame &frame, const string &id, const string &expected);
- void tryWasFound(TestFrame &frame, uint32_t expectedRecipients, uint32_t foundMask, bool expectedFound);
- void setupExternPolicy(TestFrame &frame, mbus::Slobrok &slobrok, const string &pattern,
- int32_t numEntries = -1);
+ void setupExternPolicy(TestFrame &frame, mbus::Slobrok &slobrok, const string &pattern, int32_t numEntries = -1);
StoragePolicy &setupStoragePolicy(TestFrame &frame, const string &param,
const string &pattern = "", int32_t numEntries = -1);
bool isErrorPolicy(const string &name, const string &param);
void assertMirrorReady(const IMirrorAPI &mirror);
- void assertMirrorContains(const IMirrorAPI &mirror, const string &pattern,
- uint32_t numEntries);
+ void assertMirrorContains(const IMirrorAPI &mirror, const string &pattern, uint32_t numEntries);
mbus::Message::UP newPutDocumentMessage(const string &documentId);
public:
@@ -75,9 +71,6 @@ public:
void testProtocol();
void testRoundRobin();
void testRoundRobinCache();
- void testSearchColumn();
- void testSearchRow();
- void testSearchRowMerge();
void multipleGetRepliesAreMergedToFoundDocument();
void testSubsetService();
void testSubsetServiceCache();
@@ -117,9 +110,6 @@ Test::Main() {
testLocalServiceCache(); TEST_FLUSH();
testRoundRobin(); TEST_FLUSH();
testRoundRobinCache(); TEST_FLUSH();
- testSearchColumn(); TEST_FLUSH();
- testSearchRow(); TEST_FLUSH();
- testSearchRowMerge(); TEST_FLUSH();
testSubsetService(); TEST_FLUSH();
testSubsetServiceCache(); TEST_FLUSH();
@@ -144,43 +134,34 @@ Test::testProtocol()
mbus::IProtocol::SP protocol(new DocumentProtocol(_loadTypes, _repo));
mbus::IRoutingPolicy::UP policy = protocol->createPolicy("AND", "");
- ASSERT_TRUE(dynamic_cast<ANDPolicy*>(policy.get()) != NULL);
+ ASSERT_TRUE(dynamic_cast<ANDPolicy*>(policy.get()) != nullptr);
policy = protocol->createPolicy("DocumentRouteSelector", "raw:route[0]\n");
- ASSERT_TRUE(dynamic_cast<DocumentRouteSelectorPolicy*>(policy.get()) != NULL);
+ ASSERT_TRUE(dynamic_cast<DocumentRouteSelectorPolicy*>(policy.get()) != nullptr);
policy = protocol->createPolicy("Extern", "foo;bar/baz");
- ASSERT_TRUE(dynamic_cast<ExternPolicy*>(policy.get()) != NULL);
+ ASSERT_TRUE(dynamic_cast<ExternPolicy*>(policy.get()) != nullptr);
policy = protocol->createPolicy("LoadBalancer",
"cluster=docproc/cluster.default;"
"session=chain.default;syncinit");
- ASSERT_TRUE(dynamic_cast<LoadBalancerPolicy*>(policy.get()) != NULL);
+ ASSERT_TRUE(dynamic_cast<LoadBalancerPolicy*>(policy.get()) != nullptr);
policy = protocol->createPolicy("LocalService", "");
- ASSERT_TRUE(dynamic_cast<LocalServicePolicy*>(policy.get()) != NULL);
+ ASSERT_TRUE(dynamic_cast<LocalServicePolicy*>(policy.get()) != nullptr);
policy = protocol->createPolicy("RoundRobin", "");
- ASSERT_TRUE(dynamic_cast<RoundRobinPolicy*>(policy.get()) != NULL);
-
- policy = protocol->createPolicy("SearchRow", "");
- ASSERT_TRUE(dynamic_cast<SearchRowPolicy*>(policy.get()) != NULL);
-
- policy = protocol->createPolicy("SearchColumn", "");
- ASSERT_TRUE(dynamic_cast<SearchColumnPolicy*>(policy.get()) != NULL);
+ ASSERT_TRUE(dynamic_cast<RoundRobinPolicy*>(policy.get()) != nullptr);
policy = protocol->createPolicy("SubsetService", "");
- ASSERT_TRUE(dynamic_cast<SubsetServicePolicy*>(policy.get()) != NULL);
+ ASSERT_TRUE(dynamic_cast<SubsetServicePolicy*>(policy.get()) != nullptr);
}
void
Test::testAND()
{
TestFrame frame(_repo);
- frame.setMessage(mbus::Message::UP(new PutDocumentMessage(
- document::Document::SP(
- new document::Document(*_docType,
- DocumentId("doc:scheme:"))))));
+ frame.setMessage(make_unique<PutDocumentMessage>(make_shared<Document>(*_docType, DocumentId("doc:scheme:"))));
frame.setHop(mbus::HopSpec("test", "[AND]")
.addRecipient("foo")
.addRecipient("bar"));
@@ -242,7 +223,7 @@ Test::requireThatExternPolicySelectsFromExternSlobrok()
lst.insert(leaf[0]->getRoute().toString());
leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
- ASSERT_TRUE(frame.getReceptor().getReply(600).get() != NULL);
+ ASSERT_TRUE(frame.getReceptor().getReply(600));
}
EXPECT_EQUAL(servers.size(), lst.size());
for (uint32_t i = 0; i < servers.size(); ++i) {
@@ -267,15 +248,14 @@ mbus::Message::UP
Test::newPutDocumentMessage(const string &documentId)
{
Document::SP doc(new Document(*_docType, DocumentId(documentId)));
- return mbus::Message::UP(new PutDocumentMessage(doc));
+ return make_unique<PutDocumentMessage>(doc);
}
void
Test::setupExternPolicy(TestFrame &frame, mbus::Slobrok &slobrok, const string &pattern,
int32_t numEntries)
{
- string param = vespalib::make_string("tcp/localhost:%d;%s",
- slobrok.port(), pattern.c_str());
+ string param = vespalib::make_string("tcp/localhost:%d;%s", slobrok.port(), pattern.c_str());
frame.setHop(mbus::HopSpec("test", vespalib::make_string("[Extern:%s]", param.c_str())));
mbus::MessageBus &mbus = frame.getMessageBus();
const mbus::HopBlueprint *hop = mbus.getRoutingTable(DocumentProtocol::NAME)->getHop("test");
@@ -340,7 +320,7 @@ Test::testExternSend()
mbus::DestinationSession::UP ds = dst.mb.createDestinationSession("session", true, dr);
// Send message from local node to remote cluster and resolve route there.
- mbus::Message::UP msg(new GetDocumentMessage(document::DocumentId("doc:scheme:"), 0));
+ mbus::Message::UP msg(new GetDocumentMessage(DocumentId("doc:scheme:"), 0));
msg->getTrace().setLevel(9);
msg->setRoute(mbus::Route::parse(vespalib::make_string("[Extern:tcp/localhost:%d;itr/session] default", slobrok.port())));
@@ -376,7 +356,7 @@ Test::testExternMultipleSlobroks()
std::make_shared<DocumentProtocol>(_loadTypes, _repo));
mbus::DestinationSession::UP ds = dst.mb.createDestinationSession("session", true, dr);
- mbus::Message::UP msg(new GetDocumentMessage(document::DocumentId("doc:scheme:"), 0));
+ mbus::Message::UP msg(new GetDocumentMessage(DocumentId("doc:scheme:"), 0));
msg->setRoute(mbus::Route::parse(vespalib::make_string("[Extern:%s;dst/session]", spec.c_str())));
ASSERT_TRUE(ss->send(std::move(msg)).isAccepted());
ASSERT_TRUE((msg = dr.getMessage(600)));
@@ -392,7 +372,7 @@ Test::testExternMultipleSlobroks()
std::make_shared<DocumentProtocol>(_loadTypes, _repo));
mbus::DestinationSession::UP ds = dst.mb.createDestinationSession("session", true, dr);
- mbus::Message::UP msg(new GetDocumentMessage(document::DocumentId("doc:scheme:"), 0));
+ mbus::Message::UP msg(new GetDocumentMessage(DocumentId("doc:scheme:"), 0));
msg->setRoute(mbus::Route::parse(vespalib::make_string("[Extern:%s;dst/session]", spec.c_str())));
ASSERT_TRUE(ss->send(std::move(msg)).isAccepted());
ASSERT_TRUE((msg = dr.getMessage(600)));
@@ -407,8 +387,7 @@ Test::testLocalService()
{
// Prepare message.
TestFrame frame(_repo, "docproc/cluster.default");
- frame.setMessage(mbus::Message::UP(new PutDocumentMessage(Document::SP(
- new Document(*_docType, DocumentId("doc:scheme:"))))));
+ frame.setMessage(make_unique<PutDocumentMessage>(make_shared<Document>(*_docType, DocumentId("doc:scheme:"))));
// Test select with proper address.
for (uint32_t i = 0; i < 10; ++i) {
@@ -424,7 +403,7 @@ Test::testLocalService()
lst.insert(leaf[0]->getRoute().toString());
leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
- ASSERT_TRUE(frame.getReceptor().getReply(600).get() != NULL);
+ ASSERT_TRUE(frame.getReceptor().getReply(600));
}
EXPECT_EQUAL(10u, lst.size());
@@ -437,7 +416,7 @@ Test::testLocalService()
lst.insert(leaf[0]->getRoute().toString());
leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
- ASSERT_TRUE(frame.getReceptor().getReply(600).get() != NULL);
+ ASSERT_TRUE(frame.getReceptor().getReply(600));
}
EXPECT_EQUAL(1u, lst.size());
EXPECT_EQUAL("docproc/cluster.default/*/chain.default", *lst.begin());
@@ -452,12 +431,12 @@ Test::testLocalServiceCache()
{
TestFrame fooFrame(_repo, "docproc/cluster.default");
mbus::HopSpec fooHop("foo", "docproc/cluster.default/[LocalService]/chain.foo");
- fooFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:foo"))));
+ fooFrame.setMessage(make_unique<GetDocumentMessage>(DocumentId("doc:scheme:foo")));
fooFrame.setHop(fooHop);
TestFrame barFrame(fooFrame);
mbus::HopSpec barHop("test", "docproc/cluster.default/[LocalService]/chain.bar");
- barFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:bar"))));
+ barFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(DocumentId("doc:scheme:bar"))));
barFrame.setHop(barHop);
fooFrame.getMessageBus().setupRouting(
@@ -480,8 +459,8 @@ Test::testLocalServiceCache()
barSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
fooSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
- ASSERT_TRUE(barFrame.getReceptor().getReply(600).get() != NULL);
- ASSERT_TRUE(fooFrame.getReceptor().getReply(600).get() != NULL);
+ ASSERT_TRUE(barFrame.getReceptor().getReply(600));
+ ASSERT_TRUE(fooFrame.getReceptor().getReply(600));
}
void
@@ -489,9 +468,7 @@ Test::testRoundRobin()
{
// Prepare message.
TestFrame frame(_repo, "docproc/cluster.default");
- frame.setMessage(mbus::Message::UP(new PutDocumentMessage(Document::SP(
- new Document(*_docType,
- DocumentId("doc:scheme:"))))));
+ frame.setMessage(make_unique<PutDocumentMessage>(make_shared<Document>(*_docType, DocumentId("doc:scheme:"))));
// Test select with proper address.
for (uint32_t i = 0; i < 10; ++i) {
@@ -530,13 +507,13 @@ Test::testRoundRobinCache()
TestFrame fooFrame(_repo, "docproc/cluster.default");
mbus::HopSpec fooHop("foo", "[RoundRobin]");
fooHop.addRecipient("docproc/cluster.default/0/chain.foo");
- fooFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:foo"))));
+ fooFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(DocumentId("doc:scheme:foo"))));
fooFrame.setHop(fooHop);
TestFrame barFrame(fooFrame);
mbus::HopSpec barHop("bar", "[RoundRobin]");
barHop.addRecipient("docproc/cluster.default/0/chain.bar");
- barFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:bar"))));
+ barFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(DocumentId("doc:scheme:bar"))));
barFrame.setHop(barHop);
fooFrame.getMessageBus().setupRouting(
@@ -559,155 +536,8 @@ Test::testRoundRobinCache()
barSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
fooSelected[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
- ASSERT_TRUE(barFrame.getReceptor().getReply(600).get() != NULL);
- ASSERT_TRUE(fooFrame.getReceptor().getReply(600).get() != NULL);
-}
-
-void
-Test::testSearchRow()
-{
- TestFrame frame(_repo);
- frame.setMessage(mbus::Message::UP(new PutDocumentMessage(Document::SP(
- new Document(*_docType,
- DocumentId("doc:scheme:"))))));
- frame.setHop(mbus::HopSpec("test", "[SearchRow]")
- .addRecipient("foo"));
- EXPECT_TRUE(frame.testMergeOneReply("foo"));
- frame.setHop(mbus::HopSpec("test", "[SearchRow]")
- .addRecipient("foo")
- .addRecipient("bar"));
- EXPECT_TRUE(frame.testMergeTwoReplies("foo", "bar"));
-
- frame.setHop(mbus::HopSpec("test", "[SearchRow:1]")
- .addRecipient("foo"));
- TestFrame::ReplyMap replies;
- replies["foo"] = mbus::ErrorCode::SERVICE_OOS;
- EXPECT_TRUE(frame.testMergeError(replies, UIntList().add(mbus::ErrorCode::SERVICE_OOS)));
-
- frame.setHop(mbus::HopSpec("test", "[SearchRow:1]")
- .addRecipient("foo")
- .addRecipient("bar"));
- replies["foo"] = mbus::ErrorCode::SERVICE_OOS;
- replies["bar"] = mbus::ErrorCode::NONE;
- EXPECT_TRUE(frame.testMergeOk(replies, StringList().add("bar")));
-
- replies["foo"] = mbus::ErrorCode::SERVICE_OOS;
- replies["bar"] = mbus::ErrorCode::SERVICE_OOS;
- EXPECT_TRUE(frame.testMergeError(replies, UIntList()
- .add(mbus::ErrorCode::SERVICE_OOS)
- .add(mbus::ErrorCode::SERVICE_OOS)));
-
- frame.setHop(mbus::HopSpec("test", "[SearchRow:1]")
- .addRecipient("foo")
- .addRecipient("bar")
- .addRecipient("baz"));
- replies["foo"] = mbus::ErrorCode::SERVICE_OOS;
- replies["bar"] = mbus::ErrorCode::NONE;
- replies["baz"] = mbus::ErrorCode::NONE;
- EXPECT_TRUE(frame.testMergeOk(replies, StringList().add("bar").add("baz")));
-
- replies["foo"] = mbus::ErrorCode::SERVICE_OOS;
- replies["bar"] = mbus::ErrorCode::SERVICE_OOS;
- replies["baz"] = mbus::ErrorCode::NONE;
- EXPECT_TRUE(frame.testMergeOk(replies, StringList().add("baz")));
-
- replies["foo"] = mbus::ErrorCode::SERVICE_OOS;
- replies["bar"] = mbus::ErrorCode::SERVICE_OOS;
- replies["baz"] = mbus::ErrorCode::SERVICE_OOS;
- EXPECT_TRUE(frame.testMergeError(replies, UIntList()
- .add(mbus::ErrorCode::SERVICE_OOS)
- .add(mbus::ErrorCode::SERVICE_OOS)
- .add(mbus::ErrorCode::SERVICE_OOS)));
-
- frame.setHop(mbus::HopSpec("test", "[SearchRow:2]")
- .addRecipient("foo")
- .addRecipient("bar")
- .addRecipient("baz"));
- replies["foo"] = mbus::ErrorCode::SERVICE_OOS;
- replies["bar"] = mbus::ErrorCode::NONE;
- replies["baz"] = mbus::ErrorCode::NONE;
- EXPECT_TRUE(frame.testMergeOk(replies, StringList().add("bar").add("baz")));
-
- replies["foo"] = mbus::ErrorCode::SERVICE_OOS;
- replies["bar"] = mbus::ErrorCode::SERVICE_OOS;
- replies["baz"] = mbus::ErrorCode::NONE;
- EXPECT_TRUE(frame.testMergeError(replies, UIntList()
- .add(mbus::ErrorCode::SERVICE_OOS)
- .add(mbus::ErrorCode::SERVICE_OOS)));
-
- replies["foo"] = mbus::ErrorCode::SERVICE_OOS;
- replies["bar"] = mbus::ErrorCode::SERVICE_OOS;
- replies["baz"] = mbus::ErrorCode::SERVICE_OOS;
- EXPECT_TRUE(frame.testMergeError(replies, UIntList()
- .add(mbus::ErrorCode::SERVICE_OOS)
- .add(mbus::ErrorCode::SERVICE_OOS)
- .add(mbus::ErrorCode::SERVICE_OOS)));
-}
-
-void
-Test::testSearchRowMerge()
-{
- TestFrame frame(_repo);
- frame.setHop(mbus::HopSpec("test", "[SearchRow]")
- .addRecipient("foo"));
- tryWasFound(frame, 1, 0x0, false);
- tryWasFound(frame, 1, 0x1, true);
-
- frame.setHop(mbus::HopSpec("test", "[SearchRow]")
- .addRecipient("foo")
- .addRecipient("bar"));
- tryWasFound(frame, 2, 0x0, false);
- tryWasFound(frame, 2, 0x1, true);
- tryWasFound(frame, 2, 0x2, true);
- tryWasFound(frame, 2, 0x3, true);
-
- frame.setHop(mbus::HopSpec("test", "[SearchRow]")
- .addRecipient("foo")
- .addRecipient("bar")
- .addRecipient("baz"));
- tryWasFound(frame, 3, 0x0, false);
- tryWasFound(frame, 3, 0x1, true);
- tryWasFound(frame, 3, 0x2, true);
- tryWasFound(frame, 3, 0x3, true);
- tryWasFound(frame, 3, 0x4, true);
- tryWasFound(frame, 3, 0x5, true);
- tryWasFound(frame, 3, 0x6, true);
- tryWasFound(frame, 3, 0x7, true);
-}
-
-void
-Test::tryWasFound(TestFrame &frame, uint32_t expectedRecipients,
- uint32_t foundMask, bool expectedFound)
-{
- {
- frame.setMessage(mbus::Message::UP(new RemoveDocumentMessage(DocumentId("doc:scheme:69"))));
- std::vector<mbus::RoutingNode*> selected;
- EXPECT_TRUE(frame.select(selected, expectedRecipients));
- for (uint32_t i = 0, len = selected.size(); i < len; ++i) {
- mbus::Reply::UP reply(new RemoveDocumentReply());
- static_cast<RemoveDocumentReply&>(*reply).setWasFound((1 << i) & foundMask);
- selected[i]->handleReply(std::move(reply));
- }
- mbus::Reply::UP reply = frame.getReceptor().getReply(600);
- EXPECT_TRUE(reply.get() != NULL);
- EXPECT_EQUAL((uint32_t)DocumentProtocol::REPLY_REMOVEDOCUMENT, reply->getType());
- EXPECT_EQUAL(expectedFound, static_cast<RemoveDocumentReply&>(*reply).wasFound());
- }
- {
- DocumentUpdate::SP upd(new DocumentUpdate(*_docType, DocumentId("doc:scheme:")));
- frame.setMessage(mbus::Message::UP(new UpdateDocumentMessage(upd)));
- std::vector<mbus::RoutingNode*> selected;
- EXPECT_TRUE(frame.select(selected, expectedRecipients));
- for (uint32_t i = 0, len = selected.size(); i < len; ++i) {
- mbus::Reply::UP reply(new UpdateDocumentReply());
- static_cast<UpdateDocumentReply&>(*reply).setWasFound((1 << i) & foundMask);
- selected[i]->handleReply(std::move(reply));
- }
- mbus::Reply::UP reply = frame.getReceptor().getReply(600);
- EXPECT_TRUE(reply.get() != NULL);
- EXPECT_EQUAL((uint32_t)DocumentProtocol::REPLY_UPDATEDOCUMENT, reply->getType());
- EXPECT_EQUAL(expectedFound, static_cast<UpdateDocumentReply&>(*reply).wasFound());
- }
+ ASSERT_TRUE(barFrame.getReceptor().getReply(600));
+ ASSERT_TRUE(fooFrame.getReceptor().getReply(600));
}
void
@@ -724,11 +554,11 @@ Test::multipleGetRepliesAreMergedToFoundDocument()
"route[1].feed \"myfeed\"\n]")
.addRecipient("foo")
.addRecipient("bar"));
- frame.setMessage(mbus::Message::UP(new GetDocumentMessage(DocumentId("doc:scheme:yarn"))));
+ frame.setMessage(make_unique<GetDocumentMessage>(DocumentId("doc:scheme:yarn")));
std::vector<mbus::RoutingNode*> selected;
EXPECT_TRUE(frame.select(selected, 2));
for (uint32_t i = 0, len = selected.size(); i < len; ++i) {
- document::Document::SP doc;
+ Document::SP doc;
if (i == 0) {
doc.reset(new Document(*_docType, DocumentId("doc:scheme:yarn")));
doc->setLastModified(123456ULL);
@@ -737,62 +567,12 @@ Test::multipleGetRepliesAreMergedToFoundDocument()
selected[i]->handleReply(std::move(reply));
}
mbus::Reply::UP reply = frame.getReceptor().getReply(600);
- EXPECT_TRUE(reply.get() != NULL);
- EXPECT_EQUAL(static_cast<uint32_t>(DocumentProtocol::REPLY_GETDOCUMENT),
- reply->getType());
+ EXPECT_TRUE(reply);
+ EXPECT_EQUAL(static_cast<uint32_t>(DocumentProtocol::REPLY_GETDOCUMENT), reply->getType());
EXPECT_EQUAL(123456ULL, static_cast<GetDocumentReply&>(*reply).getLastModified());
}
void
-Test::testSearchColumn()
-{
- TestFrame frame(_repo);
- frame.setHop(mbus::HopSpec("test", "[SearchColumn]")
- .addRecipient("c0")
- .addRecipient("c1")
- .addRecipient("c2")
- .addRecipient("c3"));
-
- // Test hash distribution.
- EXPECT_TRUE(tryDistribution(frame, "doc:ns:3", "c0"));
- EXPECT_TRUE(tryDistribution(frame, "doc:ns:18", "c1"));
- EXPECT_TRUE(tryDistribution(frame, "doc:ns:0", "c2"));
- EXPECT_TRUE(tryDistribution(frame, "doc:ns:4", "c3"));
-
- EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:49152:0", "c0"));
- EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:49152:1", "c0"));
- EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:16384:2", "c1"));
- EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:16384:3", "c1"));
- EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:5461:4", "c2"));
- EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:5461:5", "c2"));
- EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:0:6", "c3"));
- EXPECT_TRUE(tryDistribution(frame, "userdoc:ns:0:7", "c3"));
-
- EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:0:0", "c0"));
- EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:0:1", "c0"));
- EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:4:2", "c1"));
- EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:4:3", "c1"));
- EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:2:4", "c2"));
- EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:2:5", "c2"));
- EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:7:6", "c3"));
- EXPECT_TRUE(tryDistribution(frame, "groupdoc:ns:7:7", "c3"));
-
- // Test routing based on message type.
- mbus::Message::UP put(new PutDocumentMessage(Document::SP(
- new Document(*_docType,
- DocumentId("doc:scheme:")))));
-}
-
-bool
-Test::tryDistribution(TestFrame &frame, const string &id, const string &expected)
-{
- Document::SP doc(new Document(*_docType, DocumentId(id)));
- mbus::Message::UP msg(new PutDocumentMessage(doc));
- frame.setMessage(std::move(msg));
- return frame.testSelect(StringList().add(expected));
-}
-
-void
Test::testDocumentRouteSelector()
{
// Test policy with usage safeguard.
@@ -804,13 +584,13 @@ Test::testDocumentRouteSelector()
"route[0].feed \"baz\"\n";
{
DocumentProtocol protocol(_loadTypes, _repo, okConfig);
- EXPECT_TRUE(dynamic_cast<DocumentRouteSelectorPolicy*>(protocol.createPolicy("DocumentRouteSelector", "").get()) != NULL);
- EXPECT_TRUE(dynamic_cast<ErrorPolicy*>(protocol.createPolicy("DocumentRouteSelector", errConfig).get()) != NULL);
+ EXPECT_TRUE(dynamic_cast<DocumentRouteSelectorPolicy*>(protocol.createPolicy("DocumentRouteSelector", "").get()) != nullptr);
+ EXPECT_TRUE(dynamic_cast<ErrorPolicy*>(protocol.createPolicy("DocumentRouteSelector", errConfig).get()) != nullptr);
}
{
DocumentProtocol protocol(_loadTypes, _repo, errConfig);
- EXPECT_TRUE(dynamic_cast<ErrorPolicy*>(protocol.createPolicy("DocumentRouteSelector", "").get()) != NULL);
- EXPECT_TRUE(dynamic_cast<DocumentRouteSelectorPolicy*>(protocol.createPolicy("DocumentRouteSelector", okConfig).get()) != NULL);
+ EXPECT_TRUE(dynamic_cast<ErrorPolicy*>(protocol.createPolicy("DocumentRouteSelector", "").get()) != nullptr);
+ EXPECT_TRUE(dynamic_cast<DocumentRouteSelectorPolicy*>(protocol.createPolicy("DocumentRouteSelector", okConfig).get()) != nullptr);
}
// Test policy with proper config.
@@ -826,19 +606,17 @@ Test::testDocumentRouteSelector()
.addRecipient("foo")
.addRecipient("bar"));
- frame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:"), 0)));
+ frame.setMessage(make_unique<GetDocumentMessage>(DocumentId("doc:scheme:"), 0));
EXPECT_TRUE(frame.testSelect(StringList().add("foo").add("bar")));
- mbus::Message::UP put(new PutDocumentMessage(Document::SP(
- new Document(*_docType,
- DocumentId("doc:scheme:")))));
+ mbus::Message::UP put = make_unique<PutDocumentMessage>(make_shared<Document>(*_docType, DocumentId("doc:scheme:")));
frame.setMessage(std::move(put));
EXPECT_TRUE(frame.testSelect( StringList().add("foo")));
{
vdslib::OperationList opList;
- document::DocumentId id("doc:scheme:");
+ DocumentId id("doc:scheme:");
Document::UP doc(new Document(*_docType, id));
opList.addPut(std::move(doc));
@@ -849,24 +627,20 @@ Test::testDocumentRouteSelector()
{
vdslib::OperationList opList;
- document::DocumentId id("doc:scheme:");
+ DocumentId id("doc:scheme:");
Document::UP doc(new Document(*_repo->getDocumentType("other"), id));
opList.addPut(std::move(doc));
document::BucketIdFactory factory;
- put = frame.setMessage(MultiOperationMessage::create(_repo,
- factory.getBucketId(id), opList));
+ put = frame.setMessage(MultiOperationMessage::create(_repo, factory.getBucketId(id), opList));
EXPECT_TRUE(frame.testSelect(StringList().add("bar")));
}
- frame.setMessage(mbus::Message::UP(new RemoveDocumentMessage(document::DocumentId("doc:scheme:"))));
+ frame.setMessage(mbus::Message::UP(new RemoveDocumentMessage(DocumentId("doc:scheme:"))));
EXPECT_TRUE(frame.testSelect(StringList().add("foo").add("bar")));
- frame.setMessage(mbus::Message::UP(new UpdateDocumentMessage(
- document::DocumentUpdate::SP(
- new document::DocumentUpdate(
- *_docType,
- DocumentId("doc:scheme:"))))));
+ frame.setMessage(make_unique<UpdateDocumentMessage>(
+ make_shared<DocumentUpdate>(*_docType, DocumentId("doc:scheme:"))));
EXPECT_TRUE(frame.testSelect(StringList().add("foo")));
frame.setMessage(std::move(put));
@@ -884,22 +658,17 @@ Test::testDocumentRouteSelectorIgnore()
"route[0].feed \"myfeed\"\n]")
.addRecipient("docproc/cluster.foo"));
- frame.setMessage(mbus::Message::UP(new PutDocumentMessage(
- document::Document::SP(
- new document::Document(*_docType,
- DocumentId("id:yarn:testdoc:n=1234:fluff"))))));
+ frame.setMessage(make_unique<PutDocumentMessage>(
+ make_shared<Document>(*_docType, DocumentId("id:yarn:testdoc:n=1234:fluff"))));
std::vector<mbus::RoutingNode*> leaf;
ASSERT_TRUE(frame.select(leaf, 0));
mbus::Reply::UP reply = frame.getReceptor().getReply(600);
- ASSERT_TRUE(reply.get() != NULL);
+ ASSERT_TRUE(reply);
EXPECT_EQUAL(uint32_t(DocumentProtocol::REPLY_DOCUMENTIGNORED), reply->getType());
EXPECT_EQUAL(0u, reply->getNumErrors());
- frame.setMessage(mbus::Message::UP(new UpdateDocumentMessage(
- document::DocumentUpdate::SP(
- new document::DocumentUpdate(
- *_docType,
- DocumentId("doc:scheme:"))))));
+ frame.setMessage(make_unique<UpdateDocumentMessage>(
+ make_shared<DocumentUpdate>(*_docType, DocumentId("doc:scheme:"))));
EXPECT_TRUE(frame.testSelect(StringList().add("docproc/cluster.foo")));
}
@@ -999,7 +768,7 @@ Test::requireThatStoragePolicyIsRandomWithoutState()
StoragePolicy &policy = setupStoragePolicy(
frame, param,
"storage/cluster.mycluster/distributor/*/default", 5);
- ASSERT_TRUE(policy.getSystemState() == NULL);
+ ASSERT_TRUE(policy.getSystemState() == nullptr);
std::set<string> lst;
for (uint32_t i = 0; i < 666; i++) {
@@ -1022,10 +791,8 @@ Test::setupStoragePolicy(TestFrame &frame, const string &param,
mbus::MessageBus &mbus = frame.getMessageBus();
const mbus::HopBlueprint *hop = mbus.getRoutingTable(DocumentProtocol::NAME)->getHop("test");
const mbus::PolicyDirective dir = static_cast<mbus::PolicyDirective&>(*hop->getDirective(0));
- StoragePolicy &policy = static_cast<StoragePolicy&>(*mbus.getRoutingPolicy(
- DocumentProtocol::NAME,
- dir.getName(),
- dir.getParam()));
+ StoragePolicy &policy = static_cast<StoragePolicy&>(*mbus.getRoutingPolicy(DocumentProtocol::NAME,
+ dir.getName(), dir.getParam()));
policy.initSynchronous();
assertMirrorReady(*policy.getMirror());
if (numEntries >= 0) {
@@ -1046,7 +813,7 @@ Test::requireThatStoragePolicyIsTargetedWithState()
mbus::TestServer *srv = new mbus::TestServer(
mbus::Identity(vespalib::make_string("storage/cluster.mycluster/distributor/%d", i)),
mbus::RoutingSpec(), slobrok,
- mbus::IProtocol::SP(new DocumentProtocol(_loadTypes, _repo)));
+ make_shared<DocumentProtocol>(_loadTypes, _repo));
servers.push_back(srv);
srv->net.registerSession("default");
}
@@ -1056,12 +823,12 @@ Test::requireThatStoragePolicyIsTargetedWithState()
StoragePolicy &policy = setupStoragePolicy(
frame, param,
"storage/cluster.mycluster/distributor/*/default", 5);
- ASSERT_TRUE(policy.getSystemState() == NULL);
+ ASSERT_TRUE(policy.getSystemState() == nullptr);
{
std::vector<mbus::RoutingNode*> leaf;
ASSERT_TRUE(frame.select(leaf, 1));
leaf[0]->handleReply(mbus::Reply::UP(new WrongDistributionReply("distributor:5 storage:5")));
- ASSERT_TRUE(policy.getSystemState() != NULL);
+ ASSERT_TRUE(policy.getSystemState() != nullptr);
EXPECT_EQUAL(policy.getSystemState()->toString(), "distributor:5 storage:5");
}
std::set<string> lst;
@@ -1086,7 +853,7 @@ Test::requireThatStoragePolicyCombinesSystemAndSlobrokState()
mbus::Slobrok slobrok;
mbus::TestServer server(mbus::Identity("storage/cluster.mycluster/distributor/0"),
mbus::RoutingSpec(), slobrok,
- mbus::IProtocol::SP(new DocumentProtocol(_loadTypes, _repo)));
+ make_shared<DocumentProtocol>(_loadTypes, _repo));
server.net.registerSession("default");
string param = vespalib::make_string(
@@ -1095,12 +862,12 @@ Test::requireThatStoragePolicyCombinesSystemAndSlobrokState()
StoragePolicy &policy = setupStoragePolicy(
frame, param,
"storage/cluster.mycluster/distributor/*/default", 1);
- ASSERT_TRUE(policy.getSystemState() == NULL);
+ ASSERT_TRUE(policy.getSystemState() == nullptr);
{
std::vector<mbus::RoutingNode*> leaf;
ASSERT_TRUE(frame.select(leaf, 1));
leaf[0]->handleReply(mbus::Reply::UP(new WrongDistributionReply("distributor:99 storage:99")));
- ASSERT_TRUE(policy.getSystemState() != NULL);
+ ASSERT_TRUE(policy.getSystemState() != nullptr);
EXPECT_EQUAL(policy.getSystemState()->toString(), "distributor:99 storage:99");
}
for (int i = 0; i < 666; i++) {
@@ -1113,9 +880,7 @@ Test::testSubsetService()
{
// Prepare message.
TestFrame frame(_repo, "docproc/cluster.default");
- frame.setMessage(mbus::Message::UP(new PutDocumentMessage(Document::SP(
- new Document(*_docType,
- DocumentId("doc:scheme:"))))));
+ frame.setMessage(make_unique<PutDocumentMessage>(make_shared<Document>(*_docType, DocumentId("doc:scheme:"))));
// Test requerying for adding nodes.
frame.setHop(mbus::HopSpec("test", "docproc/cluster.default/[SubsetService:2]/chain.default"));
@@ -1128,7 +893,7 @@ Test::testSubsetService()
ASSERT_TRUE(frame.select(leaf, 1));
lst.insert(leaf[0]->getRoute().toString());
leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
- ASSERT_TRUE(frame.getReceptor().getReply(600).get() != NULL);
+ ASSERT_TRUE(frame.getReceptor().getReply(600));
}
ASSERT_TRUE(lst.size() > 1); // must have requeried
@@ -1147,7 +912,7 @@ Test::testSubsetService()
prev = next;
leaf[0]->handleReply(mbus::Reply::UP(new mbus::EmptyReply()));
- ASSERT_TRUE(frame.getReceptor().getReply(600).get() != NULL);
+ ASSERT_TRUE(frame.getReceptor().getReply(600));
}
// Test requerying for dropping nodes.
@@ -1164,7 +929,7 @@ Test::testSubsetService()
mbus::Reply::UP reply(new mbus::EmptyReply());
reply->addError(mbus::Error(mbus::ErrorCode::NO_ADDRESS_FOR_SERVICE, route));
leaf[0]->handleReply(std::move(reply));
- ASSERT_TRUE(frame.getReceptor().getReply(600).get() != NULL);
+ ASSERT_TRUE(frame.getReceptor().getReply(600));
}
EXPECT_EQUAL(10u, lst.size());
@@ -1178,12 +943,12 @@ Test::testSubsetServiceCache()
{
TestFrame fooFrame(_repo, "docproc/cluster.default");
mbus::HopSpec fooHop("foo", "docproc/cluster.default/[SubsetService:2]/chain.foo");
- fooFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:foo"))));
+ fooFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(DocumentId("doc:scheme:foo"))));
fooFrame.setHop(fooHop);
TestFrame barFrame(fooFrame);
mbus::HopSpec barHop("bar", "docproc/cluster.default/[SubsetService:2]/chain.bar");
- barFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(document::DocumentId("doc:scheme:bar"))));
+ barFrame.setMessage(mbus::Message::UP(new GetDocumentMessage(DocumentId("doc:scheme:bar"))));
barFrame.setHop(barHop);
fooFrame.getMessageBus().setupRouting(
diff --git a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp
index 95115d1c3c1..a1837b6bcd0 100644
--- a/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/documentprotocol.cpp
@@ -41,8 +41,6 @@ DocumentProtocol::DocumentProtocol(const LoadTypeSet& loadTypes,
putRoutingPolicyFactory("Extern", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::ExternPolicyFactory()));
putRoutingPolicyFactory("LocalService", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::LocalServicePolicyFactory()));
putRoutingPolicyFactory("RoundRobin", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::RoundRobinPolicyFactory()));
- putRoutingPolicyFactory("SearchColumn", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::SearchColumnPolicyFactory()));
- putRoutingPolicyFactory("SearchRow", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::SearchRowPolicyFactory()));
putRoutingPolicyFactory("Storage", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::StoragePolicyFactory()));
putRoutingPolicyFactory("SubsetService", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::SubsetServicePolicyFactory()));
putRoutingPolicyFactory("LoadBalancer", IRoutingPolicyFactory::SP(new RoutingPolicyFactories::LoadBalancerPolicyFactory()));
@@ -134,8 +132,7 @@ DocumentProtocol::encode(const vespalib::Version &version, const mbus::Routable
std::ostringstream message;
document::StringUtil::printAsHex(
message, blob.data(), blob.size());
- LOG(spam, "Encoded message of protocol %s type %u using version "
- "%s serialization:\n%s",
+ LOG(spam, "Encoded message of protocol %s type %u using version %s serialization:\n%s",
routable.getProtocol().c_str(), routable.getType(),
version.toString().c_str(), message.str().c_str());
}
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt b/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt
index f1a691bc46d..26d51e702e9 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt
@@ -11,8 +11,6 @@ vespa_add_library(documentapi_documentapipolicies OBJECT
externpolicy.cpp
localservicepolicy.cpp
roundrobinpolicy.cpp
- searchcolumnpolicy.cpp
- searchrowpolicy.cpp
subsetservicepolicy.cpp
loadbalancer.cpp
loadbalancerpolicy.cpp
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.cpp
deleted file mode 100644
index 38610aca551..00000000000
--- a/documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.cpp
+++ /dev/null
@@ -1,137 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "searchcolumnpolicy.h"
-#include <vespa/documentapi/messagebus/documentprotocol.h>
-#include <vespa/documentapi/messagebus/messages/getdocumentmessage.h>
-#include <vespa/documentapi/messagebus/messages/putdocumentmessage.h>
-#include <vespa/documentapi/messagebus/messages/removedocumentmessage.h>
-#include <vespa/documentapi/messagebus/messages/updatedocumentmessage.h>
-#include <vespa/documentapi/messagebus/messages/batchdocumentupdatemessage.h>
-#include <vespa/documentapi/messagebus/messages/multioperationmessage.h>
-#include <vespa/messagebus/emptyreply.h>
-#include <vespa/vdslib/state/clusterstate.h>
-#include <vespa/vespalib/util/hashmap.h>
-#include <vespa/log/log.h>
-LOG_SETUP(".searchcolumnpolicy");
-
-namespace documentapi {
-
-SearchColumnPolicy::SearchColumnPolicy(const string &param) :
- _lock(),
- _factory(),
- _distributions(),
- _maxOOS(0)
-{
- if (param.length() > 0) {
- int maxOOS = atoi(param.c_str());
- if (maxOOS >= 0) {
- _maxOOS = (uint32_t)maxOOS;
- } else {
- LOG(warning,
- "Ignoring a request to set the maximum number of OOS replies to %d because it makes no "
- "sense. This routing policy will not allow any recipient to be out of service.", maxOOS);
- }
- }
-}
-
-SearchColumnPolicy::~SearchColumnPolicy()
-{
- // empty
-}
-
-void
-SearchColumnPolicy::select(mbus::RoutingContext &context)
-{
- std::vector<mbus::Route> recipients;
- context.getMatchedRecipients(recipients);
- if (recipients.empty()) {
- return;
- }
- const document::DocumentId *id = NULL;
- document::BucketId bucketId;
-
- const mbus::Message &msg = context.getMessage();
- switch(msg.getType()) {
- case DocumentProtocol::MESSAGE_PUTDOCUMENT:
- id = &static_cast<const PutDocumentMessage&>(msg).getDocument().getId();
- break;
-
- case DocumentProtocol::MESSAGE_GETDOCUMENT:
- id = &static_cast<const GetDocumentMessage&>(msg).getDocumentId();
- break;
-
- case DocumentProtocol::MESSAGE_REMOVEDOCUMENT:
- id = &static_cast<const RemoveDocumentMessage&>(msg).getDocumentId();
- break;
-
- case DocumentProtocol::MESSAGE_UPDATEDOCUMENT:
- id = &static_cast<const UpdateDocumentMessage&>(msg).getDocumentUpdate().getId();
- break;
-
- case DocumentProtocol::MESSAGE_MULTIOPERATION:
- bucketId = (static_cast<const MultiOperationMessage&>(msg)).getBucketId();
- break;
-
- case DocumentProtocol::MESSAGE_BATCHDOCUMENTUPDATE:
- bucketId = (static_cast<const BatchDocumentUpdateMessage&>(msg)).getBucketId();
- break;
-
- default:
- LOG(error, "Message type '%d' not supported.", msg.getType());
- return;
- }
- if (bucketId.getRawId() == 0) {
- bucketId = _factory.getBucketId(*id);
- }
- uint32_t recipient = getRecipient(bucketId, recipients.size());
- context.addChild(recipients[recipient]);
- context.setSelectOnRetry(true);
- if (_maxOOS > 0) {
- context.addConsumableError(mbus::ErrorCode::SERVICE_OOS);
- }
-}
-
-void
-SearchColumnPolicy::merge(mbus::RoutingContext &context)
-{
- if (_maxOOS > 0) {
- if (context.getNumChildren() > 1) {
- std::set<uint32_t> oosReplies;
- uint32_t idx = 0;
- for (mbus::RoutingNodeIterator it = context.getChildIterator();
- it.isValid(); it.next())
- {
- const mbus::Reply &ref = it.getReplyRef();
- if (ref.hasErrors() && DocumentProtocol::hasOnlyErrorsOfType(ref, mbus::ErrorCode::SERVICE_OOS)) {
- oosReplies.insert(idx);
- }
- ++idx;
- }
- if (oosReplies.size() <= _maxOOS) {
- DocumentProtocol::merge(context, oosReplies);
- return; // may the rtx be with you
- }
- } else {
- const mbus::Reply &ref = context.getChildIterator().getReplyRef();
- if (ref.hasErrors() && DocumentProtocol::hasOnlyErrorsOfType(ref, mbus::ErrorCode::SERVICE_OOS)) {
- context.setReply(mbus::Reply::UP(new mbus::EmptyReply()));
- return; // god help us all
- }
- }
- }
- DocumentProtocol::merge(context);
-}
-
-uint32_t
-SearchColumnPolicy::getRecipient(const document::BucketId &bucketId, uint32_t numRecipients)
-{
- vespalib::LockGuard guard(_lock);
- DistributionCache::iterator it = _distributions.find(numRecipients);
- if (it == _distributions.end()) {
- it = _distributions.insert(DistributionCache::value_type(numRecipients, vdslib::BucketDistribution(1, 16u))).first;
- it->second.setNumColumns(numRecipients);
- }
- return it->second.getColumn(bucketId);
-}
-
-}
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.h
deleted file mode 100644
index a17824249c4..00000000000
--- a/documentapi/src/vespa/documentapi/messagebus/policies/searchcolumnpolicy.h
+++ /dev/null
@@ -1,52 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <vespa/documentapi/common.h>
-#include <vespa/messagebus/routing/iroutingpolicy.h>
-#include <vespa/vdslib/bucketdistribution.h>
-#include <vespa/document/bucket/bucketidfactory.h>
-#include <vespa/vespalib/util/sync.h>
-#include <map>
-
-namespace documentapi {
-
-/**
- * This policy implements the logic to select recipients for a single search column.
- *
- * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen</a>
- * @version $Id$
- */
-class SearchColumnPolicy : public mbus::IRoutingPolicy {
-private:
- typedef std::map<uint32_t, vdslib::BucketDistribution> DistributionCache;
-
- vespalib::Lock _lock;
- document::BucketIdFactory _factory;
- DistributionCache _distributions;
- uint32_t _maxOOS;
-
- /**
- * Returns the recipient index for the given bucket id. This updates the shared internal distribution map, so it
- * needs to be synchronized.
- *
- * @param bucketId The bucket whose recipient to return.
- * @param numRecipients The number of recipients being distributed to.
- * @return The recipient to use.
- */
- uint32_t getRecipient(const document::BucketId &bucketId, uint32_t numRecipients);
-
-public:
- /**
- * Constructs a new policy object for the given parameter string. The string can be null or empty, which is a
- * request to not allow any bad columns.
- *
- * @param param The maximum number of allowed bad columns.
- */
- SearchColumnPolicy(const string &param);
- ~SearchColumnPolicy();
-
- void select(mbus::RoutingContext &context) override;
- void merge(mbus::RoutingContext &context) override;
-};
-
-}
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.cpp
deleted file mode 100644
index 20f8398f1e6..00000000000
--- a/documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.cpp
+++ /dev/null
@@ -1,63 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "searchrowpolicy.h"
-#include <vespa/documentapi/messagebus/documentprotocol.h>
-
-#include <vespa/log/log.h>
-LOG_SETUP(".searchrowpolicy");
-
-namespace documentapi {
-
-SearchRowPolicy::SearchRowPolicy(const string &param) :
- _minOk(0)
-{
- if (param.length() > 0) {
- int minOk = atoi(param.c_str());
- if (minOk > 0) {
- _minOk = (uint32_t)minOk;
- } else {
- LOG(warning,
- "Ignoring a request to set the minimum number of OK replies to %d because it makes no sense. "
- "This routing policy will not allow any recipient to be out of service.", minOk);
- }
- }
-}
-
-SearchRowPolicy::~SearchRowPolicy() {}
-
-void
-SearchRowPolicy::select(mbus::RoutingContext &context)
-{
- std::vector<mbus::Route> recipients;
- context.getMatchedRecipients(recipients);
- context.addChildren(recipients);
- context.setSelectOnRetry(false);
- if (_minOk > 0) {
- context.addConsumableError(mbus::ErrorCode::SERVICE_OOS);
- }
-}
-
-void
-SearchRowPolicy::merge(mbus::RoutingContext &context)
-{
- if (_minOk > 0) {
- std::set<uint32_t> oosReplies;
- uint32_t idx = 0;
- for (mbus::RoutingNodeIterator it = context.getChildIterator();
- it.isValid(); it.next())
- {
- const mbus::Reply &ref = it.getReplyRef();
- if (ref.hasErrors() && DocumentProtocol::hasOnlyErrorsOfType(ref, mbus::ErrorCode::SERVICE_OOS)) {
- oosReplies.insert(idx);
- }
- ++idx;
- }
- if (context.getNumChildren() - oosReplies.size() >= _minOk) {
- DocumentProtocol::merge(context, oosReplies);
- return;
- }
- }
- DocumentProtocol::merge(context);
-}
-
-}
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.h
deleted file mode 100644
index 2ca987ba4f7..00000000000
--- a/documentapi/src/vespa/documentapi/messagebus/policies/searchrowpolicy.h
+++ /dev/null
@@ -1,30 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#pragma once
-
-#include <vespa/documentapi/common.h>
-#include <vespa/messagebus/routing/iroutingpolicy.h>
-
-namespace documentapi {
-
-class SearchRowPolicy : public mbus::IRoutingPolicy {
-private:
- SearchRowPolicy(const SearchRowPolicy &);
- SearchRowPolicy &operator=(const SearchRowPolicy &);
-
-public:
- /**
- * Creates a search row policy that wraps the underlying search group policy in case the parameter is something
- * other than an empty string.
- *
- * @param param The number of minimum non-OOS replies that this policy requires.
- */
- SearchRowPolicy(const string &param);
- ~SearchRowPolicy();
-
- void select(mbus::RoutingContext &context) override;
- void merge(mbus::RoutingContext &context) override;
-private:
- uint32_t _minOk; // Hide OUT_OF_SERVICE as long as this number of replies are something else.
-};
-
-}
diff --git a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp
index 741083009f5..2c244c63046 100644
--- a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.cpp
@@ -6,8 +6,6 @@
#include <vespa/documentapi/messagebus/policies/externpolicy.h>
#include <vespa/documentapi/messagebus/policies/localservicepolicy.h>
#include <vespa/documentapi/messagebus/policies/roundrobinpolicy.h>
-#include <vespa/documentapi/messagebus/policies/searchcolumnpolicy.h>
-#include <vespa/documentapi/messagebus/policies/searchrowpolicy.h>
#include <vespa/documentapi/messagebus/policies/subsetservicepolicy.h>
#include <vespa/documentapi/messagebus/policies/storagepolicy.h>
#include <vespa/documentapi/messagebus/policies/contentpolicy.h>
@@ -107,18 +105,6 @@ RoutingPolicyFactories::RoundRobinPolicyFactory::createPolicy(const string &para
}
mbus::IRoutingPolicy::UP
-RoutingPolicyFactories::SearchColumnPolicyFactory::createPolicy(const string &param) const
-{
- return mbus::IRoutingPolicy::UP(new SearchColumnPolicy(param));
-}
-
-mbus::IRoutingPolicy::UP
-RoutingPolicyFactories::SearchRowPolicyFactory::createPolicy(const string &param) const
-{
- return mbus::IRoutingPolicy::UP(new SearchRowPolicy(param));
-}
-
-mbus::IRoutingPolicy::UP
RoutingPolicyFactories::SubsetServicePolicyFactory::createPolicy(const string &param) const
{
return mbus::IRoutingPolicy::UP(new SubsetServicePolicy(param));
diff --git a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h
index 95a65008a9b..003768aedda 100644
--- a/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h
+++ b/documentapi/src/vespa/documentapi/messagebus/routingpolicyfactories.h
@@ -52,14 +52,6 @@ public:
public:
mbus::IRoutingPolicy::UP createPolicy(const string &param) const override;
};
- class SearchColumnPolicyFactory : public IRoutingPolicyFactory {
- public:
- mbus::IRoutingPolicy::UP createPolicy(const string &param) const override;
- };
- class SearchRowPolicyFactory : public IRoutingPolicyFactory {
- public:
- mbus::IRoutingPolicy::UP createPolicy(const string &param) const override;
- };
class SubsetServicePolicyFactory : public IRoutingPolicyFactory {
public:
mbus::IRoutingPolicy::UP createPolicy(const string &param) const override;
diff --git a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java
index 91b269081bf..adbe07df8d0 100644
--- a/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java
+++ b/jdisc_messagebus_service/src/main/java/com/yahoo/messagebus/jdisc/StatusCodes.java
@@ -34,8 +34,6 @@ public class StatusCodes {
return Response.Status.BAD_REQUEST;
case ErrorCode.NO_SERVICES_FOR_ROUTE:
return Response.Status.NOT_FOUND;
- case ErrorCode.SERVICE_OOS:
- return Response.Status.SERVICE_UNAVAILABLE;
case ErrorCode.ENCODE_ERROR:
return Response.Status.BAD_REQUEST;
case ErrorCode.NETWORK_ERROR:
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/ErrorCode.java b/messagebus/src/main/java/com/yahoo/messagebus/ErrorCode.java
index 7c645ec4e1b..ee8a4cc2b55 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/ErrorCode.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/ErrorCode.java
@@ -50,7 +50,9 @@ public final class ErrorCode {
/** No services found for the message route. */
public static final int NO_SERVICES_FOR_ROUTE = FATAL_ERROR + 3;
- /** The selected service was out of service. */
+ /** The selected service was out of service.
+ */
+ @Deprecated // Unused and will be removed
public static final int SERVICE_OOS = FATAL_ERROR + 4;
/** An error occured while encoding the message. */
diff --git a/messagebus/src/vespa/messagebus/errorcode.cpp b/messagebus/src/vespa/messagebus/errorcode.cpp
index 67330580f7a..af9089b987e 100644
--- a/messagebus/src/vespa/messagebus/errorcode.cpp
+++ b/messagebus/src/vespa/messagebus/errorcode.cpp
@@ -30,7 +30,6 @@ ErrorCode::getName(uint32_t errorCode)
case SEND_QUEUE_CLOSED : return "SEND_QUEUE_CLOSED";
case SEND_QUEUE_FULL : return "SEND_QUEUE_FULL";
case SEQUENCE_ERROR : return "SEQUENCE_ERROR";
- case SERVICE_OOS : return "SERVICE_OOS";
case SESSION_BUSY : return "SESSION_BUSY";
case TIMEOUT : return "TIMEOUT";
case TRANSIENT_ERROR : return "TRANSIENT_ERROR";
diff --git a/messagebus/src/vespa/messagebus/errorcode.h b/messagebus/src/vespa/messagebus/errorcode.h
index 2cdc6952a67..f170e29ca8f 100644
--- a/messagebus/src/vespa/messagebus/errorcode.h
+++ b/messagebus/src/vespa/messagebus/errorcode.h
@@ -72,7 +72,8 @@ public:
NO_SERVICES_FOR_ROUTE = FATAL_ERROR + 3,
// The selected service was out of service.
- SERVICE_OOS = FATAL_ERROR + 4,
+ // Unused.....
+ // SERVICE_OOS = FATAL_ERROR + 4,
// An error occured while encoding the message.
ENCODE_ERROR = FATAL_ERROR + 5,
diff --git a/messagebus_test/src/tests/errorcodes/DumpCodes.java b/messagebus_test/src/tests/errorcodes/DumpCodes.java
index 01003876fa3..96c7fc57a3e 100644
--- a/messagebus_test/src/tests/errorcodes/DumpCodes.java
+++ b/messagebus_test/src/tests/errorcodes/DumpCodes.java
@@ -24,7 +24,6 @@ public class DumpCodes {
dump("SEND_QUEUE_CLOSED", ErrorCode.SEND_QUEUE_CLOSED);
dump("ILLEGAL_ROUTE", ErrorCode.ILLEGAL_ROUTE);
dump("NO_SERVICES_FOR_ROUTE", ErrorCode.NO_SERVICES_FOR_ROUTE);
- dump("SERVICE_OOS", ErrorCode.SERVICE_OOS);
dump("ENCODE_ERROR", ErrorCode.ENCODE_ERROR);
dump("NETWORK_ERROR", ErrorCode.NETWORK_ERROR);
dump("UNKNOWN_PROTOCOL", ErrorCode.UNKNOWN_PROTOCOL);
diff --git a/messagebus_test/src/tests/errorcodes/dumpcodes.cpp b/messagebus_test/src/tests/errorcodes/dumpcodes.cpp
index fdc8892b7c7..ecc5c9bc42c 100644
--- a/messagebus_test/src/tests/errorcodes/dumpcodes.cpp
+++ b/messagebus_test/src/tests/errorcodes/dumpcodes.cpp
@@ -36,7 +36,6 @@ App::Main()
dump("SEND_QUEUE_CLOSED", ErrorCode::SEND_QUEUE_CLOSED);
dump("ILLEGAL_ROUTE", ErrorCode::ILLEGAL_ROUTE);
dump("NO_SERVICES_FOR_ROUTE", ErrorCode::NO_SERVICES_FOR_ROUTE);
- dump("SERVICE_OOS", ErrorCode::SERVICE_OOS);
dump("ENCODE_ERROR", ErrorCode::ENCODE_ERROR);
dump("NETWORK_ERROR", ErrorCode::NETWORK_ERROR);
dump("UNKNOWN_PROTOCOL", ErrorCode::UNKNOWN_PROTOCOL);
diff --git a/messagebus_test/src/tests/errorcodes/ref-dump.txt b/messagebus_test/src/tests/errorcodes/ref-dump.txt
index b8038816897..70a10de7b82 100644
--- a/messagebus_test/src/tests/errorcodes/ref-dump.txt
+++ b/messagebus_test/src/tests/errorcodes/ref-dump.txt
@@ -10,7 +10,6 @@ first unused TRANSIENT_ERROR => 100008 => "UNKNOWN(100008)"
SEND_QUEUE_CLOSED => 200001 => "SEND_QUEUE_CLOSED"
ILLEGAL_ROUTE => 200002 => "ILLEGAL_ROUTE"
NO_SERVICES_FOR_ROUTE => 200003 => "NO_SERVICES_FOR_ROUTE"
-SERVICE_OOS => 200004 => "SERVICE_OOS"
ENCODE_ERROR => 200005 => "ENCODE_ERROR"
NETWORK_ERROR => 200006 => "NETWORK_ERROR"
UNKNOWN_PROTOCOL => 200007 => "UNKNOWN_PROTOCOL"
diff --git a/searchcore/src/tests/proton/docsummary/docsummary.cpp b/searchcore/src/tests/proton/docsummary/docsummary.cpp
index 3b199d266a8..bce3fb7267c 100644
--- a/searchcore/src/tests/proton/docsummary/docsummary.cpp
+++ b/searchcore/src/tests/proton/docsummary/docsummary.cpp
@@ -21,9 +21,8 @@
#include <vespa/searchcore/proton/server/memoryconfigstore.h>
#include <vespa/searchcore/proton/server/searchview.h>
#include <vespa/searchcore/proton/server/summaryadapter.h>
-#include <vespa/searchlib/common/idestructorcallback.h>
+#include <vespa/searchlib/common/gatecallback.h>
#include <vespa/searchlib/common/transport.h>
-#include <vespa/searchlib/docstore/logdocumentstore.h>
#include <vespa/searchlib/engine/docsumapi.h>
#include <vespa/searchlib/index/docbuilder.h>
#include <vespa/searchlib/index/dummyfileheadercontext.h>
@@ -254,7 +253,7 @@ public:
op.setSerialNum(serialNum);
op.setDbDocumentId(dbdId);
op.setPrevDbDocumentId(prevDbdId);
- _ddb->getFeedHandler().storeOperation(op);
+ _ddb->getFeedHandler().storeOperation(op, std::make_shared<search::IgnoreCallback>());
SearchView *sv(dynamic_cast<SearchView *>(_ddb->getReadySubDB()->getSearchView().get()));
if (sv != NULL) {
// cf. FeedView::putAttributes()
diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
index 823c31dd1c2..b8ffc41d3cd 100644
--- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
@@ -370,7 +370,7 @@ struct MyTlsWriter : TlsWriter {
bool erase_return;
MyTlsWriter() : store_count(0), erase_count(0), erase_return(true) {}
- void storeOperation(const FeedOperation &) override { ++store_count; }
+ void storeOperation(const FeedOperation &, DoneCallback) override { ++store_count; }
bool erase(SerialNum) override { ++erase_count; return erase_return; }
SerialNum sync(SerialNum syncTo) override {
diff --git a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp
index 55f71da9687..56bd99c90f6 100644
--- a/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/lid_space_compaction/lid_space_compaction_test.cpp
@@ -138,7 +138,7 @@ struct MyStorer : public IOperationStorer
: _moveCnt(0),
_compactCnt(0)
{}
- virtual void storeOperation(FeedOperation &op) override {
+ void storeOperation(const FeedOperation &op, DoneCallback) override {
if (op.getType() == FeedOperation::MOVE) {
++ _moveCnt;
} else if (op.getType() == FeedOperation::COMPACT_LID_SPACE) {
diff --git a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp
index 559dbb240a8..f20ad01bcf6 100644
--- a/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/maintenancecontroller/maintenancecontroller_test.cpp
@@ -26,7 +26,7 @@
#include <vespa/searchcore/proton/test/test.h>
#include <vespa/searchlib/attribute/attributecontext.h>
#include <vespa/searchlib/attribute/attributeguard.h>
-#include <vespa/searchlib/common/idestructorcallback.h>
+#include <vespa/searchlib/common/gatecallback.h>
#include <vespa/searchlib/common/idocumentmetastore.h>
#include <vespa/searchlib/index/docbuilder.h>
#include <vespa/vespalib/data/slime/slime.h>
@@ -232,7 +232,7 @@ public:
}
// Implements IOperationStorer
- virtual void storeOperation(FeedOperation &op) override;
+ virtual void storeOperation(const FeedOperation &op, DoneCallback) override;
uint32_t getHeartBeats() {
return _heartBeats;
@@ -781,7 +781,6 @@ MyFeedHandler::isExecutorThread()
void
MyFeedHandler::handleMove(MoveOperation &op, IDestructorCallback::SP moveDoneCtx)
{
- (void) moveDoneCtx;
assert(isExecutorThread());
assert(op.getValidPrevDbdId());
_subDBs[op.getSubDbId()]->prepareMove(op);
@@ -792,7 +791,7 @@ MyFeedHandler::handleMove(MoveOperation &op, IDestructorCallback::SP moveDoneCtx
assert(op.getPrevSubDbId() != 1u);
assert(op.getSubDbId() < _subDBs.size());
assert(op.getPrevSubDbId() < _subDBs.size());
- storeOperation(op);
+ storeOperation(op, std::move(moveDoneCtx));
_subDBs[op.getSubDbId()]->handleMove(op);
_subDBs[op.getPrevSubDbId()]->handleMove(op);
}
@@ -803,7 +802,7 @@ MyFeedHandler::performPruneRemovedDocuments(PruneRemovedDocumentsOperation &op)
{
assert(isExecutorThread());
if (op.getLidsToRemove()->getNumLids() != 0u) {
- storeOperation(op);
+ storeOperation(op, std::make_shared<search::IgnoreCallback>());
// magic number.
_subDBs[1u]->handlePruneRemovedDocuments(op);
}
@@ -826,9 +825,9 @@ MyFeedHandler::setSubDBs(const std::vector<MyDocumentSubDB *> &subDBs)
void
-MyFeedHandler::storeOperation(FeedOperation &op)
+MyFeedHandler::storeOperation(const FeedOperation &op, DoneCallback)
{
- op.setSerialNum(incSerialNum());
+ const_cast<FeedOperation &>(op).setSerialNum(incSerialNum());
}
@@ -1011,22 +1010,16 @@ MaintenanceControllerFixture::performForwardMaintenanceConfig()
void
-MaintenanceControllerFixture::insertDocs(const test::UserDocuments &docs,
- MyDocumentSubDB &subDb)
+MaintenanceControllerFixture::insertDocs(const test::UserDocuments &docs, MyDocumentSubDB &subDb)
{
- for (test::UserDocuments::Iterator itr = docs.begin();
- itr != docs.end();
- ++itr) {
+ for (auto itr = docs.begin(); itr != docs.end(); ++itr) {
const test::BucketDocuments &bucketDocs = itr->second;
for (size_t i = 0; i < bucketDocs.getDocs().size(); ++i) {
const test::Document &testDoc = bucketDocs.getDocs()[i];
- PutOperation op(testDoc.getBucket(),
- testDoc.getTimestamp(),
- testDoc.getDoc());
- op.setDbDocumentId(DbDocumentId(subDb.getSubDBId(),
- testDoc.getLid()));
- _fh.storeOperation(op);
+ PutOperation op(testDoc.getBucket(), testDoc.getTimestamp(), testDoc.getDoc());
+ op.setDbDocumentId(DbDocumentId(subDb.getSubDBId(), testDoc.getLid()));
+ _fh.storeOperation(op, std::make_shared<search::IgnoreCallback>());
subDb.handlePut(op);
}
}
@@ -1038,18 +1031,13 @@ MaintenanceControllerFixture::removeDocs(const test::UserDocuments &docs,
Timestamp timestamp)
{
- for (test::UserDocuments::Iterator itr = docs.begin();
- itr != docs.end();
- ++itr) {
+ for (auto itr = docs.begin(); itr != docs.end(); ++itr) {
const test::BucketDocuments &bucketDocs = itr->second;
for (size_t i = 0; i < bucketDocs.getDocs().size(); ++i) {
const test::Document &testDoc = bucketDocs.getDocs()[i];
- RemoveOperation op(testDoc.getBucket(),
- timestamp,
- testDoc.getDoc()->getId());
- op.setDbDocumentId(DbDocumentId(_removed.getSubDBId(),
- testDoc.getLid()));
- _fh.storeOperation(op);
+ RemoveOperation op(testDoc.getBucket(), timestamp, testDoc.getDoc()->getId());
+ op.setDbDocumentId(DbDocumentId(_removed.getSubDBId(), testDoc.getLid()));
+ _fh.storeOperation(op, std::make_shared<search::IgnoreCallback>());
_removed.handleRemove(op);
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
index 4198803d1fe..5babacfc4b6 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
@@ -307,7 +307,7 @@ DocumentDB::enterReprocessState()
if (!runner.empty()) {
runner.run();
NoopOperation op;
- _feedHandler.storeOperation(op);
+ _feedHandler.storeOperationSync(op);
sync(op.getSerialNum());
_subDBs.pruneRemovedFields(op.getSerialNum());
}
@@ -397,15 +397,14 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum
_config_store->saveConfig(*configSnapshot, serialNum);
// save entry in transaction log
NewConfigOperation op(serialNum, *_config_store);
- _feedHandler.storeOperation(op);
+ _feedHandler.storeOperationSync(op);
sync(op.getSerialNum());
}
bool hasVisibilityDelayChanged = false;
{
bool elidedConfigSave = equalReplayConfig && tlsReplayDone;
// Flush changes to attributes and memory index, cf. visibilityDelay
- _feedView.get()->forceCommit(elidedConfigSave ? serialNum :
- serialNum - 1);
+ _feedView.get()->forceCommit(elidedConfigSave ? serialNum : serialNum - 1);
_writeService.sync();
fastos::TimeStamp visibilityDelay = configSnapshot->getMaintenanceConfigSP()->getVisibilityDelay();
hasVisibilityDelayChanged = (visibilityDelay != _visibility.getVisibilityDelay());
@@ -585,7 +584,7 @@ DocumentDB::saveInitialConfig(const DocumentDBConfig &configSnapshot)
// pruned at once anyway.
// save noop entry in transaction log
NoopOperation op;
- _feedHandler.storeOperation(op);
+ _feedHandler.storeOperationSync(op);
sync(op.getSerialNum());
// Wipe everything in transaction log before initial config.
try {
@@ -609,7 +608,7 @@ DocumentDB::resumeSaveConfig()
SerialNum confSerial = _feedHandler.incSerialNum();
// resume operation, i.e. save config entry in transaction log
NewConfigOperation op(confSerial, *_config_store);
- _feedHandler.storeOperation(op);
+ _feedHandler.storeOperationSync(op);
sync(op.getSerialNum());
}
@@ -776,7 +775,7 @@ DocumentDB::enterRedoReprocessState()
runner.run();
_subDBs.onReprocessDone(_feedHandler.getSerialNum());
NoopOperation op;
- _feedHandler.storeOperation(op);
+ _feedHandler.storeOperationSync(op);
sync(op.getSerialNum());
_subDBs.pruneRemovedFields(op.getSerialNum());
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index b01ba43cb49..bc1715e3c04 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -12,7 +12,7 @@
#include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h>
#include <vespa/searchcore/proton/persistenceengine/transport_latch.h>
#include <vespa/searchcorespi/index/ithreadingservice.h>
-#include <vespa/searchlib/common/idestructorcallback.h>
+#include <vespa/searchlib/common/gatecallback.h>
#include <vespa/vespalib/util/exceptions.h>
#include <unistd.h>
@@ -34,6 +34,8 @@ using vespalib::makeLambdaTask;
using vespalib::make_string;
using vespalib::MonitorGuard;
using vespalib::LockGuard;
+using std::make_unique;
+using std::make_shared;
namespace proton {
@@ -46,8 +48,8 @@ ignoreOperation(const DocumentOperation &op) {
} // namespace
-void FeedHandler::TlsMgrWriter::storeOperation(const FeedOperation &op) {
- TlcProxy(_tls_mgr.getDomainName(), *_tlsDirectWriter).storeOperation(op);
+void FeedHandler::TlsMgrWriter::storeOperation(const FeedOperation &op, DoneCallback onDone) {
+ TlcProxy(_tls_mgr.getDomainName(), *_tlsDirectWriter).storeOperation(op, std::move(onDone));
}
bool FeedHandler::TlsMgrWriter::erase(SerialNum oldest_to_keep) {
return _tls_mgr.getSession()->erase(oldest_to_keep);
@@ -72,7 +74,6 @@ FeedHandler::TlsMgrWriter::sync(SerialNum syncTo)
LOG(spam, "Tls sync incomplete, reached %" PRIu64 ", retrying", syncedTo);
}
throw IllegalStateException(make_string("Failed to sync TLS to token %" PRIu64 ".", syncTo));
- return 0;
}
void
@@ -90,13 +91,13 @@ void FeedHandler::performPut(FeedToken token, PutOperation &op) {
LOG(debug, "performPut(): ignoreOperation: docId(%s), timestamp(%" PRIu64 "), prevTimestamp(%" PRIu64 ")",
op.getDocument()->getId().toString().c_str(), (uint64_t)op.getTimestamp(), (uint64_t)op.getPrevTimestamp());
if (token) {
- token->setResult(std::make_unique<Result>(), false);
+ token->setResult(make_unique<Result>(), false);
}
return;
}
- storeOperation(op);
+ storeOperation(op, token);
if (token) {
- token->setResult(std::make_unique<Result>(), false);
+ token->setResult(make_unique<Result>(), false);
}
_activeFeedView->handlePut(std::move(token), op);
}
@@ -112,7 +113,7 @@ FeedHandler::performUpdate(FeedToken token, UpdateOperation &op)
createNonExistingDocument(std::move(token), op);
} else {
if (token) {
- token->setResult(std::make_unique<UpdateResult>(Timestamp(0)), false);
+ token->setResult(make_unique<UpdateResult>(Timestamp(0)), false);
}
}
}
@@ -121,9 +122,9 @@ FeedHandler::performUpdate(FeedToken token, UpdateOperation &op)
void
FeedHandler::performInternalUpdate(FeedToken token, UpdateOperation &op)
{
- storeOperation(op);
+ storeOperation(op, token);
if (token) {
- token->setResult(ResultUP(new UpdateResult(op.getPrevTimestamp())), true);
+ token->setResult(make_unique<UpdateResult>(op.getPrevTimestamp()), true);
}
_activeFeedView->handleUpdate(std::move(token), op);
}
@@ -132,14 +133,14 @@ FeedHandler::performInternalUpdate(FeedToken token, UpdateOperation &op)
void
FeedHandler::createNonExistingDocument(FeedToken token, const UpdateOperation &op)
{
- Document::SP doc(new Document(op.getUpdate()->getType(), op.getUpdate()->getId()));
+ auto doc = make_shared<Document>(op.getUpdate()->getType(), op.getUpdate()->getId());
doc->setRepo(*_activeFeedView->getDocumentTypeRepo());
op.getUpdate()->applyTo(*doc);
PutOperation putOp(op.getBucketId(), op.getTimestamp(), doc);
_activeFeedView->preparePut(putOp);
- storeOperation(putOp);
+ storeOperation(putOp, token);
if (token) {
- token->setResult(ResultUP(new UpdateResult(putOp.getTimestamp())), true);
+ token->setResult(make_unique<UpdateResult>(putOp.getTimestamp()), true);
}
TransportLatch latch(1);
_activeFeedView->handlePut(feedtoken::make(latch), putOp);
@@ -153,29 +154,29 @@ void FeedHandler::performRemove(FeedToken token, RemoveOperation &op) {
LOG(debug, "performRemove(): ignoreOperation: docId(%s), timestamp(%" PRIu64 "), prevTimestamp(%" PRIu64 ")",
op.getDocumentId().toString().c_str(), (uint64_t)op.getTimestamp(), (uint64_t)op.getPrevTimestamp());
if (token) {
- token->setResult(ResultUP(new RemoveResult(false)), false);
+ token->setResult(make_unique<RemoveResult>(false), false);
}
return;
}
if (op.getPrevDbDocumentId().valid()) {
assert(op.getValidNewOrPrevDbdId());
assert(op.notMovingLidInSameSubDb());
- storeOperation(op);
+ storeOperation(op, token);
if (token) {
bool documentWasFound = !op.getPrevMarkedAsRemoved();
- token->setResult(ResultUP(new RemoveResult(documentWasFound)), documentWasFound);
+ token->setResult(make_unique<RemoveResult>(documentWasFound), documentWasFound);
}
_activeFeedView->handleRemove(std::move(token), op);
} else if (op.hasDocType()) {
assert(op.getDocType() == _docTypeName.getName());
- storeOperation(op);
+ storeOperation(op, token);
if (token) {
- token->setResult(ResultUP(new RemoveResult(false)), false);
+ token->setResult(make_unique<RemoveResult>(false), false);
}
_activeFeedView->handleRemove(std::move(token), op);
} else {
if (token) {
- token->setResult(ResultUP(new RemoveResult(false)), false);
+ token->setResult(make_unique<RemoveResult>(false), false);
}
}
}
@@ -186,20 +187,16 @@ FeedHandler::performGarbageCollect(FeedToken token)
(void) token;
}
-
void
FeedHandler::performCreateBucket(FeedToken token, CreateBucketOperation &op)
{
- (void) token;
- storeOperation(op);
+ storeOperation(op, token);
_bucketDBHandler->handleCreateBucket(op.getBucketId());
}
-
void FeedHandler::performDeleteBucket(FeedToken token, DeleteBucketOperation &op) {
- (void) token;
_activeFeedView->prepareDeleteBucket(op);
- storeOperation(op);
+ storeOperation(op, token);
// Delete documents in bucket
_activeFeedView->handleDeleteBucket(op);
// Delete bucket itself, should no longer have documents.
@@ -207,21 +204,16 @@ void FeedHandler::performDeleteBucket(FeedToken token, DeleteBucketOperation &op
}
-
void FeedHandler::performSplit(FeedToken token, SplitBucketOperation &op) {
- (void) token;
- storeOperation(op);
+ storeOperation(op, token);
_bucketDBHandler->handleSplit(op.getSerialNum(), op.getSource(), op.getTarget1(), op.getTarget2());
}
-
void FeedHandler::performJoin(FeedToken token, JoinBucketsOperation &op) {
- (void) token;
- storeOperation(op);
+ storeOperation(op, token);
_bucketDBHandler->handleJoin(op.getSerialNum(), op.getSource1(), op.getSource2(), op.getTarget());
}
-
void
FeedHandler::performSync()
{
@@ -342,7 +334,7 @@ FeedHandler::FeedHandler(IThreadingService &writeService,
_prunedSerialNum(0),
_delayedPrune(false),
_feedLock(),
- _feedState(std::make_shared<InitState>(getDocTypeName())),
+ _feedState(make_shared<InitState>(getDocTypeName())),
_activeFeedView(nullptr),
_bucketDBHandler(nullptr),
_syncLock(),
@@ -381,7 +373,7 @@ FeedHandler::replayTransactionLog(SerialNum flushedIndexMgrSerial, SerialNum flu
(void) newestFlushedSerial;
assert(_activeFeedView);
assert(_bucketDBHandler);
- FeedState::SP state = std::make_shared<ReplayTransactionLogState>
+ FeedState::SP state = make_shared<ReplayTransactionLogState>
(getDocTypeName(), _activeFeedView, *_bucketDBHandler, _replayConfig, config_store);
changeFeedState(state);
// Resurrected attribute vector might cause oldestFlushedSerial to
@@ -404,7 +396,7 @@ FeedHandler::flushDone(SerialNum flushedSerial)
}
void FeedHandler::changeToNormalFeedState() {
- changeFeedState(FeedState::SP(new NormalState(*this)));
+ changeFeedState(make_shared<NormalState>(*this));
}
bool
@@ -412,18 +404,28 @@ FeedHandler::isDoingReplay() const {
return _tlsMgr.isDoingReplay();
}
-bool FeedHandler::getTransactionLogReplayDone() const {
+bool
+FeedHandler::getTransactionLogReplayDone() const {
return _tlsMgr.getReplayDone();
}
-void FeedHandler::storeOperation(FeedOperation &op) {
+void
+FeedHandler::storeOperation(const FeedOperation &op, TlsWriter::DoneCallback onDone) {
if (!op.getSerialNum()) {
- op.setSerialNum(incSerialNum());
+ const_cast<FeedOperation &>(op).setSerialNum(incSerialNum());
}
- _tlsWriter.storeOperation(op);
+ _tlsWriter.storeOperation(op, std::move(onDone));
}
-void FeedHandler::tlsPrune(SerialNum oldest_to_keep) {
+void
+FeedHandler::storeOperationSync(const FeedOperation &op) {
+ vespalib::Gate gate;
+ storeOperation(op, make_shared<search::GateCallback>(gate));
+ gate.await();
+}
+
+void
+FeedHandler::tlsPrune(SerialNum oldest_to_keep) {
if (!_tlsWriter.erase(oldest_to_keep)) {
throw IllegalStateException(make_string("Failed to prune TLS to token %" PRIu64 ".", oldest_to_keep));
}
@@ -445,7 +447,7 @@ void feedOperationRejected(FeedToken & token, const vespalib::string &opType, co
if (token) {
auto message = make_string("%s operation rejected for document '%s' of type '%s': '%s'",
opType.c_str(), docId.c_str(), docTypeName.toString().c_str(), rejectMessage.c_str());
- token->setResult(ResultUP(new ResultType(Result::RESOURCE_EXHAUSTED, message)), false);
+ token->setResult(make_unique<ResultType>(Result::RESOURCE_EXHAUSTED, message), false);
token->fail();
}
}
@@ -533,7 +535,7 @@ FeedHandler::handleMove(MoveOperation &op, std::shared_ptr<search::IDestructorCa
assert(op.getValidDbdId());
assert(op.getValidPrevDbdId());
assert(op.getSubDbId() != op.getPrevSubDbId());
- storeOperation(op);
+ storeOperation(op, moveDoneCtx);
_activeFeedView->handleMove(op, std::move(moveDoneCtx));
}
@@ -558,7 +560,7 @@ FeedHandler::receive(const Packet &packet)
// (by fnet thread). Called via DocumentDB::recoverPacket() when
// recovering from another node.
FeedState::SP state = getFeedState();
- PacketWrapper::SP wrap(new PacketWrapper(packet, _tlsReplayProgress.get()));
+ auto wrap = make_shared<PacketWrapper>(packet, _tlsReplayProgress.get());
state->receive(wrap, _writeService.master());
wrap->gate.await();
return wrap->result;
@@ -577,7 +579,7 @@ performPruneRemovedDocuments(PruneRemovedDocumentsOperation &pruneOp)
{
const LidVectorContext::SP lids_to_remove = pruneOp.getLidsToRemove();
if (lids_to_remove && lids_to_remove->getNumLids() != 0) {
- storeOperation(pruneOp);
+ storeOperationSync(pruneOp);
_activeFeedView->handlePruneRemovedDocuments(pruneOp);
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
index d717346883a..8c28fcdc1ea 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
@@ -65,7 +65,7 @@ private:
_tls_mgr(tls_mgr),
_tlsDirectWriter(tlsDirectWriter)
{ }
- void storeOperation(const FeedOperation &op) override;
+ void storeOperation(const FeedOperation &op, DoneCallback onDone) override;
bool erase(SerialNum oldest_to_keep) override;
SerialNum sync(SerialNum syncTo) override;
};
@@ -234,7 +234,8 @@ public:
void eof() override;
void performPruneRemovedDocuments(PruneRemovedDocumentsOperation &pruneOp) override;
void syncTls(SerialNum syncTo);
- void storeOperation(FeedOperation &op) override;
+ void storeOperation(const FeedOperation &op, DoneCallback onDone) override;
+ void storeOperationSync(const FeedOperation & op);
void considerDelayedPrune();
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h
index 4e5958cd9e2..760250844a8 100644
--- a/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h
+++ b/searchcore/src/vespa/searchcore/proton/server/i_operation_storer.h
@@ -2,6 +2,8 @@
#pragma once
+#include <vespa/searchlib/transactionlog/common.h>
+
namespace proton {
class FeedOperation;
@@ -11,12 +13,13 @@ class FeedOperation;
*/
struct IOperationStorer
{
- virtual ~IOperationStorer() {}
+ using DoneCallback = search::transactionlog::Writer::DoneCallback;
+ virtual ~IOperationStorer() = default;
/**
* Assign serial number to (if not set) and store the given operation.
*/
- virtual void storeOperation(FeedOperation &op) = 0;
+ virtual void storeOperation(const FeedOperation &op, DoneCallback onDone) = 0;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp
index d6c1a032cea..2ae8d826ebc 100644
--- a/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/lid_space_compaction_job.cpp
@@ -5,7 +5,8 @@
#include "imaintenancejobrunner.h"
#include "lid_space_compaction_job.h"
#include <vespa/searchcore/proton/common/eventlogger.h>
-#include <vespa/searchlib/common/idestructorcallback.h>
+#include <vespa/searchlib/common/gatecallback.h>
+#include <vespa/vespalib/util/sync.h>
#include <cassert>
#include <vespa/log/log.h>
@@ -55,8 +56,9 @@ LidSpaceCompactionJob::scanDocuments(const LidUsageStats &stats)
return true;
} else {
MoveOperation::UP op = _handler.createMoveOperation(document, stats.getLowestFreeLid());
- _opStorer.storeOperation(*op);
- _handler.handleMove(*op, _moveOpsLimiter->beginOperation());
+ search::IDestructorCallback::SP context = _moveOpsLimiter->beginOperation();
+ _opStorer.storeOperation(*op, context);
+ _handler.handleMove(*op, std::move(context));
if (isBlocked(BlockedReason::OUTSTANDING_OPS)) {
return true;
}
@@ -79,7 +81,9 @@ LidSpaceCompactionJob::compactLidSpace(const LidUsageStats &stats)
{
uint32_t wantedLidLimit = stats.getHighestUsedLid() + 1;
CompactLidSpaceOperation op(_handler.getSubDbId(), wantedLidLimit);
- _opStorer.storeOperation(op);
+ vespalib::Gate gate;
+ _opStorer.storeOperation(op, std::make_shared<search::GateCallback>(gate));
+ gate.await();
_handler.handleCompactLidSpace(op);
EventLogger::lidSpaceCompactionComplete(_handler.getName(), wantedLidLimit);
_shouldCompactLidSpace = false;
diff --git a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp
index 215650b6664..bfc59dee35e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.cpp
@@ -11,24 +11,24 @@ using search::transactionlog::Packet;
namespace proton {
-void TlcProxy::commit(search::SerialNum serialNum, search::transactionlog::Type type, const vespalib::nbostream &buf)
+void TlcProxy::commit(search::SerialNum serialNum, search::transactionlog::Type type,
+ const vespalib::nbostream &buf, DoneCallback onDone)
{
Packet::Entry entry(serialNum, type, vespalib::ConstBufferRef(buf.c_str(), buf.size()));
Packet packet;
packet.add(entry);
packet.close();
- _tlsDirectWriter.commit(_domain, packet);
-
+ _tlsDirectWriter.commit(_domain, packet, std::move(onDone));
}
void
-TlcProxy::storeOperation(const FeedOperation &op)
+TlcProxy::storeOperation(const FeedOperation &op, DoneCallback onDone)
{
nbostream stream;
op.serialize(stream);
LOG(debug, "storeOperation(): serialNum(%" PRIu64 "), type(%u), size(%zu)",
op.getSerialNum(), (uint32_t)op.getType(), stream.size());
- commit(op.getSerialNum(), (uint32_t)op.getType(), stream);
+ commit(op.getSerialNum(), (uint32_t)op.getType(), stream, std::move(onDone));
}
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h
index 8e4feb2f354..2dc6501731e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h
+++ b/searchcore/src/vespa/searchcore/proton/server/tlcproxy.h
@@ -8,18 +8,20 @@ namespace proton {
class FeedOperation;
class TlcProxy {
- vespalib::string _domain;
- search::transactionlog::Writer & _tlsDirectWriter;
+ using DoneCallback = search::transactionlog::Writer::DoneCallback;
+ using Writer = search::transactionlog::Writer;
+ vespalib::string _domain;
+ Writer & _tlsDirectWriter;
- void commit(search::SerialNum serialNum, search::transactionlog::Type type, const vespalib::nbostream &buf);
+ void commit(search::SerialNum serialNum, search::transactionlog::Type type,
+ const vespalib::nbostream &buf, DoneCallback onDone);
public:
typedef std::unique_ptr<TlcProxy> UP;
- TlcProxy(const vespalib::string & domain, search::transactionlog::Writer & writer)
+ TlcProxy(const vespalib::string & domain, Writer & writer)
: _domain(domain), _tlsDirectWriter(writer) {}
- void storeOperation(const FeedOperation &op);
+ void storeOperation(const FeedOperation &op, DoneCallback onDone);
};
} // namespace proton
-
diff --git a/searchcore/src/vespa/searchcore/proton/server/tlswriter.h b/searchcore/src/vespa/searchcore/proton/server/tlswriter.h
index 0956c0ae011..5d51580c0ad 100644
--- a/searchcore/src/vespa/searchcore/proton/server/tlswriter.h
+++ b/searchcore/src/vespa/searchcore/proton/server/tlswriter.h
@@ -2,19 +2,17 @@
#pragma once
+#include "i_operation_storer.h"
#include <vespa/searchlib/common/serialnum.h>
namespace proton {
-class FeedOperation;
-
/**
* Interface for writing to the TransactionLogServer.
*/
-struct TlsWriter {
- virtual ~TlsWriter() {}
+struct TlsWriter : public IOperationStorer {
+ virtual ~TlsWriter() = default;
- virtual void storeOperation(const FeedOperation &op) = 0;
virtual bool erase(search::SerialNum oldest_to_keep) = 0;
virtual search::SerialNum sync(search::SerialNum syncTo) = 0;
};
diff --git a/searchlib/src/vespa/searchlib/common/CMakeLists.txt b/searchlib/src/vespa/searchlib/common/CMakeLists.txt
index b1f71303449..f9db738528c 100644
--- a/searchlib/src/vespa/searchlib/common/CMakeLists.txt
+++ b/searchlib/src/vespa/searchlib/common/CMakeLists.txt
@@ -13,6 +13,7 @@ vespa_add_library(searchlib_common OBJECT
featureset.cpp
fileheadercontext.cpp
foregroundtaskexecutor.cpp
+ gatecallback.cpp
growablebitvector.cpp
indexmetainfo.cpp
location.cpp
diff --git a/searchlib/src/vespa/searchlib/common/gatecallback.cpp b/searchlib/src/vespa/searchlib/common/gatecallback.cpp
new file mode 100644
index 00000000000..a853909be71
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/common/gatecallback.cpp
@@ -0,0 +1,12 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "gatecallback.h"
+#include <vespa/vespalib/util/sync.h>
+
+namespace search {
+
+GateCallback::~GateCallback() {
+ _gate.countDown();
+}
+
+}
diff --git a/searchlib/src/vespa/searchlib/common/gatecallback.h b/searchlib/src/vespa/searchlib/common/gatecallback.h
new file mode 100644
index 00000000000..1e85d796089
--- /dev/null
+++ b/searchlib/src/vespa/searchlib/common/gatecallback.h
@@ -0,0 +1,24 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "idestructorcallback.h"
+
+namespace vespalib { class Gate; }
+
+namespace search {
+
+class GateCallback : public IDestructorCallback {
+public:
+ GateCallback(vespalib::Gate & gate) : _gate(gate) {}
+ ~GateCallback() override;
+private:
+ vespalib::Gate & _gate;
+};
+
+class IgnoreCallback : public IDestructorCallback {
+public:
+ IgnoreCallback() { }
+ ~IgnoreCallback() override = default;
+};
+
+} // namespace search
diff --git a/searchlib/src/vespa/searchlib/common/idestructorcallback.h b/searchlib/src/vespa/searchlib/common/idestructorcallback.h
index 4c42f68f0e4..77adba7a4cc 100644
--- a/searchlib/src/vespa/searchlib/common/idestructorcallback.h
+++ b/searchlib/src/vespa/searchlib/common/idestructorcallback.h
@@ -3,8 +3,7 @@
#include <memory>
-namespace search
-{
+namespace search {
/**
* Interface for class that performs a callback when instance is
@@ -17,7 +16,7 @@ class IDestructorCallback
{
public:
using SP = std::shared_ptr<IDestructorCallback>;
- virtual ~IDestructorCallback() { }
+ virtual ~IDestructorCallback() = default;
};
} // namespace search
diff --git a/searchlib/src/vespa/searchlib/transactionlog/common.h b/searchlib/src/vespa/searchlib/transactionlog/common.h
index 65ef8f363c0..db8b9727daa 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/common.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/common.h
@@ -2,6 +2,7 @@
#pragma once
#include <vespa/searchlib/common/serialnum.h>
+#include <vespa/searchlib/common/idestructorcallback.h>
#include <vespa/vespalib/objects/nbostream.h>
#include <vespa/vespalib/util/buffer.h>
@@ -90,8 +91,9 @@ int makeDirectory(const char * dir);
class Writer {
public:
+ using DoneCallback = std::shared_ptr<IDestructorCallback>;
virtual ~Writer() { }
- virtual void commit(const vespalib::string & domainName, const Packet & packet) = 0;
+ virtual void commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) = 0;
};
}
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
index ca17457bdb9..e793aafd38f 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.cpp
@@ -453,8 +453,9 @@ void TransLogServer::domainStatus(FRT_RPCRequest *req)
}
}
-void TransLogServer::commit(const vespalib::string & domainName, const Packet & packet)
+void TransLogServer::commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done)
{
+ (void) done;
Domain::SP domain(findDomain(domainName));
if (domain) {
domain->commit(packet);
diff --git a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
index 92832786059..c12e37dd1c8 100644
--- a/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
+++ b/searchlib/src/vespa/searchlib/transactionlog/translogserver.h
@@ -31,7 +31,7 @@ public:
virtual ~TransLogServer();
DomainStats getDomainStats() const;
- void commit(const vespalib::string & domainName, const Packet & packet) override;
+ void commit(const vespalib::string & domainName, const Packet & packet, DoneCallback done) override;
class Session
diff --git a/storageapi/src/vespa/storageapi/messageapi/returncode.cpp b/storageapi/src/vespa/storageapi/messageapi/returncode.cpp
index 9825aea47bc..79a768541b9 100644
--- a/storageapi/src/vespa/storageapi/messageapi/returncode.cpp
+++ b/storageapi/src/vespa/storageapi/messageapi/returncode.cpp
@@ -93,7 +93,6 @@ ReturnCode::isNodeDownOrNetwork() const
case mbus::ErrorCode::UNKNOWN_SESSION:
case mbus::ErrorCode::HANDSHAKE_FAILED:
case mbus::ErrorCode::NO_SERVICES_FOR_ROUTE:
- case mbus::ErrorCode::SERVICE_OOS:
case mbus::ErrorCode::NETWORK_ERROR:
case mbus::ErrorCode::UNKNOWN_PROTOCOL:
case Protocol::ERROR_NODE_NOT_READY:
diff --git a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
index 7cea767f9ba..0a9fe72552c 100644
--- a/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
+++ b/vespa-http-client/src/main/java/com/yahoo/vespa/http/client/core/operationProcessor/OperationProcessor.java
@@ -131,7 +131,6 @@ public class OperationProcessor {
exceptionMessage.contains("SEND_QUEUE_CLOSED") ||
exceptionMessage.contains("ILLEGAL_ROUTE") ||
exceptionMessage.contains("NO_SERVICES_FOR_ROUTE") ||
- exceptionMessage.contains("SERVICE_OOS") ||
exceptionMessage.contains("NETWORK_ERROR") ||
exceptionMessage.contains("SEQUENCE_ERROR") ||
exceptionMessage.contains("NETWORK_SHUTDOWN") ||
diff --git a/vespalib/src/vespa/vespalib/util/sync.h b/vespalib/src/vespa/vespalib/util/sync.h
index f961c280174..86e0a227c72 100644
--- a/vespalib/src/vespa/vespalib/util/sync.h
+++ b/vespalib/src/vespa/vespalib/util/sync.h
@@ -710,7 +710,7 @@ public:
/**
* Empty. Needs to be virtual to reduce compiler warnings.
**/
- virtual ~CountDownLatch() {}
+ virtual ~CountDownLatch() = default;
};