diff options
29 files changed, 1117 insertions, 1094 deletions
diff --git a/.github/ISSUE_TEMPLATE/vespa-8.md b/.github/ISSUE_TEMPLATE/vespa-8.md new file mode 100644 index 00000000000..f35e428fa12 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/vespa-8.md @@ -0,0 +1,15 @@ +--- +name: Vespa 8 +about: Template for issues related to Vespa 8 +title: 'Vespa 8: [your description]' +labels: Vespa 8 +assignees: '' + +--- + +When creating an issue, please consider the following items: + +* How are users affected? +* How can app owners check if they are affected? +* How can tenant fix the issue? +* How to communicate the change to the users? diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/Admin.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/Admin.java index 8a98730c9c6..22811ae5878 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/admin/Admin.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/Admin.java @@ -6,11 +6,9 @@ import com.yahoo.cloud.config.ZookeepersConfig; import com.yahoo.cloud.config.log.LogdConfig; import com.yahoo.config.application.api.DeployLogger; import com.yahoo.config.model.ConfigModelContext.ApplicationType; -import com.yahoo.config.model.api.ConfigServerSpec; import com.yahoo.config.model.deploy.DeployState; import com.yahoo.config.model.producer.AbstractConfigProducer; import com.yahoo.config.provision.ApplicationId; -import com.yahoo.config.provision.SystemName; import com.yahoo.config.provision.Zone; import com.yahoo.vespa.model.AbstractService; import com.yahoo.vespa.model.ConfigProxy; @@ -20,10 +18,8 @@ import com.yahoo.vespa.model.Logd; import com.yahoo.vespa.model.admin.clustercontroller.ClusterControllerContainerCluster; import com.yahoo.vespa.model.admin.metricsproxy.MetricsProxyContainer; import com.yahoo.vespa.model.admin.metricsproxy.MetricsProxyContainerCluster; -import com.yahoo.vespa.model.admin.monitoring.MetricsConsumer; import com.yahoo.vespa.model.admin.monitoring.Monitoring; import com.yahoo.vespa.model.admin.monitoring.builder.Metrics; -import com.yahoo.vespa.model.container.ContainerCluster; import com.yahoo.vespa.model.filedistribution.FileDistributionConfigProducer; import com.yahoo.vespa.model.filedistribution.FileDistributionConfigProvider; import com.yahoo.vespa.model.filedistribution.FileDistributor; @@ -32,7 +28,6 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Optional; /** @@ -156,6 +151,7 @@ public class Admin extends AbstractConfigProducer implements Serializable { logserver(new LogdConfig.Logserver.Builder(). use(logServerContainerCluster.isPresent() || !isHostedVespa). host(logserver.getHostName()). + rpcport(logserver.getRelativePort(0)). port(logserver.getRelativePort(1))); } } diff --git a/config-model/src/main/java/com/yahoo/vespa/model/admin/Logserver.java b/config-model/src/main/java/com/yahoo/vespa/model/admin/Logserver.java index 0d641dcbd38..6c173043092 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/admin/Logserver.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/admin/Logserver.java @@ -9,6 +9,7 @@ import com.yahoo.vespa.model.AbstractService; * system. * * @author gjoranv + * @author bjorncs */ public class Logserver extends AbstractService { @@ -17,10 +18,10 @@ public class Logserver extends AbstractService { public Logserver(AbstractConfigProducer parent) { super(parent, "logserver"); - portsMeta.on(0).tag("unused"); - portsMeta.on(1).tag("logtp"); - portsMeta.on(2).tag("logtp").tag("telnet").tag("last-errors-holder"); - portsMeta.on(3).tag("logtp").tag("telnet").tag("replicator"); + portsMeta.on(0).tag("logtp").tag("rpc"); + portsMeta.on(1).tag("logtp").tag("legacy"); + portsMeta.on(2).tag("unused"); + portsMeta.on(3).tag("unused"); setProp("clustertype", "admin"); setProp("clustername", "admin"); } @@ -37,6 +38,8 @@ public class Logserver extends AbstractService { */ private String getMyJVMArgs() { StringBuilder sb = new StringBuilder(); + sb.append("-Dlogserver.rpcListenPort=").append(getRelativePort(0)); + sb.append(" "); sb.append("-Dlogserver.listenport=").append(getRelativePort(1)); sb.append(" "); sb.append("-Dlogserver.logarchive.dir=" + logArchiveDir); @@ -56,7 +59,7 @@ public class Logserver extends AbstractService { * @return 'true' always */ public boolean requiresWantedPort() { - return true; + return true; // TODO Support dynamic port allocation for logserver } /** @@ -68,7 +71,7 @@ public class Logserver extends AbstractService { @Override public String[] getPortSuffixes() { - return new String[]{ "unused", "logtp", "last.errors", "replicator" }; + return new String[]{ "rpc", "legacy", "unused/1", "unused/2" }; } } diff --git a/config-model/src/test/java/com/yahoo/vespa/model/admin/AdminTestCase.java b/config-model/src/test/java/com/yahoo/vespa/model/admin/AdminTestCase.java index 749ee2b9acc..6d65c2a472e 100644 --- a/config-model/src/test/java/com/yahoo/vespa/model/admin/AdminTestCase.java +++ b/config-model/src/test/java/com/yahoo/vespa/model/admin/AdminTestCase.java @@ -18,6 +18,7 @@ import com.yahoo.container.StatisticsConfig; import com.yahoo.container.jdisc.config.HealthMonitorConfig; import com.yahoo.net.HostName; import com.yahoo.vespa.config.core.StateserverConfig; +import com.yahoo.vespa.model.Service; import com.yahoo.vespa.model.VespaModel; import com.yahoo.vespa.model.container.ApplicationContainerCluster; import com.yahoo.vespa.model.container.component.Component; @@ -28,7 +29,6 @@ import org.junit.Test; import java.util.Set; -import static com.yahoo.config.model.api.container.ContainerServiceType.METRICS_PROXY_CONTAINER; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -94,6 +94,9 @@ public class AdminTestCase { LogdConfig lc = new LogdConfig(lb); assertEquals(lc.logserver().host(), localhost); + Service logserver = vespaModel.getService("admin/logserver").get(); + assertEquals(logserver.getRelativePort(0), lc.logserver().rpcport()); + // Verify services in the sentinel config SentinelConfig.Builder b = new SentinelConfig.Builder(); vespaModel.getConfig(b, localhostConfigId); diff --git a/configdefinitions/src/vespa/lb-services.def b/configdefinitions/src/vespa/lb-services.def index 33c568061fe..189b39461ec 100644 --- a/configdefinitions/src/vespa/lb-services.def +++ b/configdefinitions/src/vespa/lb-services.def @@ -7,6 +7,7 @@ namespace=cloud.config # Active rotation given as flag 'active' for a prod region in deployment.xml # Default true for now (since code in config-model to set it is not ready yet), should have no default value tenants{}.applications{}.activeRotation bool default=true +tenants{}.applications{}.upstreamHttps bool default=false tenants{}.applications{}.hosts{}.hostname string default="(unknownhostname)" tenants{}.applications{}.hosts{}.services{}.type string default="(noservicetype)" diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/SuperModelManager.java b/configserver/src/main/java/com/yahoo/vespa/config/server/SuperModelManager.java index 80818aea2e8..eeae770da43 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/SuperModelManager.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/SuperModelManager.java @@ -14,6 +14,7 @@ import com.yahoo.config.provision.Zone; import com.yahoo.vespa.config.GenerationCounter; import com.yahoo.vespa.config.server.application.ApplicationSet; import com.yahoo.vespa.config.server.model.SuperModelConfigProvider; +import com.yahoo.vespa.flags.FlagSource; import java.time.Instant; import java.util.ArrayList; @@ -28,6 +29,7 @@ public class SuperModelManager implements SuperModelProvider { private final Zone zone; private final Object monitor = new Object(); + private final FlagSource flagSource; private SuperModelConfigProvider superModelConfigProvider; // Guarded by 'this' monitor private final List<SuperModelListener> listeners = new ArrayList<>(); // Guarded by 'this' monitor @@ -39,7 +41,9 @@ public class SuperModelManager implements SuperModelProvider { @Inject public SuperModelManager(ConfigserverConfig configserverConfig, NodeFlavors nodeFlavors, - GenerationCounter generationCounter) { + GenerationCounter generationCounter, + FlagSource flagSource) { + this.flagSource = flagSource; this.zone = new Zone(configserverConfig, nodeFlavors); this.generationCounter = generationCounter; this.masterGeneration = configserverConfig.masterGeneration(); @@ -107,6 +111,6 @@ public class SuperModelManager implements SuperModelProvider { private void makeNewSuperModelConfigProvider(SuperModel newSuperModel) { generation = masterGeneration + generationCounter.get(); - superModelConfigProvider = new SuperModelConfigProvider(newSuperModel, zone); + superModelConfigProvider = new SuperModelConfigProvider(newSuperModel, zone, flagSource); } } diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/model/LbServicesProducer.java b/configserver/src/main/java/com/yahoo/vespa/config/server/model/LbServicesProducer.java index ceeea197440..1503e4d2397 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/model/LbServicesProducer.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/model/LbServicesProducer.java @@ -9,6 +9,10 @@ import com.yahoo.config.model.api.ServiceInfo; import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.TenantName; import com.yahoo.config.provision.Zone; +import com.yahoo.vespa.flags.BooleanFlag; +import com.yahoo.vespa.flags.FetchVector; +import com.yahoo.vespa.flags.FlagSource; +import com.yahoo.vespa.flags.Flags; import java.util.Collections; import java.util.Comparator; @@ -31,10 +35,12 @@ public class LbServicesProducer implements LbServicesConfig.Producer { private final Map<TenantName, Set<ApplicationInfo>> models; private final Zone zone; + private final BooleanFlag useHttpsLoadBalancerUpstream; - public LbServicesProducer(Map<TenantName, Set<ApplicationInfo>> models, Zone zone) { + public LbServicesProducer(Map<TenantName, Set<ApplicationInfo>> models, Zone zone, FlagSource flagSource) { this.models = models; this.zone = zone; + this.useHttpsLoadBalancerUpstream = Flags.USE_HTTPS_LOAD_BALANCER_UPSTREAM.bindTo(flagSource); } @Override @@ -61,6 +67,7 @@ public class LbServicesProducer implements LbServicesConfig.Producer { private LbServicesConfig.Tenants.Applications.Builder getAppConfig(ApplicationInfo app) { LbServicesConfig.Tenants.Applications.Builder ab = new LbServicesConfig.Tenants.Applications.Builder(); ab.activeRotation(getActiveRotation(app)); + ab.upstreamHttps(useHttpsLoadBalancerUpstream(app)); app.getModel().getHosts().stream() .sorted((a, b) -> a.getHostname().compareTo(b.getHostname())) .forEach(hostInfo -> ab.hosts(hostInfo.getHostname(), getHostsConfig(hostInfo))); @@ -81,6 +88,10 @@ public class LbServicesProducer implements LbServicesConfig.Producer { return activeRotation; } + private boolean useHttpsLoadBalancerUpstream(ApplicationInfo app) { + return useHttpsLoadBalancerUpstream.with(FetchVector.Dimension.APPLICATION_ID, app.getApplicationId().serializedForm()).value(); + } + private LbServicesConfig.Tenants.Applications.Hosts.Builder getHostsConfig(HostInfo hostInfo) { LbServicesConfig.Tenants.Applications.Hosts.Builder hb = new LbServicesConfig.Tenants.Applications.Hosts.Builder(); hb.hostname(hostInfo.getHostname()); diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/model/SuperModelConfigProvider.java b/configserver/src/main/java/com/yahoo/vespa/config/server/model/SuperModelConfigProvider.java index 4a75414c272..75f53667c4a 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/model/SuperModelConfigProvider.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/model/SuperModelConfigProvider.java @@ -11,6 +11,7 @@ import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.Zone; import com.yahoo.vespa.config.ConfigKey; import com.yahoo.vespa.config.ConfigPayload; +import com.yahoo.vespa.flags.FlagSource; import java.util.Collections; import java.util.Map; @@ -26,9 +27,9 @@ public class SuperModelConfigProvider implements LbServicesConfig.Producer, Rout private final LbServicesProducer lbProd; private final RoutingProducer zoneProd; - public SuperModelConfigProvider(SuperModel superModel, Zone zone) { + public SuperModelConfigProvider(SuperModel superModel, Zone zone, FlagSource flagSource) { this.superModel = superModel; - this.lbProd = new LbServicesProducer(Collections.unmodifiableMap(superModel.getModelsPerTenant()), zone); + this.lbProd = new LbServicesProducer(Collections.unmodifiableMap(superModel.getModelsPerTenant()), zone, flagSource); this.zoneProd = new RoutingProducer(Collections.unmodifiableMap(superModel.getModelsPerTenant())); } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/SuperModelControllerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/SuperModelControllerTest.java index 97496bd4177..c6f6be5fbab 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/SuperModelControllerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/SuperModelControllerTest.java @@ -22,6 +22,7 @@ import com.yahoo.vespa.config.protocol.JRTServerConfigRequestV3; import com.yahoo.vespa.config.protocol.Trace; import com.yahoo.vespa.config.server.model.SuperModelConfigProvider; import com.yahoo.vespa.config.server.rpc.UncompressedConfigResponseFactory; +import com.yahoo.vespa.flags.InMemoryFlagSource; import com.yahoo.vespa.model.VespaModel; import org.junit.Before; import org.junit.Test; @@ -55,7 +56,7 @@ public class SuperModelControllerTest { ApplicationName.from("foo"), InstanceName.defaultName()); models.put(app, new ApplicationInfo(app, 4l, new VespaModel(FilesApplicationPackage.fromFile(testApp)))); SuperModel superModel = new SuperModel(models); - handler = new SuperModelController(new SuperModelConfigProvider(superModel, Zone.defaultZone()), new TestConfigDefinitionRepo(), 2, new UncompressedConfigResponseFactory()); + handler = new SuperModelController(new SuperModelConfigProvider(superModel, Zone.defaultZone(), new InMemoryFlagSource()), new TestConfigDefinitionRepo(), 2, new UncompressedConfigResponseFactory()); } @Test @@ -98,7 +99,7 @@ public class SuperModelControllerTest { models.put(tooAdvanced, createApplicationInfo(testApp3, tooAdvanced, 4l)); SuperModel superModel = new SuperModel(models); - SuperModelController han = new SuperModelController(new SuperModelConfigProvider(superModel, Zone.defaultZone()), new TestConfigDefinitionRepo(), 2, new UncompressedConfigResponseFactory()); + SuperModelController han = new SuperModelController(new SuperModelConfigProvider(superModel, Zone.defaultZone(), new InMemoryFlagSource()), new TestConfigDefinitionRepo(), 2, new UncompressedConfigResponseFactory()); LbServicesConfig.Builder lb = new LbServicesConfig.Builder(); han.getSuperModel().getConfig(lb); LbServicesConfig lbc = new LbServicesConfig(lb); @@ -126,7 +127,7 @@ public class SuperModelControllerTest { models.put(tooAdvanced, createApplicationInfo(testApp3, tooAdvanced, 4l)); SuperModel superModel = new SuperModel(models); - SuperModelController han = new SuperModelController(new SuperModelConfigProvider(superModel, Zone.defaultZone()), new TestConfigDefinitionRepo(), 2, new UncompressedConfigResponseFactory()); + SuperModelController han = new SuperModelController(new SuperModelConfigProvider(superModel, Zone.defaultZone(), new InMemoryFlagSource()), new TestConfigDefinitionRepo(), 2, new UncompressedConfigResponseFactory()); LbServicesConfig.Builder lb = new LbServicesConfig.Builder(); han.getSuperModel().getConfig(lb); LbServicesConfig lbc = new LbServicesConfig(lb); diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/SuperModelRequestHandlerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/SuperModelRequestHandlerTest.java index 3288b418bb1..35ad97b7b43 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/SuperModelRequestHandlerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/SuperModelRequestHandlerTest.java @@ -11,6 +11,7 @@ import com.yahoo.config.provision.ApplicationId; import com.yahoo.vespa.config.server.application.ApplicationSet; import com.yahoo.vespa.config.server.monitoring.MetricUpdater; import com.yahoo.vespa.curator.mock.MockCurator; +import com.yahoo.vespa.flags.InMemoryFlagSource; import com.yahoo.vespa.model.VespaModel; import org.junit.Before; @@ -45,7 +46,7 @@ public class SuperModelRequestHandlerTest { public void setup() { counter = new SuperModelGenerationCounter(new MockCurator()); ConfigserverConfig configserverConfig = new ConfigserverConfig(new ConfigserverConfig.Builder()); - manager = new SuperModelManager(configserverConfig, emptyNodeFlavors(), counter); + manager = new SuperModelManager(configserverConfig, emptyNodeFlavors(), counter, new InMemoryFlagSource()); controller = new SuperModelRequestHandler(new TestConfigDefinitionRepo(), configserverConfig, manager); } @@ -97,7 +98,7 @@ public class SuperModelRequestHandlerTest { ApplicationId foo = applicationId("a", "foo"); long masterGen = 10; ConfigserverConfig configserverConfig = new ConfigserverConfig(new ConfigserverConfig.Builder().masterGeneration(masterGen)); - manager = new SuperModelManager(configserverConfig, emptyNodeFlavors(), counter); + manager = new SuperModelManager(configserverConfig, emptyNodeFlavors(), counter, new InMemoryFlagSource()); controller = new SuperModelRequestHandler(new TestConfigDefinitionRepo(), configserverConfig, manager); long gen = counter.increment(); diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/model/LbServicesProducerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/model/LbServicesProducerTest.java index 352245757ca..0e124addaf7 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/model/LbServicesProducerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/model/LbServicesProducerTest.java @@ -16,6 +16,8 @@ import com.yahoo.config.provision.Rotation; import com.yahoo.config.provision.TenantName; import com.yahoo.config.provision.Zone; import com.yahoo.vespa.config.ConfigPayload; +import com.yahoo.vespa.flags.Flags; +import com.yahoo.vespa.flags.InMemoryFlagSource; import com.yahoo.vespa.model.VespaModel; import org.junit.Test; import org.xml.sax.SAXException; @@ -44,6 +46,7 @@ public class LbServicesProducerTest { private static final String rotation2 = "rotation-2"; private static final String rotationString = rotation1 + "," + rotation2; private static final Set<Rotation> rotations = Collections.singleton(new Rotation(rotationString)); + private final InMemoryFlagSource flagSource = new InMemoryFlagSource(); @Test public void testDeterministicGetConfig() throws IOException, SAXException { @@ -87,6 +90,22 @@ public class LbServicesProducerTest { } } + @Test + public void https_upstream_is_configured_from_feature_flag() throws IOException, SAXException { + { + flagSource.withBooleanFlag(Flags.USE_HTTPS_LOAD_BALANCER_UPSTREAM.id(), true); + RegionName regionName = RegionName.from("us-east-1"); + LbServicesConfig conf = createModelAndGetLbServicesConfig(regionName); + assertTrue(conf.tenants("foo").applications("foo:prod:" + regionName.value() + ":default").upstreamHttps()); + } + { + flagSource.withBooleanFlag(Flags.USE_HTTPS_LOAD_BALANCER_UPSTREAM.id(), false); + RegionName regionName = RegionName.from("us-east-2"); + LbServicesConfig conf = createModelAndGetLbServicesConfig(regionName); + assertFalse(conf.tenants("foo").applications("foo:prod:" + regionName.value() + ":default").upstreamHttps()); + } + } + private LbServicesConfig createModelAndGetLbServicesConfig(RegionName regionName) throws IOException, SAXException { Zone zone = new Zone(Environment.prod, regionName); Map<TenantName, Set<ApplicationInfo>> testModel = createTestModel(new DeployState.Builder() @@ -96,7 +115,7 @@ public class LbServicesProducerTest { } private LbServicesConfig getLbServicesConfig(Zone zone, Map<TenantName, Set<ApplicationInfo>> testModel) { - LbServicesProducer producer = new LbServicesProducer(testModel, zone); + LbServicesProducer producer = new LbServicesProducer(testModel, zone, flagSource); LbServicesConfig.Builder builder = new LbServicesConfig.Builder(); producer.getConfig(builder); return new LbServicesConfig(builder); diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/RpcTester.java b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/RpcTester.java index 43a2c01f26a..dd66f720b1f 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/RpcTester.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/rpc/RpcTester.java @@ -18,6 +18,7 @@ import com.yahoo.vespa.config.server.host.HostRegistries; import com.yahoo.vespa.config.server.monitoring.Metrics; import com.yahoo.vespa.config.server.tenant.MockTenantProvider; import com.yahoo.vespa.config.server.tenant.TenantHandlerProvider; +import com.yahoo.vespa.flags.InMemoryFlagSource; import org.junit.After; import org.junit.rules.TemporaryFolder; @@ -93,7 +94,8 @@ public class RpcTester implements AutoCloseable { new SuperModelManager( configserverConfig, emptyNodeFlavors(), - generationCounter)), + generationCounter, + new InMemoryFlagSource())), Metrics.createTestMetrics(), new HostRegistries(), hostLivenessTracker, new FileServer(temporaryFolder.newFolder())); rpcServer.onTenantCreate(TenantName.from("default"), tenantProvider); diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java index dccda0bf733..df72720a46c 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java @@ -52,7 +52,6 @@ import java.util.logging.Logger; * * @author baldersheim */ -@SuppressWarnings("deprecation") public abstract class VespaBackEndSearcher extends PingableSearcher { static final CompoundName PACKET_COMPRESSION_LIMIT = new CompoundName("packetcompressionlimit"); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index cf5bcedcf51..74d9c38b273 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -48,7 +48,7 @@ public class Dispatcher extends AbstractComponent { private static final CompoundName dispatchInternal = new CompoundName("dispatch.internal"); /** If enabled, search queries will use protobuf rpc */ - private static final CompoundName dispatchProtobuf = new CompoundName("dispatch.protobuf"); + public static final CompoundName dispatchProtobuf = new CompoundName("dispatch.protobuf"); /** A model of the search cluster this dispatches to */ private final SearchCluster searchCluster; diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java index 019e07221a6..4422538cff6 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java @@ -18,42 +18,46 @@ interface Client { int uncompressedLength, byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds); - void search(NodeConnection node, CompressionType compression, - int uncompressedLength, byte[] compressedPayload, RpcSearchInvoker responseReceiver, - double timeoutSeconds); + void request(String rpcMethod, NodeConnection node, CompressionType compression, int uncompressedLength, + byte[] compressedPayload, ResponseReceiver responseReceiver, double timeoutSeconds); /** Creates a connection to a particular node in this */ NodeConnection createConnection(String hostname, int port); - class GetDocsumsResponseOrError { + interface ResponseReceiver { + void receive(ResponseOrError<ProtobufResponse> response); + } + + class ResponseOrError<T> { + final Optional<T> response; + final Optional<String> error; - // One of these will be non empty and the other not - private Optional<GetDocsumsResponse> response; - private Optional<String> error; + public static <T> ResponseOrError<T> fromResponse(T response) { + return new ResponseOrError<>(response); + } - public static GetDocsumsResponseOrError fromResponse(GetDocsumsResponse response) { - return new GetDocsumsResponseOrError(Optional.of(response), Optional.empty()); + public static <T> ResponseOrError<T> fromError(String error) { + return new ResponseOrError<T>(error); } - public static GetDocsumsResponseOrError fromError(String error) { - return new GetDocsumsResponseOrError(Optional.empty(), Optional.of(error)); + ResponseOrError(T response) { + this.response = Optional.of(response); + this.error = Optional.empty(); } - private GetDocsumsResponseOrError(Optional<GetDocsumsResponse> response, Optional<String> error) { - this.response = response; - this.error = error; + ResponseOrError(String error) { + this.response = Optional.empty(); + this.error = Optional.of(error); } /** Returns the response, or empty if there is an error */ - public Optional<GetDocsumsResponse> response() { return response; } + public Optional<T> response() { return response; } /** Returns the error or empty if there is a response */ public Optional<String> error() { return error; } - } class GetDocsumsResponse { - private final byte compression; private final int uncompressedSize; private final byte[] compressedSlimeBytes; @@ -91,38 +95,12 @@ interface Client { } - class SearchResponseOrError { - // One of these will be non empty and the other not - private Optional<SearchResponse> response; - private Optional<String> error; - - public static SearchResponseOrError fromResponse(SearchResponse response) { - return new SearchResponseOrError(Optional.of(response), Optional.empty()); - } - - public static SearchResponseOrError fromError(String error) { - return new SearchResponseOrError(Optional.empty(), Optional.of(error)); - } - - private SearchResponseOrError(Optional<SearchResponse> response, Optional<String> error) { - this.response = response; - this.error = error; - } - - /** Returns the response, or empty if there is an error */ - public Optional<SearchResponse> response() { return response; } - - /** Returns the error or empty if there is a response */ - public Optional<String> error() { return error; } - - } - - class SearchResponse { + class ProtobufResponse { private final byte compression; private final int uncompressedSize; private final byte[] compressedPayload; - public SearchResponse(byte compression, int uncompressedSize, byte[] compressedPayload) { + public ProtobufResponse(byte compression, int uncompressedSize, byte[] compressedPayload) { this.compression = compression; this.uncompressedSize = uncompressedSize; this.compressedPayload = compressedPayload; diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/MapConverter.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/MapConverter.java index 817ecfe0091..74828dd6740 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/MapConverter.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/MapConverter.java @@ -10,46 +10,42 @@ import com.yahoo.tensor.serialization.TypedBinaryFormat; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.function.Consumer; /** * @author ollivir */ public class MapConverter { - @FunctionalInterface - public interface PropertyInserter<T> { - void add(T prop); - } - - public static void convertMapTensors(Map<String, Object> map, PropertyInserter<TensorProperty.Builder> inserter) { + public static void convertMapTensors(Map<String, Object> map, Consumer<TensorProperty.Builder> inserter) { for (var entry : map.entrySet()) { var value = entry.getValue(); if (value instanceof Tensor) { byte[] tensor = TypedBinaryFormat.encode((Tensor) value); - inserter.add(TensorProperty.newBuilder().setName(entry.getKey()).setValue(ByteString.copyFrom(tensor))); + inserter.accept(TensorProperty.newBuilder().setName(entry.getKey()).setValue(ByteString.copyFrom(tensor))); } } } - public static void convertMapStrings(Map<String, Object> map, PropertyInserter<StringProperty.Builder> inserter) { + public static void convertMapStrings(Map<String, Object> map, Consumer<StringProperty.Builder> inserter) { for (var entry : map.entrySet()) { var value = entry.getValue(); if (!(value instanceof Tensor)) { - inserter.add(StringProperty.newBuilder().setName(entry.getKey()).addValues(value.toString())); + inserter.accept(StringProperty.newBuilder().setName(entry.getKey()).addValues(value.toString())); } } } - public static void convertStringMultiMap(Map<String, List<String>> map, PropertyInserter<StringProperty.Builder> inserter) { + public static void convertStringMultiMap(Map<String, List<String>> map, Consumer<StringProperty.Builder> inserter) { for (var entry : map.entrySet()) { var values = entry.getValue(); if (values != null) { - inserter.add(StringProperty.newBuilder().setName(entry.getKey()).addAllValues(values)); + inserter.accept(StringProperty.newBuilder().setName(entry.getKey()).addAllValues(values)); } } } - public static void convertMultiMap(Map<String, List<Object>> map, PropertyInserter<StringProperty.Builder> stringInserter, - PropertyInserter<TensorProperty.Builder> tensorInserter) { + public static void convertMultiMap(Map<String, List<Object>> map, Consumer<StringProperty.Builder> stringInserter, + Consumer<TensorProperty.Builder> tensorInserter) { for (var entry : map.entrySet()) { if (entry.getValue() != null) { var key = entry.getKey(); @@ -58,14 +54,14 @@ public class MapConverter { if (value != null) { if (value instanceof Tensor) { byte[] tensor = TypedBinaryFormat.encode((Tensor) value); - tensorInserter.add(TensorProperty.newBuilder().setName(key).setValue(ByteString.copyFrom(tensor))); + tensorInserter.accept(TensorProperty.newBuilder().setName(key).setValue(ByteString.copyFrom(tensor))); } else { stringValues.add(value.toString()); } } } if (!stringValues.isEmpty()) { - stringInserter.add(StringProperty.newBuilder().setName(key).addAllValues(stringValues)); + stringInserter.accept(StringProperty.newBuilder().setName(key).addAllValues(stringValues)); } } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java index 1c1c9ccd115..9903aacdda0 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java @@ -1,7 +1,8 @@ package com.yahoo.search.dispatch.rpc; import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol; -import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol.SearchRequest.Builder; +import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol.StringProperty; +import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol.TensorProperty; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import com.yahoo.document.GlobalId; @@ -15,11 +16,10 @@ import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.grouping.vespa.GroupingExecutor; import com.yahoo.search.query.Model; +import com.yahoo.search.query.QueryTree; import com.yahoo.search.query.Ranking; import com.yahoo.search.query.Sorting; import com.yahoo.search.query.Sorting.Order; -import com.yahoo.search.query.ranking.RankFeatures; -import com.yahoo.search.query.ranking.RankProperties; import com.yahoo.search.result.Coverage; import com.yahoo.search.result.Relevance; import com.yahoo.searchlib.aggregation.Grouping; @@ -28,31 +28,24 @@ import com.yahoo.vespa.objects.BufferSerializer; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.function.Consumer; public class ProtobufSerialization { private static final int INITIAL_SERIALIZATION_BUFFER_SIZE = 10 * 1024; - public static byte[] serializeQuery(Query query, String serverId, boolean includeQueryData) { - return convertFromQuery(query, serverId, includeQueryData).toByteArray(); + public static byte[] serializeSearchRequest(Query query, String serverId) { + return convertFromQuery(query, serverId).toByteArray(); } - public static byte[] serializeResult(Result searchResult) { - return convertFromResult(searchResult).toByteArray(); - } - - public static Result deserializeToResult(byte[] payload, Query query, VespaBackEndSearcher searcher) - throws InvalidProtocolBufferException { - var protobuf = SearchProtocol.SearchReply.parseFrom(payload); - var result = convertToResult(query, protobuf, searcher.getDocumentDatabase(query)); - return result; - } - - private static SearchProtocol.SearchRequest convertFromQuery(Query query, String serverId, boolean includeQueryData) { + private static SearchProtocol.SearchRequest convertFromQuery(Query query, String serverId) { var builder = SearchProtocol.SearchRequest.newBuilder().setHits(query.getHits()).setOffset(query.getOffset()) .setTimeout((int) query.getTimeLeft()); - mergeToRequestFromRanking(query.getRanking(), builder, includeQueryData); - mergeToRequestFromModel(query.getModel(), builder); + var documentDb = query.getModel().getDocumentDb(); + if (documentDb != null) { + builder.setDocumentType(documentDb); + } + builder.setQueryTreeBlob(serializeQueryTree(query.getModel().getQueryTree())); if (query.getGroupingSessionCache() || query.getRanking().getQueryCache()) { // TODO verify that the session key is included whenever rank properties would have been @@ -71,71 +64,101 @@ public class ProtobufSerialization { gbuf.getBuf().flip(); builder.setGroupingBlob(ByteString.copyFrom(gbuf.getBuf().getByteBuffer())); } - if (query.getGroupingSessionCache()) { builder.setCacheGrouping(true); } + mergeToSearchRequestFromRanking(query.getRanking(), builder); + return builder.build(); } - private static void mergeToRequestFromModel(Model model, SearchProtocol.SearchRequest.Builder builder) { - if (model.getDocumentDb() != null) { - builder.setDocumentType(model.getDocumentDb()); + private static void mergeToSearchRequestFromRanking(Ranking ranking, SearchProtocol.SearchRequest.Builder builder) { + builder.setRankProfile(ranking.getProfile()); + + if (ranking.getQueryCache()) { + builder.setCacheQuery(true); } - int bufferSize = INITIAL_SERIALIZATION_BUFFER_SIZE; - boolean success = false; - while (!success) { - try { - ByteBuffer treeBuffer = ByteBuffer.allocate(bufferSize); - model.getQueryTree().encode(treeBuffer); - treeBuffer.flip(); - builder.setQueryTreeBlob(ByteString.copyFrom(treeBuffer)); - success = true; - } catch (java.nio.BufferOverflowException e) { - bufferSize *= 2; - } + if (ranking.getSorting() != null) { + mergeToSearchRequestFromSorting(ranking.getSorting(), builder); } + if (ranking.getLocation() != null) { + builder.setGeoLocation(ranking.getLocation().toString()); + } + + var featureMap = ranking.getFeatures().asMap(); + MapConverter.convertMapStrings(featureMap, builder::addFeatureOverrides); + MapConverter.convertMapTensors(featureMap, builder::addTensorFeatureOverrides); + mergeRankProperties(ranking, builder::addRankProperties, builder::addTensorRankProperties); } - private static void mergeToRequestFromSorting(Sorting sorting, SearchProtocol.SearchRequest.Builder builder, boolean includeQueryData) { + private static void mergeToSearchRequestFromSorting(Sorting sorting, SearchProtocol.SearchRequest.Builder builder) { for (var field : sorting.fieldOrders()) { - var sortField = SearchProtocol.SortField.newBuilder().setField(field.getSorter().getName()) + var sortField = SearchProtocol.SortField.newBuilder() + .setField(field.getSorter().getName()) .setAscending(field.getSortOrder() == Order.ASCENDING).build(); builder.addSorting(sortField); } } - private static void mergeToRequestFromRanking(Ranking ranking, SearchProtocol.SearchRequest.Builder builder, boolean includeQueryData) { - builder.setRankProfile(ranking.getProfile()); - if (ranking.getQueryCache()) { - builder.setCacheQuery(true); + public static SearchProtocol.DocsumRequest.Builder createDocsumRequestBuilder(Query query, String serverId, String summaryClass, + boolean includeQueryData) { + var builder = SearchProtocol.DocsumRequest.newBuilder() + .setTimeout((int) query.getTimeLeft()) + .setDumpFeatures(query.properties().getBoolean(Ranking.RANKFEATURES, false)); + + if (summaryClass != null) { + builder.setSummaryClass(summaryClass); } - if (ranking.getSorting() != null) { - mergeToRequestFromSorting(ranking.getSorting(), builder, includeQueryData); + + var documentDb = query.getModel().getDocumentDb(); + if (documentDb != null) { + builder.setDocumentType(documentDb); } - if (ranking.getLocation() != null) { - builder.setGeoLocation(ranking.getLocation().toString()); + + var ranking = query.getRanking(); + if (ranking.getQueryCache()) { + builder.setCacheQuery(true); + builder.setSessionKey(query.getSessionId(serverId).toString()); } - mergeToRequestFromRankFeatures(ranking.getFeatures(), builder, includeQueryData); - mergeToRequestFromRankProperties(ranking.getProperties(), builder, includeQueryData); - } + builder.setRankProfile(query.getRanking().getProfile()); - private static void mergeToRequestFromRankFeatures(RankFeatures features, SearchProtocol.SearchRequest.Builder builder, boolean includeQueryData) { if (includeQueryData) { - MapConverter.convertMapStrings(features.asMap(), builder::addFeatureOverrides); - MapConverter.convertMapTensors(features.asMap(), builder::addTensorFeatureOverrides); + mergeQueryDataToDocsumRequest(query, builder); } + + return builder; } - private static void mergeToRequestFromRankProperties(RankProperties properties, Builder builder, boolean includeQueryData) { - if (includeQueryData) { - MapConverter.convertMultiMap(properties.asMap(), propB -> { - if (!GetDocSumsPacket.sessionIdKey.equals(propB.getName())) { - builder.addRankProperties(propB); - } - }, builder::addTensorRankProperties); + public static byte[] serializeDocsumRequest(SearchProtocol.DocsumRequest.Builder builder, List<FastHit> documents) { + builder.clearGlobalIds(); + for (var hit : documents) { + builder.addGlobalIds(ByteString.copyFrom(hit.getGlobalId().getRawId())); } + return builder.build().toByteArray(); + } + + private static void mergeQueryDataToDocsumRequest(Query query, SearchProtocol.DocsumRequest.Builder builder) { + var ranking = query.getRanking(); + var featureMap = ranking.getFeatures().asMap(); + + builder.setQueryTreeBlob(serializeQueryTree(query.getModel().getQueryTree())); + builder.setGeoLocation(ranking.getLocation().toString()); + MapConverter.convertMapStrings(featureMap, builder::addFeatureOverrides); + MapConverter.convertMapTensors(featureMap, builder::addTensorFeatureOverrides); + MapConverter.convertStringMultiMap(query.getPresentation().getHighlight().getHighlightTerms(), builder::addHighlightTerms); + mergeRankProperties(ranking, builder::addRankProperties, builder::addTensorRankProperties); + } + + public static byte[] serializeResult(Result searchResult) { + return convertFromResult(searchResult).toByteArray(); + } + + public static Result deserializeToSearchResult(byte[] payload, Query query, VespaBackEndSearcher searcher) + throws InvalidProtocolBufferException { + var protobuf = SearchProtocol.SearchReply.parseFrom(payload); + var result = convertToResult(query, protobuf, searcher.getDocumentDatabase(query)); + return result; } private static Result convertToResult(Query query, SearchProtocol.SearchReply protobuf, DocumentDatabase documentDatabase) { @@ -211,4 +234,26 @@ public class ProtobufSerialization { return builder.build(); } + private static ByteString serializeQueryTree(QueryTree queryTree) { + int bufferSize = INITIAL_SERIALIZATION_BUFFER_SIZE; + while (true) { + try { + ByteBuffer treeBuffer = ByteBuffer.allocate(bufferSize); + queryTree.encode(treeBuffer); + treeBuffer.flip(); + return ByteString.copyFrom(treeBuffer); + } catch (java.nio.BufferOverflowException e) { + bufferSize *= 2; + } + } + } + + private static void mergeRankProperties(Ranking ranking, Consumer<StringProperty.Builder> stringProperties, + Consumer<TensorProperty.Builder> tensorProperties) { + MapConverter.convertMultiMap(ranking.getProperties().asMap(), propB -> { + if (!GetDocSumsPacket.sessionIdKey.equals(propB.getName())) { + stringProperties.accept(propB); + } + }, tensorProperties); + } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java index 32a7917d43c..2aa01b05955 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java @@ -22,7 +22,6 @@ import java.util.List; * @author bratseth */ class RpcClient implements Client { - private final Supervisor supervisor = new Supervisor(new Transport()); @Override @@ -44,15 +43,15 @@ class RpcClient implements Client { } @Override - public void search(NodeConnection node, CompressionType compression, int uncompressedLength, byte[] compressedPayload, - RpcSearchInvoker responseReceiver, double timeoutSeconds) { - Request request = new Request("vespa.searchprotocol.search"); + public void request(String rpcMethod, NodeConnection node, CompressionType compression, int uncompressedLength, byte[] compressedPayload, + ResponseReceiver responseReceiver, double timeoutSeconds) { + Request request = new Request(rpcMethod); request.parameters().add(new Int8Value(compression.getCode())); request.parameters().add(new Int32Value(uncompressedLength)); request.parameters().add(new DataValue(compressedPayload)); RpcNodeConnection rpcNode = ((RpcNodeConnection) node); - rpcNode.invokeAsync(request, timeoutSeconds, new RpcSearchResponseWaiter(rpcNode, responseReceiver)); + rpcNode.invokeAsync(request, timeoutSeconds, new RpcProtobufResponseWaiter(rpcNode, responseReceiver)); } private static class RpcNodeConnection implements NodeConnection { @@ -111,40 +110,37 @@ class RpcClient implements Client { @Override public void handleRequestDone(Request requestWithResponse) { if (requestWithResponse.isError()) { - handler.receive(GetDocsumsResponseOrError.fromError("Error response from " + node + ": " + - requestWithResponse.errorMessage())); + handler.receive(ResponseOrError.fromError("Error response from " + node + ": " + requestWithResponse.errorMessage())); return; } Values returnValues = requestWithResponse.returnValues(); if (returnValues.size() < 3) { - handler.receive(GetDocsumsResponseOrError.fromError("Invalid getDocsums response from " + node + - ": Expected 3 return arguments, got " + - returnValues.size())); + handler.receive(ResponseOrError.fromError( + "Invalid getDocsums response from " + node + ": Expected 3 return arguments, got " + returnValues.size())); return; } byte compression = returnValues.get(0).asInt8(); int uncompressedSize = returnValues.get(1).asInt32(); byte[] compressedSlimeBytes = returnValues.get(2).asData(); + @SuppressWarnings("unchecked") // TODO: Non-protobuf rpc docsums to be removed soon List<FastHit> hits = (List<FastHit>) requestWithResponse.getContext(); - handler.receive(GetDocsumsResponseOrError.fromResponse(new GetDocsumsResponse(compression, - uncompressedSize, - compressedSlimeBytes, - hits))); + handler.receive( + ResponseOrError.fromResponse(new GetDocsumsResponse(compression, uncompressedSize, compressedSlimeBytes, hits))); } } - private static class RpcSearchResponseWaiter implements RequestWaiter { + private static class RpcProtobufResponseWaiter implements RequestWaiter { /** The node to which we made the request we are waiting for - for error messages only */ private final RpcNodeConnection node; /** The handler to which the response is forwarded */ - private final RpcSearchInvoker handler; + private final ResponseReceiver handler; - public RpcSearchResponseWaiter(RpcNodeConnection node, RpcSearchInvoker handler) { + public RpcProtobufResponseWaiter(RpcNodeConnection node, ResponseReceiver handler) { this.node = node; this.handler = handler; } @@ -152,13 +148,13 @@ class RpcClient implements Client { @Override public void handleRequestDone(Request requestWithResponse) { if (requestWithResponse.isError()) { - handler.receive(SearchResponseOrError.fromError("Error response from " + node + ": " + requestWithResponse.errorMessage())); + handler.receive(ResponseOrError.fromError("Error response from " + node + ": " + requestWithResponse.errorMessage())); return; } Values returnValues = requestWithResponse.returnValues(); if (returnValues.size() < 3) { - handler.receive(SearchResponseOrError.fromError( + handler.receive(ResponseOrError.fromError( "Invalid getDocsums response from " + node + ": Expected 3 return arguments, got " + returnValues.size())); return; } @@ -166,7 +162,7 @@ class RpcClient implements Client { byte compression = returnValues.get(0).asInt8(); int uncompressedSize = returnValues.get(1).asInt32(); byte[] compressedPayload = returnValues.get(2).asData(); - handler.receive(SearchResponseOrError.fromResponse(new SearchResponse(compression, uncompressedSize, compressedPayload))); + handler.receive(ResponseOrError.fromResponse(new ProtobufResponse(compression, uncompressedSize, compressedPayload))); } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java index b7286997514..760f7486923 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java @@ -13,6 +13,7 @@ import com.yahoo.prelude.fastsearch.TimeoutException; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.dispatch.FillInvoker; +import com.yahoo.search.dispatch.rpc.Client.GetDocsumsResponse; import com.yahoo.search.query.SessionId; import com.yahoo.search.result.ErrorMessage; import com.yahoo.search.result.Hit; @@ -102,7 +103,7 @@ public class RpcFillInvoker extends FillInvoker { Client.NodeConnection node = resourcePool.nodeConnections().get(nodeId); if (node == null) { String error = "Could not fill hits from unknown node " + nodeId; - responseReceiver.receive(Client.GetDocsumsResponseOrError.fromError(error)); + responseReceiver.receive(Client.ResponseOrError.fromError(error)); result.hits().addError(ErrorMessage.createEmptyDocsums(error)); log.warning("Got hits with partid " + nodeId + ", which is not included in the current dispatch config"); return; @@ -143,7 +144,7 @@ public class RpcFillInvoker extends FillInvoker { /** Receiver of the responses to a set of getDocsums requests */ public static class GetDocsumsResponseReceiver { - private final BlockingQueue<Client.GetDocsumsResponseOrError> responses; + private final BlockingQueue<Client.ResponseOrError<GetDocsumsResponse>> responses; private final Compressor compressor; private final Result result; @@ -161,7 +162,7 @@ public class RpcFillInvoker extends FillInvoker { } /** Called by a thread belonging to the client when a valid response becomes available */ - public void receive(Client.GetDocsumsResponseOrError response) { + public void receive(Client.ResponseOrError<GetDocsumsResponse> response) { responses.add(response); } @@ -181,7 +182,7 @@ public class RpcFillInvoker extends FillInvoker { if (timeLeftMs <= 0) { throwTimeout(); } - Client.GetDocsumsResponseOrError response = responses.poll(timeLeftMs, TimeUnit.MILLISECONDS); + Client.ResponseOrError<GetDocsumsResponse> response = responses.poll(timeLeftMs, TimeUnit.MILLISECONDS); if (response == null) throwTimeout(); skippedHits += processResponse(response, summaryClass, documentDb); @@ -197,7 +198,7 @@ public class RpcFillInvoker extends FillInvoker { } } - private int processResponse(Client.GetDocsumsResponseOrError responseOrError, + private int processResponse(Client.ResponseOrError<GetDocsumsResponse> responseOrError, String summaryClass, DocumentDatabase documentDb) { if (responseOrError.error().isPresent()) { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java index f17e7d63431..c1b164aaeaa 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java @@ -6,6 +6,7 @@ import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; import com.yahoo.processing.request.CompoundName; import com.yahoo.search.Query; import com.yahoo.search.Result; +import com.yahoo.search.dispatch.Dispatcher; import com.yahoo.search.dispatch.FillInvoker; import com.yahoo.search.dispatch.InvokerFactory; import com.yahoo.search.dispatch.SearchInvoker; @@ -36,8 +37,15 @@ public class RpcInvokerFactory extends InvokerFactory { @Override public Optional<FillInvoker> createFillInvoker(VespaBackEndSearcher searcher, Result result) { Query query = result.getQuery(); + + boolean summaryNeedsQuery = searcher.summaryNeedsQuery(query); + + if(query.properties().getBoolean(Dispatcher.dispatchProtobuf, false)) { + return Optional.of(new RpcProtobufFillInvoker(rpcResourcePool, searcher.getDocumentDatabase(query), searcher.getServerId(), + summaryNeedsQuery)); + } if (query.properties().getBoolean(dispatchSummaries, true) - && ! searcher.summaryNeedsQuery(query) + && ! summaryNeedsQuery && query.getRanking().getLocation() == null) { return Optional.of(new RpcFillInvoker(rpcResourcePool, searcher.getDocumentDatabase(query))); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java new file mode 100644 index 00000000000..3ec821beba8 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java @@ -0,0 +1,227 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch.rpc; + +import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol; +import com.google.protobuf.InvalidProtocolBufferException; +import com.yahoo.collections.ListMap; +import com.yahoo.collections.Pair; +import com.yahoo.compress.CompressionType; +import com.yahoo.compress.Compressor; +import com.yahoo.container.protect.Error; +import com.yahoo.data.access.Inspector; +import com.yahoo.data.access.slime.SlimeAdapter; +import com.yahoo.prelude.fastsearch.DocumentDatabase; +import com.yahoo.prelude.fastsearch.FastHit; +import com.yahoo.prelude.fastsearch.TimeoutException; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.dispatch.FillInvoker; +import com.yahoo.search.dispatch.rpc.Client.ProtobufResponse; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.search.result.Hit; +import com.yahoo.slime.ArrayTraverser; +import com.yahoo.slime.BinaryFormat; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * {@link FillInvoker} implementation using Protobuf over JRT + * + * @author bratseth + * @author ollivir + */ +public class RpcProtobufFillInvoker extends FillInvoker { + private static final String RPC_METHOD = "vespa.searchprotocol.getDocsums"; + + private static final Logger log = Logger.getLogger(RpcProtobufFillInvoker.class.getName()); + + private final DocumentDatabase documentDb; + private final RpcResourcePool resourcePool; + private final boolean summaryNeedsQuery; + private final String serverId; + + private BlockingQueue<Pair<Client.ResponseOrError<ProtobufResponse>, List<FastHit>>> responses; + + /** Whether we have already logged/notified about an error - to avoid spamming */ + private boolean hasReportedError = false; + + /** The number of responses we should receive (and process) before this is complete */ + private int outstandingResponses; + + RpcProtobufFillInvoker(RpcResourcePool resourcePool, DocumentDatabase documentDb, String serverId, boolean summaryNeedsQuery) { + this.documentDb = documentDb; + this.resourcePool = resourcePool; + this.serverId = serverId; + this.summaryNeedsQuery = summaryNeedsQuery; + } + + @Override + protected void sendFillRequest(Result result, String summaryClass) { + ListMap<Integer, FastHit> hitsByNode = hitsByNode(result); + + CompressionType compression = CompressionType + .valueOf(result.getQuery().properties().getString(RpcResourcePool.dispatchCompression, "LZ4").toUpperCase()); + + result.getQuery().trace(false, 5, "Sending ", hitsByNode.size(), " summary fetch requests with jrt/protobuf"); + + outstandingResponses = hitsByNode.size(); + responses = new LinkedBlockingQueue<>(outstandingResponses); + + var builder = ProtobufSerialization.createDocsumRequestBuilder(result.getQuery(), serverId, summaryClass, summaryNeedsQuery); + for (Map.Entry<Integer, List<FastHit>> nodeHits : hitsByNode.entrySet()) { + var payload = ProtobufSerialization.serializeDocsumRequest(builder, nodeHits.getValue()); + sendDocsumsRequest(nodeHits.getKey(), nodeHits.getValue(), payload, compression, result); + } + } + + @Override + protected void getFillResults(Result result, String summaryClass) { + try { + processResponses(result, summaryClass); + result.hits().setSorted(false); + result.analyzeHits(); + } catch (TimeoutException e) { + result.hits().addError(ErrorMessage.createTimeout("Summary data is incomplete: " + e.getMessage())); + } + } + + @Override + protected void release() { + // nothing to release + } + + /** Called by a thread belonging to the client when a valid response becomes available */ + public void receive(Client.ResponseOrError<ProtobufResponse> response, List<FastHit> hitsContext) { + responses.add(new Pair<>(response, hitsContext)); + } + + /** Return a map of hits by their search node (partition) id */ + private static ListMap<Integer, FastHit> hitsByNode(Result result) { + ListMap<Integer, FastHit> hitsByNode = new ListMap<>(); + for (Iterator<Hit> i = result.hits().unorderedDeepIterator(); i.hasNext();) { + Hit h = i.next(); + if (!(h instanceof FastHit)) + continue; + FastHit hit = (FastHit) h; + + hitsByNode.put(hit.getDistributionKey(), hit); + } + return hitsByNode; + } + + /** Send a docsums request to a node. Responses will be added to the given receiver. */ + private void sendDocsumsRequest(int nodeId, List<FastHit> hits, byte[] payload, CompressionType compression, Result result) { + Client.NodeConnection node = resourcePool.nodeConnections().get(nodeId); + if (node == null) { + String error = "Could not fill hits from unknown node " + nodeId; + receive(Client.ResponseOrError.fromError(error), hits); + result.hits().addError(ErrorMessage.createEmptyDocsums(error)); + log.warning("Got hits with partid " + nodeId + ", which is not included in the current dispatch config"); + return; + } + + Query query = result.getQuery(); + double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0; + Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, payload); + resourcePool.client().request(RPC_METHOD, node, compressionResult.type(), payload.length, compressionResult.data(), + roe -> receive(roe, hits), timeoutSeconds); + } + + private void processResponses(Result result, String summaryClass) throws TimeoutException { + try { + int skippedHits = 0; + while (outstandingResponses > 0) { + long timeLeftMs = result.getQuery().getTimeLeft(); + if (timeLeftMs <= 0) { + throwTimeout(); + } + var responseAndHits = responses.poll(timeLeftMs, TimeUnit.MILLISECONDS); + if (responseAndHits == null) { + throwTimeout(); + } + var response = responseAndHits.getFirst(); + var hitsContext = responseAndHits.getSecond(); + skippedHits += processResponse(result, response, hitsContext, summaryClass); + outstandingResponses--; + } + if (skippedHits != 0) { + result.hits().addError(ErrorMessage + .createEmptyDocsums("Missing hit summary data for summary " + summaryClass + " for " + skippedHits + " hits")); + } + } catch (InterruptedException e) { + // TODO: Add error + } + } + + private int processResponse(Result result, Client.ResponseOrError<ProtobufResponse> responseOrError, List<FastHit> hitsContext, + String summaryClass) { + if (responseOrError.error().isPresent()) { + if (hasReportedError) { + return 0; + } + String error = responseOrError.error().get(); + result.hits().addError(ErrorMessage.createBackendCommunicationError(error)); + log.log(Level.WARNING, "Error fetching summary data: " + error); + hasReportedError = true; + } else { + Client.ProtobufResponse response = responseOrError.response().get(); + CompressionType compression = CompressionType.valueOf(response.compression()); + byte[] responseBytes = resourcePool.compressor().decompress(response.compressedPayload(), compression, + response.uncompressedSize()); + return fill(result, hitsContext, summaryClass, responseBytes); + } + return 0; + } + + private void addErrors(Result result, com.yahoo.slime.Inspector errors) { + errors.traverse((ArrayTraverser) (index, value) -> { + int errorCode = ("timeout".equalsIgnoreCase(value.field("type").asString())) ? Error.TIMEOUT.code : Error.UNSPECIFIED.code; + result.hits().addError(new ErrorMessage(errorCode, value.field("message").asString(), value.field("details").asString())); + }); + } + + private int fill(Result result, List<FastHit> hits, String summaryClass, byte[] payload) { + try { + var protobuf = SearchProtocol.DocsumReply.parseFrom(payload); + var root = BinaryFormat.decode(protobuf.getSlimeSummaries().toByteArray()).get(); + var errors = root.field("errors"); + boolean hasErrors = errors.valid() && (errors.entries() > 0); + if (hasErrors) { + addErrors(result, errors); + } + + Inspector summaries = new SlimeAdapter(root.field("docsums")); + if (!summaries.valid()) { + return 0; // No summaries; Perhaps we requested a non-existing summary class + } + int skippedHits = 0; + for (int i = 0; i < hits.size(); i++) { + Inspector summary = summaries.entry(i).field("docsum"); + if (summary.fieldCount() != 0) { + hits.get(i).setField(Hit.SDDOCNAME_FIELD, documentDb.getName()); + hits.get(i).addSummary(documentDb.getDocsumDefinitionSet().getDocsum(summaryClass), summary); + hits.get(i).setFilled(summaryClass); + } else { + skippedHits++; + } + } + return skippedHits; + } catch (InvalidProtocolBufferException ex) { + log.log(Level.WARNING, "Invalid response to docsum request", ex); + result.hits().addError(ErrorMessage.createInternalServerError("Invalid response to docsum request from backend")); + return 0; + } + } + + private void throwTimeout() throws TimeoutException { + throw new TimeoutException("Timed out waiting for summary data. " + outstandingResponses + " responses outstanding."); + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java index 88d77c760e3..e54ac00f821 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java @@ -9,7 +9,7 @@ import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.dispatch.SearchInvoker; -import com.yahoo.search.dispatch.rpc.Client.SearchResponse; +import com.yahoo.search.dispatch.rpc.Client.ProtobufResponse; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.result.ErrorMessage; import com.yahoo.search.searchchain.Execution; @@ -25,11 +25,13 @@ import java.util.concurrent.TimeUnit; * * @author ollivir */ -public class RpcSearchInvoker extends SearchInvoker { +public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseReceiver { + private static final String RPC_METHOD = "vespa.searchprotocol.search"; + private final VespaBackEndSearcher searcher; private final Node node; private final RpcResourcePool resourcePool; - private final BlockingQueue<Client.SearchResponseOrError> responses; + private final BlockingQueue<Client.ResponseOrError<ProtobufResponse>> responses; private Query query; @@ -50,15 +52,16 @@ public class RpcSearchInvoker extends SearchInvoker { Client.NodeConnection nodeConnection = resourcePool.nodeConnections().get(node.key()); if (nodeConnection == null) { - responses.add(Client.SearchResponseOrError.fromError("Could send search to unknown node " + node.key())); + responses.add(Client.ResponseOrError.fromError("Could not send search to unknown node " + node.key())); responseAvailable(); return; } + query.trace(false, 5, "Sending search request with jrt/protobuf to node with dist key ", node.key()); - var payload = ProtobufSerialization.serializeQuery(query, searcher.getServerId(), true); + var payload = ProtobufSerialization.serializeSearchRequest(query, searcher.getServerId()); double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0; Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, payload); - resourcePool.client().search(nodeConnection, compressionResult.type(), payload.length, compressionResult.data(), this, + resourcePool.client().request(RPC_METHOD, nodeConnection, compressionResult.type(), payload.length, compressionResult.data(), this, timeoutSeconds); } @@ -68,7 +71,7 @@ public class RpcSearchInvoker extends SearchInvoker { if (timeLeftMs <= 0) { return errorResult(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName())); } - Client.SearchResponseOrError response = null; + Client.ResponseOrError<ProtobufResponse> response = null; try { response = responses.poll(timeLeftMs, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { @@ -84,11 +87,11 @@ public class RpcSearchInvoker extends SearchInvoker { return errorResult(query, ErrorMessage.createInternalServerError("Neither error nor result available")); } - SearchResponse searchResponse = response.response().get(); - CompressionType compression = CompressionType.valueOf(searchResponse.compression()); - byte[] payload = resourcePool.compressor().decompress(searchResponse.compressedPayload(), compression, - searchResponse.uncompressedSize()); - var result = ProtobufSerialization.deserializeToResult(payload, query, searcher); + ProtobufResponse protobufResponse = response.response().get(); + CompressionType compression = CompressionType.valueOf(protobufResponse.compression()); + byte[] payload = resourcePool.compressor().decompress(protobufResponse.compressedPayload(), compression, + protobufResponse.uncompressedSize()); + var result = ProtobufSerialization.deserializeToSearchResult(payload, query, searcher); result.hits().unorderedIterator().forEachRemaining(hit -> { if(hit instanceof FastHit) { FastHit fhit = (FastHit) hit; @@ -106,7 +109,7 @@ public class RpcSearchInvoker extends SearchInvoker { // nothing to release } - public void receive(Client.SearchResponseOrError response) { + public void receive(Client.ResponseOrError<ProtobufResponse> response) { responses.add(response); responseAvailable(); } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java index f9b628e594a..687d3e728c0 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java @@ -7,9 +7,6 @@ import com.yahoo.document.GlobalId; import com.yahoo.document.idstring.IdIdString; import com.yahoo.prelude.fastsearch.FastHit; import com.yahoo.search.Result; -import com.yahoo.search.dispatch.rpc.Client; -import com.yahoo.search.dispatch.rpc.RpcFillInvoker; -import com.yahoo.search.dispatch.rpc.RpcSearchInvoker; import com.yahoo.slime.ArrayTraverser; import com.yahoo.slime.BinaryFormat; import com.yahoo.slime.Cursor; @@ -44,7 +41,7 @@ public class MockClient implements Client { int uncompressedSize, byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) { if (malfunctioning) { - responseReceiver.receive(GetDocsumsResponseOrError.fromError("Malfunctioning")); + responseReceiver.receive(ResponseOrError.fromError("Malfunctioning")); return; } @@ -74,25 +71,25 @@ public class MockClient implements Client { Compressor.Compression compressionResult = compressor.compress(compression, slimeBytes); GetDocsumsResponse response = new GetDocsumsResponse(compressionResult.type().getCode(), slimeBytes.length, compressionResult.data(), hitsContext); - responseReceiver.receive(GetDocsumsResponseOrError.fromResponse(response)); + responseReceiver.receive(ResponseOrError.fromResponse(response)); } @Override - public void search(NodeConnection node, CompressionType compression, int uncompressedLength, byte[] compressedPayload, - RpcSearchInvoker responseReceiver, double timeoutSeconds) { + public void request(String rpcMethod, NodeConnection node, CompressionType compression, int uncompressedLength, byte[] compressedPayload, + ResponseReceiver responseReceiver, double timeoutSeconds) { if (malfunctioning) { - responseReceiver.receive(SearchResponseOrError.fromError("Malfunctioning")); + responseReceiver.receive(ResponseOrError.fromError("Malfunctioning")); return; } if(searchResult == null) { - responseReceiver.receive(SearchResponseOrError.fromError("No result defined")); + responseReceiver.receive(ResponseOrError.fromError("No result defined")); return; } var payload = ProtobufSerialization.serializeResult(searchResult); var compressionResult = compressor.compress(compression, payload); - var response = new SearchResponse(compressionResult.type().getCode(), payload.length, compressionResult.data()); - responseReceiver.receive(SearchResponseOrError.fromResponse(response)); + var response = new ProtobufResponse(compressionResult.type().getCode(), payload.length, compressionResult.data()); + responseReceiver.receive(ResponseOrError.fromResponse(response)); } public void setDocsumReponse(String nodeId, int docId, String docsumClass, Map<String, Object> docsumValues) { diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java index 689be53de23..b9d894f9eb5 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java @@ -54,8 +54,8 @@ public class RpcSearchInvokerTest { AtomicInteger lengthHolder) { return new Client() { @Override - public void search(NodeConnection node, CompressionType compression, int uncompressedLength, byte[] compressedPayload, - RpcSearchInvoker responseReceiver, double timeoutSeconds) { + public void request(String rpcMethod, NodeConnection node, CompressionType compression, int uncompressedLength, + byte[] compressedPayload, ResponseReceiver responseReceiver, double timeoutSeconds) { compressionTypeHolder.set(compression); payloadHolder.set(compressedPayload); lengthHolder.set(uncompressedLength); diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java index e294aee02e7..2e0e15e8a12 100644 --- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java +++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java @@ -113,6 +113,18 @@ public class Flags { "Takes effect at redeployment", APPLICATION_ID); + public static final UnboundBooleanFlag USE_LB_STATUS_FILE = defineFeatureFlag( + "use-lb-status-file", false, + "Serve status.html from the new path", + "Takes effect on restart of Docker container", + APPLICATION_ID, HOSTNAME); + + public static final UnboundBooleanFlag USE_HTTPS_LOAD_BALANCER_UPSTREAM = defineFeatureFlag( + "use-https-load-balancer-upstream", false, + "Use https between load balancer and upstream containers", + "Takes effect at redeployment", + APPLICATION_ID); + /** WARNING: public for testing: All flags should be defined in {@link Flags}. */ public static UnboundBooleanFlag defineFeatureFlag(String flagId, boolean defaultValue, String description, String modificationEffect, FetchVector.Dimension... dimensions) { diff --git a/logd/src/main/resources/configdefinitions/logd.def b/logd/src/main/resources/configdefinitions/logd.def index 570c2e36f99..61150c373aa 100644 --- a/logd/src/main/resources/configdefinitions/logd.def +++ b/logd/src/main/resources/configdefinitions/logd.def @@ -7,10 +7,13 @@ stateport int default=0 ## Host to contact the logserver on. logserver.host string default="localhost" -## Port to contact the logserver on. +## RPC port of logserver. +logserver.rpcport int default=5822 + +## Legacy port to contact the logserver on. logserver.port int default=5821 -## Forward to a logserver. If false, logserver.host and logserver.port are irrelevant +## Forward to a logserver. Other logserver configuration is irrelevant if false. logserver.use bool default=true ## Loglevel config whether they should be stored and/or forwarded diff --git a/storage/src/tests/CMakeLists.txt b/storage/src/tests/CMakeLists.txt index c4436161f28..1894a3fe6a8 100644 --- a/storage/src/tests/CMakeLists.txt +++ b/storage/src/tests/CMakeLists.txt @@ -7,6 +7,7 @@ vespa_add_executable(storage_gtest_runner_app TEST gtest_runner.cpp DEPENDS storage_testbucketmover + storage_gtestdistributor ) vespa_add_test( diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt index f58fd37f622..1e9dca3f7f1 100644 --- a/storage/src/tests/distributor/CMakeLists.txt +++ b/storage/src/tests/distributor/CMakeLists.txt @@ -4,7 +4,6 @@ vespa_add_library(storage_testdistributor TEST blockingoperationstartertest.cpp bucketdatabasetest.cpp bucketdbmetricupdatertest.cpp - bucketdbupdatertest.cpp bucketgctimecalculatortest.cpp bucketstateoperationtest.cpp distributor_host_info_reporter_test.cpp @@ -46,3 +45,16 @@ vespa_add_library(storage_testdistributor TEST storage_testcommon storage_testhostreporter ) + +vespa_add_library(storage_gtestdistributor TEST + SOURCES + bucketdbupdatertest.cpp + # Fixture etc. dupes with non-gtest runner : + distributortestutil.cpp + messagesenderstub.cpp + DEPENDS + storage_distributor + storage_testcommon + storage_testhostreporter + gtest +) diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index 9795f5db5dc..2beae2ac43b 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -1,7 +1,5 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/vdstestlib/cppunit/macros.h> -#include <iomanip> #include <vespa/storageapi/message/persistence.h> #include <vespa/storage/distributor/bucketdbupdater.h> #include <vespa/storage/distributor/distributormetricsset.h> @@ -19,6 +17,9 @@ #include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/vespalib/text/stringtokenizer.h> #include <sstream> +#include <iomanip> + +#include <gtest/gtest.h> using namespace storage::api; using namespace storage::lib; @@ -29,6 +30,8 @@ using document::FixedBucketSpaces; using document::BucketId; using document::Bucket; +using namespace ::testing; + namespace storage::distributor { namespace { @@ -54,145 +57,12 @@ getRequestBucketInfoStrings(uint32_t count) } -class BucketDBUpdaterTest : public CppUnit::TestFixture, +class BucketDBUpdaterTest : public Test, public DistributorTestUtil { - CPPUNIT_TEST_SUITE(BucketDBUpdaterTest); - CPPUNIT_TEST(testNormalUsage); // Make sure that bucketdbupdater sends requests to nodes, send responses back for 3 nodes, check that bucketdb is in correct state - CPPUNIT_TEST(testDistributorChange); - CPPUNIT_TEST(testDistributorChangeWithGrouping); - CPPUNIT_TEST(testNormalUsageInitializing); // Check that we send request bucket info when storage node is initializing, and send another when it's up. - CPPUNIT_TEST(testFailedRequestBucketInfo); - CPPUNIT_TEST(testBitChange); // Check what happens when distribution bits change - CPPUNIT_TEST(testNodeDown); - CPPUNIT_TEST(testStorageNodeInMaintenanceClearsBucketsForNode); - CPPUNIT_TEST(testNodeDownCopiesGetInSync); - CPPUNIT_TEST(testDownWhileInit); - CPPUNIT_TEST(testInitializingWhileRecheck); - CPPUNIT_TEST(testRecheckNode); - CPPUNIT_TEST(testRecheckNodeWithFailure); - CPPUNIT_TEST(testNotifyBucketChange); - CPPUNIT_TEST(testNotifyBucketChangeFromNodeDown); - CPPUNIT_TEST(testNotifyChangeWithPendingStateQueuesBucketInfoRequests); - CPPUNIT_TEST(testMergeReply); - CPPUNIT_TEST(testMergeReplyNodeDown); - CPPUNIT_TEST(testMergeReplyNodeDownAfterRequestSent); - CPPUNIT_TEST(testFlush); - CPPUNIT_TEST(testPendingClusterStateSendMessages); - CPPUNIT_TEST(testPendingClusterStateReceive); - CPPUNIT_TEST(testPendingClusterStateMerge); - CPPUNIT_TEST(testPendingClusterStateMergeReplicaChanged); - CPPUNIT_TEST(testPendingClusterStateWithGroupDown); - CPPUNIT_TEST(testPendingClusterStateWithGroupDownAndNoHandover); - CPPUNIT_TEST(testNoDbResurrectionForBucketNotOwnedInCurrentState); - CPPUNIT_TEST(testNoDbResurrectionForBucketNotOwnedInPendingState); - CPPUNIT_TEST(testClusterStateAlwaysSendsFullFetchWhenDistributionChangePending); - CPPUNIT_TEST(testChangedDistributionConfigTriggersRecoveryMode); - CPPUNIT_TEST(testNewlyAddedBucketsHaveCurrentTimeAsGcTimestamp); - CPPUNIT_TEST(testNewerMutationsNotOverwrittenByEarlierBucketFetch); - CPPUNIT_TEST(preemptedDistrChangeCarriesNodeSetOverToNextStateFetch); - CPPUNIT_TEST(preemptedStorChangeCarriesNodeSetOverToNextStateFetch); - CPPUNIT_TEST(preemptedStorageNodeDownMustBeReFetched); - CPPUNIT_TEST(outdatedNodeSetClearedAfterSuccessfulStateCompletion); - CPPUNIT_TEST(doNotSendToPreemptedNodeNowInDownState); - CPPUNIT_TEST(doNotSendToPreemptedNodeNotPartOfNewState); - CPPUNIT_TEST_DISABLED(clusterConfigDownsizeOnlySendsToAvailableNodes); - CPPUNIT_TEST(changedDiskSetTriggersReFetch); - CPPUNIT_TEST(nodeMissingFromConfigIsTreatedAsNeedingOwnershipTransfer); - CPPUNIT_TEST(changed_distributor_set_implies_ownership_transfer); - CPPUNIT_TEST(unchanged_distributor_set_implies_no_ownership_transfer); - CPPUNIT_TEST(changed_distribution_config_implies_ownership_transfer); - CPPUNIT_TEST(transition_time_tracked_for_single_state_change); - CPPUNIT_TEST(transition_time_reset_across_non_preempting_state_changes); - CPPUNIT_TEST(transition_time_tracked_for_distribution_config_change); - CPPUNIT_TEST(transition_time_tracked_across_preempted_transitions); - CPPUNIT_TEST(batch_update_of_existing_diverging_replicas_does_not_mark_any_as_trusted); - CPPUNIT_TEST(batch_add_of_new_diverging_replicas_does_not_mark_any_as_trusted); - CPPUNIT_TEST(batch_add_with_single_resulting_replica_implicitly_marks_as_trusted); - CPPUNIT_TEST(identity_update_of_single_replica_does_not_clear_trusted); - CPPUNIT_TEST(identity_update_of_diverging_untrusted_replicas_does_not_mark_any_as_trusted); - CPPUNIT_TEST(adding_diverging_replica_to_existing_trusted_does_not_remove_trusted); - CPPUNIT_TEST(batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted); - CPPUNIT_TEST(global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection); - CPPUNIT_TEST(non_owned_buckets_moved_to_read_only_db_on_ownership_change); - CPPUNIT_TEST(buckets_no_longer_available_are_not_moved_to_read_only_database); - CPPUNIT_TEST(non_owned_buckets_purged_when_read_only_support_is_config_disabled); - CPPUNIT_TEST(deferred_activated_state_does_not_enable_state_until_activation_received); - CPPUNIT_TEST(read_only_db_cleared_once_pending_state_is_activated); - CPPUNIT_TEST(read_only_db_is_populated_even_when_self_is_marked_down); - CPPUNIT_TEST(activate_cluster_state_request_with_mismatching_version_returns_actual_version); - CPPUNIT_TEST(activate_cluster_state_request_without_pending_transition_passes_message_through); - CPPUNIT_TEST_SUITE_END(); - public: BucketDBUpdaterTest(); -protected: - void testNormalUsage(); - void testDistributorChange(); - void testDistributorChangeWithGrouping(); - void testNormalUsageInitializing(); - void testFailedRequestBucketInfo(); - void testBitChange(); - void testNodeDown(); - void testStorageNodeInMaintenanceClearsBucketsForNode(); - void testNodeDownCopiesGetInSync(); - void testDownWhileInit(); - void testInitializingWhileRecheck(); - void testRecheckNode(); - void testRecheckNodeWithFailure(); - void testNotifyBucketChange(); - void testNotifyBucketChangeFromNodeDown(); - void testNotifyChangeWithPendingStateQueuesBucketInfoRequests(); - void testMergeReply(); - void testMergeReplyNodeDown(); - void testMergeReplyNodeDownAfterRequestSent(); - void testFlush(); - void testPendingClusterStateSendMessages(); - void testPendingClusterStateReceive(); - void testPendingClusterStateMerge(); - void testPendingClusterStateMergeReplicaChanged(); - void testPendingClusterStateWithGroupDown(); - void testPendingClusterStateWithGroupDownAndNoHandover(); - void testNoDbResurrectionForBucketNotOwnedInCurrentState(); - void testNoDbResurrectionForBucketNotOwnedInPendingState(); - void testClusterStateAlwaysSendsFullFetchWhenDistributionChangePending(); - void testChangedDistributionConfigTriggersRecoveryMode(); - void testNewlyAddedBucketsHaveCurrentTimeAsGcTimestamp(); - void testNewerMutationsNotOverwrittenByEarlierBucketFetch(); - void preemptedDistrChangeCarriesNodeSetOverToNextStateFetch(); - void preemptedStorChangeCarriesNodeSetOverToNextStateFetch(); - void preemptedStorageNodeDownMustBeReFetched(); - void outdatedNodeSetClearedAfterSuccessfulStateCompletion(); - void doNotSendToPreemptedNodeNowInDownState(); - void doNotSendToPreemptedNodeNotPartOfNewState(); - void clusterConfigDownsizeOnlySendsToAvailableNodes(); - void changedDiskSetTriggersReFetch(); - void nodeMissingFromConfigIsTreatedAsNeedingOwnershipTransfer(); - void changed_distributor_set_implies_ownership_transfer(); - void unchanged_distributor_set_implies_no_ownership_transfer(); - void changed_distribution_config_implies_ownership_transfer(); - void transition_time_tracked_for_single_state_change(); - void transition_time_reset_across_non_preempting_state_changes(); - void transition_time_tracked_for_distribution_config_change(); - void transition_time_tracked_across_preempted_transitions(); - void batch_update_of_existing_diverging_replicas_does_not_mark_any_as_trusted(); - void batch_add_of_new_diverging_replicas_does_not_mark_any_as_trusted(); - void batch_add_with_single_resulting_replica_implicitly_marks_as_trusted(); - void identity_update_of_single_replica_does_not_clear_trusted(); - void identity_update_of_diverging_untrusted_replicas_does_not_mark_any_as_trusted(); - void adding_diverging_replica_to_existing_trusted_does_not_remove_trusted(); - void batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted(); - void global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection(); - void non_owned_buckets_moved_to_read_only_db_on_ownership_change(); - void buckets_no_longer_available_are_not_moved_to_read_only_database(); - void non_owned_buckets_purged_when_read_only_support_is_config_disabled(); - void deferred_activated_state_does_not_enable_state_until_activation_received(); - void read_only_db_cleared_once_pending_state_is_activated(); - void read_only_db_is_populated_even_when_self_is_marked_down(); - void activate_cluster_state_request_with_mismatching_version_returns_actual_version(); - void activate_cluster_state_request_without_pending_transition_passes_message_through(); - auto &defaultDistributorBucketSpace() { return getBucketSpaceRepo().get(makeBucketSpace()); } bool bucketExistsThatHasNode(int bucketCount, uint16_t node) const; @@ -250,14 +120,14 @@ protected: public: using OutdatedNodesMap = dbtransition::OutdatedNodesMap; - void setUp() override { + void SetUp() override { createLinks(); _bucketSpaces = getBucketSpaces(); // Disable deferred activation by default (at least for now) to avoid breaking the entire world. getConfig().setAllowStaleReadsDuringClusterStateTransitions(false); }; - void tearDown() override { + void TearDown() override { close(); } @@ -309,7 +179,7 @@ public: uint32_t bucketCount, uint32_t invalidBucketCount = 0) { - CPPUNIT_ASSERT(cmd.getType() == MessageType::REQUESTBUCKETINFO); + ASSERT_EQ(cmd.getType(), MessageType::REQUESTBUCKETINFO); const api::StorageMessageAddress &address(*cmd.getAddress()); getBucketDBUpdater().onRequestBucketInfoReply( getFakeBucketReply(state, @@ -322,7 +192,7 @@ public: void sendFakeReplyForSingleBucketRequest( const api::RequestBucketInfoCommand& rbi) { - CPPUNIT_ASSERT_EQUAL(size_t(1), rbi.getBuckets().size()); + ASSERT_EQ(size_t(1), rbi.getBuckets().size()); const document::BucketId& bucket(rbi.getBuckets()[0]); std::shared_ptr<api::RequestBucketInfoReply> reply( @@ -379,17 +249,17 @@ public: void verifyInvalid(document::BucketId id, int storageNode) { BucketDatabase::Entry entry = getBucketDatabase().get(id); - CPPUNIT_ASSERT(entry.valid()); + ASSERT_TRUE(entry.valid()); bool found = false; for (uint32_t j = 0; j<entry->getNodeCount(); j++) { if (entry->getNodeRef(j).getNode() == storageNode) { - CPPUNIT_ASSERT(!entry->getNodeRef(j).valid()); + ASSERT_FALSE(entry->getNodeRef(j).valid()); found = true; } } - CPPUNIT_ASSERT(found); + ASSERT_TRUE(found); } struct OrderByIncreasingNodeIndex { @@ -433,10 +303,10 @@ public: } void assert_has_activate_cluster_state_reply_with_actual_version(uint32_t version) { - CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); + ASSERT_EQ(size_t(1), _sender.replies.size()); auto* response = dynamic_cast<api::ActivateClusterStateVersionReply*>(_sender.replies.back().get()); - CPPUNIT_ASSERT(response != nullptr); - CPPUNIT_ASSERT_EQUAL(version, response->actualVersion()); + ASSERT_TRUE(response != nullptr); + ASSERT_EQ(version, response->actualVersion()); _sender.clear(); } @@ -445,11 +315,11 @@ public: uint32_t bucketCount = 1, uint32_t invalidBucketCount = 0) { - CPPUNIT_ASSERT_EQUAL(expectedMsgs, _sender.commands.size()); + ASSERT_EQ(expectedMsgs, _sender.commands.size()); for (uint32_t i = 0; i < _sender.commands.size(); i++) { - fakeBucketReply(state, *_sender.commands[i], - bucketCount, invalidBucketCount); + ASSERT_NO_FATAL_FAILURE(fakeBucketReply(state, *_sender.commands[i], + bucketCount, invalidBucketCount)); } } @@ -459,7 +329,7 @@ public: { _sender.clear(); setSystemState(state); - completeBucketInfoGathering(state, expectedMsgs, nBuckets); + ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(state, expectedMsgs, nBuckets)); } void completeStateTransitionInSeconds(const std::string& stateStr, @@ -470,7 +340,7 @@ public: lib::ClusterState state(stateStr); setSystemState(state); getClock().addSecondsToTime(seconds); - completeBucketInfoGathering(state, expectedMsgs); + ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(state, expectedMsgs)); } uint64_t lastTransitionTimeInMillis() { @@ -486,32 +356,31 @@ public: setSystemState(newState); for (uint32_t i=0; i< messageCount(numStorageNodes); i++) { - CPPUNIT_ASSERT(_sender.commands[i]->getType() == - MessageType::REQUESTBUCKETINFO); + ASSERT_EQ(_sender.commands[i]->getType(), MessageType::REQUESTBUCKETINFO); const api::StorageMessageAddress *address = _sender.commands[i]->getAddress(); - CPPUNIT_ASSERT_EQUAL((uint32_t)(i / _bucketSpaces.size()), (uint32_t)address->getIndex()); + ASSERT_EQ((uint32_t)(i / _bucketSpaces.size()), (uint32_t)address->getIndex()); } } void initializeNodesAndBuckets(uint32_t numStorageNodes, uint32_t numBuckets) { - setStorageNodes(numStorageNodes); + ASSERT_NO_FATAL_FAILURE(setStorageNodes(numStorageNodes)); vespalib::string state(vespalib::make_string( "distributor:1 storage:%d", numStorageNodes)); lib::ClusterState newState(state); for (uint32_t i=0; i< messageCount(numStorageNodes); i++) { - fakeBucketReply(newState, *_sender.commands[i], numBuckets); + ASSERT_NO_FATAL_FAILURE(fakeBucketReply(newState, *_sender.commands[i], numBuckets)); } - assertCorrectBuckets(numBuckets, state); + ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(numBuckets, state)); } bool bucketHasNode(document::BucketId id, uint16_t node) const { BucketDatabase::Entry entry = getBucket(id); - CPPUNIT_ASSERT(entry.valid()); + assert(entry.valid()); for (uint32_t j=0; j<entry->getNodeCount(); j++) { if (entry->getNodeRef(j).getNode() == node) { @@ -555,9 +424,8 @@ public: void assertCorrectBuckets(int numBuckets, const std::string& stateStr) { lib::ClusterState state(stateStr); for (int i=0; i<numBuckets; i++) { - CPPUNIT_ASSERT_EQUAL( - getIdealStr(document::BucketId(16, i), state), - getNodes(document::BucketId(16, i))); + ASSERT_EQ(getIdealStr(document::BucketId(16, i), state), + getNodes(document::BucketId(16, i))); } } @@ -677,31 +545,26 @@ public: } }; -CPPUNIT_TEST_SUITE_REGISTRATION(BucketDBUpdaterTest); - BucketDBUpdaterTest::BucketDBUpdaterTest() - : CppUnit::TestFixture(), - DistributorTestUtil(), + : DistributorTestUtil(), _bucketSpaces() { } -void -BucketDBUpdaterTest::testNormalUsage() -{ +TEST_F(BucketDBUpdaterTest, testNormalUsage) { setSystemState(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3")); - CPPUNIT_ASSERT_EQUAL(messageCount(3), _sender.commands.size()); + ASSERT_EQ(messageCount(3), _sender.commands.size()); // Ensure distribution hash is set correctly - CPPUNIT_ASSERT_EQUAL( + ASSERT_EQ( defaultDistributorBucketSpace().getDistribution() .getNodeGraph().getDistributionConfigHash(), dynamic_cast<const RequestBucketInfoCommand&>( *_sender.commands[0]).getDistributionHash()); - fakeBucketReply(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), - *_sender.commands[0], 10); + ASSERT_NO_FATAL_FAILURE(fakeBucketReply(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), + *_sender.commands[0], 10)); _sender.clear(); @@ -709,101 +572,96 @@ BucketDBUpdaterTest::testNormalUsage() // change is only implemented after completion of previous cluster state setSystemState(lib::ClusterState("distributor:2 .0.s:i storage:3")); - CPPUNIT_ASSERT_EQUAL(messageCount(3), _sender.commands.size()); + ASSERT_EQ(messageCount(3), _sender.commands.size()); // Expect reply of first set SystemState request. - CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); + ASSERT_EQ(size_t(1), _sender.replies.size()); - completeBucketInfoGathering(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), - messageCount(3), 10); - assertCorrectBuckets(10, "distributor:2 storage:3"); + ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering( + lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), + messageCount(3), 10)); + ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(10, "distributor:2 storage:3")); } -void -BucketDBUpdaterTest::testDistributorChange() -{ +TEST_F(BucketDBUpdaterTest, testDistributorChange) { int numBuckets = 100; // First sends request setSystemState(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3")); - CPPUNIT_ASSERT_EQUAL(messageCount(3), _sender.commands.size()); - completeBucketInfoGathering(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), - messageCount(3), numBuckets); + ASSERT_EQ(messageCount(3), _sender.commands.size()); + ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(lib::ClusterState("distributor:2 .0.s:i .1.s:i storage:3"), + messageCount(3), numBuckets)); _sender.clear(); // No change from initializing to up (when done with last job) setSystemState(lib::ClusterState("distributor:2 storage:3")); - CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.commands.size()); + ASSERT_EQ(size_t(0), _sender.commands.size()); _sender.clear(); // Adding node. No new read requests, but buckets thrown setSystemState(lib::ClusterState("distributor:3 storage:3")); - CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.commands.size()); - assertCorrectBuckets(numBuckets, "distributor:3 storage:3"); + ASSERT_EQ(size_t(0), _sender.commands.size()); + ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(numBuckets, "distributor:3 storage:3")); _sender.clear(); // Removing distributor. Need to refetch new data from all nodes. setSystemState(lib::ClusterState("distributor:2 storage:3")); - CPPUNIT_ASSERT_EQUAL(messageCount(3), _sender.commands.size()); - completeBucketInfoGathering(lib::ClusterState("distributor:2 storage:3"), - messageCount(3), numBuckets); + ASSERT_EQ(messageCount(3), _sender.commands.size()); + ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(lib::ClusterState("distributor:2 storage:3"), + messageCount(3), numBuckets)); _sender.clear(); - assertCorrectBuckets(numBuckets, "distributor:2 storage:3"); + ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(numBuckets, "distributor:2 storage:3")); } -void -BucketDBUpdaterTest::testDistributorChangeWithGrouping() -{ +TEST_F(BucketDBUpdaterTest, testDistributorChangeWithGrouping) { std::string distConfig(getDistConfig6Nodes2Groups()); setDistribution(distConfig); int numBuckets = 100; setSystemState(lib::ClusterState("distributor:6 storage:6")); - CPPUNIT_ASSERT_EQUAL(messageCount(6), _sender.commands.size()); - completeBucketInfoGathering(lib::ClusterState("distributor:6 storage:6"), - messageCount(6), numBuckets); + ASSERT_EQ(messageCount(6), _sender.commands.size()); + ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(lib::ClusterState("distributor:6 storage:6"), + messageCount(6), numBuckets)); _sender.clear(); // Distributor going down in other group, no change setSystemState(lib::ClusterState("distributor:6 .5.s:d storage:6")); - CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.commands.size()); + ASSERT_EQ(size_t(0), _sender.commands.size()); _sender.clear(); setSystemState(lib::ClusterState("distributor:6 storage:6")); - CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.commands.size()); - assertCorrectBuckets(numBuckets, "distributor:6 storage:6"); + ASSERT_EQ(size_t(0), _sender.commands.size()); + ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(numBuckets, "distributor:6 storage:6")); _sender.clear(); // Unchanged grouping cause no change. setDistribution(distConfig); - CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.commands.size()); + ASSERT_EQ(size_t(0), _sender.commands.size()); // Changed grouping cause change setDistribution(getDistConfig6Nodes4Groups()); - CPPUNIT_ASSERT_EQUAL(messageCount(6), _sender.commands.size()); + ASSERT_EQ(messageCount(6), _sender.commands.size()); } -void -BucketDBUpdaterTest::testNormalUsageInitializing() -{ +TEST_F(BucketDBUpdaterTest, testNormalUsageInitializing) { setSystemState(lib::ClusterState("distributor:1 .0.s:i storage:1 .0.s:i")); - CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); + ASSERT_EQ(_bucketSpaces.size(), _sender.commands.size()); // Not yet passing on system state. - CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size()); + ASSERT_EQ(size_t(0), _senderDown.commands.size()); - completeBucketInfoGathering(lib::ClusterState("distributor:1 .0.s:i storage:1"), - _bucketSpaces.size(), 10, 10); + ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(lib::ClusterState("distributor:1 .0.s:i storage:1"), + _bucketSpaces.size(), 10, 10)); - assertCorrectBuckets(10, "distributor:1 storage:1"); + ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(10, "distributor:1 storage:1")); for (int i=10; i<20; i++) { - verifyInvalid(document::BucketId(16, i), 0); + ASSERT_NO_FATAL_FAILURE(verifyInvalid(document::BucketId(16, i), 0)); } // Pass on cluster state and recheck buckets now. - CPPUNIT_ASSERT_EQUAL(size_t(1), _senderDown.commands.size()); + ASSERT_EQ(size_t(1), _senderDown.commands.size()); _sender.clear(); _senderDown.clear(); @@ -811,24 +669,22 @@ BucketDBUpdaterTest::testNormalUsageInitializing() setSystemState(lib::ClusterState("distributor:1 .0.s:i storage:1")); // Send a new request bucket info up. - CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); + ASSERT_EQ(_bucketSpaces.size(), _sender.commands.size()); - completeBucketInfoGathering(lib::ClusterState("distributor:1 .0.s:i storage:1"), - _bucketSpaces.size(), 20); + ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(lib::ClusterState("distributor:1 .0.s:i storage:1"), + _bucketSpaces.size(), 20)); // Pass on cluster state and recheck buckets now. - CPPUNIT_ASSERT_EQUAL(size_t(1), _senderDown.commands.size()); + ASSERT_EQ(size_t(1), _senderDown.commands.size()); - assertCorrectBuckets(20, "distributor:1 storage:1"); + ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(20, "distributor:1 storage:1")); } -void -BucketDBUpdaterTest::testFailedRequestBucketInfo() -{ +TEST_F(BucketDBUpdaterTest, testFailedRequestBucketInfo) { setSystemState(lib::ClusterState("distributor:1 .0.s:i storage:1")); // 2 messages sent up: 1 to the nodes, and one reply to the setsystemstate. - CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); + ASSERT_EQ(_bucketSpaces.size(), _sender.commands.size()); { for (uint32_t i = 0; i < _bucketSpaces.size(); ++i) { @@ -847,43 +703,38 @@ BucketDBUpdaterTest::testFailedRequestBucketInfo() } // Should be resent. - CPPUNIT_ASSERT_EQUAL(getRequestBucketInfoStrings(messageCount(2)), - _sender.getCommands()); + ASSERT_EQ(getRequestBucketInfoStrings(messageCount(2)), _sender.getCommands()); - CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size()); + ASSERT_EQ(size_t(0), _senderDown.commands.size()); for (uint32_t i = 0; i < _bucketSpaces.size(); ++i) { - fakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"), - *_sender.commands[_bucketSpaces.size() + i], 10); + ASSERT_NO_FATAL_FAILURE(fakeBucketReply(lib::ClusterState("distributor:1 .0.s:i storage:1"), + *_sender.commands[_bucketSpaces.size() + i], 10)); } for (int i=0; i<10; i++) { - CPPUNIT_ASSERT_EQUAL( - std::string(""), - verifyBucket(document::BucketId(16, i), - lib::ClusterState("distributor:1 storage:1"))); + EXPECT_EQ(std::string(""), + verifyBucket(document::BucketId(16, i), + lib::ClusterState("distributor:1 storage:1"))); } // Set system state should now be passed on - CPPUNIT_ASSERT_EQUAL(std::string("Set system state"), - _senderDown.getCommands()); + EXPECT_EQ(std::string("Set system state"), _senderDown.getCommands()); } -void -BucketDBUpdaterTest::testDownWhileInit() -{ - setStorageNodes(3); +TEST_F(BucketDBUpdaterTest, testDownWhileInit) { + ASSERT_NO_FATAL_FAILURE(setStorageNodes(3)); - fakeBucketReply(lib::ClusterState("distributor:1 storage:3"), - *_sender.commands[0], 5); + ASSERT_NO_FATAL_FAILURE(fakeBucketReply(lib::ClusterState("distributor:1 storage:3"), + *_sender.commands[0], 5)); setSystemState(lib::ClusterState("distributor:1 storage:3 .1.s:d")); - fakeBucketReply(lib::ClusterState("distributor:1 storage:3"), - *_sender.commands[2], 5); + ASSERT_NO_FATAL_FAILURE(fakeBucketReply(lib::ClusterState("distributor:1 storage:3"), + *_sender.commands[2], 5)); - fakeBucketReply(lib::ClusterState("distributor:1 storage:3"), - *_sender.commands[1], 5); + ASSERT_NO_FATAL_FAILURE(fakeBucketReply(lib::ClusterState("distributor:1 storage:3"), + *_sender.commands[1], 5)); } bool @@ -934,96 +785,83 @@ BucketDBUpdaterTest::expandNodeVec(const std::vector<uint16_t> &nodes) return res; } -void -BucketDBUpdaterTest::testNodeDown() -{ - setStorageNodes(3); +TEST_F(BucketDBUpdaterTest, testNodeDown) { + ASSERT_NO_FATAL_FAILURE(setStorageNodes(3)); enableDistributorClusterState("distributor:1 storage:3"); for (int i=1; i<100; i++) { addIdealNodes(document::BucketId(16, i)); } - CPPUNIT_ASSERT(bucketExistsThatHasNode(100, 1)); + EXPECT_TRUE(bucketExistsThatHasNode(100, 1)); setSystemState(lib::ClusterState("distributor:1 storage:3 .1.s:d")); - CPPUNIT_ASSERT(!bucketExistsThatHasNode(100, 1)); + EXPECT_FALSE(bucketExistsThatHasNode(100, 1)); } -void -BucketDBUpdaterTest::testStorageNodeInMaintenanceClearsBucketsForNode() -{ - setStorageNodes(3); +TEST_F(BucketDBUpdaterTest, testStorageNodeInMaintenanceClearsBucketsForNode) { + ASSERT_NO_FATAL_FAILURE(setStorageNodes(3)); enableDistributorClusterState("distributor:1 storage:3"); for (int i=1; i<100; i++) { addIdealNodes(document::BucketId(16, i)); } - CPPUNIT_ASSERT(bucketExistsThatHasNode(100, 1)); + EXPECT_TRUE(bucketExistsThatHasNode(100, 1)); setSystemState(lib::ClusterState("distributor:1 storage:3 .1.s:m")); - - CPPUNIT_ASSERT(!bucketExistsThatHasNode(100, 1)); + + EXPECT_FALSE(bucketExistsThatHasNode(100, 1)); } -void -BucketDBUpdaterTest::testNodeDownCopiesGetInSync() -{ - setStorageNodes(3); +TEST_F(BucketDBUpdaterTest, testNodeDownCopiesGetInSync) { + ASSERT_NO_FATAL_FAILURE(setStorageNodes(3)); - lib::ClusterState systemState("distributor:1 storage:3"); + lib::ClusterState systemState("distributor:1 storage:3"); document::BucketId bid(16, 1); addNodesToBucketDB(bid, "0=3,1=2,2=3"); setSystemState(lib::ClusterState("distributor:1 storage:3 .1.s:d")); - CPPUNIT_ASSERT_EQUAL( + EXPECT_EQ( std::string("BucketId(0x4000000000000001) : " "node(idx=0,crc=0x3,docs=3/3,bytes=3/3,trusted=true,active=false,ready=false), " "node(idx=2,crc=0x3,docs=3/3,bytes=3/3,trusted=true,active=false,ready=false)"), dumpBucket(bid)); } -void -BucketDBUpdaterTest::testInitializingWhileRecheck() -{ +TEST_F(BucketDBUpdaterTest, testInitializingWhileRecheck) { lib::ClusterState systemState("distributor:1 storage:2 .0.s:i .0.i:0.1"); setSystemState(systemState); - CPPUNIT_ASSERT_EQUAL(messageCount(2), _sender.commands.size()); - CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size()); + ASSERT_EQ(messageCount(2), _sender.commands.size()); + ASSERT_EQ(size_t(0), _senderDown.commands.size()); getBucketDBUpdater().recheckBucketInfo(1, makeDocumentBucket(document::BucketId(16, 3))); for (uint32_t i = 0; i < messageCount(2); ++i) { - fakeBucketReply(systemState, *_sender.commands[i], 100); + ASSERT_NO_FATAL_FAILURE(fakeBucketReply(systemState, *_sender.commands[i], 100)); } // Now we can pass on system state. - CPPUNIT_ASSERT_EQUAL(size_t(1), _senderDown.commands.size()); - - CPPUNIT_ASSERT_EQUAL(MessageType::SETSYSTEMSTATE, - _senderDown.commands[0]->getType()); + ASSERT_EQ(size_t(1), _senderDown.commands.size()); + EXPECT_EQ(MessageType::SETSYSTEMSTATE, _senderDown.commands[0]->getType()); } -void -BucketDBUpdaterTest::testBitChange() -{ - +TEST_F(BucketDBUpdaterTest, testBitChange) { std::vector<document::BucketId> bucketlist; { setSystemState(lib::ClusterState("bits:14 storage:1 distributor:2")); - CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); + ASSERT_EQ(_bucketSpaces.size(), _sender.commands.size()); for (uint32_t bsi = 0; bsi < _bucketSpaces.size(); ++bsi) { - CPPUNIT_ASSERT(_sender.commands[bsi]->getType() == MessageType::REQUESTBUCKETINFO); + ASSERT_EQ(_sender.commands[bsi]->getType(), MessageType::REQUESTBUCKETINFO); const auto &req = dynamic_cast<const RequestBucketInfoCommand &>(*_sender.commands[bsi]); - RequestBucketInfoReply* sreply = new RequestBucketInfoReply(req); + auto sreply = std::make_shared<RequestBucketInfoReply>(req); sreply->setAddress(storageAddress(0)); api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo(); if (req.getBucketSpace() == FixedBucketSpaces::default_space()) { @@ -1046,29 +884,27 @@ BucketDBUpdaterTest::testBitChange() } } - getBucketDBUpdater().onRequestBucketInfoReply(std::shared_ptr<RequestBucketInfoReply>(sreply)); + getBucketDBUpdater().onRequestBucketInfoReply(sreply); } } - CPPUNIT_ASSERT_EQUAL( - std::string("BucketId(0x4000000000000001) : " - "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), - dumpBucket(bucketlist[0])); - CPPUNIT_ASSERT_EQUAL( - std::string("BucketId(0x4000000000000002) : " - "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), - dumpBucket(bucketlist[1])); + EXPECT_EQ(std::string("BucketId(0x4000000000000001) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), + dumpBucket(bucketlist[0])); + EXPECT_EQ(std::string("BucketId(0x4000000000000002) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), + dumpBucket(bucketlist[1])); { _sender.clear(); setSystemState(lib::ClusterState("bits:16 storage:1 distributor:2")); - CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); + ASSERT_EQ(_bucketSpaces.size(), _sender.commands.size()); for (uint32_t bsi = 0; bsi < _bucketSpaces.size(); ++bsi) { - CPPUNIT_ASSERT(_sender.commands[bsi]->getType() == MessageType::REQUESTBUCKETINFO); + ASSERT_EQ(_sender.commands[bsi]->getType(), MessageType::REQUESTBUCKETINFO); const auto &req = dynamic_cast<const RequestBucketInfoCommand &>(*_sender.commands[bsi]); - RequestBucketInfoReply* sreply = new RequestBucketInfoReply(req); + auto sreply = std::make_shared<RequestBucketInfoReply>(req); sreply->setAddress(storageAddress(0)); sreply->setResult(api::ReturnCode::OK); if (req.getBucketSpace() == FixedBucketSpaces::default_space()) { @@ -1085,27 +921,22 @@ BucketDBUpdaterTest::testBitChange() api::BucketInfo(10,1,1))); } - getBucketDBUpdater().onRequestBucketInfoReply( - std::shared_ptr<RequestBucketInfoReply>(sreply)); + getBucketDBUpdater().onRequestBucketInfoReply(sreply); } } - CPPUNIT_ASSERT_EQUAL( - std::string("BucketId(0x4000000000000000) : " - "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), - dumpBucket(document::BucketId(16, 0))); - CPPUNIT_ASSERT_EQUAL( - std::string("BucketId(0x4000000000000001) : " - "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), - dumpBucket(document::BucketId(16, 1))); - CPPUNIT_ASSERT_EQUAL( - std::string("BucketId(0x4000000000000002) : " - "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), - dumpBucket(document::BucketId(16, 2))); - CPPUNIT_ASSERT_EQUAL( - std::string("BucketId(0x4000000000000004) : " - "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), - dumpBucket(document::BucketId(16, 4))); + EXPECT_EQ(std::string("BucketId(0x4000000000000000) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), + dumpBucket(document::BucketId(16, 0))); + EXPECT_EQ(std::string("BucketId(0x4000000000000001) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), + dumpBucket(document::BucketId(16, 1))); + EXPECT_EQ(std::string("BucketId(0x4000000000000002) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), + dumpBucket(document::BucketId(16, 2))); + EXPECT_EQ(std::string("BucketId(0x4000000000000004) : " + "node(idx=0,crc=0xa,docs=1/1,bytes=1/1,trusted=true,active=false,ready=false)"), + dumpBucket(document::BucketId(16, 4))); { _sender.clear(); @@ -1118,29 +949,23 @@ BucketDBUpdaterTest::testBitChange() } }; -void -BucketDBUpdaterTest::testRecheckNodeWithFailure() -{ - initializeNodesAndBuckets(3, 5); +TEST_F(BucketDBUpdaterTest, testRecheckNodeWithFailure) { + ASSERT_NO_FATAL_FAILURE(initializeNodesAndBuckets(3, 5)); _sender.clear(); getBucketDBUpdater().recheckBucketInfo(1, makeDocumentBucket(document::BucketId(16, 3))); - CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); - + ASSERT_EQ(size_t(1), _sender.commands.size()); uint16_t index = 0; { - api::RequestBucketInfoCommand& rbi( - dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[0])); - CPPUNIT_ASSERT_EQUAL(size_t(1), rbi.getBuckets().size()); - CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 3), rbi.getBuckets()[0]); - auto reply(std::make_shared<api::RequestBucketInfoReply>(rbi)); - + auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[0]); + ASSERT_EQ(size_t(1), rbi.getBuckets().size()); + EXPECT_EQ(document::BucketId(16, 3), rbi.getBuckets()[0]); + auto reply = std::make_shared<api::RequestBucketInfoReply>(rbi); const api::StorageMessageAddress *address = _sender.commands[0]->getAddress(); index = address->getIndex(); - reply->setResult(api::ReturnCode::NOT_CONNECTED); getBucketDBUpdater().onRequestBucketInfoReply(reply); // Trigger that delayed message is sent @@ -1148,43 +973,39 @@ BucketDBUpdaterTest::testRecheckNodeWithFailure() getBucketDBUpdater().resendDelayedMessages(); } - CPPUNIT_ASSERT_EQUAL(size_t(2), _sender.commands.size()); + ASSERT_EQ(size_t(2), _sender.commands.size()); setSystemState( lib::ClusterState(vespalib::make_string("distributor:1 storage:3 .%d.s:d", index))); // Recheck bucket. { - api::RequestBucketInfoCommand& rbi(dynamic_cast<RequestBucketInfoCommand&> - (*_sender.commands[1])); - CPPUNIT_ASSERT_EQUAL(size_t(1), rbi.getBuckets().size()); - CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 3), rbi.getBuckets()[0]); - auto reply(std::make_shared<api::RequestBucketInfoReply>(rbi)); + auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[1]); + ASSERT_EQ(size_t(1), rbi.getBuckets().size()); + EXPECT_EQ(document::BucketId(16, 3), rbi.getBuckets()[0]); + auto reply = std::make_shared<api::RequestBucketInfoReply>(rbi); reply->setResult(api::ReturnCode::NOT_CONNECTED); getBucketDBUpdater().onRequestBucketInfoReply(reply); } // Should not retry since node is down. - CPPUNIT_ASSERT_EQUAL(size_t(2), _sender.commands.size()); + EXPECT_EQ(size_t(2), _sender.commands.size()); } -void -BucketDBUpdaterTest::testRecheckNode() -{ - initializeNodesAndBuckets(3, 5); +TEST_F(BucketDBUpdaterTest, testRecheckNode) { + ASSERT_NO_FATAL_FAILURE(initializeNodesAndBuckets(3, 5)); _sender.clear(); getBucketDBUpdater().recheckBucketInfo(1, makeDocumentBucket(document::BucketId(16, 3))); - CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); + ASSERT_EQ(size_t(1), _sender.commands.size()); - api::RequestBucketInfoCommand& rbi( - dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[0])); - CPPUNIT_ASSERT_EQUAL(size_t(1), rbi.getBuckets().size()); - CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 3), rbi.getBuckets()[0]); + auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[0]); + ASSERT_EQ(size_t(1), rbi.getBuckets().size()); + EXPECT_EQ(document::BucketId(16, 3), rbi.getBuckets()[0]); - auto reply(std::make_shared<api::RequestBucketInfoReply>(rbi)); + auto reply = std::make_shared<api::RequestBucketInfoReply>(rbi); reply->getBucketInfo().push_back( api::RequestBucketInfoReply::Entry(document::BucketId(16, 3), api::BucketInfo(20, 10, 12, 50, 60, true, true))); @@ -1192,29 +1013,24 @@ BucketDBUpdaterTest::testRecheckNode() lib::ClusterState state("distributor:1 storage:3"); for (uint32_t i = 0; i < 3; i++) { - CPPUNIT_ASSERT_EQUAL( - getIdealStr(document::BucketId(16, i), state), - getNodes(document::BucketId(16, i))); + EXPECT_EQ(getIdealStr(document::BucketId(16, i), state), + getNodes(document::BucketId(16, i))); } for (uint32_t i = 4; i < 5; i++) { - CPPUNIT_ASSERT_EQUAL( - getIdealStr(document::BucketId(16, i), state), - getNodes(document::BucketId(16, i))); + EXPECT_EQ(getIdealStr(document::BucketId(16, i), state), + getNodes(document::BucketId(16, i))); } BucketDatabase::Entry entry = getBucketDatabase().get(document::BucketId(16, 3)); - CPPUNIT_ASSERT(entry.valid()); + ASSERT_TRUE(entry.valid()); const BucketCopy* copy = entry->getNode(1); - CPPUNIT_ASSERT(copy != 0); - CPPUNIT_ASSERT_EQUAL(api::BucketInfo(20,10,12, 50, 60, true, true), - copy->getBucketInfo()); + ASSERT_TRUE(copy != nullptr); + EXPECT_EQ(api::BucketInfo(20,10,12, 50, 60, true, true), copy->getBucketInfo()); } -void -BucketDBUpdaterTest::testNotifyBucketChange() -{ +TEST_F(BucketDBUpdaterTest, testNotifyBucketChange) { enableDistributorClusterState("distributor:1 storage:1"); addNodesToBucketDB(document::BucketId(16, 1), "0=1234"); @@ -1237,54 +1053,47 @@ BucketDBUpdaterTest::testNotifyBucketChange() } // Must receive reply - CPPUNIT_ASSERT_EQUAL(size_t(2), _sender.replies.size()); + ASSERT_EQ(size_t(2), _sender.replies.size()); for (int i = 0; i < 2; ++i) { - CPPUNIT_ASSERT_EQUAL(MessageType::NOTIFYBUCKETCHANGE_REPLY, - _sender.replies[i]->getType()); + ASSERT_EQ(MessageType::NOTIFYBUCKETCHANGE_REPLY, + _sender.replies[i]->getType()); } // No database update until request bucket info replies have been received. - CPPUNIT_ASSERT_EQUAL(std::string("BucketId(0x4000000000000001) : " - "node(idx=0,crc=0x4d2,docs=1234/1234,bytes=1234/1234," - "trusted=false,active=false,ready=false)"), - dumpBucket(document::BucketId(16, 1))); - CPPUNIT_ASSERT_EQUAL(std::string("NONEXISTING"), - dumpBucket(document::BucketId(16, 2))); + EXPECT_EQ(std::string("BucketId(0x4000000000000001) : " + "node(idx=0,crc=0x4d2,docs=1234/1234,bytes=1234/1234," + "trusted=false,active=false,ready=false)"), + dumpBucket(document::BucketId(16, 1))); + EXPECT_EQ(std::string("NONEXISTING"), dumpBucket(document::BucketId(16, 2))); - CPPUNIT_ASSERT_EQUAL(size_t(2), _sender.commands.size()); + ASSERT_EQ(size_t(2), _sender.commands.size()); std::vector<api::BucketInfo> infos; infos.push_back(api::BucketInfo(4567, 200, 2000, 400, 4000, true, true)); infos.push_back(api::BucketInfo(8999, 300, 3000, 500, 5000, false, false)); for (int i = 0; i < 2; ++i) { - api::RequestBucketInfoCommand& rbi( - dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[i])); - CPPUNIT_ASSERT_EQUAL(size_t(1), rbi.getBuckets().size()); - CPPUNIT_ASSERT_EQUAL(document::BucketId(16, i + 1), rbi.getBuckets()[0]); + auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[i]); + ASSERT_EQ(size_t(1), rbi.getBuckets().size()); + EXPECT_EQ(document::BucketId(16, i + 1), rbi.getBuckets()[0]); - auto reply(std::make_shared<api::RequestBucketInfoReply>(rbi)); + auto reply = std::make_shared<api::RequestBucketInfoReply>(rbi); reply->getBucketInfo().push_back( api::RequestBucketInfoReply::Entry(document::BucketId(16, i + 1), infos[i])); getBucketDBUpdater().onRequestBucketInfoReply(reply); } - CPPUNIT_ASSERT_EQUAL( - std::string("BucketId(0x4000000000000001) : " - "node(idx=0,crc=0x11d7,docs=200/400,bytes=2000/4000,trusted=true,active=true,ready=true)"), - dumpBucket(document::BucketId(16, 1))); - CPPUNIT_ASSERT_EQUAL( - std::string("BucketId(0x4000000000000002) : " - "node(idx=0,crc=0x2327,docs=300/500,bytes=3000/5000,trusted=true,active=false,ready=false)"), - dumpBucket(document::BucketId(16, 2))); - + EXPECT_EQ(std::string("BucketId(0x4000000000000001) : " + "node(idx=0,crc=0x11d7,docs=200/400,bytes=2000/4000,trusted=true,active=true,ready=true)"), + dumpBucket(document::BucketId(16, 1))); + EXPECT_EQ(std::string("BucketId(0x4000000000000002) : " + "node(idx=0,crc=0x2327,docs=300/500,bytes=3000/5000,trusted=true,active=false,ready=false)"), + dumpBucket(document::BucketId(16, 2))); } -void -BucketDBUpdaterTest::testNotifyBucketChangeFromNodeDown() -{ +TEST_F(BucketDBUpdaterTest, testNotifyBucketChangeFromNodeDown) { enableDistributorClusterState("distributor:1 storage:2"); addNodesToBucketDB(document::BucketId(16, 1), "1=1234"); @@ -1302,25 +1111,22 @@ BucketDBUpdaterTest::testNotifyBucketChangeFromNodeDown() // (sendRequestBucketInfo drops message if node is down). enableDistributorClusterState("distributor:1 storage:2 .0.s:d"); - CPPUNIT_ASSERT_EQUAL( - std::string("BucketId(0x4000000000000001) : " - "node(idx=1,crc=0x4d2,docs=1234/1234,bytes=1234/1234,trusted=false,active=false,ready=false)"), - dumpBucket(document::BucketId(16, 1))); + ASSERT_EQ(std::string("BucketId(0x4000000000000001) : " + "node(idx=1,crc=0x4d2,docs=1234/1234,bytes=1234/1234,trusted=false,active=false,ready=false)"), + dumpBucket(document::BucketId(16, 1))); - CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); - CPPUNIT_ASSERT_EQUAL(MessageType::NOTIFYBUCKETCHANGE_REPLY, - _sender.replies[0]->getType()); + ASSERT_EQ(size_t(1), _sender.replies.size()); + ASSERT_EQ(MessageType::NOTIFYBUCKETCHANGE_REPLY, _sender.replies[0]->getType()); // Currently, this pending operation will be auto-flushed when the cluster state // changes so the behavior is still correct. Keep this test around to prevent // regressions here. - CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); - api::RequestBucketInfoCommand& rbi( - dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[0])); - CPPUNIT_ASSERT_EQUAL(size_t(1), rbi.getBuckets().size()); - CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1), rbi.getBuckets()[0]); + ASSERT_EQ(size_t(1), _sender.commands.size()); + auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[0]); + ASSERT_EQ(size_t(1), rbi.getBuckets().size()); + EXPECT_EQ(document::BucketId(16, 1), rbi.getBuckets()[0]); - auto reply(std::make_shared<api::RequestBucketInfoReply>(rbi)); + auto reply = std::make_shared<api::RequestBucketInfoReply>(rbi); reply->getBucketInfo().push_back( api::RequestBucketInfoReply::Entry( document::BucketId(16, 1), @@ -1328,10 +1134,9 @@ BucketDBUpdaterTest::testNotifyBucketChangeFromNodeDown() getBucketDBUpdater().onRequestBucketInfoReply(reply); // No change - CPPUNIT_ASSERT_EQUAL( - std::string("BucketId(0x4000000000000001) : " - "node(idx=1,crc=0x4d2,docs=1234/1234,bytes=1234/1234,trusted=false,active=false,ready=false)"), - dumpBucket(document::BucketId(16, 1))); + EXPECT_EQ(std::string("BucketId(0x4000000000000001) : " + "node(idx=1,crc=0x4d2,docs=1234/1234,bytes=1234/1234,trusted=false,active=false,ready=false)"), + dumpBucket(document::BucketId(16, 1))); } /** @@ -1342,11 +1147,9 @@ BucketDBUpdaterTest::testNotifyBucketChangeFromNodeDown() * distributor in the pending state but not by the current state would be * discarded when attempted inserted into the bucket database. */ -void -BucketDBUpdaterTest::testNotifyChangeWithPendingStateQueuesBucketInfoRequests() -{ +TEST_F(BucketDBUpdaterTest, testNotifyChangeWithPendingStateQueuesBucketInfoRequests) { setSystemState(lib::ClusterState("distributor:1 storage:1")); - CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); + ASSERT_EQ(_bucketSpaces.size(), _sender.commands.size()); { api::BucketInfo info(8999, 300, 3000, 500, 5000, false, false); @@ -1356,18 +1159,17 @@ BucketDBUpdaterTest::testNotifyChangeWithPendingStateQueuesBucketInfoRequests() getBucketDBUpdater().onNotifyBucketChange(cmd); } - CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); + ASSERT_EQ(_bucketSpaces.size(), _sender.commands.size()); - completeBucketInfoGathering(lib::ClusterState("distributor:1 storage:1"), - _bucketSpaces.size(), 10); + ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(lib::ClusterState("distributor:1 storage:1"), + _bucketSpaces.size(), 10)); - CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size() + 1, _sender.commands.size()); + ASSERT_EQ(_bucketSpaces.size() + 1, _sender.commands.size()); { - api::RequestBucketInfoCommand& rbi( - dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[_bucketSpaces.size()])); - CPPUNIT_ASSERT_EQUAL(size_t(1), rbi.getBuckets().size()); - CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1), rbi.getBuckets()[0]); + auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[_bucketSpaces.size()]); + ASSERT_EQ(size_t(1), rbi.getBuckets().size()); + EXPECT_EQ(document::BucketId(16, 1), rbi.getBuckets()[0]); } _sender.clear(); @@ -1375,19 +1177,16 @@ BucketDBUpdaterTest::testNotifyChangeWithPendingStateQueuesBucketInfoRequests() { lib::ClusterState state("distributor:1 storage:2"); uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 1; - setAndEnableClusterState(state, expectedMsgs, dummyBucketsToReturn); + ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(state, expectedMsgs, dummyBucketsToReturn)); } - CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); + ASSERT_EQ(_bucketSpaces.size(), _sender.commands.size()); { - api::RequestBucketInfoCommand& rbi( - dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[0])); - CPPUNIT_ASSERT_EQUAL(size_t(0), rbi.getBuckets().size()); + auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.commands[0]); + EXPECT_EQ(size_t(0), rbi.getBuckets().size()); } } -void -BucketDBUpdaterTest::testMergeReply() -{ +TEST_F(BucketDBUpdaterTest, testMergeReply) { enableDistributorClusterState("distributor:1 storage:3"); addNodesToBucketDB(document::BucketId(16, 1234), @@ -1400,23 +1199,21 @@ BucketDBUpdaterTest::testMergeReply() api::MergeBucketCommand cmd(makeDocumentBucket(document::BucketId(16, 1234)), nodes, 0); - auto reply(std::make_shared<api::MergeBucketReply>(cmd)); + auto reply = std::make_shared<api::MergeBucketReply>(cmd); _sender.clear(); getBucketDBUpdater().onMergeBucketReply(reply); - CPPUNIT_ASSERT_EQUAL(size_t(3), _sender.commands.size()); + ASSERT_EQ(size_t(3), _sender.commands.size()); for (uint32_t i = 0; i < 3; i++) { - std::shared_ptr<api::RequestBucketInfoCommand> - req(std::dynamic_pointer_cast<api::RequestBucketInfoCommand>( - _sender.commands[i])); + auto req = std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(_sender.commands[i]); - CPPUNIT_ASSERT(req.get()); - CPPUNIT_ASSERT_EQUAL(size_t(1), req->getBuckets().size()); - CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1234), req->getBuckets()[0]); + ASSERT_TRUE(req.get() != nullptr); + ASSERT_EQ(size_t(1), req->getBuckets().size()); + EXPECT_EQ(document::BucketId(16, 1234), req->getBuckets()[0]); - auto reqreply(std::make_shared<api::RequestBucketInfoReply>(*req)); + auto reqreply = std::make_shared<api::RequestBucketInfoReply>(*req); reqreply->getBucketInfo().push_back( api::RequestBucketInfoReply::Entry(document::BucketId(16, 1234), api::BucketInfo(10 * (i + 1), 100 * (i +1), 1000 * (i+1)))); @@ -1424,17 +1221,14 @@ BucketDBUpdaterTest::testMergeReply() getBucketDBUpdater().onRequestBucketInfoReply(reqreply); } - CPPUNIT_ASSERT_EQUAL( - std::string("BucketId(0x40000000000004d2) : " - "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), " - "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false), " - "node(idx=2,crc=0x1e,docs=300/300,bytes=3000/3000,trusted=false,active=false,ready=false)"), - dumpBucket(document::BucketId(16, 1234))); + EXPECT_EQ(std::string("BucketId(0x40000000000004d2) : " + "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), " + "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false), " + "node(idx=2,crc=0x1e,docs=300/300,bytes=3000/3000,trusted=false,active=false,ready=false)"), + dumpBucket(document::BucketId(16, 1234))); }; -void -BucketDBUpdaterTest::testMergeReplyNodeDown() -{ +TEST_F(BucketDBUpdaterTest, testMergeReplyNodeDown) { enableDistributorClusterState("distributor:1 storage:3"); std::vector<api::MergeBucketCommand::Node> nodes; @@ -1446,25 +1240,23 @@ BucketDBUpdaterTest::testMergeReplyNodeDown() api::MergeBucketCommand cmd(makeDocumentBucket(document::BucketId(16, 1234)), nodes, 0); - auto reply(std::make_shared<api::MergeBucketReply>(cmd)); + auto reply = std::make_shared<api::MergeBucketReply>(cmd); setSystemState(lib::ClusterState("distributor:1 storage:2")); _sender.clear(); getBucketDBUpdater().onMergeBucketReply(reply); - CPPUNIT_ASSERT_EQUAL(size_t(2), _sender.commands.size()); + ASSERT_EQ(size_t(2), _sender.commands.size()); for (uint32_t i = 0; i < 2; i++) { - std::shared_ptr<api::RequestBucketInfoCommand> req( - std::dynamic_pointer_cast<api::RequestBucketInfoCommand>( - _sender.commands[i])); + auto req = std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(_sender.commands[i]); - CPPUNIT_ASSERT(req.get()); - CPPUNIT_ASSERT_EQUAL(size_t(1), req->getBuckets().size()); - CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1234), req->getBuckets()[0]); + ASSERT_TRUE(req.get() != nullptr); + ASSERT_EQ(size_t(1), req->getBuckets().size()); + EXPECT_EQ(document::BucketId(16, 1234), req->getBuckets()[0]); - auto reqreply(std::make_shared<api::RequestBucketInfoReply>(*req)); + auto reqreply = std::make_shared<api::RequestBucketInfoReply>(*req); reqreply->getBucketInfo().push_back( api::RequestBucketInfoReply::Entry( document::BucketId(16, 1234), @@ -1472,16 +1264,13 @@ BucketDBUpdaterTest::testMergeReplyNodeDown() getBucketDBUpdater().onRequestBucketInfoReply(reqreply); } - CPPUNIT_ASSERT_EQUAL( - std::string("BucketId(0x40000000000004d2) : " - "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), " - "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false)"), - dumpBucket(document::BucketId(16, 1234))); + EXPECT_EQ(std::string("BucketId(0x40000000000004d2) : " + "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), " + "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false)"), + dumpBucket(document::BucketId(16, 1234))); }; -void -BucketDBUpdaterTest::testMergeReplyNodeDownAfterRequestSent() -{ +TEST_F(BucketDBUpdaterTest, testMergeReplyNodeDownAfterRequestSent) { enableDistributorClusterState("distributor:1 storage:3"); std::vector<api::MergeBucketCommand::Node> nodes; @@ -1498,20 +1287,18 @@ BucketDBUpdaterTest::testMergeReplyNodeDownAfterRequestSent() _sender.clear(); getBucketDBUpdater().onMergeBucketReply(reply); - CPPUNIT_ASSERT_EQUAL(size_t(3), _sender.commands.size()); + ASSERT_EQ(size_t(3), _sender.commands.size()); setSystemState(lib::ClusterState("distributor:1 storage:2")); for (uint32_t i = 0; i < 3; i++) { - std::shared_ptr<api::RequestBucketInfoCommand> req( - std::dynamic_pointer_cast<api::RequestBucketInfoCommand>( - _sender.commands[i])); + auto req = std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(_sender.commands[i]); - CPPUNIT_ASSERT(req.get()); - CPPUNIT_ASSERT_EQUAL(size_t(1), req->getBuckets().size()); - CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1234), req->getBuckets()[0]); + ASSERT_TRUE(req.get() != nullptr); + ASSERT_EQ(size_t(1), req->getBuckets().size()); + EXPECT_EQ(document::BucketId(16, 1234), req->getBuckets()[0]); - auto reqreply(std::make_shared<api::RequestBucketInfoReply>(*req)); + auto reqreply = std::make_shared<api::RequestBucketInfoReply>(*req); reqreply->getBucketInfo().push_back( api::RequestBucketInfoReply::Entry( document::BucketId(16, 1234), @@ -1519,17 +1306,14 @@ BucketDBUpdaterTest::testMergeReplyNodeDownAfterRequestSent() getBucketDBUpdater().onRequestBucketInfoReply(reqreply); } - CPPUNIT_ASSERT_EQUAL( - std::string("BucketId(0x40000000000004d2) : " - "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), " - "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false)"), - dumpBucket(document::BucketId(16, 1234))); + EXPECT_EQ(std::string("BucketId(0x40000000000004d2) : " + "node(idx=0,crc=0xa,docs=100/100,bytes=1000/1000,trusted=false,active=false,ready=false), " + "node(idx=1,crc=0x14,docs=200/200,bytes=2000/2000,trusted=false,active=false,ready=false)"), + dumpBucket(document::BucketId(16, 1234))); }; -void -BucketDBUpdaterTest::testFlush() -{ +TEST_F(BucketDBUpdaterTest, testFlush) { enableDistributorClusterState("distributor:1 storage:3"); _sender.clear(); @@ -1540,21 +1324,19 @@ BucketDBUpdaterTest::testFlush() nodes.push_back(api::MergeBucketCommand::Node(i)); } - api::MergeBucketCommand cmd(makeDocumentBucket(document::BucketId(16, 1234)), - nodes, - 0); + api::MergeBucketCommand cmd(makeDocumentBucket(document::BucketId(16, 1234)), nodes, 0); auto reply(std::make_shared<api::MergeBucketReply>(cmd)); _sender.clear(); getBucketDBUpdater().onMergeBucketReply(reply); - CPPUNIT_ASSERT_EQUAL(size_t(3), _sender.commands.size()); - CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.replies.size()); + ASSERT_EQ(size_t(3), _sender.commands.size()); + ASSERT_EQ(size_t(0), _senderDown.replies.size()); getBucketDBUpdater().flush(); // Flushing should drop all merge bucket replies - CPPUNIT_ASSERT_EQUAL(size_t(0), _senderDown.commands.size()); + EXPECT_EQ(size_t(0), _senderDown.commands.size()); } std::string @@ -1569,8 +1351,7 @@ BucketDBUpdaterTest::getSentNodes( std::ostringstream ost; for (uint32_t i = 0; i < fixture->sender.commands.size(); i++) { - RequestBucketInfoCommand& req(dynamic_cast<RequestBucketInfoCommand&>( - *fixture->sender.commands[i])); + auto& req = dynamic_cast<RequestBucketInfoCommand&>(*fixture->sender.commands[i]); if (i > 0) { ost << ","; @@ -1599,155 +1380,124 @@ BucketDBUpdaterTest::getSentNodesDistributionChanged( std::ostringstream ost; for (uint32_t i = 0; i < sender.commands.size(); i++) { - RequestBucketInfoCommand* req = - dynamic_cast<RequestBucketInfoCommand*>(sender.commands[i].get()); + auto& req = dynamic_cast<RequestBucketInfoCommand&>(*sender.commands[i]); if (i > 0) { ost << ","; } - ost << req->getAddress()->getIndex(); + ost << req.getAddress()->getIndex(); } return ost.str(); } -void -BucketDBUpdaterTest::testPendingClusterStateSendMessages() -{ - CPPUNIT_ASSERT_EQUAL( - getNodeList({0, 1, 2}), - getSentNodes("cluster:d", - "distributor:1 storage:3")); - - CPPUNIT_ASSERT_EQUAL( - getNodeList({0, 1}), - getSentNodes("cluster:d", - "distributor:1 storage:3 .2.s:m")); - - CPPUNIT_ASSERT_EQUAL( - getNodeList({2}), - getSentNodes("distributor:1 storage:2", - "distributor:1 storage:3")); - - CPPUNIT_ASSERT_EQUAL( - getNodeList({2, 3, 4, 5}), - getSentNodes("distributor:1 storage:2", - "distributor:1 storage:6")); - - CPPUNIT_ASSERT_EQUAL( - getNodeList({0, 1, 2}), - getSentNodes("distributor:4 storage:3", - "distributor:3 storage:3")); - - CPPUNIT_ASSERT_EQUAL( - getNodeList({0, 1, 2, 3}), - getSentNodes("distributor:4 storage:3", - "distributor:4 .2.s:d storage:4")); - - CPPUNIT_ASSERT_EQUAL( - std::string(""), - getSentNodes("distributor:4 storage:3", - "distributor:4 .0.s:d storage:4")); - - CPPUNIT_ASSERT_EQUAL( - std::string(""), - getSentNodes("distributor:3 storage:3", - "distributor:4 storage:3")); - - CPPUNIT_ASSERT_EQUAL( - getNodeList({2}), - getSentNodes("distributor:3 storage:3 .2.s:i", - "distributor:3 storage:3")); - - CPPUNIT_ASSERT_EQUAL( - getNodeList({1}), - getSentNodes("distributor:3 storage:3 .1.s:d", - "distributor:3 storage:3")); - - CPPUNIT_ASSERT_EQUAL( - getNodeList({1, 2, 4}), - getSentNodes("distributor:3 storage:4 .1.s:d .2.s:i", - "distributor:3 storage:5")); - - CPPUNIT_ASSERT_EQUAL( - std::string(""), - getSentNodes("distributor:1 storage:3", - "cluster:d")); - - CPPUNIT_ASSERT_EQUAL( - std::string(""), - getSentNodes("distributor:1 storage:3", - "distributor:1 storage:3")); - - CPPUNIT_ASSERT_EQUAL( - std::string(""), - getSentNodes("distributor:1 storage:3", - "cluster:d distributor:1 storage:6")); - - CPPUNIT_ASSERT_EQUAL( - std::string(""), - getSentNodes("distributor:3 storage:3", - "distributor:3 .2.s:m storage:3")); - - CPPUNIT_ASSERT_EQUAL( - getNodeList({0, 1, 2}), - getSentNodes("distributor:3 .2.s:m storage:3", - "distributor:3 .2.s:d storage:3")); - - CPPUNIT_ASSERT_EQUAL( - std::string(""), - getSentNodes("distributor:3 .2.s:m storage:3", - "distributor:3 storage:3")); - - CPPUNIT_ASSERT_EQUAL( - getNodeList({0, 1, 2}), - getSentNodesDistributionChanged("distributor:3 storage:3")); - - CPPUNIT_ASSERT_EQUAL( - getNodeList({0, 1}), - getSentNodes("distributor:10 storage:2", - "distributor:10 .1.s:d storage:2")); - - CPPUNIT_ASSERT_EQUAL( - getNodeList({1}), - getSentNodes("distributor:2 storage:2", - "distributor:2 storage:2 .1.d:3 .1.d.1.s:d")); +TEST_F(BucketDBUpdaterTest, testPendingClusterStateSendMessages) { + EXPECT_EQ(getNodeList({0, 1, 2}), + getSentNodes("cluster:d", + "distributor:1 storage:3")); - CPPUNIT_ASSERT_EQUAL( - getNodeList({1}), - getSentNodes("distributor:2 storage:2 .1.s:d", - "distributor:2 storage:2 .1.d:3 .1.d.1.s:d")); + EXPECT_EQ(getNodeList({0, 1}), + getSentNodes("cluster:d", + "distributor:1 storage:3 .2.s:m")); - CPPUNIT_ASSERT_EQUAL( - std::string(""), - getSentNodes("distributor:2 storage:2", - "distributor:3 .2.s:i storage:2")); + EXPECT_EQ(getNodeList({2}), + getSentNodes("distributor:1 storage:2", + "distributor:1 storage:3")); - CPPUNIT_ASSERT_EQUAL( - getNodeList({0, 1, 2}), - getSentNodes("distributor:3 storage:3", - "distributor:3 .2.s:s storage:3")); + EXPECT_EQ(getNodeList({2, 3, 4, 5}), + getSentNodes("distributor:1 storage:2", + "distributor:1 storage:6")); - CPPUNIT_ASSERT_EQUAL( - std::string(""), - getSentNodes("distributor:3 .2.s:s storage:3", - "distributor:3 .2.s:d storage:3")); + EXPECT_EQ(getNodeList({0, 1, 2}), + getSentNodes("distributor:4 storage:3", + "distributor:3 storage:3")); - CPPUNIT_ASSERT_EQUAL( - getNodeList({1}), - getSentNodes("distributor:3 storage:3 .1.s:m", - "distributor:3 storage:3")); - - CPPUNIT_ASSERT_EQUAL( - std::string(""), - getSentNodes("distributor:3 storage:3", - "distributor:3 storage:3 .1.s:m")); + EXPECT_EQ(getNodeList({0, 1, 2, 3}), + getSentNodes("distributor:4 storage:3", + "distributor:4 .2.s:d storage:4")); + + EXPECT_EQ(std::string(""), + getSentNodes("distributor:4 storage:3", + "distributor:4 .0.s:d storage:4")); + + EXPECT_EQ(std::string(""), + getSentNodes("distributor:3 storage:3", + "distributor:4 storage:3")); + + EXPECT_EQ(getNodeList({2}), + getSentNodes("distributor:3 storage:3 .2.s:i", + "distributor:3 storage:3")); + + EXPECT_EQ(getNodeList({1}), + getSentNodes("distributor:3 storage:3 .1.s:d", + "distributor:3 storage:3")); + + EXPECT_EQ(getNodeList({1, 2, 4}), + getSentNodes("distributor:3 storage:4 .1.s:d .2.s:i", + "distributor:3 storage:5")); + + EXPECT_EQ(std::string(""), + getSentNodes("distributor:1 storage:3", + "cluster:d")); + + EXPECT_EQ(std::string(""), + getSentNodes("distributor:1 storage:3", + "distributor:1 storage:3")); + + EXPECT_EQ(std::string(""), + getSentNodes("distributor:1 storage:3", + "cluster:d distributor:1 storage:6")); + + EXPECT_EQ(std::string(""), + getSentNodes("distributor:3 storage:3", + "distributor:3 .2.s:m storage:3")); + + EXPECT_EQ(getNodeList({0, 1, 2}), + getSentNodes("distributor:3 .2.s:m storage:3", + "distributor:3 .2.s:d storage:3")); + + EXPECT_EQ(std::string(""), + getSentNodes("distributor:3 .2.s:m storage:3", + "distributor:3 storage:3")); + + EXPECT_EQ(getNodeList({0, 1, 2}), + getSentNodesDistributionChanged("distributor:3 storage:3")); + + EXPECT_EQ(getNodeList({0, 1}), + getSentNodes("distributor:10 storage:2", + "distributor:10 .1.s:d storage:2")); + + EXPECT_EQ(getNodeList({1}), + getSentNodes("distributor:2 storage:2", + "distributor:2 storage:2 .1.d:3 .1.d.1.s:d")); + + EXPECT_EQ(getNodeList({1}), + getSentNodes("distributor:2 storage:2 .1.s:d", + "distributor:2 storage:2 .1.d:3 .1.d.1.s:d")); + + EXPECT_EQ(std::string(""), + getSentNodes("distributor:2 storage:2", + "distributor:3 .2.s:i storage:2")); + + EXPECT_EQ(getNodeList({0, 1, 2}), + getSentNodes("distributor:3 storage:3", + "distributor:3 .2.s:s storage:3")); + + EXPECT_EQ(std::string(""), + getSentNodes("distributor:3 .2.s:s storage:3", + "distributor:3 .2.s:d storage:3")); + + EXPECT_EQ(getNodeList({1}), + getSentNodes("distributor:3 storage:3 .1.s:m", + "distributor:3 storage:3")); + + EXPECT_EQ(std::string(""), + getSentNodes("distributor:3 storage:3", + "distributor:3 storage:3 .1.s:m")); }; -void -BucketDBUpdaterTest::testPendingClusterStateReceive() -{ +TEST_F(BucketDBUpdaterTest, testPendingClusterStateReceive) { MessageSenderStub sender; auto cmd(std::make_shared<api::SetSystemStateCommand>( @@ -1761,38 +1511,31 @@ BucketDBUpdaterTest::testPendingClusterStateReceive() clock, clusterInfo, sender, getBucketSpaceRepo(), getReadOnlyBucketSpaceRepo(), cmd, outdatedNodesMap, api::Timestamp(1))); - CPPUNIT_ASSERT_EQUAL(messageCount(3), sender.commands.size()); + ASSERT_EQ(messageCount(3), sender.commands.size()); sortSentMessagesByIndex(sender); std::ostringstream ost; for (uint32_t i = 0; i < sender.commands.size(); i++) { - RequestBucketInfoCommand* req = - dynamic_cast<RequestBucketInfoCommand*>(sender.commands[i].get()); + auto* req = dynamic_cast<RequestBucketInfoCommand*>(sender.commands[i].get()); + ASSERT_TRUE(req != nullptr); - RequestBucketInfoReply* rep = - new RequestBucketInfoReply(*req); + auto rep = std::make_shared<RequestBucketInfoReply>(*req); rep->getBucketInfo().push_back( RequestBucketInfoReply::Entry( document::BucketId(16, i), api::BucketInfo(i, i, i, i, i))); - CPPUNIT_ASSERT( - state->onRequestBucketInfoReply( - std::shared_ptr<api::RequestBucketInfoReply>(rep))); - - CPPUNIT_ASSERT_EQUAL(i == sender.commands.size() - 1 ? true : false, - state->done()); + ASSERT_TRUE(state->onRequestBucketInfoReply(rep)); + ASSERT_EQ((i == (sender.commands.size() - 1)), state->done()); } - auto &pendingTransition = state->getPendingBucketSpaceDbTransition(makeBucketSpace()); - CPPUNIT_ASSERT_EQUAL(3, (int)pendingTransition.results().size()); + auto& pendingTransition = state->getPendingBucketSpaceDbTransition(makeBucketSpace()); + EXPECT_EQ(3, (int)pendingTransition.results().size()); } -void -BucketDBUpdaterTest::testPendingClusterStateWithGroupDown() -{ +TEST_F(BucketDBUpdaterTest, testPendingClusterStateWithGroupDown) { std::string config(getDistConfig6Nodes4Groups()); config += "distributor_auto_ownership_transfer_on_whole_group_down true\n"; setDistribution(config); @@ -1801,30 +1544,25 @@ BucketDBUpdaterTest::testPendingClusterStateWithGroupDown() // We're node index 0. // Entire group 1 goes down. Must refetch from all nodes. - CPPUNIT_ASSERT_EQUAL( - getNodeList({0, 1, 2, 3, 4, 5}), - getSentNodes("distributor:6 storage:6", - "distributor:6 .2.s:d .3.s:d storage:6")); + EXPECT_EQ(getNodeList({0, 1, 2, 3, 4, 5}), + getSentNodes("distributor:6 storage:6", + "distributor:6 .2.s:d .3.s:d storage:6")); // But don't fetch if not the entire group is down. - CPPUNIT_ASSERT_EQUAL( - std::string(""), - getSentNodes("distributor:6 storage:6", - "distributor:6 .2.s:d storage:6")); + EXPECT_EQ(std::string(""), + getSentNodes("distributor:6 storage:6", + "distributor:6 .2.s:d storage:6")); } -void -BucketDBUpdaterTest::testPendingClusterStateWithGroupDownAndNoHandover() -{ +TEST_F(BucketDBUpdaterTest, testPendingClusterStateWithGroupDownAndNoHandover) { std::string config(getDistConfig6Nodes4Groups()); config += "distributor_auto_ownership_transfer_on_whole_group_down false\n"; setDistribution(config); // Group is down, but config says to not do anything about it. - CPPUNIT_ASSERT_EQUAL( - getNodeList({0, 1, 2, 3, 4, 5}, _bucketSpaces.size() - 1), - getSentNodes("distributor:6 storage:6", - "distributor:6 .2.s:d .3.s:d storage:6")); + EXPECT_EQ(getNodeList({0, 1, 2, 3, 4, 5}, _bucketSpaces.size() - 1), + getSentNodes("distributor:6 storage:6", + "distributor:6 .2.s:d .3.s:d storage:6")); } void @@ -1840,7 +1578,7 @@ parseInputData(const std::string& data, uint16_t node = atoi(tok2[0].data()); state.setNodeReplied(node); - auto &pendingTransition = state.getPendingBucketSpaceDbTransition(makeBucketSpace()); + auto& pendingTransition = state.getPendingBucketSpaceDbTransition(makeBucketSpace()); vespalib::StringTokenizer tok3(tok2[1], ","); for (uint32_t j = 0; j < tok3.size(); j++) { @@ -1874,7 +1612,7 @@ struct BucketDumper : public BucketDatabase::EntryProcessor std::ostringstream ost; bool _includeBucketInfo; - BucketDumper(bool includeBucketInfo) + explicit BucketDumper(bool includeBucketInfo) : _includeBucketInfo(includeBucketInfo) { } @@ -1969,11 +1707,9 @@ BucketDBUpdaterTest::mergeBucketLists(const std::string& existingData, includeBucketInfo); } -void -BucketDBUpdaterTest::testPendingClusterStateMerge() -{ +TEST_F(BucketDBUpdaterTest, testPendingClusterStateMerge) { // Simple initializing case - ask all nodes for info - CPPUNIT_ASSERT_EQUAL( + EXPECT_EQ( // Result is on the form: [bucket w/o count bits]:[node indexes]|.. std::string("4:0,1|2:0,1|6:1,2|1:0,2|5:2,0|3:2,1|"), // Input is on the form: [node]:[bucket w/o count bits]|... @@ -1982,7 +1718,7 @@ BucketDBUpdaterTest::testPendingClusterStateMerge() "0:1,2,4,5|1:2,3,4,6|2:1,3,5,6")); // Node came up with fewer buckets (lost disk) - CPPUNIT_ASSERT_EQUAL( + EXPECT_EQ( std::string("4:1|2:0,1|6:1,2|1:2,0|5:2|3:2,1|"), mergeBucketLists( lib::ClusterState("distributor:1 storage:3"), @@ -1992,7 +1728,7 @@ BucketDBUpdaterTest::testPendingClusterStateMerge() ); // New node came up - CPPUNIT_ASSERT_EQUAL( + EXPECT_EQ( std::string("4:0,1|2:0,1|6:1,2,3|1:0,2,3|5:2,0,3|3:2,1,3|"), mergeBucketLists( "0:1,2,4,5|1:2,3,4,6|2:1,3,5,6", @@ -2001,14 +1737,14 @@ BucketDBUpdaterTest::testPendingClusterStateMerge() // Node came up with some buckets removed and some added // Buckets that were removed should not be removed as the node // didn't lose a disk. - CPPUNIT_ASSERT_EQUAL( + EXPECT_EQ( std::string("8:0|4:0,1|2:0,1|6:1,0,2|1:0,2|5:2,0|3:2,1|"), mergeBucketLists( "0:1,2,4,5|1:2,3,4,6|2:1,3,5,6", "0:1,2,6,8")); // Node came up with no buckets - CPPUNIT_ASSERT_EQUAL( + EXPECT_EQ( std::string("4:1|2:1|6:1,2|1:2|5:2|3:2,1|"), mergeBucketLists( lib::ClusterState("distributor:1 storage:3"), @@ -2019,7 +1755,7 @@ BucketDBUpdaterTest::testPendingClusterStateMerge() // One node lost a disk, another was just reasked (distributor // change) - CPPUNIT_ASSERT_EQUAL( + EXPECT_EQ( std::string("2:0,1|6:1,2|1:2,0|5:2|3:2,1|"), mergeBucketLists( lib::ClusterState("distributor:1 storage:3"), @@ -2030,23 +1766,20 @@ BucketDBUpdaterTest::testPendingClusterStateMerge() // Bucket info format is "bucketid/checksum/count/size" // Node went from initializing to up and invalid bucket went to empty. - CPPUNIT_ASSERT_EQUAL( + EXPECT_EQ( std::string("2:0/0/0/0/t|"), mergeBucketLists( "0:2/0/0/1", "0:2/0/0/0", true)); - CPPUNIT_ASSERT_EQUAL( - std::string("5:1/2/3/4/u,0/0/0/0/u|"), - mergeBucketLists("", "0:5/0/0/0|1:5/2/3/4", true)); + EXPECT_EQ(std::string("5:1/2/3/4/u,0/0/0/0/u|"), + mergeBucketLists("", "0:5/0/0/0|1:5/2/3/4", true)); } -void -BucketDBUpdaterTest::testPendingClusterStateMergeReplicaChanged() -{ +TEST_F(BucketDBUpdaterTest, testPendingClusterStateMergeReplicaChanged) { // Node went from initializing to up and non-invalid bucket changed. - CPPUNIT_ASSERT_EQUAL( + EXPECT_EQ( std::string("2:0/2/3/4/t|3:0/2/4/6/t|"), mergeBucketLists( lib::ClusterState("distributor:1 storage:1 .0.s:i"), @@ -2056,20 +1789,18 @@ BucketDBUpdaterTest::testPendingClusterStateMergeReplicaChanged() true)); } -void -BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInCurrentState() -{ +TEST_F(BucketDBUpdaterTest, testNoDbResurrectionForBucketNotOwnedInCurrentState) { document::BucketId bucket(16, 3); lib::ClusterState stateBefore("distributor:1 storage:1"); { uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 1; - setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn); + ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn)); } _sender.clear(); getBucketDBUpdater().recheckBucketInfo(0, makeDocumentBucket(bucket)); - CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); + ASSERT_EQ(size_t(1), _sender.commands.size()); std::shared_ptr<api::RequestBucketInfoCommand> rbi( std::dynamic_pointer_cast<RequestBucketInfoCommand>( _sender.commands[0])); @@ -2078,30 +1809,28 @@ BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInCurrentState() { uint32_t expectedMsgs = messageCount(2), dummyBucketsToReturn = 1; - setAndEnableClusterState(stateAfter, expectedMsgs, dummyBucketsToReturn); + ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(stateAfter, expectedMsgs, dummyBucketsToReturn)); } - CPPUNIT_ASSERT(!getBucketDBUpdater().getDistributorComponent() - .ownsBucketInCurrentState(makeDocumentBucket(bucket))); + EXPECT_FALSE(getBucketDBUpdater().getDistributorComponent() + .ownsBucketInCurrentState(makeDocumentBucket(bucket))); - sendFakeReplyForSingleBucketRequest(*rbi); + ASSERT_NO_FATAL_FAILURE(sendFakeReplyForSingleBucketRequest(*rbi)); - CPPUNIT_ASSERT_EQUAL(std::string("NONEXISTING"), dumpBucket(bucket)); + EXPECT_EQ(std::string("NONEXISTING"), dumpBucket(bucket)); } -void -BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInPendingState() -{ +TEST_F(BucketDBUpdaterTest, testNoDbResurrectionForBucketNotOwnedInPendingState) { document::BucketId bucket(16, 3); lib::ClusterState stateBefore("distributor:1 storage:1"); { uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 1; - setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn); + ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn)); } _sender.clear(); getBucketDBUpdater().recheckBucketInfo(0, makeDocumentBucket(bucket)); - CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.commands.size()); + ASSERT_EQ(size_t(1), _sender.commands.size()); std::shared_ptr<api::RequestBucketInfoCommand> rbi( std::dynamic_pointer_cast<RequestBucketInfoCommand>( _sender.commands[0])); @@ -2109,14 +1838,14 @@ BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInPendingState() lib::ClusterState stateAfter("distributor:3 storage:3"); // Set, but _don't_ enable cluster state. We want it to be pending. setSystemState(stateAfter); - CPPUNIT_ASSERT(getBucketDBUpdater().getDistributorComponent() - .ownsBucketInCurrentState(makeDocumentBucket(bucket))); - CPPUNIT_ASSERT(!getBucketDBUpdater() - .checkOwnershipInPendingState(makeDocumentBucket(bucket)).isOwned()); + EXPECT_TRUE(getBucketDBUpdater().getDistributorComponent() + .ownsBucketInCurrentState(makeDocumentBucket(bucket))); + EXPECT_FALSE(getBucketDBUpdater() + .checkOwnershipInPendingState(makeDocumentBucket(bucket)).isOwned()); - sendFakeReplyForSingleBucketRequest(*rbi); + ASSERT_NO_FATAL_FAILURE(sendFakeReplyForSingleBucketRequest(*rbi)); - CPPUNIT_ASSERT_EQUAL(std::string("NONEXISTING"), dumpBucket(bucket)); + EXPECT_EQ(std::string("NONEXISTING"), dumpBucket(bucket)); } /* @@ -2126,109 +1855,101 @@ BucketDBUpdaterTest::testNoDbResurrectionForBucketNotOwnedInPendingState() * will with a high likelihood end up not getting the complete view of the buckets in * the cluster. */ -void -BucketDBUpdaterTest::testClusterStateAlwaysSendsFullFetchWhenDistributionChangePending() -{ +TEST_F(BucketDBUpdaterTest, testClusterStateAlwaysSendsFullFetchWhenDistributionChangePending) { lib::ClusterState stateBefore("distributor:6 storage:6"); { uint32_t expectedMsgs = messageCount(6), dummyBucketsToReturn = 1; - setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn); + ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn)); } _sender.clear(); std::string distConfig(getDistConfig6Nodes2Groups()); setDistribution(distConfig); sortSentMessagesByIndex(_sender); - CPPUNIT_ASSERT_EQUAL(messageCount(6), _sender.commands.size()); + ASSERT_EQ(messageCount(6), _sender.commands.size()); // Suddenly, a wild cluster state change appears! Even though this state // does not in itself imply any bucket changes, it will still overwrite the // pending cluster state and thus its state of pending bucket info requests. setSystemState(lib::ClusterState("distributor:6 .2.t:12345 storage:6")); - CPPUNIT_ASSERT_EQUAL(messageCount(12), _sender.commands.size()); + ASSERT_EQ(messageCount(12), _sender.commands.size()); // Send replies for first messageCount(6) (outdated requests). int numBuckets = 10; for (uint32_t i = 0; i < messageCount(6); ++i) { - fakeBucketReply(lib::ClusterState("distributor:6 storage:6"), - *_sender.commands[i], numBuckets); + ASSERT_NO_FATAL_FAILURE(fakeBucketReply(lib::ClusterState("distributor:6 storage:6"), + *_sender.commands[i], numBuckets)); } // No change from these. - assertCorrectBuckets(1, "distributor:6 storage:6"); + ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(1, "distributor:6 storage:6")); // Send for current pending. for (uint32_t i = 0; i < messageCount(6); ++i) { - fakeBucketReply(lib::ClusterState("distributor:6 .2.t:12345 storage:6"), - *_sender.commands[i + messageCount(6)], - numBuckets); + ASSERT_NO_FATAL_FAILURE(fakeBucketReply(lib::ClusterState("distributor:6 .2.t:12345 storage:6"), + *_sender.commands[i + messageCount(6)], + numBuckets)); } - assertCorrectBuckets(numBuckets, "distributor:6 storage:6"); + ASSERT_NO_FATAL_FAILURE(assertCorrectBuckets(numBuckets, "distributor:6 storage:6")); _sender.clear(); // No more pending global fetch; this should be a no-op state. setSystemState(lib::ClusterState("distributor:6 .3.t:12345 storage:6")); - CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.commands.size()); + EXPECT_EQ(size_t(0), _sender.commands.size()); } -void -BucketDBUpdaterTest::testChangedDistributionConfigTriggersRecoveryMode() -{ - setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), messageCount(6), 20); +TEST_F(BucketDBUpdaterTest, testChangedDistributionConfigTriggersRecoveryMode) { + ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), messageCount(6), 20)); _sender.clear(); // First cluster state; implicit scan of all buckets which does not // use normal recovery mode ticking-path. - CPPUNIT_ASSERT(!_distributor->isInRecoveryMode()); + EXPECT_FALSE(_distributor->isInRecoveryMode()); std::string distConfig(getDistConfig6Nodes4Groups()); setDistribution(distConfig); sortSentMessagesByIndex(_sender); // No replies received yet, still no recovery mode. - CPPUNIT_ASSERT(!_distributor->isInRecoveryMode()); + EXPECT_FALSE(_distributor->isInRecoveryMode()); - CPPUNIT_ASSERT_EQUAL(messageCount(6), _sender.commands.size()); + ASSERT_EQ(messageCount(6), _sender.commands.size()); uint32_t numBuckets = 10; for (uint32_t i = 0; i < messageCount(6); ++i) { - fakeBucketReply(lib::ClusterState("distributor:6 storage:6"), - *_sender.commands[i], numBuckets); + ASSERT_NO_FATAL_FAILURE(fakeBucketReply(lib::ClusterState("distributor:6 storage:6"), + *_sender.commands[i], numBuckets)); } // Pending cluster state (i.e. distribution) has been enabled, which should // cause recovery mode to be entered. - CPPUNIT_ASSERT(_distributor->isInRecoveryMode()); + EXPECT_TRUE(_distributor->isInRecoveryMode()); } -void -BucketDBUpdaterTest::testNewlyAddedBucketsHaveCurrentTimeAsGcTimestamp() -{ +TEST_F(BucketDBUpdaterTest, testNewlyAddedBucketsHaveCurrentTimeAsGcTimestamp) { getClock().setAbsoluteTimeInSeconds(101234); lib::ClusterState stateBefore("distributor:1 storage:1"); { uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 1; - setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn); + ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn)); } // setAndEnableClusterState adds n buckets with id (16, i) document::BucketId bucket(16, 0); BucketDatabase::Entry e(getBucket(bucket)); - CPPUNIT_ASSERT(e.valid()); - CPPUNIT_ASSERT_EQUAL(uint32_t(101234), e->getLastGarbageCollectionTime()); + ASSERT_TRUE(e.valid()); + EXPECT_EQ(uint32_t(101234), e->getLastGarbageCollectionTime()); } -void -BucketDBUpdaterTest::testNewerMutationsNotOverwrittenByEarlierBucketFetch() -{ +TEST_F(BucketDBUpdaterTest, testNewerMutationsNotOverwrittenByEarlierBucketFetch) { { lib::ClusterState stateBefore("distributor:1 storage:1 .0.s:i"); uint32_t expectedMsgs = _bucketSpaces.size(), dummyBucketsToReturn = 0; // This step is required to make the distributor ready for accepting // the below explicit database insertion towards node 0. - setAndEnableClusterState(stateBefore, expectedMsgs, - dummyBucketsToReturn); + ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(stateBefore, expectedMsgs, + dummyBucketsToReturn)); } _sender.clear(); getClock().setAbsoluteTimeInSeconds(1000); lib::ClusterState state("distributor:1 storage:1"); setSystemState(state); - CPPUNIT_ASSERT_EQUAL(_bucketSpaces.size(), _sender.commands.size()); + ASSERT_EQ(_bucketSpaces.size(), _sender.commands.size()); // Before replying with the bucket info, simulate the arrival of a mutation // reply that alters the state of the bucket with information that will be @@ -2253,13 +1974,12 @@ BucketDBUpdaterTest::testNewerMutationsNotOverwrittenByEarlierBucketFetch() // correctness, as this should contain the same bucket info as that // contained in the full bucket reply and the DB update is thus idempotent. for (uint32_t i = 0; i < _bucketSpaces.size(); ++i) { - fakeBucketReply(state, *_sender.commands[i], bucketsReturned); + ASSERT_NO_FATAL_FAILURE(fakeBucketReply(state, *_sender.commands[i], bucketsReturned)); } BucketDatabase::Entry e(getBucket(bucket)); - CPPUNIT_ASSERT_EQUAL(uint32_t(1), e->getNodeCount()); - CPPUNIT_ASSERT_EQUAL(wantedInfo, e->getNodeRef(0).getBucketInfo()); - + ASSERT_EQ(uint32_t(1), e->getNodeCount()); + EXPECT_EQ(wantedInfo, e->getNodeRef(0).getBucketInfo()); } std::vector<uint16_t> @@ -2286,9 +2006,11 @@ BucketDBUpdaterTest::getSentNodesWithPreemption( { lib::ClusterState stateBefore(oldClusterState); uint32_t dummyBucketsToReturn = 10; + // FIXME cannot chain assertion checks in non-void function setAndEnableClusterState(lib::ClusterState(oldClusterState), expectedOldStateMessages, dummyBucketsToReturn); + _sender.clear(); setSystemState(lib::ClusterState(preemptedClusterState)); @@ -2309,10 +2031,8 @@ using nodeVec = std::vector<uint16_t>; * database modifications caused by intermediate states will not be * accounted for (basically the ABA problem in a distributed setting). */ -void -BucketDBUpdaterTest::preemptedDistrChangeCarriesNodeSetOverToNextStateFetch() -{ - CPPUNIT_ASSERT_EQUAL( +TEST_F(BucketDBUpdaterTest, preemptedDistrChangeCarriesNodeSetOverToNextStateFetch) { + EXPECT_EQ( expandNodeVec({0, 1, 2, 3, 4, 5}), getSentNodesWithPreemption("version:1 distributor:6 storage:6", messageCount(6), @@ -2320,10 +2040,8 @@ BucketDBUpdaterTest::preemptedDistrChangeCarriesNodeSetOverToNextStateFetch() "version:3 distributor:6 storage:6")); } -void -BucketDBUpdaterTest::preemptedStorChangeCarriesNodeSetOverToNextStateFetch() -{ - CPPUNIT_ASSERT_EQUAL( +TEST_F(BucketDBUpdaterTest, preemptedStorChangeCarriesNodeSetOverToNextStateFetch) { + EXPECT_EQ( expandNodeVec({2, 3}), getSentNodesWithPreemption( "version:1 distributor:6 storage:6 .2.s:d", @@ -2332,10 +2050,8 @@ BucketDBUpdaterTest::preemptedStorChangeCarriesNodeSetOverToNextStateFetch() "version:3 distributor:6 storage:6")); } -void -BucketDBUpdaterTest::preemptedStorageNodeDownMustBeReFetched() -{ - CPPUNIT_ASSERT_EQUAL( +TEST_F(BucketDBUpdaterTest, preemptedStorageNodeDownMustBeReFetched) { + EXPECT_EQ( expandNodeVec({2}), getSentNodesWithPreemption( "version:1 distributor:6 storage:6", @@ -2344,10 +2060,8 @@ BucketDBUpdaterTest::preemptedStorageNodeDownMustBeReFetched() "version:3 distributor:6 storage:6")); } -void -BucketDBUpdaterTest::doNotSendToPreemptedNodeNowInDownState() -{ - CPPUNIT_ASSERT_EQUAL( +TEST_F(BucketDBUpdaterTest, doNotSendToPreemptedNodeNowInDownState) { + EXPECT_EQ( nodeVec{}, getSentNodesWithPreemption( "version:1 distributor:6 storage:6 .2.s:d", @@ -2356,12 +2070,10 @@ BucketDBUpdaterTest::doNotSendToPreemptedNodeNowInDownState() "version:3 distributor:6 storage:6 .2.s:d")); // 2 down again. } -void -BucketDBUpdaterTest::doNotSendToPreemptedNodeNotPartOfNewState() -{ +TEST_F(BucketDBUpdaterTest, doNotSendToPreemptedNodeNotPartOfNewState) { // Even though 100 nodes are preempted, not all of these should be part // of the request afterwards when only 6 are part of the state. - CPPUNIT_ASSERT_EQUAL( + EXPECT_EQ( expandNodeVec({0, 1, 2, 3, 4, 5}), getSentNodesWithPreemption( "version:1 distributor:6 storage:100", @@ -2370,20 +2082,18 @@ BucketDBUpdaterTest::doNotSendToPreemptedNodeNotPartOfNewState() "version:3 distributor:6 storage:6")); } -void -BucketDBUpdaterTest::outdatedNodeSetClearedAfterSuccessfulStateCompletion() -{ +TEST_F(BucketDBUpdaterTest, outdatedNodeSetClearedAfterSuccessfulStateCompletion) { lib::ClusterState stateBefore( "version:1 distributor:6 storage:6 .1.t:1234"); uint32_t expectedMsgs = messageCount(6), dummyBucketsToReturn = 10; - setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn); + ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(stateBefore, expectedMsgs, dummyBucketsToReturn)); _sender.clear(); // New cluster state that should not by itself trigger any new fetches, // unless outdated node set is somehow not cleared after an enabled // (completed) cluster state has been set. lib::ClusterState stateAfter("version:3 distributor:6 storage:6"); setSystemState(stateAfter); - CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.commands.size()); + EXPECT_EQ(size_t(0), _sender.commands.size()); } // XXX test currently disabled since distribution config currently isn't used @@ -2392,12 +2102,10 @@ BucketDBUpdaterTest::outdatedNodeSetClearedAfterSuccessfulStateCompletion() // distribution config will follow very shortly after the config has been // applied to the node. The new cluster state will then send out requests to // the correct node set. -void -BucketDBUpdaterTest::clusterConfigDownsizeOnlySendsToAvailableNodes() -{ +TEST_F(BucketDBUpdaterTest, DISABLED_clusterConfigDownsizeOnlySendsToAvailableNodes) { uint32_t expectedMsgs = 6, dummyBucketsToReturn = 20; - setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), - expectedMsgs, dummyBucketsToReturn); + ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:6 storage:6"), + expectedMsgs, dummyBucketsToReturn)); _sender.clear(); // Intentionally trigger a racing config change which arrives before the @@ -2406,14 +2114,12 @@ BucketDBUpdaterTest::clusterConfigDownsizeOnlySendsToAvailableNodes() setDistribution(distConfig); sortSentMessagesByIndex(_sender); - CPPUNIT_ASSERT_EQUAL((nodeVec{0, 1, 2}), getSendSet()); + EXPECT_EQ((nodeVec{0, 1, 2}), getSendSet()); } -void -BucketDBUpdaterTest::changedDiskSetTriggersReFetch() -{ +TEST_F(BucketDBUpdaterTest, changedDiskSetTriggersReFetch) { // Same number of online disks, but the set of disks has changed. - CPPUNIT_ASSERT_EQUAL( + EXPECT_EQ( getNodeList({1}), getSentNodes("distributor:2 storage:2 .1.d:3 .1.d.2.s:d", "distributor:2 storage:2 .1.d:3 .1.d.1.s:d")); @@ -2427,12 +2133,10 @@ BucketDBUpdaterTest::changedDiskSetTriggersReFetch() * * See VESPA-790 for details. */ -void -BucketDBUpdaterTest::nodeMissingFromConfigIsTreatedAsNeedingOwnershipTransfer() -{ +TEST_F(BucketDBUpdaterTest, nodeMissingFromConfigIsTreatedAsNeedingOwnershipTransfer) { uint32_t expectedMsgs = messageCount(3), dummyBucketsToReturn = 1; - setAndEnableClusterState(lib::ClusterState("distributor:3 storage:3"), - expectedMsgs, dummyBucketsToReturn); + ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:3 storage:3"), + expectedMsgs, dummyBucketsToReturn)); _sender.clear(); // Cluster goes from {0, 1, 2} -> {0, 1}. This leaves us with a config @@ -2459,78 +2163,64 @@ BucketDBUpdaterTest::nodeMissingFromConfigIsTreatedAsNeedingOwnershipTransfer() // Attempt to apply state with {0, 1} set. This will compare the new state // with the previous state, which still has node 2. expectedMsgs = messageCount(2); - setAndEnableClusterState(lib::ClusterState("distributor:2 storage:2"), - expectedMsgs, dummyBucketsToReturn); + ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:2 storage:2"), + expectedMsgs, dummyBucketsToReturn)); - CPPUNIT_ASSERT_EQUAL(expandNodeVec({0, 1}), getSendSet()); + EXPECT_EQ(expandNodeVec({0, 1}), getSendSet()); } -void -BucketDBUpdaterTest::changed_distributor_set_implies_ownership_transfer() -{ +TEST_F(BucketDBUpdaterTest, changed_distributor_set_implies_ownership_transfer) { auto fixture = createPendingStateFixtureForStateChange( "distributor:2 storage:2", "distributor:1 storage:2"); - CPPUNIT_ASSERT(fixture->state->hasBucketOwnershipTransfer()); + EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer()); fixture = createPendingStateFixtureForStateChange( "distributor:2 storage:2", "distributor:2 .1.s:d storage:2"); - CPPUNIT_ASSERT(fixture->state->hasBucketOwnershipTransfer()); + EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer()); } -void -BucketDBUpdaterTest::unchanged_distributor_set_implies_no_ownership_transfer() -{ +TEST_F(BucketDBUpdaterTest, unchanged_distributor_set_implies_no_ownership_transfer) { auto fixture = createPendingStateFixtureForStateChange( "distributor:2 storage:2", "distributor:2 storage:1"); - CPPUNIT_ASSERT(!fixture->state->hasBucketOwnershipTransfer()); + EXPECT_FALSE(fixture->state->hasBucketOwnershipTransfer()); fixture = createPendingStateFixtureForStateChange( "distributor:2 storage:2", "distributor:2 storage:2 .1.s:d"); - CPPUNIT_ASSERT(!fixture->state->hasBucketOwnershipTransfer()); + EXPECT_FALSE(fixture->state->hasBucketOwnershipTransfer()); } -void -BucketDBUpdaterTest::changed_distribution_config_implies_ownership_transfer() -{ +TEST_F(BucketDBUpdaterTest, changed_distribution_config_implies_ownership_transfer) { auto fixture = createPendingStateFixtureForDistributionChange( "distributor:2 storage:2"); - CPPUNIT_ASSERT(fixture->state->hasBucketOwnershipTransfer()); + EXPECT_TRUE(fixture->state->hasBucketOwnershipTransfer()); } -void -BucketDBUpdaterTest::transition_time_tracked_for_single_state_change() -{ - completeStateTransitionInSeconds("distributor:2 storage:2", 5, messageCount(2)); +TEST_F(BucketDBUpdaterTest, transition_time_tracked_for_single_state_change) { + ASSERT_NO_FATAL_FAILURE(completeStateTransitionInSeconds("distributor:2 storage:2", 5, messageCount(2))); - CPPUNIT_ASSERT_EQUAL(uint64_t(5000), lastTransitionTimeInMillis()); + EXPECT_EQ(uint64_t(5000), lastTransitionTimeInMillis()); } -void -BucketDBUpdaterTest::transition_time_reset_across_non_preempting_state_changes() -{ - completeStateTransitionInSeconds("distributor:2 storage:2", 5, messageCount(2)); - completeStateTransitionInSeconds("distributor:2 storage:3", 3, messageCount(1)); +TEST_F(BucketDBUpdaterTest, transition_time_reset_across_non_preempting_state_changes) { + ASSERT_NO_FATAL_FAILURE(completeStateTransitionInSeconds("distributor:2 storage:2", 5, messageCount(2))); + ASSERT_NO_FATAL_FAILURE(completeStateTransitionInSeconds("distributor:2 storage:3", 3, messageCount(1))); - CPPUNIT_ASSERT_EQUAL(uint64_t(3000), lastTransitionTimeInMillis()); + EXPECT_EQ(uint64_t(3000), lastTransitionTimeInMillis()); } -void -BucketDBUpdaterTest::transition_time_tracked_for_distribution_config_change() -{ +TEST_F(BucketDBUpdaterTest, transition_time_tracked_for_distribution_config_change) { lib::ClusterState state("distributor:2 storage:2"); - setAndEnableClusterState(state, messageCount(2), 1); + ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(state, messageCount(2), 1)); _sender.clear(); std::string distConfig(getDistConfig3Nodes1Group()); setDistribution(distConfig); getClock().addSecondsToTime(4); - completeBucketInfoGathering(state, messageCount(2)); - CPPUNIT_ASSERT_EQUAL(uint64_t(4000), lastTransitionTimeInMillis()); + ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(state, messageCount(2))); + EXPECT_EQ(uint64_t(4000), lastTransitionTimeInMillis()); } -void -BucketDBUpdaterTest::transition_time_tracked_across_preempted_transitions() -{ +TEST_F(BucketDBUpdaterTest, transition_time_tracked_across_preempted_transitions) { _sender.clear(); lib::ClusterState state("distributor:2 storage:2"); setSystemState(state); @@ -2538,9 +2228,9 @@ BucketDBUpdaterTest::transition_time_tracked_across_preempted_transitions() // Pre-empted with new state here, which will push out the old pending // state and replace it with a new one. We should still count the time // used processing the old state. - completeStateTransitionInSeconds("distributor:2 storage:3", 3, messageCount(3)); + ASSERT_NO_FATAL_FAILURE(completeStateTransitionInSeconds("distributor:2 storage:3", 3, messageCount(3))); - CPPUNIT_ASSERT_EQUAL(uint64_t(8000), lastTransitionTimeInMillis()); + EXPECT_EQ(uint64_t(8000), lastTransitionTimeInMillis()); } /* @@ -2554,10 +2244,10 @@ BucketDBUpdaterTest::transition_time_tracked_across_preempted_transitions() * Yes, the order of node<->bucket id is reversed between the two, perhaps to make sure you're awake. */ -void BucketDBUpdaterTest::batch_update_of_existing_diverging_replicas_does_not_mark_any_as_trusted() { +TEST_F(BucketDBUpdaterTest, batch_update_of_existing_diverging_replicas_does_not_mark_any_as_trusted) { // Replacing bucket information for content node 0 should not mark existing // untrusted replica as trusted as a side effect. - CPPUNIT_ASSERT_EQUAL( + EXPECT_EQ( std::string("5:1/7/8/9/u,0/1/2/3/u|"), mergeBucketLists( lib::ClusterState("distributor:1 storage:3 .0.s:i"), @@ -2566,43 +2256,38 @@ void BucketDBUpdaterTest::batch_update_of_existing_diverging_replicas_does_not_m "0:5/1/2/3|1:5/7/8/9", true)); } -void BucketDBUpdaterTest::batch_add_of_new_diverging_replicas_does_not_mark_any_as_trusted() { - CPPUNIT_ASSERT_EQUAL( - std::string("5:1/7/8/9/u,0/1/2/3/u|"), - mergeBucketLists("", "0:5/1/2/3|1:5/7/8/9", true)); +TEST_F(BucketDBUpdaterTest, batch_add_of_new_diverging_replicas_does_not_mark_any_as_trusted) { + EXPECT_EQ(std::string("5:1/7/8/9/u,0/1/2/3/u|"), + mergeBucketLists("", "0:5/1/2/3|1:5/7/8/9", true)); } -void BucketDBUpdaterTest::batch_add_with_single_resulting_replica_implicitly_marks_as_trusted() { - CPPUNIT_ASSERT_EQUAL( - std::string("5:0/1/2/3/t|"), - mergeBucketLists("", "0:5/1/2/3", true)); +TEST_F(BucketDBUpdaterTest, batch_add_with_single_resulting_replica_implicitly_marks_as_trusted) { + EXPECT_EQ(std::string("5:0/1/2/3/t|"), + mergeBucketLists("", "0:5/1/2/3", true)); } -void BucketDBUpdaterTest::identity_update_of_single_replica_does_not_clear_trusted() { - CPPUNIT_ASSERT_EQUAL( - std::string("5:0/1/2/3/t|"), - mergeBucketLists("0:5/1/2/3", "0:5/1/2/3", true)); +TEST_F(BucketDBUpdaterTest, identity_update_of_single_replica_does_not_clear_trusted) { + EXPECT_EQ(std::string("5:0/1/2/3/t|"), + mergeBucketLists("0:5/1/2/3", "0:5/1/2/3", true)); } -void BucketDBUpdaterTest::identity_update_of_diverging_untrusted_replicas_does_not_mark_any_as_trusted() { - CPPUNIT_ASSERT_EQUAL( - std::string("5:1/7/8/9/u,0/1/2/3/u|"), - mergeBucketLists("0:5/1/2/3|1:5/7/8/9", "0:5/1/2/3|1:5/7/8/9", true)); +TEST_F(BucketDBUpdaterTest, identity_update_of_diverging_untrusted_replicas_does_not_mark_any_as_trusted) { + EXPECT_EQ(std::string("5:1/7/8/9/u,0/1/2/3/u|"), + mergeBucketLists("0:5/1/2/3|1:5/7/8/9", "0:5/1/2/3|1:5/7/8/9", true)); } -void BucketDBUpdaterTest::adding_diverging_replica_to_existing_trusted_does_not_remove_trusted() { - CPPUNIT_ASSERT_EQUAL( - std::string("5:1/2/3/4/u,0/1/2/3/t|"), - mergeBucketLists("0:5/1/2/3", "0:5/1/2/3|1:5/2/3/4", true)); +TEST_F(BucketDBUpdaterTest, adding_diverging_replica_to_existing_trusted_does_not_remove_trusted) { + EXPECT_EQ(std::string("5:1/2/3/4/u,0/1/2/3/t|"), + mergeBucketLists("0:5/1/2/3", "0:5/1/2/3|1:5/2/3/4", true)); } -void BucketDBUpdaterTest::batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted() { +TEST_F(BucketDBUpdaterTest, batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted) { // This differs from batch_update_of_existing_diverging_replicas_does_not_mark_any_as_trusted // in that _all_ content nodes are considered outdated when distributor changes take place, // and therefore a slightly different code path is taken. In particular, bucket info for // outdated nodes gets removed before possibly being re-added (if present in the bucket info // response). - CPPUNIT_ASSERT_EQUAL( + EXPECT_EQ( std::string("5:1/7/8/9/u,0/1/2/3/u|"), mergeBucketLists( lib::ClusterState("distributor:2 storage:3"), @@ -2612,7 +2297,7 @@ void BucketDBUpdaterTest::batch_update_from_distributor_change_does_not_mark_div } // TODO remove on Vespa 8 - this is a workaround for https://github.com/vespa-engine/vespa/issues/8475 -void BucketDBUpdaterTest::global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection() { +TEST_F(BucketDBUpdaterTest, global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection) { std::string distConfig(getDistConfig6Nodes2Groups()); setDistribution(distConfig); @@ -2620,7 +2305,7 @@ void BucketDBUpdaterTest::global_distribution_hash_falls_back_to_legacy_format_u const vespalib::string legacy_hash = "(0d3|3|*(0;0;1;2)(1;3;4;5))"; setSystemState(lib::ClusterState("distributor:6 storage:6")); - CPPUNIT_ASSERT_EQUAL(messageCount(6), _sender.commands.size()); + ASSERT_EQ(messageCount(6), _sender.commands.size()); api::RequestBucketInfoCommand* global_req = nullptr; for (auto& cmd : _sender.commands) { @@ -2630,8 +2315,8 @@ void BucketDBUpdaterTest::global_distribution_hash_falls_back_to_legacy_format_u break; } } - CPPUNIT_ASSERT(global_req != nullptr); - CPPUNIT_ASSERT_EQUAL(current_hash, global_req->getDistributionHash()); + ASSERT_TRUE(global_req != nullptr); + ASSERT_EQ(current_hash, global_req->getDistributionHash()); auto reply = std::make_shared<api::RequestBucketInfoReply>(*global_req); reply->setResult(api::ReturnCode::REJECTED); @@ -2641,9 +2326,9 @@ void BucketDBUpdaterTest::global_distribution_hash_falls_back_to_legacy_format_u getBucketDBUpdater().resendDelayedMessages(); // Should now be a resent request with legacy distribution hash - CPPUNIT_ASSERT_EQUAL(messageCount(6) + 1, _sender.commands.size()); + ASSERT_EQ(messageCount(6) + 1, _sender.commands.size()); auto& legacy_req = dynamic_cast<api::RequestBucketInfoCommand&>(*_sender.commands.back()); - CPPUNIT_ASSERT_EQUAL(legacy_hash, legacy_req.getDistributionHash()); + ASSERT_EQ(legacy_hash, legacy_req.getDistributionHash()); // Now if we reject it _again_ we should cycle back to the current hash // in case it wasn't a hash-based rejection after all. And the circle of life continues. @@ -2654,9 +2339,9 @@ void BucketDBUpdaterTest::global_distribution_hash_falls_back_to_legacy_format_u getClock().addSecondsToTime(10); getBucketDBUpdater().resendDelayedMessages(); - CPPUNIT_ASSERT_EQUAL(messageCount(6) + 2, _sender.commands.size()); + ASSERT_EQ(messageCount(6) + 2, _sender.commands.size()); auto& new_current_req = dynamic_cast<api::RequestBucketInfoCommand&>(*_sender.commands.back()); - CPPUNIT_ASSERT_EQUAL(current_hash, new_current_req.getDistributionHash()); + ASSERT_EQ(current_hash, new_current_req.getDistributionHash()); } namespace { @@ -2680,23 +2365,21 @@ void for_each_bucket(const DistributorBucketSpaceRepo& repo, Func&& f) { } -using ConfigBuilder = vespa::config::content::core::StorDistributormanagerConfigBuilder; - -void BucketDBUpdaterTest::non_owned_buckets_moved_to_read_only_db_on_ownership_change() { +TEST_F(BucketDBUpdaterTest, non_owned_buckets_moved_to_read_only_db_on_ownership_change) { getConfig().setAllowStaleReadsDuringClusterStateTransitions(true); lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity - CPPUNIT_ASSERT_EQUAL(messageCount(4), _sender.commands.size()); + ASSERT_EQ(messageCount(4), _sender.commands.size()); constexpr uint32_t n_buckets = 10; - completeBucketInfoGathering(initial_state, messageCount(4), n_buckets); + ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(initial_state, messageCount(4), n_buckets)); _sender.clear(); - CPPUNIT_ASSERT_EQUAL(size_t(n_buckets), mutable_default_db().size()); - CPPUNIT_ASSERT_EQUAL(size_t(n_buckets), mutable_global_db().size()); - CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size()); - CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_global_db().size()); + EXPECT_EQ(size_t(n_buckets), mutable_default_db().size()); + EXPECT_EQ(size_t(n_buckets), mutable_global_db().size()); + EXPECT_EQ(size_t(0), read_only_default_db().size()); + EXPECT_EQ(size_t(0), read_only_global_db().size()); lib::ClusterState pending_state("distributor:2 storage:4"); @@ -2707,54 +2390,54 @@ void BucketDBUpdaterTest::non_owned_buckets_moved_to_read_only_db_on_ownership_c buckets_not_owned_in_pending_state.insert(Bucket(space, entry.getBucketId())); } }); - CPPUNIT_ASSERT(!buckets_not_owned_in_pending_state.empty()); + EXPECT_FALSE(buckets_not_owned_in_pending_state.empty()); set_cluster_state_bundle(lib::ClusterStateBundle(pending_state, {}, true)); // Now requires activation const auto buckets_not_owned_per_space = (buckets_not_owned_in_pending_state.size() / 2); // 2 spaces const auto expected_mutable_buckets = n_buckets - buckets_not_owned_per_space; - CPPUNIT_ASSERT_EQUAL(expected_mutable_buckets, mutable_default_db().size()); - CPPUNIT_ASSERT_EQUAL(expected_mutable_buckets, mutable_global_db().size()); - CPPUNIT_ASSERT_EQUAL(buckets_not_owned_per_space, read_only_default_db().size()); - CPPUNIT_ASSERT_EQUAL(buckets_not_owned_per_space, read_only_global_db().size()); + EXPECT_EQ(expected_mutable_buckets, mutable_default_db().size()); + EXPECT_EQ(expected_mutable_buckets, mutable_global_db().size()); + EXPECT_EQ(buckets_not_owned_per_space, read_only_default_db().size()); + EXPECT_EQ(buckets_not_owned_per_space, read_only_global_db().size()); for_each_bucket(read_only_repo(), [&](const auto& space, const auto& entry) { - CPPUNIT_ASSERT(buckets_not_owned_in_pending_state.find(Bucket(space, entry.getBucketId())) - != buckets_not_owned_in_pending_state.end()); + EXPECT_TRUE(buckets_not_owned_in_pending_state.find(Bucket(space, entry.getBucketId())) + != buckets_not_owned_in_pending_state.end()); }); } -void BucketDBUpdaterTest::buckets_no_longer_available_are_not_moved_to_read_only_database() { +TEST_F(BucketDBUpdaterTest, buckets_no_longer_available_are_not_moved_to_read_only_database) { constexpr uint32_t n_buckets = 10; // No ownership change, just node down. Test redundancy is 2, so removing 2 nodes will // cause some buckets to be entirely unavailable. trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, "version:2 distributor:1 storage:4 .0.s:d .1.s:m", n_buckets, 0); - CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size()); - CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_global_db().size()); + EXPECT_EQ(size_t(0), read_only_default_db().size()); + EXPECT_EQ(size_t(0), read_only_global_db().size()); } -void BucketDBUpdaterTest::non_owned_buckets_purged_when_read_only_support_is_config_disabled() { +TEST_F(BucketDBUpdaterTest, non_owned_buckets_purged_when_read_only_support_is_config_disabled) { getConfig().setAllowStaleReadsDuringClusterStateTransitions(false); lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity - CPPUNIT_ASSERT_EQUAL(messageCount(4), _sender.commands.size()); + ASSERT_EQ(messageCount(4), _sender.commands.size()); constexpr uint32_t n_buckets = 10; - completeBucketInfoGathering(initial_state, messageCount(4), n_buckets); + ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering(initial_state, messageCount(4), n_buckets)); _sender.clear(); // Nothing in read-only DB after first bulk load of buckets. - CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size()); - CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_global_db().size()); + EXPECT_EQ(size_t(0), read_only_default_db().size()); + EXPECT_EQ(size_t(0), read_only_global_db().size()); lib::ClusterState pending_state("distributor:2 storage:4"); setSystemState(pending_state); // No buckets should be moved into read only db after ownership changes. - CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size()); - CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_global_db().size()); + EXPECT_EQ(size_t(0), read_only_default_db().size()); + EXPECT_EQ(size_t(0), read_only_global_db().size()); } void BucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition( @@ -2768,83 +2451,88 @@ void BucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition( getConfig().setAllowStaleReadsDuringClusterStateTransitions(true); lib::ClusterState initial_state(initial_state_str); setSystemState(initial_state); - CPPUNIT_ASSERT_EQUAL(messageCount(initial_expected_msgs), _sender.commands.size()); - completeBucketInfoGathering(initial_state, messageCount(initial_expected_msgs), initial_buckets); + ASSERT_EQ(messageCount(initial_expected_msgs), _sender.commands.size()); + ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering( + initial_state, messageCount(initial_expected_msgs), initial_buckets)); _sender.clear(); lib::ClusterState pending_state(pending_state_str); // Ownership change set_cluster_state_bundle(lib::ClusterStateBundle(pending_state, {}, true)); - CPPUNIT_ASSERT_EQUAL(messageCount(pending_expected_msgs), _sender.commands.size()); - completeBucketInfoGathering(pending_state, messageCount(pending_expected_msgs), pending_buckets); + ASSERT_EQ(messageCount(pending_expected_msgs), _sender.commands.size()); + ASSERT_NO_FATAL_FAILURE(completeBucketInfoGathering( + pending_state, messageCount(pending_expected_msgs), pending_buckets)); _sender.clear(); } -void BucketDBUpdaterTest::deferred_activated_state_does_not_enable_state_until_activation_received() { +TEST_F(BucketDBUpdaterTest, deferred_activated_state_does_not_enable_state_until_activation_received) { constexpr uint32_t n_buckets = 10; - trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, - "version:2 distributor:1 storage:4", n_buckets, 4); + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, + "version:2 distributor:1 storage:4", n_buckets, 4)); // Version should not be switched over yet - CPPUNIT_ASSERT_EQUAL(uint32_t(1), getDistributor().getClusterStateBundle().getVersion()); + EXPECT_EQ(uint32_t(1), getDistributor().getClusterStateBundle().getVersion()); - CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_default_db().size()); - CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_global_db().size()); + EXPECT_EQ(uint64_t(0), mutable_default_db().size()); + EXPECT_EQ(uint64_t(0), mutable_global_db().size()); - CPPUNIT_ASSERT(!activate_cluster_state_version(2)); + EXPECT_FALSE(activate_cluster_state_version(2)); - CPPUNIT_ASSERT_EQUAL(uint32_t(2), getDistributor().getClusterStateBundle().getVersion()); - CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), mutable_default_db().size()); - CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), mutable_global_db().size()); + EXPECT_EQ(uint32_t(2), getDistributor().getClusterStateBundle().getVersion()); + EXPECT_EQ(uint64_t(n_buckets), mutable_default_db().size()); + EXPECT_EQ(uint64_t(n_buckets), mutable_global_db().size()); } -void BucketDBUpdaterTest::read_only_db_cleared_once_pending_state_is_activated() { +TEST_F(BucketDBUpdaterTest, read_only_db_cleared_once_pending_state_is_activated) { constexpr uint32_t n_buckets = 10; - trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, - "version:2 distributor:2 storage:4", n_buckets, 0); - CPPUNIT_ASSERT(!activate_cluster_state_version(2)); + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, + "version:2 distributor:2 storage:4", n_buckets, 0)); + EXPECT_FALSE(activate_cluster_state_version(2)); - CPPUNIT_ASSERT_EQUAL(uint64_t(0), read_only_default_db().size()); - CPPUNIT_ASSERT_EQUAL(uint64_t(0), read_only_global_db().size()); + EXPECT_EQ(uint64_t(0), read_only_default_db().size()); + EXPECT_EQ(uint64_t(0), read_only_global_db().size()); } -void BucketDBUpdaterTest::read_only_db_is_populated_even_when_self_is_marked_down() { +TEST_F(BucketDBUpdaterTest, read_only_db_is_populated_even_when_self_is_marked_down) { constexpr uint32_t n_buckets = 10; - trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, - "version:2 distributor:1 .0.s:d storage:4", n_buckets, 0); + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, + "version:2 distributor:1 .0.s:d storage:4", n_buckets, 0)); // State not yet activated, so read-only DBs have got all the buckets we used to have. - CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_default_db().size()); - CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_global_db().size()); - CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), read_only_default_db().size()); - CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), read_only_global_db().size()); + EXPECT_EQ(uint64_t(0), mutable_default_db().size()); + EXPECT_EQ(uint64_t(0), mutable_global_db().size()); + EXPECT_EQ(uint64_t(n_buckets), read_only_default_db().size()); + EXPECT_EQ(uint64_t(n_buckets), read_only_global_db().size()); } -void BucketDBUpdaterTest::activate_cluster_state_request_with_mismatching_version_returns_actual_version() { +TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_with_mismatching_version_returns_actual_version) { constexpr uint32_t n_buckets = 10; - trigger_completed_but_not_yet_activated_transition("version:4 distributor:1 storage:4", n_buckets, 4, - "version:5 distributor:2 storage:4", n_buckets, 0); + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:4 distributor:1 storage:4", n_buckets, 4, + "version:5 distributor:2 storage:4", n_buckets, 0)); - CPPUNIT_ASSERT(activate_cluster_state_version(4)); // Too old version - assert_has_activate_cluster_state_reply_with_actual_version(5); + EXPECT_TRUE(activate_cluster_state_version(4)); // Too old version + ASSERT_NO_FATAL_FAILURE(assert_has_activate_cluster_state_reply_with_actual_version(5)); - CPPUNIT_ASSERT(activate_cluster_state_version(6)); // More recent version than what has been observed - assert_has_activate_cluster_state_reply_with_actual_version(5); + EXPECT_TRUE(activate_cluster_state_version(6)); // More recent version than what has been observed + ASSERT_NO_FATAL_FAILURE(assert_has_activate_cluster_state_reply_with_actual_version(5)); } -void BucketDBUpdaterTest::activate_cluster_state_request_without_pending_transition_passes_message_through() { +TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_without_pending_transition_passes_message_through) { constexpr uint32_t n_buckets = 10; - trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, - "version:2 distributor:1 storage:4", n_buckets, 4); + ASSERT_NO_FATAL_FAILURE( + trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, + "version:2 distributor:1 storage:4", n_buckets, 4)); // Activate version 2; no pending cluster state after this. - CPPUNIT_ASSERT(!activate_cluster_state_version(2)); + EXPECT_FALSE(activate_cluster_state_version(2)); // No pending cluster state for version 3, just passed through to be implicitly bounced by state manager. // Note: state manager is not modelled in this test, so we just check that the message handler returns // false (meaning "didn't take message ownership") and there's no auto-generated reply. - CPPUNIT_ASSERT(!activate_cluster_state_version(3)); - CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.replies.size()); + EXPECT_FALSE(activate_cluster_state_version(3)); + EXPECT_EQ(size_t(0), _sender.replies.size()); } -// TODO rename distributor config to imply two phase functionlity explicitly? - } |