summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--build_settings.cmake2
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/ClusterCost.java15
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/ClusterInfo.java39
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/DeploymentCost.java11
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ClusterInfoMaintainer.java24
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/ApplicationSerializer.java13
-rw-r--r--controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java8
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/ClusterCostTest.java35
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/DeploymentCostTest.java38
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/ClusterInfoMaintainerTest.java2
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/persistence/ApplicationSerializerTest.java7
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiTest.java2
-rw-r--r--controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/deployment.json14
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp6
-rw-r--r--filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp4
-rw-r--r--filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp16
-rw-r--r--searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp111
-rw-r--r--searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp94
-rw-r--r--searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp48
-rw-r--r--searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp8
-rw-r--r--searchcore/src/tests/proton/feedtoken/feedtoken.cpp83
-rw-r--r--searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.cpp5
-rw-r--r--searchcore/src/vespa/searchcore/proton/attribute/i_attribute_writer.h6
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp64
-rw-r--r--searchcore/src/vespa/searchcore/proton/common/feedtoken.h47
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/CMakeLists.txt1
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/feed_metrics.cpp60
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/feed_metrics.h41
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.cpp1
-rw-r--r--searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.h2
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp45
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp13
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/documentdb.cpp8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp46
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.h57
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp44
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.h9
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedstate.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedstate.h22
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h10
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/putdonecontext.h24
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/removedonecontext.h13
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp14
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp5
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp90
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h16
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp17
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h11
-rw-r--r--searchlib/src/tests/queryeval/queryeval.cpp6
-rw-r--r--searchlib/src/vespa/searchlib/queryeval/multisearch.h8
-rw-r--r--searchlib/src/vespa/searchlib/queryeval/orsearch.cpp16
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/document_runnable.cpp13
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/document_runnable.h3
-rw-r--r--storage/src/tests/distributor/distributortest.cpp23
-rw-r--r--storage/src/tests/persistence/persistencetestutils.cpp3
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.cpp135
-rw-r--r--storage/src/vespa/storage/distributor/bucketdbupdater.h78
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp31
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h2
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp227
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.cpp28
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.h52
-rw-r--r--storage/src/vespa/storage/storageserver/servicelayernode.cpp12
-rw-r--r--storage/src/vespa/storage/storageserver/storagenodecontext.cpp8
-rw-r--r--storage/src/vespa/storage/storageserver/storagenodecontext.h3
-rw-r--r--storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h8
-rw-r--r--storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp48
-rw-r--r--storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h10
-rw-r--r--storageframework/src/vespa/storageframework/generic/component/component.cpp22
-rw-r--r--storageframework/src/vespa/storageframework/generic/component/component.h2
-rw-r--r--storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp4
-rw-r--r--storageserver/src/vespa/storageserver/app/servicelayerprocess.h5
-rw-r--r--vespalog/src/test/bufferedlogtest.cpp5
-rw-r--r--vespalog/src/test/bufferedlogtest.logger1.cpp4
-rw-r--r--vespalog/src/test/bufferedlogtest.logger1.h2
-rw-r--r--vespalog/src/test/bufferedlogtest.logger2.cpp4
-rw-r--r--vespalog/src/test/bufferedlogtest.logger2.h2
-rw-r--r--vespalog/src/vespa/log/bufferedlogger.h8
-rw-r--r--vespalog/src/vespa/log/log.cpp15
-rw-r--r--vespalog/src/vespa/log/log.h44
84 files changed, 776 insertions, 1334 deletions
diff --git a/build_settings.cmake b/build_settings.cmake
index 425a2eddda7..2cccea9b64f 100644
--- a/build_settings.cmake
+++ b/build_settings.cmake
@@ -21,7 +21,7 @@ set(C_WARN_OPTS "-Winline -Wuninitialized -Werror -Wall -W -Wchar-subscripts -Wc
# Warnings that are specific to C++ compilation
# Note: this is not a union of C_WARN_OPTS, since CMAKE_CXX_FLAGS already includes CMAKE_C_FLAGS, which in turn includes C_WARN_OPTS transitively
-set(CXX_SPECIFIC_WARN_OPTS "-Wsuggest-override -Wnon-virtual-dtor")
+set(CXX_SPECIFIC_WARN_OPTS "-Wsuggest-override -Wnon-virtual-dtor -Wformat-security")
# C and C++ compiler flags
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g -O3 -fno-omit-frame-pointer ${C_WARN_OPTS} -fPIC ${VESPA_CXX_ABI_FLAGS} -DBOOST_DISABLE_ASSERTS ${VESPA_CPU_ARCH_FLAGS} -mtune=intel ${EXTRA_C_FLAGS}")
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/ClusterCost.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/ClusterCost.java
index 03d0cd28ca1..985d6173b29 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/ClusterCost.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/ClusterCost.java
@@ -6,13 +6,14 @@ package com.yahoo.vespa.hosted.controller.application;
* tco and waste for one cluster of one deployment.
*
* The target utilization is defined the following assumptions:
- * 1. CPU contention starts to cause problems on 0.8
- * 2. Memory management starts to casue problems on 0.7
+ * 1. CPU contention starts to cause problems at 0.8
+ * 2. Memory management starts to cause problems at 0.7
* 3. Load is evenly divided between two deployments - each deployments can handle the other.
* 4. Memory and disk are agnostic to query load.
* 5. Peak utilization (daily variations) are twice the size of the average.
*
* With this in mind we get:
+ *
* CPU: 0.8/2/2 = 0.2
* Mem: 0.7
* Disk: 0.7
@@ -40,16 +41,18 @@ public class ClusterCost {
this.targetUtilization = new ClusterUtilization(0.7,0.2, 0.7, 0.3);
this.resultUtilization = calculateResultUtilization(systemUtilization, targetUtilization);
- this.tco = clusterInfo.getCost() * Math.min(1, this.resultUtilization.getMaxUtilization());
- this.waste = clusterInfo.getCost() - tco;
+ this.tco = clusterInfo.getHostnames().size() * clusterInfo.getFlavorCost();
+
+ double unusedUtilization = 1 - Math.min(1, resultUtilization.getMaxUtilization());
+ this.waste = tco * unusedUtilization;
}
- /** @return TCO in dollars */
+ /** @return The TCO in dollars for this cluster (node tco * nodes) */
public double getTco() {
return tco;
}
- /** @return Waste in dollars */
+ /** @return The amount of dollars spent for unused resources in this cluster */
public double getWaste() {
return waste;
}
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/ClusterInfo.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/ClusterInfo.java
index cb39177c811..40fc57acdc8 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/ClusterInfo.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/ClusterInfo.java
@@ -9,31 +9,62 @@ import java.util.List;
* Value object of static cluster information, in particular the TCO
* of the hardware used for this cluster.
*
+ * Some duplication/flattening of flavor info is done to simplify client usage.
+ *
* @author smorgrav
*/
public class ClusterInfo {
private final String flavor;
- private final int cost;
+ private final double flavorCPU;
+ private final double flavorMem;
+ private final double flavorDisk;
+ private final int flavorCost;
private final ClusterSpec.Type clusterType;
private final List<String> hostnames;
- public ClusterInfo(String flavor, int cost, ClusterSpec.Type clusterType, List<String> hostnames) {
+ /**
+ * @param flavor The name of the flavor eg. 'C-2B/24/500'
+ * @param flavorCost The cost of one node in dollars
+ * @param flavorCPU The number of cpu cores granted
+ * @param flavorMem The memory granted in Gb
+ * @param flavorDisk The disk size granted in Gb
+     * @param clusterType The vespa cluster type e.g. 'container' or 'content'
+ * @param hostnames All hostnames in this cluster
+ */
+ public ClusterInfo(String flavor, int flavorCost, double flavorCPU, double flavorMem,
+ double flavorDisk, ClusterSpec.Type clusterType, List<String> hostnames) {
this.flavor = flavor;
- this.cost = cost;
+ this.flavorCost = flavorCost;
+ this.flavorCPU = flavorCPU;
+ this.flavorMem = flavorMem;
+ this.flavorDisk = flavorDisk;
this.clusterType = clusterType;
this.hostnames = hostnames;
}
+ /** @return The name of the flavor eg. 'C-2B/24/500' */
public String getFlavor() {
return flavor;
}
- public int getCost() { return cost; }
+ /** @return The cost of one node in dollars */
+ public int getFlavorCost() { return flavorCost; }
+
+ /** @return The disk size granted in Gb */
+ public double getFlavorDisk() { return flavorDisk; }
+
+ /** @return The number of cpu cores granted */
+ public double getFlavorCPU() { return flavorCPU; }
+
+ /** @return The memory granted in Gb */
+ public double getFlavorMem() { return flavorMem; }
+    /** @return The vespa cluster type e.g. 'container' or 'content' */
public ClusterSpec.Type getClusterType() {
return clusterType;
}
+ /** @return All hostnames in this cluster */
public List<String> getHostnames() {
return hostnames;
}
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/DeploymentCost.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/DeploymentCost.java
index fce825bd99e..585690793bb 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/DeploymentCost.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/application/DeploymentCost.java
@@ -23,12 +23,16 @@ public class DeploymentCost {
double tco = 0;
double util = 0;
double waste = 0;
+ double maxWaste = -1;
for (ClusterCost costCluster : clusterCosts.values()) {
tco += costCluster.getTco();
waste += costCluster.getWaste();
- int nodesInCluster = costCluster.getClusterInfo().getHostnames().size();
- util = Math.max(util, nodesInCluster*costCluster.getResultUtilization().getMaxUtilization());
+
+ if (costCluster.getWaste() > maxWaste) {
+ util = costCluster.getResultUtilization().getMaxUtilization();
+ maxWaste = costCluster.getWaste();
+ }
}
this.utilization = util;
@@ -40,14 +44,17 @@ public class DeploymentCost {
return clusters;
}
+ /** @return Total cost of ownership for the deployment (sum of all clusters) */
public double getTco() {
return tco;
}
+ /** @return The utilization of clusters that wastes most money in this deployment */
public double getUtilization() {
return utilization;
}
+ /** @return The amount of dollars spent and not utilized */
public double getWaste() {
return waste;
}
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ClusterInfoMaintainer.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ClusterInfoMaintainer.java
index 8f5db8832fa..c807a7f0586 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ClusterInfoMaintainer.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/ClusterInfoMaintainer.java
@@ -2,6 +2,8 @@
package com.yahoo.vespa.hosted.controller.maintenance;
import com.yahoo.config.provision.ClusterSpec;
+import com.yahoo.config.provision.Flavor;
+import com.yahoo.config.provision.Zone;
import com.yahoo.vespa.curator.Lock;
import com.yahoo.vespa.hosted.controller.Application;
import com.yahoo.vespa.hosted.controller.Controller;
@@ -15,6 +17,7 @@ import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.logging.Logger;
import java.util.stream.Collectors;
@@ -38,7 +41,7 @@ public class ClusterInfoMaintainer extends Maintainer {
return node.membership.clusterId;
}
- private Map<ClusterSpec.Id, ClusterInfo> getClusterInfo(NodeList nodes) {
+ private Map<ClusterSpec.Id, ClusterInfo> getClusterInfo(NodeList nodes, Zone zone) {
Map<ClusterSpec.Id, ClusterInfo> infoMap = new HashMap<>();
// Group nodes by clusterid
@@ -53,9 +56,24 @@ public class ClusterInfoMaintainer extends Maintainer {
//Assume they are all equal and use first node as a representative for the cluster
NodeList.Node node = clusterNodes.get(0);
+ // Extract flavor info
+ double cpu = 0;
+ double mem = 0;
+ double disk = 0;
+ if (zone.nodeFlavors().isPresent()) {
+ Optional<Flavor> flavorOptional = zone.nodeFlavors().get().getFlavor(node.flavor);
+            if (flavorOptional.isPresent()) {
+ Flavor flavor = flavorOptional.get();
+ cpu = flavor.getMinCpuCores();
+ mem = flavor.getMinMainMemoryAvailableGb();
disk = flavor.getMinDiskAvailableGb();
+ }
+ }
+
// Add to map
List<String> hostnames = clusterNodes.stream().map(node1 -> node1.hostname).collect(Collectors.toList());
- ClusterInfo inf = new ClusterInfo(node.flavor, node.cost, ClusterSpec.Type.from(node.membership.clusterType), hostnames);
+ ClusterInfo inf = new ClusterInfo(node.flavor, node.cost, cpu, mem, disk,
+ ClusterSpec.Type.from(node.membership.clusterType), hostnames);
infoMap.put(new ClusterSpec.Id(id), inf);
}
@@ -71,7 +89,7 @@ public class ClusterInfoMaintainer extends Maintainer {
DeploymentId deploymentId = new DeploymentId(application.id(), deployment.zone());
try {
NodeList nodes = controller().applications().configserverClient().getNodeList(deploymentId);
- Map<ClusterSpec.Id, ClusterInfo> clusterInfo = getClusterInfo(nodes);
+ Map<ClusterSpec.Id, ClusterInfo> clusterInfo = getClusterInfo(nodes, deployment.zone());
Application app = application.with(deployment.withClusterInfo(clusterInfo));
controller.applications().store(app, lock);
} catch (IOException ioe) {
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/ApplicationSerializer.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/ApplicationSerializer.java
index 859e322b227..f54dd2a010f 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/ApplicationSerializer.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/persistence/ApplicationSerializer.java
@@ -85,6 +85,9 @@ public class ApplicationSerializer {
private final String clusterInfoField = "clusterInfo";
private final String clusterInfoFlavorField = "flavor";
private final String clusterInfoCostField = "cost";
+ private final String clusterInfoCpuField = "flavorCpu";
+ private final String clusterInfoMemField = "flavorMem";
+ private final String clusterInfoDiskField = "flavorDisk";
private final String clusterInfoTypeField = "clusterType";
private final String clusterInfoHostnamesField = "hostnames";
@@ -134,7 +137,10 @@ public class ApplicationSerializer {
private void toSlime(ClusterInfo info, Cursor object) {
object.setString(clusterInfoFlavorField, info.getFlavor());
- object.setLong(clusterInfoCostField, info.getCost());
+ object.setLong(clusterInfoCostField, info.getFlavorCost());
+ object.setDouble(clusterInfoCpuField, info.getFlavorCPU());
+ object.setDouble(clusterInfoMemField, info.getFlavorMem());
+ object.setDouble(clusterInfoDiskField, info.getFlavorDisk());
object.setString(clusterInfoTypeField, info.getClusterType().name());
Cursor array = object.setArray(clusterInfoHostnamesField);
for (String host : info.getHostnames()) {
@@ -274,10 +280,13 @@ public class ApplicationSerializer {
String flavor = inspector.field(clusterInfoFlavorField).asString();
int cost = (int)inspector.field(clusterInfoCostField).asLong();
String type = inspector.field(clusterInfoTypeField).asString();
+ double flavorCpu = inspector.field(clusterInfoCpuField).asDouble();
+ double flavorMem = inspector.field(clusterInfoMemField).asDouble();
+ double flavorDisk = inspector.field(clusterInfoDiskField).asDouble();
List<String> hostnames = new ArrayList<>();
inspector.field(clusterInfoHostnamesField).traverse((ArrayTraverser)(int index, Inspector value) -> hostnames.add(value.asString()));
- return new ClusterInfo(flavor, cost, ClusterSpec.Type.from(type), hostnames);
+ return new ClusterInfo(flavor, cost, flavorCpu, flavorMem, flavorDisk, ClusterSpec.Type.from(type), hostnames);
}
private Zone zoneFromSlime(Inspector object) {
diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java
index 42e89e7893f..248adcd4ec5 100644
--- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java
+++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiHandler.java
@@ -1084,8 +1084,8 @@ public class ApplicationApiHandler extends LoggingRequestHandler {
public static void toSlime(DeploymentCost deploymentCost, Cursor object) {
object.setLong("tco", (long)deploymentCost.getTco());
+ object.setLong("waste", (long)deploymentCost.getWaste());
object.setDouble("utilization", deploymentCost.getUtilization());
- object.setDouble("waste", deploymentCost.getWaste());
Cursor clustersObject = object.setObject("cluster");
for (Map.Entry<String, ClusterCost> clusterEntry : deploymentCost.getCluster().entrySet())
toSlime(clusterEntry.getValue(), clustersObject.setObject(clusterEntry.getKey()));
@@ -1096,8 +1096,12 @@ public class ApplicationApiHandler extends LoggingRequestHandler {
object.setString("resource", getResourceName(clusterCost.getResultUtilization()));
object.setDouble("utilization", clusterCost.getResultUtilization().getMaxUtilization());
object.setLong("tco", (int)clusterCost.getTco());
- object.setString("flavor", clusterCost.getClusterInfo().getFlavor());
object.setLong("waste", (int)clusterCost.getWaste());
+ object.setString("flavor", clusterCost.getClusterInfo().getFlavor());
+ object.setDouble("flavorCost", clusterCost.getClusterInfo().getFlavorCost());
+ object.setDouble("flavorCpu", clusterCost.getClusterInfo().getFlavorCPU());
+ object.setDouble("flavorMem", clusterCost.getClusterInfo().getFlavorMem());
+ object.setDouble("flavorDisk", clusterCost.getClusterInfo().getFlavorDisk());
object.setString("type", clusterCost.getClusterInfo().getClusterType().name());
Cursor utilObject = object.setObject("util");
utilObject.setDouble("cpu", clusterCost.getResultUtilization().getCpu());
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/ClusterCostTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/ClusterCostTest.java
new file mode 100644
index 00000000000..313f565f546
--- /dev/null
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/ClusterCostTest.java
@@ -0,0 +1,35 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.hosted.controller.application;
+
+import com.yahoo.config.provision.ClusterSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author smorgrav
+ */
+public class ClusterCostTest {
+
+ @Test
+ public void clusterCost() throws Exception {
+ List<String> hostnames = new ArrayList<>();
+ hostnames.add("host1");
+ hostnames.add("host2");
+ ClusterInfo info = new ClusterInfo("test", 100, 10, 10, 10, ClusterSpec.Type.container, hostnames);
+ ClusterUtilization util = new ClusterUtilization(0.3, 0.2, 0.5, 0.1);
+ ClusterCost cost = new ClusterCost(info, util);
+
+ // CPU is fully utilized
+ Assert.assertEquals(200, cost.getTco(), Double.MIN_VALUE);
+ Assert.assertEquals(0, cost.getWaste(), Double.MIN_VALUE);
+
+ // Set Disk as the most utilized resource
+ util = new ClusterUtilization(0.3, 0.1, 0.5, 0.1);
+ cost = new ClusterCost(info, util);
+ Assert.assertEquals(200, cost.getTco(), Double.MIN_NORMAL); // TCO is independent of utilization
+ Assert.assertEquals(57.1428571429, cost.getWaste(), 0.001); // Waste is not independent
+ }
+} \ No newline at end of file
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/DeploymentCostTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/DeploymentCostTest.java
new file mode 100644
index 00000000000..2e58253d768
--- /dev/null
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/application/DeploymentCostTest.java
@@ -0,0 +1,38 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.hosted.controller.application;
+
+import com.yahoo.config.provision.ClusterSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author smorgrav
+ */
+public class DeploymentCostTest {
+
+ @Test
+ public void deploymentCost() {
+ Map<String, ClusterCost> clusters = new HashMap<>();
+ clusters.put("cluster1", createClusterCost(100, 0.2));
+ clusters.put("cluster2", createClusterCost(50, 0.1));
+
+ DeploymentCost cost = new DeploymentCost(clusters);
+ Assert.assertEquals(300, cost.getTco(), Double.MIN_VALUE); // 2*100 + 2*50
+ Assert.assertEquals(28.5714285714, cost.getWaste(), 0.001); // from cluster2
+ Assert.assertEquals(0.7142857142857143, cost.getUtilization(), Double.MIN_VALUE); // from cluster2
+ }
+
+ private ClusterCost createClusterCost(int flavorCost, double cpuUtil) {
+ List<String> hostnames = new ArrayList<>();
+ hostnames.add("host1");
+ hostnames.add("host2");
+ ClusterInfo info = new ClusterInfo("test", flavorCost, 10, 10, 10, ClusterSpec.Type.container, hostnames);
+ ClusterUtilization util = new ClusterUtilization(0.3, cpuUtil, 0.5, 0.1);
+ return new ClusterCost(info, util);
+ }
+} \ No newline at end of file
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/ClusterInfoMaintainerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/ClusterInfoMaintainerTest.java
index 7ae89082660..13919cefd3b 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/ClusterInfoMaintainerTest.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/maintenance/ClusterInfoMaintainerTest.java
@@ -31,7 +31,7 @@ public class ClusterInfoMaintainerTest {
deployment = tester.controller().applications().get(app).get().deployments().values().stream().findAny().get();
Assert.assertEquals(2, deployment.clusterInfo().size());
- Assert.assertEquals(10, deployment.clusterInfo().get(ClusterSpec.Id.from("clusterA")).getCost());
+ Assert.assertEquals(10, deployment.clusterInfo().get(ClusterSpec.Id.from("clusterA")).getFlavorCost());
}
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/persistence/ApplicationSerializerTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/persistence/ApplicationSerializerTest.java
index 3e73bf4445b..d43a434b983 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/persistence/ApplicationSerializerTest.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/persistence/ApplicationSerializerTest.java
@@ -118,10 +118,13 @@ public class ApplicationSerializerTest {
// Test cluster info
assertEquals(3, serialized.deployments().get(zone2).clusterInfo().size());
- assertEquals(10, serialized.deployments().get(zone2).clusterInfo().get(ClusterSpec.Id.from("id2")).getCost());
+ assertEquals(10, serialized.deployments().get(zone2).clusterInfo().get(ClusterSpec.Id.from("id2")).getFlavorCost());
assertEquals(ClusterSpec.Type.content, serialized.deployments().get(zone2).clusterInfo().get(ClusterSpec.Id.from("id2")).getClusterType());
assertEquals("flavor2", serialized.deployments().get(zone2).clusterInfo().get(ClusterSpec.Id.from("id2")).getFlavor());
assertEquals(4, serialized.deployments().get(zone2).clusterInfo().get(ClusterSpec.Id.from("id2")).getHostnames().size());
+ assertEquals(2, serialized.deployments().get(zone2).clusterInfo().get(ClusterSpec.Id.from("id2")).getFlavorCPU(), Double.MIN_VALUE);
+ assertEquals(4, serialized.deployments().get(zone2).clusterInfo().get(ClusterSpec.Id.from("id2")).getFlavorMem(), Double.MIN_VALUE);
+ assertEquals(50, serialized.deployments().get(zone2).clusterInfo().get(ClusterSpec.Id.from("id2")).getFlavorDisk(), Double.MIN_VALUE);
{ // test more deployment serialization cases
Application original2 = original.withDeploying(Optional.of(Change.ApplicationChange.of(ApplicationRevision.from("hash1"))));
@@ -153,7 +156,7 @@ public class ApplicationSerializerTest {
}
result.put(ClusterSpec.Id.from("id" + cluster), new ClusterInfo("flavor" + cluster, 10,
- ClusterSpec.Type.content, hostnames));
+ 2, 4, 50, ClusterSpec.Type.content, hostnames));
}
return result;
}
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiTest.java b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiTest.java
index 13b1165ccb2..d7641072abf 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiTest.java
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/ApplicationApiTest.java
@@ -740,7 +740,7 @@ public class ApplicationApiTest extends ControllerContainerTest {
List<String> hostnames = new ArrayList<>();
hostnames.add("host1");
hostnames.add("host2");
- clusterInfo.put(ClusterSpec.Id.from("cluster1"), new ClusterInfo("flavor1", 37, ClusterSpec.Type.content, hostnames));
+ clusterInfo.put(ClusterSpec.Id.from("cluster1"), new ClusterInfo("flavor1", 37, 2, 4, 50, ClusterSpec.Type.content, hostnames));
Map<ClusterSpec.Id, ClusterUtilization> clusterUtils = new HashMap<>();
clusterUtils.put(ClusterSpec.Id.from("cluster1"), new ClusterUtilization(0.3, 0.6, 0.4, 0.3));
deployment = deployment.withClusterInfo(clusterInfo);
diff --git a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/deployment.json b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/deployment.json
index 67fc48d4646..9174e7dd8b2 100644
--- a/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/deployment.json
+++ b/controller-server/src/test/java/com/yahoo/vespa/hosted/controller/restapi/application/responses/deployment.json
@@ -17,17 +17,21 @@
"gitBranch": "master",
"gitCommit": "commit1",
"cost": {
- "tco": 37,
- "utilization": 5.999999999999999,
- "waste": 0.0,
+ "tco": 74,
+ "waste": 0,
+ "utilization": 2.999999999999999,
"cluster": {
"cluster1": {
"count": 2,
"resource": "cpu",
"utilization": 2.999999999999999,
- "tco": 37,
- "flavor": "flavor1",
+ "tco": 74,
"waste": 0,
+ "flavor": "flavor1",
+ "flavorCost":37.0,
+ "flavorCpu":2.0,
+ "flavorMem":4.0,
+ "flavorDisk":50.0,
"type": "content",
"util": {
"cpu": 2.999999999999999,
diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp
index 9668e09bcc8..b917bc015ec 100644
--- a/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp
+++ b/filedistribution/src/vespa/filedistribution/distributor/filedownloader.cpp
@@ -285,7 +285,7 @@ FileDownloader::pathToCompletedFile(const std::string& fileReference) const {
boost::optional<FileDownloader::ResumeDataBuffer>
FileDownloader::getResumeData(const std::string& fileReference) {
- LOG(debug, ("Reading resume data for " + fileReference).c_str());
+ LOG(debug, "Reading resume data for '%s'", fileReference.c_str());
try {
fs::path path = (_dbPath / fileReference).string() + resumeDataSuffix;
if (fs::exists(path)) {
@@ -294,7 +294,7 @@ FileDownloader::getResumeData(const std::string& fileReference) {
std::istream_iterator<char> iterator(file), end;
std::copy(iterator, end, std::back_inserter(result));
- LOG(debug, ("Successfully retrieved resume data for " + fileReference).c_str());
+ LOG(debug, "Successfully retrieved resume data for '%s'", fileReference.c_str());
if (result.size() < 50) {
LOG(info, "Very small resume file %zu bytes.", result.size());
}
@@ -303,7 +303,7 @@ FileDownloader::getResumeData(const std::string& fileReference) {
}
} catch(...) {
//resume data is only an optimization
- LOG(info, ("Error while reading resume data for " + fileReference).c_str());
+ LOG(info, "Error while reading resume data for '%s'", fileReference.c_str());
}
return boost::optional<ResumeDataBuffer>();
}
diff --git a/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp b/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp
index 669cc550003..1763b798f03 100644
--- a/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp
+++ b/filedistribution/src/vespa/filedistribution/distributor/filedownloadermanager.cpp
@@ -14,10 +14,10 @@ using filedistribution::Path;
namespace {
void logStartDownload(const std::set<std::string> & filesToDownload) {
std::ostringstream msg;
- msg <<"StartDownloads:" <<std::endl;
+ msg << "StartDownloads:" << std::endl;
std::copy(filesToDownload.begin(), filesToDownload.end(),
std::ostream_iterator<std::string>(msg, "\n"));
- LOG(debug, msg.str().c_str());
+ LOG(debug, "%s", msg.str().c_str());
}
} //anonymous namespace
diff --git a/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp b/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp
index 9ce31e0dd3a..e0338cffd52 100644
--- a/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp
+++ b/filedistribution/src/vespa/filedistribution/model/filedistributionmodelimpl.cpp
@@ -13,11 +13,11 @@ LOG_SETUP(".filedistributionmodel");
namespace fs = boost::filesystem;
using filedistribution::ZKFileDBModel;
+using std::make_shared;
namespace {
//peer format: hostName:port
-
void
addPeerEntry(const std::string& peer,
filedistribution::FileDistributionModelImpl::PeerEntries& result) {
@@ -73,8 +73,7 @@ struct FileDistributionModelImpl::DeployedFilesChangedCallback :
std::weak_ptr<FileDistributionModelImpl> _parent;
- DeployedFilesChangedCallback(
- const std::shared_ptr<FileDistributionModelImpl> & parent)
+ DeployedFilesChangedCallback(const std::shared_ptr<FileDistributionModelImpl> & parent)
:_parent(parent)
{}
@@ -111,7 +110,7 @@ FileDistributionModelImpl::getPeers(const std::string& fileReference, size_t max
LOG(debug, "Found %zu peers for path '%s'", result.size(), path.string().c_str());
return result;
} catch(ZKNodeDoesNotExistsException&) {
- LOG(debug, ("No peer entries available for " + fileReference).c_str());
+ LOG(debug, "No peer entries available for '%s'", fileReference.c_str());
return PeerEntries();
}
}
@@ -119,8 +118,7 @@ FileDistributionModelImpl::getPeers(const std::string& fileReference, size_t max
fs::path
FileDistributionModelImpl::getPeerEntryPath(const std::string& fileReference) {
std::ostringstream entry;
- entry <<_hostName
- <<ZKFileDBModel::_peerEntrySeparator <<_port;
+    entry << _hostName << ZKFileDBModel::_peerEntrySeparator << _port;
return _fileDBModel.getPeersPath(fileReference) / entry.str();
}
@@ -167,8 +165,7 @@ std::set<std::string>
FileDistributionModelImpl::getFilesToDownload() {
DeployedFilesToDownload d(_zk.get());
std::vector<std::string> deployed = d.getDeployedFilesToDownload(_hostName,
- DeployedFilesChangedCallback::SP(
- new DeployedFilesChangedCallback(shared_from_this())));
+ make_shared<DeployedFilesChangedCallback>(shared_from_this()));
std::set<std::string> result(deployed.begin(), deployed.end());
@@ -187,8 +184,7 @@ FileDistributionModelImpl::updateActiveFileReferences(
std::sort(sortedFileReferences.begin(), sortedFileReferences.end());
LockGuard guard(_activeFileReferencesMutex);
- bool changed =
- sortedFileReferences != _activeFileReferences;
+ bool changed = sortedFileReferences != _activeFileReferences;
sortedFileReferences.swap(_activeFileReferences);
return changed;
diff --git a/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp b/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp
index 315fb7e86eb..87efd74659d 100644
--- a/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp
@@ -9,7 +9,6 @@
#include <vespa/searchcore/proton/documentmetastore/documentmetastore.h>
#include <vespa/searchcore/proton/documentmetastore/lidreusedelayer.h>
#include <vespa/searchcore/proton/matching/error_constant_value.h>
-#include <vespa/searchcore/proton/metrics/feed_metrics.h>
#include <vespa/searchcore/proton/index/index_writer.h>
#include <vespa/searchcore/proton/index/indexmanager.h>
#include <vespa/searchcore/proton/reprocessing/attribute_reprocessing_initializer.h>
@@ -47,7 +46,7 @@ using proton::matching::SessionManager;
using searchcorespi::IndexSearchable;
using searchcorespi::index::IThreadingService;
using proton::test::MockGidToLidChangeHandler;
-
+using std::make_shared;
using CCR = DocumentDBConfig::ComparisonResult;
using Configurer = SearchableDocSubDBConfigurer;
@@ -178,14 +177,8 @@ Fixture::Fixture()
vespalib::rmdir(BASE_DIR, true);
vespalib::mkdir(BASE_DIR);
initViewSet(_views);
- _configurer.reset(new Configurer(_views._summaryMgr,
- _views.searchView,
- _views.feedView,
- _queryLimiter,
- _constantValueRepo,
- _clock,
- "test",
- 0));
+ _configurer.reset(new Configurer(_views._summaryMgr, _views.searchView, _views.feedView, _queryLimiter,
+ _constantValueRepo, _clock, "test", 0));
}
Fixture::~Fixture() {}
@@ -193,53 +186,33 @@ void
Fixture::initViewSet(ViewSet &views)
{
Matchers::SP matchers(new Matchers(_clock, _queryLimiter, _constantValueRepo));
- IndexManager::SP indexMgr(new IndexManager(BASE_DIR, searchcorespi::index::WarmupConfig(),
- 2, 0, Schema(), 1, views._reconfigurer,
- views._writeService, _summaryExecutor, TuneFileIndexManager(),
- TuneFileAttributes(), views._fileHeaderContext));
- AttributeManager::SP attrMgr(new AttributeManager(BASE_DIR,
- "test.subdb",
- TuneFileAttributes(),
- views._fileHeaderContext,
- views._writeService.
- attributeFieldWriter(),
- views._hwInfo));
+ auto indexMgr = make_shared<IndexManager>(BASE_DIR, searchcorespi::index::WarmupConfig(), 2, 0, Schema(), 1,
+ views._reconfigurer, views._writeService, _summaryExecutor,
+ TuneFileIndexManager(), TuneFileAttributes(), views._fileHeaderContext);
+ auto attrMgr = make_shared<AttributeManager>(BASE_DIR, "test.subdb", TuneFileAttributes(), views._fileHeaderContext,
+                                                 views._writeService.attributeFieldWriter(), views._hwInfo);
ProtonConfig protonCfg;
- SummaryManager::SP summaryMgr(
- new SummaryManager(_summaryExecutor, search::LogDocumentStore::Config(),
- GrowStrategy(), BASE_DIR, views._docTypeName,
- TuneFileSummary(), views._fileHeaderContext,
- views._noTlSyncer, search::IBucketizer::SP()));
- SessionManager::SP sesMgr(
- new SessionManager(protonCfg.grouping.sessionmanager.maxentries));
- DocumentMetaStoreContext::SP metaStore(
- new DocumentMetaStoreContext(std::make_shared<BucketDBOwner>()));
+ auto summaryMgr = make_shared<SummaryManager>
+ (_summaryExecutor, search::LogDocumentStore::Config(), GrowStrategy(), BASE_DIR, views._docTypeName,
+             TuneFileSummary(), views._fileHeaderContext, views._noTlSyncer, search::IBucketizer::SP());
+ auto sesMgr = make_shared<SessionManager>(protonCfg.grouping.sessionmanager.maxentries);
+ auto metaStore = make_shared<DocumentMetaStoreContext>(make_shared<BucketDBOwner>());
IIndexWriter::SP indexWriter(new IndexWriter(indexMgr));
AttributeWriter::SP attrWriter(new AttributeWriter(attrMgr));
ISummaryAdapter::SP summaryAdapter(new SummaryAdapter(summaryMgr));
- views._gidToLidChangeHandler = std::make_shared<MockGidToLidChangeHandler>();
+ views._gidToLidChangeHandler = make_shared<MockGidToLidChangeHandler>();
Schema::SP schema(new Schema());
views._summaryMgr = summaryMgr;
views._dmsc = metaStore;
- views._lidReuseDelayer.reset(
- new documentmetastore::LidReuseDelayer(views._writeService,
- metaStore->get()));
+ views._lidReuseDelayer.reset(new documentmetastore::LidReuseDelayer(views._writeService, metaStore->get()));
IndexSearchable::SP indexSearchable;
- MatchView::SP matchView(new MatchView(matchers, indexSearchable, attrMgr,
- sesMgr, metaStore, views._docIdLimit));
- views.searchView.set(
- SearchView::SP(
- new SearchView(
- summaryMgr->createSummarySetup(SummaryConfig(),
- SummarymapConfig(),
- JuniperrcConfig(),
- views.repo,
- attrMgr),
- matchView)));
- PerDocTypeFeedMetrics metrics(0);
+ MatchView::SP matchView(new MatchView(matchers, indexSearchable, attrMgr, sesMgr, metaStore, views._docIdLimit));
+ views.searchView.set(make_shared<SearchView>
+ (summaryMgr->createSummarySetup(SummaryConfig(), SummarymapConfig(),
+ JuniperrcConfig(), views.repo, attrMgr),
+ matchView));
views.feedView.set(
- SearchableFeedView::SP(
- new SearchableFeedView(StoreOnlyFeedView::Context(summaryAdapter,
+ make_shared<SearchableFeedView>(StoreOnlyFeedView::Context(summaryAdapter,
schema,
views.searchView.get()->getDocumentMetaStore(),
*views._gidToLidChangeHandler,
@@ -251,11 +224,10 @@ Fixture::initViewSet(ViewSet &views)
views.serialNum,
views.serialNum,
views._docTypeName,
- metrics,
0u /* subDbId */,
SubDbType::READY),
FastAccessFeedView::Context(attrWriter, views._docIdLimit),
- SearchableFeedView::Context(indexWriter))));
+ SearchableFeedView::Context(indexWriter)));
}
@@ -263,7 +235,6 @@ using MySummaryAdapter = test::MockSummaryAdapter;
struct MyFastAccessFeedView
{
- PerDocTypeFeedMetrics _metrics;
DummyFileHeaderContext _fileHeaderContext;
DocIdLimit _docIdLimit;
IThreadingService &_writeService;
@@ -276,13 +247,12 @@ struct MyFastAccessFeedView
VarHolder<FastAccessFeedView::SP> _feedView;
MyFastAccessFeedView(IThreadingService &writeService)
- : _metrics(0),
- _fileHeaderContext(),
+ : _fileHeaderContext(),
_docIdLimit(0),
_writeService(writeService),
_hwInfo(),
_dmsc(),
- _gidToLidChangeHandler(std::make_shared<DummyGidToLidChangeHandler>()),
+ _gidToLidChangeHandler(make_shared<DummyGidToLidChangeHandler>()),
_lidReuseDelayer(),
_commitTimeTracker(TimeStamp()),
_feedView()
@@ -295,30 +265,21 @@ struct MyFastAccessFeedView
void init() {
ISummaryAdapter::SP summaryAdapter(new MySummaryAdapter());
Schema::SP schema(new Schema());
- DocumentMetaStoreContext::SP docMetaCtx(
- new DocumentMetaStoreContext(std::make_shared<BucketDBOwner>()));
- _dmsc = docMetaCtx;
- _lidReuseDelayer.reset(
- new documentmetastore::LidReuseDelayer(_writeService,
- docMetaCtx->get()));
+ _dmsc = make_shared<DocumentMetaStoreContext>(std::make_shared<BucketDBOwner>());
+ _lidReuseDelayer.reset(new documentmetastore::LidReuseDelayer(_writeService, _dmsc->get()));
DocumentTypeRepo::SP repo = createRepo();
- StoreOnlyFeedView::Context storeOnlyCtx(summaryAdapter, schema, docMetaCtx, *_gidToLidChangeHandler, repo, _writeService, *_lidReuseDelayer, _commitTimeTracker);
- StoreOnlyFeedView::PersistentParams params(1, 1, DocTypeName(DOC_TYPE), _metrics, 0, SubDbType::NOTREADY);
- AttributeManager::SP mgr(new AttributeManager(BASE_DIR, "test.subdb",
- TuneFileAttributes(),
- _fileHeaderContext,
- _writeService.
- attributeFieldWriter(),
- _hwInfo));
+ StoreOnlyFeedView::Context storeOnlyCtx(summaryAdapter, schema, _dmsc, *_gidToLidChangeHandler, repo,
+ _writeService, *_lidReuseDelayer, _commitTimeTracker);
+ StoreOnlyFeedView::PersistentParams params(1, 1, DocTypeName(DOC_TYPE), 0, SubDbType::NOTREADY);
+ auto mgr = make_shared<AttributeManager>(BASE_DIR, "test.subdb", TuneFileAttributes(), _fileHeaderContext,
+ _writeService.attributeFieldWriter(), _hwInfo);
IAttributeWriter::SP writer(new AttributeWriter(mgr));
FastAccessFeedView::Context fastUpdateCtx(writer, _docIdLimit);
- _feedView.set(FastAccessFeedView::SP(new FastAccessFeedView(storeOnlyCtx,
- params, fastUpdateCtx)));;
+        _feedView.set(FastAccessFeedView::SP(new FastAccessFeedView(storeOnlyCtx, params, fastUpdateCtx)));
}
};
-MyFastAccessFeedView::~MyFastAccessFeedView() {
-}
+MyFastAccessFeedView::~MyFastAccessFeedView() = default;
struct FastAccessFixture
{
@@ -328,8 +289,7 @@ struct FastAccessFixture
FastAccessFixture()
: _writeService(),
_view(_writeService),
- _configurer(_view._feedView,
- IAttributeWriterFactory::UP(new AttributeWriterFactory), "test")
+ _configurer(_view._feedView, IAttributeWriterFactory::UP(new AttributeWriterFactory), "test")
{
vespalib::rmdir(BASE_DIR, true);
vespalib::mkdir(BASE_DIR);
@@ -339,11 +299,10 @@ struct FastAccessFixture
}
};
-
DocumentDBConfig::SP
createConfig()
{
- return test::DocumentDBConfigBuilder(0, std::make_shared<Schema>(), "client", DOC_TYPE).
+ return test::DocumentDBConfigBuilder(0, make_shared<Schema>(), "client", DOC_TYPE).
repo(createRepo()).build();
}
diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
index cccbbededd1..b0b06a238c9 100644
--- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
@@ -14,7 +14,6 @@
#include <vespa/searchcore/proton/feedoperation/removeoperation.h>
#include <vespa/searchcore/proton/feedoperation/updateoperation.h>
#include <vespa/searchcore/proton/feedoperation/wipehistoryoperation.h>
-#include <vespa/searchcore/proton/metrics/feed_metrics.h>
#include <vespa/searchcore/proton/persistenceengine/i_resource_write_filter.h>
#include <vespa/searchcore/proton/server/configstore.h>
#include <vespa/searchcore/proton/server/ddbstate.h>
@@ -306,7 +305,7 @@ struct MyTransport : public FeedToken::ITransport {
bool documentWasFound;
MyTransport();
~MyTransport();
- virtual void send(Reply::UP, ResultUP res, bool documentWasFound_, double) override {
+ void send(ResultUP res, bool documentWasFound_) override {
result = std::move(res);
documentWasFound = documentWasFound_;
gate.countDown();
@@ -316,21 +315,12 @@ struct MyTransport : public FeedToken::ITransport {
MyTransport::MyTransport() : gate(), result(), documentWasFound(false) {}
MyTransport::~MyTransport() {}
-Reply::UP getReply(uint32_t type) {
- if (type == DocumentProtocol::REPLY_REMOVEDOCUMENT) {
- return Reply::UP(new RemoveDocumentReply);
- } else if (type == DocumentProtocol::REPLY_UPDATEDOCUMENT) {
- return Reply::UP(new UpdateDocumentReply);
- }
- return Reply::UP(new DocumentReply(type));
-}
-
struct FeedTokenContext {
MyTransport transport;
FeedToken::UP token_ap;
FeedToken &token;
- FeedTokenContext(uint32_t type = 0);
+ FeedTokenContext();
~FeedTokenContext();
bool await(uint32_t timeout = 80000) { return transport.gate.await(timeout); }
const Result *getResult() {
@@ -341,24 +331,23 @@ struct FeedTokenContext {
}
};
-FeedTokenContext::FeedTokenContext(uint32_t type)
+FeedTokenContext::FeedTokenContext()
: transport(),
- token_ap(new FeedToken(transport, getReply(type))),
+ token_ap(new FeedToken(transport)),
token(*token_ap)
{
- token.getReply().getTrace().setLevel(9);
}
-FeedTokenContext::~FeedTokenContext() {}
+
+FeedTokenContext::~FeedTokenContext() = default;
struct PutContext {
FeedTokenContext tokenCtx;
DocumentContext docCtx;
typedef std::shared_ptr<PutContext> SP;
PutContext(const vespalib::string &docId, DocBuilder &builder) :
- tokenCtx(DocumentProtocol::REPLY_PUTDOCUMENT),
+ tokenCtx(),
docCtx(docId, builder)
- {
- }
+ {}
};
@@ -372,12 +361,10 @@ struct PutHandler {
builder(db),
timestamp(0),
puts()
- {
- }
+ {}
void put(const vespalib::string &docId) {
PutContext::SP pc(new PutContext(docId, builder));
- FeedOperation::UP op(new PutOperation(pc->docCtx.bucketId,
- timestamp, pc->docCtx.doc));
+ FeedOperation::UP op(new PutOperation(pc->docCtx.bucketId, timestamp, pc->docCtx.doc));
handler.handleOperation(pc->tokenCtx.token, std::move(op));
timestamp = Timestamp(timestamp + 1);
puts.push_back(pc);
@@ -393,18 +380,6 @@ struct PutHandler {
};
-struct MyFeedMetrics : public metrics::MetricSet
-{
- PerDocTypeFeedMetrics _feed;
-
- MyFeedMetrics()
- : metrics::MetricSet("myfeedmetrics", "", "My feed metrics", NULL),
- _feed(this)
- {
- }
-};
-
-
struct MyTlsWriter : TlsWriter {
int store_count;
int erase_count;
@@ -419,7 +394,6 @@ struct MyTlsWriter : TlsWriter {
}
};
-
struct FeedHandlerFixture
{
DummyFileHeaderContext _fileHeaderContext;
@@ -432,7 +406,6 @@ struct FeedHandlerFixture
DDBState _state;
MyReplayConfig replayConfig;
MyFeedView feedView;
- MyFeedMetrics feedMetrics;
MyTlsWriter tls_writer;
BucketDBOwner _bucketDB;
bucketdb::BucketDBHandler _bucketDBHandler;
@@ -449,8 +422,8 @@ struct FeedHandlerFixture
feedView(schema.getRepo()),
_bucketDB(),
_bucketDBHandler(_bucketDB),
- handler(writeService, tlsSpec, schema.getDocType(),
- feedMetrics._feed, _state, owner, writeFilter, replayConfig, tls, &tls_writer)
+ handler(writeService, tlsSpec, schema.getDocType(), _state, owner,
+ writeFilter, replayConfig, tls, &tls_writer)
{
_state.enterLoadState();
_state.enterReplayTransactionLogState();
@@ -507,12 +480,10 @@ TEST_F("require that heartBeat calls FeedView's heartBeat",
TEST_F("require that outdated remove is ignored", FeedHandlerFixture)
{
DocumentContext doc_context("doc:test:foo", *f.schema.builder);
- FeedOperation::UP op(new RemoveOperation(doc_context.bucketId,
- Timestamp(10),
- doc_context.doc->getId()));
+ FeedOperation::UP op(new RemoveOperation(doc_context.bucketId, Timestamp(10), doc_context.doc->getId()));
static_cast<DocumentOperation &>(*op).setPrevDbDocumentId(DbDocumentId(4));
static_cast<DocumentOperation &>(*op).setPrevTimestamp(Timestamp(10000));
- FeedTokenContext token_context(DocumentProtocol::REPLY_REMOVEDOCUMENT);
+ FeedTokenContext token_context;
f.handler.performOperation(std::move(token_context.token_ap), std::move(op));
EXPECT_EQUAL(0, f.feedView.remove_count);
EXPECT_EQUAL(0, f.tls_writer.store_count);
@@ -599,28 +570,21 @@ TEST_F("require that flush cannot unprune", FeedHandlerFixture)
EXPECT_EQUAL(10u, f.handler.getPrunedSerialNum());
}
-TEST_F("require that remove of unknown document with known data type "
- "stores remove", FeedHandlerFixture)
+TEST_F("require that remove of unknown document with known data type stores remove", FeedHandlerFixture)
{
- DocumentContext doc_context("id:test:searchdocument::foo",
- *f.schema.builder);
- FeedOperation::UP op(new RemoveOperation(doc_context.bucketId,
- Timestamp(10),
- doc_context.doc->getId()));
- FeedTokenContext token_context(DocumentProtocol::REPLY_REMOVEDOCUMENT);
+ DocumentContext doc_context("id:test:searchdocument::foo", *f.schema.builder);
+ FeedOperation::UP op(new RemoveOperation(doc_context.bucketId, Timestamp(10), doc_context.doc->getId()));
+ FeedTokenContext token_context;
f.handler.performOperation(std::move(token_context.token_ap), std::move(op));
EXPECT_EQUAL(1, f.feedView.remove_count);
EXPECT_EQUAL(1, f.tls_writer.store_count);
}
-TEST_F("require that partial update for non-existing document is tagged as such",
- FeedHandlerFixture)
+TEST_F("require that partial update for non-existing document is tagged as such", FeedHandlerFixture)
{
UpdateContext upCtx("id:test:searchdocument::foo", *f.schema.builder);
- FeedOperation::UP op(new UpdateOperation(upCtx.bucketId,
- Timestamp(10),
- upCtx.update));
- FeedTokenContext token_context(DocumentProtocol::REPLY_UPDATEDOCUMENT);
+ FeedOperation::UP op(new UpdateOperation(upCtx.bucketId, Timestamp(10), upCtx.update));
+ FeedTokenContext token_context;
f.handler.performOperation(std::move(token_context.token_ap), std::move(op));
const UpdateResult *result = static_cast<const UpdateResult *>(token_context.getResult());
@@ -631,18 +595,14 @@ TEST_F("require that partial update for non-existing document is tagged as such"
EXPECT_EQUAL(0, f.tls_writer.store_count);
}
-TEST_F("require that partial update for non-existing document is created if specified",
- FeedHandlerFixture)
+TEST_F("require that partial update for non-existing document is created if specified", FeedHandlerFixture)
{
f.handler.setSerialNum(15);
UpdateContext upCtx("id:test:searchdocument::foo", *f.schema.builder);
upCtx.update->setCreateIfNonExistent(true);
- f.feedView.metaStore.insert(upCtx.update->getId().getGlobalId(),
- MyDocumentMetaStore::Entry(5, 5, Timestamp(10)));
- FeedOperation::UP op(new UpdateOperation(upCtx.bucketId,
- Timestamp(10),
- upCtx.update));
- FeedTokenContext token_context(DocumentProtocol::REPLY_UPDATEDOCUMENT);
+ f.feedView.metaStore.insert(upCtx.update->getId().getGlobalId(), MyDocumentMetaStore::Entry(5, 5, Timestamp(10)));
+ FeedOperation::UP op(new UpdateOperation(upCtx.bucketId, Timestamp(10), upCtx.update));
+ FeedTokenContext token_context;
f.handler.performOperation(std::move(token_context.token_ap), std::move(op));
const UpdateResult *result = static_cast<const UpdateResult *>(token_context.getResult());
@@ -678,7 +638,7 @@ TEST_F("require that update is rejected if resource limit is reached", FeedHandl
UpdateContext updCtx("id:test:searchdocument::foo", *f.schema.builder);
FeedOperation::UP op = std::make_unique<UpdateOperation>(updCtx.bucketId, Timestamp(10), updCtx.update);
- FeedTokenContext token(DocumentProtocol::REPLY_UPDATEDOCUMENT);
+ FeedTokenContext token;
f.handler.performOperation(std::move(token.token_ap), std::move(op));
EXPECT_EQUAL(0, f.feedView.update_count);
EXPECT_TRUE(dynamic_cast<const UpdateResult *>(token.getResult()));
@@ -694,7 +654,7 @@ TEST_F("require that remove is NOT rejected if resource limit is reached", FeedH
DocumentContext docCtx("id:test:searchdocument::foo", *f.schema.builder);
FeedOperation::UP op = std::make_unique<RemoveOperation>(docCtx.bucketId, Timestamp(10), docCtx.doc->getId());
- FeedTokenContext token(DocumentProtocol::REPLY_REMOVEDOCUMENT);
+ FeedTokenContext token;
f.handler.performOperation(std::move(token.token_ap), std::move(op));
EXPECT_EQUAL(1, f.feedView.remove_count);
EXPECT_EQUAL(Result::NONE, token.getResult()->getErrorCode());
diff --git a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp
index c820a9f392c..4eefbed0a53 100644
--- a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp
@@ -10,7 +10,6 @@
#include <vespa/searchcore/proton/common/feedtoken.h>
#include <vespa/searchcore/proton/documentmetastore/lidreusedelayer.h>
#include <vespa/searchcore/proton/index/i_index_writer.h>
-#include <vespa/searchcore/proton/metrics/feed_metrics.h>
#include <vespa/searchcore/proton/server/executorthreadingservice.h>
#include <vespa/searchcore/proton/server/ifrozenbuckethandler.h>
#include <vespa/searchcore/proton/server/isummaryadapter.h>
@@ -30,9 +29,6 @@
#include <vespa/searchlib/docstore/cachestats.h>
#include <vespa/searchlib/docstore/idocumentstore.h>
#include <vespa/searchlib/index/docbuilder.h>
-#include <vespa/vespalib/testkit/testapp.h>
-#include <vespa/vespalib/util/blockingthreadstackexecutor.h>
-#include <mutex>
#include <vespa/log/log.h>
LOG_SETUP("feedview_test");
@@ -132,8 +128,6 @@ struct MyTracer
struct ParamsContext
{
DocTypeName _docTypeName;
- FeedMetrics _feedMetrics;
- PerDocTypeFeedMetrics _metrics;
SearchableFeedView::PersistentParams _params;
ParamsContext(const vespalib::string &docType, const vespalib::string &baseDir);
@@ -143,9 +137,7 @@ struct ParamsContext
ParamsContext::ParamsContext(const vespalib::string &docType, const vespalib::string &baseDir)
: _docTypeName(docType),
- _feedMetrics(),
- _metrics(&_feedMetrics),
- _params(0, 0, _docTypeName, _metrics, subdb_id, SubDbType::READY)
+ _params(0, 0, _docTypeName, subdb_id, SubDbType::READY)
{
(void) baseDir;
}
@@ -420,11 +412,7 @@ struct MyTransport : public FeedToken::ITransport
MyTracer &_tracer;
MyTransport(MyTracer &tracer);
~MyTransport();
- virtual void send(mbus::Reply::UP reply,
- ResultUP result,
- bool documentWasFound,
- double latency_ms) override {
- (void) reply; (void) documentWasFound, (void) latency_ms;
+    void send(ResultUP result, bool) override {
lastResult = std::move(result);
_tracer.traceAck(lastResult);
_gate.countDown();
@@ -492,36 +480,20 @@ DocumentContext::DocumentContext(const vespalib::string &docId, uint64_t timesta
{}
DocumentContext::~DocumentContext() {}
-namespace {
-
-mbus::Reply::UP
-createReply(MessageType mtype)
-{
- if (mtype == DocumentProtocol::REPLY_UPDATEDOCUMENT) {
- return mbus::Reply::UP(new documentapi::UpdateDocumentReply);
- } else if (mtype == DocumentProtocol::REPLY_REMOVEDOCUMENT) {
- return mbus::Reply::UP(new documentapi::RemoveDocumentReply);
- } else {
- return mbus::Reply::UP(new documentapi::DocumentReply(mtype));
- }
-}
-
-} // namespace
-
struct FeedTokenContext
{
MyTransport mt;
FeedToken ft;
typedef std::shared_ptr<FeedTokenContext> SP;
typedef std::vector<SP> List;
- FeedTokenContext(MyTracer &tracer, MessageType mtype);
+ FeedTokenContext(MyTracer &tracer);
~FeedTokenContext();
};
-FeedTokenContext::FeedTokenContext(MyTracer &tracer, MessageType mtype)
- : mt(tracer), ft(mt, createReply(mtype))
+FeedTokenContext::FeedTokenContext(MyTracer &tracer)
+ : mt(tracer), ft(mt)
{}
-FeedTokenContext::~FeedTokenContext() {}
+FeedTokenContext::~FeedTokenContext() = default;
struct FixtureBase
{
@@ -612,7 +584,7 @@ struct FixtureBase
}
void putAndWait(const DocumentContext &docCtx) {
- FeedTokenContext token(_tracer, DocumentProtocol::REPLY_PUTDOCUMENT);
+ FeedTokenContext token(_tracer);
PutOperation op(docCtx.bid, docCtx.ts, docCtx.doc);
runInMaster([&] () { performPut(&token.ft, op); });
}
@@ -624,7 +596,7 @@ struct FixtureBase
}
void updateAndWait(const DocumentContext &docCtx) {
- FeedTokenContext token(_tracer, DocumentProtocol::REPLY_UPDATEDOCUMENT);
+ FeedTokenContext token(_tracer);
UpdateOperation op(docCtx.bid, docCtx.ts, docCtx.upd);
runInMaster([&] () { performUpdate(&token.ft, op); });
}
@@ -636,13 +608,13 @@ struct FixtureBase
getFeedView().handleRemove(token, op);
} else {
if (token != NULL) {
- token->ack(op.getType(), pc._metrics);
+ token->ack();
}
}
}
void removeAndWait(const DocumentContext &docCtx) {
- FeedTokenContext token(_tracer, DocumentProtocol::REPLY_REMOVEDOCUMENT);
+ FeedTokenContext token(_tracer);
RemoveOperation op(docCtx.bid, docCtx.ts, docCtx.doc->getId());
runInMaster([&] () { performRemove(&token.ft, op); });
}
diff --git a/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp b/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp
index 1ec201fdbd2..bbdabe2d0ef 100644
--- a/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/storeonlyfeedview/storeonlyfeedview_test.cpp
@@ -1,13 +1,10 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/document/base/documentid.h>
-#include <vespa/document/base/globalid.h>
-#include <vespa/document/bucket/bucketid.h>
#include <vespa/document/datatype/datatype.h>
#include <vespa/searchcommon/common/schema.h>
#include <vespa/searchcore/proton/common/commit_time_tracker.h>
#include <vespa/searchcore/proton/documentmetastore/lidreusedelayer.h>
-#include <vespa/searchcore/proton/metrics/feed_metrics.h>
#include <vespa/searchcore/proton/server/executorthreadingservice.h>
#include <vespa/searchcore/proton/server/putdonecontext.h>
#include <vespa/searchcore/proton/server/removedonecontext.h>
@@ -15,8 +12,6 @@
#include <vespa/searchcore/proton/reference/dummy_gid_to_lid_change_handler.h>
#include <vespa/searchcore/proton/test/mock_summary_adapter.h>
#include <vespa/searchcore/proton/test/thread_utils.h>
-#include <vespa/searchlib/common/idestructorcallback.h>
-#include <vespa/searchlib/common/serialnum.h>
#include <vespa/searchlib/index/docbuilder.h>
#include <vespa/vespalib/testkit/testapp.h>
@@ -225,8 +220,7 @@ struct FixtureBase {
commitTimeTracker(fastos::TimeStamp()),
feedview()
{
- PerDocTypeFeedMetrics metrics(0);
- StoreOnlyFeedView::PersistentParams params(0, 0, DocTypeName("foo"), metrics, subdb_id, subDbType);
+ StoreOnlyFeedView::PersistentParams params(0, 0, DocTypeName("foo"), subdb_id, subDbType);
metaStore->constructFreeList();
ISummaryAdapter::SP adapter = std::make_unique<MySummaryAdapter>(removeCount, putCount, heartbeatCount);
feedview = std::make_unique<FeedViewType>(adapter, metaStore, writeService, lidReuseDelayer,
diff --git a/searchcore/src/tests/proton/feedtoken/feedtoken.cpp b/searchcore/src/tests/proton/feedtoken/feedtoken.cpp
index 9df65ae3437..530c9ebef39 100644
--- a/searchcore/src/tests/proton/feedtoken/feedtoken.cpp
+++ b/searchcore/src/tests/proton/feedtoken/feedtoken.cpp
@@ -1,57 +1,39 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/messagebus/emptyreply.h>
-#include <vespa/messagebus/testlib/receptor.h>
-#include <vespa/documentapi/messagebus/messages/removedocumentreply.h>
-#include <vespa/searchcore/proton/common/feedtoken.h>
#include <vespa/vespalib/testkit/testapp.h>
+#include <vespa/searchcore/proton/common/feedtoken.h>
#include <vespa/vespalib/util/exceptions.h>
using namespace proton;
class LocalTransport : public FeedToken::ITransport {
private:
- mbus::Receptor _receptor;
- double _latency_ms;
+ size_t _receivedCount;
public:
LocalTransport()
- : _receptor(),
- _latency_ms(0.0)
- {
- // empty
- }
-
- void send(mbus::Reply::UP reply, ResultUP, bool, double latency_ms) override {
- _receptor.handleReply(std::move(reply));
- _latency_ms = latency_ms;
- }
+ : _receivedCount(0)
+ { }
- mbus::Reply::UP getReply() {
- return _receptor.getReply();
+ void send(ResultUP, bool) override {
+ _receivedCount++;
}
- double getLatencyMs() const {
- return _latency_ms;
- }
+ size_t getReceivedCount() const { return _receivedCount; }
};
class Test : public vespalib::TestApp {
private:
void testAck();
- void testAutoReply();
void testFail();
void testHandover();
- void testIntegrity();
public:
int Main() override {
TEST_INIT("feedtoken_test");
testAck(); TEST_FLUSH();
-// testAutoReply(); TEST_FLUSH();
testFail(); TEST_FLUSH();
testHandover(); TEST_FLUSH();
-// testIntegrity(); TEST_FLUSH();
TEST_DONE();
}
@@ -63,41 +45,18 @@ void
Test::testAck()
{
LocalTransport transport;
- mbus::Reply::UP msg(new documentapi::RemoveDocumentReply());
- FeedToken token(transport, std::move(msg));
+ FeedToken token(transport);
token.ack();
- mbus::Reply::UP reply = transport.getReply();
- ASSERT_TRUE(reply.get() != NULL);
- EXPECT_TRUE(!reply->hasErrors());
-}
-
-void
-Test::testAutoReply()
-{
- mbus::Receptor receptor;
- mbus::Reply::UP reply(new documentapi::RemoveDocumentReply());
- reply->pushHandler(receptor);
- {
- LocalTransport transport;
- FeedToken token(transport, std::move(reply));
- }
- reply = receptor.getReply(0);
- ASSERT_TRUE(reply.get() != NULL);
- EXPECT_TRUE(reply->hasErrors());
+ EXPECT_EQUAL(1u, transport.getReceivedCount());
}
void
Test::testFail()
{
LocalTransport transport;
- mbus::Reply::UP reply(new documentapi::RemoveDocumentReply());
- FeedToken token(transport, std::move(reply));
- token.fail(69, "6699");
- reply = transport.getReply();
- ASSERT_TRUE(reply.get() != NULL);
- EXPECT_EQUAL(1u, reply->getNumErrors());
- EXPECT_EQUAL(69u, reply->getError(0).getCode());
- EXPECT_EQUAL("6699", reply->getError(0).getMessage());
+ FeedToken token(transport);
+ token.fail();
+ EXPECT_EQUAL(1u, transport.getReceivedCount());
}
void
@@ -110,25 +69,11 @@ Test::testHandover()
};
LocalTransport transport;
- mbus::Reply::UP reply(new documentapi::RemoveDocumentReply());
- FeedToken token(transport, std::move(reply));
+ FeedToken token(transport);
token = MyHandover::handover(token);
token.ack();
- reply = transport.getReply();
- ASSERT_TRUE(reply.get() != NULL);
- EXPECT_TRUE(!reply->hasErrors());
+ EXPECT_EQUAL(1u, transport.getReceivedCount());
}
-void
-Test::testIntegrity()
-{
- LocalTransport transport;
- try {
- FeedToken token(transport, mbus::Reply::UP());
- EXPECT_TRUE(false); // should throw an exception
- } catch (vespalib::IllegalArgumentException &e) {
- (void)e; // expected
- }
-}
diff --git a/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.cpp b/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.cpp
index fd60b16ee70..10f70b2e518 100644
--- a/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.cpp
+++ b/searchcore/src/vespa/searchcore/fdispatch/search/nodemanager.cpp
@@ -239,10 +239,11 @@ FastS_NodeManager::SetCollDesc(FastS_DataSetCollDesc *configDesc,
break;
FastOS_Thread::Sleep(100);
};
- if (allup)
+ if (allup) {
LOG(debug, "All new engines up after %d ms", rwait);
- else
+ } else {
LOG(debug, "Some new engines still down after %d ms", rwait);
+ }
}
gencnt = SetDataSetCollection(newCollection);
diff --git a/searchcore/src/vespa/searchcore/proton/attribute/i_attribute_writer.h b/searchcore/src/vespa/searchcore/proton/attribute/i_attribute_writer.h
index 7455ccda5a3..abcf132d537 100644
--- a/searchcore/src/vespa/searchcore/proton/attribute/i_attribute_writer.h
+++ b/searchcore/src/vespa/searchcore/proton/attribute/i_attribute_writer.h
@@ -9,11 +9,7 @@
#include <vespa/searchcore/proton/attribute/i_attribute_manager.h>
#include <vespa/searchcore/proton/feedoperation/lidvectorcontext.h>
-namespace search {
-
-class IDestructorCallback;
-
-}
+namespace search { class IDestructorCallback; }
namespace proton {
diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp
index 545a303cb16..008fafa332b 100644
--- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp
+++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp
@@ -1,93 +1,53 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "feedtoken.h"
-#include <vespa/searchcore/proton/metrics/feed_metrics.h>
namespace proton {
-FeedToken::FeedToken(ITransport &transport, mbus::Reply::UP reply) :
- _state(new State(transport, std::move(reply), 1))
+FeedToken::FeedToken(ITransport &transport) :
+ _state(new State(transport, 1))
{
}
-FeedToken::State::State(ITransport & transport, mbus::Reply::UP reply, uint32_t numAcksRequired) :
+FeedToken::State::State(ITransport & transport, uint32_t numAcksRequired) :
_transport(transport),
- _reply(std::move(reply)),
_result(new storage::spi::Result()),
_documentWasFound(false),
- _unAckedCount(numAcksRequired),
- _lock(),
- _startTime()
+ _unAckedCount(numAcksRequired)
{
- assert(_reply);
- _startTime.SetNow();
+ assert(_unAckedCount > 0);
}
FeedToken::State::~State()
{
- assert(!_reply);
+ assert(_unAckedCount == 0);
}
void
FeedToken::State::ack()
{
- assert(_reply);
uint32_t prev(_unAckedCount--);
if (prev == 1) {
- _transport.send(std::move(_reply), std::move(_result), _documentWasFound, _startTime.MilliSecsToNow());
+ _transport.send(std::move(_result), _documentWasFound);
}
assert(prev >= 1);
}
-
-void
-FeedToken::State::ack(const FeedOperation::Type opType,
- PerDocTypeFeedMetrics &metrics)
-{
- assert(_reply);
- uint32_t prev(_unAckedCount--);
- if (prev == 1) {
- _transport.send(std::move(_reply), std::move(_result), _documentWasFound, _startTime.MilliSecsToNow());
- switch (opType) {
- case FeedOperation::PUT:
- metrics.RegisterPut(_startTime);
- break;
- case FeedOperation::REMOVE:
- case FeedOperation::REMOVE_BATCH:
- metrics.RegisterRemove(_startTime);
- break;
- case FeedOperation::UPDATE_42:
- case FeedOperation::UPDATE:
- metrics.RegisterUpdate(_startTime);
- break;
- case FeedOperation::MOVE:
- metrics.RegisterMove(_startTime);
- break;
- default:
- ;
- }
- }
- assert(prev >= 1);
-}
-
-
void
FeedToken::State::incNeededAcks()
{
- assert(_reply);
uint32_t prev(_unAckedCount++);
assert(prev >= 1);
(void) prev;
}
-
void
-FeedToken::State::fail(uint32_t errNum, const vespalib::string &errMsg)
+FeedToken::State::fail()
{
- assert(_reply);
- vespalib::LockGuard guard(_lock);
- _reply->addError(mbus::Error(errNum, errMsg));
- _transport.send(std::move(_reply), std::move(_result), _documentWasFound, _startTime.MilliSecsToNow());
+ uint32_t prev = _unAckedCount.exchange(0);
+ if (prev > 0) {
+ _transport.send(std::move(_result), _documentWasFound);
+ }
}
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h
index 722827ded87..856c8a22652 100644
--- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h
+++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h
@@ -1,16 +1,13 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
-#include <vespa/messagebus/reply.h>
#include <vespa/persistence/spi/persistenceprovider.h>
#include <vespa/vespalib/util/exception.h>
#include <vespa/vespalib/util/sync.h>
-#include <vespa/searchcore/proton/feedoperation/feedoperation.h>
#include <atomic>
namespace proton {
-class PerDocTypeFeedMetrics;
typedef std::unique_ptr<storage::spi::Result> ResultUP;
/**
@@ -23,10 +20,7 @@ public:
class ITransport {
public:
virtual ~ITransport() { }
- virtual void send(mbus::Reply::UP reply,
- ResultUP result,
- bool documentWasFound,
- double latency_ms) = 0;
+ virtual void send(ResultUP result, bool documentWasFound) = 0;
};
private:
@@ -34,30 +28,21 @@ private:
public:
State(const State &) = delete;
State & operator = (const State &) = delete;
- State(ITransport & transport, mbus::Reply::UP reply, uint32_t numAcksRequired);
+ State(ITransport & transport, uint32_t numAcksRequired);
~State();
- void ack();
-
- void ack(const FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics);
-
void incNeededAcks();
-
- void fail(uint32_t errNum, const vespalib::string &errMsg);
- mbus::Reply & getReply() { return *_reply; }
+ void ack();
+ void fail();
void setResult(ResultUP result, bool documentWasFound) {
_documentWasFound = documentWasFound;
_result = std::move(result);
}
const storage::spi::Result &getResult() { return *_result; }
- FastOS_Time getStartTime() const { return _startTime; }
private:
- ITransport &_transport;
- mbus::Reply::UP _reply;
- ResultUP _result;
- bool _documentWasFound;
+ ITransport &_transport;
+ ResultUP _result;
+ bool _documentWasFound;
std::atomic<uint32_t> _unAckedCount;
- vespalib::Lock _lock;
- FastOS_Time _startTime;
};
std::shared_ptr<State> _state;
@@ -72,9 +57,8 @@ public:
* vespalib::IllegalArgumentException.
*
     * @param transport The transport to pass the result to.
- * @param reply The mbus::Reply corresponding to this operation.
*/
- FeedToken(ITransport &transport, mbus::Reply::UP reply);
+ FeedToken(ITransport &transport);
FeedToken(FeedToken &&) = default;
FeedToken & operator =(FeedToken &&) = default;
@@ -89,10 +73,6 @@ public:
*/
void ack() const { _state->ack(); }
- void ack(const FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics) const {
- _state->ack(opType, metrics);
- }
-
void incNeededAcks() const {
_state->incNeededAcks();
}
@@ -105,14 +85,7 @@ public:
     * The operation is reported as failed to the transport, using the
     * result currently held by this token.
*/
- void fail(uint32_t errNum, const vespalib::string &errMsg) const { _state->fail(errNum, errMsg); }
-
- /**
- * Gives you access to the underlying reply message.
- *
- * @return The reply
- */
- mbus::Reply & getReply() const { return _state->getReply(); }
+ void fail() const { _state->fail(); }
/**
* Gives you access to the underlying result.
@@ -127,8 +100,6 @@ public:
void setResult(ResultUP result, bool documentWasFound) {
_state->setResult(std::move(result), documentWasFound);
}
-
- FastOS_Time getStartTime() const { return _state->getStartTime(); }
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/metrics/CMakeLists.txt
index 9f599208aa2..95d5f153f35 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/metrics/CMakeLists.txt
@@ -7,7 +7,6 @@ vespa_add_library(searchcore_proton_metrics STATIC
documentdb_metrics_collection.cpp
documentdb_tagged_metrics.cpp
executor_metrics.cpp
- feed_metrics.cpp
job_load_sampler.cpp
job_tracker.cpp
job_tracked_flush_target.cpp
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/feed_metrics.cpp b/searchcore/src/vespa/searchcore/proton/metrics/feed_metrics.cpp
deleted file mode 100644
index 06b2f8a5fca..00000000000
--- a/searchcore/src/vespa/searchcore/proton/metrics/feed_metrics.cpp
+++ /dev/null
@@ -1,60 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#include "feed_metrics.h"
-
-using vespalib::LockGuard;
-
-namespace proton {
-
-FeedMetrics::FeedMetrics()
- : metrics::MetricSet("feed", "", "Feed metrics", 0),
- count("count", "logdefault", "Feed messages handled", this),
- latency("latency", "logdefault", "Feed message latency", this)
-{
-}
-
-FeedMetrics::~FeedMetrics() {}
-
-PerDocTypeFeedMetrics::PerDocTypeFeedMetrics(MetricSet *parent)
- : MetricSet("feedmetrics", "", "Feed metrics", parent),
- _update_lock(),
- _puts("puts", "", "Number of feed put operations", this),
- _updates("updates", "", "Number of feed update operations", this),
- _removes("removes", "", "Number of feed remove operations", this),
- _moves("moves", "", "Number of feed move operations", this),
- _put_latency("put_latency", "", "Latency for feed puts", this),
- _update_latency("update_latency", "", "Latency for feed updates", this),
- _remove_latency("remove_latency", "", "Latency for feed removes", this),
- _move_latency("move_latency", "", "Latency for feed moves", this)
-{
-}
-
-PerDocTypeFeedMetrics::~PerDocTypeFeedMetrics() {}
-
-void PerDocTypeFeedMetrics::RegisterPut(const FastOS_Time &start_time) {
- LockGuard lock(_update_lock);
- _puts.inc(1);
- _put_latency.addValue(start_time.MilliSecsToNow() / 1000.0);
-}
-
-void PerDocTypeFeedMetrics::RegisterUpdate(const FastOS_Time &start_time) {
- LockGuard lock(_update_lock);
- _updates.inc(1);
- _update_latency.addValue(start_time.MilliSecsToNow() / 1000.0);
-}
-
-void PerDocTypeFeedMetrics::RegisterRemove(const FastOS_Time &start_time) {
- LockGuard lock(_update_lock);
- _removes.inc(1);
- _remove_latency.addValue(start_time.MilliSecsToNow() / 1000.0);
-}
-
-void
-PerDocTypeFeedMetrics::RegisterMove(const FastOS_Time &start_time)
-{
- LockGuard lock(_update_lock);
- _moves.inc(1);
- _move_latency.addValue(start_time.MilliSecsToNow() / 1000.0);
-}
-
-} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/feed_metrics.h b/searchcore/src/vespa/searchcore/proton/metrics/feed_metrics.h
deleted file mode 100644
index 8d8a8bcbb88..00000000000
--- a/searchcore/src/vespa/searchcore/proton/metrics/feed_metrics.h
+++ /dev/null
@@ -1,41 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-
-#pragma once
-
-#include <vespa/metrics/metrics.h>
-#include <vespa/vespalib/util/sync.h>
-
-namespace proton {
-
-struct FeedMetrics : metrics::MetricSet
-{
- vespalib::Lock updateLock;
- metrics::LongCountMetric count;
- metrics::DoubleAverageMetric latency;
-
- FeedMetrics();
- ~FeedMetrics();
-};
-
-class PerDocTypeFeedMetrics : metrics::MetricSet {
- vespalib::Lock _update_lock;
- metrics::LongCountMetric _puts;
- metrics::LongCountMetric _updates;
- metrics::LongCountMetric _removes;
- metrics::LongCountMetric _moves;
- metrics::DoubleAverageMetric _put_latency;
- metrics::DoubleAverageMetric _update_latency;
- metrics::DoubleAverageMetric _remove_latency;
- metrics::DoubleAverageMetric _move_latency;
-
-public:
- PerDocTypeFeedMetrics(metrics::MetricSet *parent);
- ~PerDocTypeFeedMetrics();
- void RegisterPut(const FastOS_Time &start_time);
- void RegisterUpdate(const FastOS_Time &start_time);
- void RegisterRemove(const FastOS_Time &start_time);
- void RegisterMove(const FastOS_Time &start_time);
-};
-
-} // namespace proton
-
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.cpp b/searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.cpp
index d324c4a7373..2c753d24c69 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.cpp
+++ b/searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.cpp
@@ -152,7 +152,6 @@ LegacyDocumentDBMetrics::LegacyDocumentDBMetrics(const std::string &docTypeName,
executor("executor", this),
indexExecutor("indexexecutor", this),
summaryExecutor("summaryexecutor", this),
- feed(this),
sessionManager(this),
ready("ready", this),
notReady("notready", this),
diff --git a/searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.h b/searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.h
index 78a13002d7a..0abad83a3a6 100644
--- a/searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.h
+++ b/searchcore/src/vespa/searchcore/proton/metrics/legacy_documentdb_metrics.h
@@ -6,7 +6,6 @@
#include "executor_metrics.h"
#include <vespa/metrics/metrics.h>
#include "sessionmanager_metrics.h"
-#include "feed_metrics.h"
#include <vespa/searchcore/proton/matching/matching_stats.h>
namespace proton {
@@ -117,7 +116,6 @@ struct LegacyDocumentDBMetrics : metrics::MetricSet
ExecutorMetrics executor;
ExecutorMetrics indexExecutor;
ExecutorMetrics summaryExecutor;
- PerDocTypeFeedMetrics feed;
search::grouping::SessionManagerMetrics sessionManager;
SubDBMetrics ready;
SubDBMetrics notReady;
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
index 69a3a902af8..feebbf4cf2a 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
@@ -182,17 +182,14 @@ PersistenceEngine::getHandlerSnapshot(document::BucketSpace bucketSpace) const
}
PersistenceEngine::HandlerSnapshot::UP
-PersistenceEngine::getHandlerSnapshot(document::BucketSpace bucketSpace,
- const DocumentId &id) const
+PersistenceEngine::getHandlerSnapshot(document::BucketSpace bucketSpace, const DocumentId &id) const
{
LockGuard guard(_lock);
return _handlers.getHandlerSnapshot(bucketSpace, id);
}
-PersistenceEngine::PersistenceEngine(IPersistenceEngineOwner &owner,
- const IResourceWriteFilter &writeFilter,
- ssize_t defaultSerializedSize,
- bool ignoreMaxBytes)
+PersistenceEngine::PersistenceEngine(IPersistenceEngineOwner &owner, const IResourceWriteFilter &writeFilter,
+ ssize_t defaultSerializedSize, bool ignoreMaxBytes)
: AbstractPersistenceProvider(),
_defaultSerializedSize(defaultSerializedSize),
_ignoreMaxBytes(ignoreMaxBytes),
@@ -216,8 +213,7 @@ PersistenceEngine::~PersistenceEngine()
IPersistenceHandler::SP
-PersistenceEngine::putHandler(document::BucketSpace bucketSpace,
- const DocTypeName &docType,
+PersistenceEngine::putHandler(document::BucketSpace bucketSpace, const DocTypeName &docType,
const IPersistenceHandler::SP &handler)
{
LockGuard guard(_lock);
@@ -226,8 +222,7 @@ PersistenceEngine::putHandler(document::BucketSpace bucketSpace,
IPersistenceHandler::SP
-PersistenceEngine::getHandler(document::BucketSpace bucketSpace,
- const DocTypeName &docType) const
+PersistenceEngine::getHandler(document::BucketSpace bucketSpace, const DocTypeName &docType) const
{
LockGuard guard(_lock);
return _handlers.getHandler(bucketSpace, docType);
@@ -235,8 +230,7 @@ PersistenceEngine::getHandler(document::BucketSpace bucketSpace,
IPersistenceHandler::SP
-PersistenceEngine::removeHandler(document::BucketSpace bucketSpace,
- const DocTypeName &docType)
+PersistenceEngine::removeHandler(document::BucketSpace bucketSpace, const DocTypeName &docType)
{
// TODO: Grab bucket list and treat them as modified
LockGuard guard(_lock);
@@ -367,8 +361,7 @@ PersistenceEngine::put(const Bucket& b, Timestamp t, const document::Document::S
docType.toString().c_str()));
}
TransportLatch latch(1);
- FeedToken token(latch, mbus::Reply::UP(new documentapi::FeedReply(
- documentapi::DocumentProtocol::REPLY_PUTDOCUMENT)));
+ FeedToken token(latch);
handler->handlePut(token, b, t, doc);
latch.await();
return latch.getResult();
@@ -390,7 +383,7 @@ PersistenceEngine::remove(const Bucket& b, Timestamp t, const DocumentId& did, C
TransportLatch latch(snap->size());
for (; snap->handlers().valid(); snap->handlers().next()) {
IPersistenceHandler *handler = snap->handlers().get();
- FeedToken token(latch, Reply::UP(new RemoveDocumentReply));
+ FeedToken token(latch);
handler->handleRemove(token, b, t, did);
}
latch.await();
@@ -421,7 +414,7 @@ PersistenceEngine::update(const Bucket& b, Timestamp t, const DocumentUpdate::SP
IPersistenceHandler::SP handler = getHandler(b.getBucketSpace(), docType);
TransportLatch latch(1);
if (handler.get() != NULL) {
- FeedToken token(latch, mbus::Reply::UP(new documentapi::UpdateDocumentReply()));
+ FeedToken token(latch);
LOG(debug, "update = %s", upd->toXml().c_str());
handler->handleUpdate(token, b, t, upd);
latch.await();
@@ -433,10 +426,7 @@ PersistenceEngine::update(const Bucket& b, Timestamp t, const DocumentUpdate::SP
PersistenceEngine::GetResult
-PersistenceEngine::get(const Bucket& b,
- const document::FieldSet& fields,
- const DocumentId& did,
- Context& context) const
+PersistenceEngine::get(const Bucket& b, const document::FieldSet& fields, const DocumentId& did, Context& context) const
{
std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex);
HandlerSnapshot::UP snapshot = getHandlerSnapshot(b.getBucketSpace());
@@ -465,11 +455,8 @@ PersistenceEngine::get(const Bucket& b,
PersistenceEngine::CreateIteratorResult
-PersistenceEngine::createIterator(const Bucket &bucket,
- const document::FieldSet& fields,
- const Selection &selection,
- IncludedVersions versions,
- Context & context)
+PersistenceEngine::createIterator(const Bucket &bucket, const document::FieldSet& fields, const Selection &selection,
+ IncludedVersions versions, Context & context)
{
std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex);
HandlerSnapshot::UP snapshot = getHandlerSnapshot(bucket.getBucketSpace());
@@ -552,7 +539,7 @@ PersistenceEngine::createBucket(const Bucket &b, Context &)
TransportLatch latch(snap->size());
for (; snap->handlers().valid(); snap->handlers().next()) {
IPersistenceHandler *handler = snap->handlers().get();
- FeedToken token(latch, Reply::UP(new DocumentReply(0)));
+ FeedToken token(latch);
handler->handleCreateBucket(token, b);
}
latch.await();
@@ -569,7 +556,7 @@ PersistenceEngine::deleteBucket(const Bucket& b, Context&)
TransportLatch latch(snap->size());
for (; snap->handlers().valid(); snap->handlers().next()) {
IPersistenceHandler *handler = snap->handlers().get();
- FeedToken token(latch, Reply::UP(new DocumentReply(0)));
+ FeedToken token(latch);
handler->handleDeleteBucket(token, b);
}
latch.await();
@@ -612,7 +599,7 @@ PersistenceEngine::split(const Bucket& source, const Bucket& target1, const Buck
TransportLatch latch(snap->size());
for (; snap->handlers().valid(); snap->handlers().next()) {
IPersistenceHandler *handler = snap->handlers().get();
- FeedToken token(latch, Reply::UP(new DocumentReply(0)));
+ FeedToken token(latch);
handler->handleSplit(token, source, target1, target2);
}
latch.await();
@@ -631,7 +618,7 @@ PersistenceEngine::join(const Bucket& source1, const Bucket& source2, const Buck
TransportLatch latch(snap->size());
for (; snap->handlers().valid(); snap->handlers().next()) {
IPersistenceHandler *handler = snap->handlers().get();
- FeedToken token(latch, Reply::UP(new DocumentReply(0)));
+ FeedToken token(latch);
handler->handleJoin(token, source1, source2, target);
}
latch.await();
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp
index e719a0aa962..e0d512ae6e0 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp
@@ -16,16 +16,11 @@ TransportLatch::TransportLatch(uint32_t cnt)
TransportLatch::~TransportLatch() {}
void
-TransportLatch::send(mbus::Reply::UP reply,
- ResultUP result,
- bool documentWasFound,
- double latency_ms)
+TransportLatch::send(ResultUP result, bool documentWasFound)
{
- (void) reply;
- (void) latency_ms;
{
vespalib::LockGuard guard(_lock);
- if (!_result.get()) {
+ if (!_result) {
_result = std::move(result);
} else if (result->hasError()) {
_result.reset(new Result(mergeErrorResults(*_result, *result)));
@@ -40,9 +35,7 @@ Result
TransportLatch::mergeErrorResults(const Result &lhs, const Result &rhs)
{
Result::ErrorType error = (lhs.getErrorCode() > rhs.getErrorCode() ? lhs : rhs).getErrorCode();
- return Result(error, vespalib::make_string("%s, %s",
- lhs.getErrorMessage().c_str(),
- rhs.getErrorMessage().c_str()));
+ return Result(error, vespalib::make_string("%s, %s", lhs.getErrorMessage().c_str(), rhs.getErrorMessage().c_str()));
}
} // proton
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h
index 747c95358b4..12f92722dfa 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h
@@ -20,10 +20,7 @@ private:
public:
TransportLatch(uint32_t cnt);
~TransportLatch();
- virtual void send(mbus::Reply::UP reply,
- ResultUP result,
- bool documentWasFound,
- double latency_ms) override;
+ void send(ResultUP result, bool documentWasFound) override;
void await() {
_latch.await();
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
index cffa014534e..4198803d1fe 100644
--- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp
@@ -125,9 +125,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir,
_state(),
_dmUsageForwarder(_writeService.master()),
_writeFilter(),
- _feedHandler(_writeService, tlsSpec, docTypeName,
- getMetricsCollection().getLegacyMetrics().feed,
- _state, *this, _writeFilter, *this, tlsDirectWriter),
+ _feedHandler(_writeService, tlsSpec, docTypeName, _state, *this, _writeFilter, *this, tlsDirectWriter),
_subDBs(*this, *this, _feedHandler, _docTypeName, _writeService, warmupExecutor,
summaryExecutor, fileHeaderContext, metricsWireService, getMetricsCollection(),
queryLimiter, clock, _configMutex, _baseDir, protonCfg, hwInfo),
@@ -138,7 +136,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir,
_lastDocStoreCacheStats(),
_calc()
{
- assert(configSnapshot.get() != NULL);
+ assert(configSnapshot);
LOG(debug, "DocumentDB(%s): Creating database in directory '%s'",
_docTypeName.toString().c_str(), _baseDir.c_str());
@@ -277,7 +275,7 @@ DocumentDB::newConfigSnapshot(DocumentDBConfig::SP snapshot)
_pendingConfigSnapshot.set(snapshot);
{
lock_guard guard(_configMutex);
- if (_activeConfigSnapshot.get() == NULL) {
+ if ( ! _activeConfigSnapshot) {
LOG(debug,
"DocumentDB(%s): Ignoring new available config snapshot. "
"The document database does not have"
diff --git a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp
index e46caca4fba..2b2849d025c 100644
--- a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp
@@ -36,11 +36,8 @@ FastAccessFeedView::getUpdateScope(const DocumentUpdate &upd)
* Otherwise we can drop it and ack the operation right away.
*/
void
-FastAccessFeedView::putAttributes(SerialNum serialNum,
- search::DocumentIdT lid,
- const Document &doc,
- bool immediateCommit,
- OnPutDoneType onWriteDone)
+FastAccessFeedView::putAttributes(SerialNum serialNum, search::DocumentIdT lid, const Document &doc,
+ bool immediateCommit, OnPutDoneType onWriteDone)
{
_attributeWriter->put(serialNum, doc, lid, immediateCommit, onWriteDone);
if (immediateCommit && onWriteDone) {
@@ -49,29 +46,22 @@ FastAccessFeedView::putAttributes(SerialNum serialNum,
}
void
-FastAccessFeedView::updateAttributes(SerialNum serialNum,
- search::DocumentIdT lid,
- const DocumentUpdate &upd,
- bool immediateCommit,
- OnOperationDoneType onWriteDone)
+FastAccessFeedView::updateAttributes(SerialNum serialNum, search::DocumentIdT lid, const DocumentUpdate &upd,
+ bool immediateCommit, OnOperationDoneType onWriteDone)
{
_attributeWriter->update(serialNum, upd, lid, immediateCommit, onWriteDone);
}
void
-FastAccessFeedView::removeAttributes(SerialNum serialNum,
- search::DocumentIdT lid,
- bool immediateCommit,
- OnRemoveDoneType onWriteDone)
+FastAccessFeedView::removeAttributes(SerialNum serialNum, search::DocumentIdT lid,
+ bool immediateCommit, OnRemoveDoneType onWriteDone)
{
_attributeWriter->remove(serialNum, lid, immediateCommit, onWriteDone);
}
void
-FastAccessFeedView::removeAttributes(SerialNum serialNum,
- const LidVector &lidsToRemove,
- bool immediateCommit,
- OnWriteDoneType onWriteDone)
+FastAccessFeedView::removeAttributes(SerialNum serialNum, const LidVector &lidsToRemove,
+ bool immediateCommit, OnWriteDoneType onWriteDone)
{
_attributeWriter->remove(lidsToRemove, serialNum, immediateCommit, onWriteDone);
}
@@ -83,8 +73,7 @@ FastAccessFeedView::heartBeatAttributes(SerialNum serialNum)
}
FastAccessFeedView::FastAccessFeedView(const StoreOnlyFeedView::Context &storeOnlyCtx,
- const PersistentParams &params,
- const Context &ctx)
+ const PersistentParams &params, const Context &ctx)
: Parent(storeOnlyCtx, params),
_attributeWriter(ctx._attrWriter),
_docIdLimit(ctx._docIdLimit)
@@ -103,8 +92,7 @@ FastAccessFeedView::handleCompactLidSpace(const CompactLidSpaceOperation &op)
}
void
-FastAccessFeedView::forceCommit(SerialNum serialNum,
- OnForceCommitDoneType onCommitDone)
+FastAccessFeedView::forceCommit(SerialNum serialNum, OnForceCommitDoneType onCommitDone)
{
_attributeWriter->forceCommit(serialNum, onCommitDone);
onCommitDone->registerCommittedDocIdLimit(_metaStore.getCommittedDocIdLimit(), &_docIdLimit);
@@ -119,5 +107,19 @@ FastAccessFeedView::sync()
_writeService.attributeFieldWriter().sync();
}
+bool
+FastAccessFeedView::fastPartialUpdateAttribute(const vespalib::string &fieldName) const {
+ search::AttributeVector *attribute = _attributeWriter->getWritableAttribute(fieldName);
+ if (attribute == nullptr) {
+ // Partial update to non-attribute field must update document
+ return false;
+ }
+ search::attribute::BasicType::Type attrType = attribute->getBasicType();
+ // Partial update to tensor, predicate or reference attribute
+ // must update document
+ return ((attrType != search::attribute::BasicType::Type::PREDICATE) &&
+ (attrType != search::attribute::BasicType::Type::TENSOR) &&
+ (attrType != search::attribute::BasicType::Type::REFERENCE));
+}
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.h b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.h
index 4056aecd24c..e1b0cf83f64 100644
--- a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.h
+++ b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.h
@@ -39,39 +39,28 @@ private:
const IAttributeWriter::SP _attributeWriter;
DocIdLimit &_docIdLimit;
- virtual UpdateScope getUpdateScope(const document::DocumentUpdate &upd) override;
+ UpdateScope getUpdateScope(const document::DocumentUpdate &upd) override;
- virtual void putAttributes(SerialNum serialNum,
- search::DocumentIdT lid,
- const document::Document &doc,
- bool immediateCommit,
- OnPutDoneType onWriteDone) override;
+ void putAttributes(SerialNum serialNum, search::DocumentIdT lid, const document::Document &doc,
+ bool immediateCommit, OnPutDoneType onWriteDone) override;
- virtual void updateAttributes(SerialNum serialNum,
- search::DocumentIdT lid,
- const document::DocumentUpdate &upd,
- bool immediateCommit,
- OnOperationDoneType onWriteDone) override;
+ void updateAttributes(SerialNum serialNum, search::DocumentIdT lid, const document::DocumentUpdate &upd,
+ bool immediateCommit, OnOperationDoneType onWriteDone) override;
- virtual void removeAttributes(SerialNum serialNum,
- search::DocumentIdT lid,
- bool immediateCommit,
- OnRemoveDoneType onWriteDone) override;
+ void removeAttributes(SerialNum serialNum, search::DocumentIdT lid,
+ bool immediateCommit, OnRemoveDoneType onWriteDone) override;
- virtual void removeAttributes(SerialNum serialNum,
- const LidVector &lidsToRemove,
- bool immediateCommit,
- OnWriteDoneType onWriteDone) override;
+ void removeAttributes(SerialNum serialNum, const LidVector &lidsToRemove,
+ bool immediateCommit, OnWriteDoneType onWriteDone) override;
- virtual void heartBeatAttributes(SerialNum serialNum) override;
+ void heartBeatAttributes(SerialNum serialNum) override;
protected:
- virtual void forceCommit(SerialNum serialNum, OnForceCommitDoneType onCommitDone) override;
+ void forceCommit(SerialNum serialNum, OnForceCommitDoneType onCommitDone) override;
public:
FastAccessFeedView(const StoreOnlyFeedView::Context &storeOnlyCtx,
- const PersistentParams &params,
- const Context &ctx);
+ const PersistentParams &params, const Context &ctx);
~FastAccessFeedView();
virtual const IAttributeWriter::SP &getAttributeWriter() const {
@@ -82,24 +71,10 @@ public:
return _docIdLimit;
}
- virtual void handleCompactLidSpace(const CompactLidSpaceOperation &op) override;
-
- virtual void sync() override;
-
- bool fastPartialUpdateAttribute(const vespalib::string &fieldName) {
- search::AttributeVector *attribute =
- _attributeWriter->getWritableAttribute(fieldName);
- if (attribute == nullptr) {
- // Partial update to non-attribute field must update document
- return false;
- }
- search::attribute::BasicType::Type attrType = attribute->getBasicType();
- // Partial update to tensor, predicate or reference attribute
- // must update document
- return ((attrType != search::attribute::BasicType::Type::PREDICATE) &&
- (attrType != search::attribute::BasicType::Type::TENSOR) &&
- (attrType != search::attribute::BasicType::Type::REFERENCE));
- }
+ void handleCompactLidSpace(const CompactLidSpaceOperation &op) override;
+ void sync() override;
+
+ bool fastPartialUpdateAttribute(const vespalib::string &fieldName) const;
};
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index bb5f058429a..9300af5c3f0 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -49,27 +49,9 @@ namespace proton {
namespace {
-void
-setUpdateWasFound(mbus::Reply &reply, bool was_found)
-{
- assert(static_cast<DocumentReply&>(reply).getType() == DocumentProtocol::REPLY_UPDATEDOCUMENT);
- UpdateDocumentReply &update_rep = static_cast<UpdateDocumentReply&>(reply);
- update_rep.setWasFound(was_found);
-}
-
-void
-setRemoveWasFound(mbus::Reply &reply, bool was_found)
-{
- assert(static_cast<DocumentReply&>(reply).getType() == DocumentProtocol::REPLY_REMOVEDOCUMENT);
- RemoveDocumentReply &remove_rep = static_cast<RemoveDocumentReply&>(reply);
- remove_rep.setWasFound(was_found);
-}
-
bool
-ignoreOperation(const DocumentOperation &op)
-{
- return (op.getPrevTimestamp() != 0)
- && (op.getTimestamp() < op.getPrevTimestamp());
+ignoreOperation(const DocumentOperation &op) {
+ return (op.getPrevTimestamp() != 0) && (op.getTimestamp() < op.getPrevTimestamp());
}
} // namespace
@@ -119,7 +101,7 @@ void FeedHandler::performPut(FeedToken::UP token, PutOperation &op) {
op.getDocument()->getId().toString().c_str(), (uint64_t)op.getTimestamp(), (uint64_t)op.getPrevTimestamp());
if (token) {
token->setResult(ResultUP(new Result), false);
- token->ack(op.getType(), _metrics);
+ token->ack();
}
return;
}
@@ -142,8 +124,7 @@ FeedHandler::performUpdate(FeedToken::UP token, UpdateOperation &op)
} else {
if (token) {
token->setResult(ResultUP(new UpdateResult(Timestamp(0))), false);
- setUpdateWasFound(token->getReply(), false);
- token->ack(op.getType(), _metrics);
+ token->ack();
}
}
}
@@ -155,7 +136,6 @@ FeedHandler::performInternalUpdate(FeedToken::UP token, UpdateOperation &op)
storeOperation(op);
if (token) {
token->setResult(ResultUP(new UpdateResult(op.getPrevTimestamp())), true);
- setUpdateWasFound(token->getReply(), true);
}
_activeFeedView->handleUpdate(token.get(), op);
}
@@ -172,10 +152,9 @@ FeedHandler::createNonExistingDocument(FeedToken::UP token, const UpdateOperatio
storeOperation(putOp);
if (token) {
token->setResult(ResultUP(new UpdateResult(putOp.getTimestamp())), true);
- setUpdateWasFound(token->getReply(), true);
}
TransportLatch latch(1);
- FeedToken putToken(latch, mbus::Reply::UP(new FeedReply(DocumentProtocol::REPLY_PUTDOCUMENT)));
+ FeedToken putToken(latch);
_activeFeedView->handlePut(&putToken, putOp);
latch.await();
if (token) {
@@ -191,7 +170,7 @@ void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) {
op.getDocumentId().toString().c_str(), (uint64_t)op.getTimestamp(), (uint64_t)op.getPrevTimestamp());
if (token) {
token->setResult(ResultUP(new RemoveResult(false)), false);
- token->ack(op.getType(), _metrics);
+ token->ack();
}
return;
}
@@ -202,7 +181,6 @@ void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) {
if (token) {
bool documentWasFound = !op.getPrevMarkedAsRemoved();
token->setResult(ResultUP(new RemoveResult(documentWasFound)), documentWasFound);
- setRemoveWasFound(token->getReply(), documentWasFound);
}
_activeFeedView->handleRemove(token.get(), op);
} else if (op.hasDocType()) {
@@ -210,14 +188,12 @@ void FeedHandler::performRemove(FeedToken::UP token, RemoveOperation &op) {
storeOperation(op);
if (token) {
token->setResult(ResultUP(new RemoveResult(false)), false);
- setRemoveWasFound(token->getReply(), false);
}
_activeFeedView->handleRemove(token.get(), op);
} else {
if (token) {
token->setResult(ResultUP(new RemoveResult(false)), false);
- setRemoveWasFound(token->getReply(), false);
- token->ack(op.getType(), _metrics);
+ token->ack();
}
}
}
@@ -367,7 +343,6 @@ FeedHandler::changeFeedState(FeedState::SP newState, const LockGuard &)
FeedHandler::FeedHandler(IThreadingService &writeService,
const vespalib::string &tlsSpec,
const DocTypeName &docTypeName,
- PerDocTypeFeedMetrics &metrics,
DDBState &state,
IFeedHandlerOwner &owner,
const IResourceWriteFilter &writeFilter,
@@ -395,9 +370,8 @@ FeedHandler::FeedHandler(IThreadingService &writeService,
_delayedPrune(false),
_feedLock(),
_feedState(std::make_shared<InitState>(getDocTypeName())),
- _activeFeedView(NULL),
+ _activeFeedView(nullptr),
_bucketDBHandler(nullptr),
- _metrics(metrics),
_syncLock(),
_syncedSerialNum(0),
_allowSync(false)
@@ -499,7 +473,7 @@ void feedOperationRejected(FeedToken *token, const vespalib::string &opType, con
auto message = make_string("%s operation rejected for document '%s' of type '%s': '%s'",
opType.c_str(), docId.c_str(), docTypeName.toString().c_str(), rejectMessage.c_str());
token->setResult(ResultUP(new ResultType(Result::RESOURCE_EXHAUSTED, message)), false);
- token->fail(documentapi::DocumentProtocol::ERROR_REJECTED, message);
+ token->fail();
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
index 335a86e0279..dc955cfeb79 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
@@ -29,7 +29,6 @@ class IFeedView;
class IResourceWriteFilter;
class IReplayConfig;
class JoinBucketsOperation;
-class PerDocTypeFeedMetrics;
class PutOperation;
class RemoveOperation;
class SplitBucketOperation;
@@ -92,8 +91,6 @@ private:
// used by master write thread tasks
IFeedView *_activeFeedView;
bucketdb::IBucketDBHandler *_bucketDBHandler;
- PerDocTypeFeedMetrics &_metrics;
-
vespalib::Lock _syncLock;
SerialNum _syncedSerialNum;
bool _allowSync; // Sanity check
@@ -143,16 +140,14 @@ public:
* @param writeService The thread service used for all write tasks.
* @param tlsSpec The spec to connect to the transaction log server.
* @param docTypeName The name and version of the document type we are feed handler for.
- * @param metrics Feeding metrics.
* @param state Document db state
* @param owner Reference to the owner of this feed handler.
* @param replayConfig Reference to interface used for replaying config changes.
- * @param writer Inject writer for tls, or NULL to use internal.
+ * @param writer Inject writer for tls, or nullptr to use internal.
*/
FeedHandler(IThreadingService &writeService,
const vespalib::string &tlsSpec,
const DocTypeName &docTypeName,
- PerDocTypeFeedMetrics &metrics,
DDBState &state,
IFeedHandlerOwner &owner,
const IResourceWriteFilter &writerFilter,
@@ -160,7 +155,7 @@ public:
search::transactionlog::Writer & writer,
TlsWriter * tlsWriter = nullptr);
- virtual ~FeedHandler();
+ ~FeedHandler() override;
/**
* Init this feed handler.
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstate.cpp b/searchcore/src/vespa/searchcore/proton/server/feedstate.cpp
index 2ec3de61dea..3628505ed66 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedstate.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedstate.cpp
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "feedstate.h"
+#include <vespa/searchcore/proton/feedoperation/feedoperation.h>
#include <vespa/vespalib/util/exceptions.h>
using document::BucketId;
@@ -26,8 +27,7 @@ void FeedState::throwExceptionInReceive(const vespalib::string &docType,
}
void
-FeedState::throwExceptionInHandleOperation(const vespalib::string &docType,
- const FeedOperation &op)
+FeedState::throwExceptionInHandleOperation(const vespalib::string &docType, const FeedOperation &op)
{
throw IllegalStateException
(make_string("We should not receive any feed operations"
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedstate.h b/searchcore/src/vespa/searchcore/proton/server/feedstate.h
index 13dd6ea9dc8..472f5cb224f 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedstate.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedstate.h
@@ -12,27 +12,22 @@
namespace proton {
+class FeedOperation;
/**
* Class representing the current state of a feed handler.
*/
class FeedState {
public:
- enum Type { NORMAL,
- REPLAY_TRANSACTION_LOG,
- INIT };
+ enum Type { NORMAL, REPLAY_TRANSACTION_LOG, INIT };
private:
Type _type;
protected:
- void throwExceptionInReceive(const vespalib::string &docType,
- uint64_t serialRangeFrom,
- uint64_t serialRangeTo,
- size_t packetSize);
-
- void throwExceptionInHandleOperation(const vespalib::string &docType,
- const FeedOperation &op);
+ void throwExceptionInReceive(const vespalib::string &docType, uint64_t serialRangeFrom,
+ uint64_t serialRangeTo, size_t packetSize);
+ void throwExceptionInHandleOperation(const vespalib::string &docType, const FeedOperation &op);
public:
typedef std::shared_ptr<FeedState> SP;
@@ -43,11 +38,8 @@ public:
Type getType() const { return _type; }
vespalib::string getName() const;
- virtual void handleOperation(FeedToken token, FeedOperation::UP op) = 0;
-
- virtual void receive(const PacketWrapper::SP &wrap,
- vespalib::Executor &executor) = 0;
+ virtual void handleOperation(FeedToken token, std::unique_ptr<FeedOperation> op) = 0;
+ virtual void receive(const PacketWrapper::SP &wrap, vespalib::Executor &executor) = 0;
};
} // namespace proton
-
diff --git a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp
index 19235bb1e23..9c0115f0084 100644
--- a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.cpp
@@ -5,12 +5,8 @@
namespace proton {
-OperationDoneContext::OperationDoneContext(std::unique_ptr<FeedToken> token,
- const FeedOperation::Type opType,
- PerDocTypeFeedMetrics &metrics)
- : _token(std::move(token)),
- _opType(opType),
- _metrics(metrics)
+OperationDoneContext::OperationDoneContext(std::unique_ptr<FeedToken> token)
+ : _token(std::move(token))
{
}
@@ -24,7 +20,7 @@ OperationDoneContext::ack()
{
if (_token) {
std::unique_ptr<FeedToken> token(std::move(_token));
- token->ack(_opType, _metrics);
+ token->ack();
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h
index 3f7a6436604..b801987844b 100644
--- a/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h
+++ b/searchcore/src/vespa/searchcore/proton/server/operationdonecontext.h
@@ -7,7 +7,6 @@
namespace proton {
-class PerDocTypeFeedMetrics;
class FeedToken;
/**
@@ -20,18 +19,13 @@ class FeedToken;
class OperationDoneContext : public search::IDestructorCallback
{
std::unique_ptr<FeedToken> _token;
- const FeedOperation::Type _opType;
- PerDocTypeFeedMetrics &_metrics;
-
protected:
void ack();
public:
- OperationDoneContext(std::unique_ptr<FeedToken> token,
- const FeedOperation::Type opType,
- PerDocTypeFeedMetrics &metrics);
+ OperationDoneContext(std::unique_ptr<FeedToken> token);
- virtual ~OperationDoneContext();
+ ~OperationDoneContext() override;
FeedToken *getToken() { return _token.get(); }
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp
index efb5a58dd2e..649eebb26f5 100644
--- a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.cpp
@@ -8,14 +8,13 @@
namespace proton {
PutDoneContext::PutDoneContext(std::unique_ptr<FeedToken> token,
- const FeedOperation::Type opType,
- PerDocTypeFeedMetrics &metrics,
+
IGidToLidChangeHandler &gidToLidChangeHandler,
const document::GlobalId &gid,
uint32_t lid,
search::SerialNum serialNum,
bool enableNotifyPut)
- : OperationDoneContext(std::move(token), opType, metrics),
+ : OperationDoneContext(std::move(token)),
_lid(lid),
_docIdLimit(nullptr),
_gidToLidChangeHandler(gidToLidChangeHandler),
diff --git a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h
index bddf9dabd90..3e98b02dda6 100644
--- a/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h
+++ b/searchcore/src/vespa/searchcore/proton/server/putdonecontext.h
@@ -6,9 +6,7 @@
#include <vespa/document/base/globalid.h>
#include <vespa/searchlib/common/serialnum.h>
-namespace proton
-{
-
+namespace proton {
class DocIdLimit;
class IGidToLidChangeHandler;
@@ -30,21 +28,11 @@ class PutDoneContext : public OperationDoneContext
bool _enableNotifyPut;
public:
- PutDoneContext(std::unique_ptr<FeedToken> token,
- const FeedOperation::Type opType,
- PerDocTypeFeedMetrics &metrics,
- IGidToLidChangeHandler &gidToLidChangeHandler,
- const document::GlobalId &gid,
- uint32_t lid,
- search::SerialNum serialNum,
- bool enableNotifyPut);
-
- virtual ~PutDoneContext();
-
- void registerPutLid(DocIdLimit *docIdLimit)
- {
- _docIdLimit = docIdLimit;
- }
+ PutDoneContext(std::unique_ptr<FeedToken> token, IGidToLidChangeHandler &gidToLidChangeHandler,
+ const document::GlobalId &gid, uint32_t lid, search::SerialNum serialNum, bool enableNotifyPut);
+ ~PutDoneContext() override;
+
+ void registerPutLid(DocIdLimit *docIdLimit) { _docIdLimit = docIdLimit; }
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp
index 627e8d9f627..bd9a8240d73 100644
--- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.cpp
@@ -8,13 +8,11 @@
namespace proton {
RemoveDoneContext::RemoveDoneContext(std::unique_ptr<FeedToken> token,
- const FeedOperation::Type opType,
- PerDocTypeFeedMetrics &metrics,
vespalib::Executor &executor,
IDocumentMetaStore &documentMetaStore,
PendingNotifyRemoveDone &&pendingNotifyRemoveDone,
uint32_t lid)
- : OperationDoneContext(std::move(token), opType, metrics),
+ : OperationDoneContext(std::move(token)),
_executor(executor),
_task(),
_pendingNotifyRemoveDone(std::move(pendingNotifyRemoveDone))
diff --git a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h
index c4fafb4e886..83f6013dd85 100644
--- a/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h
+++ b/searchcore/src/vespa/searchcore/proton/server/removedonecontext.h
@@ -8,8 +8,7 @@
#include <vespa/searchlib/common/serialnum.h>
#include <vespa/searchcore/proton/reference/pending_notify_remove_done.h>
-namespace proton
-{
+namespace proton {
class IDocumentMetaStore;
@@ -29,15 +28,11 @@ class RemoveDoneContext : public OperationDoneContext
PendingNotifyRemoveDone _pendingNotifyRemoveDone;
public:
- RemoveDoneContext(std::unique_ptr<FeedToken> token,
- const FeedOperation::Type opType,
- PerDocTypeFeedMetrics &metrics,
- vespalib::Executor &executor,
- IDocumentMetaStore &documentMetaStore,
- PendingNotifyRemoveDone &&pendingNotifyRemoveDone,
+ RemoveDoneContext(std::unique_ptr<FeedToken> token, vespalib::Executor &executor,
+ IDocumentMetaStore &documentMetaStore, PendingNotifyRemoveDone &&pendingNotifyRemoveDone,
uint32_t lid);
- virtual ~RemoveDoneContext();
+ ~RemoveDoneContext() override;
};
diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp
index d3d73b42fbe..0c87d24899d 100644
--- a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp
@@ -5,7 +5,6 @@
#include "operationdonecontext.h"
#include "removedonecontext.h"
#include <vespa/searchcore/proton/common/feedtoken.h>
-#include <vespa/searchcore/proton/metrics/feed_metrics.h>
#include <vespa/searchcore/proton/documentmetastore/ilidreusedelayer.h>
#include <vespa/vespalib/text/stringtokenizer.h>
#include <vespa/vespalib/util/closuretask.h>
@@ -35,10 +34,8 @@ SearchableFeedView::Context::Context(const IIndexWriter::SP &indexWriter)
SearchableFeedView::Context::~Context() = default;
-SearchableFeedView::SearchableFeedView(const StoreOnlyFeedView::Context &storeOnlyCtx,
- const PersistentParams &params,
- const FastAccessFeedView::Context &fastUpdateCtx,
- Context ctx)
+SearchableFeedView::SearchableFeedView(const StoreOnlyFeedView::Context &storeOnlyCtx, const PersistentParams &params,
+ const FastAccessFeedView::Context &fastUpdateCtx, Context ctx)
: Parent(storeOnlyCtx, params, fastUpdateCtx),
_indexWriter(ctx._indexWriter),
_hasIndexedFields(_schema->getNumIndexFields() > 0)
@@ -191,11 +188,8 @@ SearchableFeedView::performIndexRemove(SerialNum serialNum, const LidVector &lid
assert(_writeService.index().isCurrentThread());
for (const auto lid : lidsToRemove) {
VLOG(getDebugLevel(lid, nullptr),
- "database(%s): performIndexRemove: serialNum(%" PRIu64 "), "
- "lid(%d)",
- _params._docTypeName.toString().c_str(),
- serialNum,
- lid);
+ "database(%s): performIndexRemove: serialNum(%" PRIu64 "), lid(%d)",
+ _params._docTypeName.toString().c_str(), serialNum, lid);
_indexWriter->remove(serialNum, lid);
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp
index e931a28c6a5..bba03621f8a 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlydocsubdb.cpp
@@ -160,7 +160,7 @@ StoreOnlyDocSubDB::clearViews() {
size_t
StoreOnlyDocSubDB::getNumDocs() const
{
- if (_metaStoreCtx.get() != NULL) {
+ if (_metaStoreCtx) {
return _metaStoreCtx->get().getNumUsedLids();
} else {
return 0u;
@@ -378,8 +378,7 @@ StoreOnlyDocSubDB::getFeedViewPersistentParams()
{
SerialNum flushedDMSSN(_flushedDocumentMetaStoreSerialNum);
SerialNum flushedDSSN(_flushedDocumentStoreSerialNum);
- return StoreOnlyFeedView::PersistentParams(flushedDMSSN, flushedDSSN, _docTypeName,
- _metrics.feed, _subDbId, _subDbType);
+ return StoreOnlyFeedView::PersistentParams(flushedDMSSN, flushedDSSN, _docTypeName, _subDbId, _subDbType);
}
void
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
index de3e1648085..8c8559115bd 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp
@@ -11,10 +11,8 @@
#include <vespa/searchcore/proton/common/commit_time_tracker.h>
#include <vespa/searchcore/proton/common/feedtoken.h>
#include <vespa/searchcore/proton/documentmetastore/ilidreusedelayer.h>
-#include <vespa/searchcore/proton/metrics/feed_metrics.h>
#include <vespa/searchcore/proton/reference/i_gid_to_lid_change_handler.h>
#include <vespa/document/datatype/documenttype.h>
-#include <vespa/document/fieldvalue/document.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/log/log.h>
@@ -54,19 +52,18 @@ private:
IDestructorCallback::SP _moveDoneCtx;
public:
- PutDoneContextForMove(std::unique_ptr<FeedToken> token, const FeedOperation::Type opType,
- PerDocTypeFeedMetrics &metrics,
+ PutDoneContextForMove(std::unique_ptr<FeedToken> token,
IGidToLidChangeHandler &gidToLidChangeHandler,
const document::GlobalId &gid,
uint32_t lid, search::SerialNum serialNum, bool enableNotifyPut, IDestructorCallback::SP moveDoneCtx)
- : PutDoneContext(std::move(token), opType, metrics, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut),
+ : PutDoneContext(std::move(token), gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut),
_moveDoneCtx(std::move(moveDoneCtx))
{}
- virtual ~PutDoneContextForMove() {}
+ ~PutDoneContextForMove() = default;
};
std::shared_ptr<PutDoneContext>
-createPutDoneContext(FeedToken::UP &token, FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics,
+createPutDoneContext(FeedToken::UP &token,
IGidToLidChangeHandler &gidToLidChangeHandler,
const document::GlobalId &gid, uint32_t lid,
SerialNum serialNum, bool enableNotifyPut,
@@ -74,25 +71,24 @@ createPutDoneContext(FeedToken::UP &token, FeedOperation::Type opType, PerDocTyp
{
std::shared_ptr<PutDoneContext> result;
if (moveDoneCtx) {
- result = std::make_shared<PutDoneContextForMove>(std::move(token), opType, metrics, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut, std::move(moveDoneCtx));
+ result = std::make_shared<PutDoneContextForMove>(std::move(token), gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut, std::move(moveDoneCtx));
} else {
- result = std::make_shared<PutDoneContext>(std::move(token), opType, metrics, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut);
+ result = std::make_shared<PutDoneContext>(std::move(token), gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut);
}
return result;
}
std::shared_ptr<PutDoneContext>
-createPutDoneContext(FeedToken::UP &token, FeedOperation::Type opType, PerDocTypeFeedMetrics &metrics, IGidToLidChangeHandler &gidToLidChangeHandler,
+createPutDoneContext(FeedToken::UP &token, IGidToLidChangeHandler &gidToLidChangeHandler,
const document::GlobalId &gid, uint32_t lid, SerialNum serialNum, bool enableNotifyPut)
{
- return createPutDoneContext(token, opType, metrics, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut, IDestructorCallback::SP());
+ return createPutDoneContext(token, gidToLidChangeHandler, gid, lid, serialNum, enableNotifyPut, IDestructorCallback::SP());
}
std::shared_ptr<UpdateDoneContext>
-createUpdateDoneContext(FeedToken::UP &token, FeedOperation::Type opType,
- PerDocTypeFeedMetrics &metrics, const DocumentUpdate::SP &upd)
+createUpdateDoneContext(FeedToken::UP &token, const DocumentUpdate::SP &upd)
{
- return std::make_shared<UpdateDoneContext>(std::move(token), opType, metrics, upd);
+ return std::make_shared<UpdateDoneContext>(std::move(token), upd);
}
void setPrev(DocumentOperation &op, const documentmetastore::IStore::Result &result,
@@ -110,32 +106,28 @@ private:
IDestructorCallback::SP _moveDoneCtx;
public:
- RemoveDoneContextForMove(std::unique_ptr<FeedToken> token, const FeedOperation::Type opType,
- PerDocTypeFeedMetrics &metrics, vespalib::Executor &executor,
+ RemoveDoneContextForMove(std::unique_ptr<FeedToken> token, vespalib::Executor &executor,
IDocumentMetaStore &documentMetaStore,
PendingNotifyRemoveDone &&pendingNotifyRemoveDone,
uint32_t lid,
IDestructorCallback::SP moveDoneCtx)
- : RemoveDoneContext(std::move(token), opType, metrics, executor, documentMetaStore, std::move(pendingNotifyRemoveDone) ,lid),
+ : RemoveDoneContext(std::move(token), executor, documentMetaStore, std::move(pendingNotifyRemoveDone) ,lid),
_moveDoneCtx(std::move(moveDoneCtx))
{}
- virtual ~RemoveDoneContextForMove() {}
+ ~RemoveDoneContextForMove() = default;
};
std::shared_ptr<RemoveDoneContext>
-createRemoveDoneContext(std::unique_ptr<FeedToken> token, const FeedOperation::Type opType,
- PerDocTypeFeedMetrics &metrics, vespalib::Executor &executor,
- IDocumentMetaStore &documentMetaStore,
- PendingNotifyRemoveDone &&pendingNotifyRemoveDone,
- uint32_t lid,
- IDestructorCallback::SP moveDoneCtx)
+createRemoveDoneContext(std::unique_ptr<FeedToken> token, vespalib::Executor &executor,
+ IDocumentMetaStore &documentMetaStore, PendingNotifyRemoveDone &&pendingNotifyRemoveDone,
+ uint32_t lid, IDestructorCallback::SP moveDoneCtx)
{
if (moveDoneCtx) {
return std::make_shared<RemoveDoneContextForMove>
- (std::move(token), opType, metrics, executor, documentMetaStore, std::move(pendingNotifyRemoveDone), lid, std::move(moveDoneCtx));
+ (std::move(token), executor, documentMetaStore, std::move(pendingNotifyRemoveDone), lid, std::move(moveDoneCtx));
} else {
return std::make_shared<RemoveDoneContext>
- (std::move(token), opType, metrics, executor, documentMetaStore, std::move(pendingNotifyRemoveDone), lid);
+ (std::move(token), executor, documentMetaStore, std::move(pendingNotifyRemoveDone), lid);
}
}
@@ -216,6 +208,8 @@ StoreOnlyFeedView::StoreOnlyFeedView(const Context &ctx, const PersistentParams
_docType = _repo->getDocumentType(_params._docTypeName.getName());
}
+StoreOnlyFeedView::~StoreOnlyFeedView() = default;
+
void
StoreOnlyFeedView::sync()
{
@@ -240,10 +234,10 @@ StoreOnlyFeedView::forceCommit(SerialNum serialNum, OnForceCommitDoneType onComm
}
void
-StoreOnlyFeedView::considerEarlyAck(FeedToken::UP &token, FeedOperation::Type opType)
+StoreOnlyFeedView::considerEarlyAck(FeedToken::UP &token)
{
if (_commitTimeTracker.hasVisibilityDelay() && token) {
- token->ack(opType, _params._metrics);
+ token->ack();
token.reset();
}
}
@@ -289,7 +283,7 @@ StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp)
_params._subDbId, doc->toString(true).size(), doc->toString(true).c_str());
PendingNotifyRemoveDone pendingNotifyRemoveDone = adjustMetaStore(putOp, docId);
- considerEarlyAck(token, putOp.getType());
+ considerEarlyAck(token);
bool docAlreadyExists = putOp.getValidPrevDbdId(_params._subDbId);
@@ -297,18 +291,19 @@ StoreOnlyFeedView::internalPut(FeedToken::UP token, const PutOperation &putOp)
bool immediateCommit = _commitTimeTracker.needCommit();
const document::GlobalId &gid = docId.getGlobalId();
std::shared_ptr<PutDoneContext> onWriteDone =
- createPutDoneContext(token, putOp.getType(), _params._metrics,
- _gidToLidChangeHandler, gid, putOp.getLid(), serialNum, putOp.changedDbdId() && useDocumentMetaStore(serialNum));
+ createPutDoneContext(token, _gidToLidChangeHandler, gid, putOp.getLid(), serialNum,
+ putOp.changedDbdId() && useDocumentMetaStore(serialNum));
putSummary(serialNum, putOp.getLid(), doc, onWriteDone);
putAttributes(serialNum, putOp.getLid(), *doc, immediateCommit, onWriteDone);
putIndexedFields(serialNum, putOp.getLid(), doc, immediateCommit, onWriteDone);
}
if (docAlreadyExists && putOp.changedDbdId()) {
assert(!putOp.getValidDbdId(_params._subDbId));
- internalRemove(std::move(token), serialNum, std::move(pendingNotifyRemoveDone), putOp.getPrevLid(), putOp.getType(), IDestructorCallback::SP());
+ internalRemove(std::move(token), serialNum, std::move(pendingNotifyRemoveDone),
+ putOp.getPrevLid(), IDestructorCallback::SP());
}
if (token) {
- token->ack(putOp.getType(), _params._metrics);
+ token->ack();
}
}
@@ -432,13 +427,12 @@ StoreOnlyFeedView::internalUpdate(FeedToken::UP token, const UpdateOperation &up
(void) updateOk;
_metaStore.commit(serialNum, serialNum);
}
- considerEarlyAck(token, updOp.getType());
+ considerEarlyAck(token);
bool immediateCommit = _commitTimeTracker.needCommit();
- auto onWriteDone = createUpdateDoneContext(token, updOp.getType(), _params._metrics, updOp.getUpdate());
+ auto onWriteDone = createUpdateDoneContext(token, updOp.getUpdate());
updateAttributes(serialNum, lid, upd, immediateCommit, onWriteDone);
-
UpdateScope updateScope(getUpdateScope(upd));
if (updateScope.hasIndexOrNonAttributeFields()) {
PromisedDoc promisedDoc;
@@ -555,7 +549,7 @@ StoreOnlyFeedView::internalRemove(FeedToken::UP token, const RemoveOperation &rm
rmOp.getSubDbId(), rmOp.getLid(), rmOp.getPrevSubDbId(), rmOp.getPrevLid(), _params._subDbId);
PendingNotifyRemoveDone pendingNotifyRemoveDone = adjustMetaStore(rmOp, docId);
- considerEarlyAck(token, rmOp.getType());
+ considerEarlyAck(token);
if (rmOp.getValidDbdId(_params._subDbId)) {
Document::UP clearDoc(new Document(*_docType, docId));
@@ -566,22 +560,25 @@ StoreOnlyFeedView::internalRemove(FeedToken::UP token, const RemoveOperation &rm
if (rmOp.getValidPrevDbdId(_params._subDbId)) {
if (rmOp.changedDbdId()) {
assert(!rmOp.getValidDbdId(_params._subDbId));
- internalRemove(std::move(token), serialNum, std::move(pendingNotifyRemoveDone), rmOp.getPrevLid(), rmOp.getType(), IDestructorCallback::SP());
+ internalRemove(std::move(token), serialNum, std::move(pendingNotifyRemoveDone),
+ rmOp.getPrevLid(), IDestructorCallback::SP());
}
}
if (token) {
- token->ack(rmOp.getType(), _params._metrics);
+ token->ack();
}
}
void
-StoreOnlyFeedView::internalRemove(FeedToken::UP token, SerialNum serialNum, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, Lid lid,
- FeedOperation::Type opType, IDestructorCallback::SP moveDoneCtx)
+StoreOnlyFeedView::internalRemove(FeedToken::UP token, SerialNum serialNum,
+ PendingNotifyRemoveDone &&pendingNotifyRemoveDone, Lid lid,
+ IDestructorCallback::SP moveDoneCtx)
{
bool explicitReuseLid = _lidReuseDelayer.delayReuse(lid);
std::shared_ptr<RemoveDoneContext> onWriteDone;
- onWriteDone = createRemoveDoneContext(std::move(token), opType, _params._metrics, _writeService.master(),
- _metaStore, std::move(pendingNotifyRemoveDone), (explicitReuseLid ? lid : 0u), moveDoneCtx);
+ onWriteDone = createRemoveDoneContext(std::move(token), _writeService.master(), _metaStore,
+ std::move(pendingNotifyRemoveDone), (explicitReuseLid ? lid : 0u),
+ std::move(moveDoneCtx));
removeSummary(serialNum, lid, onWriteDone);
bool immediateCommit = _commitTimeTracker.needCommit();
removeAttributes(serialNum, lid, immediateCommit, onWriteDone);
@@ -732,15 +729,14 @@ StoreOnlyFeedView::handleMove(const MoveOperation &moveOp, IDestructorCallback::
const document::GlobalId &gid = docId.getGlobalId();
FeedToken::UP token;
std::shared_ptr<PutDoneContext> onWriteDone =
- createPutDoneContext(token, moveOp.getType(), _params._metrics,
- _gidToLidChangeHandler, gid, moveOp.getLid(),
- serialNum, moveOp.changedDbdId() && useDocumentMetaStore(serialNum), doneCtx);
+ createPutDoneContext(token, _gidToLidChangeHandler, gid, moveOp.getLid(), serialNum,
+ moveOp.changedDbdId() && useDocumentMetaStore(serialNum), doneCtx);
putSummary(serialNum, moveOp.getLid(), doc, onWriteDone);
putAttributes(serialNum, moveOp.getLid(), *doc, immediateCommit, onWriteDone);
putIndexedFields(serialNum, moveOp.getLid(), doc, immediateCommit, onWriteDone);
}
if (docAlreadyExists && moveOp.changedDbdId()) {
- internalRemove(FeedToken::UP(), serialNum, std::move(pendingNotifyRemoveDone), moveOp.getPrevLid(), moveOp.getType(), doneCtx);
+ internalRemove(FeedToken::UP(), serialNum, std::move(pendingNotifyRemoveDone), moveOp.getPrevLid(), doneCtx);
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
index 021c2b2f8f7..baaf77bbe59 100644
--- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
+++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.h
@@ -27,7 +27,6 @@ namespace document { class GLobalId; }
namespace proton {
class IReplayConfig;
-class PerDocTypeFeedMetrics;
class ForceCommitContext;
class OperationDoneContext;
class PutDoneContext;
@@ -104,20 +103,17 @@ public:
const SerialNum _flushedDocumentMetaStoreSerialNum;
const SerialNum _flushedDocumentStoreSerialNum;
const DocTypeName _docTypeName;
- PerDocTypeFeedMetrics &_metrics;
const uint32_t _subDbId;
const SubDbType _subDbType;
PersistentParams(SerialNum flushedDocumentMetaStoreSerialNum,
SerialNum flushedDocumentStoreSerialNum,
const DocTypeName &docTypeName,
- PerDocTypeFeedMetrics &metrics,
uint32_t subDbId,
SubDbType subDbType)
: _flushedDocumentMetaStoreSerialNum(flushedDocumentMetaStoreSerialNum),
_flushedDocumentStoreSerialNum(flushedDocumentStoreSerialNum),
_docTypeName(docTypeName),
- _metrics(metrics),
_subDbId(subDbId),
_subDbType(subDbType)
{}
@@ -183,14 +179,14 @@ private:
size_t removeDocuments(const RemoveDocumentsOperation &op, bool remove_index_and_attribute_fields,
bool immediateCommit);
- void internalRemove(FeedTokenUP token, SerialNum serialNum, PendingNotifyRemoveDone &&pendingNotifyRemoveDone, Lid lid,
- FeedOperation::Type opType, std::shared_ptr<search::IDestructorCallback> moveDoneCtx);
+ void internalRemove(FeedTokenUP token, SerialNum serialNum, PendingNotifyRemoveDone &&pendingNotifyRemoveDone,
+ Lid lid, std::shared_ptr<search::IDestructorCallback> moveDoneCtx);
// Ack token early if visibility delay is nonzero
- void considerEarlyAck(FeedTokenUP &token, FeedOperation::Type opType);
+ void considerEarlyAck(FeedTokenUP &token);
- void makeUpdatedDocument(SerialNum serialNum, Lid lid, DocumentUpdateSP upd,
- OnOperationDoneType onWriteDone,PromisedDoc promisedDoc, PromisedStream promisedStream);
+ void makeUpdatedDocument(SerialNum serialNum, Lid lid, DocumentUpdateSP upd, OnOperationDoneType onWriteDone,
+ PromisedDoc promisedDoc, PromisedStream promisedStream);
protected:
virtual void internalDeleteBucket(const DeleteBucketOperation &delOp);
@@ -225,7 +221,7 @@ protected:
public:
StoreOnlyFeedView(const Context &ctx, const PersistentParams &params);
- virtual ~StoreOnlyFeedView() {}
+ ~StoreOnlyFeedView() override;
const ISummaryAdapter::SP &getSummaryAdapter() const { return _summaryAdapter; }
const search::index::Schema::SP &getSchema() const { return _schema; }
diff --git a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp
index 5eb7e2ed7f9..171990c32d3 100644
--- a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.cpp
@@ -3,23 +3,14 @@
#include "updatedonecontext.h"
#include <vespa/searchcore/proton/common/feedtoken.h>
-namespace proton
-{
-
+namespace proton {
-UpdateDoneContext::UpdateDoneContext(std::unique_ptr<FeedToken> token,
- const FeedOperation::Type opType,
- PerDocTypeFeedMetrics &metrics,
- const document::DocumentUpdate::SP &upd)
- : OperationDoneContext(std::move(token), opType, metrics),
+UpdateDoneContext::UpdateDoneContext(std::unique_ptr<FeedToken> token, const document::DocumentUpdate::SP &upd)
+ : OperationDoneContext(std::move(token)),
_upd(upd)
{
}
-
-UpdateDoneContext::~UpdateDoneContext()
-{
-}
-
+UpdateDoneContext::~UpdateDoneContext() = default;
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h
index df47538d6dd..4701db300de 100644
--- a/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h
+++ b/searchcore/src/vespa/searchcore/proton/server/updatedonecontext.h
@@ -5,8 +5,7 @@
#include "operationdonecontext.h"
#include <vespa/document/update/documentupdate.h>
-namespace proton
-{
+namespace proton {
/**
* Context class for document update operations that acks operation when
@@ -19,12 +18,8 @@ class UpdateDoneContext : public OperationDoneContext
{
document::DocumentUpdate::SP _upd;
public:
- UpdateDoneContext(std::unique_ptr<FeedToken> token,
- const FeedOperation::Type opType,
- PerDocTypeFeedMetrics &metrics,
- const document::DocumentUpdate::SP &upd);
-
- virtual ~UpdateDoneContext();
+ UpdateDoneContext(std::unique_ptr<FeedToken> token, const document::DocumentUpdate::SP &upd);
+ ~UpdateDoneContext() override;
const document::DocumentUpdate &getUpdate() { return *_upd; }
};
diff --git a/searchlib/src/tests/queryeval/queryeval.cpp b/searchlib/src/tests/queryeval/queryeval.cpp
index 9dc4f2eef89..c6dd6a430cc 100644
--- a/searchlib/src/tests/queryeval/queryeval.cpp
+++ b/searchlib/src/tests/queryeval/queryeval.cpp
@@ -287,6 +287,10 @@ private:
virtual void doSeek(uint32_t docid) override { (void) docid; }
};
+struct MultiSearchRemoveTest {
+ static SearchIterator::UP remove(MultiSearch &ms, size_t idx) { return ms.remove(idx); }
+};
+
TEST("testMultiSearch") {
MultiSearch::Children children;
children.push_back(new EmptySearch());
@@ -300,7 +304,7 @@ TEST("testMultiSearch") {
EXPECT_EQUAL(0u, ms._accumInsert);
EXPECT_EQUAL(0u, ms._accumRemove);
- EXPECT_EQUAL(children[1], ms.remove(1).get());
+ EXPECT_EQUAL(children[1], MultiSearchRemoveTest::remove(ms, 1).get());
EXPECT_EQUAL(2u, ms.getChildren().size());
EXPECT_EQUAL(children[0], ms.getChildren()[0]);
EXPECT_EQUAL(children[2], ms.getChildren()[1]);
diff --git a/searchlib/src/vespa/searchlib/queryeval/multisearch.h b/searchlib/src/vespa/searchlib/queryeval/multisearch.h
index fc1a9ec11cd..16bbd5d4ecc 100644
--- a/searchlib/src/vespa/searchlib/queryeval/multisearch.h
+++ b/searchlib/src/vespa/searchlib/queryeval/multisearch.h
@@ -5,14 +5,20 @@
#include "searchiterator.h"
#include <vector>
+class MultiSearchRemoveTest;
+
namespace search::queryeval {
+class MultiBitVectorIteratorBase;
+
/**
* A virtual intermediate class that serves as the basis for combining searches
* like and, or any or others that take a list of children.
**/
class MultiSearch : public SearchIterator
{
+ friend class ::MultiSearchRemoveTest;
+ friend class ::search::queryeval::MultiBitVectorIteratorBase;
public:
/**
* Defines how to represent the children iterators. vespalib::Array usage
@@ -32,13 +38,13 @@ public:
virtual bool isAndNot() const { return false; }
virtual bool isOr() const { return false; }
void insert(size_t index, SearchIterator::UP search);
- SearchIterator::UP remove(size_t index);
virtual bool needUnpack(size_t index) const { (void) index; return true; }
void initRange(uint32_t beginId, uint32_t endId) override;
protected:
void doUnpack(uint32_t docid) override;
void visitMembers(vespalib::ObjectVisitor &visitor) const override;
private:
+ SearchIterator::UP remove(size_t index); // friends only
/**
* Call back when children are removed / inserted after the Iterator has been constructed.
* This is to support code that make assumptions that iterators do not move around or disappear.
diff --git a/searchlib/src/vespa/searchlib/queryeval/orsearch.cpp b/searchlib/src/vespa/searchlib/queryeval/orsearch.cpp
index 6d59482d799..977080f8266 100644
--- a/searchlib/src/vespa/searchlib/queryeval/orsearch.cpp
+++ b/searchlib/src/vespa/searchlib/queryeval/orsearch.cpp
@@ -16,34 +16,20 @@ public:
void unpack(uint32_t docid, MultiSearch & search) {
const MultiSearch::Children & children(search.getChildren());
size_t sz(children.size());
- for (size_t i(0); i < sz; ) {
+ for (size_t i(0); i < sz; ++i) {
if (__builtin_expect(children[i]->getDocId() < docid, false)) {
children[i]->doSeek(docid);
- if (children[i]->getDocId() == search::endDocId) {
- sz = deactivate(search, i);
- continue;
- }
}
if (__builtin_expect(children[i]->getDocId() == docid, false)) {
children[i]->doUnpack(docid);
}
- i++;
}
}
void onRemove(size_t index) { (void) index; }
void onInsert(size_t index) { (void) index; }
bool needUnpack(size_t index) const { (void) index; return true; }
-private:
- static size_t deactivate(MultiSearch &children, size_t idx);
};
-size_t
-FullUnpack::deactivate(MultiSearch & search, size_t idx)
-{
- search.remove(idx);
- return search.getChildren().size();
-}
-
class SelectiveUnpack
{
public:
diff --git a/staging_vespalib/src/vespa/vespalib/util/document_runnable.cpp b/staging_vespalib/src/vespa/vespalib/util/document_runnable.cpp
index a612d5652ff..c0a32197c07 100644
--- a/staging_vespalib/src/vespa/vespalib/util/document_runnable.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/document_runnable.cpp
@@ -2,15 +2,18 @@
#include "document_runnable.h"
#include <vespa/vespalib/util/exceptions.h>
-#include <vespa/vespalib/util/stringfmt.h>
namespace document {
-Runnable::Runnable(FastOS_ThreadPool* pool)
+Runnable::Runnable()
: _stateLock(),
_state(NOT_RUNNING)
{
- if (pool) start(*pool);
+}
+
+Runnable::~Runnable() {
+ vespalib::MonitorGuard monitorGuard(_stateLock);
+ assert(_state == NOT_RUNNING);
}
bool Runnable::start(FastOS_ThreadPool& pool)
@@ -19,7 +22,7 @@ bool Runnable::start(FastOS_ThreadPool& pool)
while (_state == STOPPING) monitor.wait();
if (_state != NOT_RUNNING) return false;
_state = STARTING;
- if (pool.NewThread(this) == NULL) {
+ if (pool.NewThread(this) == nullptr) {
throw vespalib::IllegalStateException("Faled starting a new thread", VESPA_STRLOC);
}
return true;
@@ -42,7 +45,7 @@ bool Runnable::onStop()
bool Runnable::join() const
{
vespalib::MonitorGuard monitor(_stateLock);
- if (_state == STARTING || _state == RUNNING) return false;
+ assert ((_state != STARTING) && (_state != RUNNING));
while (_state != NOT_RUNNING) monitor.wait();
return true;
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/document_runnable.h b/staging_vespalib/src/vespa/vespalib/util/document_runnable.h
index 456c7fefdbb..60a67189b5b 100644
--- a/staging_vespalib/src/vespa/vespalib/util/document_runnable.h
+++ b/staging_vespalib/src/vespa/vespalib/util/document_runnable.h
@@ -42,7 +42,8 @@ public:
* Create a runnable.
* @param pool If set, runnable will be started in constructor.
*/
- Runnable(FastOS_ThreadPool* pool = 0);
+ Runnable();
+ ~Runnable();
/**
* Start this runnable.
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 12a4118aa08..d0941571a6a 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -50,6 +50,7 @@ class Distributor_Test : public CppUnit::TestFixture,
CPPUNIT_TEST(merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker);
CPPUNIT_TEST(external_client_requests_are_handled_individually_in_priority_order);
CPPUNIT_TEST(internal_messages_are_started_in_fifo_order_batch);
+ CPPUNIT_TEST(closing_aborts_priority_queued_client_requests);
CPPUNIT_TEST_SUITE_END();
protected:
@@ -81,6 +82,7 @@ protected:
void merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker();
void external_client_requests_are_handled_individually_in_priority_order();
void internal_messages_are_started_in_fifo_order_batch();
+ void closing_aborts_priority_queued_client_requests();
public:
void setUp() override {
@@ -927,6 +929,27 @@ void Distributor_Test::internal_messages_are_started_in_fifo_order_batch() {
CPPUNIT_ASSERT_EQUAL(api::BucketInfo(1, 1, 1), e.getBucketInfo().getNode(0)->getBucketInfo());
}
+void Distributor_Test::closing_aborts_priority_queued_client_requests() {
+ setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
+ document::BucketId bucket(16, 1);
+ addNodesToBucketDB(bucket, "0=1/1/1/t");
+
+ document::DocumentId id("id:foo:testdoctype1:n=1:foo");
+ vespalib::stringref field_set = "";
+ for (int i = 0; i < 10; ++i) {
+ auto cmd = std::make_shared<api::GetCommand>(document::BucketId(), id, field_set);
+ _distributor->onDown(cmd);
+ }
+ tickDistributorNTimes(1);
+ // Closing should trigger 1 abort via started GetOperation and 9 aborts from pri queue
+ _distributor->close();
+ CPPUNIT_ASSERT_EQUAL(size_t(10), _sender.replies.size());
+ for (auto& msg : _sender.replies) {
+ CPPUNIT_ASSERT_EQUAL(api::ReturnCode::ABORTED,
+ dynamic_cast<api::StorageReply&>(*msg).getResult().getResult());
+ }
+}
+
}
}
diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp
index 0fd5a30d8bd..6be021a549c 100644
--- a/storage/src/tests/persistence/persistencetestutils.cpp
+++ b/storage/src/tests/persistence/persistencetestutils.cpp
@@ -98,8 +98,7 @@ PersistenceTestUtils::createPersistenceThread(uint32_t disk)
getEnv()._fileStorHandler,
getEnv()._metrics,
disk,
- 255,
- false));
+ 255));
}
document::Document::SP
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
index 67942d3d447..ce7f7afc670 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp
@@ -26,27 +26,23 @@ BucketDBUpdater::BucketDBUpdater(Distributor& owner, ManagedBucketSpace& bucketS
{
}
-BucketDBUpdater::~BucketDBUpdater() {}
+BucketDBUpdater::~BucketDBUpdater() = default;
void
BucketDBUpdater::flush()
{
- for (std::map<uint64_t, BucketRequest>::iterator
- i(_sentMessages.begin()), end(_sentMessages.end());
- i != end; ++i)
- {
+ for (auto & entry : _sentMessages) {
// Cannot sendDown MergeBucketReplies during flushing, since
// all lower links have been closed
- if (i->second._mergeReplyGuard.get()) {
- i->second._mergeReplyGuard->resetReply();
+ if (entry.second._mergeReplyGuard) {
+ entry.second._mergeReplyGuard->resetReply();
}
}
_sentMessages.clear();
}
void
-BucketDBUpdater::print(std::ostream& out, bool verbose,
- const std::string& indent) const
+BucketDBUpdater::print(std::ostream& out, bool verbose, const std::string& indent) const
{
(void) verbose; (void) indent;
out << "BucketDBUpdater";
@@ -55,17 +51,15 @@ BucketDBUpdater::print(std::ostream& out, bool verbose,
bool
BucketDBUpdater::hasPendingClusterState() const
{
- return _pendingClusterState.get() != nullptr;
+ return static_cast<bool>(_pendingClusterState);
}
BucketOwnership
BucketDBUpdater::checkOwnershipInPendingState(const document::BucketId& b) const
{
if (hasPendingClusterState()) {
- const lib::ClusterState& state(
- _pendingClusterState->getNewClusterState());
- const lib::Distribution& distribution(
- _pendingClusterState->getDistribution());
+ const lib::ClusterState& state(_pendingClusterState->getNewClusterState());
+ const lib::Distribution& distribution(_pendingClusterState->getDistribution());
if (!_bucketSpaceComponent.ownsBucketInState(distribution, state, b)) {
return BucketOwnership::createNotOwnedInState(state);
}
@@ -74,22 +68,6 @@ BucketDBUpdater::checkOwnershipInPendingState(const document::BucketId& b) const
}
void
-BucketDBUpdater::clearPending(uint16_t node)
-{
- for (std::map<uint64_t, BucketRequest>::iterator iter(
- _sentMessages.begin()); iter != _sentMessages.end();)
- {
- if (iter->second.targetNode == node) {
- std::map<uint64_t, BucketRequest>::iterator del = iter;
- iter++;
- _sentMessages.erase(del);
- } else {
- iter++;
- }
- }
-}
-
-void
BucketDBUpdater::sendRequestBucketInfo(
uint16_t node,
const document::BucketId& bucket,
@@ -145,9 +123,8 @@ BucketDBUpdater::removeSuperfluousBuckets(
_bucketSpaceComponent.getBucketDatabase().forEach(proc);
- for (uint32_t i = 0; i < proc.getBucketsToRemove().size(); ++i) {
- _bucketSpaceComponent.getBucketDatabase()
- .remove(proc.getBucketsToRemove()[i]);
+ for (const auto & entry :proc.getBucketsToRemove()) {
+ _bucketSpaceComponent.getBucketDatabase().remove(entry);
}
}
@@ -247,9 +224,8 @@ BucketDBUpdater::onSetSystemState(
BucketDBUpdater::MergeReplyGuard::~MergeReplyGuard()
{
- if (_reply.get()) {
- _updater.getDistributorComponent().getDistributor()
- .handleCompletedMerge(_reply);
+ if (_reply) {
+ _updater.getDistributorComponent().getDistributor().handleCompletedMerge(_reply);
}
}
@@ -293,13 +269,8 @@ BucketDBUpdater::sendAllQueuedBucketRechecks()
"via NotifyBucketChange commands",
_enqueuedRechecks.size());
- typedef std::set<EnqueuedBucketRecheck>::const_iterator const_iterator;
- for (const_iterator it(_enqueuedRechecks.begin()),
- e(_enqueuedRechecks.end()); it != e; ++it)
- {
- sendRequestBucketInfo(it->node,
- it->bucket,
- std::shared_ptr<MergeReplyGuard>());
+ for (const auto & entry :_enqueuedRechecks) {
+ sendRequestBucketInfo(entry.node, entry.bucket, std::shared_ptr<MergeReplyGuard>());
}
_enqueuedRechecks.clear();
}
@@ -382,14 +353,14 @@ BucketDBUpdater::handleSingleBucketInfoFailure(
if (req.bucket != document::BucketId(0)) {
framework::MilliSecTime sendTime(_bucketSpaceComponent.getClock());
sendTime += framework::MilliSecTime(100);
- _delayedRequests.push_back(std::make_pair(sendTime, req));
+ _delayedRequests.emplace_back(sendTime, req);
}
}
void
BucketDBUpdater::resendDelayedMessages()
{
- if (_pendingClusterState.get()) {
+ if (_pendingClusterState) {
_pendingClusterState->resendDelayedMessages();
}
if (_delayedRequests.empty()) return; // Don't fetch time if not needed
@@ -398,9 +369,7 @@ BucketDBUpdater::resendDelayedMessages()
&& currentTime >= _delayedRequests.front().first)
{
BucketRequest& req(_delayedRequests.front().second);
- sendRequestBucketInfo(req.targetNode,
- req.bucket,
- std::shared_ptr<MergeReplyGuard>());
+ sendRequestBucketInfo(req.targetNode, req.bucket, std::shared_ptr<MergeReplyGuard>());
_delayedRequests.pop_front();
}
}
@@ -408,19 +377,13 @@ BucketDBUpdater::resendDelayedMessages()
void
BucketDBUpdater::convertBucketInfoToBucketList(
const std::shared_ptr<api::RequestBucketInfoReply>& repl,
- uint16_t targetNode,
- BucketListMerger::BucketList& newList)
+ uint16_t targetNode, BucketListMerger::BucketList& newList)
{
- for (uint32_t i = 0; i < repl->getBucketInfo().size(); i++) {
- LOG(debug,
- "Received bucket information from node %u for bucket %s: %s",
- targetNode,
- repl->getBucketInfo()[i]._bucketId.toString().c_str(),
- repl->getBucketInfo()[i]._info.toString().c_str());
+ for (const auto & entry : repl->getBucketInfo()) {
+ LOG(debug, "Received bucket information from node %u for bucket %s: %s", targetNode,
+ entry._bucketId.toString().c_str(), entry._info.toString().c_str());
- newList.push_back(BucketListMerger::BucketEntry(
- repl->getBucketInfo()[i]._bucketId,
- repl->getBucketInfo()[i]._info));
+ newList.emplace_back(entry._bucketId, entry._info);
}
}
@@ -446,8 +409,7 @@ bool
BucketDBUpdater::processSingleBucketInfoReply(
const std::shared_ptr<api::RequestBucketInfoReply> & repl)
{
- std::map<uint64_t, BucketRequest>::iterator iter =
- _sentMessages.find(repl->getMsgId());
+ auto iter = _sentMessages.find(repl->getMsgId());
// Has probably been deleted for some reason earlier.
if (iter == _sentMessages.end()) {
@@ -477,36 +439,30 @@ BucketDBUpdater::addBucketInfoForNode(
{
const BucketCopy* copy(e->getNode(node));
if (copy) {
- existing.push_back(BucketListMerger::BucketEntry(
- e.getBucketId(), copy->getBucketInfo()));
+ existing.emplace_back(e.getBucketId(), copy->getBucketInfo());
}
}
void
-BucketDBUpdater::findRelatedBucketsInDatabase(
- uint16_t node,
- const document::BucketId& bucketId,
- BucketListMerger::BucketList& existing)
+BucketDBUpdater::findRelatedBucketsInDatabase(uint16_t node, const document::BucketId& bucketId,
+ BucketListMerger::BucketList& existing)
{
std::vector<BucketDatabase::Entry> entries;
_bucketSpaceComponent.getBucketDatabase().getAll(bucketId, entries);
- for (uint32_t j = 0; j < entries.size(); ++j) {
- addBucketInfoForNode(entries[j], node, existing);
+ for (const BucketDatabase::Entry & entry : entries) {
+ addBucketInfoForNode(entry, node, existing);
}
}
void
BucketDBUpdater::updateDatabase(uint16_t node, BucketListMerger& merger)
{
- for (uint32_t i = 0; i < merger.getRemovedEntries().size(); i++) {
- _bucketSpaceComponent.removeNodeFromDB(merger.getRemovedEntries()[i], node);
+ for (const document::BucketId & bucket : merger.getRemovedEntries()) {
+ _bucketSpaceComponent.removeNodeFromDB(bucket, node);
}
- for (uint32_t i = 0; i < merger.getAddedEntries().size(); i++) {
- const BucketListMerger::BucketEntry& entry(
- merger.getAddedEntries()[i]);
-
+ for (const BucketListMerger::BucketEntry& entry : merger.getAddedEntries()) {
_bucketSpaceComponent.updateBucketDatabase(
entry.first,
BucketCopy(merger.getTimestamp(), node, entry.second),
@@ -603,12 +559,11 @@ BucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos,
<< XmlTag("systemstate_active")
<< XmlContent(_bucketSpaceComponent.getClusterState().toString())
<< XmlEndTag();
- if (_pendingClusterState.get() != 0) {
+ if (_pendingClusterState) {
xos << *_pendingClusterState;
}
xos << XmlTag("systemstate_history");
- typedef std::list<PendingClusterState::Summary>::const_reverse_iterator HistoryIter;
- for (HistoryIter i(_history.rbegin()), e(_history.rend()); i != e; ++i) {
+ for (auto i(_history.rbegin()), e(_history.rend()); i != e; ++i) {
xos << XmlTag("change")
<< XmlAttribute("from", i->_prevClusterState)
<< XmlAttribute("to", i->_newClusterState)
@@ -617,18 +572,16 @@ BucketDBUpdater::reportXmlStatus(vespalib::xml::XmlOutputStream& xos,
}
xos << XmlEndTag()
<< XmlTag("single_bucket_requests");
- for (std::map<uint64_t, BucketRequest>::const_iterator iter
- = _sentMessages.begin(); iter != _sentMessages.end(); iter++)
+ for (const auto & entry : _sentMessages)
{
xos << XmlTag("storagenode")
- << XmlAttribute("index", iter->second.targetNode);
- if (iter->second.bucket.getRawId() == 0) {
+ << XmlAttribute("index", entry.second.targetNode);
+ if (entry.second.bucket.getRawId() == 0) {
xos << XmlAttribute("bucket", ALL);
} else {
- xos << XmlAttribute("bucket", iter->second.bucket.getId(),
- XmlAttribute::HEX);
+ xos << XmlAttribute("bucket", entry.second.bucket.getId(), XmlAttribute::HEX);
}
- xos << XmlAttribute("sendtimestamp", iter->second.timestamp)
+ xos << XmlAttribute("sendtimestamp", entry.second.timestamp)
<< XmlEndTag();
}
xos << XmlEndTag() << XmlEndTag();
@@ -642,17 +595,13 @@ BucketDBUpdater::BucketListGenerator::process(BucketDatabase::Entry& e)
const BucketCopy* copy(e->getNode(_node));
if (copy) {
- _entries.push_back(
- BucketListMerger::BucketEntry(
- bucketId,
- copy->getBucketInfo()));
+ _entries.emplace_back(bucketId, copy->getBucketInfo());
}
return true;
}
void
-BucketDBUpdater::NodeRemover::logRemove(const document::BucketId& bucketId,
- const char* msg) const
+BucketDBUpdater::NodeRemover::logRemove(const document::BucketId& bucketId, const char* msg) const
{
LOG(spam, "Removing bucket %s: %s", bucketId.toString().c_str(), msg);
LOG_BUCKET_OPERATION_NO_LOCK(bucketId, msg);
@@ -748,7 +697,7 @@ BucketDBUpdater::NodeRemover::process(BucketDatabase::Entry& e)
BucketDBUpdater::NodeRemover::~NodeRemover()
{
- if (_removedBuckets.size() > 0) {
+ if ( !_removedBuckets.empty()) {
std::ostringstream ost;
ost << "After system state change "
<< _oldState.getTextualDifference(_state) << ", we removed "
@@ -760,7 +709,7 @@ BucketDBUpdater::NodeRemover::~NodeRemover()
if (_removedBuckets.size() >= 10) {
ost << " ...";
}
- LOGBM(info, ost.str().c_str());
+ LOGBM(info, "%s", ost.str().c_str());
}
}
diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h
index 6081f782cd5..d3b2bbf86ca 100644
--- a/storage/src/vespa/storage/distributor/bucketdbupdater.h
+++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h
@@ -36,16 +36,11 @@ public:
~BucketDBUpdater();
void flush();
-
BucketOwnership checkOwnershipInPendingState(const document::BucketId&) const;
-
void recheckBucketInfo(uint32_t nodeIdx, const document::BucketId& bid);
bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& cmd) override;
-
- bool onRequestBucketInfoReply(
- const std::shared_ptr<api::RequestBucketInfoReply> & repl) override;
-
+ bool onRequestBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply> & repl) override;
bool onMergeBucketReply(const std::shared_ptr<api::MergeBucketReply>& reply) override;
bool onNotifyBucketChange(const std::shared_ptr<api::NotifyBucketChangeCommand>&) override;
void resendDelayedMessages();
@@ -72,8 +67,7 @@ private:
ManagedBucketSpaceComponent _bucketSpaceComponent;
class MergeReplyGuard {
public:
- MergeReplyGuard(BucketDBUpdater& updater,
- const std::shared_ptr<api::MergeBucketReply>& reply)
+ MergeReplyGuard(BucketDBUpdater& updater, const std::shared_ptr<api::MergeBucketReply>& reply)
: _updater(updater), _reply(reply) {}
~MergeReplyGuard();
@@ -90,9 +84,7 @@ private:
BucketRequest()
: targetNode(0), bucket(0), timestamp(0) {};
- BucketRequest(uint16_t t,
- uint64_t currentTime,
- const document::BucketId& b,
+ BucketRequest(uint16_t t, uint64_t currentTime, const document::BucketId& b,
const std::shared_ptr<MergeReplyGuard>& guard)
: targetNode(t),
bucket(b),
@@ -112,8 +104,7 @@ private:
EnqueuedBucketRecheck() : node(0), bucket() {}
- EnqueuedBucketRecheck(uint16_t _node,
- const document::BucketId& _bucket)
+ EnqueuedBucketRecheck(uint16_t _node, const document::BucketId& _bucket)
: node(_node),
bucket(_bucket)
{}
@@ -130,35 +121,21 @@ private:
};
bool hasPendingClusterState() const;
-
- void clearPending(uint16_t node);
-
- bool pendingClusterStateAccepted(
- const std::shared_ptr<api::RequestBucketInfoReply>& repl);
- bool bucketOwnedAccordingToPendingState(
- const document::BucketId& bucketId) const;
- bool processSingleBucketInfoReply(
- const std::shared_ptr<api::RequestBucketInfoReply>& repl);
- void handleSingleBucketInfoFailure(
- const std::shared_ptr<api::RequestBucketInfoReply>& repl,
- const BucketRequest& req);
+ bool pendingClusterStateAccepted(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
+ bool bucketOwnedAccordingToPendingState(const document::BucketId& bucketId) const;
+ bool processSingleBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
+ void handleSingleBucketInfoFailure(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
+ const BucketRequest& req);
bool isPendingClusterStateCompleted() const;
void processCompletedPendingClusterState();
- void mergeBucketInfoWithDatabase(
- const std::shared_ptr<api::RequestBucketInfoReply>& repl,
- const BucketRequest& req);
- void convertBucketInfoToBucketList(
- const std::shared_ptr<api::RequestBucketInfoReply>& repl,
- uint16_t targetNode,
- BucketListMerger::BucketList& newList);
- void sendRequestBucketInfo(
- uint16_t node,
- const document::BucketId& bucket,
- const std::shared_ptr<MergeReplyGuard>& mergeReply);
- void addBucketInfoForNode(
- const BucketDatabase::Entry& e,
- uint16_t node,
- BucketListMerger::BucketList& existing) const;
+ void mergeBucketInfoWithDatabase(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
+ const BucketRequest& req);
+ void convertBucketInfoToBucketList(const std::shared_ptr<api::RequestBucketInfoReply>& repl,
+ uint16_t targetNode, BucketListMerger::BucketList& newList);
+ void sendRequestBucketInfo(uint16_t node, const document::BucketId& bucket,
+ const std::shared_ptr<MergeReplyGuard>& mergeReply);
+ void addBucketInfoForNode(const BucketDatabase::Entry& e, uint16_t node,
+ BucketListMerger::BucketList& existing) const;
void ensureTransitionTimerStarted();
void completeTransitionTimer();
/**
@@ -167,10 +144,8 @@ private:
* in bucketId, or that bucketId is contained in, that have copies
* on the given node.
*/
- void findRelatedBucketsInDatabase(
- uint16_t node,
- const document::BucketId& bucketId,
- BucketListMerger::BucketList& existing);
+ void findRelatedBucketsInDatabase(uint16_t node, const document::BucketId& bucketId,
+ BucketListMerger::BucketList& existing);
/**
Updates the bucket database from the information generated by the given
@@ -178,18 +153,15 @@ private:
*/
void updateDatabase(uint16_t node, BucketListMerger& merger);
- void updateState(const lib::ClusterState& oldState,
- const lib::ClusterState& newState);
+ void updateState(const lib::ClusterState& oldState, const lib::ClusterState& newState);
- void removeSuperfluousBuckets(const lib::Distribution& newDistribution,
- const lib::ClusterState& newState);
+ void removeSuperfluousBuckets(const lib::Distribution& newDistribution, const lib::ClusterState& newState);
void replyToPreviousPendingClusterStateIfAny();
void enableCurrentClusterStateInDistributor();
void addCurrentStateToClusterStateHistory();
- void enqueueRecheckUntilPendingStateEnabled(uint16_t node,
- const document::BucketId&);
+ void enqueueRecheckUntilPendingStateEnabled(uint16_t node, const document::BucketId&);
void sendAllQueuedBucketRechecks();
friend class BucketDBUpdater_Test;
@@ -198,8 +170,7 @@ private:
class BucketListGenerator
{
public:
- BucketListGenerator(uint16_t node,
- BucketListMerger::BucketList& entries)
+ BucketListGenerator(uint16_t node, BucketListMerger::BucketList& entries)
: _node(node), _entries(entries) {};
bool process(BucketDatabase::Entry&);
@@ -237,8 +208,7 @@ private:
return _removedBuckets;
}
private:
- void setCopiesInEntry(BucketDatabase::Entry& e,
- const std::vector<BucketCopy>& copies) const;
+ void setCopiesInEntry(BucketDatabase::Entry& e, const std::vector<BucketCopy>& copies) const;
void removeEmptyBucket(const document::BucketId& bucketId);
const lib::ClusterState _oldState;
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 53df19fd10c..3dfe5103654 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -203,23 +203,26 @@ Distributor::onOpen()
}
}
-void
-Distributor::onClose()
-{
- for (uint32_t i=0; i<_messageQueue.size(); ++i) {
- std::shared_ptr<api::StorageMessage> msg = _messageQueue[i];
+void Distributor::send_shutdown_abort_reply(const std::shared_ptr<api::StorageMessage>& msg) {
+ api::StorageReply::UP reply(
+ std::dynamic_pointer_cast<api::StorageCommand>(msg)->makeReply());
+ reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Distributor is shutting down"));
+ sendUp(std::shared_ptr<api::StorageMessage>(reply.release()));
+}
+
+void Distributor::onClose() {
+ for (auto& msg : _messageQueue) {
if (!msg->getType().isReply()) {
- api::StorageReply::UP reply(
- std::dynamic_pointer_cast<api::StorageCommand>(msg)
- ->makeReply());
- reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED,
- "Distributor is shutting down"));
- sendUp(std::shared_ptr<api::StorageMessage>(reply.release()));
+ send_shutdown_abort_reply(msg);
}
}
_messageQueue.clear();
+ while (!_client_request_priority_queue.empty()) {
+ send_shutdown_abort_reply(_client_request_priority_queue.top());
+ _client_request_priority_queue.pop();
+ }
- LOG(debug, "Distributor::onFlush invoked");
+ LOG(debug, "Distributor::onClose invoked");
_bucketDBUpdater.flush();
_operationOwner.onClose();
_maintenanceOperationOwner.onClose();
@@ -619,10 +622,10 @@ void Distributor::startExternalOperations() {
const bool start_single_client_request = !_client_request_priority_queue.empty();
if (start_single_client_request) {
- auto& msg = _client_request_priority_queue.top();
+ const auto& msg = _client_request_priority_queue.top();
MBUS_TRACE(msg->getTrace(), 9, "Distributor: Grabbed from "
"client request priority queue to be processed.");
- handle_or_propagate_message(msg); // TODO move() once we've move-enabled our message chains
+ handle_or_propagate_message(msg);
_client_request_priority_queue.pop();
}
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index 438001acc40..5856aca71a1 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -187,11 +187,11 @@ private:
};
void setNodeStateUp();
-
bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg);
bool isMaintenanceReply(const api::StorageReply& reply) const;
void handleStatusRequests();
+ void send_shutdown_abort_reply(const std::shared_ptr<api::StorageMessage>&);
void handle_or_propagate_message(const std::shared_ptr<api::StorageMessage>& msg);
void startExternalOperations();
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index b46217f6443..ac3d901fd65 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -89,8 +89,7 @@ FileStorManager::~FileStorManager()
}
void
-FileStorManager::print(std::ostream& out, bool verbose,
- const std::string& indent) const
+FileStorManager::print(std::ostream& out, bool verbose, const std::string& indent) const
{
(void) verbose; (void) indent;
out << "FileStorManager";
@@ -128,20 +127,14 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
LOG(spam, "Setting up disk %u", i);
for (uint32_t j = 0; j < 4; j++) {
_disks[i].push_back(DiskThread::SP(
- new PersistenceThread(
- _compReg, _configUri, *_provider,
- *_filestorHandler,
- *_metrics->disks[i]->threads[j],
- i, 255, false)));
+ new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler,
+ *_metrics->disks[i]->threads[j], i, 255)));
}
for (uint32_t j = 4; j < 6; j++) {
_disks[i].push_back(DiskThread::SP(
- new PersistenceThread(
- _compReg, _configUri, *_provider,
- *_filestorHandler,
- *_metrics->disks[i]->threads[j],
- i, 100)));
+ new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler,
+ *_metrics->disks[i]->threads[j], i, 100)));
}
}
@@ -149,12 +142,8 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
LOG(spam, "Setting up disk %u, thread %u with priority %d",
i, j, _config->threads[j].lowestpri);
_disks[i].push_back(DiskThread::SP(
- new PersistenceThread(
- _compReg, _configUri, *_provider,
- *_filestorHandler,
- *_metrics->disks[i]->threads[j],
- i, _config->threads[j].lowestpri,
- false)));
+ new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler,
+ *_metrics->disks[i]->threads[j], i, _config->threads[j].lowestpri)));
}
} else {
@@ -389,11 +378,9 @@ FileStorManager::onRevert(const shared_ptr<api::RevertCommand>& cmd)
}
bool
-FileStorManager::onMultiOperation(
- const std::shared_ptr<api::MultiOperationCommand>& cmd)
+FileStorManager::onMultiOperation(const std::shared_ptr<api::MultiOperationCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(
- *cmd, 0));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, 0));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -401,11 +388,9 @@ FileStorManager::onMultiOperation(
}
bool
-FileStorManager::onBatchPutRemove(
- const std::shared_ptr<api::BatchPutRemoveCommand>& cmd)
+FileStorManager::onBatchPutRemove(const std::shared_ptr<api::BatchPutRemoveCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(
- *cmd, 0));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, 0));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -413,11 +398,9 @@ FileStorManager::onBatchPutRemove(
}
bool
-FileStorManager::onRemoveLocation(
- const std::shared_ptr<api::RemoveLocationCommand>& cmd)
+FileStorManager::onRemoveLocation(const std::shared_ptr<api::RemoveLocationCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -425,11 +408,9 @@ FileStorManager::onRemoveLocation(
}
bool
-FileStorManager::onStatBucket(
- const std::shared_ptr<api::StatBucketCommand>& cmd)
+FileStorManager::onStatBucket(const std::shared_ptr<api::StatBucketCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -478,8 +459,7 @@ FileStorManager::onCreateBucket(
}
}
}
- std::shared_ptr<api::CreateBucketReply> reply(
- (api::CreateBucketReply*)cmd->makeReply().release());
+ std::shared_ptr<api::CreateBucketReply> reply((api::CreateBucketReply*)cmd->makeReply().release());
reply->setBucketInfo(api::BucketInfo(0, 0, 0, 0, 0, true, cmd->getActive()));
reply->setResult(code);
sendUp(reply);
@@ -491,12 +471,10 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd)
{
uint16_t disk;
{
- StorBucketDatabase::WrappedEntry entry(
- _component.getBucketDatabase().get(
- cmd->getBucketId(), "FileStorManager::onDeleteBucket"));
+ StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase().get(cmd->getBucketId(),
+ "FileStorManager::onDeleteBucket"));
if (!entry.exist()) {
- LOG(debug, "%s was already deleted",
- cmd->getBucketId().toString().c_str());
+ LOG(debug, "%s was already deleted", cmd->getBucketId().toString().c_str());
std::shared_ptr<api::StorageReply> reply(cmd->makeReply().release());
sendUp(reply);
return true;
@@ -520,10 +498,8 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd)
LOG(debug, "Rejecting bucket delete: %s", ost.str().c_str());
std::shared_ptr<api::StorageReply> reply(cmd->makeReply().release());
- static_cast<api::DeleteBucketReply&>(*reply).setBucketInfo(
- entry->getBucketInfo());
- reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED,
- ost.str()));
+ static_cast<api::DeleteBucketReply&>(*reply).setBucketInfo(entry->getBucketInfo());
+ reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED, ost.str()));
entry.unlock();
sendUp(reply);
return true;
@@ -538,13 +514,10 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd)
disk = entry->disk;
entry.remove();
}
- _filestorHandler->failOperations(
- cmd->getBucketId(),
- disk,
- api::ReturnCode(api::ReturnCode::BUCKET_DELETED,
- vespalib::make_string(
- "Bucket %s about to be deleted anyway",
- cmd->getBucketId().toString().c_str())));
+ _filestorHandler->failOperations(cmd->getBucketId(), disk,
+ api::ReturnCode(api::ReturnCode::BUCKET_DELETED,
+ vespalib::make_string("Bucket %s about to be deleted anyway",
+ cmd->getBucketId().toString().c_str())));
return true;
}
@@ -564,10 +537,7 @@ FileStorManager::ensureConsistentBucket(
// Don't create empty bucket if merge isn't allowed to continue.
entry.remove();
}
- replyDroppedOperation(msg,
- bucket,
- api::ReturnCode::ABORTED,
- "bucket is inconsistently split");
+ replyDroppedOperation(msg, bucket, api::ReturnCode::ABORTED, "bucket is inconsistently split");
return StorBucketDatabase::WrappedEntry();
}
@@ -577,10 +547,8 @@ FileStorManager::ensureConsistentBucket(
bool
FileStorManager::onMergeBucket(const shared_ptr<api::MergeBucketCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(
- ensureConsistentBucket(cmd->getBucketId(),
- *cmd,
- "FileStorManager::onMergeBucket"));
+ StorBucketDatabase::WrappedEntry entry(ensureConsistentBucket(cmd->getBucketId(), *cmd,
+ "FileStorManager::onMergeBucket"));
if (!entry.exist()) {
return true;
}
@@ -589,26 +557,20 @@ FileStorManager::onMergeBucket(const shared_ptr<api::MergeBucketCommand>& cmd)
entry->disk = _component.getIdealPartition(cmd->getBucketId());
if (_partitions[entry->disk].isUp()) {
entry->info = api::BucketInfo(0, 0, 0, 0, 0, true, false);
- LOG(debug, "Created bucket %s on disk %d (node index is %d) due "
- "to merge being received.",
- cmd->getBucketId().toString().c_str(),
- entry->disk, _component.getIndex());
+ LOG(debug, "Created bucket %s on disk %d (node index is %d) due to merge being received.",
+ cmd->getBucketId().toString().c_str(), entry->disk, _component.getIndex());
// Call before writing bucket entry as we need to have bucket
// lock while calling
handlePersistenceMessage(cmd, entry->disk);
entry.write();
} else {
entry.remove();
- api::ReturnCode code(
- api::ReturnCode::IO_FAILURE,
+ api::ReturnCode code(api::ReturnCode::IO_FAILURE,
vespalib::make_string(
"Trying to perform merge %s whose bucket belongs on target disk %d, which is down. Cluster state version of command is %d, our system state version is %d",
- cmd->toString().c_str(),
- entry->disk,
- cmd->getClusterStateVersion(),
+ cmd->toString().c_str(), entry->disk, cmd->getClusterStateVersion(),
_component.getStateUpdater().getSystemState()->getVersion()));
- LOGBT(debug, cmd->getBucketId().toString(),
- "%s", code.getMessage().c_str());
+ LOGBT(debug, cmd->getBucketId().toString(), "%s", code.getMessage().c_str());
api::MergeBucketReply::SP reply(new api::MergeBucketReply(*cmd));
reply->setResult(code);
sendUp(reply);
@@ -668,17 +630,13 @@ FileStorManager::onGetBucketDiff(
}
bool
-FileStorManager::validateApplyDiffCommandBucket(
- api::StorageMessage& msg,
- const StorBucketDatabase::WrappedEntry& entry)
+FileStorManager::validateApplyDiffCommandBucket(api::StorageMessage& msg, const StorBucketDatabase::WrappedEntry& entry)
{
if (!entry.exist()) {
return false;
}
if (!_component.getBucketDatabase().isConsistent(entry)) {
- replyDroppedOperation(msg,
- entry.getBucketId(),
- api::ReturnCode::ABORTED,
+ replyDroppedOperation(msg, entry.getBucketId(), api::ReturnCode::ABORTED,
"bucket became inconsistent during merging");
return false;
}
@@ -686,31 +644,26 @@ FileStorManager::validateApplyDiffCommandBucket(
}
bool
-FileStorManager::validateDiffReplyBucket(
- const StorBucketDatabase::WrappedEntry& entry,
- const document::BucketId& bucket)
+FileStorManager::validateDiffReplyBucket(const StorBucketDatabase::WrappedEntry& entry,
+ const document::BucketId& bucket)
{
if (!entry.exist()) {
_filestorHandler->clearMergeStatus(bucket,
- api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND,
- "Bucket removed during merge"));
+ api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND, "Bucket removed during merge"));
return false;
}
if (!_component.getBucketDatabase().isConsistent(entry)) {
_filestorHandler->clearMergeStatus(bucket,
- api::ReturnCode(api::ReturnCode::ABORTED,
- "Bucket became inconsistent during merging"));
+ api::ReturnCode(api::ReturnCode::ABORTED, "Bucket became inconsistent during merging"));
return false;
}
return true;
}
bool
-FileStorManager::onGetBucketDiffReply(
- const shared_ptr<api::GetBucketDiffReply>& reply)
+FileStorManager::onGetBucketDiffReply(const shared_ptr<api::GetBucketDiffReply>& reply)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *reply, reply->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*reply, reply->getBucketId()));
if (validateDiffReplyBucket(entry, reply->getBucketId())) {
handlePersistenceMessage(reply, entry->disk);
}
@@ -718,11 +671,9 @@ FileStorManager::onGetBucketDiffReply(
}
bool
-FileStorManager::onApplyBucketDiff(
- const shared_ptr<api::ApplyBucketDiffCommand>& cmd)
+FileStorManager::onApplyBucketDiff(const shared_ptr<api::ApplyBucketDiffCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (validateApplyDiffCommandBucket(*cmd, entry)) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -730,8 +681,7 @@ FileStorManager::onApplyBucketDiff(
}
bool
-FileStorManager::onApplyBucketDiffReply(
- const shared_ptr<api::ApplyBucketDiffReply>& reply)
+FileStorManager::onApplyBucketDiffReply(const shared_ptr<api::ApplyBucketDiffReply>& reply)
{
StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
*reply, reply->getBucketId()));
@@ -742,8 +692,7 @@ FileStorManager::onApplyBucketDiffReply(
}
bool
-FileStorManager::onJoinBuckets(
- const std::shared_ptr<api::JoinBucketsCommand>& cmd)
+FileStorManager::onJoinBuckets(const std::shared_ptr<api::JoinBucketsCommand>& cmd)
{
StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase().get(
cmd->getBucketId(), "FileStorManager::onJoinBuckets"));
@@ -757,11 +706,9 @@ FileStorManager::onJoinBuckets(
}
bool
-FileStorManager::onSplitBucket(
- const std::shared_ptr<api::SplitBucketCommand>& cmd)
+FileStorManager::onSplitBucket(const std::shared_ptr<api::SplitBucketCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -772,8 +719,7 @@ bool
FileStorManager::onSetBucketState(
const std::shared_ptr<api::SetBucketStateCommand>& cmd)
{
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -786,10 +732,8 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
switch (msg->getType()) {
case GetIterCommand::ID:
{
- shared_ptr<GetIterCommand> cmd(
- std::static_pointer_cast<GetIterCommand>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ shared_ptr<GetIterCommand> cmd(std::static_pointer_cast<GetIterCommand>(msg));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -797,10 +741,8 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
}
case CreateIteratorCommand::ID:
{
- shared_ptr<CreateIteratorCommand> cmd(
- std::static_pointer_cast<CreateIteratorCommand>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ shared_ptr<CreateIteratorCommand> cmd(std::static_pointer_cast<CreateIteratorCommand>(msg));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -808,28 +750,22 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
}
case DestroyIteratorCommand::ID:
{
- spi::Context context(msg->getLoadType(), msg->getPriority(),
- msg->getTrace().getLevel());
- shared_ptr<DestroyIteratorCommand> cmd(
- std::static_pointer_cast<DestroyIteratorCommand>(msg));
+ spi::Context context(msg->getLoadType(), msg->getPriority(), msg->getTrace().getLevel());
+ shared_ptr<DestroyIteratorCommand> cmd(std::static_pointer_cast<DestroyIteratorCommand>(msg));
_provider->destroyIterator(cmd->getIteratorId(), context);
msg->getTrace().getRoot().addChild(context.getTrace().getRoot());
return true;
}
case ReadBucketList::ID:
{
- shared_ptr<ReadBucketList> cmd(
- std::static_pointer_cast<ReadBucketList>(msg));
-
+ shared_ptr<ReadBucketList> cmd(std::static_pointer_cast<ReadBucketList>(msg));
handlePersistenceMessage(cmd, cmd->getPartition());
return true;
}
case ReadBucketInfo::ID:
{
- shared_ptr<ReadBucketInfo> cmd(
- std::static_pointer_cast<ReadBucketInfo>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ shared_ptr<ReadBucketInfo> cmd(std::static_pointer_cast<ReadBucketInfo>(msg));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -837,10 +773,8 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
}
case InternalBucketJoinCommand::ID:
{
- shared_ptr<InternalBucketJoinCommand> cmd(
- std::static_pointer_cast<InternalBucketJoinCommand>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ shared_ptr<InternalBucketJoinCommand> cmd(std::static_pointer_cast<InternalBucketJoinCommand>(msg));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -848,10 +782,8 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
}
case RepairBucketCommand::ID:
{
- shared_ptr<RepairBucketCommand> cmd(
- std::static_pointer_cast<RepairBucketCommand>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ shared_ptr<RepairBucketCommand> cmd(std::static_pointer_cast<RepairBucketCommand>(msg));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -859,10 +791,8 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
}
case BucketDiskMoveCommand::ID:
{
- shared_ptr<BucketDiskMoveCommand> cmd(
- std::static_pointer_cast<BucketDiskMoveCommand>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ shared_ptr<BucketDiskMoveCommand> cmd(std::static_pointer_cast<BucketDiskMoveCommand>(msg));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -870,10 +800,8 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
}
case RecheckBucketInfoCommand::ID:
{
- shared_ptr<RecheckBucketInfoCommand> cmd(
- std::static_pointer_cast<RecheckBucketInfoCommand>(msg));
- StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(
- *cmd, cmd->getBucketId()));
+ shared_ptr<RecheckBucketInfoCommand> cmd(std::static_pointer_cast<RecheckBucketInfoCommand>(msg));
+ StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucketId()));
if (entry.exist()) {
handlePersistenceMessage(cmd, entry->disk);
}
@@ -881,8 +809,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
}
case AbortBucketOperationsCommand::ID:
{
- shared_ptr<AbortBucketOperationsCommand> cmd(
- std::static_pointer_cast<AbortBucketOperationsCommand>(msg));
+ shared_ptr<AbortBucketOperationsCommand> cmd(std::static_pointer_cast<AbortBucketOperationsCommand>(msg));
handleAbortBucketOperations(cmd);
return true;
}
@@ -892,8 +819,7 @@ FileStorManager::onInternal(const shared_ptr<api::InternalCommand>& msg)
}
void
-FileStorManager::handleAbortBucketOperations(
- const shared_ptr<AbortBucketOperationsCommand>& cmd)
+FileStorManager::handleAbortBucketOperations(const shared_ptr<AbortBucketOperationsCommand>& cmd)
{
_filestorHandler->abortQueuedOperations(*cmd);
sendReply(api::StorageReply::SP(cmd->makeReply().release()));
@@ -925,8 +851,7 @@ FileStorManager::sendReply(const std::shared_ptr<api::StorageReply>& reply)
LOG(spam, "Sending reply %s", reply->toString().c_str());
if (reply->getType() == api::MessageType::INTERNAL_REPLY) {
- std::shared_ptr<api::InternalReply> rep(
- std::dynamic_pointer_cast<api::InternalReply>(reply));
+ std::shared_ptr<api::InternalReply> rep(std::dynamic_pointer_cast<api::InternalReply>(reply));
assert(rep.get());
if (onInternalReply(rep)) return;
}
@@ -1021,8 +946,7 @@ FileStorManager::isMerging(const document::BucketId& bucket) const
namespace {
struct Deactivator {
- StorBucketDatabase::Decision operator()(
- document::BucketId::Type, StorBucketDatabase::Entry& data)
+ StorBucketDatabase::Decision operator()(document::BucketId::Type, StorBucketDatabase::Entry& data)
{
data.info.setActive(false);
return StorBucketDatabase::UPDATE;
@@ -1034,21 +958,16 @@ void
FileStorManager::updateState()
{
lib::ClusterState::CSP state(_component.getStateUpdater().getSystemState());
- spi::ClusterState spiState(
- *state, _component.getIndex(), *_component.getDistribution());
+ spi::ClusterState spiState(*state, _component.getIndex(), *_component.getDistribution());
lib::Node node(_component.getNodeType(), _component.getIndex());
bool nodeUp = state->getNodeState(node).getState().oneOf("uir");
- LOG(debug, "FileStorManager received cluster state '%s'",
- state->toString().c_str());
+ LOG(debug, "FileStorManager received cluster state '%s'", state->toString().c_str());
// If edge where we go down
if (_nodeUpInLastNodeStateSeenByProvider && !nodeUp) {
- LOG(debug,
- "Received cluster state where this node is down; "
- "de-activating all buckets in database");
+ LOG(debug, "Received cluster state where this node is down; de-activating all buckets in database");
Deactivator deactivator;
- _component.getBucketDatabase().all(
- deactivator, "FileStorManager::updateState");
+ _component.getBucketDatabase().all(deactivator, "FileStorManager::updateState");
}
_provider->setClusterState(spiState);
_nodeUpInLastNodeStateSeenByProvider = nodeUp;
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index 0ab512cd63f..11f8b5d1cf4 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -5,7 +5,6 @@
#include "bucketownershipnotifier.h"
#include "testandsethelper.h"
#include <vespa/storageapi/message/multioperation.h>
-#include <vespa/storage/bucketdb/storbucketdb.h>
#include <vespa/storageapi/message/bucketsplitting.h>
#include <vespa/storage/common/bucketoperationlogger.h>
#include <vespa/document/fieldset/fieldsetrepo.h>
@@ -24,15 +23,8 @@ PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg,
FileStorHandler& filestorHandler,
FileStorThreadMetrics& metrics,
uint16_t deviceIndex,
- uint8_t lowestPriority,
- bool startThread)
- : _env(configUri,
- compReg,
- filestorHandler,
- metrics,
- deviceIndex,
- lowestPriority,
- provider),
+ uint8_t lowestPriority)
+ : _env(configUri, compReg, filestorHandler, metrics, deviceIndex, lowestPriority, provider),
_warnOnSlowOperations(5000),
_spi(provider),
_processAllHandler(_env, provider),
@@ -43,13 +35,10 @@ PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg,
_flushMonitor(),
_closed(false)
{
- (void) startThread;
std::ostringstream threadName;
- threadName << "Disk " << _env._partition << " thread "
- << (void*) this;
+ threadName << "Disk " << _env._partition << " thread " << (void*) this;
_component.reset(new ServiceLayerComponent(compReg, threadName.str()));
- _bucketOwnershipNotifier.reset(
- new BucketOwnershipNotifier(*_component, filestorHandler));
+ _bucketOwnershipNotifier.reset(new BucketOwnershipNotifier(*_component, filestorHandler));
framework::MilliSecTime maxProcessingTime(60 * 1000);
framework::MilliSecTime waitTime(1000);
_thread = _component->startThread(*this, maxProcessingTime, waitTime);
@@ -57,8 +46,7 @@ PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg,
PersistenceThread::~PersistenceThread()
{
- LOG(debug, "Shutting down persistence thread. Waiting for current "
- "operation to finish.");
+ LOG(debug, "Shutting down persistence thread. Waiting for current operation to finish.");
_thread->interrupt();
LOG(debug, "Waiting for thread to terminate.");
_thread->join();
@@ -66,8 +54,7 @@ PersistenceThread::~PersistenceThread()
}
spi::Bucket
-PersistenceThread::getBucket(const DocumentId& id,
- const BucketId& bucket) const
+PersistenceThread::getBucket(const DocumentId& id, const BucketId& bucket) const
{
BucketId docBucket(_env._bucketFactory.getBucketId(id));
docBucket.setUsedBits(bucket.getUsedBits());
@@ -82,8 +69,7 @@ PersistenceThread::getBucket(const DocumentId& id,
}
bool
-PersistenceThread::checkForError(const spi::Result& response,
- MessageTracker& tracker)
+PersistenceThread::checkForError(const spi::Result& response, MessageTracker& tracker)
{
uint32_t code = _env.convertErrorCode(response);
diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h
index 218d2f7dd23..4d714dc878a 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.h
+++ b/storage/src/vespa/storage/persistence/persistencethread.h
@@ -16,31 +16,22 @@ namespace storage {
class BucketOwnershipNotifier;
class TestAndSetHelper;
-class PersistenceThread : public DiskThread, public Types
+class PersistenceThread final : public DiskThread, public Types
{
public:
- PersistenceThread(ServiceLayerComponentRegister&,
- const config::ConfigUri & configUri,
- spi::PersistenceProvider& provider,
- FileStorHandler& filestorHandler,
- FileStorThreadMetrics& metrics,
- uint16_t deviceIndex,
- uint8_t lowestPriority,
- bool startThread = false);
+ PersistenceThread(ServiceLayerComponentRegister&, const config::ConfigUri & configUri,
+ spi::PersistenceProvider& provider, FileStorHandler& filestorHandler,
+ FileStorThreadMetrics& metrics, uint16_t deviceIndex, uint8_t lowestPriority);
~PersistenceThread();
/** Waits for current operation to be finished. */
void flush() override;
-
- bool isMerging(const BucketId& bucket) const;
-
framework::Thread& getThread() override { return *_thread; }
MessageTracker::UP handlePut(api::PutCommand& cmd);
MessageTracker::UP handleRemove(api::RemoveCommand& cmd);
MessageTracker::UP handleUpdate(api::UpdateCommand& cmd);
MessageTracker::UP handleGet(api::GetCommand& cmd);
-
MessageTracker::UP handleMultiOperation(api::MultiOperationCommand& cmd);
MessageTracker::UP handleRevert(api::RevertCommand& cmd);
MessageTracker::UP handleCreateBucket(api::CreateBucketCommand& cmd);
@@ -57,37 +48,31 @@ public:
MessageTracker::UP handleRecheckBucketInfo(RecheckBucketInfoCommand& cmd);
private:
- PersistenceUtil _env;
- uint32_t _warnOnSlowOperations;
-
+ PersistenceUtil _env;
+ uint32_t _warnOnSlowOperations;
spi::PersistenceProvider& _spi;
- ProcessAllHandler _processAllHandler;
- MergeHandler _mergeHandler;
- DiskMoveOperationHandler _diskMoveHandler;
+ ProcessAllHandler _processAllHandler;
+ MergeHandler _mergeHandler;
+ DiskMoveOperationHandler _diskMoveHandler;
ServiceLayerComponent::UP _component;
- framework::Thread::UP _thread;
- spi::Context _context;
+ framework::Thread::UP _thread;
+ spi::Context _context;
std::unique_ptr<BucketOwnershipNotifier> _bucketOwnershipNotifier;
+ vespalib::Monitor _flushMonitor;
+ bool _closed;
- vespalib::Monitor _flushMonitor;
- bool _closed;
-
- void setBucketInfo(MessageTracker& tracker,
- const document::BucketId& bucketId);
+ void setBucketInfo(MessageTracker& tracker, const document::BucketId& bucketId);
- bool checkProviderBucketInfoMatches(const spi::Bucket&,
- const api::BucketInfo&) const;
+ bool checkProviderBucketInfoMatches(const spi::Bucket&, const api::BucketInfo&) const;
- void updateBucketDatabase(const document::BucketId& id,
- const api::BucketInfo& info);
+ void updateBucketDatabase(const document::BucketId& id, const api::BucketInfo& info);
/**
* Sanity-checking of join command parameters. Invokes tracker.fail() with
* an appropriate error and returns false iff the command does not validate
* OK. Returns true and does not touch the tracker otherwise.
*/
- bool validateJoinCommand(const api::JoinBucketsCommand& cmd,
- MessageTracker& tracker) const;
+ bool validateJoinCommand(const api::JoinBucketsCommand& cmd, MessageTracker& tracker) const;
// Message handling functions
MessageTracker::UP handleCommand(api::StorageCommand&);
@@ -102,8 +87,7 @@ private:
bool checkForError(const spi::Result& response, MessageTracker& tracker);
spi::Bucket getBucket(const DocumentId& id, const BucketId& bucket) const;
- void flushAllReplies(const document::BucketId& bucketId,
- std::vector<MessageTracker::UP>& trackers);
+ void flushAllReplies(const document::BucketId& bucketId, std::vector<MessageTracker::UP>& trackers);
friend class TestAndSetHelper;
bool tasConditionExists(const api::TestAndSetCommand & cmd);
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
index e2861ef42cd..9faf380d2ba 100644
--- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp
+++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
@@ -28,13 +28,11 @@ using StorServerConfigBuilder = vespa::config::content::core::StorServerConfigBu
namespace storage {
-ServiceLayerNode::ServiceLayerNode(
- const config::ConfigUri & configUri,
- ServiceLayerNodeContext& context,
- ApplicationGenerationFetcher& generationFetcher,
- spi::PersistenceProvider& persistenceProvider,
- const VisitorFactory::Map& externalVisitors)
- : StorageNode(configUri, context, generationFetcher, std::unique_ptr<HostInfo>(new HostInfo)),
+ServiceLayerNode::ServiceLayerNode(const config::ConfigUri & configUri, ServiceLayerNodeContext& context,
+ ApplicationGenerationFetcher& generationFetcher,
+ spi::PersistenceProvider& persistenceProvider,
+ const VisitorFactory::Map& externalVisitors)
+ : StorageNode(configUri, context, generationFetcher, std::make_unique<HostInfo>()),
_context(context),
_persistenceProvider(persistenceProvider),
_partitions(0),
diff --git a/storage/src/vespa/storage/storageserver/storagenodecontext.cpp b/storage/src/vespa/storage/storageserver/storagenodecontext.cpp
index 5f4ceff2a6b..2e1aa52e68d 100644
--- a/storage/src/vespa/storage/storageserver/storagenodecontext.cpp
+++ b/storage/src/vespa/storage/storageserver/storagenodecontext.cpp
@@ -4,16 +4,16 @@
#include <vespa/storageframework/defaultimplementation/memory/prioritymemorylogic.h>
-using storage::framework::defaultimplementation::AllocationLogic;
-
namespace storage {
+using framework::defaultimplementation::AllocationLogic;
+using framework::defaultimplementation::PriorityMemoryLogic;
+
StorageNodeContext::StorageNodeContext(ComponentRegister::UP compReg, framework::Clock::UP clock)
: _componentRegister(std::move(compReg)),
_clock(std::move(clock)),
_threadPool(*_clock),
- _memoryLogic(new framework::defaultimplementation::PriorityMemoryLogic(
- *_clock, 1024 * 1024 * 1024)),
+ _memoryLogic(new PriorityMemoryLogic(*_clock, 1024 * 1024 * 1024)),
_memoryManager(AllocationLogic::UP(_memoryLogic))
{
_componentRegister->setClock(*_clock);
diff --git a/storage/src/vespa/storage/storageserver/storagenodecontext.h b/storage/src/vespa/storage/storageserver/storagenodecontext.h
index 7d6f3b0aef5..0149f975f63 100644
--- a/storage/src/vespa/storage/storageserver/storagenodecontext.h
+++ b/storage/src/vespa/storage/storageserver/storagenodecontext.h
@@ -53,8 +53,7 @@ struct StorageNodeContext {
protected:
// Initialization has been split in two as subclass needs to initialize
// component register before sending it on.
- StorageNodeContext(ComponentRegister::UP,
- framework::Clock::UP);
+ StorageNodeContext(ComponentRegister::UP, framework::Clock::UP);
private:
ComponentRegister::UP _componentRegister;
diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h
index bdc491a5da6..798251f0573 100644
--- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h
+++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadimpl.h
@@ -49,12 +49,8 @@ class ThreadImpl : public Thread
void run();
public:
- ThreadImpl(ThreadPoolImpl&,
- Runnable&,
- vespalib::stringref id,
- uint64_t waitTimeMs,
- uint64_t maxProcessTimeMs,
- int ticksBeforeWait);
+ ThreadImpl(ThreadPoolImpl&, Runnable&, vespalib::stringref id, uint64_t waitTimeMs,
+ uint64_t maxProcessTimeMs, int ticksBeforeWait);
~ThreadImpl();
bool interrupted() const override;
diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp
index eae02c71cfb..09c805b2b85 100644
--- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp
+++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.cpp
@@ -3,12 +3,12 @@
#include "threadpoolimpl.h"
#include "threadimpl.h"
#include <vespa/vespalib/util/exceptions.h>
+#include <thread>
+using namespace std::chrono_literals;
using vespalib::IllegalStateException;
-namespace storage {
-namespace framework {
-namespace defaultimplementation {
+namespace storage::framework::defaultimplementation {
ThreadPoolImpl::ThreadPoolImpl(Clock& clock)
: _backendThreadPool(512 * 1024),
@@ -21,11 +21,11 @@ ThreadPoolImpl::~ThreadPoolImpl()
{
vespalib::LockGuard lock(_threadVectorLock);
_stopping = true;
- for (uint32_t i=0, n=_threads.size(); i<n; ++i) {
- _threads[i]->interrupt();
+ for (ThreadImpl * thread : _threads) {
+ thread->interrupt();
}
- for (uint32_t i=0, n=_threads.size(); i<n; ++i) {
- _threads[i]->join();
+ for (ThreadImpl * thread : _threads) {
+ thread->join();
}
}
for (uint32_t i=0; true; i+=10) {
@@ -34,30 +34,25 @@ ThreadPoolImpl::~ThreadPoolImpl()
if (_threads.empty()) break;
}
if (i > 1000) {
- fprintf(stderr, "Failed to kill thread pool. Threads won't die. (And "
- "if allowing thread pool object to be deleted this "
- "will create a segfault later)\n");
+ fprintf(stderr, "Failed to kill thread pool. Threads won't die. (And if allowing thread pool object"
+ " to be deleted this will create a segfault later)\n");
abort();
}
- FastOS_Thread::Sleep(10);
+ std::this_thread::sleep_for(10ms);
}
_backendThreadPool.Close();
}
Thread::UP
-ThreadPoolImpl::startThread(Runnable& runnable,
- vespalib::stringref id,
- uint64_t waitTimeMs,
- uint64_t maxProcessTime,
- int ticksBeforeWait)
+ThreadPoolImpl::startThread(Runnable& runnable, vespalib::stringref id, uint64_t waitTimeMs,
+ uint64_t maxProcessTime, int ticksBeforeWait)
{
vespalib::LockGuard lock(_threadVectorLock);
if (_stopping) {
- throw vespalib::IllegalStateException("Threadpool is stopping", VESPA_STRLOC);
+ throw IllegalStateException("Threadpool is stopping", VESPA_STRLOC);
}
ThreadImpl* ti;
- Thread::UP t(ti = new ThreadImpl(
- *this, runnable, id, waitTimeMs, maxProcessTime, ticksBeforeWait));
+ Thread::UP t(ti = new ThreadImpl(*this, runnable, id, waitTimeMs, maxProcessTime, ticksBeforeWait));
_threads.push_back(ti);
return t;
}
@@ -66,9 +61,8 @@ void
ThreadPoolImpl::visitThreads(ThreadVisitor& visitor) const
{
vespalib::LockGuard lock(_threadVectorLock);
- for (uint32_t i=0, n=_threads.size(); i<n; ++i) {
- visitor.visitThread(_threads[i]->getId(), _threads[i]->getProperties(),
- _threads[i]->getTickData());
+ for (const ThreadImpl * thread : _threads) {
+ visitor.visitThread(thread->getId(), thread->getProperties(), thread->getTickData());
}
}
@@ -78,14 +72,12 @@ ThreadPoolImpl::unregisterThread(ThreadImpl& t)
vespalib::LockGuard lock(_threadVectorLock);
std::vector<ThreadImpl*> threads;
threads.reserve(_threads.size());
- for (uint32_t i=0, n=_threads.size(); i<n; ++i) {
- if (_threads[i] != &t) {
- threads.push_back(_threads[i]);
+ for (ThreadImpl * thread : _threads) {
+ if (thread != &t) {
+ threads.push_back(thread);
}
}
_threads.swap(threads);
}
-} // defaultimplementation
-} // framework
-} // storage
+}
diff --git a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h
index 2b16836eba2..4e973c2ad20 100644
--- a/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h
+++ b/storageframework/src/vespa/storageframework/defaultimplementation/thread/threadpoolimpl.h
@@ -12,11 +12,11 @@ class ThreadImpl;
struct ThreadPoolImpl : public ThreadPool
{
- FastOS_ThreadPool _backendThreadPool;
- std::vector<ThreadImpl*> _threads;
- vespalib::Lock _threadVectorLock;
- Clock& _clock;
- bool _stopping;
+ FastOS_ThreadPool _backendThreadPool;
+ std::vector<ThreadImpl*> _threads;
+ vespalib::Lock _threadVectorLock;
+ Clock & _clock;
+ bool _stopping;
public:
ThreadPoolImpl(Clock&);
diff --git a/storageframework/src/vespa/storageframework/generic/component/component.cpp b/storageframework/src/vespa/storageframework/generic/component/component.cpp
index a35cad68b00..869df4296ef 100644
--- a/storageframework/src/vespa/storageframework/generic/component/component.cpp
+++ b/storageframework/src/vespa/storageframework/generic/component/component.cpp
@@ -33,9 +33,7 @@ Component::Component(ComponentRegister& cr, vespalib::stringref name)
cr.registerComponent(*this);
}
-Component::~Component()
-{
-}
+Component::~Component() = default;
void
Component::registerComponentStateListener(ComponentStateListener& l)
@@ -67,8 +65,7 @@ Component::registerMetricUpdateHook(MetricUpdateHook& hook, SecondTime period)
assert(_metricUpdateHook.first == 0);
_metricUpdateHook = std::make_pair(&hook, period);
if (_metricReg != 0) {
- _metricReg->registerUpdateHook(
- _name, *_metricUpdateHook.first, _metricUpdateHook.second);
+ _metricReg->registerUpdateHook(_name, *_metricUpdateHook.first, _metricUpdateHook.second);
}
}
@@ -86,8 +83,7 @@ void
Component::setMetricRegistrator(MetricRegistrator& mr) {
_metricReg = &mr;
if (_metricUpdateHook.first != 0) {
- _metricReg->registerUpdateHook(
- _name, *_metricUpdateHook.first, _metricUpdateHook.second);
+ _metricReg->registerUpdateHook(_name, *_metricUpdateHook.first, _metricUpdateHook.second);
}
if (_metric != 0) {
_metricReg->registerMetric(*_metric);
@@ -117,16 +113,10 @@ Component::getClock() const
// Helper functions for components wanting to start a single thread.
Thread::UP
-Component::startThread(Runnable& runnable,
- MilliSecTime waitTime,
- MilliSecTime maxProcessTime,
- int ticksBeforeWait)
+Component::startThread(Runnable& runnable, MilliSecTime waitTime, MilliSecTime maxProcessTime, int ticksBeforeWait)
{
- return getThreadPool().startThread(runnable,
- getName(),
- waitTime.getTime(),
- maxProcessTime.getTime(),
- ticksBeforeWait);
+ return getThreadPool().startThread(runnable, getName(), waitTime.getTime(),
+ maxProcessTime.getTime(), ticksBeforeWait);
}
void
diff --git a/storageframework/src/vespa/storageframework/generic/component/component.h b/storageframework/src/vespa/storageframework/generic/component/component.h
index 8a65d186557..b16e31290b8 100644
--- a/storageframework/src/vespa/storageframework/generic/component/component.h
+++ b/storageframework/src/vespa/storageframework/generic/component/component.h
@@ -79,7 +79,7 @@ namespace storage::framework {
class ComponentRegister;
struct ComponentStateListener {
- virtual ~ComponentStateListener() {}
+ virtual ~ComponentStateListener() = default;
virtual void onOpen() {}
virtual void onClose() {}
diff --git a/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp b/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp
index 20868e79fce..2ca29223254 100644
--- a/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp
+++ b/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp
@@ -17,7 +17,7 @@ void
ServiceLayerProcess::shutdown()
{
Process::shutdown();
- _node.reset(0);
+ _node.reset();
}
void
@@ -25,7 +25,7 @@ ServiceLayerProcess::createNode()
{
_externalVisitors["searchvisitor"].reset(new SearchVisitorFactory(_configUri));
setupProvider();
- _node.reset(new ServiceLayerNode(_configUri, _context, *this, getProvider(), _externalVisitors));
+ _node = std::make_unique<ServiceLayerNode>(_configUri, _context, *this, getProvider(), _externalVisitors);
_node->init();
}
diff --git a/storageserver/src/vespa/storageserver/app/servicelayerprocess.h b/storageserver/src/vespa/storageserver/app/servicelayerprocess.h
index 0a002db114b..af991c00f03 100644
--- a/storageserver/src/vespa/storageserver/app/servicelayerprocess.h
+++ b/storageserver/src/vespa/storageserver/app/servicelayerprocess.h
@@ -19,9 +19,8 @@
#include "process.h"
#include <vespa/storage/storageserver/servicelayernodecontext.h>
#include <vespa/storage/common/visitorfactory.h>
-#include <vespa/config/config.h>
-#include <vespa/config/helper/configfetcher.h>
-#include <vespa/config-persistence.h>
+
+namespace config { class ConfigUri; }
namespace storage {
diff --git a/vespalog/src/test/bufferedlogtest.cpp b/vespalog/src/test/bufferedlogtest.cpp
index 3e974057361..9b69d9d3a14 100644
--- a/vespalog/src/test/bufferedlogtest.cpp
+++ b/vespalog/src/test/bufferedlogtest.cpp
@@ -71,12 +71,11 @@ void spamLog2(uint64_t& time, int diff) {
time += diff;
std::ostringstream ost;
ost << "Message " << i;
- LOGBT(info, ost.str(), ost.str().c_str());
+ LOGBT(info, ost.str(), "%s", ost.str().c_str());
}
}
-void testThatEntriesWithHighCountIsKept(const std::string& file,
- uint64_t& timer)
+void testThatEntriesWithHighCountIsKept(const std::string& file, uint64_t& timer)
{
std::cerr << "testThatEntriesWithHighCountIsKept ...\n";
timer = 10 * 1000000 + 4;
diff --git a/vespalog/src/test/bufferedlogtest.logger1.cpp b/vespalog/src/test/bufferedlogtest.logger1.cpp
index 24b9237a71d..dde5f2edbb5 100644
--- a/vespalog/src/test/bufferedlogtest.logger1.cpp
+++ b/vespalog/src/test/bufferedlogtest.logger1.cpp
@@ -5,7 +5,7 @@
LOG_SETUP(".logger1");
-void logWithLogger1(const std::string& token, const std::string message)
+void logWithLogger1(const std::string& token, const std::string & message)
{
- LOGBT(info, token, message.c_str());
+ LOGBT(info, token, "%s", message.c_str());
}
diff --git a/vespalog/src/test/bufferedlogtest.logger1.h b/vespalog/src/test/bufferedlogtest.logger1.h
index acbd67e8c3e..1ec8f9acad4 100644
--- a/vespalog/src/test/bufferedlogtest.logger1.h
+++ b/vespalog/src/test/bufferedlogtest.logger1.h
@@ -4,5 +4,5 @@
#include <string>
-void logWithLogger1(const std::string& token, const std::string message);
+void logWithLogger1(const std::string& token, const std::string & message);
diff --git a/vespalog/src/test/bufferedlogtest.logger2.cpp b/vespalog/src/test/bufferedlogtest.logger2.cpp
index 8d6ca97130e..992eeb70481 100644
--- a/vespalog/src/test/bufferedlogtest.logger2.cpp
+++ b/vespalog/src/test/bufferedlogtest.logger2.cpp
@@ -5,7 +5,7 @@
LOG_SETUP(".logger2");
-void logWithLogger2(const std::string& token, const std::string message)
+void logWithLogger2(const std::string& token, const std::string & message)
{
- LOGBT(info, token, message.c_str());
+ LOGBT(info, token, "%s", message.c_str());
}
diff --git a/vespalog/src/test/bufferedlogtest.logger2.h b/vespalog/src/test/bufferedlogtest.logger2.h
index 398d799a2b3..cd88d8ea0be 100644
--- a/vespalog/src/test/bufferedlogtest.logger2.h
+++ b/vespalog/src/test/bufferedlogtest.logger2.h
@@ -4,5 +4,5 @@
#include <string>
-void logWithLogger2(const std::string& token, const std::string message);
+void logWithLogger2(const std::string& token, const std::string & message);
diff --git a/vespalog/src/vespa/log/bufferedlogger.h b/vespalog/src/vespa/log/bufferedlogger.h
index 64419311a17..9081ab8f9cf 100644
--- a/vespalog/src/vespa/log/bufferedlogger.h
+++ b/vespalog/src/vespa/log/bufferedlogger.h
@@ -102,7 +102,7 @@
"", __VA_ARGS__); \
} \
} \
- } while (0)
+ } while (false)
#endif
// Define LOGBM macro for logging buffered, using the message itself as a
@@ -121,7 +121,7 @@
"", __VA_ARGS__); \
} \
} \
- } while (0)
+ } while (false)
// Define LOGBP macro for logging buffered, using the call point as token.
// (File/line of macro caller)
@@ -140,7 +140,7 @@
__FILE__, __LINE__, ost123.str(), ##ARGS); \
} \
} \
- } while (0)
+ } while (false)
// Define LOGT calls for using the buffer specifically stating token
#define LOGBT(level, token, ...) \
@@ -156,7 +156,7 @@
__FILE__, __LINE__, token, __VA_ARGS__); \
} \
} \
- } while (0)
+ } while (false)
#define LOGB_FLUSH() \
ns_log::BufferedLogger::logger.flush()
diff --git a/vespalog/src/vespa/log/log.cpp b/vespalog/src/vespa/log/log.cpp
index 7d98c22957d..193f4230502 100644
--- a/vespalog/src/vespa/log/log.cpp
+++ b/vespalog/src/vespa/log/log.cpp
@@ -23,7 +23,7 @@ namespace ns_log {
uint64_t Timer::getTimestamp() const {
struct timeval tv;
- gettimeofday(&tv, NULL);
+ gettimeofday(&tv, nullptr);
uint64_t timestamp = tv.tv_sec;
timestamp *= 1000000;
timestamp += tv.tv_usec;
@@ -164,16 +164,16 @@ Logger::~Logger()
{
_numInstances--;
if ((_numInstances == 1)) {
- if (logger != NULL) {
+ if (logger != nullptr) {
logger->~Logger();
free(logger);
- logger = NULL;
+ logger = nullptr;
}
} else if (_numInstances == 0) {
delete _controlFile;
logInitialised = false;
delete _target;
- _target = NULL;
+ _target = nullptr;
}
}
@@ -390,13 +390,6 @@ Logger::doEventValue(const char *name, double value)
}
void
-Logger::doEventCollection(uint64_t collectionId, const char* name, const char* params)
-{
- doLog(event, "", 0, "collection/1 collectionId=%lu name=\"%s\" %s",
- collectionId, name, params);
-}
-
-void
Logger::doEventState(const char *name, const char *value)
{
doLog(event, "", 0, "state/1 name=\"%s\" value=\"%s\"", name, value);
diff --git a/vespalog/src/vespa/log/log.h b/vespalog/src/vespa/log/log.h
index 127baf14d55..de0b9809e35 100644
--- a/vespalog/src/vespa/log/log.h
+++ b/vespalog/src/vespa/log/log.h
@@ -39,13 +39,13 @@ do { \
if (logger.wants(ns_log::Logger::level)) { \
logger.doLog(ns_log::Logger::level, __FILE__, __LINE__, __VA_ARGS__); \
} \
-} while (0)
+} while (false)
#define VLOG(level, ...) \
do { \
if (logger.wants(level)) { \
logger.doLog(level, __FILE__, __LINE__, __VA_ARGS__); \
} \
-} while (0)
+} while (false)
#endif
// Must use placement new in the following definition, since the variable
@@ -64,7 +64,7 @@ do {
if (logger->wants(ns_log::Logger::level)) { \
logger->doLog(ns_log::Logger::level, __FILE__, __LINE__, __VA_ARGS__); \
} \
-} while (0)
+} while (false)
#define LOG_WOULD_LOG(level) logger.wants(ns_log::Logger::level)
#define LOG_WOULD_VLOG(level) logger.wants(level)
@@ -74,84 +74,77 @@ do { \
if (logger.wants(ns_log::Logger::event)) { \
logger.doEventStarting(name); \
} \
-} while (0)
+} while (false)
#define EV_STOPPING(name,why) \
do { \
if (logger.wants(ns_log::Logger::event)) { \
logger.doEventStopping(name, why); \
} \
-} while (0)
+} while (false)
#define EV_STARTED(name) \
do { \
if (logger.wants(ns_log::Logger::event)) { \
logger.doEventStarted(name); \
} \
-} while (0)
+} while (false)
#define EV_STOPPED(name,pid,exitcode) \
do { \
if (logger.wants(ns_log::Logger::event)) { \
logger.doEventStopped(name, pid, exitcode); \
} \
-} while (0)
+} while (false)
#define EV_RELOADING(name) \
do { \
if (logger.wants(ns_log::Logger::event)) { \
logger.doEventReloading(name); \
} \
-} while (0)
+} while (false)
#define EV_RELOADED(name) \
do { \
if (logger.wants(ns_log::Logger::event)) { \
logger.doEventReloaded(name); \
} \
-} while (0)
+} while (false)
#define EV_CRASH(name,pid,signal) \
do { \
if (logger.wants(ns_log::Logger::event)) { \
logger.doEventCrash(name, pid, signal); \
} \
-} while (0)
+} while (false)
#define EV_PROGRESS(name, ...) \
do { \
if (logger.wants(ns_log::Logger::event)) { \
logger.doEventProgress(name, __VA_ARGS__); \
} \
-} while (0)
+} while (false)
#define EV_COUNT(name,value) \
do { \
if (logger.wants(ns_log::Logger::event)) { \
logger.doEventCount(name, value); \
} \
-} while (0)
+} while (false)
#define EV_VALUE(name,value) \
do { \
if (logger.wants(ns_log::Logger::event)) { \
logger.doEventValue(name, value); \
} \
-} while (0)
-
-#define EV_COLLECTION(collectionId,name,params) \
-do { \
- if (logger.wants(ns_log::Logger::event)) { \
- logger.doEventCollection(collectionId, name, params); \
- } \
-} while (0)
+} while (false)
#define EV_STATE(name,value) \
do { \
if (logger.wants(ns_log::Logger::event)) { \
logger.doEventState(name, value); \
} \
-} while (0)
+} while (false)
namespace ns_log {
@@ -161,7 +154,7 @@ class ControlFile;
// XXX this is way too complicated, must be some simpler way to do this
/** Timer class used to retrieve timestamp, such that we can override in test */
struct Timer {
- virtual ~Timer() {}
+ virtual ~Timer() = default;
virtual uint64_t getTimestamp() const;
};
@@ -187,7 +180,6 @@ public:
static bool fakePid;
private:
- Logger();
Logger(const Logger &);
Logger& operator =(const Logger &);
@@ -205,7 +197,6 @@ private:
static ControlFile *_controlFile;
static void setTarget();
- void makeLockFile();
char _appendix[256];
@@ -220,8 +211,7 @@ private:
const char *fmt, va_list args);
public:
~Logger();
- explicit Logger(const char *name, const char *rcsId = 0);
- static Logger& getLogger(const char *name);
+ explicit Logger(const char *name, const char *rcsId = nullptr);
int setRcsId(const char *rcsId);
static const char *levelName(LogLevel level);
@@ -247,8 +237,6 @@ public:
void doEventProgress(const char *name, double value, double total = 0);
void doEventCount(const char *name, uint64_t value);
void doEventValue(const char *name, double value);
- void doEventCollection(uint64_t collectionId, const char *name,
- const char *params);
void doEventState(const char *name, const char *value);
// Only for unit testing