diff options
author | Håvard Pettersen <havardpe@oath.com> | 2020-10-02 13:11:53 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@oath.com> | 2020-10-02 13:11:53 +0000 |
commit | 8c8c529e6fea1d0e8869dfb74881ca15ca9a89a9 (patch) | |
tree | e84e06c8d3ac29cc829511bbf3dc93576d4596df | |
parent | 625e97105d691c417de78826c3f947926215147e (diff) | |
parent | c67283d48af378f25cf1bd3ed8e578d0e529813f (diff) |
Merge branch 'master' into havardpe/generic-reduce
fixed Conflicts:
eval/CMakeLists.txt
eval/src/vespa/eval/instruction/CMakeLists.txt
37 files changed, 1169 insertions, 349 deletions
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java index 5c910983bc9..1587e2696b8 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java @@ -54,7 +54,7 @@ import static org.junit.Assert.fail; */ public abstract class FleetControllerTest implements Waiter { - private static Logger log = Logger.getLogger(FleetControllerTest.class.getName()); + private static final Logger log = Logger.getLogger(FleetControllerTest.class.getName()); private static final int DEFAULT_NODE_COUNT = 10; Supervisor supervisor; @@ -342,10 +342,6 @@ public abstract class FleetControllerTest implements Waiter { fleetController.waitForCompleteCycle(timeoutMS); } - protected void verifyNodeEvents(Node n, String exp) { - verifyNodeEvents(n, exp, null); - } - private static class ExpectLine { Pattern regex; int matchedCount = 0; @@ -390,17 +386,15 @@ public abstract class FleetControllerTest implements Waiter { * <li>The rest of the line is a regular expression. * </ul> */ - private void verifyNodeEvents(Node n, String exp, String ignoreRegex) { - Pattern ignorePattern = (ignoreRegex == null ? 
null : Pattern.compile(ignoreRegex)); + protected void verifyNodeEvents(Node n, String exp) { List<NodeEvent> events = fleetController.getNodeEvents(n); String[] expectLines = exp.split("\n"); - List<ExpectLine> expected = new ArrayList<ExpectLine>(); + List<ExpectLine> expected = new ArrayList<>(); for (String line : expectLines) { expected.add(new ExpectLine(line)); } boolean mismatch = false; - StringBuilder eventLog = new StringBuilder(); StringBuilder errors = new StringBuilder(); int gotno = 0; @@ -413,16 +407,10 @@ public abstract class FleetControllerTest implements Waiter { eventLine = e.toString(); } - if (ignorePattern != null && ignorePattern.matcher(eventLine).matches()) { - ++gotno; - continue; - } - ExpectLine pattern = null; if (expno < expected.size()) { pattern = expected.get(expno); } - eventLog.append(eventLine).append("\n"); if (pattern == null) { errors.append("Exhausted expected list before matching event " + gotno @@ -456,9 +444,6 @@ public abstract class FleetControllerTest implements Waiter { StringBuilder eventsGotten = new StringBuilder(); for (Event e : events) { String eventLine = e.toString(); - if (ignorePattern != null && ignorePattern.matcher(eventLine).matches()) { - continue; - } eventsGotten.append(eventLine).append("\n"); } errors.append("\nExpected events matching:\n" + exp + "\n"); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java index 557765ca761..d14f6701288 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java @@ -16,6 +16,7 @@ import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestRule; +import org.junit.rules.Timeout; import java.util.ArrayList; import 
java.util.List; @@ -31,14 +32,17 @@ import static org.junit.Assert.assertTrue; public class MasterElectionTest extends FleetControllerTest { - private static Logger log = Logger.getLogger(MasterElectionTest.class.getName()); + private static final Logger log = Logger.getLogger(MasterElectionTest.class.getName()); private Supervisor supervisor; - private List<FleetController> fleetControllers = new ArrayList<>(); + private final List<FleetController> fleetControllers = new ArrayList<>(); @Rule public TestRule cleanupZookeeperLogsOnSuccess = new CleanupZookeeperLogsOnSuccess(); + @Rule + public Timeout globalTimeout= Timeout.seconds(120); + private static int defaultZkSessionTimeoutInMillis() { return 30_000; } protected void setUpFleetController(int count, boolean useFakeTimer, FleetControllerOptions options) throws Exception { @@ -74,12 +78,12 @@ public class MasterElectionTest extends FleetControllerTest { private void waitForZookeeperDisconnected() throws TimeoutException { long maxTime = System.currentTimeMillis() + timeoutMS; - for(FleetController f : fleetControllers) { - while (true) { - if (!f.hasZookeeperConnection()) break; + for (FleetController f : fleetControllers) { + while (f.hasZookeeperConnection()) { timer.advanceTime(1000); - try{ Thread.sleep(1); } catch (InterruptedException e) {} - if (System.currentTimeMillis() > maxTime) throw new TimeoutException("Failed to notice zookeeper down within timeout of " + timeoutMS + " ms"); + try { Thread.sleep(1); } catch (InterruptedException e) {} + if (System.currentTimeMillis() > maxTime) + throw new TimeoutException("Failed to notice zookeeper down within timeout of " + timeoutMS + " ms"); } } waitForCompleteCycles(); @@ -226,24 +230,20 @@ public class MasterElectionTest extends FleetControllerTest { startingTest("MasterElectionTest::testClusterStateVersionIncreasesAcrossMasterElections"); FleetControllerOptions options = defaultOptions("mycluster"); options.masterZooKeeperCooldownPeriod = 1; - 
setUpFleetController(5, false, options); + setUpFleetController(3, false, options); // Currently need to have content nodes present for the cluster controller to even bother // attempting to persisting its cluster state version to ZK. setUpVdsNodes(false, new DummyVdsNodeOptions()); fleetController = fleetControllers.get(0); // Required to prevent waitForStableSystem from NPE'ing waitForStableSystem(); waitForMaster(0); - Stream.of(0, 1, 2, 3, 4).forEach(this::waitForCompleteCycle); + Stream.of(0, 1, 2).forEach(this::waitForCompleteCycle); StrictlyIncreasingVersionChecker checker = StrictlyIncreasingVersionChecker.bootstrappedWith( fleetControllers.get(0).getClusterState()); fleetControllers.get(0).shutdown(); waitForMaster(1); - Stream.of(1, 2, 3, 4).forEach(this::waitForCompleteCycle); + Stream.of(1, 2).forEach(this::waitForCompleteCycle); checker.updateAndVerify(fleetControllers.get(1).getClusterState()); - fleetControllers.get(1).shutdown(); - waitForMaster(2); // Still a quorum available - Stream.of(2, 3, 4).forEach(this::waitForCompleteCycle); - checker.updateAndVerify(fleetControllers.get(2).getClusterState()); } @Test @@ -275,7 +275,7 @@ public class MasterElectionTest extends FleetControllerTest { FleetControllerOptions options = defaultOptions("mycluster"); options.masterZooKeeperCooldownPeriod = 100; options.zooKeeperServerAddress = "localhost"; - setUpFleetController(5, false, options); + setUpFleetController(3, false, options); waitForMaster(0); log.log(Level.INFO, "STOPPING ZOOKEEPER SERVER AT " + zooKeeperServer.getAddress()); @@ -329,12 +329,21 @@ public class MasterElectionTest extends FleetControllerTest { long endTime = System.currentTimeMillis() + timeoutMS; while (System.currentTimeMillis() < endTime) { boolean allOk = true; - for (int i=0; i<nodes.length; ++i) { + for (int node : nodes) { Request req = new Request("getMaster"); - connections.get(nodes[i]).invokeSync(req, FleetControllerTest.timeoutS); - if (req.isError()) { allOk = false; 
break; } - if (master != null && master != req.returnValues().get(0).asInt32()) { allOk = false; break; } - if (reason != null && !reason.equals(req.returnValues().get(1).asString())) { allOk = false; break; } + connections.get(node).invokeSync(req, FleetControllerTest.timeoutS); + if (req.isError()) { + allOk = false; + break; + } + if (master != null && master != req.returnValues().get(0).asInt32()) { + allOk = false; + break; + } + if (reason != null && ! reason.equals(req.returnValues().get(1).asString())) { + allOk = false; + break; + } } if (allOk) return; try{ Thread.sleep(100); } catch (InterruptedException e) {} @@ -354,7 +363,7 @@ public class MasterElectionTest extends FleetControllerTest { waitForMaster(0); supervisor = new Supervisor(new Transport()); - List<Target> connections = new ArrayList<Target>(); + List<Target> connections = new ArrayList<>(); for (FleetController fleetController : fleetControllers) { int rpcPort = fleetController.getRpcPort(); Target connection = supervisor.connect(new Spec("localhost", rpcPort)); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperTestServer.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperTestServer.java index 73b4163d542..7861db249bd 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperTestServer.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ZooKeeperTestServer.java @@ -14,10 +14,10 @@ import java.time.Duration; * This class sets up a zookeeper server, such that we can test fleetcontroller zookeeper parts without stubbing in the client. 
*/ public class ZooKeeperTestServer { - private File zooKeeperDir; - private ZooKeeperServer server; + private final File zooKeeperDir; + private final ZooKeeperServer server; private static final Duration tickTime = Duration.ofMillis(2000); - private NIOServerCnxnFactory factory; + private final NIOServerCnxnFactory factory; private static final String DIR_PREFIX = "test_fltctrl_zk"; private static final String DIR_POSTFIX = "sdir"; @@ -39,7 +39,7 @@ public class ZooKeeperTestServer { try{ factory.startup(server); } catch (InterruptedException e) { - throw (RuntimeException) new IllegalStateException("Interrupted during test startup: ").initCause(e); + throw new IllegalStateException("Interrupted during test startup: ", e); } } diff --git a/eval/CMakeLists.txt b/eval/CMakeLists.txt index 2c6a05c7391..b8cc77b8290 100644 --- a/eval/CMakeLists.txt +++ b/eval/CMakeLists.txt @@ -35,6 +35,7 @@ vespa_define_module( src/tests/gp/ponder_nov2017 src/tests/instruction/generic_join src/tests/instruction/generic_reduce + src/tests/instruction/generic_merge src/tests/instruction/generic_rename src/tests/tensor/default_value_builder_factory src/tests/tensor/dense_add_dimension_optimizer diff --git a/eval/src/tests/instruction/generic_merge/CMakeLists.txt b/eval/src/tests/instruction/generic_merge/CMakeLists.txt new file mode 100644 index 00000000000..154b04cb32f --- /dev/null +++ b/eval/src/tests/instruction/generic_merge/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+vespa_add_executable(eval_generic_merge_test_app TEST + SOURCES + generic_merge_test.cpp + DEPENDS + vespaeval + GTest::GTest +) +vespa_add_test(NAME eval_generic_merge_test_app COMMAND eval_generic_merge_test_app) diff --git a/eval/src/tests/instruction/generic_merge/generic_merge_test.cpp b/eval/src/tests/instruction/generic_merge/generic_merge_test.cpp new file mode 100644 index 00000000000..501d5410b87 --- /dev/null +++ b/eval/src/tests/instruction/generic_merge/generic_merge_test.cpp @@ -0,0 +1,82 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/eval/eval/simple_value.h> +#include <vespa/eval/eval/value_codec.h> +#include <vespa/eval/instruction/generic_merge.h> +#include <vespa/eval/eval/interpreted_function.h> +#include <vespa/eval/eval/test/tensor_model.hpp> +#include <vespa/vespalib/util/stringfmt.h> +#include <vespa/vespalib/gtest/gtest.h> +#include <optional> + +using namespace vespalib; +using namespace vespalib::eval; +using namespace vespalib::eval::instruction; +using namespace vespalib::eval::test; + +using vespalib::make_string_short::fmt; + +std::vector<Layout> merge_layouts = { + {}, {}, + {x(5)}, {x(5)}, + {x(3),y(5)}, {x(3),y(5)}, + float_cells({x(3),y(5)}), {x(3),y(5)}, + {x(3),y(5)}, float_cells({x(3),y(5)}), + {x({"a","b","c"})}, {x({"a","b","c"})}, + {x({"a","b","c"})}, {x({"c","d","e"})}, + {x({"a","c","e"})}, {x({"b","c","d"})}, + {x({"b","c","d"})}, {x({"a","c","e"})}, + {x({"a","b","c"})}, {x({"c","d"})}, + {x({"a","b"}),y({"foo","bar","baz"})}, {x({"b","c"}),y({"any","foo","bar"})}, + {x(3),y({"foo", "bar"})}, {x(3),y({"baz", "bar"})}, + {x({"a","b","c"}),y(5)}, {x({"b","c","d"}),y(5)} +}; + + +TensorSpec reference_merge(const TensorSpec &a, const TensorSpec &b, join_fun_t fun) { + ValueType res_type = ValueType::merge(ValueType::from_spec(a.type()), + ValueType::from_spec(b.type())); + EXPECT_FALSE(res_type.is_error()); + TensorSpec 
result(res_type.to_spec()); + for (const auto &cell: a.cells()) { + auto other = b.cells().find(cell.first); + if (other == b.cells().end()) { + result.add(cell.first, cell.second); + } else { + result.add(cell.first, fun(cell.second, other->second)); + } + } + for (const auto &cell: b.cells()) { + auto other = a.cells().find(cell.first); + if (other == a.cells().end()) { + result.add(cell.first, cell.second); + } + } + return result; +} + +TensorSpec perform_generic_merge(const TensorSpec &a, const TensorSpec &b, join_fun_t fun) { + Stash stash; + const auto &factory = SimpleValueBuilderFactory::get(); + auto lhs = value_from_spec(a, factory); + auto rhs = value_from_spec(b, factory); + auto my_op = GenericMerge::make_instruction(lhs->type(), rhs->type(), fun, factory, stash); + InterpretedFunction::EvalSingle single(my_op); + return spec_from_value(single.eval(std::vector<Value::CREF>({*lhs, *rhs}))); +} + +TEST(GenericMergeTest, generic_merge_works_for_simple_values) { + ASSERT_TRUE((merge_layouts.size() % 2) == 0); + for (size_t i = 0; i < merge_layouts.size(); i += 2) { + TensorSpec lhs = spec(merge_layouts[i], N()); + TensorSpec rhs = spec(merge_layouts[i + 1], Div16(N())); + SCOPED_TRACE(fmt("\n===\nLHS: %s\nRHS: %s\n===\n", lhs.to_string().c_str(), rhs.to_string().c_str())); + for (auto fun: {operation::Add::f, operation::Mul::f, operation::Sub::f, operation::Max::f}) { + auto expect = reference_merge(lhs, rhs, fun); + auto actual = perform_generic_merge(lhs, rhs, fun); + EXPECT_EQ(actual, expect); + } + } +} + +GTEST_MAIN_RUN_ALL_TESTS() diff --git a/eval/src/vespa/eval/eval/test/tensor_model.hpp b/eval/src/vespa/eval/eval/test/tensor_model.hpp index 42f0dc7e996..4e4ef60aaee 100644 --- a/eval/src/vespa/eval/eval/test/tensor_model.hpp +++ b/eval/src/vespa/eval/eval/test/tensor_model.hpp @@ -38,7 +38,7 @@ struct Div10 : Sequence { double operator[](size_t i) const override { return (seq[i] / 10.0); } }; -// Sequence of another sequence divided by 10 +// 
Sequence of another sequence divided by 16 struct Div16 : Sequence { const Sequence &seq; Div16(const Sequence &seq_in) : seq(seq_in) {} diff --git a/eval/src/vespa/eval/instruction/CMakeLists.txt b/eval/src/vespa/eval/instruction/CMakeLists.txt index c84d773af11..91ff4fd63dc 100644 --- a/eval/src/vespa/eval/instruction/CMakeLists.txt +++ b/eval/src/vespa/eval/instruction/CMakeLists.txt @@ -4,5 +4,6 @@ vespa_add_library(eval_instruction OBJECT SOURCES generic_join generic_reduce + generic_merge generic_rename ) diff --git a/eval/src/vespa/eval/instruction/generic_merge.cpp b/eval/src/vespa/eval/instruction/generic_merge.cpp new file mode 100644 index 00000000000..9d8ac2bb80a --- /dev/null +++ b/eval/src/vespa/eval/instruction/generic_merge.cpp @@ -0,0 +1,147 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "generic_merge.h" +#include <vespa/eval/eval/inline_operation.h> +#include <vespa/vespalib/util/stash.h> +#include <vespa/vespalib/util/typify.h> +#include <cassert> + +namespace vespalib::eval::instruction { + +using State = InterpretedFunction::State; +using Instruction = InterpretedFunction::Instruction; + +namespace { + +//----------------------------------------------------------------------------- + +template <typename T, typename IN> uint64_t wrap_param(const IN &value_in) { + const T &value = value_in; + static_assert(sizeof(uint64_t) == sizeof(&value)); + return (uint64_t)&value; +} + +template <typename T> const T &unwrap_param(uint64_t param) { + return *((const T *)param); +} + +struct MergeParam { + const ValueType res_type; + const join_fun_t function; + const size_t num_mapped_dimensions; + const size_t dense_subspace_size; + std::vector<size_t> all_view_dims; + const ValueBuilderFactory &factory; + MergeParam(const ValueType &lhs_type, const ValueType &rhs_type, + join_fun_t function_in, const ValueBuilderFactory &factory_in) + : res_type(ValueType::join(lhs_type, 
rhs_type)), + function(function_in), + num_mapped_dimensions(lhs_type.count_mapped_dimensions()), + dense_subspace_size(lhs_type.dense_subspace_size()), + all_view_dims(num_mapped_dimensions), + factory(factory_in) + { + assert(!res_type.is_error()); + assert(num_mapped_dimensions == rhs_type.count_mapped_dimensions()); + assert(num_mapped_dimensions == res_type.count_mapped_dimensions()); + assert(dense_subspace_size == rhs_type.dense_subspace_size()); + assert(dense_subspace_size == res_type.dense_subspace_size()); + for (size_t i = 0; i < num_mapped_dimensions; ++i) { + all_view_dims[i] = i; + } + } + ~MergeParam(); +}; +MergeParam::~MergeParam() = default; + +//----------------------------------------------------------------------------- + +template <typename LCT, typename RCT, typename OCT, typename Fun> +std::unique_ptr<Value> +generic_mixed_merge(const Value &a, const Value &b, + const MergeParam ¶ms) +{ + Fun fun(params.function); + auto lhs_cells = a.cells().typify<LCT>(); + auto rhs_cells = b.cells().typify<RCT>(); + const size_t num_mapped = params.num_mapped_dimensions; + const size_t subspace_size = params.dense_subspace_size; + size_t guess_subspaces = std::max(a.index().size(), b.index().size()); + auto builder = params.factory.create_value_builder<OCT>(params.res_type, num_mapped, subspace_size, guess_subspaces); + std::vector<vespalib::stringref> address(num_mapped); + std::vector<const vespalib::stringref *> addr_cref; + std::vector<vespalib::stringref *> addr_ref; + for (auto & ref : address) { + addr_cref.push_back(&ref); + addr_ref.push_back(&ref); + } + size_t lhs_subspace; + size_t rhs_subspace; + auto inner = b.index().create_view(params.all_view_dims); + auto outer = a.index().create_view({}); + outer->lookup({}); + while (outer->next_result(addr_ref, lhs_subspace)) { + OCT *dst = builder->add_subspace(address).begin(); + inner->lookup(addr_cref); + if (inner->next_result({}, rhs_subspace)) { + const LCT *lhs_src = &lhs_cells[lhs_subspace * 
subspace_size]; + const RCT *rhs_src = &rhs_cells[rhs_subspace * subspace_size]; + for (size_t i = 0; i < subspace_size; ++i) { + *dst++ = fun(*lhs_src++, *rhs_src++); + } + } else { + const LCT *src = &lhs_cells[lhs_subspace * subspace_size]; + for (size_t i = 0; i < subspace_size; ++i) { + *dst++ = *src++; + } + } + } + inner = a.index().create_view(params.all_view_dims); + outer = b.index().create_view({}); + outer->lookup({}); + while (outer->next_result(addr_ref, rhs_subspace)) { + inner->lookup(addr_cref); + if (! inner->next_result({}, lhs_subspace)) { + OCT *dst = builder->add_subspace(address).begin(); + const RCT *src = &rhs_cells[rhs_subspace * subspace_size]; + for (size_t i = 0; i < subspace_size; ++i) { + *dst++ = *src++; + } + } + } + return builder->build(std::move(builder)); +} + +template <typename LCT, typename RCT, typename OCT, typename Fun> +void my_mixed_merge_op(State &state, uint64_t param_in) { + const auto ¶m = unwrap_param<MergeParam>(param_in); + const Value &lhs = state.peek(1); + const Value &rhs = state.peek(0); + auto up = generic_mixed_merge<LCT, RCT, OCT, Fun>(lhs, rhs, param); + auto &result = state.stash.create<std::unique_ptr<Value>>(std::move(up)); + const Value &result_ref = *(result.get()); + state.pop_pop_push(result_ref); +}; + +struct SelectGenericMergeOp { + template <typename LCT, typename RCT, typename OCT, typename Fun> static auto invoke() { + return my_mixed_merge_op<LCT,RCT,OCT,Fun>; + } +}; + +//----------------------------------------------------------------------------- + +} // namespace <unnamed> + +using MergeTypify = TypifyValue<TypifyCellType,operation::TypifyOp2>; + +Instruction +GenericMerge::make_instruction(const ValueType &lhs_type, const ValueType &rhs_type, join_fun_t function, + const ValueBuilderFactory &factory, Stash &stash) +{ + const auto ¶m = stash.create<MergeParam>(lhs_type, rhs_type, function, factory); + auto fun = typify_invoke<4,MergeTypify,SelectGenericMergeOp>(lhs_type.cell_type(), 
rhs_type.cell_type(), param.res_type.cell_type(), function); + return Instruction(fun, wrap_param<MergeParam>(param)); +} + +} // namespace diff --git a/eval/src/vespa/eval/instruction/generic_merge.h b/eval/src/vespa/eval/instruction/generic_merge.h new file mode 100644 index 00000000000..02e2d18715a --- /dev/null +++ b/eval/src/vespa/eval/instruction/generic_merge.h @@ -0,0 +1,15 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "generic_join.h" + +namespace vespalib::eval::instruction { + +struct GenericMerge { + static InterpretedFunction::Instruction + make_instruction(const ValueType &lhs_type, const ValueType &rhs_type, join_fun_t function, + const ValueBuilderFactory &factory, Stash &stash); +}; + +} // namespace diff --git a/eval/src/vespa/eval/tensor/serialization/sparse_binary_format.cpp b/eval/src/vespa/eval/tensor/serialization/sparse_binary_format.cpp index c246ccf16e1..a0b691872a0 100644 --- a/eval/src/vespa/eval/tensor/serialization/sparse_binary_format.cpp +++ b/eval/src/vespa/eval/tensor/serialization/sparse_binary_format.cpp @@ -131,6 +131,18 @@ SparseBinaryFormat::serialize(nbostream &stream, const Tensor &tensor) stream.write(cells.peek(), cells.size()); } +struct BuildSparseCells { + template<typename CT> + static auto invoke(ValueType type, nbostream &stream, + size_t dimensionsSize, + size_t cellsSize) + { + DirectSparseTensorBuilder<CT> builder(std::move(type)); + decodeCells<CT>(stream, dimensionsSize, cellsSize, builder); + return builder.build(); + } +}; + std::unique_ptr<Tensor> SparseBinaryFormat::deserialize(nbostream &stream, CellType cell_type) { @@ -143,19 +155,8 @@ SparseBinaryFormat::deserialize(nbostream &stream, CellType cell_type) } size_t cellsSize = stream.getInt1_4Bytes(); ValueType type = ValueType::tensor_type(std::move(dimensions), cell_type); - switch (cell_type) { - case CellType::DOUBLE: { - DirectSparseTensorBuilder<double> 
builder(type); - builder.reserve(cellsSize); - decodeCells<double>(stream, dimensionsSize, cellsSize, builder); - return builder.build(); } - case CellType::FLOAT: { - DirectSparseTensorBuilder<float> builder(type); - builder.reserve(cellsSize); - decodeCells<float>(stream, dimensionsSize, cellsSize, builder); - return builder.build(); } - } - abort(); + return typify_invoke<1,eval::TypifyCellType,BuildSparseCells>(cell_type, + std::move(type), stream, dimensionsSize, cellsSize); } } // namespace diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/Activator.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/Activator.java index 7158ccc57e3..e36d5fa4075 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/Activator.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/Activator.java @@ -20,7 +20,6 @@ import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; -import java.util.logging.Logger; import java.util.stream.Collectors; /** @@ -30,8 +29,6 @@ import java.util.stream.Collectors; */ class Activator { - private static final Logger logger = Logger.getLogger(Activator.class.getName()); - private final NodeRepository nodeRepository; private final Optional<LoadBalancerProvisioner> loadBalancerProvisioner; diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodePrioritizer.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodePrioritizer.java index 9925ba65324..b6b05949082 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodePrioritizer.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodePrioritizer.java @@ -136,6 +136,7 @@ public class NodePrioritizer { .filter(node -> legalStates.contains(node.state())) .filter(node -> 
node.allocation().isPresent()) .filter(node -> node.allocation().get().owner().equals(application)) + .filter(node -> node.state() == Node.State.active || canStillAllocateToParentOf(node)) .map(node -> candidateFrom(node, false)) .forEach(candidate -> nodes.add(candidate)); } @@ -177,6 +178,19 @@ public class NodePrioritizer { return requestedNodes.fulfilledBy(nofNodesInCluster - nodeFailedNodes); } + /** + * We may regret that a non-active node is allocated to a host and not offer it to the application + * now, e.g if we want to retire the host. + * + * @return true if we still want to allocate the given node to its parent + */ + private boolean canStillAllocateToParentOf(Node node) { + if (node.parentHostname().isEmpty()) return true; + Optional<Node> parent = node.parentHostname().flatMap(nodeRepository::getNode); + if (parent.isEmpty()) return false; + return nodeRepository.canAllocateTenantNodeTo(parent.get()); + } + private static NodeResources resources(NodeSpec requestedNodes) { if ( ! 
(requestedNodes instanceof NodeSpec.CountNodeSpec)) return null; return requestedNodes.resources().get(); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DockerProvisioningTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DockerProvisioningTest.java index d2a5e06469a..5abe7134aa0 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DockerProvisioningTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/DockerProvisioningTest.java @@ -46,10 +46,8 @@ public class DockerProvisioningTest { @Test public void docker_application_deployment() { ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); - ApplicationId application1 = ProvisioningTester.makeApplicationId(); - - for (int i = 1; i < 10; i++) - tester.makeReadyVirtualDockerNodes(1, dockerResources, "dockerHost" + i); + tester.makeReadyHosts(10, dockerResources).activateTenantHosts(); + ApplicationId application1 = ProvisioningTester.makeApplicationId("app1"); Version wantedVespaVersion = Version.fromString("6.39"); int nodeCount = 7; @@ -86,13 +84,12 @@ public class DockerProvisioningTest { ApplicationId application1 = ProvisioningTester.makeApplicationId(); Version wantedVespaVersion = Version.fromString("6.39"); int nodeCount = 7; - List<HostSpec> nodes = tester.prepare(application1, - ClusterSpec.request(ClusterSpec.Type.content, ClusterSpec.Id.from("myContent")).vespaVersion(wantedVespaVersion).build(), - nodeCount, 1, dockerResources); try { - tester.activate(application1, new HashSet<>(nodes)); + List<HostSpec> nodes = tester.prepare(application1, + ClusterSpec.request(ClusterSpec.Type.content, ClusterSpec.Id.from("myContent")).vespaVersion(wantedVespaVersion).build(), + nodeCount, 1, dockerResources); fail("Expected the allocation to fail due to parent hosts not being active yet"); - 
} catch (ParentHostUnavailableException ignored) { } + } catch (OutOfCapacityException expected) { } // Activate the zone-app, thereby allocating the parents List<HostSpec> hosts = tester.prepare(zoneApplication, @@ -101,9 +98,9 @@ public class DockerProvisioningTest { tester.activate(zoneApplication, hosts); // Try allocating tenants again - nodes = tester.prepare(application1, - ClusterSpec.request(ClusterSpec.Type.content, ClusterSpec.Id.from("myContent")).vespaVersion(wantedVespaVersion).build(), - nodeCount, 1, dockerResources); + List<HostSpec> nodes = tester.prepare(application1, + ClusterSpec.request(ClusterSpec.Type.content, ClusterSpec.Id.from("myContent")).vespaVersion(wantedVespaVersion).build(), + nodeCount, 1, dockerResources); tester.activate(application1, new HashSet<>(nodes)); NodeList activeNodes = tester.getNodes(application1, Node.State.active); @@ -152,53 +149,49 @@ public class DockerProvisioningTest { /** Exclusive app first, then non-exclusive: Should give the same result as below */ @Test public void docker_application_deployment_with_exclusive_app_first() { + NodeResources hostResources = new NodeResources(10, 40, 1000, 10); + NodeResources nodeResources = new NodeResources(1, 4, 100, 1); ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); - for (int i = 1; i <= 4; i++) - tester.makeReadyVirtualDockerNode(i, dockerResources, "host1"); - for (int i = 5; i <= 8; i++) - tester.makeReadyVirtualDockerNode(i, dockerResources, "host2"); - for (int i = 9; i <= 12; i++) - tester.makeReadyVirtualDockerNode(i, dockerResources, "host3"); - for (int i = 13; i <= 16; i++) - tester.makeReadyVirtualDockerNode(i, dockerResources, "host4"); - - ApplicationId application1 = ProvisioningTester.makeApplicationId(); - prepareAndActivate(application1, 2, true, tester); - assertEquals(Set.of("host1", "host2"), hostsOf(tester.getNodes(application1, Node.State.active))); - - 
ApplicationId application2 = ProvisioningTester.makeApplicationId(); - prepareAndActivate(application2, 2, false, tester); + tester.makeReadyHosts(4, hostResources).activateTenantHosts(); + ApplicationId application1 = ProvisioningTester.makeApplicationId("app1"); + prepareAndActivate(application1, 2, true, nodeResources, tester); + assertEquals(Set.of("host-1.yahoo.com", "host-2.yahoo.com"), + hostsOf(tester.getNodes(application1, Node.State.active))); + + ApplicationId application2 = ProvisioningTester.makeApplicationId("app2"); + prepareAndActivate(application2, 2, false, nodeResources, tester); assertEquals("Application is assigned to separate hosts", - Set.of("host3", "host4"), hostsOf(tester.getNodes(application2, Node.State.active))); + Set.of("host-3.yahoo.com", "host-4.yahoo.com"), + hostsOf(tester.getNodes(application2, Node.State.active))); } /** Non-exclusive app first, then an exclusive: Should give the same result as above */ @Test public void docker_application_deployment_with_exclusive_app_last() { + NodeResources hostResources = new NodeResources(10, 40, 1000, 10); + NodeResources nodeResources = new NodeResources(1, 4, 100, 1); ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); - for (int i = 1; i <= 4; i++) - tester.makeReadyVirtualDockerNode(i, dockerResources, "host1"); - for (int i = 5; i <= 8; i++) - tester.makeReadyVirtualDockerNode(i, dockerResources, "host2"); - for (int i = 9; i <= 12; i++) - tester.makeReadyVirtualDockerNode(i, dockerResources, "host3"); - for (int i = 13; i <= 16; i++) - tester.makeReadyVirtualDockerNode(i, dockerResources, "host4"); - - ApplicationId application1 = ProvisioningTester.makeApplicationId(); - prepareAndActivate(application1, 2, false, tester); - assertEquals(Set.of("host1", "host2"), hostsOf(tester.getNodes(application1, Node.State.active))); - - ApplicationId application2 = ProvisioningTester.makeApplicationId(); - 
prepareAndActivate(application2, 2, true, tester); + tester.makeReadyHosts(4, hostResources).activateTenantHosts(); + ApplicationId application1 = ProvisioningTester.makeApplicationId("app1"); + prepareAndActivate(application1, 2, false, nodeResources, tester); + assertEquals(Set.of("host-1.yahoo.com", "host-2.yahoo.com"), + hostsOf(tester.getNodes(application1, Node.State.active))); + + ApplicationId application2 = ProvisioningTester.makeApplicationId("app2"); + prepareAndActivate(application2, 2, true, nodeResources, tester); assertEquals("Application is assigned to separate hosts", - Set.of("host3", "host4"), hostsOf(tester.getNodes(application2, Node.State.active))); + Set.of("host-3.yahoo.com", "host-4.yahoo.com"), + hostsOf(tester.getNodes(application2, Node.State.active))); } /** Test making an application exclusive */ @Test public void docker_application_deployment_change_to_exclusive_and_back() { + NodeResources hostResources = new NodeResources(10, 40, 1000, 10); + NodeResources nodeResources = new NodeResources(1, 4, 100, 1); ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); + tester.makeReadyHosts(4, hostResources).activateTenantHosts(); + /* for (int i = 1; i <= 4; i++) tester.makeReadyVirtualDockerNode(i, dockerResources, "host1"); for (int i = 5; i <= 8; i++) @@ -207,19 +200,20 @@ public class DockerProvisioningTest { tester.makeReadyVirtualDockerNode(i, dockerResources, "host3"); for (int i = 13; i <= 16; i++) tester.makeReadyVirtualDockerNode(i, dockerResources, "host4"); + */ ApplicationId application1 = ProvisioningTester.makeApplicationId(); - prepareAndActivate(application1, 2, false, tester); + prepareAndActivate(application1, 2, false, nodeResources, tester); for (Node node : tester.getNodes(application1, Node.State.active)) assertFalse(node.allocation().get().membership().cluster().isExclusive()); - prepareAndActivate(application1, 2, true, tester); - 
assertEquals(Set.of("host1", "host2"), hostsOf(tester.getNodes(application1, Node.State.active))); + prepareAndActivate(application1, 2, true, nodeResources, tester); + assertEquals(Set.of("host-1.yahoo.com", "host-2.yahoo.com"), hostsOf(tester.getNodes(application1, Node.State.active))); for (Node node : tester.getNodes(application1, Node.State.active)) assertTrue(node.allocation().get().membership().cluster().isExclusive()); - prepareAndActivate(application1, 2, false, tester); - assertEquals(Set.of("host1", "host2"), hostsOf(tester.getNodes(application1, Node.State.active))); + prepareAndActivate(application1, 2, false, nodeResources, tester); + assertEquals(Set.of("host-1.yahoo.com", "host-2.yahoo.com"), hostsOf(tester.getNodes(application1, Node.State.active))); for (Node node : tester.getNodes(application1, Node.State.active)) assertFalse(node.allocation().get().membership().cluster().isExclusive()); } @@ -227,56 +221,34 @@ public class DockerProvisioningTest { /** Non-exclusive app first, then an exclusive: Should give the same result as above */ @Test public void docker_application_deployment_with_exclusive_app_causing_allocation_failure() { + ApplicationId application1 = ApplicationId.from("tenant1", "app1", "default"); + ApplicationId application2 = ApplicationId.from("tenant2", "app2", "default"); + ApplicationId application3 = ApplicationId.from("tenant1", "app3", "default"); + NodeResources hostResources = new NodeResources(10, 40, 1000, 10); + NodeResources nodeResources = new NodeResources(1, 4, 100, 1); ProvisioningTester tester = new ProvisioningTester.Builder().zone(new Zone(Environment.prod, RegionName.from("us-east"))).build(); - for (int i = 1; i <= 4; i++) - tester.makeReadyVirtualDockerNode(i, dockerResources, "host1"); - for (int i = 5; i <= 8; i++) - tester.makeReadyVirtualDockerNode(i, dockerResources, "host2"); - for (int i = 9; i <= 12; i++) - tester.makeReadyVirtualDockerNode(i, dockerResources, "host3"); - for (int i = 13; i <= 16; 
i++) - tester.makeReadyVirtualDockerNode(i, dockerResources, "host4"); + tester.makeReadyHosts(4, hostResources).activateTenantHosts(); - ApplicationId application1 = ProvisioningTester.makeApplicationId(); - prepareAndActivate(application1, 2, true, tester); - assertEquals(Set.of("host1", "host2"), hostsOf(tester.getNodes(application1, Node.State.active))); + prepareAndActivate(application1, 2, true, nodeResources, tester); + assertEquals(Set.of("host-1.yahoo.com", "host-2.yahoo.com"), + hostsOf(tester.getNodes(application1, Node.State.active))); try { - ApplicationId application2 = ApplicationId.from("tenant1", "app1", "default"); - prepareAndActivate(application2, 3, false, tester); + prepareAndActivate(application2, 3, false, nodeResources, tester); fail("Expected allocation failure"); } catch (Exception e) { assertEquals("No room for 3 nodes as 2 of 4 hosts are exclusive", "Could not satisfy request for 3 nodes with " + - "[vcpu: 1.0, memory: 4.0 Gb, disk 100.0 Gb, bandwidth: 1.0 Gbps, storage type: local] " + - "in tenant1.app1 container cluster 'myContainer' 6.39: " + + "[vcpu: 1.0, memory: 4.0 Gb, disk 100.0 Gb, bandwidth: 1.0 Gbps] " + + "in tenant2.app2 container cluster 'myContainer' 6.39: " + "Out of capacity on group 0: " + - "Not enough nodes available due to host exclusivity constraints, " + - "insufficient nodes available on separate physical hosts", + "Not enough nodes available due to host exclusivity constraints", e.getMessage()); } // Adding 3 nodes of another application for the same tenant works - ApplicationId application3 = ApplicationId.from(application1.tenant(), ApplicationName.from("app3"), InstanceName.from("default")); - prepareAndActivate(application3, 2, true, tester); - } - - // In dev, test and staging you get nodes with default flavor, but we should get specified flavor for docker nodes - @Test - public void get_specified_flavor_not_default_flavor_for_docker() { - ProvisioningTester tester = new 
ProvisioningTester.Builder().zone(new Zone(Environment.test, RegionName.from("corp-us-east-1"))).build(); - ApplicationId application1 = ProvisioningTester.makeApplicationId(); - tester.makeReadyVirtualDockerNodes(1, dockerResources, "dockerHost"); - - List<HostSpec> hosts = tester.prepare(application1, - ClusterSpec.request(ClusterSpec.Type.content, ClusterSpec.Id.from("myContent")).vespaVersion("6.42").build(), - 1, 1, dockerResources); - tester.activate(application1, new HashSet<>(hosts)); - - NodeList nodes = tester.getNodes(application1, Node.State.active); - assertEquals(1, nodes.size()); - assertEquals("[vcpu: 1.0, memory: 4.0 Gb, disk 100.0 Gb, bandwidth: 1.0 Gbps, storage type: local]", nodes.asList().get(0).flavor().name()); + prepareAndActivate(application3, 2, true, nodeResources, tester); } @Test @@ -442,9 +414,13 @@ public class DockerProvisioningTest { } private void prepareAndActivate(ApplicationId application, int nodeCount, boolean exclusive, ProvisioningTester tester) { + prepareAndActivate(application, nodeCount, exclusive, dockerResources, tester); + } + + private void prepareAndActivate(ApplicationId application, int nodeCount, boolean exclusive, NodeResources resources, ProvisioningTester tester) { Set<HostSpec> hosts = new HashSet<>(tester.prepare(application, ClusterSpec.request(ClusterSpec.Type.container, ClusterSpec.Id.from("myContainer")).vespaVersion("6.39").exclusive(exclusive).build(), - Capacity.from(new ClusterResources(nodeCount, 1, dockerResources), false, true))); + Capacity.from(new ClusterResources(nodeCount, 1, resources), false, true))); tester.activate(application, hosts); } diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/LoadBalancerProvisionerTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/LoadBalancerProvisionerTest.java index 88f5d41a4f0..3d784c403f3 100644 --- 
a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/LoadBalancerProvisionerTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/LoadBalancerProvisionerTest.java @@ -6,6 +6,7 @@ import com.yahoo.config.provision.ApplicationId; import com.yahoo.config.provision.Capacity; import com.yahoo.config.provision.ClusterResources; import com.yahoo.config.provision.ClusterSpec; +import com.yahoo.config.provision.Flavor; import com.yahoo.config.provision.HostName; import com.yahoo.config.provision.HostSpec; import com.yahoo.config.provision.NodeResources; @@ -158,9 +159,10 @@ public class LoadBalancerProvisionerTest { @Test public void provision_load_balancers_with_dynamic_node_provisioning() { - var nodes = prepare(app1, Capacity.from(new ClusterResources(2, 1, new NodeResources(1, 4, 10, 0.3)), false, true), - true, - clusterRequest(ClusterSpec.Type.container, ClusterSpec.Id.from("qrs"))); + NodeResources resources = new NodeResources(1, 4, 10, 0.3); + tester.makeReadyHosts(2, resources); + tester.activateTenantHosts(); + var nodes = tester.prepare(app1, clusterRequest(ClusterSpec.Type.container, ClusterSpec.Id.from("qrs")), 2 , 1, resources); Supplier<LoadBalancer> lb = () -> tester.nodeRepository().loadBalancers(app1).asList().get(0); assertTrue("Load balancer provisioned with empty reals", tester.loadBalancerService().instances().get(lb.get().id()).reals().isEmpty()); assignIps(tester.nodeRepository().getNodes(app1)); @@ -171,14 +173,12 @@ public class LoadBalancerProvisionerTest { NestedTransaction removeTransaction = new NestedTransaction(); tester.provisioner().remove(removeTransaction, app1); removeTransaction.commit(); - tester.nodeRepository().database().removeNodes(tester.nodeRepository().getNodes()); - assertTrue("Nodes are deleted", tester.nodeRepository().getNodes().isEmpty()); + tester.nodeRepository().database().removeNodes(tester.nodeRepository().getNodes(NodeType.tenant)); + assertTrue("Nodes 
are deleted", tester.nodeRepository().getNodes(NodeType.tenant).isEmpty()); assertSame("Load balancer is deactivated", LoadBalancer.State.inactive, lb.get().state()); // Application is redeployed - nodes = prepare(app1, Capacity.from(new ClusterResources(2, 1, new NodeResources(1, 4, 10, 0.3)), false, true), - true, - clusterRequest(ClusterSpec.Type.container, ClusterSpec.Id.from("qrs"))); + nodes = tester.prepare(app1, clusterRequest(ClusterSpec.Type.container, ClusterSpec.Id.from("qrs")), 2 , 1, resources); assertTrue("Load balancer is reconfigured with empty reals", tester.loadBalancerService().instances().get(lb.get().id()).reals().isEmpty()); assignIps(tester.nodeRepository().getNodes(app1)); tester.activate(app1, nodes); @@ -188,7 +188,6 @@ public class LoadBalancerProvisionerTest { @Test public void does_not_provision_load_balancers_for_non_tenant_node_type() { tester.activate(infraApp1, prepare(infraApp1, Capacity.fromRequiredNodeType(NodeType.host), - false, clusterRequest(ClusterSpec.Type.container, ClusterSpec.Id.from("tenant-host")))); assertTrue("No load balancer provisioned", tester.loadBalancerService().instances().isEmpty()); @@ -220,7 +219,7 @@ public class LoadBalancerProvisionerTest { ApplicationId configServerApp = ApplicationId.from("hosted-vespa", "zone-config-servers", "default"); Supplier<List<LoadBalancer>> lbs = () -> tester.nodeRepository().loadBalancers(configServerApp).asList(); var cluster = ClusterSpec.Id.from("zone-config-servers"); - var nodes = prepare(configServerApp, Capacity.fromRequiredNodeType(NodeType.config), false, + var nodes = prepare(configServerApp, Capacity.fromRequiredNodeType(NodeType.config), clusterRequest(ClusterSpec.Type.admin, cluster)); assertEquals(1, lbs.get().size()); assertEquals("Prepare provisions load balancer with reserved nodes", 2, lbs.get().get(0).instance().reals().size()); @@ -235,7 +234,7 @@ public class LoadBalancerProvisionerTest { ApplicationId controllerApp = ApplicationId.from("hosted-vespa", 
"controller", "default"); Supplier<List<LoadBalancer>> lbs = () -> tester.nodeRepository().loadBalancers(controllerApp).asList(); var cluster = ClusterSpec.Id.from("zone-config-servers"); - var nodes = prepare(controllerApp, Capacity.fromRequiredNodeType(NodeType.controller), false, + var nodes = prepare(controllerApp, Capacity.fromRequiredNodeType(NodeType.controller), clusterRequest(ClusterSpec.Type.container, cluster)); assertEquals(1, lbs.get().size()); assertEquals("Prepare provisions load balancer with reserved nodes", 2, lbs.get().get(0).instance().reals().size()); @@ -249,15 +248,11 @@ public class LoadBalancerProvisionerTest { } private Set<HostSpec> prepare(ApplicationId application, ClusterSpec... specs) { - return prepare(application, Capacity.from(new ClusterResources(2, 1, new NodeResources(1, 4, 10, 0.3)), false, true), false, specs); + return prepare(application, Capacity.from(new ClusterResources(2, 1, new NodeResources(1, 4, 10, 0.3)), false, true), specs); } - private Set<HostSpec> prepare(ApplicationId application, Capacity capacity, boolean dynamicDockerNodes, ClusterSpec... specs) { - if (dynamicDockerNodes) { - makeDynamicDockerNodes(specs.length * 2, capacity.type()); - } else { - tester.makeReadyNodes(specs.length * 2, new NodeResources(1, 4, 10, 0.3), capacity.type()); - } + private Set<HostSpec> prepare(ApplicationId application, Capacity capacity, ClusterSpec... 
specs) { + tester.makeReadyNodes(specs.length * 2, new NodeResources(1, 4, 10, 0.3), capacity.type()); Set<HostSpec> allNodes = new LinkedHashSet<>(); for (ClusterSpec spec : specs) { allNodes.addAll(tester.prepare(application, spec, capacity)); @@ -266,9 +261,11 @@ public class LoadBalancerProvisionerTest { } private void makeDynamicDockerNodes(int n, NodeType nodeType) { + tester.makeReadyHosts(n, new NodeResources(1, 4, 10, 0.3)); List<Node> nodes = new ArrayList<>(n); for (int i = 1; i <= n; i++) { - var node = Node.createDockerNode(Set.of(), "node" + i, "parent" + i, + var node = Node.createDockerNode(Set.of(), "vnode" + i, + tester.nodeRepository().getNodes(NodeType.host).get(n - 1).hostname(), new NodeResources(1, 4, 10, 0.3), nodeType); nodes.add(node); diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/VirtualNodeProvisioningTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/VirtualNodeProvisioningTest.java index b5e31e7cbdb..d1446cd8bc1 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/VirtualNodeProvisioningTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/VirtualNodeProvisioningTest.java @@ -34,40 +34,37 @@ import static org.junit.Assert.assertNotNull; // to remove these tests public class VirtualNodeProvisioningTest { - private static final NodeResources flavor = new NodeResources(4, 8, 100, 1); + private static final NodeResources resources = new NodeResources(4, 8, 100, 1); private static final ClusterSpec contentClusterSpec = ClusterSpec.request(ClusterSpec.Type.content, ClusterSpec.Id.from("myContent")).vespaVersion("6.42").build(); private static final ClusterSpec containerClusterSpec = ClusterSpec.request(ClusterSpec.Type.container, ClusterSpec.Id.from("myContainer")).vespaVersion("6.42").build(); private ProvisioningTester tester = new ProvisioningTester.Builder().build(); - private 
ApplicationId applicationId = ProvisioningTester.makeApplicationId(); + private ApplicationId applicationId = ProvisioningTester.makeApplicationId("test"); @Test public void distinct_parent_host_for_each_node_in_a_cluster() { - tester.makeReadyVirtualDockerNodes(2, flavor, "parentHost1"); - tester.makeReadyVirtualDockerNodes(2, flavor, "parentHost2"); - tester.makeReadyVirtualDockerNodes(2, flavor, "parentHost3"); - tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost4"); - - final int containerNodeCount = 4; - final int contentNodeCount = 3; - final int groups = 1; - List<HostSpec> containerHosts = prepare(containerClusterSpec, containerNodeCount, groups); - List<HostSpec> contentHosts = prepare(contentClusterSpec, contentNodeCount, groups); + tester.makeReadyHosts(4, new NodeResources(8, 16, 200, 2)) + .activateTenantHosts(); + int containerNodeCount = 4; + int contentNodeCount = 3; + int groups = 1; + List<HostSpec> containerHosts = tester.prepare(applicationId, containerClusterSpec, containerNodeCount, groups, resources); + List<HostSpec> contentHosts = tester.prepare(applicationId, contentClusterSpec, contentNodeCount, groups, resources); activate(containerHosts, contentHosts); - final List<Node> nodes = getNodes(applicationId); + List<Node> nodes = getNodes(applicationId); assertEquals(contentNodeCount + containerNodeCount, nodes.size()); assertDistinctParentHosts(nodes, ClusterSpec.Type.container, containerNodeCount); assertDistinctParentHosts(nodes, ClusterSpec.Type.content, contentNodeCount); // Go down to 3 nodes in container cluster - List<HostSpec> containerHosts2 = prepare(containerClusterSpec, containerNodeCount - 1, groups); + List<HostSpec> containerHosts2 = tester.prepare(applicationId, containerClusterSpec, containerNodeCount - 1, groups, resources); activate(containerHosts2); List<Node> nodes2 = getNodes(applicationId); assertDistinctParentHosts(nodes2, ClusterSpec.Type.container, containerNodeCount - 1); // Go up to 4 nodes again in 
container cluster - List<HostSpec> containerHosts3 = prepare(containerClusterSpec, containerNodeCount, groups); + List<HostSpec> containerHosts3 = tester.prepare(applicationId, containerClusterSpec, containerNodeCount, groups, resources); activate(containerHosts3); List<Node> nodes3 = getNodes(applicationId); assertDistinctParentHosts(nodes3, ClusterSpec.Type.container, containerNodeCount); @@ -97,7 +94,7 @@ public class VirtualNodeProvisioningTest { // Allowed to use same parent host for several nodes in same cluster in CD (even if prod env) { tester = new ProvisioningTester.Builder().zone(new Zone(SystemName.cd, Environment.prod, RegionName.from("us-east"))).build(); - tester.makeReadyNodes(4, flavor, NodeType.host, 1); + tester.makeReadyNodes(4, resources, NodeType.host, 1); tester.prepareAndActivateInfraApplication(ProvisioningTester.makeApplicationId(), NodeType.host); List<HostSpec> containerHosts = prepare(containerClusterSpec, containerNodeCount, groups); @@ -110,19 +107,13 @@ public class VirtualNodeProvisioningTest { @Test public void will_retire_clashing_active() { - tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost1"); - tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost2"); - tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost3"); - tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost4"); - tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost5"); - tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost6"); - + tester.makeReadyHosts(4, resources).activateTenantHosts(); int containerNodeCount = 2; int contentNodeCount = 2; int groups = 1; - List<HostSpec> containerHosts = prepare(containerClusterSpec, containerNodeCount, groups); - List<HostSpec> contentHosts = prepare(contentClusterSpec, contentNodeCount, groups); - activate(containerHosts, contentHosts); + List<HostSpec> containerNodes = tester.prepare(applicationId, containerClusterSpec, containerNodeCount, groups, resources); + List<HostSpec> contentNodes = 
tester.prepare(applicationId, contentClusterSpec, contentNodeCount, groups, resources); + activate(containerNodes, contentNodes); List<Node> nodes = getNodes(applicationId); assertEquals(4, nodes.size()); @@ -130,48 +121,19 @@ public class VirtualNodeProvisioningTest { assertDistinctParentHosts(nodes, ClusterSpec.Type.content, contentNodeCount); tester.patchNodes(nodes, (n) -> n.withParentHostname("clashing")); - containerHosts = prepare(containerClusterSpec, containerNodeCount, groups); - contentHosts = prepare(contentClusterSpec, contentNodeCount, groups); - activate(containerHosts, contentHosts); + containerNodes = prepare(containerClusterSpec, containerNodeCount, groups); + contentNodes = prepare(contentClusterSpec, contentNodeCount, groups); + activate(containerNodes, contentNodes); nodes = getNodes(applicationId); assertEquals(6, nodes.size()); assertEquals(2, nodes.stream().filter(n -> n.allocation().get().membership().retired()).count()); } - @Test - public void fail_when_all_hosts_become_clashing() { - tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost1"); - tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost2"); - tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost3"); - tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost4"); - - int containerNodeCount = 2; - int contentNodeCount = 2; - int groups = 1; - List<HostSpec> containerHosts = prepare(containerClusterSpec, containerNodeCount, groups); - List<HostSpec> contentHosts = prepare(contentClusterSpec, contentNodeCount, groups); - activate(containerHosts, contentHosts); - - List<Node> nodes = getNodes(applicationId); - assertEquals(4, nodes.size()); - assertDistinctParentHosts(nodes, ClusterSpec.Type.container, containerNodeCount); - assertDistinctParentHosts(nodes, ClusterSpec.Type.content, contentNodeCount); - - tester.patchNodes(nodes, (n) -> n.withParentHostname("clashing")); - OutOfCapacityException expected = null; - try { - containerHosts = prepare(containerClusterSpec, 
containerNodeCount, groups); - } catch (OutOfCapacityException e) { - expected = e; - } - assertNotNull(expected); - } - @Test(expected = OutOfCapacityException.class) public void fail_when_too_few_distinct_parent_hosts() { - tester.makeReadyVirtualDockerNodes(2, flavor, "parentHost1"); - tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost2"); + tester.makeReadyVirtualDockerNodes(2, resources, "parentHost1"); + tester.makeReadyVirtualDockerNodes(1, resources, "parentHost2"); int contentNodeCount = 3; List<HostSpec> hosts = prepare(contentClusterSpec, contentNodeCount, 1); @@ -182,26 +144,8 @@ public class VirtualNodeProvisioningTest { } @Test - public void incomplete_parent_hosts_has_distinct_distribution() { - tester.makeReadyVirtualDockerNode(1, flavor, "parentHost1"); - tester.makeReadyVirtualDockerNode(1, flavor, "parentHost2"); - tester.makeReadyVirtualNodes(1, flavor); - - final int contentNodeCount = 3; - final int groups = 1; - final List<HostSpec> contentHosts = prepare(contentClusterSpec, contentNodeCount, groups); - activate(contentHosts); - assertEquals(3, getNodes(applicationId).size()); - - tester.makeReadyVirtualDockerNode(2, flavor, "parentHost1"); - tester.makeReadyVirtualDockerNode(2, flavor, "parentHost2"); - - assertEquals(contentHosts, prepare(contentClusterSpec, contentNodeCount, groups)); - } - - @Test public void indistinct_distribution_with_known_ready_nodes() { - tester.makeReadyVirtualNodes(3, flavor); + tester.makeReadyVirtualNodes(3, resources); final int contentNodeCount = 3; final int groups = 1; @@ -218,8 +162,8 @@ public class VirtualNodeProvisioningTest { nodes = getNodes(applicationId); assertEquals(3, nodes.stream().filter(n -> n.parentHostname().isPresent()).count()); - tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost1"); - tester.makeReadyVirtualDockerNodes(2, flavor, "parentHost2"); + tester.makeReadyVirtualDockerNodes(1, resources, "parentHost1"); + tester.makeReadyVirtualDockerNodes(2, resources, 
"parentHost2"); OutOfCapacityException expectedException = null; try { @@ -232,7 +176,7 @@ public class VirtualNodeProvisioningTest { @Test public void unknown_distribution_with_known_ready_nodes() { - tester.makeReadyVirtualNodes(3, flavor); + tester.makeReadyVirtualNodes(3, resources); final int contentNodeCount = 3; final int groups = 1; @@ -240,15 +184,15 @@ public class VirtualNodeProvisioningTest { activate(contentHosts); assertEquals(3, getNodes(applicationId).size()); - tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost1"); - tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost2"); - tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost3"); + tester.makeReadyVirtualDockerNodes(1, resources, "parentHost1"); + tester.makeReadyVirtualDockerNodes(1, resources, "parentHost2"); + tester.makeReadyVirtualDockerNodes(1, resources, "parentHost3"); assertEquals(contentHosts, prepare(contentClusterSpec, contentNodeCount, groups)); } @Test public void unknown_distribution_with_known_and_unknown_ready_nodes() { - tester.makeReadyVirtualNodes(3, flavor); + tester.makeReadyVirtualNodes(3, resources); int contentNodeCount = 3; int groups = 1; @@ -256,8 +200,8 @@ public class VirtualNodeProvisioningTest { activate(contentHosts); assertEquals(3, getNodes(applicationId).size()); - tester.makeReadyVirtualDockerNodes(1, flavor, "parentHost1"); - tester.makeReadyVirtualNodes(1, flavor); + tester.makeReadyVirtualDockerNodes(1, resources, "parentHost1"); + tester.makeReadyVirtualNodes(1, resources); assertEquals(contentHosts, prepare(contentClusterSpec, contentNodeCount, groups)); } @@ -289,7 +233,7 @@ public class VirtualNodeProvisioningTest { } private List<HostSpec> prepare(ClusterSpec clusterSpec, int nodeCount, int groups) { - return tester.prepare(applicationId, clusterSpec, nodeCount, groups, flavor); + return tester.prepare(applicationId, clusterSpec, nodeCount, groups, resources); } private List<HostSpec> prepare(ClusterSpec clusterSpec, int nodeCount, int 
groups, NodeResources flavor) { diff --git a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt index b5465b1f0dd..cf18be8f5de 100644 --- a/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt +++ b/searchcore/src/apps/vespa-feed-bm/CMakeLists.txt @@ -3,11 +3,15 @@ vespa_add_executable(searchcore_vespa_feed_bm_app SOURCES vespa_feed_bm.cpp bm_cluster_controller.cpp + bm_message_bus.cpp bm_storage_chain_builder.cpp bm_storage_link.cpp + document_api_message_bus_bm_feed_handler.cpp + pending_tracker_hash.cpp spi_bm_feed_handler.cpp - storage_api_rpc_bm_feed_handler.cpp storage_api_chain_bm_feed_handler.cpp + storage_api_message_bus_bm_feed_handler.cpp + storage_api_rpc_bm_feed_handler.cpp storage_reply_error_checker.cpp OUTPUT_NAME vespa-feed-bm DEPENDS diff --git a/searchcore/src/apps/vespa-feed-bm/bm_message_bus.cpp b/searchcore/src/apps/vespa-feed-bm/bm_message_bus.cpp new file mode 100644 index 00000000000..ec50a4c7c01 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/bm_message_bus.cpp @@ -0,0 +1,180 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include "bm_message_bus.h" +#include "pending_tracker_hash.h" +#include "pending_tracker.h" +#include "storage_reply_error_checker.h" +#include <vespa/messagebus/emptyreply.h> +#include <vespa/messagebus/network/rpcnetworkparams.h> +#include <vespa/messagebus/rpcmessagebus.h> +#include <vespa/messagebus/ireplyhandler.h> +#include <vespa/documentapi/messagebus/documentprotocol.h> +#include <vespa/documentapi/messagebus/messages/documentmessage.h> +#include <vespa/storageapi/mbusprot/storageprotocol.h> +#include <vespa/storageapi/mbusprot/storagereply.h> +#include <vespa/vespalib/stllike/asciistream.h> + +#include <vespa/log/log.h> +LOG_SETUP(".bm_message_bus"); + +using documentapi::DocumentProtocol; +using mbus::RPCMessageBus; +using mbus::Reply; +using mbus::SourceSession; +using storage::mbusprot::StorageProtocol; +using storage::mbusprot::StorageReply; + +namespace feedbm { + +namespace { + +std::atomic<uint64_t> bm_message_bus_msg_id(0u); + +vespalib::string reply_as_string(Reply &reply) { + vespalib::asciistream os; + if (reply.getType() == 0) { + os << "empty reply"; + } else { + os << "reply=" << reply.toString() << ", protocol=" << reply.getProtocol(); + } + os << ", "; + auto message = reply.getMessage(); + if (message) { + os << "message=" << message->toString(); + os << ", protocol=" << message->getProtocol(); + } else { + os << "no message"; + } + reply.setMessage(std::move(message)); + os << ", "; + if (reply.hasErrors()) { + os << "errors=["; + for (uint32_t i = 0; i < reply.getNumErrors(); ++i) { + auto &error = reply.getError(i); + if (i > 0) { + os << ", "; + } + os << mbus::ErrorCode::getName(error.getCode()) << ": " << error.getMessage() << " (from " << error.getService() << ")"; + } + os << "]"; + } else { + os << "no errors"; + } + return os.str(); +} + +} + +class BmMessageBus::ReplyHandler : public mbus::IReplyHandler, + public StorageReplyErrorChecker +{ + PendingTrackerHash _pending_hash; +public: + ReplyHandler(); + ~ReplyHandler() 
override; + void handleReply(std::unique_ptr<Reply> reply) override; + void retain(uint64_t msg_id, PendingTracker &tracker) { _pending_hash.retain(msg_id, tracker); } + void message_aborted(uint64_t msg_id); +}; + +BmMessageBus::ReplyHandler::ReplyHandler() + : mbus::IReplyHandler(), + StorageReplyErrorChecker(), + _pending_hash() +{ +} + +BmMessageBus::ReplyHandler::~ReplyHandler() = default; + +void +BmMessageBus::ReplyHandler::handleReply(std::unique_ptr<Reply> reply) +{ + auto msg_id = reply->getContext().value.UINT64; + auto tracker = _pending_hash.release(msg_id); + if (tracker != nullptr) { + bool failed = false; + if (reply->getType() == 0 || reply->hasErrors()) { + failed = true; // empty reply or error + } else { + auto protocol = reply->getProtocol(); + if (protocol == DocumentProtocol::NAME) { + } else if (protocol == StorageProtocol::NAME) { + auto sreply = dynamic_cast<storage::mbusprot::StorageReply *>(reply.get()); + if (sreply != nullptr) { + check_error(*sreply->getReply()); + } else { + failed = true; // unexpected message type + } + } else { + failed = true; // unexpected protocol + } + } + if (failed) { + ++_errors; + LOG(error, "Unexpected %s", reply_as_string(*reply).c_str()); + } + tracker->release(); + } else { + ++_errors; + LOG(error, "Untracked %s", reply_as_string(*reply).c_str()); + } +} + +void +BmMessageBus::ReplyHandler::message_aborted(uint64_t msg_id) +{ + ++_errors; + auto tracker = _pending_hash.release(msg_id); + tracker->release(); +} + +BmMessageBus::BmMessageBus(const config::ConfigUri& config_uri, + std::shared_ptr<const document::DocumentTypeRepo> document_type_repo, + const documentapi::LoadTypeSet& load_types) + : _reply_handler(std::make_unique<ReplyHandler>()), + _message_bus(), + _session() +{ + mbus::RPCNetworkParams params(config_uri); + mbus::ProtocolSet protocol_set; + protocol_set.add(std::make_shared<DocumentProtocol>(load_types, document_type_repo)); + 
protocol_set.add(std::make_shared<StorageProtocol>(document_type_repo, load_types)); + params.setIdentity(mbus::Identity("vespa-bm-client")); + _message_bus = std::make_unique<mbus::RPCMessageBus>( + protocol_set, + params, + config_uri); + mbus::SourceSessionParams srcParams; + srcParams.setThrottlePolicy(mbus::IThrottlePolicy::SP()); + srcParams.setReplyHandler(*_reply_handler); + _session = _message_bus->getMessageBus().createSourceSession(srcParams); +} + +BmMessageBus::~BmMessageBus() +{ + _session.reset(); + _message_bus.reset(); + _reply_handler.reset(); +} + +uint32_t +BmMessageBus::get_error_count() const +{ + return _reply_handler->get_error_count(); +} + +void +BmMessageBus::send_msg(std::unique_ptr<mbus::Message> msg, const mbus::Route &route, PendingTracker &tracker) +{ + auto msg_id = ++bm_message_bus_msg_id; + _reply_handler->retain(msg_id, tracker); + msg->setContext(mbus::Context(msg_id)); + msg->setRetryEnabled(false); + auto result = _session->send(std::move(msg), route); + if (!result.isAccepted()) { + LOG(error, "Message not accepeted, error is '%s'", result.getError().toString().c_str()); + _reply_handler->message_aborted(msg_id); + } +} + +} diff --git a/searchcore/src/apps/vespa-feed-bm/bm_message_bus.h b/searchcore/src/apps/vespa-feed-bm/bm_message_bus.h new file mode 100644 index 00000000000..9ebe394e9e6 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/bm_message_bus.h @@ -0,0 +1,42 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <memory> + +namespace config { class ConfigUri; } +namespace document { class DocumentTypeRepo; } +namespace documentapi { class LoadTypeSet; } + +namespace mbus { + +class Message; +class RPCMessageBus; +class Route; +class SourceSession; + +} + +namespace feedbm { + +class PendingTracker; + +/* + * Message bus for feed benchmark program. 
+ */ +class BmMessageBus +{ + class ReplyHandler; + std::unique_ptr<ReplyHandler> _reply_handler; + std::unique_ptr<mbus::RPCMessageBus> _message_bus; + std::unique_ptr<mbus::SourceSession> _session; +public: + BmMessageBus(const config::ConfigUri& config_uri, + std::shared_ptr<const document::DocumentTypeRepo> document_type_repo, + const documentapi::LoadTypeSet& load_types); + ~BmMessageBus(); + uint32_t get_error_count() const; + void send_msg(std::unique_ptr<mbus::Message> msg, const mbus::Route &route, PendingTracker &tracker); +}; + +} diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp index 79517e98094..2aeda91c30c 100644 --- a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp +++ b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.cpp @@ -2,45 +2,18 @@ #include "bm_storage_link.h" #include "pending_tracker.h" -#include <vespa/vespalib/stllike/hash_map.hpp> namespace feedbm { -void -BmStorageLink::retain(uint64_t msg_id, PendingTracker &tracker) -{ - tracker.retain(); - std::lock_guard lock(_mutex); - _pending.insert(std::make_pair(msg_id, &tracker)); -} - -PendingTracker * -BmStorageLink::release(uint64_t msg_id) -{ - std::lock_guard lock(_mutex); - auto itr = _pending.find(msg_id); - if (itr == _pending.end()) { - return nullptr; - } - auto tracker = itr->second; - _pending.erase(itr); - return tracker; -} - BmStorageLink::BmStorageLink() : storage::StorageLink("vespa-bm-feed"), StorageReplyErrorChecker(), - _mutex(), - _pending() + _pending_hash() { } -BmStorageLink::~BmStorageLink() -{ - std::lock_guard lock(_mutex); - assert(_pending.empty()); -} +BmStorageLink::~BmStorageLink() = default; bool BmStorageLink::onDown(const std::shared_ptr<storage::api::StorageMessage>& msg) @@ -52,7 +25,7 @@ BmStorageLink::onDown(const std::shared_ptr<storage::api::StorageMessage>& msg) bool BmStorageLink::onUp(const std::shared_ptr<storage::api::StorageMessage>& msg) { - auto tracker = 
release(msg->getMsgId()); + auto tracker = _pending_hash.release(msg->getMsgId()); if (tracker != nullptr) { check_error(*msg); tracker->release(); diff --git a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h index 63ece355c02..95528d7b2d9 100644 --- a/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h +++ b/searchcore/src/apps/vespa-feed-bm/bm_storage_link.h @@ -3,8 +3,8 @@ #pragma once #include "storage_reply_error_checker.h" +#include "pending_tracker_hash.h" #include <vespa/storage/common/storagelink.h> -#include <vespa/vespalib/stllike/hash_map.h> namespace feedbm { @@ -17,15 +17,13 @@ class PendingTracker; class BmStorageLink : public storage::StorageLink, public StorageReplyErrorChecker { - std::mutex _mutex; - vespalib::hash_map<uint64_t, PendingTracker *> _pending; - PendingTracker *release(uint64_t msg_id); + PendingTrackerHash _pending_hash; public: BmStorageLink(); ~BmStorageLink() override; bool onDown(const std::shared_ptr<storage::api::StorageMessage>& msg) override; bool onUp(const std::shared_ptr<storage::api::StorageMessage>& msg) override; - void retain(uint64_t msg_id, PendingTracker &tracker); + void retain(uint64_t msg_id, PendingTracker &tracker) { _pending_hash.retain(msg_id, tracker); } }; } diff --git a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp new file mode 100644 index 00000000000..276a5a8136b --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.cpp @@ -0,0 +1,82 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include "document_api_message_bus_bm_feed_handler.h" +#include "bm_message_bus.h" +#include "pending_tracker.h" +#include <vespa/document/fieldvalue/document.h> +#include <vespa/document/update/documentupdate.h> +#include <vespa/documentapi/messagebus/messages/putdocumentmessage.h> +#include <vespa/documentapi/messagebus/messages/removedocumentmessage.h> +#include <vespa/documentapi/messagebus/messages/updatedocumentmessage.h> +#include <vespa/storageapi/messageapi/storagemessage.h> + +using document::Document; +using document::DocumentId; +using document::DocumentUpdate; +using storage::api::StorageMessageAddress; +using storage::lib::NodeType; + +namespace feedbm { + +DocumentApiMessageBusBmFeedHandler::DocumentApiMessageBusBmFeedHandler(BmMessageBus &message_bus) + : IBmFeedHandler(), + _name(vespalib::string("DocumentApiMessageBusBmFeedHandler(distributor)")), + _storage_address(std::make_unique<StorageMessageAddress>("storage", NodeType::DISTRIBUTOR, 0)), + _message_bus(message_bus) +{ +} + +DocumentApiMessageBusBmFeedHandler::~DocumentApiMessageBusBmFeedHandler() = default; + +void +DocumentApiMessageBusBmFeedHandler::send_msg(std::unique_ptr<documentapi::DocumentMessage> msg, PendingTracker& pending_tracker) +{ + _message_bus.send_msg(std::move(msg), _storage_address->getRoute(), pending_tracker); +} + +void +DocumentApiMessageBusBmFeedHandler::put(const document::Bucket&, std::unique_ptr<Document> document, uint64_t, PendingTracker& tracker) +{ + auto msg = std::make_unique<documentapi::PutDocumentMessage>(std::move(document)); + send_msg(std::move(msg), tracker); +} + +void +DocumentApiMessageBusBmFeedHandler::update(const document::Bucket&, std::unique_ptr<DocumentUpdate> document_update, uint64_t, PendingTracker& tracker) +{ + auto msg = std::make_unique<documentapi::UpdateDocumentMessage>(std::move(document_update)); + send_msg(std::move(msg), tracker); +} + +void +DocumentApiMessageBusBmFeedHandler::remove(const document::Bucket&, const DocumentId& 
document_id, uint64_t, PendingTracker& tracker) +{ + auto msg = std::make_unique<documentapi::RemoveDocumentMessage>(document_id); + send_msg(std::move(msg), tracker); +} + +uint32_t +DocumentApiMessageBusBmFeedHandler::get_error_count() const +{ + return _message_bus.get_error_count(); +} + +const vespalib::string& +DocumentApiMessageBusBmFeedHandler::get_name() const +{ + return _name; +} + +bool +DocumentApiMessageBusBmFeedHandler::manages_buckets() const +{ + return true; +} + +bool +DocumentApiMessageBusBmFeedHandler::manages_timestamp() const +{ + return true; +} + +} diff --git a/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h new file mode 100644 index 00000000000..1e958da7900 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/document_api_message_bus_bm_feed_handler.h @@ -0,0 +1,37 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "i_bm_feed_handler.h" + +namespace document { class DocumentTypeRepo; } +namespace documentapi { class DocumentMessage; }; +namespace storage::api { class StorageMessageAddress; } + +namespace feedbm { + +class BmMessageBus; + +/* + * Benchmark feed handler for feed to distributor using document api protocol + * over message bus. 
+ */ +class DocumentApiMessageBusBmFeedHandler : public IBmFeedHandler +{ + vespalib::string _name; + std::unique_ptr<storage::api::StorageMessageAddress> _storage_address; + BmMessageBus& _message_bus; + void send_msg(std::unique_ptr<documentapi::DocumentMessage> msg, PendingTracker& tracker); +public: + DocumentApiMessageBusBmFeedHandler(BmMessageBus &message_bus); + ~DocumentApiMessageBusBmFeedHandler(); + void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override; + void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override; + void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override; + uint32_t get_error_count() const override; + const vespalib::string &get_name() const override; + bool manages_buckets() const override; + bool manages_timestamp() const override; +}; + +} diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.cpp b/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.cpp new file mode 100644 index 00000000000..6863d35703e --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.cpp @@ -0,0 +1,43 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include "pending_tracker_hash.h" +#include "pending_tracker.h" +#include <vespa/vespalib/stllike/hash_map.hpp> +#include <cassert> + +namespace feedbm { + +PendingTrackerHash::PendingTrackerHash() + : _mutex(), + _pending() +{ +} + +PendingTrackerHash::~PendingTrackerHash() +{ + std::lock_guard lock(_mutex); + assert(_pending.empty()); +} + +void +PendingTrackerHash::retain(uint64_t msg_id, PendingTracker &tracker) +{ + tracker.retain(); + std::lock_guard lock(_mutex); + _pending.insert(std::make_pair(msg_id, &tracker)); +} + +PendingTracker * +PendingTrackerHash::release(uint64_t msg_id) +{ + std::lock_guard lock(_mutex); + auto itr = _pending.find(msg_id); + if (itr == _pending.end()) { + return nullptr; + } + auto tracker = itr->second; + _pending.erase(itr); + return tracker; +} + +} diff --git a/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.h b/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.h new file mode 100644 index 00000000000..89be93fd4ed --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/pending_tracker_hash.h @@ -0,0 +1,26 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#pragma once + +#include <vespa/vespalib/stllike/hash_map.h> +#include <mutex> + +namespace feedbm { + +class PendingTracker; + +/* + * Class maintaining mapping from message id to pending tracker + */ +class PendingTrackerHash +{ + std::mutex _mutex; + vespalib::hash_map<uint64_t, PendingTracker *> _pending; +public: + PendingTrackerHash(); + ~PendingTrackerHash(); + PendingTracker *release(uint64_t msg_id); + void retain(uint64_t msg_id, PendingTracker &tracker); +}; + +} diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp index 2d45caff152..0e73582fdd4 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.cpp @@ -33,7 +33,7 @@ std::shared_ptr<storage::api::StorageCommand> make_set_cluster_state_cmd() { StorageApiChainBmFeedHandler::StorageApiChainBmFeedHandler(std::shared_ptr<BmStorageLinkContext> context, bool distributor) : IBmFeedHandler(), - _name(vespalib::string("StorageApiChainBmFeedHandler(") + (distributor ? "distributor" : "servicelayer") + ")"), + _name(vespalib::string("StorageApiChainBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"), _distributor(distributor), _context(std::move(context)) { diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h index 089a7fd89e5..f877a244726 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_chain_bm_feed_handler.h @@ -11,8 +11,8 @@ namespace feedbm { struct BmStorageLinkContext; /* - * Benchmark feed handler for feed to service layer using storage api protocol - * directly on the storage chain. 
+ * Benchmark feed handler for feed to service layer or distributor + * using storage api protocol directly on the storage chain. */ class StorageApiChainBmFeedHandler : public IBmFeedHandler { diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp new file mode 100644 index 00000000000..a0a3eb5c6db --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.cpp @@ -0,0 +1,84 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "storage_api_message_bus_bm_feed_handler.h" +#include "bm_message_bus.h" +#include "pending_tracker.h" +#include <vespa/document/fieldvalue/document.h> +#include <vespa/document/update/documentupdate.h> +#include <vespa/storageapi/messageapi/storagemessage.h> +#include <vespa/storageapi/message/persistence.h> +#include <vespa/storageapi/mbusprot/storagecommand.h> + +using document::Document; +using document::DocumentId; +using document::DocumentUpdate; +using storage::api::StorageMessageAddress; +using storage::lib::NodeType; + +namespace feedbm { + +StorageApiMessageBusBmFeedHandler::StorageApiMessageBusBmFeedHandler(BmMessageBus &message_bus, bool distributor) + : IBmFeedHandler(), + _name(vespalib::string("StorageApiMessageBusBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"), + _distributor(distributor), + _storage_address(std::make_unique<StorageMessageAddress>("storage", distributor ? 
NodeType::DISTRIBUTOR : NodeType::STORAGE, 0)), + _message_bus(message_bus) +{ +} + +StorageApiMessageBusBmFeedHandler::~StorageApiMessageBusBmFeedHandler() = default; + +void +StorageApiMessageBusBmFeedHandler::send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& pending_tracker) +{ + cmd->setSourceIndex(0); + auto msg = std::make_unique<storage::mbusprot::StorageCommand>(cmd); + _message_bus.send_msg(std::move(msg), _storage_address->getRoute(), pending_tracker); +} + +void +StorageApiMessageBusBmFeedHandler::put(const document::Bucket& bucket, std::unique_ptr<Document> document, uint64_t timestamp, PendingTracker& tracker) +{ + auto cmd = std::make_unique<storage::api::PutCommand>(bucket, std::move(document), timestamp); + send_msg(std::move(cmd), tracker); +} + +void +StorageApiMessageBusBmFeedHandler::update(const document::Bucket& bucket, std::unique_ptr<DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) +{ + auto cmd = std::make_unique<storage::api::UpdateCommand>(bucket, std::move(document_update), timestamp); + send_msg(std::move(cmd), tracker); +} + +void +StorageApiMessageBusBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) +{ + auto cmd = std::make_unique<storage::api::RemoveCommand>(bucket, document_id, timestamp); + send_msg(std::move(cmd), tracker); +} + +uint32_t +StorageApiMessageBusBmFeedHandler::get_error_count() const +{ + return _message_bus.get_error_count(); +} + +const vespalib::string& +StorageApiMessageBusBmFeedHandler::get_name() const +{ + return _name; +} + +bool +StorageApiMessageBusBmFeedHandler::manages_buckets() const +{ + return _distributor; +} + +bool +StorageApiMessageBusBmFeedHandler::manages_timestamp() const +{ + return _distributor; +} + +} diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h 
b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h new file mode 100644 index 00000000000..84e69053289 --- /dev/null +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_message_bus_bm_feed_handler.h @@ -0,0 +1,41 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "i_bm_feed_handler.h" + +namespace document { class DocumentTypeRepo; } +namespace documentapi { class DocumentMessage; }; +namespace storage::api { +class StorageCommand; +class StorageMessageAddress; +} + +namespace feedbm { + +class BmMessageBus; + +/* + * Benchmark feed handler for feed to service layer or distributor + * using storage api protocol over message bus. + */ +class StorageApiMessageBusBmFeedHandler : public IBmFeedHandler +{ + vespalib::string _name; + bool _distributor; + std::unique_ptr<storage::api::StorageMessageAddress> _storage_address; + BmMessageBus& _message_bus; + void send_msg(std::shared_ptr<storage::api::StorageCommand> cmd, PendingTracker& tracker); +public: + StorageApiMessageBusBmFeedHandler(BmMessageBus &message_bus, bool distributor); + ~StorageApiMessageBusBmFeedHandler(); + void put(const document::Bucket& bucket, std::unique_ptr<document::Document> document, uint64_t timestamp, PendingTracker& tracker) override; + void update(const document::Bucket& bucket, std::unique_ptr<document::DocumentUpdate> document_update, uint64_t timestamp, PendingTracker& tracker) override; + void remove(const document::Bucket& bucket, const document::DocumentId& document_id, uint64_t timestamp, PendingTracker& tracker) override; + uint32_t get_error_count() const override; + const vespalib::string &get_name() const override; + bool manages_buckets() const override; + bool manages_timestamp() const override; +}; + +} diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp 
index 53e833f3e24..9fe6662b79a 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.cpp @@ -2,6 +2,7 @@ #include "storage_api_rpc_bm_feed_handler.h" #include "pending_tracker.h" +#include "pending_tracker_hash.h" #include "storage_reply_error_checker.h" #include <vespa/document/fieldvalue/document.h> #include <vespa/document/update/documentupdate.h> @@ -12,8 +13,6 @@ #include <vespa/storage/storageserver/rpc/message_codec_provider.h> #include <vespa/storage/storageserver/rpc/shared_rpc_resources.h> #include <vespa/storage/storageserver/rpc/storage_api_rpc_service.h> -#include <vespa/vespalib/stllike/hash_map.h> -#include <vespa/vespalib/stllike/hash_map.hpp> #include <cassert> using document::Document; @@ -30,14 +29,12 @@ namespace feedbm { class StorageApiRpcBmFeedHandler::MyMessageDispatcher : public storage::MessageDispatcher, public StorageReplyErrorChecker { - std::mutex _mutex; - vespalib::hash_map<uint64_t, PendingTracker *> _pending; + PendingTrackerHash _pending_hash; public: MyMessageDispatcher() : storage::MessageDispatcher(), StorageReplyErrorChecker(), - _mutex(), - _pending() + _pending_hash() { } ~MyMessageDispatcher() override; @@ -49,28 +46,19 @@ public: check_error(*msg); release(msg->getMsgId()); } - void retain(uint64_t msg_id, PendingTracker &tracker) { - tracker.retain(); - std::lock_guard lock(_mutex); - _pending.insert(std::make_pair(msg_id, &tracker)); - } + void retain(uint64_t msg_id, PendingTracker &tracker) { _pending_hash.retain(msg_id, tracker); } void release(uint64_t msg_id) { - PendingTracker *tracker = nullptr; - { - std::lock_guard lock(_mutex); - auto itr = _pending.find(msg_id); - assert(itr != _pending.end()); - tracker = itr->second; - _pending.erase(itr); + auto tracker = _pending_hash.release(msg_id); + if (tracker != nullptr) { + tracker->release(); + } else { + ++_errors; } - tracker->release(); } }; 
StorageApiRpcBmFeedHandler::MyMessageDispatcher::~MyMessageDispatcher() { - std::lock_guard lock(_mutex); - assert(_pending.empty()); } StorageApiRpcBmFeedHandler::StorageApiRpcBmFeedHandler(SharedRpcResources& shared_rpc_resources_in, @@ -78,7 +66,7 @@ StorageApiRpcBmFeedHandler::StorageApiRpcBmFeedHandler(SharedRpcResources& share const StorageApiRpcService::Params& rpc_params, bool distributor) : IBmFeedHandler(), - _name(vespalib::string("StorageApiRpcBmFeedHandler(") + (distributor ? "distributor" : "servicelayer") + ")"), + _name(vespalib::string("StorageApiRpcBmFeedHandler(") + (distributor ? "distributor" : "service-layer") + ")"), _distributor(distributor), _storage_address(std::make_unique<StorageMessageAddress>("storage", distributor ? NodeType::DISTRIBUTOR : NodeType::STORAGE, 0)), _shared_rpc_resources(shared_rpc_resources_in), diff --git a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h index 9901c21f174..535171c39e1 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h +++ b/searchcore/src/apps/vespa-feed-bm/storage_api_rpc_bm_feed_handler.h @@ -19,8 +19,8 @@ class SharedRpcResources; namespace feedbm { /* - * Benchmark feed handler for feed to service layer using storage api protocol - * over rpc. + * Benchmark feed handler for feed to service layer or distributor + * using storage api protocol over rpc. 
*/ class StorageApiRpcBmFeedHandler : public IBmFeedHandler { diff --git a/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h b/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h index 78004f3d787..4743367b426 100644 --- a/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h +++ b/searchcore/src/apps/vespa-feed-bm/storage_reply_error_checker.h @@ -9,6 +9,7 @@ namespace storage::api { class StorageMessage; } namespace feedbm { class StorageReplyErrorChecker { +protected: std::atomic<uint32_t> _errors; public: StorageReplyErrorChecker(); diff --git a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp index 3aada3cc958..dc8d787a778 100644 --- a/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp +++ b/searchcore/src/apps/vespa-feed-bm/vespa_feed_bm.cpp @@ -1,12 +1,15 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "bm_cluster_controller.h" +#include "bm_message_bus.h" #include "bm_storage_chain_builder.h" #include "bm_storage_link_context.h" #include "pending_tracker.h" #include "spi_bm_feed_handler.h" #include "storage_api_chain_bm_feed_handler.h" +#include "storage_api_message_bus_bm_feed_handler.h" #include "storage_api_rpc_bm_feed_handler.h" +#include "document_api_message_bus_bm_feed_handler.h" #include <tests/proton/common/dummydbowner.h> #include <vespa/config-attributes.h> #include <vespa/config-bucketspaces.h> @@ -123,11 +126,14 @@ using document::FieldUpdate; using document::IntFieldValue; using document::test::makeBucketSpace; using feedbm::BmClusterController; +using feedbm::BmMessageBus; using feedbm::BmStorageChainBuilder; using feedbm::BmStorageLinkContext; using feedbm::IBmFeedHandler; +using feedbm::DocumentApiMessageBusBmFeedHandler; using feedbm::SpiBmFeedHandler; using feedbm::StorageApiChainBmFeedHandler; +using feedbm::StorageApiMessageBusBmFeedHandler; using 
feedbm::StorageApiRpcBmFeedHandler; using search::TuneFileDocumentDB; using search::index::DummyFileHeaderContext; @@ -193,6 +199,23 @@ std::shared_ptr<DocumentDBConfig> make_document_db_config(std::shared_ptr<Docume doc_type_name.getName()); } +void +make_slobroks_config(SlobroksConfigBuilder& slobroks, int slobrok_port) +{ + SlobroksConfigBuilder::Slobrok slobrok; + slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port); + slobroks.slobrok.push_back(std::move(slobrok)); +} + +void +make_bucketspaces_config(BucketspacesConfigBuilder &bucketspaces) +{ + BucketspacesConfigBuilder::Documenttype bucket_space_map; + bucket_space_map.name = "test"; + bucket_space_map.bucketspace = "default"; + bucketspaces.documenttype.emplace_back(std::move(bucket_space_map)); +} + class MyPersistenceEngineOwner : public IPersistenceEngineOwner { void setClusterState(BucketSpace, const storage::spi::ClusterState &) override { } @@ -245,6 +268,8 @@ class BMParams { uint32_t _response_threads; bool _enable_distributor; bool _enable_service_layer; + bool _use_document_api; + bool _use_message_bus; bool _use_storage_chain; bool _use_legacy_bucket_db; uint32_t get_start(uint32_t thread_id) const { @@ -261,6 +286,8 @@ public: _response_threads(2), // Same default as in stor-filestor.def _enable_distributor(false), _enable_service_layer(false), + _use_document_api(false), + _use_message_bus(false), _use_storage_chain(false), _use_legacy_bucket_db(false) { @@ -276,7 +303,8 @@ public: uint32_t get_rpc_network_threads() const { return _rpc_network_threads; } uint32_t get_response_threads() const { return _response_threads; } bool get_enable_distributor() const { return _enable_distributor; } - bool get_enable_service_layer() const { return _enable_service_layer || _enable_distributor; } + bool get_use_document_api() const { return _use_document_api; } + bool get_use_message_bus() const { return _use_message_bus; } bool get_use_storage_chain() const { return 
_use_storage_chain; } bool get_use_legacy_bucket_db() const { return _use_legacy_bucket_db; } void set_documents(uint32_t documents_in) { _documents = documents_in; } @@ -288,9 +316,14 @@ public: void set_response_threads(uint32_t threads_in) { _response_threads = threads_in; } void set_enable_distributor(bool enable_distributor_in) { _enable_distributor = enable_distributor_in; } void set_enable_service_layer(bool enable_service_layer_in) { _enable_service_layer = enable_service_layer_in; } + void set_use_document_api(bool use_document_api_in) { _use_document_api = use_document_api_in;; } + void set_use_message_bus(bool use_message_bus_in) { _use_message_bus = use_message_bus_in; } void set_use_storage_chain(bool use_storage_chain_in) { _use_storage_chain = use_storage_chain_in; } void set_use_legacy_bucket_db(bool use_legacy_bucket_db_in) { _use_legacy_bucket_db = use_legacy_bucket_db_in; } bool check() const; + bool needs_service_layer() const { return _enable_service_layer || _enable_distributor || _use_storage_chain || _use_message_bus || _use_document_api; } + bool needs_distributor() const { return _enable_distributor || _use_document_api; } + bool needs_message_bus() const { return _use_message_bus || _use_document_api; } }; bool @@ -426,17 +459,14 @@ struct MyStorageConfig } else { stor_server.rootFolder = "storage"; } - { - SlobroksConfigBuilder::Slobrok slobrok; - slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port); - slobroks.slobrok.push_back(std::move(slobrok)); - } + make_slobroks_config(slobroks, slobrok_port); stor_communicationmanager.useDirectStorageapiRpc = true; stor_communicationmanager.rpc.numNetworkThreads = params.get_rpc_network_threads(); stor_communicationmanager.mbusport = mbus_port; stor_communicationmanager.rpcport = rpc_port; stor_status.httpport = status_port; + make_bucketspaces_config(bucketspaces); } ~MyStorageConfig(); @@ -524,11 +554,7 @@ struct MyRpcClientConfig { : config_id(config_id_in), 
slobroks() { - { - SlobroksConfigBuilder::Slobrok slobrok; - slobrok.connectionspec = vespalib::make_string("tcp/localhost:%d", slobrok_port); - slobroks.slobrok.push_back(std::move(slobrok)); - } + make_slobroks_config(slobroks, slobrok_port); } ~MyRpcClientConfig(); @@ -539,6 +565,28 @@ struct MyRpcClientConfig { MyRpcClientConfig::~MyRpcClientConfig() = default; +struct MyMessageBusConfig { + vespalib::string config_id; + SlobroksConfigBuilder slobroks; + MessagebusConfigBuilder messagebus; + + MyMessageBusConfig(const vespalib::string &config_id_in, int slobrok_port) + : config_id(config_id_in), + slobroks(), + messagebus() + { + make_slobroks_config(slobroks, slobrok_port); + } + ~MyMessageBusConfig(); + + void add_builders(ConfigSet &set) { + set.addBuilder(config_id, &slobroks); + set.addBuilder(config_id, &messagebus); + } +}; + +MyMessageBusConfig::~MyMessageBusConfig() = default; + } struct PersistenceProviderFixture { @@ -576,6 +624,7 @@ struct PersistenceProviderFixture { MyServiceLayerConfig _service_layer_config; MyDistributorConfig _distributor_config; MyRpcClientConfig _rpc_client_config; + MyMessageBusConfig _message_bus_config; ConfigSet _config_set; std::shared_ptr<IConfigContext> _config_context; std::unique_ptr<IBmFeedHandler> _feed_handler; @@ -585,6 +634,7 @@ struct PersistenceProviderFixture { std::unique_ptr<SharedRpcResources> _rpc_client_shared_rpc_resources; std::shared_ptr<BmStorageLinkContext> _distributor_chain_context; std::unique_ptr<storage::DistributorProcess> _distributor; + std::unique_ptr<BmMessageBus> _message_bus; PersistenceProviderFixture(const BMParams& params); ~PersistenceProviderFixture(); @@ -599,8 +649,10 @@ struct PersistenceProviderFixture { void wait_slobrok(const vespalib::string &name); void start_service_layer(const BMParams& params); void start_distributor(const BMParams& params); + void start_message_bus(); void create_feed_handler(const BMParams& params); void shutdown_feed_handler(); + void 
shutdown_message_bus(); void shutdown_distributor(); void shutdown_service_layer(); }; @@ -640,6 +692,7 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params) _service_layer_config("bm-servicelayer", *_document_types, _slobrok_port, _service_layer_mbus_port, _service_layer_rpc_port, _service_layer_status_port, params), _distributor_config("bm-distributor", *_document_types, _slobrok_port, _distributor_mbus_port, _distributor_rpc_port, _distributor_status_port, params), _rpc_client_config("bm-rpc-client", _slobrok_port), + _message_bus_config("bm-message-bus", _slobrok_port), _config_set(), _config_context(std::make_shared<ConfigContext>(_config_set)), _feed_handler(), @@ -648,7 +701,8 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params) _service_layer(), _rpc_client_shared_rpc_resources(), _distributor_chain_context(), - _distributor() + _distributor(), + _message_bus() { create_document_db(); _persistence_engine = std::make_unique<PersistenceEngine>(_persistence_owner, _write_filter, -1, false); @@ -657,6 +711,7 @@ PersistenceProviderFixture::PersistenceProviderFixture(const BMParams& params) _service_layer_config.add_builders(_config_set); _distributor_config.add_builders(_config_set); _rpc_client_config.add_builders(_config_set); + _message_bus_config.add_builders(_config_set); _feed_handler = std::make_unique<SpiBmFeedHandler>(*_persistence_engine); } @@ -772,7 +827,7 @@ PersistenceProviderFixture::start_service_layer(const BMParams& params) LOG(info, "start service layer"); config::ConfigUri config_uri("bm-servicelayer", _config_context); std::unique_ptr<BmStorageChainBuilder> chain_builder; - if (params.get_use_storage_chain() && !params.get_enable_distributor()) { + if (params.get_use_storage_chain() && !params.needs_distributor()) { chain_builder = std::make_unique<BmStorageChainBuilder>(); _service_layer_chain_context = chain_builder->get_context(); } @@ -797,7 +852,7 @@ 
PersistenceProviderFixture::start_distributor(const BMParams& params) { config::ConfigUri config_uri("bm-distributor", _config_context); std::unique_ptr<BmStorageChainBuilder> chain_builder; - if (params.get_use_storage_chain()) { + if (params.get_use_storage_chain() && !params.get_use_document_api()) { chain_builder = std::make_unique<BmStorageChainBuilder>(); _distributor_chain_context = chain_builder->get_context(); } @@ -816,24 +871,39 @@ PersistenceProviderFixture::start_distributor(const BMParams& params) } void +PersistenceProviderFixture::start_message_bus() +{ + config::ConfigUri config_uri("bm-message-bus", _config_context); + LOG(info, "Starting message bus"); + _message_bus = std::make_unique<BmMessageBus>(config_uri, + _repo, + documentapi::LoadTypeSet()); + LOG(info, "Started message bus"); +} + +void PersistenceProviderFixture::create_feed_handler(const BMParams& params) { StorageApiRpcService::Params rpc_params; // This is the same compression config as the default in stor-communicationmanager.def. 
rpc_params.compression_config = CompressionConfig(CompressionConfig::Type::LZ4, 3, 90, 1024); - if (params.get_enable_distributor()) { + if (params.get_use_document_api()) { + _feed_handler = std::make_unique<DocumentApiMessageBusBmFeedHandler>(*_message_bus); + } else if (params.get_enable_distributor()) { if (params.get_use_storage_chain()) { assert(_distributor_chain_context); _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_distributor_chain_context, true); + } else if (params.get_use_message_bus()) { + _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(*_message_bus, true); } else { _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo, rpc_params, true); } - return; - } - if (params.get_enable_service_layer()) { + } else if (params.needs_service_layer()) { if (params.get_use_storage_chain()) { assert(_service_layer_chain_context); _feed_handler = std::make_unique<StorageApiChainBmFeedHandler>(_service_layer_chain_context, false); + } else if (params.get_use_message_bus()) { + _feed_handler = std::make_unique<StorageApiMessageBusBmFeedHandler>(*_message_bus, false); } else { _feed_handler = std::make_unique<StorageApiRpcBmFeedHandler>(*_rpc_client_shared_rpc_resources, _repo, rpc_params, false); } @@ -847,6 +917,15 @@ PersistenceProviderFixture::shutdown_feed_handler() } void +PersistenceProviderFixture::shutdown_message_bus() +{ + if (_message_bus) { + LOG(info, "stop message bus"); + _message_bus.reset(); + } +} + +void PersistenceProviderFixture::shutdown_distributor() { if (_distributor) { @@ -1130,12 +1209,15 @@ void benchmark_async_spi(const BMParams &bm_params) if (!f._feed_handler->manages_buckets()) { f.create_buckets(); } - if (bm_params.get_enable_service_layer()) { + if (bm_params.needs_service_layer()) { f.start_service_layer(bm_params); } - if (bm_params.get_enable_distributor()) { + if (bm_params.needs_distributor()) { f.start_distributor(bm_params); } + if 
(bm_params.needs_message_bus()) { + f.start_message_bus(); + } f.create_feed_handler(bm_params); vespalib::ThreadStackExecutor executor(bm_params.get_client_threads(), 128 * 1024); auto put_feed = make_feed(executor, bm_params, [&f](BMRange range, BucketSelector bucket_selector) { return make_put_feed(f, range, bucket_selector); }, f.num_buckets(), "put"); @@ -1149,6 +1231,7 @@ void benchmark_async_spi(const BMParams &bm_params) LOG(info, "--------------------------------"); f.shutdown_feed_handler(); + f.shutdown_message_bus(); f.shutdown_distributor(); f.shutdown_service_layer(); } @@ -1189,6 +1272,8 @@ App::usage() "[--response-threads threads]\n" "[--enable-distributor]\n" "[--enable-service-layer]\n" + "[--use-document-api]\n" + "[--use-message-bus\n" "[--use-storage-chain]\n" "[--use-legacy-bucket-db]" << std::endl; } @@ -1209,6 +1294,8 @@ App::get_options() { "response-threads", 1, nullptr, 0 }, { "enable-distributor", 0, nullptr, 0 }, { "enable-service-layer", 0, nullptr, 0 }, + { "use-document-api", 0, nullptr, 0 }, + { "use-message-bus", 0, nullptr, 0 }, { "use-storage-chain", 0, nullptr, 0 }, { "use-legacy-bucket-db", 0, nullptr, 0 } }; @@ -1222,6 +1309,8 @@ App::get_options() LONGOPT_RESPONSE_THREADS, LONGOPT_ENABLE_DISTRIBUTOR, LONGOPT_ENABLE_SERVICE_LAYER, + LONGOPT_USE_DOCUMENT_API, + LONGOPT_USE_MESSAGE_BUS, LONGOPT_USE_STORAGE_CHAIN, LONGOPT_USE_LEGACY_BUCKET_DB }; @@ -1258,6 +1347,12 @@ App::get_options() case LONGOPT_ENABLE_SERVICE_LAYER: _bm_params.set_enable_service_layer(true); break; + case LONGOPT_USE_DOCUMENT_API: + _bm_params.set_use_document_api(true); + break; + case LONGOPT_USE_MESSAGE_BUS: + _bm_params.set_use_message_bus(true); + break; case LONGOPT_USE_STORAGE_CHAIN: _bm_params.set_use_storage_chain(true); break; diff --git a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp index 7fb33dc242b..8228ceb5e79 100644 --- 
a/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp +++ b/storage/src/tests/storageserver/rpc/storage_api_rpc_service_test.cpp @@ -308,6 +308,7 @@ TEST_F(StorageApiRpcServiceTest, can_send_and_respond_to_request_end_to_end) { TEST_F(StorageApiRpcServiceTest, send_to_unknown_address_bounces_with_error_reply) { auto cmd = _node_0->create_dummy_put_command(); cmd->setAddress(non_existing_address()); + cmd->getTrace().setLevel(9); _node_0->send_request(cmd); auto bounced_msg = _node_0->wait_and_receive_single_message(); @@ -323,6 +324,7 @@ TEST_F(StorageApiRpcServiceTest, send_to_unknown_address_bounces_with_error_repl to_slobrok_id(non_existing_address()).c_str(), vespalib::HostName::get().c_str()); EXPECT_EQ(put_reply->getResult(), api::ReturnCode(expected_code, expected_msg)); + EXPECT_THAT(put_reply->getTrace().toString(), HasSubstr("The service must be having problems")); } TEST_F(StorageApiRpcServiceTest, request_metadata_is_propagated_to_receiver) { @@ -408,4 +410,17 @@ TEST_F(StorageApiRpcServiceTest, malformed_request_payload_returns_rpc_error) { // TODO also test bad response header/payload +TEST_F(StorageApiRpcServiceTest, trace_events_are_emitted_for_send_and_receive) { + auto recv_cmd = send_and_receive_put_command_at_node_1([](auto& cmd){ + cmd.getTrace().setLevel(9); + }); + auto recv_reply = respond_and_receive_put_reply_at_node_0(recv_cmd); + auto trace_str = recv_reply->getTrace().toString(); + // Ordering of traced events matter, so we use a cheeky regex. 
+ EXPECT_THAT(trace_str, ContainsRegex("Sending request from.+" + "Request received at.+" + "Sending response from.+" + "Response received at")); +} + } diff --git a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h index 722d1fd3a81..740277218c3 100644 --- a/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h +++ b/storage/src/vespa/storage/storageserver/rpc/shared_rpc_resources.h @@ -48,6 +48,7 @@ public: // Hostname of host node is running on. [[nodiscard]] const vespalib::string& hostname() const noexcept { return _hostname; } + [[nodiscard]] const vespalib::string handle() const noexcept { return _handle; } const RpcTargetFactory& target_factory() const; private: diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp index a8cdc0bfeea..8b5c7706510 100644 --- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.cpp @@ -16,12 +16,14 @@ #include <vespa/vespalib/data/databuffer.h> #include <vespa/vespalib/util/compressor.h> #include <vespa/vespalib/util/stringfmt.h> +#include <vespa/vespalib/trace/tracelevel.h> #include <cassert> #include <vespa/log/log.h> LOG_SETUP(".storage.storage_api_rpc_service"); using vespalib::compression::CompressionConfig; +using vespalib::TraceLevel; namespace storage::rpc { @@ -177,16 +179,28 @@ void StorageApiRpcService::RPC_rpc_v1_send(FRT_RPCRequest* req) { scmd->getTrace().setLevel(hdr.trace_level()); scmd->setTimeout(std::chrono::milliseconds(hdr.time_remaining_ms())); req->DiscardBlobs(); + if (scmd->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) { + scmd->getTrace().trace(TraceLevel::SEND_RECEIVE, + vespalib::make_string("Request received at '%s' (tcp/%s:%d) with %u bytes of payload", + _rpc_resources.handle().c_str(), + 
_rpc_resources.hostname().c_str(), + _rpc_resources.listen_port(), + uncompressed_size)); + } detach_and_forward_to_enqueuer(std::move(scmd), req); } else { req->SetError(FRTE_RPC_METHOD_FAILED, "Unable to decode RPC request payload"); } } -void StorageApiRpcService::encode_rpc_v1_response(FRT_RPCRequest& request, const api::StorageReply& reply) { +void StorageApiRpcService::encode_rpc_v1_response(FRT_RPCRequest& request, api::StorageReply& reply) { LOG(spam, "Server: encoding rpc.v1 response header and payload"); auto* ret = request.GetReturn(); + if (reply.getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) { + reply.getTrace().trace(TraceLevel::SEND_RECEIVE, + vespalib::make_string("Sending response from '%s'", _rpc_resources.handle().c_str())); + } // TODO skip encoding header altogether if no relevant fields set? protobuf::ResponseHeader hdr; if (reply.getTrace().getLevel() > 0) { @@ -206,11 +220,21 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma if (!target) { auto reply = cmd->makeReply(); reply->setResult(make_no_address_for_service_error(*cmd->getAddress())); + if (reply->getTrace().shouldTrace(TraceLevel::ERROR)) { + reply->getTrace().trace(TraceLevel::ERROR, reply->getResult().getMessage()); + } // Always dispatch async for synchronously generated replies, or we risk nuking the // stack if the reply receiver keeps resending synchronously as well. 
_message_dispatcher.dispatch_async(std::move(reply)); return; } + if (cmd->getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) { + cmd->getTrace().trace(TraceLevel::SEND_RECEIVE, + vespalib::make_string("Sending request from '%s' to '%s' (%s) with timeout of %g seconds", + _rpc_resources.handle().c_str(), + CachingRpcTargetResolver::address_to_slobrok_id(*cmd->getAddress()).c_str(), + target->_spec.c_str(), vespalib::to_s(cmd->getTimeout()))); + } std::unique_ptr<FRT_RPCRequest, SubRefDeleter> req(_rpc_resources.supervisor().AllocRPCRequest()); req->SetMethodName(rpc_v1_method_name()); @@ -233,12 +257,12 @@ void StorageApiRpcService::send_rpc_v1_request(std::shared_ptr<api::StorageComma void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) { std::unique_ptr<FRT_RPCRequest, SubRefDeleter> req(raw_req); auto* req_ctx = static_cast<RpcRequestContext*>(req->GetContext()._value.VOIDP); + auto& cmd = *req_ctx->_originator_cmd; if (!req->CheckReturnTypes("bixbix")) { handle_request_done_rpc_error(*req, *req_ctx); return; } LOG(spam, "Client: received rpc.v1 OK response"); - const auto& ret = *req->GetReturn(); protobuf::ResponseHeader hdr; if (!decode_header_from_rpc_params(ret, hdr)) { @@ -246,8 +270,10 @@ void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) { return; } std::unique_ptr<mbusprot::StorageReply> wrapped_reply; - bool ok = uncompress_rpc_payload(ret, [&wrapped_reply, req_ctx](auto& codec, auto payload) { + uint32_t uncompressed_size = 0; + bool ok = uncompress_rpc_payload(ret, [&wrapped_reply, &uncompressed_size, req_ctx](auto& codec, auto payload) { wrapped_reply = codec.decodeReply(payload, *req_ctx->_originator_cmd); + uncompressed_size = payload.size(); }); if (!ok) { assert(!wrapped_reply); @@ -259,9 +285,16 @@ void StorageApiRpcService::RequestDone(FRT_RPCRequest* raw_req) { assert(reply); if (!hdr.trace_payload().empty()) { - req_ctx->_originator_cmd->getTrace().getRoot().addChild(mbus::TraceNode::decode(hdr.trace_payload())); + 
cmd.getTrace().getRoot().addChild(mbus::TraceNode::decode(hdr.trace_payload())); + } + if (cmd.getTrace().shouldTrace(TraceLevel::SEND_RECEIVE)) { + cmd.getTrace().trace(TraceLevel::SEND_RECEIVE, + vespalib::make_string("Response received at '%s' with %u bytes of payload", + _rpc_resources.handle().c_str(), + uncompressed_size)); } - reply->getTrace().swap(req_ctx->_originator_cmd->getTrace()); + reply->getTrace().swap(cmd.getTrace()); + reply->setApproxByteSize(uncompressed_size); // TODO ensure that no implicit long-lived refs end up pointing into RPC memory...! req->DiscardBlobs(); @@ -291,10 +324,13 @@ void StorageApiRpcService::handle_request_done_decode_error(const RpcRequestCont void StorageApiRpcService::create_and_dispatch_error_reply(api::StorageCommand& cmd, api::ReturnCode error) { auto error_reply = cmd.makeReply(); - LOG(debug, "Client: rpc.v1 failed decode from %s: '%s'", + LOG(debug, "Client: rpc.v1 failed for target '%s': '%s'", cmd.getAddress()->toString().c_str(), error.toString().c_str()); + error_reply->getTrace().swap(cmd.getTrace()); + if (error_reply->getTrace().shouldTrace(TraceLevel::ERROR)) { + error_reply->getTrace().trace(TraceLevel::ERROR, error.getMessage()); + } error_reply->setResult(std::move(error)); - // TODO needs tracing of received-event! _message_dispatcher.dispatch_sync(std::move(error_reply)); } @@ -355,9 +391,6 @@ bool StorageApiRpcService::target_supports_direct_rpc( /* * Major TODOs: - * - tracing and trace propagation - * - forwards/backwards compatibility - * - what to remap bounced Not Found errors to internally? * - lifetime semantics of FRT targets vs requests created from them? * - lifetime of document type/fieldset repos vs messages * - is repo ref squirreled away into the messages anywhere? 
diff --git a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h index 39508e51841..cb2344ccd13 100644 --- a/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h +++ b/storage/src/vespa/storage/storageserver/rpc/storage_api_rpc_service.h @@ -58,14 +58,13 @@ public: [[nodiscard]] bool target_supports_direct_rpc(const api::StorageMessageAddress& addr) const noexcept; void RPC_rpc_v1_send(FRT_RPCRequest* req); - void encode_rpc_v1_response(FRT_RPCRequest& request, const api::StorageReply& reply); + void encode_rpc_v1_response(FRT_RPCRequest& request, api::StorageReply& reply); void send_rpc_v1_request(std::shared_ptr<api::StorageCommand> cmd); static constexpr const char* rpc_v1_method_name() noexcept { return "storageapi.v1.send"; } private: - // TODO dedupe void detach_and_forward_to_enqueuer(std::shared_ptr<api::StorageMessage> cmd, FRT_RPCRequest* req); struct RpcRequestContext { |