summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.github/ISSUE_TEMPLATE/vespa-8.md15
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/admin/Admin.java6
-rw-r--r--config-model/src/main/java/com/yahoo/vespa/model/admin/Logserver.java15
-rw-r--r--config-model/src/test/java/com/yahoo/vespa/model/admin/AdminTestCase.java5
-rw-r--r--configdefinitions/src/vespa/lb-services.def1
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/SuperModelManager.java8
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/model/LbServicesProducer.java13
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/model/SuperModelConfigProvider.java5
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/SuperModelControllerTest.java7
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/SuperModelRequestHandlerTest.java5
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/model/LbServicesProducerTest.java21
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/rpc/RpcTester.java4
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java1
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java68
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/MapConverter.java26
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java161
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java36
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java11
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java10
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java227
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java29
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java19
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java4
-rw-r--r--flags/src/main/java/com/yahoo/vespa/flags/Flags.java12
-rw-r--r--logd/src/main/resources/configdefinitions/logd.def7
-rw-r--r--storage/src/tests/CMakeLists.txt1
-rw-r--r--storage/src/tests/distributor/CMakeLists.txt14
-rw-r--r--storage/src/tests/distributor/bucketdbupdatertest.cpp1478
29 files changed, 1117 insertions, 1094 deletions
diff --git a/.github/ISSUE_TEMPLATE/vespa-8.md b/.github/ISSUE_TEMPLATE/vespa-8.md
new file mode 100644
index 00000000000..f35e428fa12
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/vespa-8.md
@@ -0,0 +1,15 @@
+---
+name: Vespa 8
+about: Template for issues related to Vespa 8
+title: 'Vespa 8: [your description]'
+labels: Vespa 8
+assignees: ''
+
+---
+
+When creating an issue, please consider the following items:
+
+* How are users affected?
+* How can app owners check if they are affected?
+* How can tenants fix the issue?
+* How to communicate the change to the users?
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/Admin.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/Admin.java
index 8a98730c9c6..22811ae5878 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/admin/Admin.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/Admin.java
@@ -6,11 +6,9 @@ import com.yahoo.cloud.config.ZookeepersConfig;
import com.yahoo.cloud.config.log.LogdConfig;
import com.yahoo.config.application.api.DeployLogger;
import com.yahoo.config.model.ConfigModelContext.ApplicationType;
-import com.yahoo.config.model.api.ConfigServerSpec;
import com.yahoo.config.model.deploy.DeployState;
import com.yahoo.config.model.producer.AbstractConfigProducer;
import com.yahoo.config.provision.ApplicationId;
-import com.yahoo.config.provision.SystemName;
import com.yahoo.config.provision.Zone;
import com.yahoo.vespa.model.AbstractService;
import com.yahoo.vespa.model.ConfigProxy;
@@ -20,10 +18,8 @@ import com.yahoo.vespa.model.Logd;
import com.yahoo.vespa.model.admin.clustercontroller.ClusterControllerContainerCluster;
import com.yahoo.vespa.model.admin.metricsproxy.MetricsProxyContainer;
import com.yahoo.vespa.model.admin.metricsproxy.MetricsProxyContainerCluster;
-import com.yahoo.vespa.model.admin.monitoring.MetricsConsumer;
import com.yahoo.vespa.model.admin.monitoring.Monitoring;
import com.yahoo.vespa.model.admin.monitoring.builder.Metrics;
-import com.yahoo.vespa.model.container.ContainerCluster;
import com.yahoo.vespa.model.filedistribution.FileDistributionConfigProducer;
import com.yahoo.vespa.model.filedistribution.FileDistributionConfigProvider;
import com.yahoo.vespa.model.filedistribution.FileDistributor;
@@ -32,7 +28,6 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
/**
@@ -156,6 +151,7 @@ public class Admin extends AbstractConfigProducer implements Serializable {
logserver(new LogdConfig.Logserver.Builder().
use(logServerContainerCluster.isPresent() || !isHostedVespa).
host(logserver.getHostName()).
+ rpcport(logserver.getRelativePort(0)).
port(logserver.getRelativePort(1)));
}
}
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/Logserver.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/Logserver.java
index 0d641dcbd38..6c173043092 100644
--- a/config-model/src/main/java/com/yahoo/vespa/model/admin/Logserver.java
+++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/Logserver.java
@@ -9,6 +9,7 @@ import com.yahoo.vespa.model.AbstractService;
* system.
*
* @author gjoranv
+ * @author bjorncs
*/
public class Logserver extends AbstractService {
@@ -17,10 +18,10 @@ public class Logserver extends AbstractService {
public Logserver(AbstractConfigProducer parent) {
super(parent, "logserver");
- portsMeta.on(0).tag("unused");
- portsMeta.on(1).tag("logtp");
- portsMeta.on(2).tag("logtp").tag("telnet").tag("last-errors-holder");
- portsMeta.on(3).tag("logtp").tag("telnet").tag("replicator");
+ portsMeta.on(0).tag("logtp").tag("rpc");
+ portsMeta.on(1).tag("logtp").tag("legacy");
+ portsMeta.on(2).tag("unused");
+ portsMeta.on(3).tag("unused");
setProp("clustertype", "admin");
setProp("clustername", "admin");
}
@@ -37,6 +38,8 @@ public class Logserver extends AbstractService {
*/
private String getMyJVMArgs() {
StringBuilder sb = new StringBuilder();
+ sb.append("-Dlogserver.rpcListenPort=").append(getRelativePort(0));
+ sb.append(" ");
sb.append("-Dlogserver.listenport=").append(getRelativePort(1));
sb.append(" ");
sb.append("-Dlogserver.logarchive.dir=" + logArchiveDir);
@@ -56,7 +59,7 @@ public class Logserver extends AbstractService {
* @return 'true' always
*/
public boolean requiresWantedPort() {
- return true;
+ return true; // TODO Support dynamic port allocation for logserver
}
/**
@@ -68,7 +71,7 @@ public class Logserver extends AbstractService {
@Override
public String[] getPortSuffixes() {
- return new String[]{ "unused", "logtp", "last.errors", "replicator" };
+ return new String[]{ "rpc", "legacy", "unused/1", "unused/2" };
}
}
diff --git a/config-model/src/test/java/com/yahoo/vespa/model/admin/AdminTestCase.java b/config-model/src/test/java/com/yahoo/vespa/model/admin/AdminTestCase.java
index 749ee2b9acc..6d65c2a472e 100644
--- a/config-model/src/test/java/com/yahoo/vespa/model/admin/AdminTestCase.java
+++ b/config-model/src/test/java/com/yahoo/vespa/model/admin/AdminTestCase.java
@@ -18,6 +18,7 @@ import com.yahoo.container.StatisticsConfig;
import com.yahoo.container.jdisc.config.HealthMonitorConfig;
import com.yahoo.net.HostName;
import com.yahoo.vespa.config.core.StateserverConfig;
+import com.yahoo.vespa.model.Service;
import com.yahoo.vespa.model.VespaModel;
import com.yahoo.vespa.model.container.ApplicationContainerCluster;
import com.yahoo.vespa.model.container.component.Component;
@@ -28,7 +29,6 @@ import org.junit.Test;
import java.util.Set;
-import static com.yahoo.config.model.api.container.ContainerServiceType.METRICS_PROXY_CONTAINER;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -94,6 +94,9 @@ public class AdminTestCase {
LogdConfig lc = new LogdConfig(lb);
assertEquals(lc.logserver().host(), localhost);
+ Service logserver = vespaModel.getService("admin/logserver").get();
+ assertEquals(logserver.getRelativePort(0), lc.logserver().rpcport());
+
// Verify services in the sentinel config
SentinelConfig.Builder b = new SentinelConfig.Builder();
vespaModel.getConfig(b, localhostConfigId);
diff --git a/configdefinitions/src/vespa/lb-services.def b/configdefinitions/src/vespa/lb-services.def
index 33c568061fe..189b39461ec 100644
--- a/configdefinitions/src/vespa/lb-services.def
+++ b/configdefinitions/src/vespa/lb-services.def
@@ -7,6 +7,7 @@ namespace=cloud.config
# Active rotation given as flag 'active' for a prod region in deployment.xml
# Default true for now (since code in config-model to set it is not ready yet), should have no default value
tenants{}.applications{}.activeRotation bool default=true
+tenants{}.applications{}.upstreamHttps bool default=false
tenants{}.applications{}.hosts{}.hostname string default="(unknownhostname)"
tenants{}.applications{}.hosts{}.services{}.type string default="(noservicetype)"
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/SuperModelManager.java b/configserver/src/main/java/com/yahoo/vespa/config/server/SuperModelManager.java
index 80818aea2e8..eeae770da43 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/SuperModelManager.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/SuperModelManager.java
@@ -14,6 +14,7 @@ import com.yahoo.config.provision.Zone;
import com.yahoo.vespa.config.GenerationCounter;
import com.yahoo.vespa.config.server.application.ApplicationSet;
import com.yahoo.vespa.config.server.model.SuperModelConfigProvider;
+import com.yahoo.vespa.flags.FlagSource;
import java.time.Instant;
import java.util.ArrayList;
@@ -28,6 +29,7 @@ public class SuperModelManager implements SuperModelProvider {
private final Zone zone;
private final Object monitor = new Object();
+ private final FlagSource flagSource;
private SuperModelConfigProvider superModelConfigProvider; // Guarded by 'this' monitor
private final List<SuperModelListener> listeners = new ArrayList<>(); // Guarded by 'this' monitor
@@ -39,7 +41,9 @@ public class SuperModelManager implements SuperModelProvider {
@Inject
public SuperModelManager(ConfigserverConfig configserverConfig,
NodeFlavors nodeFlavors,
- GenerationCounter generationCounter) {
+ GenerationCounter generationCounter,
+ FlagSource flagSource) {
+ this.flagSource = flagSource;
this.zone = new Zone(configserverConfig, nodeFlavors);
this.generationCounter = generationCounter;
this.masterGeneration = configserverConfig.masterGeneration();
@@ -107,6 +111,6 @@ public class SuperModelManager implements SuperModelProvider {
private void makeNewSuperModelConfigProvider(SuperModel newSuperModel) {
generation = masterGeneration + generationCounter.get();
- superModelConfigProvider = new SuperModelConfigProvider(newSuperModel, zone);
+ superModelConfigProvider = new SuperModelConfigProvider(newSuperModel, zone, flagSource);
}
}
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/model/LbServicesProducer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/model/LbServicesProducer.java
index ceeea197440..1503e4d2397 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/model/LbServicesProducer.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/model/LbServicesProducer.java
@@ -9,6 +9,10 @@ import com.yahoo.config.model.api.ServiceInfo;
import com.yahoo.config.provision.ApplicationId;
import com.yahoo.config.provision.TenantName;
import com.yahoo.config.provision.Zone;
+import com.yahoo.vespa.flags.BooleanFlag;
+import com.yahoo.vespa.flags.FetchVector;
+import com.yahoo.vespa.flags.FlagSource;
+import com.yahoo.vespa.flags.Flags;
import java.util.Collections;
import java.util.Comparator;
@@ -31,10 +35,12 @@ public class LbServicesProducer implements LbServicesConfig.Producer {
private final Map<TenantName, Set<ApplicationInfo>> models;
private final Zone zone;
+ private final BooleanFlag useHttpsLoadBalancerUpstream;
- public LbServicesProducer(Map<TenantName, Set<ApplicationInfo>> models, Zone zone) {
+ public LbServicesProducer(Map<TenantName, Set<ApplicationInfo>> models, Zone zone, FlagSource flagSource) {
this.models = models;
this.zone = zone;
+ this.useHttpsLoadBalancerUpstream = Flags.USE_HTTPS_LOAD_BALANCER_UPSTREAM.bindTo(flagSource);
}
@Override
@@ -61,6 +67,7 @@ public class LbServicesProducer implements LbServicesConfig.Producer {
private LbServicesConfig.Tenants.Applications.Builder getAppConfig(ApplicationInfo app) {
LbServicesConfig.Tenants.Applications.Builder ab = new LbServicesConfig.Tenants.Applications.Builder();
ab.activeRotation(getActiveRotation(app));
+ ab.upstreamHttps(useHttpsLoadBalancerUpstream(app));
app.getModel().getHosts().stream()
.sorted((a, b) -> a.getHostname().compareTo(b.getHostname()))
.forEach(hostInfo -> ab.hosts(hostInfo.getHostname(), getHostsConfig(hostInfo)));
@@ -81,6 +88,10 @@ public class LbServicesProducer implements LbServicesConfig.Producer {
return activeRotation;
}
+ private boolean useHttpsLoadBalancerUpstream(ApplicationInfo app) {
+ return useHttpsLoadBalancerUpstream.with(FetchVector.Dimension.APPLICATION_ID, app.getApplicationId().serializedForm()).value();
+ }
+
private LbServicesConfig.Tenants.Applications.Hosts.Builder getHostsConfig(HostInfo hostInfo) {
LbServicesConfig.Tenants.Applications.Hosts.Builder hb = new LbServicesConfig.Tenants.Applications.Hosts.Builder();
hb.hostname(hostInfo.getHostname());
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/model/SuperModelConfigProvider.java b/configserver/src/main/java/com/yahoo/vespa/config/server/model/SuperModelConfigProvider.java
index 4a75414c272..75f53667c4a 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/model/SuperModelConfigProvider.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/model/SuperModelConfigProvider.java
@@ -11,6 +11,7 @@ import com.yahoo.config.provision.ApplicationId;
import com.yahoo.config.provision.Zone;
import com.yahoo.vespa.config.ConfigKey;
import com.yahoo.vespa.config.ConfigPayload;
+import com.yahoo.vespa.flags.FlagSource;
import java.util.Collections;
import java.util.Map;
@@ -26,9 +27,9 @@ public class SuperModelConfigProvider implements LbServicesConfig.Producer, Rout
private final LbServicesProducer lbProd;
private final RoutingProducer zoneProd;
- public SuperModelConfigProvider(SuperModel superModel, Zone zone) {
+ public SuperModelConfigProvider(SuperModel superModel, Zone zone, FlagSource flagSource) {
this.superModel = superModel;
- this.lbProd = new LbServicesProducer(Collections.unmodifiableMap(superModel.getModelsPerTenant()), zone);
+ this.lbProd = new LbServicesProducer(Collections.unmodifiableMap(superModel.getModelsPerTenant()), zone, flagSource);
this.zoneProd = new RoutingProducer(Collections.unmodifiableMap(superModel.getModelsPerTenant()));
}
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/SuperModelControllerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/SuperModelControllerTest.java
index 97496bd4177..c6f6be5fbab 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/SuperModelControllerTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/SuperModelControllerTest.java
@@ -22,6 +22,7 @@ import com.yahoo.vespa.config.protocol.JRTServerConfigRequestV3;
import com.yahoo.vespa.config.protocol.Trace;
import com.yahoo.vespa.config.server.model.SuperModelConfigProvider;
import com.yahoo.vespa.config.server.rpc.UncompressedConfigResponseFactory;
+import com.yahoo.vespa.flags.InMemoryFlagSource;
import com.yahoo.vespa.model.VespaModel;
import org.junit.Before;
import org.junit.Test;
@@ -55,7 +56,7 @@ public class SuperModelControllerTest {
ApplicationName.from("foo"), InstanceName.defaultName());
models.put(app, new ApplicationInfo(app, 4l, new VespaModel(FilesApplicationPackage.fromFile(testApp))));
SuperModel superModel = new SuperModel(models);
- handler = new SuperModelController(new SuperModelConfigProvider(superModel, Zone.defaultZone()), new TestConfigDefinitionRepo(), 2, new UncompressedConfigResponseFactory());
+ handler = new SuperModelController(new SuperModelConfigProvider(superModel, Zone.defaultZone(), new InMemoryFlagSource()), new TestConfigDefinitionRepo(), 2, new UncompressedConfigResponseFactory());
}
@Test
@@ -98,7 +99,7 @@ public class SuperModelControllerTest {
models.put(tooAdvanced, createApplicationInfo(testApp3, tooAdvanced, 4l));
SuperModel superModel = new SuperModel(models);
- SuperModelController han = new SuperModelController(new SuperModelConfigProvider(superModel, Zone.defaultZone()), new TestConfigDefinitionRepo(), 2, new UncompressedConfigResponseFactory());
+ SuperModelController han = new SuperModelController(new SuperModelConfigProvider(superModel, Zone.defaultZone(), new InMemoryFlagSource()), new TestConfigDefinitionRepo(), 2, new UncompressedConfigResponseFactory());
LbServicesConfig.Builder lb = new LbServicesConfig.Builder();
han.getSuperModel().getConfig(lb);
LbServicesConfig lbc = new LbServicesConfig(lb);
@@ -126,7 +127,7 @@ public class SuperModelControllerTest {
models.put(tooAdvanced, createApplicationInfo(testApp3, tooAdvanced, 4l));
SuperModel superModel = new SuperModel(models);
- SuperModelController han = new SuperModelController(new SuperModelConfigProvider(superModel, Zone.defaultZone()), new TestConfigDefinitionRepo(), 2, new UncompressedConfigResponseFactory());
+ SuperModelController han = new SuperModelController(new SuperModelConfigProvider(superModel, Zone.defaultZone(), new InMemoryFlagSource()), new TestConfigDefinitionRepo(), 2, new UncompressedConfigResponseFactory());
LbServicesConfig.Builder lb = new LbServicesConfig.Builder();
han.getSuperModel().getConfig(lb);
LbServicesConfig lbc = new LbServicesConfig(lb);
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/SuperModelRequestHandlerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/SuperModelRequestHandlerTest.java
index 3288b418bb1..35ad97b7b43 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/SuperModelRequestHandlerTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/SuperModelRequestHandlerTest.java
@@ -11,6 +11,7 @@ import com.yahoo.config.provision.ApplicationId;
import com.yahoo.vespa.config.server.application.ApplicationSet;
import com.yahoo.vespa.config.server.monitoring.MetricUpdater;
import com.yahoo.vespa.curator.mock.MockCurator;
+import com.yahoo.vespa.flags.InMemoryFlagSource;
import com.yahoo.vespa.model.VespaModel;
import org.junit.Before;
@@ -45,7 +46,7 @@ public class SuperModelRequestHandlerTest {
public void setup() {
counter = new SuperModelGenerationCounter(new MockCurator());
ConfigserverConfig configserverConfig = new ConfigserverConfig(new ConfigserverConfig.Builder());
- manager = new SuperModelManager(configserverConfig, emptyNodeFlavors(), counter);
+ manager = new SuperModelManager(configserverConfig, emptyNodeFlavors(), counter, new InMemoryFlagSource());
controller = new SuperModelRequestHandler(new TestConfigDefinitionRepo(), configserverConfig, manager);
}
@@ -97,7 +98,7 @@ public class SuperModelRequestHandlerTest {
ApplicationId foo = applicationId("a", "foo");
long masterGen = 10;
ConfigserverConfig configserverConfig = new ConfigserverConfig(new ConfigserverConfig.Builder().masterGeneration(masterGen));
- manager = new SuperModelManager(configserverConfig, emptyNodeFlavors(), counter);
+ manager = new SuperModelManager(configserverConfig, emptyNodeFlavors(), counter, new InMemoryFlagSource());
controller = new SuperModelRequestHandler(new TestConfigDefinitionRepo(), configserverConfig, manager);
long gen = counter.increment();
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/model/LbServicesProducerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/model/LbServicesProducerTest.java
index 352245757ca..0e124addaf7 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/model/LbServicesProducerTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/model/LbServicesProducerTest.java
@@ -16,6 +16,8 @@ import com.yahoo.config.provision.Rotation;
import com.yahoo.config.provision.TenantName;
import com.yahoo.config.provision.Zone;
import com.yahoo.vespa.config.ConfigPayload;
+import com.yahoo.vespa.flags.Flags;
+import com.yahoo.vespa.flags.InMemoryFlagSource;
import com.yahoo.vespa.model.VespaModel;
import org.junit.Test;
import org.xml.sax.SAXException;
@@ -44,6 +46,7 @@ public class LbServicesProducerTest {
private static final String rotation2 = "rotation-2";
private static final String rotationString = rotation1 + "," + rotation2;
private static final Set<Rotation> rotations = Collections.singleton(new Rotation(rotationString));
+ private final InMemoryFlagSource flagSource = new InMemoryFlagSource();
@Test
public void testDeterministicGetConfig() throws IOException, SAXException {
@@ -87,6 +90,22 @@ public class LbServicesProducerTest {
}
}
+ @Test
+ public void https_upstream_is_configured_from_feature_flag() throws IOException, SAXException {
+ {
+ flagSource.withBooleanFlag(Flags.USE_HTTPS_LOAD_BALANCER_UPSTREAM.id(), true);
+ RegionName regionName = RegionName.from("us-east-1");
+ LbServicesConfig conf = createModelAndGetLbServicesConfig(regionName);
+ assertTrue(conf.tenants("foo").applications("foo:prod:" + regionName.value() + ":default").upstreamHttps());
+ }
+ {
+ flagSource.withBooleanFlag(Flags.USE_HTTPS_LOAD_BALANCER_UPSTREAM.id(), false);
+ RegionName regionName = RegionName.from("us-east-2");
+ LbServicesConfig conf = createModelAndGetLbServicesConfig(regionName);
+ assertFalse(conf.tenants("foo").applications("foo:prod:" + regionName.value() + ":default").upstreamHttps());
+ }
+ }
+
private LbServicesConfig createModelAndGetLbServicesConfig(RegionName regionName) throws IOException, SAXException {
Zone zone = new Zone(Environment.prod, regionName);
Map<TenantName, Set<ApplicationInfo>> testModel = createTestModel(new DeployState.Builder()
@@ -96,7 +115,7 @@ public class LbServicesProducerTest {
}
private LbServicesConfig getLbServicesConfig(Zone zone, Map<TenantName, Set<ApplicationInfo>> testModel) {
- LbServicesProducer producer = new LbServicesProducer(testModel, zone);
+ LbServicesProducer producer = new LbServicesProducer(testModel, zone, flagSource);
LbServicesConfig.Builder builder = new LbServicesConfig.Builder();
producer.getConfig(builder);
return new LbServicesConfig(builder);
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/RpcTester.java b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/RpcTester.java
index 43a2c01f26a..dd66f720b1f 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/RpcTester.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/RpcTester.java
@@ -18,6 +18,7 @@ import com.yahoo.vespa.config.server.host.HostRegistries;
import com.yahoo.vespa.config.server.monitoring.Metrics;
import com.yahoo.vespa.config.server.tenant.MockTenantProvider;
import com.yahoo.vespa.config.server.tenant.TenantHandlerProvider;
+import com.yahoo.vespa.flags.InMemoryFlagSource;
import org.junit.After;
import org.junit.rules.TemporaryFolder;
@@ -93,7 +94,8 @@ public class RpcTester implements AutoCloseable {
new SuperModelManager(
configserverConfig,
emptyNodeFlavors(),
- generationCounter)),
+ generationCounter,
+ new InMemoryFlagSource())),
Metrics.createTestMetrics(), new HostRegistries(),
hostLivenessTracker, new FileServer(temporaryFolder.newFolder()));
rpcServer.onTenantCreate(TenantName.from("default"), tenantProvider);
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java
index dccda0bf733..df72720a46c 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java
@@ -52,7 +52,6 @@ import java.util.logging.Logger;
*
* @author baldersheim
*/
-@SuppressWarnings("deprecation")
public abstract class VespaBackEndSearcher extends PingableSearcher {
static final CompoundName PACKET_COMPRESSION_LIMIT = new CompoundName("packetcompressionlimit");
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index cf5bcedcf51..74d9c38b273 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -48,7 +48,7 @@ public class Dispatcher extends AbstractComponent {
private static final CompoundName dispatchInternal = new CompoundName("dispatch.internal");
/** If enabled, search queries will use protobuf rpc */
- private static final CompoundName dispatchProtobuf = new CompoundName("dispatch.protobuf");
+ public static final CompoundName dispatchProtobuf = new CompoundName("dispatch.protobuf");
/** A model of the search cluster this dispatches to */
private final SearchCluster searchCluster;
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
index 019e07221a6..4422538cff6 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java
@@ -18,42 +18,46 @@ interface Client {
int uncompressedLength, byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver,
double timeoutSeconds);
- void search(NodeConnection node, CompressionType compression,
- int uncompressedLength, byte[] compressedPayload, RpcSearchInvoker responseReceiver,
- double timeoutSeconds);
+ void request(String rpcMethod, NodeConnection node, CompressionType compression, int uncompressedLength,
+ byte[] compressedPayload, ResponseReceiver responseReceiver, double timeoutSeconds);
/** Creates a connection to a particular node in this */
NodeConnection createConnection(String hostname, int port);
- class GetDocsumsResponseOrError {
+ interface ResponseReceiver {
+ void receive(ResponseOrError<ProtobufResponse> response);
+ }
+
+ class ResponseOrError<T> {
+ final Optional<T> response;
+ final Optional<String> error;
- // One of these will be non empty and the other not
- private Optional<GetDocsumsResponse> response;
- private Optional<String> error;
+ public static <T> ResponseOrError<T> fromResponse(T response) {
+ return new ResponseOrError<>(response);
+ }
- public static GetDocsumsResponseOrError fromResponse(GetDocsumsResponse response) {
- return new GetDocsumsResponseOrError(Optional.of(response), Optional.empty());
+ public static <T> ResponseOrError<T> fromError(String error) {
+ return new ResponseOrError<T>(error);
}
- public static GetDocsumsResponseOrError fromError(String error) {
- return new GetDocsumsResponseOrError(Optional.empty(), Optional.of(error));
+ ResponseOrError(T response) {
+ this.response = Optional.of(response);
+ this.error = Optional.empty();
}
- private GetDocsumsResponseOrError(Optional<GetDocsumsResponse> response, Optional<String> error) {
- this.response = response;
- this.error = error;
+ ResponseOrError(String error) {
+ this.response = Optional.empty();
+ this.error = Optional.of(error);
}
/** Returns the response, or empty if there is an error */
- public Optional<GetDocsumsResponse> response() { return response; }
+ public Optional<T> response() { return response; }
/** Returns the error or empty if there is a response */
public Optional<String> error() { return error; }
-
}
class GetDocsumsResponse {
-
private final byte compression;
private final int uncompressedSize;
private final byte[] compressedSlimeBytes;
@@ -91,38 +95,12 @@ interface Client {
}
- class SearchResponseOrError {
- // One of these will be non empty and the other not
- private Optional<SearchResponse> response;
- private Optional<String> error;
-
- public static SearchResponseOrError fromResponse(SearchResponse response) {
- return new SearchResponseOrError(Optional.of(response), Optional.empty());
- }
-
- public static SearchResponseOrError fromError(String error) {
- return new SearchResponseOrError(Optional.empty(), Optional.of(error));
- }
-
- private SearchResponseOrError(Optional<SearchResponse> response, Optional<String> error) {
- this.response = response;
- this.error = error;
- }
-
- /** Returns the response, or empty if there is an error */
- public Optional<SearchResponse> response() { return response; }
-
- /** Returns the error or empty if there is a response */
- public Optional<String> error() { return error; }
-
- }
-
- class SearchResponse {
+ class ProtobufResponse {
private final byte compression;
private final int uncompressedSize;
private final byte[] compressedPayload;
- public SearchResponse(byte compression, int uncompressedSize, byte[] compressedPayload) {
+ public ProtobufResponse(byte compression, int uncompressedSize, byte[] compressedPayload) {
this.compression = compression;
this.uncompressedSize = uncompressedSize;
this.compressedPayload = compressedPayload;
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/MapConverter.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/MapConverter.java
index 817ecfe0091..74828dd6740 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/MapConverter.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/MapConverter.java
@@ -10,46 +10,42 @@ import com.yahoo.tensor.serialization.TypedBinaryFormat;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;
/**
* @author ollivir
*/
public class MapConverter {
- @FunctionalInterface
- public interface PropertyInserter<T> {
- void add(T prop);
- }
-
- public static void convertMapTensors(Map<String, Object> map, PropertyInserter<TensorProperty.Builder> inserter) {
+ public static void convertMapTensors(Map<String, Object> map, Consumer<TensorProperty.Builder> inserter) {
for (var entry : map.entrySet()) {
var value = entry.getValue();
if (value instanceof Tensor) {
byte[] tensor = TypedBinaryFormat.encode((Tensor) value);
- inserter.add(TensorProperty.newBuilder().setName(entry.getKey()).setValue(ByteString.copyFrom(tensor)));
+ inserter.accept(TensorProperty.newBuilder().setName(entry.getKey()).setValue(ByteString.copyFrom(tensor)));
}
}
}
- public static void convertMapStrings(Map<String, Object> map, PropertyInserter<StringProperty.Builder> inserter) {
+ public static void convertMapStrings(Map<String, Object> map, Consumer<StringProperty.Builder> inserter) {
for (var entry : map.entrySet()) {
var value = entry.getValue();
if (!(value instanceof Tensor)) {
- inserter.add(StringProperty.newBuilder().setName(entry.getKey()).addValues(value.toString()));
+ inserter.accept(StringProperty.newBuilder().setName(entry.getKey()).addValues(value.toString()));
}
}
}
- public static void convertStringMultiMap(Map<String, List<String>> map, PropertyInserter<StringProperty.Builder> inserter) {
+ public static void convertStringMultiMap(Map<String, List<String>> map, Consumer<StringProperty.Builder> inserter) {
for (var entry : map.entrySet()) {
var values = entry.getValue();
if (values != null) {
- inserter.add(StringProperty.newBuilder().setName(entry.getKey()).addAllValues(values));
+ inserter.accept(StringProperty.newBuilder().setName(entry.getKey()).addAllValues(values));
}
}
}
- public static void convertMultiMap(Map<String, List<Object>> map, PropertyInserter<StringProperty.Builder> stringInserter,
- PropertyInserter<TensorProperty.Builder> tensorInserter) {
+ public static void convertMultiMap(Map<String, List<Object>> map, Consumer<StringProperty.Builder> stringInserter,
+ Consumer<TensorProperty.Builder> tensorInserter) {
for (var entry : map.entrySet()) {
if (entry.getValue() != null) {
var key = entry.getKey();
@@ -58,14 +54,14 @@ public class MapConverter {
if (value != null) {
if (value instanceof Tensor) {
byte[] tensor = TypedBinaryFormat.encode((Tensor) value);
- tensorInserter.add(TensorProperty.newBuilder().setName(key).setValue(ByteString.copyFrom(tensor)));
+ tensorInserter.accept(TensorProperty.newBuilder().setName(key).setValue(ByteString.copyFrom(tensor)));
} else {
stringValues.add(value.toString());
}
}
}
if (!stringValues.isEmpty()) {
- stringInserter.add(StringProperty.newBuilder().setName(key).addAllValues(stringValues));
+ stringInserter.accept(StringProperty.newBuilder().setName(key).addAllValues(stringValues));
}
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java
index 1c1c9ccd115..9903aacdda0 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java
@@ -1,7 +1,8 @@
package com.yahoo.search.dispatch.rpc;
import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol;
-import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol.SearchRequest.Builder;
+import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol.StringProperty;
+import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol.TensorProperty;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.yahoo.document.GlobalId;
@@ -15,11 +16,10 @@ import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.grouping.vespa.GroupingExecutor;
import com.yahoo.search.query.Model;
+import com.yahoo.search.query.QueryTree;
import com.yahoo.search.query.Ranking;
import com.yahoo.search.query.Sorting;
import com.yahoo.search.query.Sorting.Order;
-import com.yahoo.search.query.ranking.RankFeatures;
-import com.yahoo.search.query.ranking.RankProperties;
import com.yahoo.search.result.Coverage;
import com.yahoo.search.result.Relevance;
import com.yahoo.searchlib.aggregation.Grouping;
@@ -28,31 +28,24 @@ import com.yahoo.vespa.objects.BufferSerializer;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.function.Consumer;
public class ProtobufSerialization {
private static final int INITIAL_SERIALIZATION_BUFFER_SIZE = 10 * 1024;
- public static byte[] serializeQuery(Query query, String serverId, boolean includeQueryData) {
- return convertFromQuery(query, serverId, includeQueryData).toByteArray();
+ public static byte[] serializeSearchRequest(Query query, String serverId) {
+ return convertFromQuery(query, serverId).toByteArray();
}
- public static byte[] serializeResult(Result searchResult) {
- return convertFromResult(searchResult).toByteArray();
- }
-
- public static Result deserializeToResult(byte[] payload, Query query, VespaBackEndSearcher searcher)
- throws InvalidProtocolBufferException {
- var protobuf = SearchProtocol.SearchReply.parseFrom(payload);
- var result = convertToResult(query, protobuf, searcher.getDocumentDatabase(query));
- return result;
- }
-
- private static SearchProtocol.SearchRequest convertFromQuery(Query query, String serverId, boolean includeQueryData) {
+ private static SearchProtocol.SearchRequest convertFromQuery(Query query, String serverId) {
var builder = SearchProtocol.SearchRequest.newBuilder().setHits(query.getHits()).setOffset(query.getOffset())
.setTimeout((int) query.getTimeLeft());
- mergeToRequestFromRanking(query.getRanking(), builder, includeQueryData);
- mergeToRequestFromModel(query.getModel(), builder);
+ var documentDb = query.getModel().getDocumentDb();
+ if (documentDb != null) {
+ builder.setDocumentType(documentDb);
+ }
+ builder.setQueryTreeBlob(serializeQueryTree(query.getModel().getQueryTree()));
if (query.getGroupingSessionCache() || query.getRanking().getQueryCache()) {
// TODO verify that the session key is included whenever rank properties would have been
@@ -71,71 +64,101 @@ public class ProtobufSerialization {
gbuf.getBuf().flip();
builder.setGroupingBlob(ByteString.copyFrom(gbuf.getBuf().getByteBuffer()));
}
-
if (query.getGroupingSessionCache()) {
builder.setCacheGrouping(true);
}
+ mergeToSearchRequestFromRanking(query.getRanking(), builder);
+
return builder.build();
}
- private static void mergeToRequestFromModel(Model model, SearchProtocol.SearchRequest.Builder builder) {
- if (model.getDocumentDb() != null) {
- builder.setDocumentType(model.getDocumentDb());
+ private static void mergeToSearchRequestFromRanking(Ranking ranking, SearchProtocol.SearchRequest.Builder builder) {
+ builder.setRankProfile(ranking.getProfile());
+
+ if (ranking.getQueryCache()) {
+ builder.setCacheQuery(true);
}
- int bufferSize = INITIAL_SERIALIZATION_BUFFER_SIZE;
- boolean success = false;
- while (!success) {
- try {
- ByteBuffer treeBuffer = ByteBuffer.allocate(bufferSize);
- model.getQueryTree().encode(treeBuffer);
- treeBuffer.flip();
- builder.setQueryTreeBlob(ByteString.copyFrom(treeBuffer));
- success = true;
- } catch (java.nio.BufferOverflowException e) {
- bufferSize *= 2;
- }
+ if (ranking.getSorting() != null) {
+ mergeToSearchRequestFromSorting(ranking.getSorting(), builder);
}
+ if (ranking.getLocation() != null) {
+ builder.setGeoLocation(ranking.getLocation().toString());
+ }
+
+ var featureMap = ranking.getFeatures().asMap();
+ MapConverter.convertMapStrings(featureMap, builder::addFeatureOverrides);
+ MapConverter.convertMapTensors(featureMap, builder::addTensorFeatureOverrides);
+ mergeRankProperties(ranking, builder::addRankProperties, builder::addTensorRankProperties);
}
- private static void mergeToRequestFromSorting(Sorting sorting, SearchProtocol.SearchRequest.Builder builder, boolean includeQueryData) {
+ private static void mergeToSearchRequestFromSorting(Sorting sorting, SearchProtocol.SearchRequest.Builder builder) {
for (var field : sorting.fieldOrders()) {
- var sortField = SearchProtocol.SortField.newBuilder().setField(field.getSorter().getName())
+ var sortField = SearchProtocol.SortField.newBuilder()
+ .setField(field.getSorter().getName())
.setAscending(field.getSortOrder() == Order.ASCENDING).build();
builder.addSorting(sortField);
}
}
- private static void mergeToRequestFromRanking(Ranking ranking, SearchProtocol.SearchRequest.Builder builder, boolean includeQueryData) {
- builder.setRankProfile(ranking.getProfile());
- if (ranking.getQueryCache()) {
- builder.setCacheQuery(true);
+ public static SearchProtocol.DocsumRequest.Builder createDocsumRequestBuilder(Query query, String serverId, String summaryClass,
+ boolean includeQueryData) {
+ var builder = SearchProtocol.DocsumRequest.newBuilder()
+ .setTimeout((int) query.getTimeLeft())
+ .setDumpFeatures(query.properties().getBoolean(Ranking.RANKFEATURES, false));
+
+ if (summaryClass != null) {
+ builder.setSummaryClass(summaryClass);
}
- if (ranking.getSorting() != null) {
- mergeToRequestFromSorting(ranking.getSorting(), builder, includeQueryData);
+
+ var documentDb = query.getModel().getDocumentDb();
+ if (documentDb != null) {
+ builder.setDocumentType(documentDb);
}
- if (ranking.getLocation() != null) {
- builder.setGeoLocation(ranking.getLocation().toString());
+
+ var ranking = query.getRanking();
+ if (ranking.getQueryCache()) {
+ builder.setCacheQuery(true);
+ builder.setSessionKey(query.getSessionId(serverId).toString());
}
- mergeToRequestFromRankFeatures(ranking.getFeatures(), builder, includeQueryData);
- mergeToRequestFromRankProperties(ranking.getProperties(), builder, includeQueryData);
- }
+ builder.setRankProfile(query.getRanking().getProfile());
- private static void mergeToRequestFromRankFeatures(RankFeatures features, SearchProtocol.SearchRequest.Builder builder, boolean includeQueryData) {
if (includeQueryData) {
- MapConverter.convertMapStrings(features.asMap(), builder::addFeatureOverrides);
- MapConverter.convertMapTensors(features.asMap(), builder::addTensorFeatureOverrides);
+ mergeQueryDataToDocsumRequest(query, builder);
}
+
+ return builder;
}
- private static void mergeToRequestFromRankProperties(RankProperties properties, Builder builder, boolean includeQueryData) {
- if (includeQueryData) {
- MapConverter.convertMultiMap(properties.asMap(), propB -> {
- if (!GetDocSumsPacket.sessionIdKey.equals(propB.getName())) {
- builder.addRankProperties(propB);
- }
- }, builder::addTensorRankProperties);
+ public static byte[] serializeDocsumRequest(SearchProtocol.DocsumRequest.Builder builder, List<FastHit> documents) {
+ builder.clearGlobalIds();
+ for (var hit : documents) {
+ builder.addGlobalIds(ByteString.copyFrom(hit.getGlobalId().getRawId()));
}
+ return builder.build().toByteArray();
+ }
+
+ private static void mergeQueryDataToDocsumRequest(Query query, SearchProtocol.DocsumRequest.Builder builder) {
+ var ranking = query.getRanking();
+ var featureMap = ranking.getFeatures().asMap();
+
+ builder.setQueryTreeBlob(serializeQueryTree(query.getModel().getQueryTree()));
+ builder.setGeoLocation(ranking.getLocation().toString());
+ MapConverter.convertMapStrings(featureMap, builder::addFeatureOverrides);
+ MapConverter.convertMapTensors(featureMap, builder::addTensorFeatureOverrides);
+ MapConverter.convertStringMultiMap(query.getPresentation().getHighlight().getHighlightTerms(), builder::addHighlightTerms);
+ mergeRankProperties(ranking, builder::addRankProperties, builder::addTensorRankProperties);
+ }
+
+ public static byte[] serializeResult(Result searchResult) {
+ return convertFromResult(searchResult).toByteArray();
+ }
+
+ public static Result deserializeToSearchResult(byte[] payload, Query query, VespaBackEndSearcher searcher)
+ throws InvalidProtocolBufferException {
+ var protobuf = SearchProtocol.SearchReply.parseFrom(payload);
+ var result = convertToResult(query, protobuf, searcher.getDocumentDatabase(query));
+ return result;
}
private static Result convertToResult(Query query, SearchProtocol.SearchReply protobuf, DocumentDatabase documentDatabase) {
@@ -211,4 +234,26 @@ public class ProtobufSerialization {
return builder.build();
}
+ private static ByteString serializeQueryTree(QueryTree queryTree) {
+ int bufferSize = INITIAL_SERIALIZATION_BUFFER_SIZE;
+ while (true) {
+ try {
+ ByteBuffer treeBuffer = ByteBuffer.allocate(bufferSize);
+ queryTree.encode(treeBuffer);
+ treeBuffer.flip();
+ return ByteString.copyFrom(treeBuffer);
+ } catch (java.nio.BufferOverflowException e) {
+ bufferSize *= 2;
+ }
+ }
+ }
+
+ private static void mergeRankProperties(Ranking ranking, Consumer<StringProperty.Builder> stringProperties,
+ Consumer<TensorProperty.Builder> tensorProperties) {
+ MapConverter.convertMultiMap(ranking.getProperties().asMap(), propB -> {
+ if (!GetDocSumsPacket.sessionIdKey.equals(propB.getName())) {
+ stringProperties.accept(propB);
+ }
+ }, tensorProperties);
+ }
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
index 32a7917d43c..2aa01b05955 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
@@ -22,7 +22,6 @@ import java.util.List;
* @author bratseth
*/
class RpcClient implements Client {
-
private final Supervisor supervisor = new Supervisor(new Transport());
@Override
@@ -44,15 +43,15 @@ class RpcClient implements Client {
}
@Override
- public void search(NodeConnection node, CompressionType compression, int uncompressedLength, byte[] compressedPayload,
- RpcSearchInvoker responseReceiver, double timeoutSeconds) {
- Request request = new Request("vespa.searchprotocol.search");
+ public void request(String rpcMethod, NodeConnection node, CompressionType compression, int uncompressedLength, byte[] compressedPayload,
+ ResponseReceiver responseReceiver, double timeoutSeconds) {
+ Request request = new Request(rpcMethod);
request.parameters().add(new Int8Value(compression.getCode()));
request.parameters().add(new Int32Value(uncompressedLength));
request.parameters().add(new DataValue(compressedPayload));
RpcNodeConnection rpcNode = ((RpcNodeConnection) node);
- rpcNode.invokeAsync(request, timeoutSeconds, new RpcSearchResponseWaiter(rpcNode, responseReceiver));
+ rpcNode.invokeAsync(request, timeoutSeconds, new RpcProtobufResponseWaiter(rpcNode, responseReceiver));
}
private static class RpcNodeConnection implements NodeConnection {
@@ -111,40 +110,37 @@ class RpcClient implements Client {
@Override
public void handleRequestDone(Request requestWithResponse) {
if (requestWithResponse.isError()) {
- handler.receive(GetDocsumsResponseOrError.fromError("Error response from " + node + ": " +
- requestWithResponse.errorMessage()));
+ handler.receive(ResponseOrError.fromError("Error response from " + node + ": " + requestWithResponse.errorMessage()));
return;
}
Values returnValues = requestWithResponse.returnValues();
if (returnValues.size() < 3) {
- handler.receive(GetDocsumsResponseOrError.fromError("Invalid getDocsums response from " + node +
- ": Expected 3 return arguments, got " +
- returnValues.size()));
+ handler.receive(ResponseOrError.fromError(
+ "Invalid getDocsums response from " + node + ": Expected 3 return arguments, got " + returnValues.size()));
return;
}
byte compression = returnValues.get(0).asInt8();
int uncompressedSize = returnValues.get(1).asInt32();
byte[] compressedSlimeBytes = returnValues.get(2).asData();
+ @SuppressWarnings("unchecked") // TODO: Non-protobuf rpc docsums to be removed soon
List<FastHit> hits = (List<FastHit>) requestWithResponse.getContext();
- handler.receive(GetDocsumsResponseOrError.fromResponse(new GetDocsumsResponse(compression,
- uncompressedSize,
- compressedSlimeBytes,
- hits)));
+ handler.receive(
+ ResponseOrError.fromResponse(new GetDocsumsResponse(compression, uncompressedSize, compressedSlimeBytes, hits)));
}
}
- private static class RpcSearchResponseWaiter implements RequestWaiter {
+ private static class RpcProtobufResponseWaiter implements RequestWaiter {
/** The node to which we made the request we are waiting for - for error messages only */
private final RpcNodeConnection node;
/** The handler to which the response is forwarded */
- private final RpcSearchInvoker handler;
+ private final ResponseReceiver handler;
- public RpcSearchResponseWaiter(RpcNodeConnection node, RpcSearchInvoker handler) {
+ public RpcProtobufResponseWaiter(RpcNodeConnection node, ResponseReceiver handler) {
this.node = node;
this.handler = handler;
}
@@ -152,13 +148,13 @@ class RpcClient implements Client {
@Override
public void handleRequestDone(Request requestWithResponse) {
if (requestWithResponse.isError()) {
- handler.receive(SearchResponseOrError.fromError("Error response from " + node + ": " + requestWithResponse.errorMessage()));
+ handler.receive(ResponseOrError.fromError("Error response from " + node + ": " + requestWithResponse.errorMessage()));
return;
}
Values returnValues = requestWithResponse.returnValues();
if (returnValues.size() < 3) {
- handler.receive(SearchResponseOrError.fromError(
+ handler.receive(ResponseOrError.fromError(
"Invalid getDocsums response from " + node + ": Expected 3 return arguments, got " + returnValues.size()));
return;
}
@@ -166,7 +162,7 @@ class RpcClient implements Client {
byte compression = returnValues.get(0).asInt8();
int uncompressedSize = returnValues.get(1).asInt32();
byte[] compressedPayload = returnValues.get(2).asData();
- handler.receive(SearchResponseOrError.fromResponse(new SearchResponse(compression, uncompressedSize, compressedPayload)));
+ handler.receive(ResponseOrError.fromResponse(new ProtobufResponse(compression, uncompressedSize, compressedPayload)));
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java
index b7286997514..760f7486923 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java
@@ -13,6 +13,7 @@ import com.yahoo.prelude.fastsearch.TimeoutException;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.dispatch.FillInvoker;
+import com.yahoo.search.dispatch.rpc.Client.GetDocsumsResponse;
import com.yahoo.search.query.SessionId;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.result.Hit;
@@ -102,7 +103,7 @@ public class RpcFillInvoker extends FillInvoker {
Client.NodeConnection node = resourcePool.nodeConnections().get(nodeId);
if (node == null) {
String error = "Could not fill hits from unknown node " + nodeId;
- responseReceiver.receive(Client.GetDocsumsResponseOrError.fromError(error));
+ responseReceiver.receive(Client.ResponseOrError.fromError(error));
result.hits().addError(ErrorMessage.createEmptyDocsums(error));
log.warning("Got hits with partid " + nodeId + ", which is not included in the current dispatch config");
return;
@@ -143,7 +144,7 @@ public class RpcFillInvoker extends FillInvoker {
/** Receiver of the responses to a set of getDocsums requests */
public static class GetDocsumsResponseReceiver {
- private final BlockingQueue<Client.GetDocsumsResponseOrError> responses;
+ private final BlockingQueue<Client.ResponseOrError<GetDocsumsResponse>> responses;
private final Compressor compressor;
private final Result result;
@@ -161,7 +162,7 @@ public class RpcFillInvoker extends FillInvoker {
}
/** Called by a thread belonging to the client when a valid response becomes available */
- public void receive(Client.GetDocsumsResponseOrError response) {
+ public void receive(Client.ResponseOrError<GetDocsumsResponse> response) {
responses.add(response);
}
@@ -181,7 +182,7 @@ public class RpcFillInvoker extends FillInvoker {
if (timeLeftMs <= 0) {
throwTimeout();
}
- Client.GetDocsumsResponseOrError response = responses.poll(timeLeftMs, TimeUnit.MILLISECONDS);
+ Client.ResponseOrError<GetDocsumsResponse> response = responses.poll(timeLeftMs, TimeUnit.MILLISECONDS);
if (response == null)
throwTimeout();
skippedHits += processResponse(response, summaryClass, documentDb);
@@ -197,7 +198,7 @@ public class RpcFillInvoker extends FillInvoker {
}
}
- private int processResponse(Client.GetDocsumsResponseOrError responseOrError,
+ private int processResponse(Client.ResponseOrError<GetDocsumsResponse> responseOrError,
String summaryClass,
DocumentDatabase documentDb) {
if (responseOrError.error().isPresent()) {
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
index f17e7d63431..c1b164aaeaa 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
@@ -6,6 +6,7 @@ import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
import com.yahoo.processing.request.CompoundName;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
+import com.yahoo.search.dispatch.Dispatcher;
import com.yahoo.search.dispatch.FillInvoker;
import com.yahoo.search.dispatch.InvokerFactory;
import com.yahoo.search.dispatch.SearchInvoker;
@@ -36,8 +37,15 @@ public class RpcInvokerFactory extends InvokerFactory {
@Override
public Optional<FillInvoker> createFillInvoker(VespaBackEndSearcher searcher, Result result) {
Query query = result.getQuery();
+
+ boolean summaryNeedsQuery = searcher.summaryNeedsQuery(query);
+
+ if(query.properties().getBoolean(Dispatcher.dispatchProtobuf, false)) {
+ return Optional.of(new RpcProtobufFillInvoker(rpcResourcePool, searcher.getDocumentDatabase(query), searcher.getServerId(),
+ summaryNeedsQuery));
+ }
if (query.properties().getBoolean(dispatchSummaries, true)
- && ! searcher.summaryNeedsQuery(query)
+ && ! summaryNeedsQuery
&& query.getRanking().getLocation() == null)
{
return Optional.of(new RpcFillInvoker(rpcResourcePool, searcher.getDocumentDatabase(query)));
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
new file mode 100644
index 00000000000..3ec821beba8
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
@@ -0,0 +1,227 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch.rpc;
+
+import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.yahoo.collections.ListMap;
+import com.yahoo.collections.Pair;
+import com.yahoo.compress.CompressionType;
+import com.yahoo.compress.Compressor;
+import com.yahoo.container.protect.Error;
+import com.yahoo.data.access.Inspector;
+import com.yahoo.data.access.slime.SlimeAdapter;
+import com.yahoo.prelude.fastsearch.DocumentDatabase;
+import com.yahoo.prelude.fastsearch.FastHit;
+import com.yahoo.prelude.fastsearch.TimeoutException;
+import com.yahoo.search.Query;
+import com.yahoo.search.Result;
+import com.yahoo.search.dispatch.FillInvoker;
+import com.yahoo.search.dispatch.rpc.Client.ProtobufResponse;
+import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.search.result.Hit;
+import com.yahoo.slime.ArrayTraverser;
+import com.yahoo.slime.BinaryFormat;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * {@link FillInvoker} implementation using Protobuf over JRT
+ *
+ * @author bratseth
+ * @author ollivir
+ */
+public class RpcProtobufFillInvoker extends FillInvoker {
+ private static final String RPC_METHOD = "vespa.searchprotocol.getDocsums";
+
+ private static final Logger log = Logger.getLogger(RpcProtobufFillInvoker.class.getName());
+
+ private final DocumentDatabase documentDb;
+ private final RpcResourcePool resourcePool;
+ private final boolean summaryNeedsQuery;
+ private final String serverId;
+
+ private BlockingQueue<Pair<Client.ResponseOrError<ProtobufResponse>, List<FastHit>>> responses;
+
+ /** Whether we have already logged/notified about an error - to avoid spamming */
+ private boolean hasReportedError = false;
+
+ /** The number of responses we should receive (and process) before this is complete */
+ private int outstandingResponses;
+
+ RpcProtobufFillInvoker(RpcResourcePool resourcePool, DocumentDatabase documentDb, String serverId, boolean summaryNeedsQuery) {
+ this.documentDb = documentDb;
+ this.resourcePool = resourcePool;
+ this.serverId = serverId;
+ this.summaryNeedsQuery = summaryNeedsQuery;
+ }
+
+ @Override
+ protected void sendFillRequest(Result result, String summaryClass) {
+ ListMap<Integer, FastHit> hitsByNode = hitsByNode(result);
+
+ CompressionType compression = CompressionType
+ .valueOf(result.getQuery().properties().getString(RpcResourcePool.dispatchCompression, "LZ4").toUpperCase());
+
+ result.getQuery().trace(false, 5, "Sending ", hitsByNode.size(), " summary fetch requests with jrt/protobuf");
+
+ outstandingResponses = hitsByNode.size();
+ responses = new LinkedBlockingQueue<>(outstandingResponses);
+
+ var builder = ProtobufSerialization.createDocsumRequestBuilder(result.getQuery(), serverId, summaryClass, summaryNeedsQuery);
+ for (Map.Entry<Integer, List<FastHit>> nodeHits : hitsByNode.entrySet()) {
+ var payload = ProtobufSerialization.serializeDocsumRequest(builder, nodeHits.getValue());
+ sendDocsumsRequest(nodeHits.getKey(), nodeHits.getValue(), payload, compression, result);
+ }
+ }
+
+ @Override
+ protected void getFillResults(Result result, String summaryClass) {
+ try {
+ processResponses(result, summaryClass);
+ result.hits().setSorted(false);
+ result.analyzeHits();
+ } catch (TimeoutException e) {
+ result.hits().addError(ErrorMessage.createTimeout("Summary data is incomplete: " + e.getMessage()));
+ }
+ }
+
+ @Override
+ protected void release() {
+ // nothing to release
+ }
+
+    /** Called by a thread belonging to the client when a response or error becomes available */
+ public void receive(Client.ResponseOrError<ProtobufResponse> response, List<FastHit> hitsContext) {
+ responses.add(new Pair<>(response, hitsContext));
+ }
+
+ /** Return a map of hits by their search node (partition) id */
+ private static ListMap<Integer, FastHit> hitsByNode(Result result) {
+ ListMap<Integer, FastHit> hitsByNode = new ListMap<>();
+ for (Iterator<Hit> i = result.hits().unorderedDeepIterator(); i.hasNext();) {
+ Hit h = i.next();
+ if (!(h instanceof FastHit))
+ continue;
+ FastHit hit = (FastHit) h;
+
+ hitsByNode.put(hit.getDistributionKey(), hit);
+ }
+ return hitsByNode;
+ }
+
+    /** Sends a docsums request to a node. Responses (or errors) are delivered back to this invoker via receive(). */
+ private void sendDocsumsRequest(int nodeId, List<FastHit> hits, byte[] payload, CompressionType compression, Result result) {
+ Client.NodeConnection node = resourcePool.nodeConnections().get(nodeId);
+ if (node == null) {
+ String error = "Could not fill hits from unknown node " + nodeId;
+ receive(Client.ResponseOrError.fromError(error), hits);
+ result.hits().addError(ErrorMessage.createEmptyDocsums(error));
+ log.warning("Got hits with partid " + nodeId + ", which is not included in the current dispatch config");
+ return;
+ }
+
+ Query query = result.getQuery();
+ double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0;
+ Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, payload);
+ resourcePool.client().request(RPC_METHOD, node, compressionResult.type(), payload.length, compressionResult.data(),
+ roe -> receive(roe, hits), timeoutSeconds);
+ }
+
+ private void processResponses(Result result, String summaryClass) throws TimeoutException {
+ try {
+ int skippedHits = 0;
+ while (outstandingResponses > 0) {
+ long timeLeftMs = result.getQuery().getTimeLeft();
+ if (timeLeftMs <= 0) {
+ throwTimeout();
+ }
+ var responseAndHits = responses.poll(timeLeftMs, TimeUnit.MILLISECONDS);
+ if (responseAndHits == null) {
+ throwTimeout();
+ }
+ var response = responseAndHits.getFirst();
+ var hitsContext = responseAndHits.getSecond();
+ skippedHits += processResponse(result, response, hitsContext, summaryClass);
+ outstandingResponses--;
+ }
+ if (skippedHits != 0) {
+ result.hits().addError(ErrorMessage
+ .createEmptyDocsums("Missing hit summary data for summary " + summaryClass + " for " + skippedHits + " hits"));
+ }
+ } catch (InterruptedException e) {
+ // TODO: Add error
+ }
+ }
+
+ private int processResponse(Result result, Client.ResponseOrError<ProtobufResponse> responseOrError, List<FastHit> hitsContext,
+ String summaryClass) {
+ if (responseOrError.error().isPresent()) {
+ if (hasReportedError) {
+ return 0;
+ }
+ String error = responseOrError.error().get();
+ result.hits().addError(ErrorMessage.createBackendCommunicationError(error));
+ log.log(Level.WARNING, "Error fetching summary data: " + error);
+ hasReportedError = true;
+ } else {
+ Client.ProtobufResponse response = responseOrError.response().get();
+ CompressionType compression = CompressionType.valueOf(response.compression());
+ byte[] responseBytes = resourcePool.compressor().decompress(response.compressedPayload(), compression,
+ response.uncompressedSize());
+ return fill(result, hitsContext, summaryClass, responseBytes);
+ }
+ return 0;
+ }
+
+ private void addErrors(Result result, com.yahoo.slime.Inspector errors) {
+ errors.traverse((ArrayTraverser) (index, value) -> {
+ int errorCode = ("timeout".equalsIgnoreCase(value.field("type").asString())) ? Error.TIMEOUT.code : Error.UNSPECIFIED.code;
+ result.hits().addError(new ErrorMessage(errorCode, value.field("message").asString(), value.field("details").asString()));
+ });
+ }
+
+ private int fill(Result result, List<FastHit> hits, String summaryClass, byte[] payload) {
+ try {
+ var protobuf = SearchProtocol.DocsumReply.parseFrom(payload);
+ var root = BinaryFormat.decode(protobuf.getSlimeSummaries().toByteArray()).get();
+ var errors = root.field("errors");
+ boolean hasErrors = errors.valid() && (errors.entries() > 0);
+ if (hasErrors) {
+ addErrors(result, errors);
+ }
+
+ Inspector summaries = new SlimeAdapter(root.field("docsums"));
+ if (!summaries.valid()) {
+ return 0; // No summaries; Perhaps we requested a non-existing summary class
+ }
+ int skippedHits = 0;
+ for (int i = 0; i < hits.size(); i++) {
+ Inspector summary = summaries.entry(i).field("docsum");
+ if (summary.fieldCount() != 0) {
+ hits.get(i).setField(Hit.SDDOCNAME_FIELD, documentDb.getName());
+ hits.get(i).addSummary(documentDb.getDocsumDefinitionSet().getDocsum(summaryClass), summary);
+ hits.get(i).setFilled(summaryClass);
+ } else {
+ skippedHits++;
+ }
+ }
+ return skippedHits;
+ } catch (InvalidProtocolBufferException ex) {
+ log.log(Level.WARNING, "Invalid response to docsum request", ex);
+ result.hits().addError(ErrorMessage.createInternalServerError("Invalid response to docsum request from backend"));
+ return 0;
+ }
+ }
+
+ private void throwTimeout() throws TimeoutException {
+ throw new TimeoutException("Timed out waiting for summary data. " + outstandingResponses + " responses outstanding.");
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
index 88d77c760e3..e54ac00f821 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
@@ -9,7 +9,7 @@ import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.dispatch.SearchInvoker;
-import com.yahoo.search.dispatch.rpc.Client.SearchResponse;
+import com.yahoo.search.dispatch.rpc.Client.ProtobufResponse;
import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.searchchain.Execution;
@@ -25,11 +25,13 @@ import java.util.concurrent.TimeUnit;
*
* @author ollivir
*/
-public class RpcSearchInvoker extends SearchInvoker {
+public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseReceiver {
+ private static final String RPC_METHOD = "vespa.searchprotocol.search";
+
private final VespaBackEndSearcher searcher;
private final Node node;
private final RpcResourcePool resourcePool;
- private final BlockingQueue<Client.SearchResponseOrError> responses;
+ private final BlockingQueue<Client.ResponseOrError<ProtobufResponse>> responses;
private Query query;
@@ -50,15 +52,16 @@ public class RpcSearchInvoker extends SearchInvoker {
Client.NodeConnection nodeConnection = resourcePool.nodeConnections().get(node.key());
if (nodeConnection == null) {
- responses.add(Client.SearchResponseOrError.fromError("Could send search to unknown node " + node.key()));
+ responses.add(Client.ResponseOrError.fromError("Could not send search to unknown node " + node.key()));
responseAvailable();
return;
}
+ query.trace(false, 5, "Sending search request with jrt/protobuf to node with dist key ", node.key());
- var payload = ProtobufSerialization.serializeQuery(query, searcher.getServerId(), true);
+ var payload = ProtobufSerialization.serializeSearchRequest(query, searcher.getServerId());
double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0;
Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, payload);
- resourcePool.client().search(nodeConnection, compressionResult.type(), payload.length, compressionResult.data(), this,
+ resourcePool.client().request(RPC_METHOD, nodeConnection, compressionResult.type(), payload.length, compressionResult.data(), this,
timeoutSeconds);
}
@@ -68,7 +71,7 @@ public class RpcSearchInvoker extends SearchInvoker {
if (timeLeftMs <= 0) {
return errorResult(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName()));
}
- Client.SearchResponseOrError response = null;
+ Client.ResponseOrError<ProtobufResponse> response = null;
try {
response = responses.poll(timeLeftMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
@@ -84,11 +87,11 @@ public class RpcSearchInvoker extends SearchInvoker {
return errorResult(query, ErrorMessage.createInternalServerError("Neither error nor result available"));
}
- SearchResponse searchResponse = response.response().get();
- CompressionType compression = CompressionType.valueOf(searchResponse.compression());
- byte[] payload = resourcePool.compressor().decompress(searchResponse.compressedPayload(), compression,
- searchResponse.uncompressedSize());
- var result = ProtobufSerialization.deserializeToResult(payload, query, searcher);
+ ProtobufResponse protobufResponse = response.response().get();
+ CompressionType compression = CompressionType.valueOf(protobufResponse.compression());
+ byte[] payload = resourcePool.compressor().decompress(protobufResponse.compressedPayload(), compression,
+ protobufResponse.uncompressedSize());
+ var result = ProtobufSerialization.deserializeToSearchResult(payload, query, searcher);
result.hits().unorderedIterator().forEachRemaining(hit -> {
if(hit instanceof FastHit) {
FastHit fhit = (FastHit) hit;
@@ -106,7 +109,7 @@ public class RpcSearchInvoker extends SearchInvoker {
// nothing to release
}
- public void receive(Client.SearchResponseOrError response) {
+ public void receive(Client.ResponseOrError<ProtobufResponse> response) {
responses.add(response);
responseAvailable();
}
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java
index f9b628e594a..687d3e728c0 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java
@@ -7,9 +7,6 @@ import com.yahoo.document.GlobalId;
import com.yahoo.document.idstring.IdIdString;
import com.yahoo.prelude.fastsearch.FastHit;
import com.yahoo.search.Result;
-import com.yahoo.search.dispatch.rpc.Client;
-import com.yahoo.search.dispatch.rpc.RpcFillInvoker;
-import com.yahoo.search.dispatch.rpc.RpcSearchInvoker;
import com.yahoo.slime.ArrayTraverser;
import com.yahoo.slime.BinaryFormat;
import com.yahoo.slime.Cursor;
@@ -44,7 +41,7 @@ public class MockClient implements Client {
int uncompressedSize, byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver,
double timeoutSeconds) {
if (malfunctioning) {
- responseReceiver.receive(GetDocsumsResponseOrError.fromError("Malfunctioning"));
+ responseReceiver.receive(ResponseOrError.fromError("Malfunctioning"));
return;
}
@@ -74,25 +71,25 @@ public class MockClient implements Client {
Compressor.Compression compressionResult = compressor.compress(compression, slimeBytes);
GetDocsumsResponse response = new GetDocsumsResponse(compressionResult.type().getCode(), slimeBytes.length,
compressionResult.data(), hitsContext);
- responseReceiver.receive(GetDocsumsResponseOrError.fromResponse(response));
+ responseReceiver.receive(ResponseOrError.fromResponse(response));
}
@Override
- public void search(NodeConnection node, CompressionType compression, int uncompressedLength, byte[] compressedPayload,
- RpcSearchInvoker responseReceiver, double timeoutSeconds) {
+ public void request(String rpcMethod, NodeConnection node, CompressionType compression, int uncompressedLength, byte[] compressedPayload,
+ ResponseReceiver responseReceiver, double timeoutSeconds) {
if (malfunctioning) {
- responseReceiver.receive(SearchResponseOrError.fromError("Malfunctioning"));
+ responseReceiver.receive(ResponseOrError.fromError("Malfunctioning"));
return;
}
if(searchResult == null) {
- responseReceiver.receive(SearchResponseOrError.fromError("No result defined"));
+ responseReceiver.receive(ResponseOrError.fromError("No result defined"));
return;
}
var payload = ProtobufSerialization.serializeResult(searchResult);
var compressionResult = compressor.compress(compression, payload);
- var response = new SearchResponse(compressionResult.type().getCode(), payload.length, compressionResult.data());
- responseReceiver.receive(SearchResponseOrError.fromResponse(response));
+ var response = new ProtobufResponse(compressionResult.type().getCode(), payload.length, compressionResult.data());
+ responseReceiver.receive(ResponseOrError.fromResponse(response));
}
public void setDocsumReponse(String nodeId, int docId, String docsumClass, Map<String, Object> docsumValues) {
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java
index 689be53de23..b9d894f9eb5 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java
@@ -54,8 +54,8 @@ public class RpcSearchInvokerTest {
AtomicInteger lengthHolder) {
return new Client() {
@Override
- public void search(NodeConnection node, CompressionType compression, int uncompressedLength, byte[] compressedPayload,
- RpcSearchInvoker responseReceiver, double timeoutSeconds) {
+ public void request(String rpcMethod, NodeConnection node, CompressionType compression, int uncompressedLength,
+ byte[] compressedPayload, ResponseReceiver responseReceiver, double timeoutSeconds) {
compressionTypeHolder.set(compression);
payloadHolder.set(compressedPayload);
lengthHolder.set(uncompressedLength);
diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
index e294aee02e7..2e0e15e8a12 100644
--- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
+++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java
@@ -113,6 +113,18 @@ public class Flags {
"Takes effect at redeployment",
APPLICATION_ID);
+ public static final UnboundBooleanFlag USE_LB_STATUS_FILE = defineFeatureFlag(
+ "use-lb-status-file", false,
+ "Serve status.html from the new path",
+ "Takes effect on restart of Docker container",
+ APPLICATION_ID, HOSTNAME);
+
+ public static final UnboundBooleanFlag USE_HTTPS_LOAD_BALANCER_UPSTREAM = defineFeatureFlag(
+ "use-https-load-balancer-upstream", false,
+ "Use https between load balancer and upstream containers",
+ "Takes effect at redeployment",
+ APPLICATION_ID);
+
/** WARNING: public for testing: All flags should be defined in {@link Flags}. */
public static UnboundBooleanFlag defineFeatureFlag(String flagId, boolean defaultValue, String description,
String modificationEffect, FetchVector.Dimension... dimensions) {
diff --git a/logd/src/main/resources/configdefinitions/logd.def b/logd/src/main/resources/configdefinitions/logd.def
index 570c2e36f99..61150c373aa 100644
--- a/logd/src/main/resources/configdefinitions/logd.def
+++ b/logd/src/main/resources/configdefinitions/logd.def
@@ -7,10 +7,13 @@ stateport int default=0
## Host to contact the logserver on.
logserver.host string default="localhost"
-## Port to contact the logserver on.
+## RPC port of logserver.
+logserver.rpcport int default=5822
+
+## Legacy port to contact the logserver on.
logserver.port int default=5821
-## Forward to a logserver. If false, logserver.host and logserver.port are irrelevant
+## Forward to a logserver. Other logserver configuration is irrelevant if false.
logserver.use bool default=true
## Loglevel config whether they should be stored and/or forwarded
diff --git a/storage/src/tests/CMakeLists.txt b/storage/src/tests/CMakeLists.txt
index c4436161f28..1894a3fe6a8 100644
--- a/storage/src/tests/CMakeLists.txt
+++ b/storage/src/tests/CMakeLists.txt
@@ -7,6 +7,7 @@ vespa_add_executable(storage_gtest_runner_app TEST
gtest_runner.cpp
DEPENDS
storage_testbucketmover
+ storage_gtestdistributor
)
vespa_add_test(
diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt
index f58fd37f622..1e9dca3f7f1 100644
--- a/storage/src/tests/distributor/CMakeLists.txt
+++ b/storage/src/tests/distributor/CMakeLists.txt
@@ -4,7 +4,6 @@ vespa_add_library(storage_testdistributor TEST
blockingoperationstartertest.cpp
bucketdatabasetest.cpp
bucketdbmetricupdatertest.cpp
- bucketdbupdatertest.cpp
bucketgctimecalculatortest.cpp
bucketstateoperationtest.cpp
distributor_host_info_reporter_test.cpp
@@ -46,3 +45,16 @@ vespa_add_library(storage_testdistributor TEST
storage_testcommon
storage_testhostreporter
)
+
+vespa_add_library(storage_gtestdistributor TEST
+ SOURCES
+ bucketdbupdatertest.cpp
+ # Fixtures etc. are duplicated with the non-gtest runner:
+ distributortestutil.cpp
+ messagesenderstub.cpp
+ DEPENDS
+ storage_distributor
+ storage_testcommon
+ storage_testhostreporter
+ gtest
+)
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index 9795f5db5dc..2beae2ac43b 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -1,7 +1,5 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/vdstestlib/cppunit/macros.h>
-#include <iomanip>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storage/distributor/bucketdbupdater.h>
#include <vespa/storage/distributor/distributormetricsset.h>
@@ -19,6 +17,9 @@
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/vespalib/text/stringtokenizer.h>
#include <sstream>
+#include <iomanip>
+
+#include <gtest/gtest.h>
using namespace storage::api;
using namespace storage::lib;
@@ -29,6 +30,8 @@ using document::FixedBucketSpaces;
using document::BucketId;
using document::Bucket;
+using namespace ::testing;
+
namespace storage::distributor {
namespace {
@@ -54,145 +57,12 @@ getRequestBucketInfoStrings(uint32_t count)
}
-class BucketDBUpdaterTest : public CppUnit::TestFixture,
+class BucketDBUpdaterTest : public Test,
public DistributorTestUtil
{
- CPPUNIT_TEST_SUITE(BucketDBUpdaterTest);
- CPPUNIT_TEST(testNormalUsage); // Make sure that bucketdbupdater sends requests to nodes, send responses back for 3 nodes, check that bucketdb is in correct state
- CPPUNIT_TEST(testDistributorChange);
- CPPUNIT_TEST(testDistributorChangeWithGrouping);
- CPPUNIT_TEST(testNormalUsageInitializing); // Check that we send request bucket info when storage node is initializing, and send another when it's up.
- CPPUNIT_TEST(testFailedRequestBucketInfo);
- CPPUNIT_TEST(testBitChange); // Check what happens when distribution bits change
- CPPUNIT_TEST(testNodeDown);
- CPPUNIT_TEST(testStorageNodeInMaintenanceClearsBucketsForNode);
- CPPUNIT_TEST(testNodeDownCopiesGetInSync);
- CPPUNIT_TEST(testDownWhileInit);
- CPPUNIT_TEST(testInitializingWhileRecheck);
- CPPUNIT_TEST(testRecheckNode);
- CPPUNIT_TEST(testRecheckNodeWithFailure);
- CPPUNIT_TEST(testNotifyBucketChange);
- CPPUNIT_TEST(testNotifyBucketChangeFromNodeDown);
- CPPUNIT_TEST(testNotifyChangeWithPendingStateQueuesBucketInfoRequests);
- CPPUNIT_TEST(testMergeReply);
- CPPUNIT_TEST(testMergeReplyNodeDown);
- CPPUNIT_TEST(testMergeReplyNodeDownAfterRequestSent);
- CPPUNIT_TEST(testFlush);
- CPPUNIT_TEST(testPendingClusterStateSendMessages);
- CPPUNIT_TEST(testPendingClusterStateReceive);
- CPPUNIT_TEST(testPendingClusterStateMerge);
- CPPUNIT_TEST(testPendingClusterStateMergeReplicaChanged);
- CPPUNIT_TEST(testPendingClusterStateWithGroupDown);
- CPPUNIT_TEST(testPendingClusterStateWithGroupDownAndNoHandover);
- CPPUNIT_TEST(testNoDbResurrectionForBucketNotOwnedInCurrentState);
- CPPUNIT_TEST(testNoDbResurrectionForBucketNotOwnedInPendingState);
- CPPUNIT_TEST(testClusterStateAlwaysSendsFullFetchWhenDistributionChangePending);
- CPPUNIT_TEST(testChangedDistributionConfigTriggersRecoveryMode);
- CPPUNIT_TEST(testNewlyAddedBucketsHaveCurrentTimeAsGcTimestamp);
- CPPUNIT_TEST(testNewerMutationsNotOverwrittenByEarlierBucketFetch);
- CPPUNIT_TEST(preemptedDistrChangeCarriesNodeSetOverToNextStateFetch);
- CPPUNIT_TEST(preemptedStorChangeCarriesNodeSetOverToNextStateFetch);
- CPPUNIT_TEST(preemptedStorageNodeDownMustBeReFetched);
- CPPUNIT_TEST(outdatedNodeSetClearedAfterSuccessfulStateCompletion);
- CPPUNIT_TEST(doNotSendToPreemptedNodeNowInDownState);
- CPPUNIT_TEST(doNotSendToPreemptedNodeNotPartOfNewState);
- CPPUNIT_TEST_DISABLED(clusterConfigDownsizeOnlySendsToAvailableNodes);
- CPPUNIT_TEST(changedDiskSetTriggersReFetch);
- CPPUNIT_TEST(nodeMissingFromConfigIsTreatedAsNeedingOwnershipTransfer);
- CPPUNIT_TEST(changed_distributor_set_implies_ownership_transfer);
- CPPUNIT_TEST(unchanged_distributor_set_implies_no_ownership_transfer);
- CPPUNIT_TEST(changed_distribution_config_implies_ownership_transfer);
- CPPUNIT_TEST(transition_time_tracked_for_single_state_change);
- CPPUNIT_TEST(transition_time_reset_across_non_preempting_state_changes);
- CPPUNIT_TEST(transition_time_tracked_for_distribution_config_change);
- CPPUNIT_TEST(transition_time_tracked_across_preempted_transitions);
- CPPUNIT_TEST(batch_update_of_existing_diverging_replicas_does_not_mark_any_as_trusted);
- CPPUNIT_TEST(batch_add_of_new_diverging_replicas_does_not_mark_any_as_trusted);
- CPPUNIT_TEST(batch_add_with_single_resulting_replica_implicitly_marks_as_trusted);
- CPPUNIT_TEST(identity_update_of_single_replica_does_not_clear_trusted);
- CPPUNIT_TEST(identity_update_of_diverging_untrusted_replicas_does_not_mark_any_as_trusted);
- CPPUNIT_TEST(adding_diverging_replica_to_existing_trusted_does_not_remove_trusted);
- CPPUNIT_TEST(batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted);
- CPPUNIT_TEST(global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection);
- CPPUNIT_TEST(non_owned_buckets_moved_to_read_only_db_on_ownership_change);
- CPPUNIT_TEST(buckets_no_longer_available_are_not_moved_to_read_only_database);
- CPPUNIT_TEST(non_owned_buckets_purged_when_read_only_support_is_config_disabled);
- CPPUNIT_TEST(deferred_activated_state_does_not_enable_state_until_activation_received);
- CPPUNIT_TEST(read_only_db_cleared_once_pending_state_is_activated);
- CPPUNIT_TEST(read_only_db_is_populated_even_when_self_is_marked_down);
- CPPUNIT_TEST(activate_cluster_state_request_with_mismatching_version_returns_actual_version);
- CPPUNIT_TEST(activate_cluster_state_request_without_pending_transition_passes_message_through);
- CPPUNIT_TEST_SUITE_END();
-
public:
BucketDBUpdaterTest();
-protected:
- void testNormalUsage();
- void testDistributorChange();
- void testDistributorChangeWithGrouping();
- void testNormalUsageInitializing();
- void testFailedRequestBucketInfo();
- void testBitChange();
- void testNodeDown();
- void testStorageNodeInMaintenanceClearsBucketsForNode();
- void testNodeDownCopiesGetInSync();
- void testDownWhileInit();
- void testInitializingWhileRecheck();
- void testRecheckNode();
- void testRecheckNodeWithFailure();
- void testNotifyBucketChange();
- void testNotifyBucketChangeFromNodeDown();
- void testNotifyChangeWithPendingStateQueuesBucketInfoRequests();
- void testMergeReply();
- void testMergeReplyNodeDown();
- void testMergeReplyNodeDownAfterRequestSent();
- void testFlush();
- void testPendingClusterStateSendMessages();
- void testPendingClusterStateReceive();
- void testPendingClusterStateMerge();
- void testPendingClusterStateMergeReplicaChanged();
- void testPendingClusterStateWithGroupDown();
- void testPendingClusterStateWithGroupDownAndNoHandover();
- void testNoDbResurrectionForBucketNotOwnedInCurrentState();
- void testNoDbResurrectionForBucketNotOwnedInPendingState();
- void testClusterStateAlwaysSendsFullFetchWhenDistributionChangePending();
- void testChangedDistributionConfigTriggersRecoveryMode();
- void testNewlyAddedBucketsHaveCurrentTimeAsGcTimestamp();
- void testNewerMutationsNotOverwrittenByEarlierBucketFetch();
- void preemptedDistrChangeCarriesNodeSetOverToNextStateFetch();
- void preemptedStorChangeCarriesNodeSetOverToNextStateFetch();
- void preemptedStorageNodeDownMustBeReFetched();
- void outdatedNodeSetClearedAfterSuccessfulStateCompletion();
- void doNotSendToPreemptedNodeNowInDownState();
- void doNotSendToPreemptedNodeNotPartOfNewState();
- void clusterConfigDownsizeOnlySendsToAvailableNodes();
- void changedDiskSetTriggersReFetch();
- void nodeMissingFromConfigIsTreatedAsNeedingOwnershipTransfer();
- void changed_distributor_set_implies_ownership_transfer();
- void unchanged_distributor_set_implies_no_ownership_transfer();
- void changed_distribution_config_implies_ownership_transfer();
- void transition_time_tracked_for_single_state_change();
- void transition_time_reset_across_non_preempting_state_changes();
- void transition_time_tracked_for_distribution_config_change();
- void transition_time_tracked_across_preempted_transitions();
- void batch_update_of_existing_diverging_replicas_does_not_mark_any_as_trusted();
- void batch_add_of_new_diverging_replicas_does_not_mark_any_as_trusted();
- void batch_add_with_single_resulting_replica_implicitly_marks_as_trusted();
- void identity_update_of_single_replica_does_not_clear_trusted();
- void identity_update_of_diverging_untrusted_replicas_does_not_mark_any_as_trusted();
- void adding_diverging_replica_to_existing_trusted_does_not_remove_trusted();
- void batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted();
- void global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection();
- void non_owned_buckets_moved_to_read_only_db_on_ownership_change();
- void buckets_no_longer_available_are_not_moved_to_read_only_database();
- void non_owned_buckets_purged_when_read_only_support_is_config_disabled();
- void deferred_activated_state_does_not_enable_state_until_activation_received();
- void read_only_db_cleared_once_pending_state_is_activated();
- void read_only_db_is_populated_even_when_self_is_marked_down();
- void activate_cluster_state_request_with_mismatching_version_returns_actual_version();
- void activate_cluster_state_request_without_pending_transition_passes_message_through();
-
auto &defaultDistributorBucketSpace() { return getBucketSpaceRepo().get(makeBucketSpace()); }
bool bucketExistsThatHasNode(int bucketCount, uint16_t node) const;
@@ -250,14 +120,14 @@ protected:
public:
using OutdatedNodesMap = dbtransition::OutdatedNodesMap;
- void setUp() override {
+ void SetUp() override {
createLinks();
_bucketSpaces = getBucketSpaces();
// Disable deferred activation by default (at least for now) to avoid breaking the entire world.
getConfig().setAllowStaleReadsDuringClusterStateTransitions(false);
};
- void tearDown() override {
+ void TearDown() override {
close();
}
@@ -309,7 +179,7 @@ public:
uint32_t bucketCount,
uint32_t invalidBucketCount = 0)
{
- CPPUNIT_ASSERT(cmd.getType() == MessageType::REQUESTBUCKETINFO);
+ ASSERT_EQ(cmd.getType(), MessageType::REQUESTBUCKETINFO);
const api::StorageMessageAddress &address(*cmd.getAddress());
getBucketDBUpdater().onRequestBucketInfoReply(
getFakeBucketReply(state,
@@ -322,7 +192,7 @@ public:
void sendFakeReplyForSingleBucketRequest(
const api::RequestBucketInfoCommand& rbi)
{
- CPPUNIT_ASSERT_EQUAL(size_t(1), rbi.getBuckets().size());
+ ASSERT_EQ(size_t(1), rbi.getBuckets().size());
const document::BucketId& bucket(rbi.getBuckets()[0]);
std::shared_ptr<api::RequestBucketInfoReply> reply(
@@ -379,17 +249,17 @@ public:
void verifyInvalid(document::BucketId id, int storageNode) {
BucketDatabase::Entry entry = getBucketDatabase().get(id);
- CPPUNIT_ASSERT(entry.valid());
+ ASSERT_TRUE(entry.valid());
bool found = false;
for (uint32_t j = 0; j<entry->getNodeCount(); j++) {
if (entry->getNodeRef(j).getNode() == storageNode) {
- CPPUNIT_ASSERT(!entry->getNodeRef(j).valid());
+ ASSERT_FALSE(entry->getNodeRef(j).valid());
found = true;
}
}
- CPPUNIT_ASSERT(found);
+ ASSERT_TRUE(found);
}
struct OrderByIncreasingNodeIndex {
@@ -433,10 +303,10 @@ public:
}
void assert_has_activate_cluster_state_reply_with_actual_version(uint32_t version) {
- CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size());
+ ASSERT_EQ(size_t(1), _sender.replies.size());
auto* response = dynamic_cast<api::ActivateClusterStateVersionReply*>(_sender.replies.back().get());
- CPPUNIT_ASSERT(response != nullptr);
- CPPUNIT_ASSERT_EQUAL(version, response->actualVersion());
+ ASSERT_TRUE(response != nullptr);
+ ASSERT_EQ(version, response->actualVersion());
_sender.clear();
}
@@ -445,11 +315,11 @@ public:
uint32_t bucketCount = 1,
uint32_t invalidBucketCount = 0)
{
- CPPUNIT_ASSERT_EQUAL(expectedMsgs, _sender.commands.size());
+ ASSERT_EQ(expectedMsgs, _sender.commands.size());
for (uint32_t i = 0; i < _sender.commands.size(); i++) {
- fakeBucketReply(state, *_sender.commands[i],
- bucketCount, invalidBucketCount);
+ ASSERT_NO_FATAL_FAILURE(fakeBucketReply(state, *_sender.commands[i],
+ bucketCount, invalidBucketCount));
}
}
@@ -459,7 +329,7 @@ public:
{
_sender.clear();
setSystemState(state);
- completeBucketInfoGathering(state, expectedMsgs, nBuckets);
+ ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(state, expectedMsgs, nBuckets));
}
void completeStateTransitionInSeconds(const std::string& stateStr,
@@ -470,7 +340,7 @@ public:
lib::ClusterState state(stateStr);
setSystemState(state);
getClock().addSecondsToTime(seconds);
- completeBucketInfoGathering(state, expectedMsgs);
+ ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(state, expectedMsgs));
}
uint64_t lastTransitionTimeInMillis() {
@@ -486,32 +356,31 @@ public:
setSystemState(newState);
for (uint32_t i=0; i< messageCount(numStorageNodes); i++) {
- CPPUNIT_ASSERT(_sender.commands[i]->getType() ==
- MessageType::REQUESTBUCKETINFO);
+ ASSERT_EQ(_sender.commands[i]->getType(), MessageType::REQUESTBUCKETINFO);
const api::StorageMessageAddress *address = _sender.commands[i]->getAddress();
- CPPUNIT_ASSERT_EQUAL((uint32_t)(i / _bucketSpaces.size()), (uint32_t)address->getIndex());
+ ASSERT_EQ((uint32_t)(i / _bucketSpaces.size()), (uint32_t)address->getIndex());
}
}
void initializeNodesAndBuckets(uint32_t numStorageNodes,
uint32_t numBuckets)
{
- setStorageNodes(numStorageNodes);
+ ASSERT_NO_FATAL_FAILURE(setStorageNodes(numStorageNodes));
vespalib::string state(vespalib::make_string(
"distributor:1 storage:%d", numStorageNodes));
lib::ClusterState newState(state);
for (uint32_t i=0; i< messageCount(numStorageNodes); i++) {
- fakeBucketReply(newState, *_sender.commands[i], numBuckets);
+ ASSERT_NO_FATAL_FAILURE(fakeBucketReply(newState, *_sender.commands[i], numBuckets));
}
- assertCorrectBuckets(numBuckets, state);
+ ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(numBuckets, state));
}
bool bucketHasNode(document::BucketId id, uint16_t node) const {
BucketDatabase::Entry entry = getBucket(id);
- CPPUNIT_ASSERT(entry.valid());
+ assert(entry.valid());
for (uint32_t j=0; j<entry->getNodeCount(); j++) {
if (entry->getNodeRef(j).getNode() == node) {
@@ -555,9 +424,8 @@ public:
void assertCorrectBuckets(int numBuckets, const std::string& stateStr) {
lib::ClusterState state(stateStr);
for (int i=0; i<numBuckets; i++) {
- CPPUNIT_ASSERT_EQUAL(
- getIdealStr(document::BucketId(16, i), state),
- getNodes(document::BucketId(16, i)));
+ ASSERT_EQ(getIdealStr(document::BucketId(16, i), state),
+ getNodes(document::BucketId(16, i)));
}
}
@@ -677,31 +545,26 @@ public:
}
};
-CPPUNIT_TEST_SUITE_REGISTRATION(BucketDBUpdaterTest);
-
BucketDBUpdaterTest::BucketDBUpdaterTest()
- : CppUnit::TestFixture(),
- DistributorTestUtil(),
+ : DistributorTestUtil(),
_bucketSpaces()
{
}
-void
-BucketDBUpdaterTest::testNormalUsage()
-{
+TEST_F(BucketDBUpdaterTest, testNormalUsage) {
setSystemState(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"));
- CPPUNIT_ASSERT_EQUAL(messageCount(3), _sender.commands.size());
+ ASSERT_EQ(messageCount(3), _sender.commands.size());
// Ensure distribution hash is set correctly
- CPPUNIT_ASSERT_EQUAL(
+ ASSERT_EQ(
defaultDistributorBucketSpace().getDistribution()
.getNodeGraph().getDistributionConfigHash(),
dynamic_cast<const RequestBucketInfoCommand&>(
*_sender.commands[0]).getDistributionHash());
- fakeBucketReply(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"),
- *_sender.commands[0], 10);
+ ASSERT_NO_FATAL_FAILURE(fakeBucketReply(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"),
+ *_sender.commands[0], 10));
_sender.clear();
@@ -709,101 +572,96 @@ BucketDBUpdaterTest::testNormalUsage()
// change is only implemented after completion of previous cluster state
setSystemState(lib::ClusterState("distributor:2 .0.s:i storage:3"));
- CPPUNIT_ASSERT_EQUAL(messageCount(3), _sender.commands.size());
+ ASSERT_EQ(messageCount(3), _sender.commands.size());
// Expect reply of first set SystemState request.
- CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size());
+ ASSERT_EQ(size_t(1), _sender.replies.size());
- completeBucketInfoGathering(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"),
- messageCount(3), 10);
- assertCorrectBuckets(10, "distributor:2 storage:3");
+ ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(
+ lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"),
+ messageCount(3), 10));
+ ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(10, "distributor:2 storage:3"));
}
-void
-BucketDBUpdaterTest::testDistributorChange()
-{
+TEST_F(BucketDBUpdaterTest, testDistributorChange) {
int numBuckets = 100;
// First sends request
setSystemState(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"));
- CPPUNIT_ASSERT_EQUAL(messageCount(3), _sender.commands.size());
- completeBucketInfoGathering(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"),
- messageCount(3), numBuckets);
+ ASSERT_EQ(messageCount(3), _sender.commands.size());
+ ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"),
+ messageCount(3), numBuckets));
_sender.clear();
// No change from initializing to up (when done with last job)
setSystemState(lib::ClusterState("distributor:2 storage:3"));
- CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.commands.size());
+ ASSERT_EQ(size_t(0), _sender.commands.size());
_sender.clear();
// Adding node. No new read requests, but buckets thrown
setSystemState(lib::ClusterState("distributor:3 storage:3"));
- CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.commands.size());
- assertCorrectBuckets(numBuckets, "distributor:3 storage:3");
+ ASSERT_EQ(size_t(0), _sender.commands.size());
+ ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(numBuckets, "distributor:3 storage:3"));
_sender.clear();
// Removing distributor. Need to refetch new data from all nodes.
setSystemState(lib::ClusterState("distributor:2 storage:3"));
- CPPUNIT_ASSERT_EQUAL(messageCount(3), _sender.commands.size());
- completeBucketInfoGathering(lib::ClusterState("distributor:2 storage:3"),
- messageCount(3), numBuckets);
+ ASSERT_EQ(messageCount(3), _sender.commands.size());
+ ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(lib::ClusterState("distributor:2 storage:3"),
+ messageCount(3), numBuckets));
_sender.clear();
- assertCorrectBuckets(numBuckets, "distributor:2 storage:3");
+ ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(numBuckets, "distributor:2 storage:3"));
}
-void
-BucketDBUpdaterTest::testDistributorChangeWithGrouping()
-{
+TEST_F(BucketDBUpdaterTest, testDistributorChangeWithGrouping) {
std::string distConfig(getDistConfig6Nodes2Groups());
setDistribution(distConfig);
int numBuckets = 100;
setSystemState(lib::ClusterState("distributor:6 storage:6"));
- CPPUNIT_ASSERT_EQUAL(messageCount(6), _sender.commands.size());
- completeBucketInfoGathering(lib::ClusterState("distributor:6 storage:6"),
- messageCount(6), numBuckets);
+ ASSERT_EQ(messageCount(6), _sender.commands.size());
+ ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(lib::ClusterState("distributor:6 storage:6"),
+ messageCount(6), numBuckets));
_sender.clear();
// Distributor going down in other group, no change
setSystemState(lib::ClusterState("distributor:6 .5.s:d storage:6"));
- CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.commands.size());
+ ASSERT_EQ(size_t(0), _sender.commands.size());
_sender.clear();
setSystemState(lib::ClusterState("distributor:6 storage:6"));
- CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.commands.size());
- assertCorrectBuckets(numBuckets, "distributor:6 storage:6");
+ ASSERT_EQ(size_t(0), _sender.commands.size());
+ ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(numBuckets, "distributor:6 storage:6"));
_sender.clear();
// Unchanged grouping cause no change.
setDistribution(distConfig);
- CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.commands.size());
+ ASSERT_EQ(size_t(0), _sender.commands.size());
// Changed grouping cause change
setDistribution(getDistConfig6Nodes4Groups());
- CPPUNIT_ASSERT_EQUAL(messageCount(6), _sender.commands.size());
+ ASSERT_EQ(messageCount(6), _sender.commands.size());
}
-void
-BucketDBUpdaterTest::testNormalUsageInitializing()
-{
+TEST_F(BucketDBUpdaterTest, testNormalUsageInitializing) {
setSystemState(lib::ClusterState("distributor:1 .0.s:i storage:1 .0.s:i"));
- CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size());
+ ASSERT_EQ(_bucketSpaces.size(), _sender.commands.size());
// Not yet passing on system state.
- CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size());
+ ASSERT_EQ(size_t(0), _senderDown.commands.size());
- completeBucketInfoGathering(lib::ClusterState("distributor:1 .0.s:i storage:1"),
- _bucketSpaces.size(), 10, 10);
+ ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(lib::ClusterState("distributor:1 .0.s:i storage:1"),
+ _bucketSpaces.size(), 10, 10));
- assertCorrectBuckets(10, "distributor:1 storage:1");
+ ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(10, "distributor:1 storage:1"));
for (int i=10; i<20; i++) {
- verifyInvalid(document::BucketId(16, i), 0);
+ ASSERT_NO_FATAL_FAILURE(verifyInvalid(document::BucketId(16, i), 0));
}
// Pass on cluster state and recheck buckets now.
- CPPUNIT_ASSERT_EQUAL(size_t(1), _senderDown.commands.size());
+ ASSERT_EQ(size_t(1), _senderDown.commands.size());
_sender.clear();
_senderDown.clear();
@@ -811,24 +669,22 @@ BucketDBUpdaterTest::testNormalUsageInitializing()
setSystemState(lib::ClusterState("distributor:1 .0.s:i storage:1"));
// Send a new request bucket info up.
- CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size());
+ ASSERT_EQ(_bucketSpaces.size(), _sender.commands.size());
- completeBucketInfoGathering(lib::ClusterState("distributor:1 .0.s:i storage:1"),
- _bucketSpaces.size(), 20);
+ ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(lib::ClusterState("distributor:1 .0.s:i storage:1"),
+ _bucketSpaces.size(), 20));
// Pass on cluster state and recheck buckets now.
- CPPUNIT_ASSERT_EQUAL(size_t(1), _senderDown.commands.size());
+ ASSERT_EQ(size_t(1), _senderDown.commands.size());
- assertCorrectBuckets(20, "distributor:1 storage:1");
+ ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(20, "distributor:1 storage:1"));
}
-void
-BucketDBUpdaterTest::testFailedRequestBucketInfo()
-{
+TEST_F(BucketDBUpdaterTest, testFailedRequestBucketInfo) {
setSystemState(lib::ClusterState("distributor:1 .0.s:i storage:1"));
// 2 messages sent up: 1 to the nodes, and one reply to the setsystemstate.
- CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size());
+ ASSERT_EQ(_bucketSpaces.size(), _sender.commands.size());
{
for (uint32_t i = 0; i < _bucketSpaces.size(); ++i) {
@@ -847,43 +703,38 @@ BucketDBUpdaterTest::testFailedRequestBucketInfo()
}
// Should be resent.
- CPPUNIT_ASSERT_EQUAL(getRequestBucketInfoStrings(messageCount(2)),
- _sender.getCommands());
+ ASSERT_EQ(getRequestBucketInfoStrings(messageCount(2)), _sender.getCommands());
- CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size());
+ ASSERT_EQ(size_t(0), _senderDown.commands.size());
for (uint32_t i = 0; i < _bucketSpaces.size(); ++i) {
- fakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"),
- *_sender.commands[_bucketSpaces.size() + i], 10);
+ ASSERT_NO_FATAL_FAILURE(fakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"),
+ *_sender.commands[_bucketSpaces.size() + i], 10));
}
for (int i=0; i<10; i++) {
- CPPUNIT_ASSERT_EQUAL(
- std::string(""),
- verifyBucket(document::BucketId(16, i),
- lib::ClusterState("distributor:1 storage:1")));
+ EXPECT_EQ(std::string(""),
+ verifyBucket(document::BucketId(16, i),
+ lib::ClusterState("distributor:1 storage:1")));
}
// Set system state should now be passed on
- CPPUNIT_ASSERT_EQUAL(std::string("Set system state"),
- _senderDown.getCommands());
+ EXPECT_EQ(std::string("Set system state"), _senderDown.getCommands());
}
-void
-BucketDBUpdaterTest::testDownWhileInit()
-{
- setStorageNodes(3);
+TEST_F(BucketDBUpdaterTest, testDownWhileInit) {
+ ASSERT_NO_FATAL_FAILURE(setStorageNodes(3));
- fakeBucketReply(lib::ClusterState("distributor:1 storage:3"),
- *_sender.commands[0], 5);
+ ASSERT_NO_FATAL_FAILURE(fakeBucketReply(lib::ClusterState("distributor:1 storage:3"),
+ *_sender.commands[0], 5));
setSystemState(lib::ClusterState("distributor:1 storage:3 .1.s:d"));
- fakeBucketReply(lib::ClusterState("distributor:1 storage:3"),
- *_sender.commands[2], 5);
+ ASSERT_NO_FATAL_FAILURE(fakeBucketReply(lib::ClusterState("distributor:1 storage:3"),
+ *_sender.commands[2], 5));
- fakeBucketReply(lib::ClusterState("distributor:1 storage:3"),
- *_sender.commands[1], 5);
+ ASSERT_NO_FATAL_FAILURE(fakeBucketReply(lib::ClusterState("distributor:1 storage:3"),
+ *_sender.commands[1], 5));
}
bool
@@ -934,96 +785,83 @@ BucketDBUpdaterTest::expandNodeVec(const std::vector<uint16_t> &nodes)
return res;
}
-void
-BucketDBUpdaterTest::testNodeDown()
-{
- setStorageNodes(3);
+TEST_F(BucketDBUpdaterTest, testNodeDown) {
+ ASSERT_NO_FATAL_FAILURE(setStorageNodes(3));
enableDistributorClusterState("distributor:1 storage:3");
for (int i=1; i<100; i++) {
addIdealNodes(document::BucketId(16, i));
}
- CPPUNIT_ASSERT(bucketExistsThatHasNode(100, 1));
+ EXPECT_TRUE(bucketExistsThatHasNode(100, 1));
setSystemState(lib::ClusterState("distributor:1 storage:3 .1.s:d"));
- CPPUNIT_ASSERT(!bucketExistsThatHasNode(100, 1));
+ EXPECT_FALSE(bucketExistsThatHasNode(100, 1));
}
-void
-BucketDBUpdaterTest::testStorageNodeInMaintenanceClearsBucketsForNode()
-{
- setStorageNodes(3);
+TEST_F(BucketDBUpdaterTest, testStorageNodeInMaintenanceClearsBucketsForNode) {
+ ASSERT_NO_FATAL_FAILURE(setStorageNodes(3));
enableDistributorClusterState("distributor:1 storage:3");
for (int i=1; i<100; i++) {
addIdealNodes(document::BucketId(16, i));
}
- CPPUNIT_ASSERT(bucketExistsThatHasNode(100, 1));
+ EXPECT_TRUE(bucketExistsThatHasNode(100, 1));
setSystemState(lib::ClusterState("distributor:1 storage:3 .1.s:m"));
-
- CPPUNIT_ASSERT(!bucketExistsThatHasNode(100, 1));
+
+ EXPECT_FALSE(bucketExistsThatHasNode(100, 1));
}
-void
-BucketDBUpdaterTest::testNodeDownCopiesGetInSync()
-{
- setStorageNodes(3);
+TEST_F(BucketDBUpdaterTest, testNodeDownCopiesGetInSync) {
+ ASSERT_NO_FATAL_FAILURE(setStorageNodes(3));
- lib::ClusterState systemState("distributor:1 storage:3");
+ lib::ClusterState systemState("distributor:1 storage:3");
document::BucketId bid(16, 1);
addNodesToBucketDB(bid, "0=3,1=2,2=3");
setSystemState(lib::ClusterState("distributor:1 storage:3 .1.s:d"));
- CPPUNIT_ASSERT_EQUAL(
+ EXPECT_EQ(
std::string("BucketId(0x4000000000000001) : "
"node(idx=0,crc=0x3,docs=3/3,bytes=3/3,trusted=true,active=false,ready=false), "
"node(idx=2,crc=0x3,docs=3/3,bytes=3/3,trusted=true,active=false,ready=false)"),
dumpBucket(bid));
}
-void
-BucketDBUpdaterTest::testInitializingWhileRecheck()
-{
+TEST_F(BucketDBUpdaterTest, testInitializingWhileRecheck) {
lib::ClusterState systemState("distributor:1 storage:2 .0.s:i .0.i:0.1");
setSystemState(systemState);
- CPPUNIT_ASSERT_EQUAL(messageCount(2), _sender.commands.size());
- CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size());
+ ASSERT_EQ(messageCount(2), _sender.commands.size());
+ ASSERT_EQ(size_t(0), _senderDown.commands.size());
getBucketDBUpdater().recheckBucketInfo(1, makeDocumentBucket(document::BucketId(16, 3)));
for (uint32_t i = 0; i < messageCount(2); ++i) {
- fakeBucketReply(systemState, *_sender.commands[i], 100);
+ ASSERT_NO_FATAL_FAILURE(fakeBucketReply(systemState, *_sender.commands[i], 100));
}
// Now we can pass on system state.
- CPPUNIT_ASSERT_EQUAL(size_t(1), _senderDown.commands.size());
-
- CPPUNIT_ASSERT_EQUAL(MessageType::SETSYSTEMSTATE,
- _senderDown.commands[0]->getType());
+ ASSERT_EQ(size_t(1), _senderDown.commands.size());
+ EXPECT_EQ(MessageType::SETSYSTEMSTATE, _senderDown.commands[0]->getType());
}
-void
-BucketDBUpdaterTest::testBitChange()
-{
-
+TEST_F(BucketDBUpdaterTest, testBitChange) {
std::vector<document::BucketId> bucketlist;
{
setSystemState(lib::ClusterState("bits:14 storage:1 distributor:2"));
- CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size());
+ ASSERT_EQ(_bucketSpaces.size(), _sender.commands.size());
for (uint32_t bsi = 0; bsi < _bucketSpaces.size(); ++bsi) {
- CPPUNIT_ASSERT(_sender.commands[bsi]->getType() == MessageType::REQUESTBUCKETINFO);
+ ASSERT_EQ(_sender.commands[bsi]->getType(), MessageType::REQUESTBUCKETINFO);
const auto &req = dynamic_cast<const RequestBucketInfoCommand &>(*_sender.commands[bsi]);
- RequestBucketInfoReply* sreply = new RequestBucketInfoReply(req);
+ auto sreply = std::make_shared<RequestBucketInfoReply>(req);
sreply->setAddress(storageAddress(0));
api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo();
if (req.getBucketSpace() == FixedBucketSpaces::default_space()) {
@@ -1046,29 +884,27 @@ BucketDBUpdaterTest::testBitChange()
}
}
- getBucketDBUpdater().onRequestBucketInfoReply(std::shared_ptr<RequestBucketInfoReply>(sreply));
+ getBucketDBUpdater().onRequestBucketInfoReply(sreply);
}
}
- CPPUNIT_ASSERT_EQUAL(
- std::string("BucketId(0x4000000000000001) : "
- "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"),
- dumpBucket(bucketlist[0]));
- CPPUNIT_ASSERT_EQUAL(
- std::string("BucketId(0x4000000000000002) : "
- "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"),
- dumpBucket(bucketlist[1]));
+ EXPECT_EQ(std::string("BucketId(0x4000000000000001) : "
+ "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"),
+ dumpBucket(bucketlist[0]));
+ EXPECT_EQ(std::string("BucketId(0x4000000000000002) : "
+ "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"),
+ dumpBucket(bucketlist[1]));
{
_sender.clear();
setSystemState(lib::ClusterState("bits:16 storage:1 distributor:2"));
- CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size());
+ ASSERT_EQ(_bucketSpaces.size(), _sender.commands.size());
for (uint32_t bsi = 0; bsi < _bucketSpaces.size(); ++bsi) {
- CPPUNIT_ASSERT(_sender.commands[bsi]->getType() == MessageType::REQUESTBUCKETINFO);
+ ASSERT_EQ(_sender.commands[bsi]->getType(), MessageType::REQUESTBUCKETINFO);
const auto &req = dynamic_cast<const RequestBucketInfoCommand &>(*_sender.commands[bsi]);
- RequestBucketInfoReply* sreply = new RequestBucketInfoReply(req);
+ auto sreply = std::make_shared<RequestBucketInfoReply>(req);
sreply->setAddress(storageAddress(0));
sreply->setResult(api::ReturnCode::OK);
if (req.getBucketSpace() == FixedBucketSpaces::default_space()) {
@@ -1085,27 +921,22 @@ BucketDBUpdaterTest::testBitChange()
api::BucketInfo(10,1,1)));
}
- getBucketDBUpdater().onRequestBucketInfoReply(
- std::shared_ptr<RequestBucketInfoReply>(sreply));
+ getBucketDBUpdater().onRequestBucketInfoReply(sreply);
}
}
- CPPUNIT_ASSERT_EQUAL(
- std::string("BucketId(0x4000000000000000) : "
- "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"),
- dumpBucket(document::BucketId(16, 0)));
- CPPUNIT_ASSERT_EQUAL(
- std::string("BucketId(0x4000000000000001) : "
- "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"),
- dumpBucket(document::BucketId(16, 1)));
- CPPUNIT_ASSERT_EQUAL(
- std::string("BucketId(0x4000000000000002) : "
- "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"),
- dumpBucket(document::BucketId(16, 2)));
- CPPUNIT_ASSERT_EQUAL(
- std::string("BucketId(0x4000000000000004) : "
- "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"),
- dumpBucket(document::BucketId(16, 4)));
+ EXPECT_EQ(std::string("BucketId(0x4000000000000000) : "
+ "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"),
+ dumpBucket(document::BucketId(16, 0)));
+ EXPECT_EQ(std::string("BucketId(0x4000000000000001) : "
+ "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"),
+ dumpBucket(document::BucketId(16, 1)));
+ EXPECT_EQ(std::string("BucketId(0x4000000000000002) : "
+ "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"),
+ dumpBucket(document::BucketId(16, 2)));
+ EXPECT_EQ(std::string("BucketId(0x4000000000000004) : "
+ "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"),
+ dumpBucket(document::BucketId(16, 4)));
{
_sender.clear();
@@ -1118,29 +949,23 @@ BucketDBUpdaterTest::testBitChange()
}
};
-void
-BucketDBUpdaterTest::testRecheckNodeWithFailure()
-{
- initializeNodesAndBuckets(3, 5);
+TEST_F(BucketDBUpdaterTest, testRecheckNodeWithFailure) {
+ ASSERT_NO_FATAL_FAILURE(initializeNodesAndBuckets(3, 5));
_sender.clear();
getBucketDBUpdater().recheckBucketInfo(1, makeDocumentBucket(document::BucketId(16, 3)));
- CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
-
+ ASSERT_EQ(size_t(1), _sender.commands.size());
uint16_t index = 0;
{
- api::RequestBucketInfoCommand& rbi(
- dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[0]));
- CPPUNIT_ASSERT_EQUAL(size_t(1), rbi.getBuckets().size());
- CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 3), rbi.getBuckets()[0]);
- auto reply(std::make_shared<api::RequestBucketInfoReply>(rbi));
-
+ auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[0]);
+ ASSERT_EQ(size_t(1), rbi.getBuckets().size());
+ EXPECT_EQ(document::BucketId(16, 3), rbi.getBuckets()[0]);
+ auto reply = std::make_shared<api::RequestBucketInfoReply>(rbi);
const api::StorageMessageAddress *address = _sender.commands[0]->getAddress();
index = address->getIndex();
-
reply->setResult(api::ReturnCode::NOT_CONNECTED);
getBucketDBUpdater().onRequestBucketInfoReply(reply);
// Trigger that delayed message is sent
@@ -1148,43 +973,39 @@ BucketDBUpdaterTest::testRecheckNodeWithFailure()
getBucketDBUpdater().resendDelayedMessages();
}
- CPPUNIT_ASSERT_EQUAL(size_t(2), _sender.commands.size());
+ ASSERT_EQ(size_t(2), _sender.commands.size());
setSystemState(
lib::ClusterState(vespalib::make_string("distributor:1 storage:3 .%d.s:d", index)));
// Recheck bucket.
{
- api::RequestBucketInfoCommand& rbi(dynamic_cast<RequestBucketInfoCommand&>
- (*_sender.commands[1]));
- CPPUNIT_ASSERT_EQUAL(size_t(1), rbi.getBuckets().size());
- CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 3), rbi.getBuckets()[0]);
- auto reply(std::make_shared<api::RequestBucketInfoReply>(rbi));
+ auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[1]);
+ ASSERT_EQ(size_t(1), rbi.getBuckets().size());
+ EXPECT_EQ(document::BucketId(16, 3), rbi.getBuckets()[0]);
+ auto reply = std::make_shared<api::RequestBucketInfoReply>(rbi);
reply->setResult(api::ReturnCode::NOT_CONNECTED);
getBucketDBUpdater().onRequestBucketInfoReply(reply);
}
// Should not retry since node is down.
- CPPUNIT_ASSERT_EQUAL(size_t(2), _sender.commands.size());
+ EXPECT_EQ(size_t(2), _sender.commands.size());
}
-void
-BucketDBUpdaterTest::testRecheckNode()
-{
- initializeNodesAndBuckets(3, 5);
+TEST_F(BucketDBUpdaterTest, testRecheckNode) {
+ ASSERT_NO_FATAL_FAILURE(initializeNodesAndBuckets(3, 5));
_sender.clear();
getBucketDBUpdater().recheckBucketInfo(1, makeDocumentBucket(document::BucketId(16, 3)));
- CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
+ ASSERT_EQ(size_t(1), _sender.commands.size());
- api::RequestBucketInfoCommand& rbi(
- dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[0]));
- CPPUNIT_ASSERT_EQUAL(size_t(1), rbi.getBuckets().size());
- CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 3), rbi.getBuckets()[0]);
+ auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[0]);
+ ASSERT_EQ(size_t(1), rbi.getBuckets().size());
+ EXPECT_EQ(document::BucketId(16, 3), rbi.getBuckets()[0]);
- auto reply(std::make_shared<api::RequestBucketInfoReply>(rbi));
+ auto reply = std::make_shared<api::RequestBucketInfoReply>(rbi);
reply->getBucketInfo().push_back(
api::RequestBucketInfoReply::Entry(document::BucketId(16, 3),
api::BucketInfo(20, 10, 12, 50, 60, true, true)));
@@ -1192,29 +1013,24 @@ BucketDBUpdaterTest::testRecheckNode()
lib::ClusterState state("distributor:1 storage:3");
for (uint32_t i = 0; i < 3; i++) {
- CPPUNIT_ASSERT_EQUAL(
- getIdealStr(document::BucketId(16, i), state),
- getNodes(document::BucketId(16, i)));
+ EXPECT_EQ(getIdealStr(document::BucketId(16, i), state),
+ getNodes(document::BucketId(16, i)));
}
for (uint32_t i = 4; i < 5; i++) {
- CPPUNIT_ASSERT_EQUAL(
- getIdealStr(document::BucketId(16, i), state),
- getNodes(document::BucketId(16, i)));
+ EXPECT_EQ(getIdealStr(document::BucketId(16, i), state),
+ getNodes(document::BucketId(16, i)));
}
BucketDatabase::Entry entry = getBucketDatabase().get(document::BucketId(16, 3));
- CPPUNIT_ASSERT(entry.valid());
+ ASSERT_TRUE(entry.valid());
const BucketCopy* copy = entry->getNode(1);
- CPPUNIT_ASSERT(copy != 0);
- CPPUNIT_ASSERT_EQUAL(api::BucketInfo(20,10,12, 50, 60, true, true),
- copy->getBucketInfo());
+ ASSERT_TRUE(copy != nullptr);
+ EXPECT_EQ(api::BucketInfo(20,10,12, 50, 60, true, true), copy->getBucketInfo());
}
-void
-BucketDBUpdaterTest::testNotifyBucketChange()
-{
+TEST_F(BucketDBUpdaterTest, testNotifyBucketChange) {
enableDistributorClusterState("distributor:1 storage:1");
addNodesToBucketDB(document::BucketId(16, 1), "0=1234");
@@ -1237,54 +1053,47 @@ BucketDBUpdaterTest::testNotifyBucketChange()
}
// Must receive reply
- CPPUNIT_ASSERT_EQUAL(size_t(2), _sender.replies.size());
+ ASSERT_EQ(size_t(2), _sender.replies.size());
for (int i = 0; i < 2; ++i) {
- CPPUNIT_ASSERT_EQUAL(MessageType::NOTIFYBUCKETCHANGE_REPLY,
- _sender.replies[i]->getType());
+ ASSERT_EQ(MessageType::NOTIFYBUCKETCHANGE_REPLY,
+ _sender.replies[i]->getType());
}
// No database update until request bucket info replies have been received.
- CPPUNIT_ASSERT_EQUAL(std::string("BucketId(0x4000000000000001) : "
- "node(idx=0,crc=0x4d2,docs=1234/1234,bytes=1234/1234,"
- "trusted=false,active=false,ready=false)"),
- dumpBucket(document::BucketId(16, 1)));
- CPPUNIT_ASSERT_EQUAL(std::string("NONEXISTING"),
- dumpBucket(document::BucketId(16, 2)));
+ EXPECT_EQ(std::string("BucketId(0x4000000000000001) : "
+ "node(idx=0,crc=0x4d2,docs=1234/1234,bytes=1234/1234,"
+ "trusted=false,active=false,ready=false)"),
+ dumpBucket(document::BucketId(16, 1)));
+ EXPECT_EQ(std::string("NONEXISTING"), dumpBucket(document::BucketId(16, 2)));
- CPPUNIT_ASSERT_EQUAL(size_t(2), _sender.commands.size());
+ ASSERT_EQ(size_t(2), _sender.commands.size());
std::vector<api::BucketInfo> infos;
infos.push_back(api::BucketInfo(4567, 200, 2000, 400, 4000, true, true));
infos.push_back(api::BucketInfo(8999, 300, 3000, 500, 5000, false, false));
for (int i = 0; i < 2; ++i) {
- api::RequestBucketInfoCommand& rbi(
- dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[i]));
- CPPUNIT_ASSERT_EQUAL(size_t(1), rbi.getBuckets().size());
- CPPUNIT_ASSERT_EQUAL(document::BucketId(16, i + 1), rbi.getBuckets()[0]);
+ auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[i]);
+ ASSERT_EQ(size_t(1), rbi.getBuckets().size());
+ EXPECT_EQ(document::BucketId(16, i + 1), rbi.getBuckets()[0]);
- auto reply(std::make_shared<api::RequestBucketInfoReply>(rbi));
+ auto reply = std::make_shared<api::RequestBucketInfoReply>(rbi);
reply->getBucketInfo().push_back(
api::RequestBucketInfoReply::Entry(document::BucketId(16, i + 1),
infos[i]));
getBucketDBUpdater().onRequestBucketInfoReply(reply);
}
- CPPUNIT_ASSERT_EQUAL(
- std::string("BucketId(0x4000000000000001) : "
- "node(idx=0,crc=0x11d7,docs=200/400,bytes=2000/4000,trusted=true,active=true,ready=true)"),
- dumpBucket(document::BucketId(16, 1)));
- CPPUNIT_ASSERT_EQUAL(
- std::string("BucketId(0x4000000000000002) : "
- "node(idx=0,crc=0x2327,docs=300/500,bytes=3000/5000,trusted=true,active=false,ready=false)"),
- dumpBucket(document::BucketId(16, 2)));
-
+ EXPECT_EQ(std::string("BucketId(0x4000000000000001) : "
+ "node(idx=0,crc=0x11d7,docs=200/400,bytes=2000/4000,trusted=true,active=true,ready=true)"),
+ dumpBucket(document::BucketId(16, 1)));
+ EXPECT_EQ(std::string("BucketId(0x4000000000000002) : "
+ "node(idx=0,crc=0x2327,docs=300/500,bytes=3000/5000,trusted=true,active=false,ready=false)"),
+ dumpBucket(document::BucketId(16, 2)));
}
-void
-BucketDBUpdaterTest::testNotifyBucketChangeFromNodeDown()
-{
+TEST_F(BucketDBUpdaterTest, testNotifyBucketChangeFromNodeDown) {
enableDistributorClusterState("distributor:1 storage:2");
addNodesToBucketDB(document::BucketId(16, 1), "1=1234");
@@ -1302,25 +1111,22 @@ BucketDBUpdaterTest::testNotifyBucketChangeFromNodeDown()
// (sendRequestBucketInfo drops message if node is down).
enableDistributorClusterState("distributor:1 storage:2 .0.s:d");
- CPPUNIT_ASSERT_EQUAL(
- std::string("BucketId(0x4000000000000001) : "
- "node(idx=1,crc=0x4d2,docs=1234/1234,bytes=1234/1234,trusted=false,active=false,ready=false)"),
- dumpBucket(document::BucketId(16, 1)));
+ ASSERT_EQ(std::string("BucketId(0x4000000000000001) : "
+ "node(idx=1,crc=0x4d2,docs=1234/1234,bytes=1234/1234,trusted=false,active=false,ready=false)"),
+ dumpBucket(document::BucketId(16, 1)));
- CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size());
- CPPUNIT_ASSERT_EQUAL(MessageType::NOTIFYBUCKETCHANGE_REPLY,
- _sender.replies[0]->getType());
+ ASSERT_EQ(size_t(1), _sender.replies.size());
+ ASSERT_EQ(MessageType::NOTIFYBUCKETCHANGE_REPLY, _sender.replies[0]->getType());
// Currently, this pending operation will be auto-flushed when the cluster state
// changes so the behavior is still correct. Keep this test around to prevent
// regressions here.
- CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
- api::RequestBucketInfoCommand& rbi(
- dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[0]));
- CPPUNIT_ASSERT_EQUAL(size_t(1), rbi.getBuckets().size());
- CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1), rbi.getBuckets()[0]);
+ ASSERT_EQ(size_t(1), _sender.commands.size());
+ auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[0]);
+ ASSERT_EQ(size_t(1), rbi.getBuckets().size());
+ EXPECT_EQ(document::BucketId(16, 1), rbi.getBuckets()[0]);
- auto reply(std::make_shared<api::RequestBucketInfoReply>(rbi));
+ auto reply = std::make_shared<api::RequestBucketInfoReply>(rbi);
reply->getBucketInfo().push_back(
api::RequestBucketInfoReply::Entry(
document::BucketId(16, 1),
@@ -1328,10 +1134,9 @@ BucketDBUpdaterTest::testNotifyBucketChangeFromNodeDown()
getBucketDBUpdater().onRequestBucketInfoReply(reply);
// No change
- CPPUNIT_ASSERT_EQUAL(
- std::string("BucketId(0x4000000000000001) : "
- "node(idx=1,crc=0x4d2,docs=1234/1234,bytes=1234/1234,trusted=false,active=false,ready=false)"),
- dumpBucket(document::BucketId(16, 1)));
+ EXPECT_EQ(std::string("BucketId(0x4000000000000001) : "
+ "node(idx=1,crc=0x4d2,docs=1234/1234,bytes=1234/1234,trusted=false,active=false,ready=false)"),
+ dumpBucket(document::BucketId(16, 1)));
}
/**
@@ -1342,11 +1147,9 @@ BucketDBUpdaterTest::testNotifyBucketChangeFromNodeDown()
* distributor in the pending state but not by the current state would be
* discarded when attempted inserted into the bucket database.
*/
-void
-BucketDBUpdaterTest::testNotifyChangeWithPendingStateQueuesBucketInfoRequests()
-{
+TEST_F(BucketDBUpdaterTest, testNotifyChangeWithPendingStateQueuesBucketInfoRequests) {
setSystemState(lib::ClusterState("distributor:1 storage:1"));
- CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size());
+ ASSERT_EQ(_bucketSpaces.size(), _sender.commands.size());
{
api::BucketInfo info(8999, 300, 3000, 500, 5000, false, false);
@@ -1356,18 +1159,17 @@ BucketDBUpdaterTest::testNotifyChangeWithPendingStateQueuesBucketInfoRequests()
getBucketDBUpdater().onNotifyBucketChange(cmd);
}
- CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size());
+ ASSERT_EQ(_bucketSpaces.size(), _sender.commands.size());
- completeBucketInfoGathering(lib::ClusterState("distributor:1 storage:1"),
- _bucketSpaces.size(), 10);
+ ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(lib::ClusterState("distributor:1 storage:1"),
+ _bucketSpaces.size(), 10));
- CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size() + 1, _sender.commands.size());
+ ASSERT_EQ(_bucketSpaces.size() + 1, _sender.commands.size());
{
- api::RequestBucketInfoCommand& rbi(
- dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[_bucketSpaces.size()]));
- CPPUNIT_ASSERT_EQUAL(size_t(1), rbi.getBuckets().size());
- CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1), rbi.getBuckets()[0]);
+ auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[_bucketSpaces.size()]);
+ ASSERT_EQ(size_t(1), rbi.getBuckets().size());
+ EXPECT_EQ(document::BucketId(16, 1), rbi.getBuckets()[0]);
}
_sender.clear();
@@ -1375,19 +1177,16 @@ BucketDBUpdaterTest::testNotifyChangeWithPendingStateQueuesBucketInfoRequests()
{
lib::ClusterState state("distributor:1 storage:2");
uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 1;
- setAndEnableClusterState(state, expectedMsgs, dummyBucketsToReturn);
+ ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(state, expectedMsgs, dummyBucketsToReturn));
}
- CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size());
+ ASSERT_EQ(_bucketSpaces.size(), _sender.commands.size());
{
- api::RequestBucketInfoCommand& rbi(
- dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[0]));
- CPPUNIT_ASSERT_EQUAL(size_t(0), rbi.getBuckets().size());
+ auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[0]);
+ EXPECT_EQ(size_t(0), rbi.getBuckets().size());
}
}
-void
-BucketDBUpdaterTest::testMergeReply()
-{
+TEST_F(BucketDBUpdaterTest, testMergeReply) {
enableDistributorClusterState("distributor:1 storage:3");
addNodesToBucketDB(document::BucketId(16, 1234),
@@ -1400,23 +1199,21 @@ BucketDBUpdaterTest::testMergeReply()
api::MergeBucketCommand cmd(makeDocumentBucket(document::BucketId(16, 1234)), nodes, 0);
- auto reply(std::make_shared<api::MergeBucketReply>(cmd));
+ auto reply = std::make_shared<api::MergeBucketReply>(cmd);
_sender.clear();
getBucketDBUpdater().onMergeBucketReply(reply);
- CPPUNIT_ASSERT_EQUAL(size_t(3), _sender.commands.size());
+ ASSERT_EQ(size_t(3), _sender.commands.size());
for (uint32_t i = 0; i < 3; i++) {
- std::shared_ptr<api::RequestBucketInfoCommand>
- req(std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(
- _sender.commands[i]));
+ auto req = std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(_sender.commands[i]);
- CPPUNIT_ASSERT(req.get());
- CPPUNIT_ASSERT_EQUAL(size_t(1), req->getBuckets().size());
- CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1234), req->getBuckets()[0]);
+ ASSERT_TRUE(req.get() != nullptr);
+ ASSERT_EQ(size_t(1), req->getBuckets().size());
+ EXPECT_EQ(document::BucketId(16, 1234), req->getBuckets()[0]);
- auto reqreply(std::make_shared<api::RequestBucketInfoReply>(*req));
+ auto reqreply = std::make_shared<api::RequestBucketInfoReply>(*req);
reqreply->getBucketInfo().push_back(
api::RequestBucketInfoReply::Entry(document::BucketId(16, 1234),
api::BucketInfo(10 * (i + 1), 100 * (i +1), 1000 * (i+1))));
@@ -1424,17 +1221,14 @@ BucketDBUpdaterTest::testMergeReply()
getBucketDBUpdater().onRequestBucketInfoReply(reqreply);
}
- CPPUNIT_ASSERT_EQUAL(
- std::string("BucketId(0x40000000000004d2) : "
- "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), "
- "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false), "
- "node(idx=2,crc=0x1e,docs=300/300,bytes=3000/3000,trusted=false,active=false,ready=false)"),
- dumpBucket(document::BucketId(16, 1234)));
+ EXPECT_EQ(std::string("BucketId(0x40000000000004d2) : "
+ "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), "
+ "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false), "
+ "node(idx=2,crc=0x1e,docs=300/300,bytes=3000/3000,trusted=false,active=false,ready=false)"),
+ dumpBucket(document::BucketId(16, 1234)));
};
-void
-BucketDBUpdaterTest::testMergeReplyNodeDown()
-{
+TEST_F(BucketDBUpdaterTest, testMergeReplyNodeDown) {
enableDistributorClusterState("distributor:1 storage:3");
std::vector<api::MergeBucketCommand::Node> nodes;
@@ -1446,25 +1240,23 @@ BucketDBUpdaterTest::testMergeReplyNodeDown()
api::MergeBucketCommand cmd(makeDocumentBucket(document::BucketId(16, 1234)), nodes, 0);
- auto reply(std::make_shared<api::MergeBucketReply>(cmd));
+ auto reply = std::make_shared<api::MergeBucketReply>(cmd);
setSystemState(lib::ClusterState("distributor:1 storage:2"));
_sender.clear();
getBucketDBUpdater().onMergeBucketReply(reply);
- CPPUNIT_ASSERT_EQUAL(size_t(2), _sender.commands.size());
+ ASSERT_EQ(size_t(2), _sender.commands.size());
for (uint32_t i = 0; i < 2; i++) {
- std::shared_ptr<api::RequestBucketInfoCommand> req(
- std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(
- _sender.commands[i]));
+ auto req = std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(_sender.commands[i]);
- CPPUNIT_ASSERT(req.get());
- CPPUNIT_ASSERT_EQUAL(size_t(1), req->getBuckets().size());
- CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1234), req->getBuckets()[0]);
+ ASSERT_TRUE(req.get() != nullptr);
+ ASSERT_EQ(size_t(1), req->getBuckets().size());
+ EXPECT_EQ(document::BucketId(16, 1234), req->getBuckets()[0]);
- auto reqreply(std::make_shared<api::RequestBucketInfoReply>(*req));
+ auto reqreply = std::make_shared<api::RequestBucketInfoReply>(*req);
reqreply->getBucketInfo().push_back(
api::RequestBucketInfoReply::Entry(
document::BucketId(16, 1234),
@@ -1472,16 +1264,13 @@ BucketDBUpdaterTest::testMergeReplyNodeDown()
getBucketDBUpdater().onRequestBucketInfoReply(reqreply);
}
- CPPUNIT_ASSERT_EQUAL(
- std::string("BucketId(0x40000000000004d2) : "
- "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), "
- "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false)"),
- dumpBucket(document::BucketId(16, 1234)));
+ EXPECT_EQ(std::string("BucketId(0x40000000000004d2) : "
+ "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), "
+ "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false)"),
+ dumpBucket(document::BucketId(16, 1234)));
};
-void
-BucketDBUpdaterTest::testMergeReplyNodeDownAfterRequestSent()
-{
+TEST_F(BucketDBUpdaterTest, testMergeReplyNodeDownAfterRequestSent) {
enableDistributorClusterState("distributor:1 storage:3");
std::vector<api::MergeBucketCommand::Node> nodes;
@@ -1498,20 +1287,18 @@ BucketDBUpdaterTest::testMergeReplyNodeDownAfterRequestSent()
_sender.clear();
getBucketDBUpdater().onMergeBucketReply(reply);
- CPPUNIT_ASSERT_EQUAL(size_t(3), _sender.commands.size());
+ ASSERT_EQ(size_t(3), _sender.commands.size());
setSystemState(lib::ClusterState("distributor:1 storage:2"));
for (uint32_t i = 0; i < 3; i++) {
- std::shared_ptr<api::RequestBucketInfoCommand> req(
- std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(
- _sender.commands[i]));
+ auto req = std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(_sender.commands[i]);
- CPPUNIT_ASSERT(req.get());
- CPPUNIT_ASSERT_EQUAL(size_t(1), req->getBuckets().size());
- CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1234), req->getBuckets()[0]);
+ ASSERT_TRUE(req.get() != nullptr);
+ ASSERT_EQ(size_t(1), req->getBuckets().size());
+ EXPECT_EQ(document::BucketId(16, 1234), req->getBuckets()[0]);
- auto reqreply(std::make_shared<api::RequestBucketInfoReply>(*req));
+ auto reqreply = std::make_shared<api::RequestBucketInfoReply>(*req);
reqreply->getBucketInfo().push_back(
api::RequestBucketInfoReply::Entry(
document::BucketId(16, 1234),
@@ -1519,17 +1306,14 @@ BucketDBUpdaterTest::testMergeReplyNodeDownAfterRequestSent()
getBucketDBUpdater().onRequestBucketInfoReply(reqreply);
}
- CPPUNIT_ASSERT_EQUAL(
- std::string("BucketId(0x40000000000004d2) : "
- "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), "
- "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false)"),
- dumpBucket(document::BucketId(16, 1234)));
+ EXPECT_EQ(std::string("BucketId(0x40000000000004d2) : "
+ "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), "
+ "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false)"),
+ dumpBucket(document::BucketId(16, 1234)));
};
-void
-BucketDBUpdaterTest::testFlush()
-{
+TEST_F(BucketDBUpdaterTest, testFlush) {
enableDistributorClusterState("distributor:1 storage:3");
_sender.clear();
@@ -1540,21 +1324,19 @@ BucketDBUpdaterTest::testFlush()
nodes.push_back(api::MergeBucketCommand::Node(i));
}
- api::MergeBucketCommand cmd(makeDocumentBucket(document::BucketId(16, 1234)),
- nodes,
- 0);
+ api::MergeBucketCommand cmd(makeDocumentBucket(document::BucketId(16, 1234)), nodes, 0);
auto reply(std::make_shared<api::MergeBucketReply>(cmd));
_sender.clear();
getBucketDBUpdater().onMergeBucketReply(reply);
- CPPUNIT_ASSERT_EQUAL(size_t(3), _sender.commands.size());
- CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.replies.size());
+ ASSERT_EQ(size_t(3), _sender.commands.size());
+ ASSERT_EQ(size_t(0), _senderDown.replies.size());
getBucketDBUpdater().flush();
// Flushing should drop all merge bucket replies
- CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size());
+ EXPECT_EQ(size_t(0), _senderDown.commands.size());
}
std::string
@@ -1569,8 +1351,7 @@ BucketDBUpdaterTest::getSentNodes(
std::ostringstream ost;
for (uint32_t i = 0; i < fixture->sender.commands.size(); i++) {
- RequestBucketInfoCommand& req(dynamic_cast<RequestBucketInfoCommand&>(
- *fixture->sender.commands[i]));
+ auto& req = dynamic_cast<RequestBucketInfoCommand&>(*fixture->sender.commands[i]);
if (i > 0) {
ost << ",";
@@ -1599,155 +1380,124 @@ BucketDBUpdaterTest::getSentNodesDistributionChanged(
std::ostringstream ost;
for (uint32_t i = 0; i < sender.commands.size(); i++) {
- RequestBucketInfoCommand* req =
- dynamic_cast<RequestBucketInfoCommand*>(sender.commands[i].get());
+ auto& req = dynamic_cast<RequestBucketInfoCommand&>(*sender.commands[i]);
if (i > 0) {
ost << ",";
}
- ost << req->getAddress()->getIndex();
+ ost << req.getAddress()->getIndex();
}
return ost.str();
}
-void
-BucketDBUpdaterTest::testPendingClusterStateSendMessages()
-{
- CPPUNIT_ASSERT_EQUAL(
- getNodeList({0, 1, 2}),
- getSentNodes("cluster:d",
- "distributor:1 storage:3"));
-
- CPPUNIT_ASSERT_EQUAL(
- getNodeList({0, 1}),
- getSentNodes("cluster:d",
- "distributor:1 storage:3 .2.s:m"));
-
- CPPUNIT_ASSERT_EQUAL(
- getNodeList({2}),
- getSentNodes("distributor:1 storage:2",
- "distributor:1 storage:3"));
-
- CPPUNIT_ASSERT_EQUAL(
- getNodeList({2, 3, 4, 5}),
- getSentNodes("distributor:1 storage:2",
- "distributor:1 storage:6"));
-
- CPPUNIT_ASSERT_EQUAL(
- getNodeList({0, 1, 2}),
- getSentNodes("distributor:4 storage:3",
- "distributor:3 storage:3"));
-
- CPPUNIT_ASSERT_EQUAL(
- getNodeList({0, 1, 2, 3}),
- getSentNodes("distributor:4 storage:3",
- "distributor:4 .2.s:d storage:4"));
-
- CPPUNIT_ASSERT_EQUAL(
- std::string(""),
- getSentNodes("distributor:4 storage:3",
- "distributor:4 .0.s:d storage:4"));
-
- CPPUNIT_ASSERT_EQUAL(
- std::string(""),
- getSentNodes("distributor:3 storage:3",
- "distributor:4 storage:3"));
-
- CPPUNIT_ASSERT_EQUAL(
- getNodeList({2}),
- getSentNodes("distributor:3 storage:3 .2.s:i",
- "distributor:3 storage:3"));
-
- CPPUNIT_ASSERT_EQUAL(
- getNodeList({1}),
- getSentNodes("distributor:3 storage:3 .1.s:d",
- "distributor:3 storage:3"));
-
- CPPUNIT_ASSERT_EQUAL(
- getNodeList({1, 2, 4}),
- getSentNodes("distributor:3 storage:4 .1.s:d .2.s:i",
- "distributor:3 storage:5"));
-
- CPPUNIT_ASSERT_EQUAL(
- std::string(""),
- getSentNodes("distributor:1 storage:3",
- "cluster:d"));
-
- CPPUNIT_ASSERT_EQUAL(
- std::string(""),
- getSentNodes("distributor:1 storage:3",
- "distributor:1 storage:3"));
-
- CPPUNIT_ASSERT_EQUAL(
- std::string(""),
- getSentNodes("distributor:1 storage:3",
- "cluster:d distributor:1 storage:6"));
-
- CPPUNIT_ASSERT_EQUAL(
- std::string(""),
- getSentNodes("distributor:3 storage:3",
- "distributor:3 .2.s:m storage:3"));
-
- CPPUNIT_ASSERT_EQUAL(
- getNodeList({0, 1, 2}),
- getSentNodes("distributor:3 .2.s:m storage:3",
- "distributor:3 .2.s:d storage:3"));
-
- CPPUNIT_ASSERT_EQUAL(
- std::string(""),
- getSentNodes("distributor:3 .2.s:m storage:3",
- "distributor:3 storage:3"));
-
- CPPUNIT_ASSERT_EQUAL(
- getNodeList({0, 1, 2}),
- getSentNodesDistributionChanged("distributor:3 storage:3"));
-
- CPPUNIT_ASSERT_EQUAL(
- getNodeList({0, 1}),
- getSentNodes("distributor:10 storage:2",
- "distributor:10 .1.s:d storage:2"));
-
- CPPUNIT_ASSERT_EQUAL(
- getNodeList({1}),
- getSentNodes("distributor:2 storage:2",
- "distributor:2 storage:2 .1.d:3 .1.d.1.s:d"));
+TEST_F(BucketDBUpdaterTest, testPendingClusterStateSendMessages) {
+ EXPECT_EQ(getNodeList({0, 1, 2}),
+ getSentNodes("cluster:d",
+ "distributor:1 storage:3"));
- CPPUNIT_ASSERT_EQUAL(
- getNodeList({1}),
- getSentNodes("distributor:2 storage:2 .1.s:d",
- "distributor:2 storage:2 .1.d:3 .1.d.1.s:d"));
+ EXPECT_EQ(getNodeList({0, 1}),
+ getSentNodes("cluster:d",
+ "distributor:1 storage:3 .2.s:m"));
- CPPUNIT_ASSERT_EQUAL(
- std::string(""),
- getSentNodes("distributor:2 storage:2",
- "distributor:3 .2.s:i storage:2"));
+ EXPECT_EQ(getNodeList({2}),
+ getSentNodes("distributor:1 storage:2",
+ "distributor:1 storage:3"));
- CPPUNIT_ASSERT_EQUAL(
- getNodeList({0, 1, 2}),
- getSentNodes("distributor:3 storage:3",
- "distributor:3 .2.s:s storage:3"));
+ EXPECT_EQ(getNodeList({2, 3, 4, 5}),
+ getSentNodes("distributor:1 storage:2",
+ "distributor:1 storage:6"));
- CPPUNIT_ASSERT_EQUAL(
- std::string(""),
- getSentNodes("distributor:3 .2.s:s storage:3",
- "distributor:3 .2.s:d storage:3"));
+ EXPECT_EQ(getNodeList({0, 1, 2}),
+ getSentNodes("distributor:4 storage:3",
+ "distributor:3 storage:3"));
- CPPUNIT_ASSERT_EQUAL(
- getNodeList({1}),
- getSentNodes("distributor:3 storage:3 .1.s:m",
- "distributor:3 storage:3"));
-
- CPPUNIT_ASSERT_EQUAL(
- std::string(""),
- getSentNodes("distributor:3 storage:3",
- "distributor:3 storage:3 .1.s:m"));
+ EXPECT_EQ(getNodeList({0, 1, 2, 3}),
+ getSentNodes("distributor:4 storage:3",
+ "distributor:4 .2.s:d storage:4"));
+
+ EXPECT_EQ(std::string(""),
+ getSentNodes("distributor:4 storage:3",
+ "distributor:4 .0.s:d storage:4"));
+
+ EXPECT_EQ(std::string(""),
+ getSentNodes("distributor:3 storage:3",
+ "distributor:4 storage:3"));
+
+ EXPECT_EQ(getNodeList({2}),
+ getSentNodes("distributor:3 storage:3 .2.s:i",
+ "distributor:3 storage:3"));
+
+ EXPECT_EQ(getNodeList({1}),
+ getSentNodes("distributor:3 storage:3 .1.s:d",
+ "distributor:3 storage:3"));
+
+ EXPECT_EQ(getNodeList({1, 2, 4}),
+ getSentNodes("distributor:3 storage:4 .1.s:d .2.s:i",
+ "distributor:3 storage:5"));
+
+ EXPECT_EQ(std::string(""),
+ getSentNodes("distributor:1 storage:3",
+ "cluster:d"));
+
+ EXPECT_EQ(std::string(""),
+ getSentNodes("distributor:1 storage:3",
+ "distributor:1 storage:3"));
+
+ EXPECT_EQ(std::string(""),
+ getSentNodes("distributor:1 storage:3",
+ "cluster:d distributor:1 storage:6"));
+
+ EXPECT_EQ(std::string(""),
+ getSentNodes("distributor:3 storage:3",
+ "distributor:3 .2.s:m storage:3"));
+
+ EXPECT_EQ(getNodeList({0, 1, 2}),
+ getSentNodes("distributor:3 .2.s:m storage:3",
+ "distributor:3 .2.s:d storage:3"));
+
+ EXPECT_EQ(std::string(""),
+ getSentNodes("distributor:3 .2.s:m storage:3",
+ "distributor:3 storage:3"));
+
+ EXPECT_EQ(getNodeList({0, 1, 2}),
+ getSentNodesDistributionChanged("distributor:3 storage:3"));
+
+ EXPECT_EQ(getNodeList({0, 1}),
+ getSentNodes("distributor:10 storage:2",
+ "distributor:10 .1.s:d storage:2"));
+
+ EXPECT_EQ(getNodeList({1}),
+ getSentNodes("distributor:2 storage:2",
+ "distributor:2 storage:2 .1.d:3 .1.d.1.s:d"));
+
+ EXPECT_EQ(getNodeList({1}),
+ getSentNodes("distributor:2 storage:2 .1.s:d",
+ "distributor:2 storage:2 .1.d:3 .1.d.1.s:d"));
+
+ EXPECT_EQ(std::string(""),
+ getSentNodes("distributor:2 storage:2",
+ "distributor:3 .2.s:i storage:2"));
+
+ EXPECT_EQ(getNodeList({0, 1, 2}),
+ getSentNodes("distributor:3 storage:3",
+ "distributor:3 .2.s:s storage:3"));
+
+ EXPECT_EQ(std::string(""),
+ getSentNodes("distributor:3 .2.s:s storage:3",
+ "distributor:3 .2.s:d storage:3"));
+
+ EXPECT_EQ(getNodeList({1}),
+ getSentNodes("distributor:3 storage:3 .1.s:m",
+ "distributor:3 storage:3"));
+
+ EXPECT_EQ(std::string(""),
+ getSentNodes("distributor:3 storage:3",
+ "distributor:3 storage:3 .1.s:m"));
};
-void
-BucketDBUpdaterTest::testPendingClusterStateReceive()
-{
+TEST_F(BucketDBUpdaterTest, testPendingClusterStateReceive) {
MessageSenderStub sender;
auto cmd(std::make_shared<api::SetSystemStateCommand>(
@@ -1761,38 +1511,31 @@ BucketDBUpdaterTest::testPendingClusterStateReceive()
clock, clusterInfo, sender, getBucketSpaceRepo(), getReadOnlyBucketSpaceRepo(),
cmd, outdatedNodesMap, api::Timestamp(1)));
- CPPUNIT_ASSERT_EQUAL(messageCount(3), sender.commands.size());
+ ASSERT_EQ(messageCount(3), sender.commands.size());
sortSentMessagesByIndex(sender);
std::ostringstream ost;
for (uint32_t i = 0; i < sender.commands.size(); i++) {
- RequestBucketInfoCommand* req =
- dynamic_cast<RequestBucketInfoCommand*>(sender.commands[i].get());
+ auto* req = dynamic_cast<RequestBucketInfoCommand*>(sender.commands[i].get());
+ ASSERT_TRUE(req != nullptr);
- RequestBucketInfoReply* rep =
- new RequestBucketInfoReply(*req);
+ auto rep = std::make_shared<RequestBucketInfoReply>(*req);
rep->getBucketInfo().push_back(
RequestBucketInfoReply::Entry(
document::BucketId(16, i),
api::BucketInfo(i, i, i, i, i)));
- CPPUNIT_ASSERT(
- state->onRequestBucketInfoReply(
- std::shared_ptr<api::RequestBucketInfoReply>(rep)));
-
- CPPUNIT_ASSERT_EQUAL(i == sender.commands.size() - 1 ? true : false,
- state->done());
+ ASSERT_TRUE(state->onRequestBucketInfoReply(rep));
+ ASSERT_EQ((i == (sender.commands.size() - 1)), state->done());
}
- auto &pendingTransition = state->getPendingBucketSpaceDbTransition(makeBucketSpace());
- CPPUNIT_ASSERT_EQUAL(3, (int)pendingTransition.results().size());
+ auto& pendingTransition = state->getPendingBucketSpaceDbTransition(makeBucketSpace());
+ EXPECT_EQ(3, (int)pendingTransition.results().size());
}
-void
-BucketDBUpdaterTest::testPendingClusterStateWithGroupDown()
-{
+TEST_F(BucketDBUpdaterTest, testPendingClusterStateWithGroupDown) {
std::string config(getDistConfig6Nodes4Groups());
config += "distributor_auto_ownership_transfer_on_whole_group_down true\n";
setDistribution(config);
@@ -1801,30 +1544,25 @@ BucketDBUpdaterTest::testPendingClusterStateWithGroupDown()
// We're node index 0.
// Entire group 1 goes down. Must refetch from all nodes.
- CPPUNIT_ASSERT_EQUAL(
- getNodeList({0, 1, 2, 3, 4, 5}),
- getSentNodes("distributor:6 storage:6",
- "distributor:6 .2.s:d .3.s:d storage:6"));
+ EXPECT_EQ(getNodeList({0, 1, 2, 3, 4, 5}),
+ getSentNodes("distributor:6 storage:6",
+ "distributor:6 .2.s:d .3.s:d storage:6"));
// But don't fetch if not the entire group is down.
- CPPUNIT_ASSERT_EQUAL(
- std::string(""),
- getSentNodes("distributor:6 storage:6",
- "distributor:6 .2.s:d storage:6"));
+ EXPECT_EQ(std::string(""),
+ getSentNodes("distributor:6 storage:6",
+ "distributor:6 .2.s:d storage:6"));
}
-void
-BucketDBUpdaterTest::testPendingClusterStateWithGroupDownAndNoHandover()
-{
+TEST_F(BucketDBUpdaterTest, testPendingClusterStateWithGroupDownAndNoHandover) {
std::string config(getDistConfig6Nodes4Groups());
config += "distributor_auto_ownership_transfer_on_whole_group_down false\n";
setDistribution(config);
// Group is down, but config says to not do anything about it.
- CPPUNIT_ASSERT_EQUAL(
- getNodeList({0, 1, 2, 3, 4, 5}, _bucketSpaces.size() - 1),
- getSentNodes("distributor:6 storage:6",
- "distributor:6 .2.s:d .3.s:d storage:6"));
+ EXPECT_EQ(getNodeList({0, 1, 2, 3, 4, 5}, _bucketSpaces.size() - 1),
+ getSentNodes("distributor:6 storage:6",
+ "distributor:6 .2.s:d .3.s:d storage:6"));
}
void
@@ -1840,7 +1578,7 @@ parseInputData(const std::string& data,
uint16_t node = atoi(tok2[0].data());
state.setNodeReplied(node);
- auto &pendingTransition = state.getPendingBucketSpaceDbTransition(makeBucketSpace());
+ auto& pendingTransition = state.getPendingBucketSpaceDbTransition(makeBucketSpace());
vespalib::StringTokenizer tok3(tok2[1], ",");
for (uint32_t j = 0; j < tok3.size(); j++) {
@@ -1874,7 +1612,7 @@ struct BucketDumper : public BucketDatabase::EntryProcessor
std::ostringstream ost;
bool _includeBucketInfo;
- BucketDumper(bool includeBucketInfo)
+ explicit BucketDumper(bool includeBucketInfo)
: _includeBucketInfo(includeBucketInfo)
{
}
@@ -1969,11 +1707,9 @@ BucketDBUpdaterTest::mergeBucketLists(const std::string& existingData,
includeBucketInfo);
}
-void
-BucketDBUpdaterTest::testPendingClusterStateMerge()
-{
+TEST_F(BucketDBUpdaterTest, testPendingClusterStateMerge) {
// Simple initializing case - ask all nodes for info
- CPPUNIT_ASSERT_EQUAL(
+ EXPECT_EQ(
// Result is on the form: [bucket w/o count bits]:[node indexes]|..
std::string("4:0,1|2:0,1|6:1,2|1:0,2|5:2,0|3:2,1|"),
// Input is on the form: [node]:[bucket w/o count bits]|...
@@ -1982,7 +1718,7 @@ BucketDBUpdaterTest::testPendingClusterStateMerge()
"0:1,2,4,5|1:2,3,4,6|2:1,3,5,6"));
// Node came up with fewer buckets (lost disk)
- CPPUNIT_ASSERT_EQUAL(
+ EXPECT_EQ(
std::string("4:1|2:0,1|6:1,2|1:2,0|5:2|3:2,1|"),
mergeBucketLists(
lib::ClusterState("distributor:1 storage:3"),
@@ -1992,7 +1728,7 @@ BucketDBUpdaterTest::testPendingClusterStateMerge()
);
// New node came up
- CPPUNIT_ASSERT_EQUAL(
+ EXPECT_EQ(
std::string("4:0,1|2:0,1|6:1,2,3|1:0,2,3|5:2,0,3|3:2,1,3|"),
mergeBucketLists(
"0:1,2,4,5|1:2,3,4,6|2:1,3,5,6",
@@ -2001,14 +1737,14 @@ BucketDBUpdaterTest::testPendingClusterStateMerge()
// Node came up with some buckets removed and some added
// Buckets that were removed should not be removed as the node
// didn't lose a disk.
- CPPUNIT_ASSERT_EQUAL(
+ EXPECT_EQ(
std::string("8:0|4:0,1|2:0,1|6:1,0,2|1:0,2|5:2,0|3:2,1|"),
mergeBucketLists(
"0:1,2,4,5|1:2,3,4,6|2:1,3,5,6",
"0:1,2,6,8"));
// Node came up with no buckets
- CPPUNIT_ASSERT_EQUAL(
+ EXPECT_EQ(
std::string("4:1|2:1|6:1,2|1:2|5:2|3:2,1|"),
mergeBucketLists(
lib::ClusterState("distributor:1 storage:3"),
@@ -2019,7 +1755,7 @@ BucketDBUpdaterTest::testPendingClusterStateMerge()
// One node lost a disk, another was just reasked (distributor
// change)
- CPPUNIT_ASSERT_EQUAL(
+ EXPECT_EQ(
std::string("2:0,1|6:1,2|1:2,0|5:2|3:2,1|"),
mergeBucketLists(
lib::ClusterState("distributor:1 storage:3"),
@@ -2030,23 +1766,20 @@ BucketDBUpdaterTest::testPendingClusterStateMerge()
// Bucket info format is "bucketid/checksum/count/size"
// Node went from initializing to up and invalid bucket went to empty.
- CPPUNIT_ASSERT_EQUAL(
+ EXPECT_EQ(
std::string("2:0/0/0/0/t|"),
mergeBucketLists(
"0:2/0/0/1",
"0:2/0/0/0",
true));
- CPPUNIT_ASSERT_EQUAL(
- std::string("5:1/2/3/4/u,0/0/0/0/u|"),
- mergeBucketLists("", "0:5/0/0/0|1:5/2/3/4", true));
+ EXPECT_EQ(std::string("5:1/2/3/4/u,0/0/0/0/u|"),
+ mergeBucketLists("", "0:5/0/0/0|1:5/2/3/4", true));
}
-void
-BucketDBUpdaterTest::testPendingClusterStateMergeReplicaChanged()
-{
+TEST_F(BucketDBUpdaterTest, testPendingClusterStateMergeReplicaChanged) {
// Node went from initializing to up and non-invalid bucket changed.
- CPPUNIT_ASSERT_EQUAL(
+ EXPECT_EQ(
std::string("2:0/2/3/4/t|3:0/2/4/6/t|"),
mergeBucketLists(
lib::ClusterState("distributor:1 storage:1 .0.s:i"),
@@ -2056,20 +1789,18 @@ BucketDBUpdaterTest::testPendingClusterStateMergeReplicaChanged()
true));
}
-void
-BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInCurrentState()
-{
+TEST_F(BucketDBUpdaterTest, testNoDbResurrectionForBucketNotOwnedInCurrentState) {
document::BucketId bucket(16, 3);
lib::ClusterState stateBefore("distributor:1 storage:1");
{
uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 1;
- setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn);
+ ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn));
}
_sender.clear();
getBucketDBUpdater().recheckBucketInfo(0, makeDocumentBucket(bucket));
- CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
+ ASSERT_EQ(size_t(1), _sender.commands.size());
std::shared_ptr<api::RequestBucketInfoCommand> rbi(
std::dynamic_pointer_cast<RequestBucketInfoCommand>(
_sender.commands[0]));
@@ -2078,30 +1809,28 @@ BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInCurrentState()
{
uint32_t expectedMsgs = messageCount(2), dummyBucketsToReturn = 1;
- setAndEnableClusterState(stateAfter, expectedMsgs, dummyBucketsToReturn);
+ ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(stateAfter, expectedMsgs, dummyBucketsToReturn));
}
- CPPUNIT_ASSERT(!getBucketDBUpdater().getDistributorComponent()
- .ownsBucketInCurrentState(makeDocumentBucket(bucket)));
+ EXPECT_FALSE(getBucketDBUpdater().getDistributorComponent()
+ .ownsBucketInCurrentState(makeDocumentBucket(bucket)));
- sendFakeReplyForSingleBucketRequest(*rbi);
+ ASSERT_NO_FATAL_FAILURE(sendFakeReplyForSingleBucketRequest(*rbi));
- CPPUNIT_ASSERT_EQUAL(std::string("NONEXISTING"), dumpBucket(bucket));
+ EXPECT_EQ(std::string("NONEXISTING"), dumpBucket(bucket));
}
-void
-BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInPendingState()
-{
+TEST_F(BucketDBUpdaterTest, testNoDbResurrectionForBucketNotOwnedInPendingState) {
document::BucketId bucket(16, 3);
lib::ClusterState stateBefore("distributor:1 storage:1");
{
uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 1;
- setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn);
+ ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn));
}
_sender.clear();
getBucketDBUpdater().recheckBucketInfo(0, makeDocumentBucket(bucket));
- CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size());
+ ASSERT_EQ(size_t(1), _sender.commands.size());
std::shared_ptr<api::RequestBucketInfoCommand> rbi(
std::dynamic_pointer_cast<RequestBucketInfoCommand>(
_sender.commands[0]));
@@ -2109,14 +1838,14 @@ BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInPendingState()
lib::ClusterState stateAfter("distributor:3 storage:3");
// Set, but _don't_ enable cluster state. We want it to be pending.
setSystemState(stateAfter);
- CPPUNIT_ASSERT(getBucketDBUpdater().getDistributorComponent()
- .ownsBucketInCurrentState(makeDocumentBucket(bucket)));
- CPPUNIT_ASSERT(!getBucketDBUpdater()
- .checkOwnershipInPendingState(makeDocumentBucket(bucket)).isOwned());
+ EXPECT_TRUE(getBucketDBUpdater().getDistributorComponent()
+ .ownsBucketInCurrentState(makeDocumentBucket(bucket)));
+ EXPECT_FALSE(getBucketDBUpdater()
+ .checkOwnershipInPendingState(makeDocumentBucket(bucket)).isOwned());
- sendFakeReplyForSingleBucketRequest(*rbi);
+ ASSERT_NO_FATAL_FAILURE(sendFakeReplyForSingleBucketRequest(*rbi));
- CPPUNIT_ASSERT_EQUAL(std::string("NONEXISTING"), dumpBucket(bucket));
+ EXPECT_EQ(std::string("NONEXISTING"), dumpBucket(bucket));
}
/*
@@ -2126,109 +1855,101 @@ BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInPendingState()
* will with a high likelihood end up not getting the complete view of the buckets in
* the cluster.
*/
-void
-BucketDBUpdaterTest::testClusterStateAlwaysSendsFullFetchWhenDistributionChangePending()
-{
+TEST_F(BucketDBUpdaterTest, testClusterStateAlwaysSendsFullFetchWhenDistributionChangePending) {
lib::ClusterState stateBefore("distributor:6 storage:6");
{
uint32_t expectedMsgs = messageCount(6), dummyBucketsToReturn = 1;
- setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn);
+ ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn));
}
_sender.clear();
std::string distConfig(getDistConfig6Nodes2Groups());
setDistribution(distConfig);
sortSentMessagesByIndex(_sender);
- CPPUNIT_ASSERT_EQUAL(messageCount(6), _sender.commands.size());
+ ASSERT_EQ(messageCount(6), _sender.commands.size());
// Suddenly, a wild cluster state change appears! Even though this state
// does not in itself imply any bucket changes, it will still overwrite the
// pending cluster state and thus its state of pending bucket info requests.
setSystemState(lib::ClusterState("distributor:6 .2.t:12345 storage:6"));
- CPPUNIT_ASSERT_EQUAL(messageCount(12), _sender.commands.size());
+ ASSERT_EQ(messageCount(12), _sender.commands.size());
// Send replies for first messageCount(6) (outdated requests).
int numBuckets = 10;
for (uint32_t i = 0; i < messageCount(6); ++i) {
- fakeBucketReply(lib::ClusterState("distributor:6 storage:6"),
- *_sender.commands[i], numBuckets);
+ ASSERT_NO_FATAL_FAILURE(fakeBucketReply(lib::ClusterState("distributor:6 storage:6"),
+ *_sender.commands[i], numBuckets));
}
// No change from these.
- assertCorrectBuckets(1, "distributor:6 storage:6");
+ ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(1, "distributor:6 storage:6"));
// Send for current pending.
for (uint32_t i = 0; i < messageCount(6); ++i) {
- fakeBucketReply(lib::ClusterState("distributor:6 .2.t:12345 storage:6"),
- *_sender.commands[i + messageCount(6)],
- numBuckets);
+ ASSERT_NO_FATAL_FAILURE(fakeBucketReply(lib::ClusterState("distributor:6 .2.t:12345 storage:6"),
+ *_sender.commands[i + messageCount(6)],
+ numBuckets));
}
- assertCorrectBuckets(numBuckets, "distributor:6 storage:6");
+ ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(numBuckets, "distributor:6 storage:6"));
_sender.clear();
// No more pending global fetch; this should be a no-op state.
setSystemState(lib::ClusterState("distributor:6 .3.t:12345 storage:6"));
- CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.commands.size());
+ EXPECT_EQ(size_t(0), _sender.commands.size());
}
-void
-BucketDBUpdaterTest::testChangedDistributionConfigTriggersRecoveryMode()
-{
- setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), messageCount(6), 20);
+TEST_F(BucketDBUpdaterTest, testChangedDistributionConfigTriggersRecoveryMode) {
+ ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), messageCount(6), 20));
_sender.clear();
// First cluster state; implicit scan of all buckets which does not
// use normal recovery mode ticking-path.
- CPPUNIT_ASSERT(!_distributor->isInRecoveryMode());
+ EXPECT_FALSE(_distributor->isInRecoveryMode());
std::string distConfig(getDistConfig6Nodes4Groups());
setDistribution(distConfig);
sortSentMessagesByIndex(_sender);
// No replies received yet, still no recovery mode.
- CPPUNIT_ASSERT(!_distributor->isInRecoveryMode());
+ EXPECT_FALSE(_distributor->isInRecoveryMode());
- CPPUNIT_ASSERT_EQUAL(messageCount(6), _sender.commands.size());
+ ASSERT_EQ(messageCount(6), _sender.commands.size());
uint32_t numBuckets = 10;
for (uint32_t i = 0; i < messageCount(6); ++i) {
- fakeBucketReply(lib::ClusterState("distributor:6 storage:6"),
- *_sender.commands[i], numBuckets);
+ ASSERT_NO_FATAL_FAILURE(fakeBucketReply(lib::ClusterState("distributor:6 storage:6"),
+ *_sender.commands[i], numBuckets));
}
// Pending cluster state (i.e. distribution) has been enabled, which should
// cause recovery mode to be entered.
- CPPUNIT_ASSERT(_distributor->isInRecoveryMode());
+ EXPECT_TRUE(_distributor->isInRecoveryMode());
}
-void
-BucketDBUpdaterTest::testNewlyAddedBucketsHaveCurrentTimeAsGcTimestamp()
-{
+TEST_F(BucketDBUpdaterTest, testNewlyAddedBucketsHaveCurrentTimeAsGcTimestamp) {
getClock().setAbsoluteTimeInSeconds(101234);
lib::ClusterState stateBefore("distributor:1 storage:1");
{
uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 1;
- setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn);
+ ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn));
}
// setAndEnableClusterState adds n buckets with id (16, i)
document::BucketId bucket(16, 0);
BucketDatabase::Entry e(getBucket(bucket));
- CPPUNIT_ASSERT(e.valid());
- CPPUNIT_ASSERT_EQUAL(uint32_t(101234), e->getLastGarbageCollectionTime());
+ ASSERT_TRUE(e.valid());
+ EXPECT_EQ(uint32_t(101234), e->getLastGarbageCollectionTime());
}
-void
-BucketDBUpdaterTest::testNewerMutationsNotOverwrittenByEarlierBucketFetch()
-{
+TEST_F(BucketDBUpdaterTest, testNewerMutationsNotOverwrittenByEarlierBucketFetch) {
{
lib::ClusterState stateBefore("distributor:1 storage:1 .0.s:i");
uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 0;
// This step is required to make the distributor ready for accepting
// the below explicit database insertion towards node 0.
- setAndEnableClusterState(stateBefore, expectedMsgs,
- dummyBucketsToReturn);
+ ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(stateBefore, expectedMsgs,
+ dummyBucketsToReturn));
}
_sender.clear();
getClock().setAbsoluteTimeInSeconds(1000);
lib::ClusterState state("distributor:1 storage:1");
setSystemState(state);
- CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size());
+ ASSERT_EQ(_bucketSpaces.size(), _sender.commands.size());
// Before replying with the bucket info, simulate the arrival of a mutation
// reply that alters the state of the bucket with information that will be
@@ -2253,13 +1974,12 @@ BucketDBUpdaterTest::testNewerMutationsNotOverwrittenByEarlierBucketFetch()
// correctness, as this should contain the same bucket info as that
// contained in the full bucket reply and the DB update is thus idempotent.
for (uint32_t i = 0; i < _bucketSpaces.size(); ++i) {
- fakeBucketReply(state, *_sender.commands[i], bucketsReturned);
+ ASSERT_NO_FATAL_FAILURE(fakeBucketReply(state, *_sender.commands[i], bucketsReturned));
}
BucketDatabase::Entry e(getBucket(bucket));
- CPPUNIT_ASSERT_EQUAL(uint32_t(1), e->getNodeCount());
- CPPUNIT_ASSERT_EQUAL(wantedInfo, e->getNodeRef(0).getBucketInfo());
-
+ ASSERT_EQ(uint32_t(1), e->getNodeCount());
+ EXPECT_EQ(wantedInfo, e->getNodeRef(0).getBucketInfo());
}
std::vector<uint16_t>
@@ -2286,9 +2006,11 @@ BucketDBUpdaterTest::getSentNodesWithPreemption(
{
lib::ClusterState stateBefore(oldClusterState);
uint32_t dummyBucketsToReturn = 10;
+ // FIXME cannot chain assertion checks in non-void function
setAndEnableClusterState(lib::ClusterState(oldClusterState),
expectedOldStateMessages,
dummyBucketsToReturn);
+
_sender.clear();
setSystemState(lib::ClusterState(preemptedClusterState));
@@ -2309,10 +2031,8 @@ using nodeVec = std::vector<uint16_t>;
* database modifications caused by intermediate states will not be
* accounted for (basically the ABA problem in a distributed setting).
*/
-void
-BucketDBUpdaterTest::preemptedDistrChangeCarriesNodeSetOverToNextStateFetch()
-{
- CPPUNIT_ASSERT_EQUAL(
+TEST_F(BucketDBUpdaterTest, preemptedDistrChangeCarriesNodeSetOverToNextStateFetch) {
+ EXPECT_EQ(
expandNodeVec({0, 1, 2, 3, 4, 5}),
getSentNodesWithPreemption("version:1 distributor:6 storage:6",
messageCount(6),
@@ -2320,10 +2040,8 @@ BucketDBUpdaterTest::preemptedDistrChangeCarriesNodeSetOverToNextStateFetch()
"version:3 distributor:6 storage:6"));
}
-void
-BucketDBUpdaterTest::preemptedStorChangeCarriesNodeSetOverToNextStateFetch()
-{
- CPPUNIT_ASSERT_EQUAL(
+TEST_F(BucketDBUpdaterTest, preemptedStorChangeCarriesNodeSetOverToNextStateFetch) {
+ EXPECT_EQ(
expandNodeVec({2, 3}),
getSentNodesWithPreemption(
"version:1 distributor:6 storage:6 .2.s:d",
@@ -2332,10 +2050,8 @@ BucketDBUpdaterTest::preemptedStorChangeCarriesNodeSetOverToNextStateFetch()
"version:3 distributor:6 storage:6"));
}
-void
-BucketDBUpdaterTest::preemptedStorageNodeDownMustBeReFetched()
-{
- CPPUNIT_ASSERT_EQUAL(
+TEST_F(BucketDBUpdaterTest, preemptedStorageNodeDownMustBeReFetched) {
+ EXPECT_EQ(
expandNodeVec({2}),
getSentNodesWithPreemption(
"version:1 distributor:6 storage:6",
@@ -2344,10 +2060,8 @@ BucketDBUpdaterTest::preemptedStorageNodeDownMustBeReFetched()
"version:3 distributor:6 storage:6"));
}
-void
-BucketDBUpdaterTest::doNotSendToPreemptedNodeNowInDownState()
-{
- CPPUNIT_ASSERT_EQUAL(
+TEST_F(BucketDBUpdaterTest, doNotSendToPreemptedNodeNowInDownState) {
+ EXPECT_EQ(
nodeVec{},
getSentNodesWithPreemption(
"version:1 distributor:6 storage:6 .2.s:d",
@@ -2356,12 +2070,10 @@ BucketDBUpdaterTest::doNotSendToPreemptedNodeNowInDownState()
"version:3 distributor:6 storage:6 .2.s:d")); // 2 down again.
}
-void
-BucketDBUpdaterTest::doNotSendToPreemptedNodeNotPartOfNewState()
-{
+TEST_F(BucketDBUpdaterTest, doNotSendToPreemptedNodeNotPartOfNewState) {
// Even though 100 nodes are preempted, not all of these should be part
// of the request afterwards when only 6 are part of the state.
- CPPUNIT_ASSERT_EQUAL(
+ EXPECT_EQ(
expandNodeVec({0, 1, 2, 3, 4, 5}),
getSentNodesWithPreemption(
"version:1 distributor:6 storage:100",
@@ -2370,20 +2082,18 @@ BucketDBUpdaterTest::doNotSendToPreemptedNodeNotPartOfNewState()
"version:3 distributor:6 storage:6"));
}
-void
-BucketDBUpdaterTest::outdatedNodeSetClearedAfterSuccessfulStateCompletion()
-{
+TEST_F(BucketDBUpdaterTest, outdatedNodeSetClearedAfterSuccessfulStateCompletion) {
lib::ClusterState stateBefore(
"version:1 distributor:6 storage:6 .1.t:1234");
uint32_t expectedMsgs = messageCount(6), dummyBucketsToReturn = 10;
- setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn);
+ ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn));
_sender.clear();
// New cluster state that should not by itself trigger any new fetches,
// unless outdated node set is somehow not cleared after an enabled
// (completed) cluster state has been set.
lib::ClusterState stateAfter("version:3 distributor:6 storage:6");
setSystemState(stateAfter);
- CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.commands.size());
+ EXPECT_EQ(size_t(0), _sender.commands.size());
}
// XXX test currently disabled since distribution config currently isn't used
@@ -2392,12 +2102,10 @@ BucketDBUpdaterTest::outdatedNodeSetClearedAfterSuccessfulStateCompletion()
// distribution config will follow very shortly after the config has been
// applied to the node. The new cluster state will then send out requests to
// the correct node set.
-void
-BucketDBUpdaterTest::clusterConfigDownsizeOnlySendsToAvailableNodes()
-{
+TEST_F(BucketDBUpdaterTest, DISABLED_clusterConfigDownsizeOnlySendsToAvailableNodes) {
uint32_t expectedMsgs = 6, dummyBucketsToReturn = 20;
- setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"),
- expectedMsgs, dummyBucketsToReturn);
+ ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"),
+ expectedMsgs, dummyBucketsToReturn));
_sender.clear();
// Intentionally trigger a racing config change which arrives before the
@@ -2406,14 +2114,12 @@ BucketDBUpdaterTest::clusterConfigDownsizeOnlySendsToAvailableNodes()
setDistribution(distConfig);
sortSentMessagesByIndex(_sender);
- CPPUNIT_ASSERT_EQUAL((nodeVec{0, 1, 2}), getSendSet());
+ EXPECT_EQ((nodeVec{0, 1, 2}), getSendSet());
}
-void
-BucketDBUpdaterTest::changedDiskSetTriggersReFetch()
-{
+TEST_F(BucketDBUpdaterTest, changedDiskSetTriggersReFetch) {
// Same number of online disks, but the set of disks has changed.
- CPPUNIT_ASSERT_EQUAL(
+ EXPECT_EQ(
getNodeList({1}),
getSentNodes("distributor:2 storage:2 .1.d:3 .1.d.2.s:d",
"distributor:2 storage:2 .1.d:3 .1.d.1.s:d"));
@@ -2427,12 +2133,10 @@ BucketDBUpdaterTest::changedDiskSetTriggersReFetch()
*
* See VESPA-790 for details.
*/
-void
-BucketDBUpdaterTest::nodeMissingFromConfigIsTreatedAsNeedingOwnershipTransfer()
-{
+TEST_F(BucketDBUpdaterTest, nodeMissingFromConfigIsTreatedAsNeedingOwnershipTransfer) {
uint32_t expectedMsgs = messageCount(3), dummyBucketsToReturn = 1;
- setAndEnableClusterState(lib::ClusterState("distributor:3 storage:3"),
- expectedMsgs, dummyBucketsToReturn);
+ ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:3 storage:3"),
+ expectedMsgs, dummyBucketsToReturn));
_sender.clear();
// Cluster goes from {0, 1, 2} -> {0, 1}. This leaves us with a config
@@ -2459,78 +2163,64 @@ BucketDBUpdaterTest::nodeMissingFromConfigIsTreatedAsNeedingOwnershipTransfer()
// Attempt to apply state with {0, 1} set. This will compare the new state
// with the previous state, which still has node 2.
expectedMsgs = messageCount(2);
- setAndEnableClusterState(lib::ClusterState("distributor:2 storage:2"),
- expectedMsgs, dummyBucketsToReturn);
+ ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:2 storage:2"),
+ expectedMsgs, dummyBucketsToReturn));
- CPPUNIT_ASSERT_EQUAL(expandNodeVec({0, 1}), getSendSet());
+ EXPECT_EQ(expandNodeVec({0, 1}), getSendSet());
}
-void
-BucketDBUpdaterTest::changed_distributor_set_implies_ownership_transfer()
-{
+TEST_F(BucketDBUpdaterTest, changed_distributor_set_implies_ownership_transfer) {
auto fixture = createPendingStateFixtureForStateChange(
"distributor:2 storage:2", "distributor:1 storage:2");
- CPPUNIT_ASSERT(fixture->state->hasBucketOwnershipTransfer());
+ EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer());
fixture = createPendingStateFixtureForStateChange(
"distributor:2 storage:2", "distributor:2 .1.s:d storage:2");
- CPPUNIT_ASSERT(fixture->state->hasBucketOwnershipTransfer());
+ EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer());
}
-void
-BucketDBUpdaterTest::unchanged_distributor_set_implies_no_ownership_transfer()
-{
+TEST_F(BucketDBUpdaterTest, unchanged_distributor_set_implies_no_ownership_transfer) {
auto fixture = createPendingStateFixtureForStateChange(
"distributor:2 storage:2", "distributor:2 storage:1");
- CPPUNIT_ASSERT(!fixture->state->hasBucketOwnershipTransfer());
+ EXPECT_FALSE(fixture->state->hasBucketOwnershipTransfer());
fixture = createPendingStateFixtureForStateChange(
"distributor:2 storage:2", "distributor:2 storage:2 .1.s:d");
- CPPUNIT_ASSERT(!fixture->state->hasBucketOwnershipTransfer());
+ EXPECT_FALSE(fixture->state->hasBucketOwnershipTransfer());
}
-void
-BucketDBUpdaterTest::changed_distribution_config_implies_ownership_transfer()
-{
+TEST_F(BucketDBUpdaterTest, changed_distribution_config_implies_ownership_transfer) {
auto fixture = createPendingStateFixtureForDistributionChange(
"distributor:2 storage:2");
- CPPUNIT_ASSERT(fixture->state->hasBucketOwnershipTransfer());
+ EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer());
}
-void
-BucketDBUpdaterTest::transition_time_tracked_for_single_state_change()
-{
- completeStateTransitionInSeconds("distributor:2 storage:2", 5, messageCount(2));
+TEST_F(BucketDBUpdaterTest, transition_time_tracked_for_single_state_change) {
+ ASSERT_NO_FATAL_FAILURE(completeStateTransitionInSeconds("distributor:2 storage:2", 5, messageCount(2)));
- CPPUNIT_ASSERT_EQUAL(uint64_t(5000), lastTransitionTimeInMillis());
+ EXPECT_EQ(uint64_t(5000), lastTransitionTimeInMillis());
}
-void
-BucketDBUpdaterTest::transition_time_reset_across_non_preempting_state_changes()
-{
- completeStateTransitionInSeconds("distributor:2 storage:2", 5, messageCount(2));
- completeStateTransitionInSeconds("distributor:2 storage:3", 3, messageCount(1));
+TEST_F(BucketDBUpdaterTest, transition_time_reset_across_non_preempting_state_changes) {
+ ASSERT_NO_FATAL_FAILURE(completeStateTransitionInSeconds("distributor:2 storage:2", 5, messageCount(2)));
+ ASSERT_NO_FATAL_FAILURE(completeStateTransitionInSeconds("distributor:2 storage:3", 3, messageCount(1)));
- CPPUNIT_ASSERT_EQUAL(uint64_t(3000), lastTransitionTimeInMillis());
+ EXPECT_EQ(uint64_t(3000), lastTransitionTimeInMillis());
}
-void
-BucketDBUpdaterTest::transition_time_tracked_for_distribution_config_change()
-{
+TEST_F(BucketDBUpdaterTest, transition_time_tracked_for_distribution_config_change) {
lib::ClusterState state("distributor:2 storage:2");
- setAndEnableClusterState(state, messageCount(2), 1);
+ ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(state, messageCount(2), 1));
_sender.clear();
std::string distConfig(getDistConfig3Nodes1Group());
setDistribution(distConfig);
getClock().addSecondsToTime(4);
- completeBucketInfoGathering(state, messageCount(2));
- CPPUNIT_ASSERT_EQUAL(uint64_t(4000), lastTransitionTimeInMillis());
+ ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(state, messageCount(2)));
+ EXPECT_EQ(uint64_t(4000), lastTransitionTimeInMillis());
}
-void
-BucketDBUpdaterTest::transition_time_tracked_across_preempted_transitions()
-{
+TEST_F(BucketDBUpdaterTest, transition_time_tracked_across_preempted_transitions) {
_sender.clear();
lib::ClusterState state("distributor:2 storage:2");
setSystemState(state);
@@ -2538,9 +2228,9 @@ BucketDBUpdaterTest::transition_time_tracked_across_preempted_transitions()
// Pre-empted with new state here, which will push out the old pending
// state and replace it with a new one. We should still count the time
// used processing the old state.
- completeStateTransitionInSeconds("distributor:2 storage:3", 3, messageCount(3));
+ ASSERT_NO_FATAL_FAILURE(completeStateTransitionInSeconds("distributor:2 storage:3", 3, messageCount(3)));
- CPPUNIT_ASSERT_EQUAL(uint64_t(8000), lastTransitionTimeInMillis());
+ EXPECT_EQ(uint64_t(8000), lastTransitionTimeInMillis());
}
/*
@@ -2554,10 +2244,10 @@ BucketDBUpdaterTest::transition_time_tracked_across_preempted_transitions()
* Yes, the order of node<->bucket id is reversed between the two, perhaps to make sure you're awake.
*/
-void BucketDBUpdaterTest::batch_update_of_existing_diverging_replicas_does_not_mark_any_as_trusted() {
+TEST_F(BucketDBUpdaterTest, batch_update_of_existing_diverging_replicas_does_not_mark_any_as_trusted) {
// Replacing bucket information for content node 0 should not mark existing
// untrusted replica as trusted as a side effect.
- CPPUNIT_ASSERT_EQUAL(
+ EXPECT_EQ(
std::string("5:1/7/8/9/u,0/1/2/3/u|"),
mergeBucketLists(
lib::ClusterState("distributor:1 storage:3 .0.s:i"),
@@ -2566,43 +2256,38 @@ void BucketDBUpdaterTest::batch_update_of_existing_diverging_replicas_does_not_m
"0:5/1/2/3|1:5/7/8/9", true));
}
-void BucketDBUpdaterTest::batch_add_of_new_diverging_replicas_does_not_mark_any_as_trusted() {
- CPPUNIT_ASSERT_EQUAL(
- std::string("5:1/7/8/9/u,0/1/2/3/u|"),
- mergeBucketLists("", "0:5/1/2/3|1:5/7/8/9", true));
+TEST_F(BucketDBUpdaterTest, batch_add_of_new_diverging_replicas_does_not_mark_any_as_trusted) {
+ EXPECT_EQ(std::string("5:1/7/8/9/u,0/1/2/3/u|"),
+ mergeBucketLists("", "0:5/1/2/3|1:5/7/8/9", true));
}
-void BucketDBUpdaterTest::batch_add_with_single_resulting_replica_implicitly_marks_as_trusted() {
- CPPUNIT_ASSERT_EQUAL(
- std::string("5:0/1/2/3/t|"),
- mergeBucketLists("", "0:5/1/2/3", true));
+TEST_F(BucketDBUpdaterTest, batch_add_with_single_resulting_replica_implicitly_marks_as_trusted) {
+ EXPECT_EQ(std::string("5:0/1/2/3/t|"),
+ mergeBucketLists("", "0:5/1/2/3", true));
}
-void BucketDBUpdaterTest::identity_update_of_single_replica_does_not_clear_trusted() {
- CPPUNIT_ASSERT_EQUAL(
- std::string("5:0/1/2/3/t|"),
- mergeBucketLists("0:5/1/2/3", "0:5/1/2/3", true));
+TEST_F(BucketDBUpdaterTest, identity_update_of_single_replica_does_not_clear_trusted) {
+ EXPECT_EQ(std::string("5:0/1/2/3/t|"),
+ mergeBucketLists("0:5/1/2/3", "0:5/1/2/3", true));
}
-void BucketDBUpdaterTest::identity_update_of_diverging_untrusted_replicas_does_not_mark_any_as_trusted() {
- CPPUNIT_ASSERT_EQUAL(
- std::string("5:1/7/8/9/u,0/1/2/3/u|"),
- mergeBucketLists("0:5/1/2/3|1:5/7/8/9", "0:5/1/2/3|1:5/7/8/9", true));
+TEST_F(BucketDBUpdaterTest, identity_update_of_diverging_untrusted_replicas_does_not_mark_any_as_trusted) {
+ EXPECT_EQ(std::string("5:1/7/8/9/u,0/1/2/3/u|"),
+ mergeBucketLists("0:5/1/2/3|1:5/7/8/9", "0:5/1/2/3|1:5/7/8/9", true));
}
-void BucketDBUpdaterTest::adding_diverging_replica_to_existing_trusted_does_not_remove_trusted() {
- CPPUNIT_ASSERT_EQUAL(
- std::string("5:1/2/3/4/u,0/1/2/3/t|"),
- mergeBucketLists("0:5/1/2/3", "0:5/1/2/3|1:5/2/3/4", true));
+TEST_F(BucketDBUpdaterTest, adding_diverging_replica_to_existing_trusted_does_not_remove_trusted) {
+ EXPECT_EQ(std::string("5:1/2/3/4/u,0/1/2/3/t|"),
+ mergeBucketLists("0:5/1/2/3", "0:5/1/2/3|1:5/2/3/4", true));
}
-void BucketDBUpdaterTest::batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted() {
+TEST_F(BucketDBUpdaterTest, batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted) {
// This differs from batch_update_of_existing_diverging_replicas_does_not_mark_any_as_trusted
// in that _all_ content nodes are considered outdated when distributor changes take place,
// and therefore a slightly different code path is taken. In particular, bucket info for
// outdated nodes gets removed before possibly being re-added (if present in the bucket info
// response).
- CPPUNIT_ASSERT_EQUAL(
+ EXPECT_EQ(
std::string("5:1/7/8/9/u,0/1/2/3/u|"),
mergeBucketLists(
lib::ClusterState("distributor:2 storage:3"),
@@ -2612,7 +2297,7 @@ void BucketDBUpdaterTest::batch_update_from_distributor_change_does_not_mark_div
}
// TODO remove on Vespa 8 - this is a workaround for https://github.com/vespa-engine/vespa/issues/8475
-void BucketDBUpdaterTest::global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection() {
+TEST_F(BucketDBUpdaterTest, global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection) {
std::string distConfig(getDistConfig6Nodes2Groups());
setDistribution(distConfig);
@@ -2620,7 +2305,7 @@ void BucketDBUpdaterTest::global_distribution_hash_falls_back_to_legacy_format_u
const vespalib::string legacy_hash = "(0d3|3|*(0;0;1;2)(1;3;4;5))";
setSystemState(lib::ClusterState("distributor:6 storage:6"));
- CPPUNIT_ASSERT_EQUAL(messageCount(6), _sender.commands.size());
+ ASSERT_EQ(messageCount(6), _sender.commands.size());
api::RequestBucketInfoCommand* global_req = nullptr;
for (auto& cmd : _sender.commands) {
@@ -2630,8 +2315,8 @@ void BucketDBUpdaterTest::global_distribution_hash_falls_back_to_legacy_format_u
break;
}
}
- CPPUNIT_ASSERT(global_req != nullptr);
- CPPUNIT_ASSERT_EQUAL(current_hash, global_req->getDistributionHash());
+ ASSERT_TRUE(global_req != nullptr);
+ ASSERT_EQ(current_hash, global_req->getDistributionHash());
auto reply = std::make_shared<api::RequestBucketInfoReply>(*global_req);
reply->setResult(api::ReturnCode::REJECTED);
@@ -2641,9 +2326,9 @@ void BucketDBUpdaterTest::global_distribution_hash_falls_back_to_legacy_format_u
getBucketDBUpdater().resendDelayedMessages();
// Should now be a resent request with legacy distribution hash
- CPPUNIT_ASSERT_EQUAL(messageCount(6) + 1, _sender.commands.size());
+ ASSERT_EQ(messageCount(6) + 1, _sender.commands.size());
auto& legacy_req = dynamic_cast<api::RequestBucketInfoCommand&>(*_sender.commands.back());
- CPPUNIT_ASSERT_EQUAL(legacy_hash, legacy_req.getDistributionHash());
+ ASSERT_EQ(legacy_hash, legacy_req.getDistributionHash());
// Now if we reject it _again_ we should cycle back to the current hash
// in case it wasn't a hash-based rejection after all. And the circle of life continues.
@@ -2654,9 +2339,9 @@ void BucketDBUpdaterTest::global_distribution_hash_falls_back_to_legacy_format_u
getClock().addSecondsToTime(10);
getBucketDBUpdater().resendDelayedMessages();
- CPPUNIT_ASSERT_EQUAL(messageCount(6) + 2, _sender.commands.size());
+ ASSERT_EQ(messageCount(6) + 2, _sender.commands.size());
auto& new_current_req = dynamic_cast<api::RequestBucketInfoCommand&>(*_sender.commands.back());
- CPPUNIT_ASSERT_EQUAL(current_hash, new_current_req.getDistributionHash());
+ ASSERT_EQ(current_hash, new_current_req.getDistributionHash());
}
namespace {
@@ -2680,23 +2365,21 @@ void for_each_bucket(const DistributorBucketSpaceRepo& repo, Func&& f) {
}
-using ConfigBuilder = vespa::config::content::core::StorDistributormanagerConfigBuilder;
-
-void BucketDBUpdaterTest::non_owned_buckets_moved_to_read_only_db_on_ownership_change() {
+TEST_F(BucketDBUpdaterTest, non_owned_buckets_moved_to_read_only_db_on_ownership_change) {
getConfig().setAllowStaleReadsDuringClusterStateTransitions(true);
lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition
set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity
- CPPUNIT_ASSERT_EQUAL(messageCount(4), _sender.commands.size());
+ ASSERT_EQ(messageCount(4), _sender.commands.size());
constexpr uint32_t n_buckets = 10;
- completeBucketInfoGathering(initial_state, messageCount(4), n_buckets);
+ ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(initial_state, messageCount(4), n_buckets));
_sender.clear();
- CPPUNIT_ASSERT_EQUAL(size_t(n_buckets), mutable_default_db().size());
- CPPUNIT_ASSERT_EQUAL(size_t(n_buckets), mutable_global_db().size());
- CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size());
- CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_global_db().size());
+ EXPECT_EQ(size_t(n_buckets), mutable_default_db().size());
+ EXPECT_EQ(size_t(n_buckets), mutable_global_db().size());
+ EXPECT_EQ(size_t(0), read_only_default_db().size());
+ EXPECT_EQ(size_t(0), read_only_global_db().size());
lib::ClusterState pending_state("distributor:2 storage:4");
@@ -2707,54 +2390,54 @@ void BucketDBUpdaterTest::non_owned_buckets_moved_to_read_only_db_on_ownership_c
buckets_not_owned_in_pending_state.insert(Bucket(space, entry.getBucketId()));
}
});
- CPPUNIT_ASSERT(!buckets_not_owned_in_pending_state.empty());
+ EXPECT_FALSE(buckets_not_owned_in_pending_state.empty());
set_cluster_state_bundle(lib::ClusterStateBundle(pending_state, {}, true)); // Now requires activation
const auto buckets_not_owned_per_space = (buckets_not_owned_in_pending_state.size() / 2); // 2 spaces
const auto expected_mutable_buckets = n_buckets - buckets_not_owned_per_space;
- CPPUNIT_ASSERT_EQUAL(expected_mutable_buckets, mutable_default_db().size());
- CPPUNIT_ASSERT_EQUAL(expected_mutable_buckets, mutable_global_db().size());
- CPPUNIT_ASSERT_EQUAL(buckets_not_owned_per_space, read_only_default_db().size());
- CPPUNIT_ASSERT_EQUAL(buckets_not_owned_per_space, read_only_global_db().size());
+ EXPECT_EQ(expected_mutable_buckets, mutable_default_db().size());
+ EXPECT_EQ(expected_mutable_buckets, mutable_global_db().size());
+ EXPECT_EQ(buckets_not_owned_per_space, read_only_default_db().size());
+ EXPECT_EQ(buckets_not_owned_per_space, read_only_global_db().size());
for_each_bucket(read_only_repo(), [&](const auto& space, const auto& entry) {
- CPPUNIT_ASSERT(buckets_not_owned_in_pending_state.find(Bucket(space, entry.getBucketId()))
- != buckets_not_owned_in_pending_state.end());
+ EXPECT_TRUE(buckets_not_owned_in_pending_state.find(Bucket(space, entry.getBucketId()))
+ != buckets_not_owned_in_pending_state.end());
});
}
-void BucketDBUpdaterTest::buckets_no_longer_available_are_not_moved_to_read_only_database() {
+TEST_F(BucketDBUpdaterTest, buckets_no_longer_available_are_not_moved_to_read_only_database) {
constexpr uint32_t n_buckets = 10;
// No ownership change, just node down. Test redundancy is 2, so removing 2 nodes will
// cause some buckets to be entirely unavailable.
trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
"version:2 distributor:1 storage:4 .0.s:d .1.s:m", n_buckets, 0);
- CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size());
- CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_global_db().size());
+ EXPECT_EQ(size_t(0), read_only_default_db().size());
+ EXPECT_EQ(size_t(0), read_only_global_db().size());
}
-void BucketDBUpdaterTest::non_owned_buckets_purged_when_read_only_support_is_config_disabled() {
+TEST_F(BucketDBUpdaterTest, non_owned_buckets_purged_when_read_only_support_is_config_disabled) {
getConfig().setAllowStaleReadsDuringClusterStateTransitions(false);
lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition
set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity
- CPPUNIT_ASSERT_EQUAL(messageCount(4), _sender.commands.size());
+ ASSERT_EQ(messageCount(4), _sender.commands.size());
constexpr uint32_t n_buckets = 10;
- completeBucketInfoGathering(initial_state, messageCount(4), n_buckets);
+ ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(initial_state, messageCount(4), n_buckets));
_sender.clear();
// Nothing in read-only DB after first bulk load of buckets.
- CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size());
- CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_global_db().size());
+ EXPECT_EQ(size_t(0), read_only_default_db().size());
+ EXPECT_EQ(size_t(0), read_only_global_db().size());
lib::ClusterState pending_state("distributor:2 storage:4");
setSystemState(pending_state);
// No buckets should be moved into read only db after ownership changes.
- CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size());
- CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_global_db().size());
+ EXPECT_EQ(size_t(0), read_only_default_db().size());
+ EXPECT_EQ(size_t(0), read_only_global_db().size());
}
void BucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition(
@@ -2768,83 +2451,88 @@ void BucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition(
getConfig().setAllowStaleReadsDuringClusterStateTransitions(true);
lib::ClusterState initial_state(initial_state_str);
setSystemState(initial_state);
- CPPUNIT_ASSERT_EQUAL(messageCount(initial_expected_msgs), _sender.commands.size());
- completeBucketInfoGathering(initial_state, messageCount(initial_expected_msgs), initial_buckets);
+ ASSERT_EQ(messageCount(initial_expected_msgs), _sender.commands.size());
+ ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(
+ initial_state, messageCount(initial_expected_msgs), initial_buckets));
_sender.clear();
lib::ClusterState pending_state(pending_state_str); // Ownership change
set_cluster_state_bundle(lib::ClusterStateBundle(pending_state, {}, true));
- CPPUNIT_ASSERT_EQUAL(messageCount(pending_expected_msgs), _sender.commands.size());
- completeBucketInfoGathering(pending_state, messageCount(pending_expected_msgs), pending_buckets);
+ ASSERT_EQ(messageCount(pending_expected_msgs), _sender.commands.size());
+ ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(
+ pending_state, messageCount(pending_expected_msgs), pending_buckets));
_sender.clear();
}
-void BucketDBUpdaterTest::deferred_activated_state_does_not_enable_state_until_activation_received() {
+TEST_F(BucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until_activation_received) {
constexpr uint32_t n_buckets = 10;
- trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
- "version:2 distributor:1 storage:4", n_buckets, 4);
+ ASSERT_NO_FATAL_FAILURE(
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
+ "version:2 distributor:1 storage:4", n_buckets, 4));
// Version should not be switched over yet
- CPPUNIT_ASSERT_EQUAL(uint32_t(1), getDistributor().getClusterStateBundle().getVersion());
+ EXPECT_EQ(uint32_t(1), getDistributor().getClusterStateBundle().getVersion());
- CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_default_db().size());
- CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_global_db().size());
+ EXPECT_EQ(uint64_t(0), mutable_default_db().size());
+ EXPECT_EQ(uint64_t(0), mutable_global_db().size());
- CPPUNIT_ASSERT(!activate_cluster_state_version(2));
+ EXPECT_FALSE(activate_cluster_state_version(2));
- CPPUNIT_ASSERT_EQUAL(uint32_t(2), getDistributor().getClusterStateBundle().getVersion());
- CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), mutable_default_db().size());
- CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), mutable_global_db().size());
+ EXPECT_EQ(uint32_t(2), getDistributor().getClusterStateBundle().getVersion());
+ EXPECT_EQ(uint64_t(n_buckets), mutable_default_db().size());
+ EXPECT_EQ(uint64_t(n_buckets), mutable_global_db().size());
}
-void BucketDBUpdaterTest::read_only_db_cleared_once_pending_state_is_activated() {
+TEST_F(BucketDBUpdaterTest, read_only_db_cleared_once_pending_state_is_activated) {
constexpr uint32_t n_buckets = 10;
- trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
- "version:2 distributor:2 storage:4", n_buckets, 0);
- CPPUNIT_ASSERT(!activate_cluster_state_version(2));
+ ASSERT_NO_FATAL_FAILURE(
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
+ "version:2 distributor:2 storage:4", n_buckets, 0));
+ EXPECT_FALSE(activate_cluster_state_version(2));
- CPPUNIT_ASSERT_EQUAL(uint64_t(0), read_only_default_db().size());
- CPPUNIT_ASSERT_EQUAL(uint64_t(0), read_only_global_db().size());
+ EXPECT_EQ(uint64_t(0), read_only_default_db().size());
+ EXPECT_EQ(uint64_t(0), read_only_global_db().size());
}
-void BucketDBUpdaterTest::read_only_db_is_populated_even_when_self_is_marked_down() {
+TEST_F(BucketDBUpdaterTest, read_only_db_is_populated_even_when_self_is_marked_down) {
constexpr uint32_t n_buckets = 10;
- trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
- "version:2 distributor:1 .0.s:d storage:4", n_buckets, 0);
+ ASSERT_NO_FATAL_FAILURE(
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4,
+ "version:2 distributor:1 .0.s:d storage:4", n_buckets, 0));
// State not yet activated, so read-only DBs have got all the buckets we used to have.
- CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_default_db().size());
- CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_global_db().size());
- CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), read_only_default_db().size());
- CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), read_only_global_db().size());
+ EXPECT_EQ(uint64_t(0), mutable_default_db().size());
+ EXPECT_EQ(uint64_t(0), mutable_global_db().size());
+ EXPECT_EQ(uint64_t(n_buckets), read_only_default_db().size());
+ EXPECT_EQ(uint64_t(n_buckets), read_only_global_db().size());
}
-void BucketDBUpdaterTest::activate_cluster_state_request_with_mismatching_version_returns_actual_version() {
+TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_with_mismatching_version_returns_actual_version) {
constexpr uint32_t n_buckets = 10;
- trigger_completed_but_not_yet_activated_transition("version:4 distributor:1 storage:4", n_buckets, 4,
- "version:5 distributor:2 storage:4", n_buckets, 0);
+ ASSERT_NO_FATAL_FAILURE(
+ trigger_completed_but_not_yet_activated_transition("version:4 distributor:1 storage:4", n_buckets, 4,
+ "version:5 distributor:2 storage:4", n_buckets, 0));
- CPPUNIT_ASSERT(activate_cluster_state_version(4)); // Too old version
- assert_has_activate_cluster_state_reply_with_actual_version(5);
+ EXPECT_TRUE(activate_cluster_state_version(4)); // Too old version
+ ASSERT_NO_FATAL_FAILURE(assert_has_activate_cluster_state_reply_with_actual_version(5));
- CPPUNIT_ASSERT(activate_cluster_state_version(6)); // More recent version than what has been observed
- assert_has_activate_cluster_state_reply_with_actual_version(5);
+ EXPECT_TRUE(activate_cluster_state_version(6)); // More recent version than what has been observed
+ ASSERT_NO_FATAL_FAILURE(assert_has_activate_cluster_state_reply_with_actual_version(5));
}
-void BucketDBUpdaterTest::activate_cluster_state_request_without_pending_transition_passes_message_through() {
+TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_without_pending_transition_passes_message_through) {
constexpr uint32_t n_buckets = 10;
- trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
- "version:2 distributor:1 storage:4", n_buckets, 4);
+ ASSERT_NO_FATAL_FAILURE(
+ trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4,
+ "version:2 distributor:1 storage:4", n_buckets, 4));
// Activate version 2; no pending cluster state after this.
- CPPUNIT_ASSERT(!activate_cluster_state_version(2));
+ EXPECT_FALSE(activate_cluster_state_version(2));
// No pending cluster state for version 3, just passed through to be implicitly bounced by state manager.
// Note: state manager is not modelled in this test, so we just check that the message handler returns
// false (meaning "didn't take message ownership") and there's no auto-generated reply.
- CPPUNIT_ASSERT(!activate_cluster_state_version(3));
- CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.replies.size());
+ EXPECT_FALSE(activate_cluster_state_version(3));
+ EXPECT_EQ(size_t(0), _sender.replies.size());
}
-// TODO rename distributor config to imply two phase functionlity explicitly?
-
}