author    Håvard Pettersen <havardpe@oath.com>  2020-10-02 13:11:53 +0000
committer Håvard Pettersen <havardpe@oath.com>  2020-10-02 13:11:53 +0000
commit    8c8c529e6fea1d0e8869dfb74881ca15ca9a89a9 (patch)
tree      e84e06c8d3ac29cc829511bbf3dc93576d4596df
parent    625e97105d691c417de78826c3f947926215147e (diff)
parent    c67283d48af378f25cf1bd3ed8e578d0e529813f (diff)
Merge branch 'master' into havardpe/generic-reduce
fixed Conflicts:
    eval/CMakeLists.txt
    eval/src/vespa/eval/instruction/CMakeLists.txt
-rw-r--r--  clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java | 21
-rw-r--r--  clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java | 51
-rw-r--r--  clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperTestServer.java | 8
-rw-r--r--  eval/CMakeLists.txt | 1
-rw-r--r--  eval/src/tests/instruction/generic_merge/CMakeLists.txt | 9
-rw-r--r--  eval/src/tests/instruction/generic_merge/generic_merge_test.cpp | 82
-rw-r--r--  eval/src/vespa/eval/eval/test/tensor_model.hpp | 2
-rw-r--r--  eval/src/vespa/eval/instruction/CMakeLists.txt | 1
-rw-r--r--  eval/src/vespa/eval/instruction/generic_merge.cpp | 147
-rw-r--r--  eval/src/vespa/eval/instruction/generic_merge.h | 15
-rw-r--r--  eval/src/vespa/eval/tensor/serialization/sparse_binary_format.cpp | 27
-rw-r--r--  node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/Activator.java | 3
-rw-r--r--  node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodePrioritizer.java | 14
-rw-r--r--  node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DockerProvisioningTest.java | 148
-rw-r--r--  node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/LoadBalancerProvisionerTest.java | 35
-rw-r--r--  node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/VirtualNodeProvisioningTest.java | 122
-rw-r--r--  searchcore/src/apps/vespa-feed-bm/CMakeLists.txt | 6
-rw-r--r--  searchcore/src/apps/vespa-feed-bm/bm_message_bus.cpp | 180
-rw-r--r--  searchcore/src/apps/vespa-feed-bm/bm_message_bus.h | 42
-rw-r--r--  searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp | 33
-rw-r--r--  searchcore/src/apps/vespa-feed-bm/bm_storage_link.h | 8
-rw-r--r--  searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp | 82
-rw-r--r--  searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h | 37
-rw-r--r--  searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.cpp | 43
-rw-r--r--  searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.h | 26
-rw-r--r--  searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp | 2
-rw-r--r--  searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h | 4
-rw-r--r--  searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp | 84
-rw-r--r--  searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h | 41
-rw-r--r--  searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp | 32
-rw-r--r--  searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h | 4
-rw-r--r--  searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h | 1
-rw-r--r--  searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp | 135
-rw-r--r--  storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp | 15
-rw-r--r--  storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h | 1
-rw-r--r--  storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp | 53
-rw-r--r--  storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h | 3
37 files changed, 1169 insertions(+), 349 deletions(-)
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java
index 5c910983bc9..1587e2696b8 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java
@@ -54,7 +54,7 @@ import static org.junit.Assert.fail;
*/
public abstract class FleetControllerTest implements Waiter {
- private static Logger log = Logger.getLogger(FleetControllerTest.class.getName());
+ private static final Logger log = Logger.getLogger(FleetControllerTest.class.getName());
private static final int DEFAULT_NODE_COUNT = 10;
Supervisor supervisor;
@@ -342,10 +342,6 @@ public abstract class FleetControllerTest implements Waiter {
fleetController.waitForCompleteCycle(timeoutMS);
}
- protected void verifyNodeEvents(Node n, String exp) {
- verifyNodeEvents(n, exp, null);
- }
-
private static class ExpectLine {
Pattern regex;
int matchedCount = 0;
@@ -390,17 +386,15 @@ public abstract class FleetControllerTest implements Waiter {
* <li>The rest of the line is a regular expression.
* </ul>
*/
- private void verifyNodeEvents(Node n, String exp, String ignoreRegex) {
- Pattern ignorePattern = (ignoreRegex == null ? null : Pattern.compile(ignoreRegex));
+ protected void verifyNodeEvents(Node n, String exp) {
List<NodeEvent> events = fleetController.getNodeEvents(n);
String[] expectLines = exp.split("\n");
- List<ExpectLine> expected = new ArrayList<ExpectLine>();
+ List<ExpectLine> expected = new ArrayList<>();
for (String line : expectLines) {
expected.add(new ExpectLine(line));
}
boolean mismatch = false;
- StringBuilder eventLog = new StringBuilder();
StringBuilder errors = new StringBuilder();
int gotno = 0;
@@ -413,16 +407,10 @@ public abstract class FleetControllerTest implements Waiter {
eventLine = e.toString();
}
- if (ignorePattern != null && ignorePattern.matcher(eventLine).matches()) {
- ++gotno;
- continue;
- }
-
ExpectLine pattern = null;
if (expno < expected.size()) {
pattern = expected.get(expno);
}
- eventLog.append(eventLine).append("\n");
if (pattern == null) {
errors.append("Exhausted expected list before matching event " + gotno
@@ -456,9 +444,6 @@ public abstract class FleetControllerTest implements Waiter {
StringBuilder eventsGotten = new StringBuilder();
for (Event e : events) {
String eventLine = e.toString();
- if (ignorePattern != null && ignorePattern.matcher(eventLine).matches()) {
- continue;
- }
eventsGotten.append(eventLine).append("\n");
}
errors.append("\nExpected events matching:\n" + exp + "\n");
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java
index 557765ca761..d14f6701288 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java
@@ -16,6 +16,7 @@ import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
import java.util.ArrayList;
import java.util.List;
@@ -31,14 +32,17 @@ import static org.junit.Assert.assertTrue;
public class MasterElectionTest extends FleetControllerTest {
- private static Logger log = Logger.getLogger(MasterElectionTest.class.getName());
+ private static final Logger log = Logger.getLogger(MasterElectionTest.class.getName());
private Supervisor supervisor;
- private List<FleetController> fleetControllers = new ArrayList<>();
+ private final List<FleetController> fleetControllers = new ArrayList<>();
@Rule
public TestRule cleanupZookeeperLogsOnSuccess = new CleanupZookeeperLogsOnSuccess();
+ @Rule
+ public Timeout globalTimeout= Timeout.seconds(120);
+
private static int defaultZkSessionTimeoutInMillis() { return 30_000; }
protected void setUpFleetController(int count, boolean useFakeTimer, FleetControllerOptions options) throws Exception {
@@ -74,12 +78,12 @@ public class MasterElectionTest extends FleetControllerTest {
private void waitForZookeeperDisconnected() throws TimeoutException {
long maxTime = System.currentTimeMillis() + timeoutMS;
- for(FleetController f : fleetControllers) {
- while (true) {
- if (!f.hasZookeeperConnection()) break;
+ for (FleetController f : fleetControllers) {
+ while (f.hasZookeeperConnection()) {
timer.advanceTime(1000);
- try{ Thread.sleep(1); } catch (InterruptedException e) {}
- if (System.currentTimeMillis() > maxTime) throw new TimeoutException("Failed to notice zookeeper down within timeout of " + timeoutMS + " ms");
+ try { Thread.sleep(1); } catch (InterruptedException e) {}
+ if (System.currentTimeMillis() > maxTime)
+ throw new TimeoutException("Failed to notice zookeeper down within timeout of " + timeoutMS + " ms");
}
}
waitForCompleteCycles();
@@ -226,24 +230,20 @@ public class MasterElectionTest extends FleetControllerTest {
startingTest("MasterElectionTest::testClusterStateVersionIncreasesAcrossMasterElections");
FleetControllerOptions options = defaultOptions("mycluster");
options.masterZooKeeperCooldownPeriod = 1;
- setUpFleetController(5, false, options);
+ setUpFleetController(3, false, options);
// Currently need to have content nodes present for the cluster controller to even bother
// attempting to persisting its cluster state version to ZK.
setUpVdsNodes(false, new DummyVdsNodeOptions());
fleetController = fleetControllers.get(0); // Required to prevent waitForStableSystem from NPE'ing
waitForStableSystem();
waitForMaster(0);
- Stream.of(0, 1, 2, 3, 4).forEach(this::waitForCompleteCycle);
+ Stream.of(0, 1, 2).forEach(this::waitForCompleteCycle);
StrictlyIncreasingVersionChecker checker = StrictlyIncreasingVersionChecker.bootstrappedWith(
fleetControllers.get(0).getClusterState());
fleetControllers.get(0).shutdown();
waitForMaster(1);
- Stream.of(1, 2, 3, 4).forEach(this::waitForCompleteCycle);
+ Stream.of(1, 2).forEach(this::waitForCompleteCycle);
checker.updateAndVerify(fleetControllers.get(1).getClusterState());
- fleetControllers.get(1).shutdown();
- waitForMaster(2); // Still a quorum available
- Stream.of(2, 3, 4).forEach(this::waitForCompleteCycle);
- checker.updateAndVerify(fleetControllers.get(2).getClusterState());
}
@Test
@@ -275,7 +275,7 @@ public class MasterElectionTest extends FleetControllerTest {
FleetControllerOptions options = defaultOptions("mycluster");
options.masterZooKeeperCooldownPeriod = 100;
options.zooKeeperServerAddress = "localhost";
- setUpFleetController(5, false, options);
+ setUpFleetController(3, false, options);
waitForMaster(0);
log.log(Level.INFO, "STOPPING ZOOKEEPER SERVER AT " + zooKeeperServer.getAddress());
@@ -329,12 +329,21 @@ public class MasterElectionTest extends FleetControllerTest {
long endTime = System.currentTimeMillis() + timeoutMS;
while (System.currentTimeMillis() < endTime) {
boolean allOk = true;
- for (int i=0; i<nodes.length; ++i) {
+ for (int node : nodes) {
Request req = new Request("getMaster");
- connections.get(nodes[i]).invokeSync(req, FleetControllerTest.timeoutS);
- if (req.isError()) { allOk = false; break; }
- if (master != null && master != req.returnValues().get(0).asInt32()) { allOk = false; break; }
- if (reason != null && !reason.equals(req.returnValues().get(1).asString())) { allOk = false; break; }
+ connections.get(node).invokeSync(req, FleetControllerTest.timeoutS);
+ if (req.isError()) {
+ allOk = false;
+ break;
+ }
+ if (master != null && master != req.returnValues().get(0).asInt32()) {
+ allOk = false;
+ break;
+ }
+ if (reason != null && ! reason.equals(req.returnValues().get(1).asString())) {
+ allOk = false;
+ break;
+ }
}
if (allOk) return;
try{ Thread.sleep(100); } catch (InterruptedException e) {}
@@ -354,7 +363,7 @@ public class MasterElectionTest extends FleetControllerTest {
waitForMaster(0);
supervisor = new Supervisor(new Transport());
- List<Target> connections = new ArrayList<Target>();
+ List<Target> connections = new ArrayList<>();
for (FleetController fleetController : fleetControllers) {
int rpcPort = fleetController.getRpcPort();
Target connection = supervisor.connect(new Spec("localhost", rpcPort));
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperTestServer.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperTestServer.java
index 73b4163d542..7861db249bd 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperTestServer.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperTestServer.java
@@ -14,10 +14,10 @@ import java.time.Duration;
* This class sets up a zookeeper server, such that we can test fleetcontroller zookeeper parts without stubbing in the client.
*/
public class ZooKeeperTestServer {
- private File zooKeeperDir;
- private ZooKeeperServer server;
+ private final File zooKeeperDir;
+ private final ZooKeeperServer server;
private static final Duration tickTime = Duration.ofMillis(2000);
- private NIOServerCnxnFactory factory;
+ private final NIOServerCnxnFactory factory;
private static final String DIR_PREFIX = "test_fltctrl_zk";
private static final String DIR_POSTFIX = "sdir";
@@ -39,7 +39,7 @@ public class ZooKeeperTestServer {
try{
factory.startup(server);
} catch (InterruptedException e) {
- throw (RuntimeException) new IllegalStateException("Interrupted during test startup: ").initCause(e);
+ throw new IllegalStateException("Interrupted during test startup: ", e);
}
}
diff --git a/eval/CMakeLists.txt b/eval/CMakeLists.txt
index 2c6a05c7391..b8cc77b8290 100644
--- a/eval/CMakeLists.txt
+++ b/eval/CMakeLists.txt
@@ -35,6 +35,7 @@ vespa_define_module(
src/tests/gp/ponder_nov2017
src/tests/instruction/generic_join
src/tests/instruction/generic_reduce
+ src/tests/instruction/generic_merge
src/tests/instruction/generic_rename
src/tests/tensor/default_value_builder_factory
src/tests/tensor/dense_add_dimension_optimizer
diff --git a/eval/src/tests/instruction/generic_merge/CMakeLists.txt b/eval/src/tests/instruction/generic_merge/CMakeLists.txt
new file mode 100644
index 00000000000..154b04cb32f
--- /dev/null
+++ b/eval/src/tests/instruction/generic_merge/CMakeLists.txt
@@ -0,0 +1,9 @@
+# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(eval_generic_merge_test_app TEST
+ SOURCES
+ generic_merge_test.cpp
+ DEPENDS
+ vespaeval
+ GTest::GTest
+)
+vespa_add_test(NAME eval_generic_merge_test_app COMMAND eval_generic_merge_test_app)
diff --git a/eval/src/tests/instruction/generic_merge/generic_merge_test.cpp b/eval/src/tests/instruction/generic_merge/generic_merge_test.cpp
new file mode 100644
index 00000000000..501d5410b87
--- /dev/null
+++ b/eval/src/tests/instruction/generic_merge/generic_merge_test.cpp
@@ -0,0 +1,82 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/eval/eval/simple_value.h>
+#include <vespa/eval/eval/value_codec.h>
+#include <vespa/eval/instruction/generic_merge.h>
+#include <vespa/eval/eval/interpreted_function.h>
+#include <vespa/eval/eval/test/tensor_model.hpp>
+#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/vespalib/gtest/gtest.h>
+#include <optional>
+
+using namespace vespalib;
+using namespace vespalib::eval;
+using namespace vespalib::eval::instruction;
+using namespace vespalib::eval::test;
+
+using vespalib::make_string_short::fmt;
+
+std::vector<Layout> merge_layouts = {
+ {}, {},
+ {x(5)}, {x(5)},
+ {x(3),y(5)}, {x(3),y(5)},
+ float_cells({x(3),y(5)}), {x(3),y(5)},
+ {x(3),y(5)}, float_cells({x(3),y(5)}),
+ {x({"a","b","c"})}, {x({"a","b","c"})},
+ {x({"a","b","c"})}, {x({"c","d","e"})},
+ {x({"a","c","e"})}, {x({"b","c","d"})},
+ {x({"b","c","d"})}, {x({"a","c","e"})},
+ {x({"a","b","c"})}, {x({"c","d"})},
+ {x({"a","b"}),y({"foo","bar","baz"})}, {x({"b","c"}),y({"any","foo","bar"})},
+ {x(3),y({"foo", "bar"})}, {x(3),y({"baz", "bar"})},
+ {x({"a","b","c"}),y(5)}, {x({"b","c","d"}),y(5)}
+};
+
+
+TensorSpec reference_merge(const TensorSpec &a, const TensorSpec &b, join_fun_t fun) {
+ ValueType res_type = ValueType::merge(ValueType::from_spec(a.type()),
+ ValueType::from_spec(b.type()));
+ EXPECT_FALSE(res_type.is_error());
+ TensorSpec result(res_type.to_spec());
+ for (const auto &cell: a.cells()) {
+ auto other = b.cells().find(cell.first);
+ if (other == b.cells().end()) {
+ result.add(cell.first, cell.second);
+ } else {
+ result.add(cell.first, fun(cell.second, other->second));
+ }
+ }
+ for (const auto &cell: b.cells()) {
+ auto other = a.cells().find(cell.first);
+ if (other == a.cells().end()) {
+ result.add(cell.first, cell.second);
+ }
+ }
+ return result;
+}
+
+TensorSpec perform_generic_merge(const TensorSpec &a, const TensorSpec &b, join_fun_t fun) {
+ Stash stash;
+ const auto &factory = SimpleValueBuilderFactory::get();
+ auto lhs = value_from_spec(a, factory);
+ auto rhs = value_from_spec(b, factory);
+ auto my_op = GenericMerge::make_instruction(lhs->type(), rhs->type(), fun, factory, stash);
+ InterpretedFunction::EvalSingle single(my_op);
+ return spec_from_value(single.eval(std::vector<Value::CREF>({*lhs, *rhs})));
+}
+
+TEST(GenericMergeTest, generic_merge_works_for_simple_values) {
+ ASSERT_TRUE((merge_layouts.size() % 2) == 0);
+ for (size_t i = 0; i < merge_layouts.size(); i += 2) {
+ TensorSpec lhs = spec(merge_layouts[i], N());
+ TensorSpec rhs = spec(merge_layouts[i + 1], Div16(N()));
+ SCOPED_TRACE(fmt("\n===\nLHS: %s\nRHS: %s\n===\n", lhs.to_string().c_str(), rhs.to_string().c_str()));
+ for (auto fun: {operation::Add::f, operation::Mul::f, operation::Sub::f, operation::Max::f}) {
+ auto expect = reference_merge(lhs, rhs, fun);
+ auto actual = perform_generic_merge(lhs, rhs, fun);
+ EXPECT_EQ(actual, expect);
+ }
+ }
+}
+
+GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/eval/src/vespa/eval/eval/test/tensor_model.hpp b/eval/src/vespa/eval/eval/test/tensor_model.hpp
index 42f0dc7e996..4e4ef60aaee 100644
--- a/eval/src/vespa/eval/eval/test/tensor_model.hpp
+++ b/eval/src/vespa/eval/eval/test/tensor_model.hpp
@@ -38,7 +38,7 @@ struct Div10 : Sequence {
double operator[](size_t i) const override { return (seq[i] / 10.0); }
};
-// Sequence of another sequence divided by 10
+// Sequence of another sequence divided by 16
struct Div16 : Sequence {
const Sequence &seq;
Div16(const Sequence &seq_in) : seq(seq_in) {}
diff --git a/eval/src/vespa/eval/instruction/CMakeLists.txt b/eval/src/vespa/eval/instruction/CMakeLists.txt
index c84d773af11..91ff4fd63dc 100644
--- a/eval/src/vespa/eval/instruction/CMakeLists.txt
+++ b/eval/src/vespa/eval/instruction/CMakeLists.txt
@@ -4,5 +4,6 @@ vespa_add_library(eval_instruction OBJECT
SOURCES
generic_join
generic_reduce
+ generic_merge
generic_rename
)
diff --git a/eval/src/vespa/eval/instruction/generic_merge.cpp b/eval/src/vespa/eval/instruction/generic_merge.cpp
new file mode 100644
index 00000000000..9d8ac2bb80a
--- /dev/null
+++ b/eval/src/vespa/eval/instruction/generic_merge.cpp
@@ -0,0 +1,147 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "generic_merge.h"
+#include <vespa/eval/eval/inline_operation.h>
+#include <vespa/vespalib/util/stash.h>
+#include <vespa/vespalib/util/typify.h>
+#include <cassert>
+
+namespace vespalib::eval::instruction {
+
+using State = InterpretedFunction::State;
+using Instruction = InterpretedFunction::Instruction;
+
+namespace {
+
+//-----------------------------------------------------------------------------
+
+template <typename T, typename IN> uint64_t wrap_param(const IN &value_in) {
+ const T &value = value_in;
+ static_assert(sizeof(uint64_t) == sizeof(&value));
+ return (uint64_t)&value;
+}
+
+template <typename T> const T &unwrap_param(uint64_t param) {
+ return *((const T *)param);
+}
+
+struct MergeParam {
+ const ValueType res_type;
+ const join_fun_t function;
+ const size_t num_mapped_dimensions;
+ const size_t dense_subspace_size;
+ std::vector<size_t> all_view_dims;
+ const ValueBuilderFactory &factory;
+ MergeParam(const ValueType &lhs_type, const ValueType &rhs_type,
+ join_fun_t function_in, const ValueBuilderFactory &factory_in)
+ : res_type(ValueType::join(lhs_type, rhs_type)),
+ function(function_in),
+ num_mapped_dimensions(lhs_type.count_mapped_dimensions()),
+ dense_subspace_size(lhs_type.dense_subspace_size()),
+ all_view_dims(num_mapped_dimensions),
+ factory(factory_in)
+ {
+ assert(!res_type.is_error());
+ assert(num_mapped_dimensions == rhs_type.count_mapped_dimensions());
+ assert(num_mapped_dimensions == res_type.count_mapped_dimensions());
+ assert(dense_subspace_size == rhs_type.dense_subspace_size());
+ assert(dense_subspace_size == res_type.dense_subspace_size());
+ for (size_t i = 0; i < num_mapped_dimensions; ++i) {
+ all_view_dims[i] = i;
+ }
+ }
+ ~MergeParam();
+};
+MergeParam::~MergeParam() = default;
+
+//-----------------------------------------------------------------------------
+
+template <typename LCT, typename RCT, typename OCT, typename Fun>
+std::unique_ptr<Value>
+generic_mixed_merge(const Value &a, const Value &b,
+ const MergeParam &params)
+{
+ Fun fun(params.function);
+ auto lhs_cells = a.cells().typify<LCT>();
+ auto rhs_cells = b.cells().typify<RCT>();
+ const size_t num_mapped = params.num_mapped_dimensions;
+ const size_t subspace_size = params.dense_subspace_size;
+ size_t guess_subspaces = std::max(a.index().size(), b.index().size());
+ auto builder = params.factory.create_value_builder<OCT>(params.res_type, num_mapped, subspace_size, guess_subspaces);
+ std::vector<vespalib::stringref> address(num_mapped);
+ std::vector<const vespalib::stringref *> addr_cref;
+ std::vector<vespalib::stringref *> addr_ref;
+ for (auto & ref : address) {
+ addr_cref.push_back(&ref);
+ addr_ref.push_back(&ref);
+ }
+ size_t lhs_subspace;
+ size_t rhs_subspace;
+ auto inner = b.index().create_view(params.all_view_dims);
+ auto outer = a.index().create_view({});
+ outer->lookup({});
+ while (outer->next_result(addr_ref, lhs_subspace)) {
+ OCT *dst = builder->add_subspace(address).begin();
+ inner->lookup(addr_cref);
+ if (inner->next_result({}, rhs_subspace)) {
+ const LCT *lhs_src = &lhs_cells[lhs_subspace * subspace_size];
+ const RCT *rhs_src = &rhs_cells[rhs_subspace * subspace_size];
+ for (size_t i = 0; i < subspace_size; ++i) {
+ *dst++ = fun(*lhs_src++, *rhs_src++);
+ }
+ } else {
+ const LCT *src = &lhs_cells[lhs_subspace * subspace_size];
+ for (size_t i = 0; i < subspace_size; ++i) {
+ *dst++ = *src++;
+ }
+ }
+ }
+ inner = a.index().create_view(params.all_view_dims);
+ outer = b.index().create_view({});
+ outer->lookup({});
+ while (outer->next_result(addr_ref, rhs_subspace)) {
+ inner->lookup(addr_cref);
+ if (! inner->next_result({}, lhs_subspace)) {
+ OCT *dst = builder->add_subspace(address).begin();
+ const RCT *src = &rhs_cells[rhs_subspace * subspace_size];
+ for (size_t i = 0; i < subspace_size; ++i) {
+ *dst++ = *src++;
+ }
+ }
+ }
+ return builder->build(std::move(builder));
+}
+
+template <typename LCT, typename RCT, typename OCT, typename Fun>
+void my_mixed_merge_op(State &state, uint64_t param_in) {
+ const auto &param = unwrap_param<MergeParam>(param_in);
+ const Value &lhs = state.peek(1);
+ const Value &rhs = state.peek(0);
+ auto up = generic_mixed_merge<LCT, RCT, OCT, Fun>(lhs, rhs, param);
+ auto &result = state.stash.create<std::unique_ptr<Value>>(std::move(up));
+ const Value &result_ref = *(result.get());
+ state.pop_pop_push(result_ref);
+};
+
+struct SelectGenericMergeOp {
+ template <typename LCT, typename RCT, typename OCT, typename Fun> static auto invoke() {
+ return my_mixed_merge_op<LCT,RCT,OCT,Fun>;
+ }
+};
+
+//-----------------------------------------------------------------------------
+
+} // namespace <unnamed>
+
+using MergeTypify = TypifyValue<TypifyCellType,operation::TypifyOp2>;
+
+Instruction
+GenericMerge::make_instruction(const ValueType &lhs_type, const ValueType &rhs_type, join_fun_t function,
+ const ValueBuilderFactory &factory, Stash &stash)
+{
+ const auto &param = stash.create<MergeParam>(lhs_type, rhs_type, function, factory);
+ auto fun = typify_invoke<4,MergeTypify,SelectGenericMergeOp>(lhs_type.cell_type(), rhs_type.cell_type(), param.res_type.cell_type(), function);
+ return Instruction(fun, wrap_param<MergeParam>(param));
+}
+
+} // namespace
diff --git a/eval/src/vespa/eval/instruction/generic_merge.h b/eval/src/vespa/eval/instruction/generic_merge.h
new file mode 100644
index 00000000000..02e2d18715a
--- /dev/null
+++ b/eval/src/vespa/eval/instruction/generic_merge.h
@@ -0,0 +1,15 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "generic_join.h"
+
+namespace vespalib::eval::instruction {
+
+struct GenericMerge {
+ static InterpretedFunction::Instruction
+ make_instruction(const ValueType &lhs_type, const ValueType &rhs_type, join_fun_t function,
+ const ValueBuilderFactory &factory, Stash &stash);
+};
+
+} // namespace
diff --git a/eval/src/vespa/eval/tensor/serialization/sparse_binary_format.cpp b/eval/src/vespa/eval/tensor/serialization/sparse_binary_format.cpp
index c246ccf16e1..a0b691872a0 100644
--- a/eval/src/vespa/eval/tensor/serialization/sparse_binary_format.cpp
+++ b/eval/src/vespa/eval/tensor/serialization/sparse_binary_format.cpp
@@ -131,6 +131,18 @@ SparseBinaryFormat::serialize(nbostream &stream, const Tensor &tensor)
stream.write(cells.peek(), cells.size());
}
+struct BuildSparseCells {
+ template<typename CT>
+ static auto invoke(ValueType type, nbostream &stream,
+ size_t dimensionsSize,
+ size_t cellsSize)
+ {
+ DirectSparseTensorBuilder<CT> builder(std::move(type));
+ decodeCells<CT>(stream, dimensionsSize, cellsSize, builder);
+ return builder.build();
+ }
+};
+
std::unique_ptr<Tensor>
SparseBinaryFormat::deserialize(nbostream &stream, CellType cell_type)
{
@@ -143,19 +155,8 @@ SparseBinaryFormat::deserialize(nbostream &stream, CellType cell_type)
}
size_t cellsSize = stream.getInt1_4Bytes();
ValueType type = ValueType::tensor_type(std::move(dimensions), cell_type);
- switch (cell_type) {
- case CellType::DOUBLE: {
- DirectSparseTensorBuilder<double> builder(type);
- builder.reserve(cellsSize);
- decodeCells<double>(stream, dimensionsSize, cellsSize, builder);
- return builder.build(); }
- case CellType::FLOAT: {
- DirectSparseTensorBuilder<float> builder(type);
- builder.reserve(cellsSize);
- decodeCells<float>(stream, dimensionsSize, cellsSize, builder);
- return builder.build(); }
- }
- abort();
+ return typify_invoke<1,eval::TypifyCellType,BuildSparseCells>(cell_type,
+ std::move(type), stream, dimensionsSize, cellsSize);
}
} // namespace
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/Activator.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/Activator.java
index 7158ccc57e3..e36d5fa4075 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/Activator.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/Activator.java
@@ -20,7 +20,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
-import java.util.logging.Logger;
import java.util.stream.Collectors;
/**
@@ -30,8 +29,6 @@ import java.util.stream.Collectors;
*/
class Activator {
- private static final Logger logger = Logger.getLogger(Activator.class.getName());
-
private final NodeRepository nodeRepository;
private final Optional<LoadBalancerProvisioner> loadBalancerProvisioner;
diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodePrioritizer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodePrioritizer.java
index 9925ba65324..b6b05949082 100644
--- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodePrioritizer.java
+++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodePrioritizer.java
@@ -136,6 +136,7 @@ public class NodePrioritizer {
.filter(node -> legalStates.contains(node.state()))
.filter(node -> node.allocation().isPresent())
.filter(node -> node.allocation().get().owner().equals(application))
+ .filter(node -> node.state() == Node.State.active || canStillAllocateToParentOf(node))
.map(node -> candidateFrom(node, false))
.forEach(candidate -> nodes.add(candidate));
}
@@ -177,6 +178,19 @@ public class NodePrioritizer {
return requestedNodes.fulfilledBy(nofNodesInCluster - nodeFailedNodes);
}
+ /**
+ * We may regret that a non-active node is allocated to a host and not offer it to the application
+ * now, e.g if we want to retire the host.
+ *
+ * @return true if we still want to allocate the given node to its parent
+ */
+ private boolean canStillAllocateToParentOf(Node node) {
+ if (node.parentHostname().isEmpty()) return true;
+ Optional<Node> parent = node.parentHostname().flatMap(nodeRepository::getNode);
+ if (parent.isEmpty()) return false;
+ return nodeRepository.canAllocateTenantNodeTo(parent.get());
+ }
+
private static NodeResources resources(NodeSpec requestedNodes) {
if ( ! (requestedNodes instanceof NodeSpec.CountNodeSpec)) return null;
return requestedNodes.resources().get();
diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DockerProvisioningTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DockerProvisioningTest.java
index d2a5e06469a..5abe7134aa0 100644
--- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DockerProvisioningTest.java
+++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DockerProvisioningTest.java
@@ -46,10 +46,8 @@ public class DockerProvisioningTest {
@Test
public void docker_application_deployment() {
ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build();
- ApplicationId application1 = ProvisioningTester.makeApplicationId();
-
- for (int i = 1; i < 10; i++)
- tester.makeReadyVirtualDockerNodes(1, dockerResources, "dockerHost" + i);
+ tester.makeReadyHosts(10, dockerResources).activateTenantHosts();
+ ApplicationId application1 = ProvisioningTester.makeApplicationId("app1");
Version wantedVespaVersion = Version.fromString("6.39");
int nodeCount = 7;
@@ -86,13 +84,12 @@ public class DockerProvisioningTest {
ApplicationId application1 = ProvisioningTester.makeApplicationId();
Version wantedVespaVersion = Version.fromString("6.39");
int nodeCount = 7;
- List<HostSpec> nodes = tester.prepare(application1,
- ClusterSpec.request(ClusterSpec.Type.content, ClusterSpec.Id.from("myContent")).vespaVersion(wantedVespaVersion).build(),
- nodeCount, 1, dockerResources);
try {
- tester.activate(application1, new HashSet<>(nodes));
+ List<HostSpec> nodes = tester.prepare(application1,
+ ClusterSpec.request(ClusterSpec.Type.content, ClusterSpec.Id.from("myContent")).vespaVersion(wantedVespaVersion).build(),
+ nodeCount, 1, dockerResources);
fail("Expected the allocation to fail due to parent hosts not being active yet");
- } catch (ParentHostUnavailableException ignored) { }
+ } catch (OutOfCapacityException expected) { }
// Activate the zone-app, thereby allocating the parents
List<HostSpec> hosts = tester.prepare(zoneApplication,
@@ -101,9 +98,9 @@ public class DockerProvisioningTest {
tester.activate(zoneApplication, hosts);
// Try allocating tenants again
- nodes = tester.prepare(application1,
- ClusterSpec.request(ClusterSpec.Type.content, ClusterSpec.Id.from("myContent")).vespaVersion(wantedVespaVersion).build(),
- nodeCount, 1, dockerResources);
+ List<HostSpec> nodes = tester.prepare(application1,
+ ClusterSpec.request(ClusterSpec.Type.content, ClusterSpec.Id.from("myContent")).vespaVersion(wantedVespaVersion).build(),
+ nodeCount, 1, dockerResources);
tester.activate(application1, new HashSet<>(nodes));
NodeList activeNodes = tester.getNodes(application1, Node.State.active);
@@ -152,53 +149,49 @@ public class DockerProvisioningTest {
/** Exclusive app first, then non-exclusive: Should give the same result as below */
@Test
public void docker_application_deployment_with_exclusive_app_first() {
+ NodeResources hostResources = new NodeResources(10, 40, 1000, 10);
+ NodeResources nodeResources = new NodeResources(1, 4, 100, 1);
ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build();
- for (int i = 1; i <= 4; i++)
- tester.makeReadyVirtualDockerNode(i, dockerResources, "host1");
- for (int i = 5; i <= 8; i++)
- tester.makeReadyVirtualDockerNode(i, dockerResources, "host2");
- for (int i = 9; i <= 12; i++)
- tester.makeReadyVirtualDockerNode(i, dockerResources, "host3");
- for (int i = 13; i <= 16; i++)
- tester.makeReadyVirtualDockerNode(i, dockerResources, "host4");
-
- ApplicationId application1 = ProvisioningTester.makeApplicationId();
- prepareAndActivate(application1, 2, true, tester);
- assertEquals(Set.of("host1", "host2"), hostsOf(tester.getNodes(application1, Node.State.active)));
-
- ApplicationId application2 = ProvisioningTester.makeApplicationId();
- prepareAndActivate(application2, 2, false, tester);
+ tester.makeReadyHosts(4, hostResources).activateTenantHosts();
+ ApplicationId application1 = ProvisioningTester.makeApplicationId("app1");
+ prepareAndActivate(application1, 2, true, nodeResources, tester);
+ assertEquals(Set.of("host-1.yahoo.com", "host-2.yahoo.com"),
+ hostsOf(tester.getNodes(application1, Node.State.active)));
+
+ ApplicationId application2 = ProvisioningTester.makeApplicationId("app2");
+ prepareAndActivate(application2, 2, false, nodeResources, tester);
assertEquals("Application is assigned to separate hosts",
- Set.of("host3", "host4"), hostsOf(tester.getNodes(application2, Node.State.active)));
+ Set.of("host-3.yahoo.com", "host-4.yahoo.com"),
+ hostsOf(tester.getNodes(application2, Node.State.active)));
}
/** Non-exclusive app first, then an exclusive: Should give the same result as above */
@Test
public void docker_application_deployment_with_exclusive_app_last() {
+ NodeResources hostResources = new NodeResources(10, 40, 1000, 10);
+ NodeResources nodeResources = new NodeResources(1, 4, 100, 1);
ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build();
- for (int i = 1; i <= 4; i++)
- tester.makeReadyVirtualDockerNode(i, dockerResources, "host1");
- for (int i = 5; i <= 8; i++)
- tester.makeReadyVirtualDockerNode(i, dockerResources, "host2");
- for (int i = 9; i <= 12; i++)
- tester.makeReadyVirtualDockerNode(i, dockerResources, "host3");
- for (int i = 13; i <= 16; i++)
- tester.makeReadyVirtualDockerNode(i, dockerResources, "host4");
-
- ApplicationId application1 = ProvisioningTester.makeApplicationId();
- prepareAndActivate(application1, 2, false, tester);
- assertEquals(Set.of("host1", "host2"), hostsOf(tester.getNodes(application1, Node.State.active)));
-
- ApplicationId application2 = ProvisioningTester.makeApplicationId();
- prepareAndActivate(application2, 2, true, tester);
+ tester.makeReadyHosts(4, hostResources).activateTenantHosts();
+ ApplicationId application1 = ProvisioningTester.makeApplicationId("app1");
+ prepareAndActivate(application1, 2, false, nodeResources, tester);
+ assertEquals(Set.of("host-1.yahoo.com", "host-2.yahoo.com"),
+ hostsOf(tester.getNodes(application1, Node.State.active)));
+
+ ApplicationId application2 = ProvisioningTester.makeApplicationId("app2");
+ prepareAndActivate(application2, 2, true, nodeResources, tester);
assertEquals("Application is assigned to separate hosts",
- Set.of("host3", "host4"), hostsOf(tester.getNodes(application2, Node.State.active)));
+ Set.of("host-3.yahoo.com", "host-4.yahoo.com"),
+ hostsOf(tester.getNodes(application2, Node.State.active)));
}
/** Test making an application exclusive */
@Test
public void docker_application_deployment_change_to_exclusive_and_back() {
+ NodeResources hostResources = new NodeResources(10, 40, 1000, 10);
+ NodeResources nodeResources = new NodeResources(1, 4, 100, 1);
ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build();
+ tester.makeReadyHosts(4, hostResources).activateTenantHosts();
+ /*
for (int i = 1; i <= 4; i++)
tester.makeReadyVirtualDockerNode(i, dockerResources, "host1");
for (int i = 5; i <= 8; i++)
@@ -207,19 +200,20 @@ public class DockerProvisioningTest {
tester.makeReadyVirtualDockerNode(i, dockerResources, "host3");
for (int i = 13; i <= 16; i++)
tester.makeReadyVirtualDockerNode(i, dockerResources, "host4");
+ */
ApplicationId application1 = ProvisioningTester.makeApplicationId();
- prepareAndActivate(application1, 2, false, tester);
+ prepareAndActivate(application1, 2, false, nodeResources, tester);
for (Node node : tester.getNodes(application1, Node.State.active))
assertFalse(node.allocation().get().membership().cluster().isExclusive());
- prepareAndActivate(application1, 2, true, tester);
- assertEquals(Set.of("host1", "host2"), hostsOf(tester.getNodes(application1, Node.State.active)));
+ prepareAndActivate(application1, 2, true, nodeResources, tester);
+ assertEquals(Set.of("host-1.yahoo.com", "host-2.yahoo.com"), hostsOf(tester.getNodes(application1, Node.State.active)));
for (Node node : tester.getNodes(application1, Node.State.active))
assertTrue(node.allocation().get().membership().cluster().isExclusive());
- prepareAndActivate(application1, 2, false, tester);
- assertEquals(Set.of("host1", "host2"), hostsOf(tester.getNodes(application1, Node.State.active)));
+ prepareAndActivate(application1, 2, false, nodeResources, tester);
+ assertEquals(Set.of("host-1.yahoo.com", "host-2.yahoo.com"), hostsOf(tester.getNodes(application1, Node.State.active)));
for (Node node : tester.getNodes(application1, Node.State.active))
assertFalse(node.allocation().get().membership().cluster().isExclusive());
}
@@ -227,56 +221,34 @@ public class DockerProvisioningTest {
/** Non-exclusive app first, then an exclusive: Should give the same result as above */
@Test
public void docker_application_deployment_with_exclusive_app_causing_allocation_failure() {
+ ApplicationId application1 = ApplicationId.from("tenant1", "app1", "default");
+ ApplicationId application2 = ApplicationId.from("tenant2", "app2", "default");
+ ApplicationId application3 = ApplicationId.from("tenant1", "app3", "default");
+ NodeResources hostResources = new NodeResources(10, 40, 1000, 10);
+ NodeResources nodeResources = new NodeResources(1, 4, 100, 1);
ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build();
- for (int i = 1; i <= 4; i++)
- tester.makeReadyVirtualDockerNode(i, dockerResources, "host1");
- for (int i = 5; i <= 8; i++)
- tester.makeReadyVirtualDockerNode(i, dockerResources, "host2");
- for (int i = 9; i <= 12; i++)
- tester.makeReadyVirtualDockerNode(i, dockerResources, "host3");
- for (int i = 13; i <= 16; i++)
- tester.makeReadyVirtualDockerNode(i, dockerResources, "host4");
+ tester.makeReadyHosts(4, hostResources).activateTenantHosts();
- ApplicationId application1 = ProvisioningTester.makeApplicationId();
- prepareAndActivate(application1, 2, true, tester);
- assertEquals(Set.of("host1", "host2"), hostsOf(tester.getNodes(application1, Node.State.active)));
+ prepareAndActivate(application1, 2, true, nodeResources, tester);
+ assertEquals(Set.of("host-1.yahoo.com", "host-2.yahoo.com"),
+ hostsOf(tester.getNodes(application1, Node.State.active)));
try {
- ApplicationId application2 = ApplicationId.from("tenant1", "app1", "default");
- prepareAndActivate(application2, 3, false, tester);
+ prepareAndActivate(application2, 3, false, nodeResources, tester);
fail("Expected allocation failure");
}
catch (Exception e) {
assertEquals("No room for 3 nodes as 2 of 4 hosts are exclusive",
"Could not satisfy request for 3 nodes with " +
- "[vcpu: 1.0, memory: 4.0 Gb, disk 100.0 Gb, bandwidth: 1.0 Gbps, storage type: local] " +
- "in tenant1.app1 container cluster 'myContainer' 6.39: " +
+ "[vcpu: 1.0, memory: 4.0 Gb, disk 100.0 Gb, bandwidth: 1.0 Gbps] " +
+ "in tenant2.app2 container cluster 'myContainer' 6.39: " +
"Out of capacity on group 0: " +
- "Not enough nodes available due to host exclusivity constraints, " +
- "insufficient nodes available on separate physical hosts",
+ "Not enough nodes available due to host exclusivity constraints",
e.getMessage());
}
// Adding 3 nodes of another application for the same tenant works
- ApplicationId application3 = ApplicationId.from(application1.tenant(), ApplicationName.from("app3"), InstanceName.from("default"));
- prepareAndActivate(application3, 2, true, tester);
- }
-
- // In dev, test and staging you get nodes with default flavor, but we should get specified flavor for docker nodes
- @Test
- public void get_specified_flavor_not_default_flavor_for_docker() {
- ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.test, RegionName.from("corp-us-east-1"))).build();
- ApplicationId application1 = ProvisioningTester.makeApplicationId();
- tester.makeReadyVirtualDockerNodes(1, dockerResources, "dockerHost");
-
- List<HostSpec> hosts = tester.prepare(application1,
- ClusterSpec.request(ClusterSpec.Type.content, ClusterSpec.Id.from("myContent")).vespaVersion("6.42").build(),
- 1, 1, dockerResources);
- tester.activate(application1, new HashSet<>(hosts));
-
- NodeList nodes = tester.getNodes(application1, Node.State.active);
- assertEquals(1, nodes.size());
- assertEquals("[vcpu: 1.0, memory: 4.0 Gb, disk 100.0 Gb, bandwidth: 1.0 Gbps, storage type: local]", nodes.asList().get(0).flavor().name());
+ prepareAndActivate(application3, 2, true, nodeResources, tester);
}
@Test
@@ -442,9 +414,13 @@ public class DockerProvisioningTest {
}
private void prepareAndActivate(ApplicationId application, int nodeCount, boolean exclusive, ProvisioningTester tester) {
+ prepareAndActivate(application, nodeCount, exclusive, dockerResources, tester);
+ }
+
+ private void prepareAndActivate(ApplicationId application, int nodeCount, boolean exclusive, NodeResources resources, ProvisioningTester tester) {
Set<HostSpec> hosts = new HashSet<>(tester.prepare(application,
ClusterSpec.request(ClusterSpec.Type.container, ClusterSpec.Id.from("myContainer")).vespaVersion("6.39").exclusive(exclusive).build(),
- Capacity.from(new ClusterResources(nodeCount, 1, dockerResources), false, true)));
+ Capacity.from(new ClusterResources(nodeCount, 1, resources), false, true)));
tester.activate(application, hosts);
}
diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/LoadBalancerProvisionerTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/LoadBalancerProvisionerTest.java
index 88f5d41a4f0..3d784c403f3 100644
--- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/LoadBalancerProvisionerTest.java
+++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/LoadBalancerProvisionerTest.java
@@ -6,6 +6,7 @@ import com.yahoo.config.provision.ApplicationId;
import com.yahoo.config.provision.Capacity;
import com.yahoo.config.provision.ClusterResources;
import com.yahoo.config.provision.ClusterSpec;
+import com.yahoo.config.provision.Flavor;
import com.yahoo.config.provision.HostName;
import com.yahoo.config.provision.HostSpec;
import com.yahoo.config.provision.NodeResources;
@@ -158,9 +159,10 @@ public class LoadBalancerProvisionerTest {
@Test
public void provision_load_balancers_with_dynamic_node_provisioning() {
- var nodes = prepare(app1, Capacity.from(new ClusterResources(2, 1, new NodeResources(1, 4, 10, 0.3)), false, true),
- true,
- clusterRequest(ClusterSpec.Type.container, ClusterSpec.Id.from("qrs")));
+ NodeResources resources = new NodeResources(1, 4, 10, 0.3);
+ tester.makeReadyHosts(2, resources);
+ tester.activateTenantHosts();
+ var nodes = tester.prepare(app1, clusterRequest(ClusterSpec.Type.container, ClusterSpec.Id.from("qrs")), 2 , 1, resources);
Supplier<LoadBalancer> lb = () -> tester.nodeRepository().loadBalancers(app1).asList().get(0);
assertTrue("Load balancer provisioned with empty reals", tester.loadBalancerService().instances().get(lb.get().id()).reals().isEmpty());
assignIps(tester.nodeRepository().getNodes(app1));
@@ -171,14 +173,12 @@ public class LoadBalancerProvisionerTest {
NestedTransaction removeTransaction = new NestedTransaction();
tester.provisioner().remove(removeTransaction, app1);
removeTransaction.commit();
- tester.nodeRepository().database().removeNodes(tester.nodeRepository().getNodes());
- assertTrue("Nodes are deleted", tester.nodeRepository().getNodes().isEmpty());
+ tester.nodeRepository().database().removeNodes(tester.nodeRepository().getNodes(NodeType.tenant));
+ assertTrue("Nodes are deleted", tester.nodeRepository().getNodes(NodeType.tenant).isEmpty());
assertSame("Load balancer is deactivated", LoadBalancer.State.inactive, lb.get().state());
// Application is redeployed
- nodes = prepare(app1, Capacity.from(new ClusterResources(2, 1, new NodeResources(1, 4, 10, 0.3)), false, true),
- true,
- clusterRequest(ClusterSpec.Type.container, ClusterSpec.Id.from("qrs")));
+ nodes = tester.prepare(app1, clusterRequest(ClusterSpec.Type.container, ClusterSpec.Id.from("qrs")), 2 , 1, resources);
assertTrue("Load balancer is reconfigured with empty reals", tester.loadBalancerService().instances().get(lb.get().id()).reals().isEmpty());
assignIps(tester.nodeRepository().getNodes(app1));
tester.activate(app1, nodes);
@@ -188,7 +188,6 @@ public class LoadBalancerProvisionerTest {
@Test
public void does_not_provision_load_balancers_for_non_tenant_node_type() {
tester.activate(infraApp1, prepare(infraApp1, Capacity.fromRequiredNodeType(NodeType.host),
- false,
clusterRequest(ClusterSpec.Type.container,
ClusterSpec.Id.from("tenant-host"))));
assertTrue("No load balancer provisioned", tester.loadBalancerService().instances().isEmpty());
@@ -220,7 +219,7 @@ public class LoadBalancerProvisionerTest {
ApplicationId configServerApp = ApplicationId.from("hosted-vespa", "zone-config-servers", "default");
Supplier<List<LoadBalancer>> lbs = () -> tester.nodeRepository().loadBalancers(configServerApp).asList();
var cluster = ClusterSpec.Id.from("zone-config-servers");
- var nodes = prepare(configServerApp, Capacity.fromRequiredNodeType(NodeType.config), false,
+ var nodes = prepare(configServerApp, Capacity.fromRequiredNodeType(NodeType.config),
clusterRequest(ClusterSpec.Type.admin, cluster));
assertEquals(1, lbs.get().size());
assertEquals("Prepare provisions load balancer with reserved nodes", 2, lbs.get().get(0).instance().reals().size());
@@ -235,7 +234,7 @@ public class LoadBalancerProvisionerTest {
ApplicationId controllerApp = ApplicationId.from("hosted-vespa", "controller", "default");
Supplier<List<LoadBalancer>> lbs = () -> tester.nodeRepository().loadBalancers(controllerApp).asList();
var cluster = ClusterSpec.Id.from("zone-config-servers");
- var nodes = prepare(controllerApp, Capacity.fromRequiredNodeType(NodeType.controller), false,
+ var nodes = prepare(controllerApp, Capacity.fromRequiredNodeType(NodeType.controller),
clusterRequest(ClusterSpec.Type.container, cluster));
assertEquals(1, lbs.get().size());
assertEquals("Prepare provisions load balancer with reserved nodes", 2, lbs.get().get(0).instance().reals().size());
@@ -249,15 +248,11 @@ public class LoadBalancerProvisionerTest {
}
private Set<HostSpec> prepare(ApplicationId application, ClusterSpec... specs) {
- return prepare(application, Capacity.from(new ClusterResources(2, 1, new NodeResources(1, 4, 10, 0.3)), false, true), false, specs);
+ return prepare(application, Capacity.from(new ClusterResources(2, 1, new NodeResources(1, 4, 10, 0.3)), false, true), specs);
}
- private Set<HostSpec> prepare(ApplicationId application, Capacity capacity, boolean dynamicDockerNodes, ClusterSpec... specs) {
- if (dynamicDockerNodes) {
- makeDynamicDockerNodes(specs.length * 2, capacity.type());
- } else {
- tester.makeReadyNodes(specs.length * 2, new NodeResources(1, 4, 10, 0.3), capacity.type());
- }
+ private Set<HostSpec> prepare(ApplicationId application, Capacity capacity, ClusterSpec... specs) {
+ tester.makeReadyNodes(specs.length * 2, new NodeResources(1, 4, 10, 0.3), capacity.type());
Set<HostSpec> allNodes = new LinkedHashSet<>();
for (ClusterSpec spec : specs) {
allNodes.addAll(tester.prepare(application, spec, capacity));
@@ -266,9 +261,11 @@ public class LoadBalancerProvisionerTest {
}
private void makeDynamicDockerNodes(int n, NodeType nodeType) {
+ tester.makeReadyHosts(n, new NodeResources(1, 4, 10, 0.3));
List<Node> nodes = new ArrayList<>(n);
for (int i = 1; i <= n; i++) {
- var node = Node.createDockerNode(Set.of(), "node" + i, "parent" + i,
+ var node = Node.createDockerNode(Set.of(), "vnode" + i,
+ tester.nodeRepository().getNodes(NodeType.host).get(n - 1).hostname(),
new NodeResources(1, 4, 10, 0.3),
nodeType);
nodes.add(node);
diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/VirtualNodeProvisioningTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/VirtualNodeProvisioningTest.java
index b5e31e7cbdb..d1446cd8bc1 100644
--- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/VirtualNodeProvisioningTest.java
+++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/VirtualNodeProvisioningTest.java
@@ -34,40 +34,37 @@ import static org.junit.Assert.assertNotNull;
// to remove these tests
public class VirtualNodeProvisioningTest {
- private static final NodeResources flavor = new NodeResources(4, 8, 100, 1);
+ private static final NodeResources resources = new NodeResources(4, 8, 100, 1);
private static final ClusterSpec contentClusterSpec = ClusterSpec.request(ClusterSpec.Type.content, ClusterSpec.Id.from("myContent")).vespaVersion("6.42").build();
private static final ClusterSpec containerClusterSpec = ClusterSpec.request(ClusterSpec.Type.container, ClusterSpec.Id.from("myContainer")).vespaVersion("6.42").build();
private ProvisioningTester tester = new ProvisioningTester.Builder().build();
- private ApplicationId applicationId = ProvisioningTester.makeApplicationId();
+ private ApplicationId applicationId = ProvisioningTester.makeApplicationId("test");
@Test
public void distinct_parent_host_for_each_node_in_a_cluster() {
- tester.makeReadyVirtualDockerNodes(2, flavor, "parentHost1");
- tester.makeReadyVirtualDockerNodes(2, flavor, "parentHost2");
- tester.makeReadyVirtualDockerNodes(2, flavor, "parentHost3");
- tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost4");
-
- final int containerNodeCount = 4;
- final int contentNodeCount = 3;
- final int groups = 1;
- List<HostSpec> containerHosts = prepare(containerClusterSpec, containerNodeCount, groups);
- List<HostSpec> contentHosts = prepare(contentClusterSpec, contentNodeCount, groups);
+ tester.makeReadyHosts(4, new NodeResources(8, 16, 200, 2))
+ .activateTenantHosts();
+ int containerNodeCount = 4;
+ int contentNodeCount = 3;
+ int groups = 1;
+ List<HostSpec> containerHosts = tester.prepare(applicationId, containerClusterSpec, containerNodeCount, groups, resources);
+ List<HostSpec> contentHosts = tester.prepare(applicationId, contentClusterSpec, contentNodeCount, groups, resources);
activate(containerHosts, contentHosts);
- final List<Node> nodes = getNodes(applicationId);
+ List<Node> nodes = getNodes(applicationId);
assertEquals(contentNodeCount + containerNodeCount, nodes.size());
assertDistinctParentHosts(nodes, ClusterSpec.Type.container, containerNodeCount);
assertDistinctParentHosts(nodes, ClusterSpec.Type.content, contentNodeCount);
// Go down to 3 nodes in container cluster
- List<HostSpec> containerHosts2 = prepare(containerClusterSpec, containerNodeCount - 1, groups);
+ List<HostSpec> containerHosts2 = tester.prepare(applicationId, containerClusterSpec, containerNodeCount - 1, groups, resources);
activate(containerHosts2);
List<Node> nodes2 = getNodes(applicationId);
assertDistinctParentHosts(nodes2, ClusterSpec.Type.container, containerNodeCount - 1);
// Go up to 4 nodes again in container cluster
- List<HostSpec> containerHosts3 = prepare(containerClusterSpec, containerNodeCount, groups);
+ List<HostSpec> containerHosts3 = tester.prepare(applicationId, containerClusterSpec, containerNodeCount, groups, resources);
activate(containerHosts3);
List<Node> nodes3 = getNodes(applicationId);
assertDistinctParentHosts(nodes3, ClusterSpec.Type.container, containerNodeCount);
@@ -97,7 +94,7 @@ public class VirtualNodeProvisioningTest {
// Allowed to use same parent host for several nodes in same cluster in CD (even if prod env)
{
tester = new ProvisioningTester.Builder().zone(new Zone(SystemName.cd, Environment.prod, RegionName.from("us-east"))).build();
- tester.makeReadyNodes(4, flavor, NodeType.host, 1);
+ tester.makeReadyNodes(4, resources, NodeType.host, 1);
tester.prepareAndActivateInfraApplication(ProvisioningTester.makeApplicationId(), NodeType.host);
List<HostSpec> containerHosts = prepare(containerClusterSpec, containerNodeCount, groups);
@@ -110,19 +107,13 @@ public class VirtualNodeProvisioningTest {
@Test
public void will_retire_clashing_active() {
- tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost1");
- tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost2");
- tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost3");
- tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost4");
- tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost5");
- tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost6");
-
+ tester.makeReadyHosts(4, resources).activateTenantHosts();
int containerNodeCount = 2;
int contentNodeCount = 2;
int groups = 1;
- List<HostSpec> containerHosts = prepare(containerClusterSpec, containerNodeCount, groups);
- List<HostSpec> contentHosts = prepare(contentClusterSpec, contentNodeCount, groups);
- activate(containerHosts, contentHosts);
+ List<HostSpec> containerNodes = tester.prepare(applicationId, containerClusterSpec, containerNodeCount, groups, resources);
+ List<HostSpec> contentNodes = tester.prepare(applicationId, contentClusterSpec, contentNodeCount, groups, resources);
+ activate(containerNodes, contentNodes);
List<Node> nodes = getNodes(applicationId);
assertEquals(4, nodes.size());
@@ -130,48 +121,19 @@ public class VirtualNodeProvisioningTest {
assertDistinctParentHosts(nodes, ClusterSpec.Type.content, contentNodeCount);
tester.patchNodes(nodes, (n) -> n.withParentHostname("clashing"));
- containerHosts = prepare(containerClusterSpec, containerNodeCount, groups);
- contentHosts = prepare(contentClusterSpec, contentNodeCount, groups);
- activate(containerHosts, contentHosts);
+ containerNodes = prepare(containerClusterSpec, containerNodeCount, groups);
+ contentNodes = prepare(contentClusterSpec, contentNodeCount, groups);
+ activate(containerNodes, contentNodes);
nodes = getNodes(applicationId);
assertEquals(6, nodes.size());
assertEquals(2, nodes.stream().filter(n -> n.allocation().get().membership().retired()).count());
}
- @Test
- public void fail_when_all_hosts_become_clashing() {
- tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost1");
- tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost2");
- tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost3");
- tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost4");
-
- int containerNodeCount = 2;
- int contentNodeCount = 2;
- int groups = 1;
- List<HostSpec> containerHosts = prepare(containerClusterSpec, containerNodeCount, groups);
- List<HostSpec> contentHosts = prepare(contentClusterSpec, contentNodeCount, groups);
- activate(containerHosts, contentHosts);
-
- List<Node> nodes = getNodes(applicationId);
- assertEquals(4, nodes.size());
- assertDistinctParentHosts(nodes, ClusterSpec.Type.container, containerNodeCount);
- assertDistinctParentHosts(nodes, ClusterSpec.Type.content, contentNodeCount);
-
- tester.patchNodes(nodes, (n) -> n.withParentHostname("clashing"));
- OutOfCapacityException expected = null;
- try {
- containerHosts = prepare(containerClusterSpec, containerNodeCount, groups);
- } catch (OutOfCapacityException e) {
- expected = e;
- }
- assertNotNull(expected);
- }
-
@Test(expected = OutOfCapacityException.class)
public void fail_when_too_few_distinct_parent_hosts() {
- tester.makeReadyVirtualDockerNodes(2, flavor, "parentHost1");
- tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost2");
+ tester.makeReadyVirtualDockerNodes(2, resources, "parentHost1");
+ tester.makeReadyVirtualDockerNodes(1, resources, "parentHost2");
int contentNodeCount = 3;
List<HostSpec> hosts = prepare(contentClusterSpec, contentNodeCount, 1);
@@ -182,26 +144,8 @@ public class VirtualNodeProvisioningTest {
}
@Test
- public void incomplete_parent_hosts_has_distinct_distribution() {
- tester.makeReadyVirtualDockerNode(1, flavor, "parentHost1");
- tester.makeReadyVirtualDockerNode(1, flavor, "parentHost2");
- tester.makeReadyVirtualNodes(1, flavor);
-
- final int contentNodeCount = 3;
- final int groups = 1;
- final List<HostSpec> contentHosts = prepare(contentClusterSpec, contentNodeCount, groups);
- activate(contentHosts);
- assertEquals(3, getNodes(applicationId).size());
-
- tester.makeReadyVirtualDockerNode(2, flavor, "parentHost1");
- tester.makeReadyVirtualDockerNode(2, flavor, "parentHost2");
-
- assertEquals(contentHosts, prepare(contentClusterSpec, contentNodeCount, groups));
- }
-
- @Test
public void indistinct_distribution_with_known_ready_nodes() {
- tester.makeReadyVirtualNodes(3, flavor);
+ tester.makeReadyVirtualNodes(3, resources);
final int contentNodeCount = 3;
final int groups = 1;
@@ -218,8 +162,8 @@ public class VirtualNodeProvisioningTest {
nodes = getNodes(applicationId);
assertEquals(3, nodes.stream().filter(n -> n.parentHostname().isPresent()).count());
- tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost1");
- tester.makeReadyVirtualDockerNodes(2, flavor, "parentHost2");
+ tester.makeReadyVirtualDockerNodes(1, resources, "parentHost1");
+ tester.makeReadyVirtualDockerNodes(2, resources, "parentHost2");
OutOfCapacityException expectedException = null;
try {
@@ -232,7 +176,7 @@ public class VirtualNodeProvisioningTest {
@Test
public void unknown_distribution_with_known_ready_nodes() {
- tester.makeReadyVirtualNodes(3, flavor);
+ tester.makeReadyVirtualNodes(3, resources);
final int contentNodeCount = 3;
final int groups = 1;
@@ -240,15 +184,15 @@ public class VirtualNodeProvisioningTest {
activate(contentHosts);
assertEquals(3, getNodes(applicationId).size());
- tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost1");
- tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost2");
- tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost3");
+ tester.makeReadyVirtualDockerNodes(1, resources, "parentHost1");
+ tester.makeReadyVirtualDockerNodes(1, resources, "parentHost2");
+ tester.makeReadyVirtualDockerNodes(1, resources, "parentHost3");
assertEquals(contentHosts, prepare(contentClusterSpec, contentNodeCount, groups));
}
@Test
public void unknown_distribution_with_known_and_unknown_ready_nodes() {
- tester.makeReadyVirtualNodes(3, flavor);
+ tester.makeReadyVirtualNodes(3, resources);
int contentNodeCount = 3;
int groups = 1;
@@ -256,8 +200,8 @@ public class VirtualNodeProvisioningTest {
activate(contentHosts);
assertEquals(3, getNodes(applicationId).size());
- tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost1");
- tester.makeReadyVirtualNodes(1, flavor);
+ tester.makeReadyVirtualDockerNodes(1, resources, "parentHost1");
+ tester.makeReadyVirtualNodes(1, resources);
assertEquals(contentHosts, prepare(contentClusterSpec, contentNodeCount, groups));
}
@@ -289,7 +233,7 @@ public class VirtualNodeProvisioningTest {
}
private List<HostSpec> prepare(ClusterSpec clusterSpec, int nodeCount, int groups) {
- return tester.prepare(applicationId, clusterSpec, nodeCount, groups, flavor);
+ return tester.prepare(applicationId, clusterSpec, nodeCount, groups, resources);
}
private List<HostSpec> prepare(ClusterSpec clusterSpec, int nodeCount, int groups, NodeResources flavor) {
diff --git a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt
index b5465b1f0dd..cf18be8f5de 100644
--- a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt
+++ b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt
@@ -3,11 +3,15 @@ vespa_add_executable(searchcore_vespa_feed_bm_app
SOURCES
vespa_feed_bm.cpp
bm_cluster_controller.cpp
+ bm_message_bus.cpp
bm_storage_chain_builder.cpp
bm_storage_link.cpp
+ document_api_message_bus_bm_feed_handler.cpp
+ pending_tracker_hash.cpp
spi_bm_feed_handler.cpp
- storage_api_rpc_bm_feed_handler.cpp
storage_api_chain_bm_feed_handler.cpp
+ storage_api_message_bus_bm_feed_handler.cpp
+ storage_api_rpc_bm_feed_handler.cpp
storage_reply_error_checker.cpp
OUTPUT_NAME vespa-feed-bm
DEPENDS
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_message_bus.cpp b/searchcore/src/apps/vespa-feed-bm/bm_message_bus.cpp
new file mode 100644
index 00000000000..ec50a4c7c01
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/bm_message_bus.cpp
@@ -0,0 +1,180 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bm_message_bus.h"
+#include "pending_tracker_hash.h"
+#include "pending_tracker.h"
+#include "storage_reply_error_checker.h"
+#include <vespa/messagebus/emptyreply.h>
+#include <vespa/messagebus/network/rpcnetworkparams.h>
+#include <vespa/messagebus/rpcmessagebus.h>
+#include <vespa/messagebus/ireplyhandler.h>
+#include <vespa/documentapi/messagebus/documentprotocol.h>
+#include <vespa/documentapi/messagebus/messages/documentmessage.h>
+#include <vespa/storageapi/mbusprot/storageprotocol.h>
+#include <vespa/storageapi/mbusprot/storagereply.h>
+#include <vespa/vespalib/stllike/asciistream.h>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".bm_message_bus");
+
+using documentapi::DocumentProtocol;
+using mbus::RPCMessageBus;
+using mbus::Reply;
+using mbus::SourceSession;
+using storage::mbusprot::StorageProtocol;
+using storage::mbusprot::StorageReply;
+
+namespace feedbm {
+
+namespace {
+
+std::atomic<uint64_t> bm_message_bus_msg_id(0u);
+
+vespalib::string reply_as_string(Reply &reply) {
+ vespalib::asciistream os;
+ if (reply.getType() == 0) {
+ os << "empty reply";
+ } else {
+ os << "reply=" << reply.toString() << ", protocol=" << reply.getProtocol();
+ }
+ os << ", ";
+ auto message = reply.getMessage();
+ if (message) {
+ os << "message=" << message->toString();
+ os << ", protocol=" << message->getProtocol();
+ } else {
+ os << "no message";
+ }
+ reply.setMessage(std::move(message));
+ os << ", ";
+ if (reply.hasErrors()) {
+ os << "errors=[";
+ for (uint32_t i = 0; i < reply.getNumErrors(); ++i) {
+ auto &error = reply.getError(i);
+ if (i > 0) {
+ os << ", ";
+ }
+ os << mbus::ErrorCode::getName(error.getCode()) << ": " << error.getMessage() << " (from " << error.getService() << ")";
+ }
+ os << "]";
+ } else {
+ os << "no errors";
+ }
+ return os.str();
+}
+
+}
+
+class BmMessageBus::ReplyHandler : public mbus::IReplyHandler,
+ public StorageReplyErrorChecker
+{
+ PendingTrackerHash _pending_hash;
+public:
+ ReplyHandler();
+ ~ReplyHandler() override;
+ void handleReply(std::unique_ptr<Reply> reply) override;
+ void retain(uint64_t msg_id, PendingTracker &tracker) { _pending_hash.retain(msg_id, tracker); }
+ void message_aborted(uint64_t msg_id);
+};
+
+BmMessageBus::ReplyHandler::ReplyHandler()
+ : mbus::IReplyHandler(),
+ StorageReplyErrorChecker(),
+ _pending_hash()
+{
+}
+
+BmMessageBus::ReplyHandler::~ReplyHandler() = default;
+
+void
+BmMessageBus::ReplyHandler::handleReply(std::unique_ptr<Reply> reply)
+{
+ auto msg_id = reply->getContext().value.UINT64;
+ auto tracker = _pending_hash.release(msg_id);
+ if (tracker != nullptr) {
+ bool failed = false;
+ if (reply->getType() == 0 || reply->hasErrors()) {
+ failed = true; // empty reply or error
+ } else {
+ auto protocol = reply->getProtocol();
+ if (protocol == DocumentProtocol::NAME) {
+ } else if (protocol == StorageProtocol::NAME) {
+ auto sreply = dynamic_cast<storage::mbusprot::StorageReply *>(reply.get());
+ if (sreply != nullptr) {
+ check_error(*sreply->getReply());
+ } else {
+ failed = true; // unexpected message type
+ }
+ } else {
+ failed = true; // unexpected protocol
+ }
+ }
+ if (failed) {
+ ++_errors;
+ LOG(error, "Unexpected %s", reply_as_string(*reply).c_str());
+ }
+ tracker->release();
+ } else {
+ ++_errors;
+ LOG(error, "Untracked %s", reply_as_string(*reply).c_str());
+ }
+}
+
+void
+BmMessageBus::ReplyHandler::message_aborted(uint64_t msg_id)
+{
+ ++_errors;
+ auto tracker = _pending_hash.release(msg_id);
+ tracker->release();
+}
+
+BmMessageBus::BmMessageBus(const config::ConfigUri& config_uri,
+ std::shared_ptr<const document::DocumentTypeRepo> document_type_repo,
+ const documentapi::LoadTypeSet& load_types)
+ : _reply_handler(std::make_unique<ReplyHandler>()),
+ _message_bus(),
+ _session()
+{
+ mbus::RPCNetworkParams params(config_uri);
+ mbus::ProtocolSet protocol_set;
+ protocol_set.add(std::make_shared<DocumentProtocol>(load_types, document_type_repo));
+ protocol_set.add(std::make_shared<StorageProtocol>(document_type_repo, load_types));
+ params.setIdentity(mbus::Identity("vespa-bm-client"));
+ _message_bus = std::make_unique<mbus::RPCMessageBus>(
+ protocol_set,
+ params,
+ config_uri);
+ mbus::SourceSessionParams srcParams;
+ srcParams.setThrottlePolicy(mbus::IThrottlePolicy::SP());
+ srcParams.setReplyHandler(*_reply_handler);
+ _session = _message_bus->getMessageBus().createSourceSession(srcParams);
+}
+
+BmMessageBus::~BmMessageBus()
+{
+ _session.reset();
+ _message_bus.reset();
+ _reply_handler.reset();
+}
+
+uint32_t
+BmMessageBus::get_error_count() const
+{
+ return _reply_handler->get_error_count();
+}
+
+void
+BmMessageBus::send_msg(std::unique_ptr<mbus::Message> msg, const mbus::Route &route, PendingTracker &tracker)
+{
+ auto msg_id = ++bm_message_bus_msg_id;
+ _reply_handler->retain(msg_id, tracker);
+ msg->setContext(mbus::Context(msg_id));
+ msg->setRetryEnabled(false);
+ auto result = _session->send(std::move(msg), route);
+ if (!result.isAccepted()) {
+ LOG(error, "Message not accepeted, error is '%s'", result.getError().toString().c_str());
+ _reply_handler->message_aborted(msg_id);
+ }
+}
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_message_bus.h b/searchcore/src/apps/vespa-feed-bm/bm_message_bus.h
new file mode 100644
index 00000000000..9ebe394e9e6
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/bm_message_bus.h
@@ -0,0 +1,42 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <memory>
+
+namespace config { class ConfigUri; }
+namespace document { class DocumentTypeRepo; }
+namespace documentapi { class LoadTypeSet; }
+
+namespace mbus {
+
+class Message;
+class RPCMessageBus;
+class Route;
+class SourceSession;
+
+}
+
+namespace feedbm {
+
+class PendingTracker;
+
+/*
+ * Message bus for feed benchmark program.
+ */
+class BmMessageBus
+{
+ class ReplyHandler;
+ std::unique_ptr<ReplyHandler> _reply_handler;
+ std::unique_ptr<mbus::RPCMessageBus> _message_bus;
+ std::unique_ptr<mbus::SourceSession> _session;
+public:
+ BmMessageBus(const config::ConfigUri& config_uri,
+ std::shared_ptr<const document::DocumentTypeRepo> document_type_repo,
+ const documentapi::LoadTypeSet& load_types);
+ ~BmMessageBus();
+ uint32_t get_error_count() const;
+ void send_msg(std::unique_ptr<mbus::Message> msg, const mbus::Route &route, PendingTracker &tracker);
+};
+
+}
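The interface above is intentionally small: construct the bus from a config URI, a document type repo and a load type set, then push messages at a route while a PendingTracker accounts for outstanding replies. A minimal, hedged sketch of the calling side, using only declarations from this patch (the free function and its name are illustrative, not part of the change):

    #include "bm_message_bus.h"
    #include "pending_tracker.h"
    #include <vespa/documentapi/messagebus/messages/removedocumentmessage.h>
    #include <memory>

    // Illustrative only: push one remove through the benchmark message bus.
    // BmMessageBus tags the message with a correlation id and releases the
    // tracker slot when the matching reply arrives (or the send is rejected).
    void send_one_remove(feedbm::BmMessageBus& bus,
                         feedbm::PendingTracker& tracker,
                         const mbus::Route& route,
                         const document::DocumentId& id)
    {
        auto msg = std::make_unique<documentapi::RemoveDocumentMessage>(id);
        bus.send_msg(std::move(msg), route, tracker);
    }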
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp
index 79517e98094..2aeda91c30c 100644
--- a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp
@@ -2,45 +2,18 @@
#include "bm_storage_link.h"
#include "pending_tracker.h"
-#include <vespa/vespalib/stllike/hash_map.hpp>
namespace feedbm {
-void
-BmStorageLink::retain(uint64_t msg_id, PendingTracker &tracker)
-{
- tracker.retain();
- std::lock_guard lock(_mutex);
- _pending.insert(std::make_pair(msg_id, &tracker));
-}
-
-PendingTracker *
-BmStorageLink::release(uint64_t msg_id)
-{
- std::lock_guard lock(_mutex);
- auto itr = _pending.find(msg_id);
- if (itr == _pending.end()) {
- return nullptr;
- }
- auto tracker = itr->second;
- _pending.erase(itr);
- return tracker;
-}
-
BmStorageLink::BmStorageLink()
: storage::StorageLink("vespa-bm-feed"),
StorageReplyErrorChecker(),
- _mutex(),
- _pending()
+ _pending_hash()
{
}
-BmStorageLink::~BmStorageLink()
-{
- std::lock_guard lock(_mutex);
- assert(_pending.empty());
-}
+BmStorageLink::~BmStorageLink() = default;
bool
BmStorageLink::onDown(const std::shared_ptr<storage::api::StorageMessage>& msg)
@@ -52,7 +25,7 @@ BmStorageLink::onDown(const std::shared_ptr<storage::api::StorageMessage>& msg)
bool
BmStorageLink::onUp(const std::shared_ptr<storage::api::StorageMessage>& msg)
{
- auto tracker = release(msg->getMsgId());
+ auto tracker = _pending_hash.release(msg->getMsgId());
if (tracker != nullptr) {
check_error(*msg);
tracker->release();
diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h
index 63ece355c02..95528d7b2d9 100644
--- a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h
+++ b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h
@@ -3,8 +3,8 @@
#pragma once
#include "storage_reply_error_checker.h"
+#include "pending_tracker_hash.h"
#include <vespa/storage/common/storagelink.h>
-#include <vespa/vespalib/stllike/hash_map.h>
namespace feedbm {
@@ -17,15 +17,13 @@ class PendingTracker;
class BmStorageLink : public storage::StorageLink,
public StorageReplyErrorChecker
{
- std::mutex _mutex;
- vespalib::hash_map<uint64_t, PendingTracker *> _pending;
- PendingTracker *release(uint64_t msg_id);
+ PendingTrackerHash _pending_hash;
public:
BmStorageLink();
~BmStorageLink() override;
bool onDown(const std::shared_ptr<storage::api::StorageMessage>& msg) override;
bool onUp(const std::shared_ptr<storage::api::StorageMessage>& msg) override;
- void retain(uint64_t msg_id, PendingTracker &tracker);
+ void retain(uint64_t msg_id, PendingTracker &tracker) { _pending_hash.retain(msg_id, tracker); }
};
}
diff --git a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp
new file mode 100644
index 00000000000..276a5a8136b
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp
@@ -0,0 +1,82 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "document_api_message_bus_bm_feed_handler.h"
+#include "bm_message_bus.h"
+#include "pending_tracker.h"
+#include <vespa/document/fieldvalue/document.h>
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/documentapi/messagebus/messages/putdocumentmessage.h>
+#include <vespa/documentapi/messagebus/messages/removedocumentmessage.h>
+#include <vespa/documentapi/messagebus/messages/updatedocumentmessage.h>
+#include <vespa/storageapi/messageapi/storagemessage.h>
+
+using document::Document;
+using document::DocumentId;
+using document::DocumentUpdate;
+using storage::api::StorageMessageAddress;
+using storage::lib::NodeType;
+
+namespace feedbm {
+
+DocumentApiMessageBusBmFeedHandler::DocumentApiMessageBusBmFeedHandler(BmMessageBus &message_bus)
+ : IBmFeedHandler(),
+ _name(vespalib::string("DocumentApiMessageBusBmFeedHandler(distributor)")),
+ _storage_address(std::make_unique<StorageMessageAddress>("storage", NodeType::DISTRIBUTOR, 0)),
+ _message_bus(message_bus)
+{
+}
+
+DocumentApiMessageBusBmFeedHandler::~DocumentApiMessageBusBmFeedHandler() = default;
+
+void
+DocumentApiMessageBusBmFeedHandler::send_msg(std::unique_ptr<documentapi::DocumentMessage> msg, PendingTracker& pending_tracker)
+{
+ _message_bus.send_msg(std::move(msg), _storage_address->getRoute(), pending_tracker);
+}
+
+void
+DocumentApiMessageBusBmFeedHandler::put(const document::Bucket&, std::unique_ptr<Document> document, uint64_t, PendingTracker& tracker)
+{
+ auto msg = std::make_unique<documentapi::PutDocumentMessage>(std::move(document));
+ send_msg(std::move(msg), tracker);
+}
+
+void
+DocumentApiMessageBusBmFeedHandler::update(const document::Bucket&, std::unique_ptr<DocumentUpdate> document_update, uint64_t, PendingTracker& tracker)
+{
+ auto msg = std::make_unique<documentapi::UpdateDocumentMessage>(std::move(document_update));
+ send_msg(std::move(msg), tracker);
+}
+
+void
+DocumentApiMessageBusBmFeedHandler::remove(const document::Bucket&, const DocumentId& document_id, uint64_t, PendingTracker& tracker)
+{
+ auto msg = std::make_unique<documentapi::RemoveDocumentMessage>(document_id);
+ send_msg(std::move(msg), tracker);
+}
+
+uint32_t
+DocumentApiMessageBusBmFeedHandler::get_error_count() const
+{
+ return _message_bus.get_error_count();
+}
+
+const vespalib::string&
+DocumentApiMessageBusBmFeedHandler::get_name() const
+{
+ return _name;
+}
+
+bool
+DocumentApiMessageBusBmFeedHandler::manages_buckets() const
+{
+ return true;
+}
+
+bool
+DocumentApiMessageBusBmFeedHandler::manages_timestamp() const
+{
+ return true;
+}
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h
new file mode 100644
index 00000000000..1e958da7900
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h
@@ -0,0 +1,37 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "i_bm_feed_handler.h"
+
+namespace document { class DocumentTypeRepo; }
+namespace documentapi { class DocumentMessage; }
+namespace storage::api { class StorageMessageAddress; }
+
+namespace feedbm {
+
+class BmMessageBus;
+
+/*
+ * Benchmark feed handler for feed to distributor using document api protocol
+ * over message bus.
+ */
+class DocumentApiMessageBusBmFeedHandler : public IBmFeedHandler
+{
+ vespalib::string _name;
+ std::unique_ptr<storage::api::StorageMessageAddress> _storage_address;
+ BmMessageBus& _message_bus;
+ void send_msg(std::unique_ptr<documentapi::DocumentMessage> msg, PendingTracker& tracker);
+public:
+ DocumentApiMessageBusBmFeedHandler(BmMessageBus &message_bus);
+ ~DocumentApiMessageBusBmFeedHandler();
+ void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override;
+ void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override;
+ void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override;
+ uint32_t get_error_count() const override;
+ const vespalib::string &get_name() const override;
+ bool manages_buckets() const override;
+ bool manages_timestamp() const override;
+};
+
+}
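Note that the bucket and timestamp parameters are ignored by this handler (they are unnamed in the .cpp above): when feeding the distributor over the document api, bucket placement and timestamps are assigned downstream, which is why manages_buckets() and manages_timestamp() both return true. The benchmark driver further down keys off exactly that signal; a hedged fragment of the caller-side pattern (create_buckets() stands in for whatever pre-creation the caller does):

    // Only pre-create buckets when the selected feed handler does not manage them itself.
    if (!feed_handler.manages_buckets()) {
        create_buckets();
    }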
diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.cpp b/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.cpp
new file mode 100644
index 00000000000..6863d35703e
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.cpp
@@ -0,0 +1,43 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "pending_tracker_hash.h"
+#include "pending_tracker.h"
+#include <vespa/vespalib/stllike/hash_map.hpp>
+#include <cassert>
+
+namespace feedbm {
+
+PendingTrackerHash::PendingTrackerHash()
+ : _mutex(),
+ _pending()
+{
+}
+
+PendingTrackerHash::~PendingTrackerHash()
+{
+ std::lock_guard lock(_mutex);
+ assert(_pending.empty());
+}
+
+void
+PendingTrackerHash::retain(uint64_t msg_id, PendingTracker &tracker)
+{
+ tracker.retain();
+ std::lock_guard lock(_mutex);
+ _pending.insert(std::make_pair(msg_id, &tracker));
+}
+
+PendingTracker *
+PendingTrackerHash::release(uint64_t msg_id)
+{
+ std::lock_guard lock(_mutex);
+ auto itr = _pending.find(msg_id);
+ if (itr == _pending.end()) {
+ return nullptr;
+ }
+ auto tracker = itr->second;
+ _pending.erase(itr);
+ return tracker;
+}
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.h b/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.h
new file mode 100644
index 00000000000..89be93fd4ed
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.h
@@ -0,0 +1,26 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/stllike/hash_map.h>
+#include <mutex>
+
+namespace feedbm {
+
+class PendingTracker;
+
+/*
+ * Class maintaining the mapping from message id to pending tracker.
+ */
+class PendingTrackerHash
+{
+ std::mutex _mutex;
+ vespalib::hash_map<uint64_t, PendingTracker *> _pending;
+public:
+ PendingTrackerHash();
+ ~PendingTrackerHash();
+ PendingTracker *release(uint64_t msg_id);
+ void retain(uint64_t msg_id, PendingTracker &tracker);
+};
+
+}
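PendingTrackerHash centralizes the id-to-tracker bookkeeping that bm_storage_link.cpp and storage_api_rpc_bm_feed_handler.cpp previously each open-coded behind their own mutex and hash map. A hedged fragment of the intended retain/release choreography, assuming a PendingTracker named tracker is already in scope:

    feedbm::PendingTrackerHash pending;
    pending.retain(42, tracker);            // bumps the tracker and records it under the message id
    if (auto* t = pending.release(42)) {    // reply path: claim the tracker back exactly once
        t->release();
    }
    // a second release(42) would return nullptr, i.e. an untracked reply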
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp
index 2d45caff152..0e73582fdd4 100644
--- a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp
@@ -33,7 +33,7 @@ std::shared_ptr<storage::api::StorageCommand> make_set_cluster_state_cmd() {
StorageApiChainBmFeedHandler::StorageApiChainBmFeedHandler(std::shared_ptr<BmStorageLinkContext> context, bool distributor)
: IBmFeedHandler(),
- _name(vespalib::string("StorageApiChainBmFeedHandler(") + (distributor ? "distributor" : "servicelayer") + ")"),
+ _name(vespalib::string("StorageApiChainBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"),
_distributor(distributor),
_context(std::move(context))
{
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h
index 089a7fd89e5..f877a244726 100644
--- a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h
+++ b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h
@@ -11,8 +11,8 @@ namespace feedbm {
struct BmStorageLinkContext;
/*
- * Benchmark feed handler for feed to service layer using storage api protocol
- * directly on the storage chain.
+ * Benchmark feed handler for feed to service layer or distributor
+ * using storage api protocol directly on the storage chain.
*/
class StorageApiChainBmFeedHandler : public IBmFeedHandler
{
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp
new file mode 100644
index 00000000000..a0a3eb5c6db
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp
@@ -0,0 +1,84 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "storage_api_message_bus_bm_feed_handler.h"
+#include "bm_message_bus.h"
+#include "pending_tracker.h"
+#include <vespa/document/fieldvalue/document.h>
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/storageapi/messageapi/storagemessage.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <vespa/storageapi/mbusprot/storagecommand.h>
+
+using document::Document;
+using document::DocumentId;
+using document::DocumentUpdate;
+using storage::api::StorageMessageAddress;
+using storage::lib::NodeType;
+
+namespace feedbm {
+
+StorageApiMessageBusBmFeedHandler::StorageApiMessageBusBmFeedHandler(BmMessageBus &message_bus, bool distributor)
+ : IBmFeedHandler(),
+ _name(vespalib::string("StorageApiMessageBusBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"),
+ _distributor(distributor),
+ _storage_address(std::make_unique<StorageMessageAddress>("storage", distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, 0)),
+ _message_bus(message_bus)
+{
+}
+
+StorageApiMessageBusBmFeedHandler::~StorageApiMessageBusBmFeedHandler() = default;
+
+void
+StorageApiMessageBusBmFeedHandler::send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& pending_tracker)
+{
+ cmd->setSourceIndex(0);
+ auto msg = std::make_unique<storage::mbusprot::StorageCommand>(cmd);
+ _message_bus.send_msg(std::move(msg), _storage_address->getRoute(), pending_tracker);
+}
+
+void
+StorageApiMessageBusBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker)
+{
+ auto cmd = std::make_unique<storage::api::PutCommand>(bucket, std::move(document), timestamp);
+ send_msg(std::move(cmd), tracker);
+}
+
+void
+StorageApiMessageBusBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker)
+{
+ auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket, std::move(document_update), timestamp);
+ send_msg(std::move(cmd), tracker);
+}
+
+void
+StorageApiMessageBusBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker)
+{
+ auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket, document_id, timestamp);
+ send_msg(std::move(cmd), tracker);
+}
+
+uint32_t
+StorageApiMessageBusBmFeedHandler::get_error_count() const
+{
+ return _message_bus.get_error_count();
+}
+
+const vespalib::string&
+StorageApiMessageBusBmFeedHandler::get_name() const
+{
+ return _name;
+}
+
+bool
+StorageApiMessageBusBmFeedHandler::manages_buckets() const
+{
+ return _distributor;
+}
+
+bool
+StorageApiMessageBusBmFeedHandler::manages_timestamp() const
+{
+ return _distributor;
+}
+
+}
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h
new file mode 100644
index 00000000000..84e69053289
--- /dev/null
+++ b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h
@@ -0,0 +1,41 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "i_bm_feed_handler.h"
+
+namespace document { class DocumentTypeRepo; }
+namespace documentapi { class DocumentMessage; }
+namespace storage::api {
+class StorageCommand;
+class StorageMessageAddress;
+}
+
+namespace feedbm {
+
+class BmMessageBus;
+
+/*
+ * Benchmark feed handler for feed to service layer or distributor
+ * using storage api protocol over message bus.
+ */
+class StorageApiMessageBusBmFeedHandler : public IBmFeedHandler
+{
+ vespalib::string _name;
+ bool _distributor;
+ std::unique_ptr<storage::api::StorageMessageAddress> _storage_address;
+ BmMessageBus& _message_bus;
+ void send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker);
+public:
+ StorageApiMessageBusBmFeedHandler(BmMessageBus &message_bus, bool distributor);
+ ~StorageApiMessageBusBmFeedHandler();
+ void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override;
+ void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override;
+ void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override;
+ uint32_t get_error_count() const override;
+ const vespalib::string &get_name() const override;
+ bool manages_buckets() const override;
+ bool manages_timestamp() const override;
+};
+
+}
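As the class comment says, the same handler serves both tiers; the boolean passed at construction only switches the storage api address between the distributor and the service layer (storage node index 0 in both cases) and whether bucket and timestamp management is reported as handled. A hedged fragment, assuming a BmMessageBus named bus is in scope:

    feedbm::StorageApiMessageBusBmFeedHandler to_distributor(bus, true);     // targets NodeType::DISTRIBUTOR, index 0
    feedbm::StorageApiMessageBusBmFeedHandler to_service_layer(bus, false);  // targets NodeType::STORAGE, index 0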
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp
index 53e833f3e24..9fe6662b79a 100644
--- a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp
@@ -2,6 +2,7 @@
#include "storage_api_rpc_bm_feed_handler.h"
#include "pending_tracker.h"
+#include "pending_tracker_hash.h"
#include "storage_reply_error_checker.h"
#include <vespa/document/fieldvalue/document.h>
#include <vespa/document/update/documentupdate.h>
@@ -12,8 +13,6 @@
#include <vespa/storage/storageserver/rpc/message_codec_provider.h>
#include <vespa/storage/storageserver/rpc/shared_rpc_resources.h>
#include <vespa/storage/storageserver/rpc/storage_api_rpc_service.h>
-#include <vespa/vespalib/stllike/hash_map.h>
-#include <vespa/vespalib/stllike/hash_map.hpp>
#include <cassert>
using document::Document;
@@ -30,14 +29,12 @@ namespace feedbm {
class StorageApiRpcBmFeedHandler::MyMessageDispatcher : public storage::MessageDispatcher,
public StorageReplyErrorChecker
{
- std::mutex _mutex;
- vespalib::hash_map<uint64_t, PendingTracker *> _pending;
+ PendingTrackerHash _pending_hash;
public:
MyMessageDispatcher()
: storage::MessageDispatcher(),
StorageReplyErrorChecker(),
- _mutex(),
- _pending()
+ _pending_hash()
{
}
~MyMessageDispatcher() override;
@@ -49,28 +46,19 @@ public:
check_error(*msg);
release(msg->getMsgId());
}
- void retain(uint64_t msg_id, PendingTracker &tracker) {
- tracker.retain();
- std::lock_guard lock(_mutex);
- _pending.insert(std::make_pair(msg_id, &tracker));
- }
+ void retain(uint64_t msg_id, PendingTracker &tracker) { _pending_hash.retain(msg_id, tracker); }
void release(uint64_t msg_id) {
- PendingTracker *tracker = nullptr;
- {
- std::lock_guard lock(_mutex);
- auto itr = _pending.find(msg_id);
- assert(itr != _pending.end());
- tracker = itr->second;
- _pending.erase(itr);
+ auto tracker = _pending_hash.release(msg_id);
+ if (tracker != nullptr) {
+ tracker->release();
+ } else {
+ ++_errors;
}
- tracker->release();
}
};
StorageApiRpcBmFeedHandler::MyMessageDispatcher::~MyMessageDispatcher()
{
- std::lock_guard lock(_mutex);
- assert(_pending.empty());
}
StorageApiRpcBmFeedHandler::StorageApiRpcBmFeedHandler(SharedRpcResources& shared_rpc_resources_in,
@@ -78,7 +66,7 @@ StorageApiRpcBmFeedHandler::StorageApiRpcBmFeedHandler(SharedRpcResources& share
const StorageApiRpcService::Params& rpc_params,
bool distributor)
: IBmFeedHandler(),
- _name(vespalib::string("StorageApiRpcBmFeedHandler(") + (distributor ? "distributor" : "servicelayer") + ")"),
+ _name(vespalib::string("StorageApiRpcBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"),
_distributor(distributor),
_storage_address(std::make_unique<StorageMessageAddress>("storage", distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, 0)),
_shared_rpc_resources(shared_rpc_resources_in),
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h
index 9901c21f174..535171c39e1 100644
--- a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h
+++ b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h
@@ -19,8 +19,8 @@ class SharedRpcResources;
namespace feedbm {
/*
- * Benchmark feed handler for feed to service layer using storage api protocol
- * over rpc.
+ * Benchmark feed handler for feed to service layer or distributor
+ * using storage api protocol over rpc.
*/
class StorageApiRpcBmFeedHandler : public IBmFeedHandler
{
diff --git a/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h b/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h
index 78004f3d787..4743367b426 100644
--- a/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h
+++ b/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h
@@ -9,6 +9,7 @@ namespace storage::api { class StorageMessage; }
namespace feedbm {
class StorageReplyErrorChecker {
+protected:
std::atomic<uint32_t> _errors;
public:
StorageReplyErrorChecker();
diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
index 3aada3cc958..dc8d787a778 100644
--- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
+++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp
@@ -1,12 +1,15 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "bm_cluster_controller.h"
+#include "bm_message_bus.h"
#include "bm_storage_chain_builder.h"
#include "bm_storage_link_context.h"
#include "pending_tracker.h"
#include "spi_bm_feed_handler.h"
#include "storage_api_chain_bm_feed_handler.h"
+#include "storage_api_message_bus_bm_feed_handler.h"
#include "storage_api_rpc_bm_feed_handler.h"
+#include "document_api_message_bus_bm_feed_handler.h"
#include <tests/proton/common/dummydbowner.h>
#include <vespa/config-attributes.h>
#include <vespa/config-bucketspaces.h>
@@ -123,11 +126,14 @@ using document::FieldUpdate;
using document::IntFieldValue;
using document::test::makeBucketSpace;
using feedbm::BmClusterController;
+using feedbm::BmMessageBus;
using feedbm::BmStorageChainBuilder;
using feedbm::BmStorageLinkContext;
using feedbm::IBmFeedHandler;
+using feedbm::DocumentApiMessageBusBmFeedHandler;
using feedbm::SpiBmFeedHandler;
using feedbm::StorageApiChainBmFeedHandler;
+using feedbm::StorageApiMessageBusBmFeedHandler;
using feedbm::StorageApiRpcBmFeedHandler;
using search::TuneFileDocumentDB;
using search::index::DummyFileHeaderContext;
@@ -193,6 +199,23 @@ std::shared_ptr<DocumentDBConfig> make_document_db_config(std::shared_ptr<Docume
doc_type_name.getName());
}
+void
+make_slobroks_config(SlobroksConfigBuilder& slobroks, int slobrok_port)
+{
+ SlobroksConfigBuilder::Slobrok slobrok;
+ slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port);
+ slobroks.slobrok.push_back(std::move(slobrok));
+}
+
+void
+make_bucketspaces_config(BucketspacesConfigBuilder &bucketspaces)
+{
+ BucketspacesConfigBuilder::Documenttype bucket_space_map;
+ bucket_space_map.name = "test";
+ bucket_space_map.bucketspace = "default";
+ bucketspaces.documenttype.emplace_back(std::move(bucket_space_map));
+}
+
class MyPersistenceEngineOwner : public IPersistenceEngineOwner
{
void setClusterState(BucketSpace, const storage::spi::ClusterState &) override { }
@@ -245,6 +268,8 @@ class BMParams {
uint32_t _response_threads;
bool _enable_distributor;
bool _enable_service_layer;
+ bool _use_document_api;
+ bool _use_message_bus;
bool _use_storage_chain;
bool _use_legacy_bucket_db;
uint32_t get_start(uint32_t thread_id) const {
@@ -261,6 +286,8 @@ public:
_response_threads(2), // Same default as in stor-filestor.def
_enable_distributor(false),
_enable_service_layer(false),
+ _use_document_api(false),
+ _use_message_bus(false),
_use_storage_chain(false),
_use_legacy_bucket_db(false)
{
@@ -276,7 +303,8 @@ public:
uint32_t get_rpc_network_threads() const { return _rpc_network_threads; }
uint32_t get_response_threads() const { return _response_threads; }
bool get_enable_distributor() const { return _enable_distributor; }
- bool get_enable_service_layer() const { return _enable_service_layer || _enable_distributor; }
+ bool get_use_document_api() const { return _use_document_api; }
+ bool get_use_message_bus() const { return _use_message_bus; }
bool get_use_storage_chain() const { return _use_storage_chain; }
bool get_use_legacy_bucket_db() const { return _use_legacy_bucket_db; }
void set_documents(uint32_t documents_in) { _documents = documents_in; }
@@ -288,9 +316,14 @@ public:
void set_response_threads(uint32_t threads_in) { _response_threads = threads_in; }
void set_enable_distributor(bool enable_distributor_in) { _enable_distributor = enable_distributor_in; }
void set_enable_service_layer(bool enable_service_layer_in) { _enable_service_layer = enable_service_layer_in; }
+    void set_use_document_api(bool use_document_api_in) { _use_document_api = use_document_api_in; }
+ void set_use_message_bus(bool use_message_bus_in) { _use_message_bus = use_message_bus_in; }
void set_use_storage_chain(bool use_storage_chain_in) { _use_storage_chain = use_storage_chain_in; }
void set_use_legacy_bucket_db(bool use_legacy_bucket_db_in) { _use_legacy_bucket_db = use_legacy_bucket_db_in; }
bool check() const;
+ bool needs_service_layer() const { return _enable_service_layer || _enable_distributor || _use_storage_chain || _use_message_bus || _use_document_api; }
+ bool needs_distributor() const { return _enable_distributor || _use_document_api; }
+ bool needs_message_bus() const { return _use_message_bus || _use_document_api; }
};
bool
@@ -426,17 +459,14 @@ struct MyStorageConfig
} else {
stor_server.rootFolder = "storage";
}
- {
- SlobroksConfigBuilder::Slobrok slobrok;
- slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port);
- slobroks.slobrok.push_back(std::move(slobrok));
- }
+ make_slobroks_config(slobroks, slobrok_port);
stor_communicationmanager.useDirectStorageapiRpc = true;
stor_communicationmanager.rpc.numNetworkThreads = params.get_rpc_network_threads();
stor_communicationmanager.mbusport = mbus_port;
stor_communicationmanager.rpcport = rpc_port;
stor_status.httpport = status_port;
+ make_bucketspaces_config(bucketspaces);
}
~MyStorageConfig();
@@ -524,11 +554,7 @@ struct MyRpcClientConfig {
: config_id(config_id_in),
slobroks()
{
- {
- SlobroksConfigBuilder::Slobrok slobrok;
- slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port);
- slobroks.slobrok.push_back(std::move(slobrok));
- }
+ make_slobroks_config(slobroks, slobrok_port);
}
~MyRpcClientConfig();
@@ -539,6 +565,28 @@ struct MyRpcClientConfig {
MyRpcClientConfig::~MyRpcClientConfig() = default;
+struct MyMessageBusConfig {
+ vespalib::string config_id;
+ SlobroksConfigBuilder slobroks;
+ MessagebusConfigBuilder messagebus;
+
+ MyMessageBusConfig(const vespalib::string &config_id_in, int slobrok_port)
+ : config_id(config_id_in),
+ slobroks(),
+ messagebus()
+ {
+ make_slobroks_config(slobroks, slobrok_port);
+ }
+ ~MyMessageBusConfig();
+
+ void add_builders(ConfigSet &set) {
+ set.addBuilder(config_id, &slobroks);
+ set.addBuilder(config_id, &messagebus);
+ }
+};
+
+MyMessageBusConfig::~MyMessageBusConfig() = default;
+
}
struct PersistenceProviderFixture {
@@ -576,6 +624,7 @@ struct PersistenceProviderFixture {
MyServiceLayerConfig _service_layer_config;
MyDistributorConfig _distributor_config;
MyRpcClientConfig _rpc_client_config;
+ MyMessageBusConfig _message_bus_config;
ConfigSet _config_set;
std::shared_ptr<IConfigContext> _config_context;
std::unique_ptr<IBmFeedHandler> _feed_handler;
@@ -585,6 +634,7 @@ struct PersistenceProviderFixture {
std::unique_ptr<SharedRpcResources> _rpc_client_shared_rpc_resources;
std::shared_ptr<BmStorageLinkContext> _distributor_chain_context;
std::unique_ptr<storage::DistributorProcess> _distributor;
+ std::unique_ptr<BmMessageBus> _message_bus;
PersistenceProviderFixture(const BMParams& params);
~PersistenceProviderFixture();
@@ -599,8 +649,10 @@ struct PersistenceProviderFixture {
void wait_slobrok(const vespalib::string &name);
void start_service_layer(const BMParams& params);
void start_distributor(const BMParams& params);
+ void start_message_bus();
void create_feed_handler(const BMParams& params);
void shutdown_feed_handler();
+ void shutdown_message_bus();
void shutdown_distributor();
void shutdown_service_layer();
};
@@ -640,6 +692,7 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params)
_service_layer_config("bm-servicelayer", *_document_types, _slobrok_port, _service_layer_mbus_port, _service_layer_rpc_port, _service_layer_status_port, params),
_distributor_config("bm-distributor", *_document_types, _slobrok_port, _distributor_mbus_port, _distributor_rpc_port, _distributor_status_port, params),
_rpc_client_config("bm-rpc-client", _slobrok_port),
+ _message_bus_config("bm-message-bus", _slobrok_port),
_config_set(),
_config_context(std::make_shared<ConfigContext>(_config_set)),
_feed_handler(),
@@ -648,7 +701,8 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params)
_service_layer(),
_rpc_client_shared_rpc_resources(),
_distributor_chain_context(),
- _distributor()
+ _distributor(),
+ _message_bus()
{
create_document_db();
_persistence_engine = std::make_unique<PersistenceEngine>(_persistence_owner, _write_filter, -1, false);
@@ -657,6 +711,7 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params)
_service_layer_config.add_builders(_config_set);
_distributor_config.add_builders(_config_set);
_rpc_client_config.add_builders(_config_set);
+ _message_bus_config.add_builders(_config_set);
_feed_handler = std::make_unique<SpiBmFeedHandler>(*_persistence_engine);
}
@@ -772,7 +827,7 @@ PersistenceProviderFixture::start_service_layer(const BMParams& params)
LOG(info, "start service layer");
config::ConfigUri config_uri("bm-servicelayer", _config_context);
std::unique_ptr<BmStorageChainBuilder> chain_builder;
- if (params.get_use_storage_chain() && !params.get_enable_distributor()) {
+ if (params.get_use_storage_chain() && !params.needs_distributor()) {
chain_builder = std::make_unique<BmStorageChainBuilder>();
_service_layer_chain_context = chain_builder->get_context();
}
@@ -797,7 +852,7 @@ PersistenceProviderFixture::start_distributor(const BMParams& params)
{
config::ConfigUri config_uri("bm-distributor", _config_context);
std::unique_ptr<BmStorageChainBuilder> chain_builder;
- if (params.get_use_storage_chain()) {
+ if (params.get_use_storage_chain() && !params.get_use_document_api()) {
chain_builder = std::make_unique<BmStorageChainBuilder>();
_distributor_chain_context = chain_builder->get_context();
}
@@ -816,24 +871,39 @@ PersistenceProviderFixture::start_distributor(const BMParams& params)
}
void
+PersistenceProviderFixture::start_message_bus()
+{
+ config::ConfigUri config_uri("bm-message-bus", _config_context);
+ LOG(info, "Starting message bus");
+ _message_bus = std::make_unique<BmMessageBus>(config_uri,
+ _repo,
+ documentapi::LoadTypeSet());
+ LOG(info, "Started message bus");
+}
+
+void
PersistenceProviderFixture::create_feed_handler(const BMParams& params)
{
StorageApiRpcService::Params rpc_params;
// This is the same compression config as the default in stor-communicationmanager.def.
rpc_params.compression_config = CompressionConfig(CompressionConfig::Type::LZ4, 3, 90, 1024);
- if (params.get_enable_distributor()) {
+ if (params.get_use_document_api()) {
+ _feed_handler = std::make_unique<DocumentApiMessageBusBmFeedHandler>(*_message_bus);
+ } else if (params.get_enable_distributor()) {
if (params.get_use_storage_chain()) {
assert(_distributor_chain_context);
_feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_distributor_chain_context, true);
+ } else if (params.get_use_message_bus()) {
+ _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(*_message_bus, true);
} else {
_feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo, rpc_params, true);
}
- return;
- }
- if (params.get_enable_service_layer()) {
+ } else if (params.needs_service_layer()) {
if (params.get_use_storage_chain()) {
assert(_service_layer_chain_context);
_feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_service_layer_chain_context, false);
+ } else if (params.get_use_message_bus()) {
+ _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(*_message_bus, false);
} else {
_feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo, rpc_params, false);
}
@@ -847,6 +917,15 @@ PersistenceProviderFixture::shutdown_feed_handler()
}
void
+PersistenceProviderFixture::shutdown_message_bus()
+{
+ if (_message_bus) {
+ LOG(info, "stop message bus");
+ _message_bus.reset();
+ }
+}
+
+void
PersistenceProviderFixture::shutdown_distributor()
{
if (_distributor) {
@@ -1130,12 +1209,15 @@ void benchmark_async_spi(const BMParams &bm_params)
if (!f._feed_handler->manages_buckets()) {
f.create_buckets();
}
- if (bm_params.get_enable_service_layer()) {
+ if (bm_params.needs_service_layer()) {
f.start_service_layer(bm_params);
}
- if (bm_params.get_enable_distributor()) {
+ if (bm_params.needs_distributor()) {
f.start_distributor(bm_params);
}
+ if (bm_params.needs_message_bus()) {
+ f.start_message_bus();
+ }
f.create_feed_handler(bm_params);
vespalib::ThreadStackExecutor executor(bm_params.get_client_threads(), 128 * 1024);
auto put_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_put_feed(f, range, bucket_selector); }, f.num_buckets(), "put");
@@ -1149,6 +1231,7 @@ void benchmark_async_spi(const BMParams &bm_params)
LOG(info, "--------------------------------");
f.shutdown_feed_handler();
+ f.shutdown_message_bus();
f.shutdown_distributor();
f.shutdown_service_layer();
}
@@ -1189,6 +1272,8 @@ App::usage()
"[--response-threads threads]\n"
"[--enable-distributor]\n"
"[--enable-service-layer]\n"
+ "[--use-document-api]\n"
+ "[--use-message-bus\n"
"[--use-storage-chain]\n"
"[--use-legacy-bucket-db]" << std::endl;
}
@@ -1209,6 +1294,8 @@ App::get_options()
{ "response-threads", 1, nullptr, 0 },
{ "enable-distributor", 0, nullptr, 0 },
{ "enable-service-layer", 0, nullptr, 0 },
+ { "use-document-api", 0, nullptr, 0 },
+ { "use-message-bus", 0, nullptr, 0 },
{ "use-storage-chain", 0, nullptr, 0 },
{ "use-legacy-bucket-db", 0, nullptr, 0 }
};
@@ -1222,6 +1309,8 @@ App::get_options()
LONGOPT_RESPONSE_THREADS,
LONGOPT_ENABLE_DISTRIBUTOR,
LONGOPT_ENABLE_SERVICE_LAYER,
+ LONGOPT_USE_DOCUMENT_API,
+ LONGOPT_USE_MESSAGE_BUS,
LONGOPT_USE_STORAGE_CHAIN,
LONGOPT_USE_LEGACY_BUCKET_DB
};
@@ -1258,6 +1347,12 @@ App::get_options()
case LONGOPT_ENABLE_SERVICE_LAYER:
_bm_params.set_enable_service_layer(true);
break;
+ case LONGOPT_USE_DOCUMENT_API:
+ _bm_params.set_use_document_api(true);
+ break;
+ case LONGOPT_USE_MESSAGE_BUS:
+ _bm_params.set_use_message_bus(true);
+ break;
case LONGOPT_USE_STORAGE_CHAIN:
_bm_params.set_use_storage_chain(true);
break;
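The new --use-document-api and --use-message-bus options do not require the other components to be requested explicitly: the needs_service_layer(), needs_distributor() and needs_message_bus() predicates added to BMParams derive them, and benchmark_async_spi() starts components from those predicates before create_feed_handler() picks the handler. A hedged fragment restating that implication with the setters shown above:

    BMParams params;
    params.set_use_document_api(true);
    // Selecting the document api alone implies the full stack: the service layer,
    // the distributor and the message bus are all started, and the feed handler
    // becomes DocumentApiMessageBusBmFeedHandler.
    assert(params.needs_service_layer() && params.needs_distributor() && params.needs_message_bus());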
diff --git a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
index 7fb33dc242b..8228ceb5e79 100644
--- a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
+++ b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp
@@ -308,6 +308,7 @@ TEST_F(StorageApiRpcServiceTest, can_send_and_respond_to_request_end_to_end) {
TEST_F(StorageApiRpcServiceTest, send_to_unknown_address_bounces_with_error_reply) {
auto cmd = _node_0->create_dummy_put_command();
cmd->setAddress(non_existing_address());
+ cmd->getTrace().setLevel(9);
_node_0->send_request(cmd);
auto bounced_msg = _node_0->wait_and_receive_single_message();
@@ -323,6 +324,7 @@ TEST_F(StorageApiRpcServiceTest, send_to_unknown_address_bounces_with_error_repl
to_slobrok_id(non_existing_address()).c_str(), vespalib::HostName::get().c_str());
EXPECT_EQ(put_reply->getResult(), api::ReturnCode(expected_code, expected_msg));
+ EXPECT_THAT(put_reply->getTrace().toString(), HasSubstr("The service must be having problems"));
}
TEST_F(StorageApiRpcServiceTest, request_metadata_is_propagated_to_receiver) {
@@ -408,4 +410,17 @@ TEST_F(StorageApiRpcServiceTest, malformed_request_payload_returns_rpc_error) {
// TODO also test bad response header/payload
+TEST_F(StorageApiRpcServiceTest, trace_events_are_emitted_for_send_and_receive) {
+ auto recv_cmd = send_and_receive_put_command_at_node_1([](auto& cmd){
+ cmd.getTrace().setLevel(9);
+ });
+ auto recv_reply = respond_and_receive_put_reply_at_node_0(recv_cmd);
+ auto trace_str = recv_reply->getTrace().toString();
+    // Ordering of traced events matters, so we use a cheeky regex.
+ EXPECT_THAT(trace_str, ContainsRegex("Sending request from.+"
+ "Request received at.+"
+ "Sending response from.+"
+ "Response received at"));
+}
+
}
diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
index 722d1fd3a81..740277218c3 100644
--- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
+++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h
@@ -48,6 +48,7 @@ public:
// Hostname of host node is running on.
[[nodiscard]] const vespalib::string& hostname() const noexcept { return _hostname; }
+    [[nodiscard]] const vespalib::string& handle() const noexcept { return _handle; }
const RpcTargetFactory& target_factory() const;
private:
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
index a8cdc0bfeea..8b5c7706510 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp
@@ -16,12 +16,14 @@
#include <vespa/vespalib/data/databuffer.h>
#include <vespa/vespalib/util/compressor.h>
#include <vespa/vespalib/util/stringfmt.h>
+#include <vespa/vespalib/trace/tracelevel.h>
#include <cassert>
#include <vespa/log/log.h>
LOG_SETUP(".storage.storage_api_rpc_service");
using vespalib::compression::CompressionConfig;
+using vespalib::TraceLevel;
namespace storage::rpc {
@@ -177,16 +179,28 @@ void StorageApiRpcService::RPC_rpc_v1_send(FRT_RPCRequest* req) {
scmd->getTrace().setLevel(hdr.trace_level());
scmd->setTimeout(std::chrono::milliseconds(hdr.time_remaining_ms()));
req->DiscardBlobs();
+ if (scmd->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ scmd->getTrace().trace(TraceLevel::SEND_RECEIVE,
+ vespalib::make_string("Request received at '%s' (tcp/%s:%d) with %u bytes of payload",
+ _rpc_resources.handle().c_str(),
+ _rpc_resources.hostname().c_str(),
+ _rpc_resources.listen_port(),
+ uncompressed_size));
+ }
detach_and_forward_to_enqueuer(std::move(scmd), req);
} else {
req->SetError(FRTE_RPC_METHOD_FAILED, "Unable to decode RPC request payload");
}
}
-void StorageApiRpcService::encode_rpc_v1_response(FRT_RPCRequest& request, const api::StorageReply& reply) {
+void StorageApiRpcService::encode_rpc_v1_response(FRT_RPCRequest& request, api::StorageReply& reply) {
LOG(spam, "Server: encoding rpc.v1 response header and payload");
auto* ret = request.GetReturn();
+ if (reply.getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ reply.getTrace().trace(TraceLevel::SEND_RECEIVE,
+ vespalib::make_string("Sending response from '%s'", _rpc_resources.handle().c_str()));
+ }
// TODO skip encoding header altogether if no relevant fields set?
protobuf::ResponseHeader hdr;
if (reply.getTrace().getLevel() > 0) {
@@ -206,11 +220,21 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma
if (!target) {
auto reply = cmd->makeReply();
reply->setResult(make_no_address_for_service_error(*cmd->getAddress()));
+ if (reply->getTrace().shouldTrace(TraceLevel::ERROR)) {
+ reply->getTrace().trace(TraceLevel::ERROR, reply->getResult().getMessage());
+ }
// Always dispatch async for synchronously generated replies, or we risk nuking the
// stack if the reply receiver keeps resending synchronously as well.
_message_dispatcher.dispatch_async(std::move(reply));
return;
}
+ if (cmd->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ cmd->getTrace().trace(TraceLevel::SEND_RECEIVE,
+ vespalib::make_string("Sending request from '%s' to '%s' (%s) with timeout of %g seconds",
+ _rpc_resources.handle().c_str(),
+ CachingRpcTargetResolver::address_to_slobrok_id(*cmd->getAddress()).c_str(),
+ target->_spec.c_str(), vespalib::to_s(cmd->getTimeout())));
+ }
std::unique_ptr<FRT_RPCRequest, SubRefDeleter> req(_rpc_resources.supervisor().AllocRPCRequest());
req->SetMethodName(rpc_v1_method_name());
@@ -233,12 +257,12 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma
void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) {
std::unique_ptr<FRT_RPCRequest, SubRefDeleter> req(raw_req);
auto* req_ctx = static_cast<RpcRequestContext*>(req->GetContext()._value.VOIDP);
+ auto& cmd = *req_ctx->_originator_cmd;
if (!req->CheckReturnTypes("bixbix")) {
handle_request_done_rpc_error(*req, *req_ctx);
return;
}
LOG(spam, "Client: received rpc.v1 OK response");
-
const auto& ret = *req->GetReturn();
protobuf::ResponseHeader hdr;
if (!decode_header_from_rpc_params(ret, hdr)) {
@@ -246,8 +270,10 @@ void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) {
return;
}
std::unique_ptr<mbusprot::StorageReply> wrapped_reply;
- bool ok = uncompress_rpc_payload(ret, [&wrapped_reply, req_ctx](auto& codec, auto payload) {
+ uint32_t uncompressed_size = 0;
+ bool ok = uncompress_rpc_payload(ret, [&wrapped_reply, &uncompressed_size, req_ctx](auto& codec, auto payload) {
wrapped_reply = codec.decodeReply(payload, *req_ctx->_originator_cmd);
+ uncompressed_size = payload.size();
});
if (!ok) {
assert(!wrapped_reply);
@@ -259,9 +285,16 @@ void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) {
assert(reply);
if (!hdr.trace_payload().empty()) {
- req_ctx->_originator_cmd->getTrace().getRoot().addChild(mbus::TraceNode::decode(hdr.trace_payload()));
+ cmd.getTrace().getRoot().addChild(mbus::TraceNode::decode(hdr.trace_payload()));
+ }
+ if (cmd.getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) {
+ cmd.getTrace().trace(TraceLevel::SEND_RECEIVE,
+ vespalib::make_string("Response received at '%s' with %u bytes of payload",
+ _rpc_resources.handle().c_str(),
+ uncompressed_size));
}
- reply->getTrace().swap(req_ctx->_originator_cmd->getTrace());
+ reply->getTrace().swap(cmd.getTrace());
+ reply->setApproxByteSize(uncompressed_size);
// TODO ensure that no implicit long-lived refs end up pointing into RPC memory...!
req->DiscardBlobs();
@@ -291,10 +324,13 @@ void StorageApiRpcService::handle_request_done_decode_error(const RpcRequestCont
void StorageApiRpcService::create_and_dispatch_error_reply(api::StorageCommand& cmd, api::ReturnCode error) {
auto error_reply = cmd.makeReply();
- LOG(debug, "Client: rpc.v1 failed decode from %s: '%s'",
+ LOG(debug, "Client: rpc.v1 failed for target '%s': '%s'",
cmd.getAddress()->toString().c_str(), error.toString().c_str());
+ error_reply->getTrace().swap(cmd.getTrace());
+ if (error_reply->getTrace().shouldTrace(TraceLevel::ERROR)) {
+ error_reply->getTrace().trace(TraceLevel::ERROR, error.getMessage());
+ }
error_reply->setResult(std::move(error));
- // TODO needs tracing of received-event!
_message_dispatcher.dispatch_sync(std::move(error_reply));
}
@@ -355,9 +391,6 @@ bool StorageApiRpcService::target_supports_direct_rpc(
/*
* Major TODOs:
- * - tracing and trace propagation
- * - forwards/backwards compatibility
- * - what to remap bounced Not Found errors to internally?
* - lifetime semantics of FRT targets vs requests created from them?
* - lifetime of document type/fieldset repos vs messages
* - is repo ref squirreled away into the messages anywhere?
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
index 39508e51841..cb2344ccd13 100644
--- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
+++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h
@@ -58,14 +58,13 @@ public:
[[nodiscard]] bool target_supports_direct_rpc(const api::StorageMessageAddress& addr) const noexcept;
void RPC_rpc_v1_send(FRT_RPCRequest* req);
- void encode_rpc_v1_response(FRT_RPCRequest& request, const api::StorageReply& reply);
+ void encode_rpc_v1_response(FRT_RPCRequest& request, api::StorageReply& reply);
void send_rpc_v1_request(std::shared_ptr<api::StorageCommand> cmd);
static constexpr const char* rpc_v1_method_name() noexcept {
return "storageapi.v1.send";
}
private:
- // TODO dedupe
void detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest* req);
struct RpcRequestContext {