diff options
88 files changed, 602 insertions, 299 deletions
diff --git a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java index 4b9fd7db27e..b40f1fed131 100644 --- a/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java +++ b/config-model-api/src/main/java/com/yahoo/config/model/api/ModelContext.java @@ -67,7 +67,7 @@ public interface ModelContext { default Optional<EndpointCertificateSecrets> endpointCertificateSecrets() { return Optional.empty(); } double defaultTermwiseLimit(); boolean useBucketSpaceMetric(); - default boolean useNewAthenzFilter() { return false; } + default boolean useNewAthenzFilter() { return true; } // TODO bjorncs: Remove after end of April // TODO: Remove after April 2020 default boolean usePhraseSegmenting() { return false; } diff --git a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java index 896c6ea9a7f..c2f1e399e08 100644 --- a/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java +++ b/config-model/src/main/java/com/yahoo/config/model/deploy/TestProperties.java @@ -42,7 +42,6 @@ public class TestProperties implements ModelContext.Properties { private boolean useAdaptiveDispatch = false; private double defaultTermwiseLimit = 1.0; private Optional<EndpointCertificateSecrets> endpointCertificateSecrets = Optional.empty(); - private boolean useNewAthenzFilter = false; private AthenzDomain athenzDomain; @Override public boolean multitenant() { return multitenant; } @@ -63,7 +62,6 @@ public class TestProperties implements ModelContext.Properties { @Override public Optional<TlsSecrets> tlsSecrets() { return endpointCertificateSecrets.map(TlsSecrets::new); } @Override public double defaultTermwiseLimit() { return defaultTermwiseLimit; } @Override public boolean useBucketSpaceMetric() { return true; } - @Override public boolean useNewAthenzFilter() { return useNewAthenzFilter; } @Override public boolean useDedicatedNodesWhenUnspecified() { return true; } @Override public Optional<AthenzDomain> athenzDomain() { return Optional.ofNullable(athenzDomain); } @@ -107,11 +105,6 @@ public class TestProperties implements ModelContext.Properties { return this; } - public TestProperties setUseNewAthenzFilter(boolean useNewAthenzFilter) { - this.useNewAthenzFilter = useNewAthenzFilter; - return this; - } - public TestProperties setZone(Zone zone) { this.zone = zone; return this; diff --git a/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/NodesSpecification.java b/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/NodesSpecification.java index 6a52ff4f051..ad6eebe1ca5 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/NodesSpecification.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/builder/xml/dom/NodesSpecification.java @@ -60,6 +60,18 @@ public class NodesSpecification { boolean required, boolean canFail, boolean exclusive, Optional<String> dockerImageRepo, Optional<String> combinedId) { + if (max.smallerThan(min)) + throw new IllegalArgumentException("Min resources must be larger or equal to max resources, but " + + max + " is smaller than " + min); + + // Non-scaled resources must be equal + if ( ! min.nodeResources().justNonNumbers().equals(max.nodeResources().justNonNumbers())) + throw new IllegalArgumentException("Min and max resources must have the same non-numeric settings, but " + + "min is " + min + " and max " + max); + if (min.nodeResources().bandwidthGbps() != max.nodeResources().bandwidthGbps()) + throw new IllegalArgumentException("Min and max resources must have the same bandwith, but " + + "min is " + min + " and max " + max); + this.min = min; this.max = max; this.dedicated = dedicated; diff --git a/config-provisioning/src/main/java/com/yahoo/config/provision/ClusterResources.java b/config-provisioning/src/main/java/com/yahoo/config/provision/ClusterResources.java index a4ed22c5266..87b5133d4eb 100644 --- a/config-provisioning/src/main/java/com/yahoo/config/provision/ClusterResources.java +++ b/config-provisioning/src/main/java/com/yahoo/config/provision/ClusterResources.java @@ -44,10 +44,12 @@ public class ClusterResources { return false; } - /** Returns true if this is within the given limits (inclusive) */ + /** Returns true if this is within the given limits (inclusive) and is compatible with them */ public boolean isWithin(ClusterResources min, ClusterResources max) { if (this.smallerThan(min)) return false; if (max.smallerThan(this)) return false; + if ( ! this.nodeResources.justNonNumbers().compatibleWith(min.nodeResources.justNonNumbers())) return false; + if ( ! this.nodeResources.justNonNumbers().compatibleWith(max.nodeResources.justNonNumbers())) return false; return true; } diff --git a/config-provisioning/src/main/java/com/yahoo/config/provision/NodeResources.java b/config-provisioning/src/main/java/com/yahoo/config/provision/NodeResources.java index 5fc05a87a7d..05b604b263f 100644 --- a/config-provisioning/src/main/java/com/yahoo/config/provision/NodeResources.java +++ b/config-provisioning/src/main/java/com/yahoo/config/provision/NodeResources.java @@ -148,6 +148,11 @@ public class NodeResources { return with(NodeResources.DiskSpeed.any).with(StorageType.any); } + /** Returns this with all numbers set to 0 */ + public NodeResources justNonNumbers() { + return withVcpu(0).withMemoryGb(0).withDiskGb(0).withBandwidthGbps(0); + } + public NodeResources subtract(NodeResources other) { if ( ! this.isInterchangeableWith(other)) throw new IllegalArgumentException(this + " and " + other + " are not interchangeable"); diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java index 7bfbf5f4e06..bb38d62895b 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/deploy/ModelContextImpl.java @@ -142,7 +142,6 @@ public class ModelContextImpl implements ModelContext { private final Optional<EndpointCertificateSecrets> endpointCertificateSecrets; private final double defaultTermwiseLimit; private final boolean useBucketSpaceMetric; - private final boolean useNewAthenzFilter; private final String proxyProtocol; private final Optional<AthenzDomain> athenzDomain; private final boolean useDedicatedNodesWhenUnspecified; @@ -179,8 +178,6 @@ public class ModelContextImpl implements ModelContext { .with(FetchVector.Dimension.APPLICATION_ID, applicationId.serializedForm()).value(); this.useBucketSpaceMetric = Flags.USE_BUCKET_SPACE_METRIC.bindTo(flagSource) .with(FetchVector.Dimension.APPLICATION_ID, applicationId.serializedForm()).value(); - this.useNewAthenzFilter = Flags.USE_NEW_ATHENZ_FILTER.bindTo(flagSource) - .with(FetchVector.Dimension.APPLICATION_ID, applicationId.serializedForm()).value(); this.proxyProtocol = Flags.PROXY_PROTOCOL.bindTo(flagSource) .with(FetchVector.Dimension.APPLICATION_ID, applicationId.serializedForm()).value(); this.athenzDomain = athenzDomain; @@ -241,7 +238,7 @@ public class ModelContextImpl implements ModelContext { public boolean useBucketSpaceMetric() { return useBucketSpaceMetric; } @Override - public boolean useNewAthenzFilter() { return useNewAthenzFilter; } + public boolean useNewAthenzFilter() { return true; } @Override public String proxyProtocol() { return proxyProtocol; } diff --git a/container-disc/src/main/java/com/yahoo/container/jdisc/component/Deconstructor.java b/container-disc/src/main/java/com/yahoo/container/jdisc/component/Deconstructor.java index 3a153ec3d8a..6a46e331762 100644 --- a/container-disc/src/main/java/com/yahoo/container/jdisc/component/Deconstructor.java +++ b/container-disc/src/main/java/com/yahoo/container/jdisc/component/Deconstructor.java @@ -61,7 +61,7 @@ public class Deconstructor implements ComponentDeconstructor { ((SharedResource) component).release(); } } - if (! destructibleComponents.isEmpty()) + if (! destructibleComponents.isEmpty() || ! bundles.isEmpty()) executor.schedule(new DestructComponentTask(destructibleComponents, bundles), delay.getSeconds(), TimeUnit.SECONDS); } diff --git a/container-disc/src/test/java/com/yahoo/container/jdisc/component/DeconstructorTest.java b/container-disc/src/test/java/com/yahoo/container/jdisc/component/DeconstructorTest.java index 345f75f7eb6..efdc8f44c17 100644 --- a/container-disc/src/test/java/com/yahoo/container/jdisc/component/DeconstructorTest.java +++ b/container-disc/src/test/java/com/yahoo/container/jdisc/component/DeconstructorTest.java @@ -2,14 +2,13 @@ package com.yahoo.container.jdisc.component; import com.yahoo.component.AbstractComponent; +import com.yahoo.container.bundle.MockBundle; import com.yahoo.container.di.componentgraph.Provider; import com.yahoo.jdisc.ResourceReference; import com.yahoo.jdisc.SharedResource; import org.junit.Before; import org.junit.Test; -import java.util.Collections; - import static java.util.Collections.emptyList; import static java.util.Collections.singleton; import static org.junit.Assert.assertTrue; @@ -51,22 +50,41 @@ public class DeconstructorTest { assertTrue(sharedResource.released); } + @Test + public void bundles_are_uninstalled() throws InterruptedException { + var bundle = new UninstallableMockBundle(); + // Done by executor, so it takes some time even with a 0 delay. + deconstructor.deconstruct(emptyList(), singleton(bundle)); + int cnt = 0; + while (! bundle.uninstalled && (cnt++ < 12000)) { + Thread.sleep(10); + } + assertTrue(bundle.uninstalled); + } + private static class TestAbstractComponent extends AbstractComponent { boolean destructed = false; @Override public void deconstruct() { destructed = true; } } private static class TestProvider implements Provider<Void> { - boolean destructed = false; + volatile boolean destructed = false; @Override public Void get() { return null; } @Override public void deconstruct() { destructed = true; } } private static class TestSharedResource implements SharedResource { - boolean released = false; + volatile boolean released = false; @Override public ResourceReference refer() { return null; } @Override public void release() { released = true; } } + + private static class UninstallableMockBundle extends MockBundle { + boolean uninstalled = false; + @Override public void uninstall() { + uninstalled = true; + } + } } diff --git a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/systemflags/SystemFlagsDeployResult.java b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/systemflags/SystemFlagsDeployResult.java index 010e98c2640..57d47757c5e 100644 --- a/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/systemflags/SystemFlagsDeployResult.java +++ b/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/restapi/systemflags/SystemFlagsDeployResult.java @@ -396,5 +396,19 @@ class SystemFlagsDeployResult { } Warning toWarning(Set<FlagsTarget> targets) { return new Warning(message, targets, flagId); } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + WarningWithoutTarget that = (WarningWithoutTarget) o; + return Objects.equals(message, that.message) && + Objects.equals(flagId, that.flagId); + } + + @Override + public int hashCode() { + return Objects.hash(message, flagId); + } } } diff --git a/default_build_settings.cmake b/default_build_settings.cmake index a23a8f1489a..622755495c2 100644 --- a/default_build_settings.cmake +++ b/default_build_settings.cmake @@ -56,12 +56,6 @@ function(setup_vespa_default_build_settings_darwin) set(DEFAULT_EXTRA_INCLUDE_DIRECTORY "${DEFAULT_EXTRA_INCLUDE_DIRECTORY}" PARENT_SCOPE) endfunction() -function(setup_vespa_default_build_settings_fedora_29) - message("-- Setting up default build settings for fedora 29") - set(DEFAULT_EXTRA_INCLUDE_DIRECTORY "${VESPA_DEPS}/include" "/usr/include/openblas" PARENT_SCOPE) - set(DEFAULT_VESPA_LLVM_VERSION "7" PARENT_SCOPE) -endfunction() - function(setup_vespa_default_build_settings_fedora_30) message("-- Setting up default build settings for fedora 30") set(DEFAULT_EXTRA_INCLUDE_DIRECTORY "${VESPA_DEPS}/include" "/usr/include/openblas" PARENT_SCOPE) @@ -171,8 +165,6 @@ function(vespa_use_default_build_settings) setup_vespa_default_build_settings_centos_8() elseif(VESPA_OS_DISTRO STREQUAL "darwin") setup_vespa_default_build_settings_darwin() - elseif(VESPA_OS_DISTRO_COMBINED STREQUAL "fedora 29") - setup_vespa_default_build_settings_fedora_29() elseif(VESPA_OS_DISTRO_COMBINED STREQUAL "fedora 30") setup_vespa_default_build_settings_fedora_30() elseif(VESPA_OS_DISTRO_COMBINED STREQUAL "fedora 31") diff --git a/dist/vespa.spec b/dist/vespa.spec index c89cd42df0f..ba57b1f8c40 100644 --- a/dist/vespa.spec +++ b/dist/vespa.spec @@ -68,13 +68,6 @@ BuildRequires: vespa-protobuf-devel >= 3.7.0-4 BuildRequires: cmake >= 3.9.1 BuildRequires: maven BuildRequires: openssl-devel -%if 0%{?fc29} -BuildRequires: vespa-protobuf-devel >= 3.7.0-4 -BuildRequires: llvm-devel >= 7.0.0 -BuildRequires: boost-devel >= 1.66 -BuildRequires: gtest-devel -BuildRequires: gmock-devel -%endif %if 0%{?fc30} BuildRequires: vespa-protobuf-devel >= 3.7.0-4 BuildRequires: llvm-devel >= 8.0.0 @@ -180,11 +173,6 @@ Requires: openssl-libs %endif %if 0%{?fedora} Requires: openssl-libs -%if 0%{?fc29} -Requires: vespa-protobuf >= 3.7.0-4 -Requires: llvm-libs >= 7.0.0 -%define _vespa_llvm_version 7 -%endif %if 0%{?fc30} Requires: vespa-protobuf >= 3.7.0-4 Requires: llvm-libs >= 8.0.0 diff --git a/eval/src/tests/eval/interpreted_function/interpreted_function_test.cpp b/eval/src/tests/eval/interpreted_function/interpreted_function_test.cpp index d946d244d17..60700817266 100644 --- a/eval/src/tests/eval/interpreted_function/interpreted_function_test.cpp +++ b/eval/src/tests/eval/interpreted_function/interpreted_function_test.cpp @@ -83,7 +83,6 @@ struct MyEvalTest : test::EvalSpec::EvalTest { ? NodeTypes(function, std::vector<ValueType>(params.params.size(), ValueType::double_type())) : NodeTypes(); InterpretedFunction ifun(engine, function, node_types); - ASSERT_EQUAL(ifun.num_params(), params.params.size()); InterpretedFunction::Context ictx(ifun); const Value &result_value = ifun.eval(ictx, params); report_result(result_value.is_double(), result_value.as_double(), expected_result, description); diff --git a/eval/src/tests/eval/node_types/node_types_test.cpp b/eval/src/tests/eval/node_types/node_types_test.cpp index 8eaa7a80a81..7912ec213bc 100644 --- a/eval/src/tests/eval/node_types/node_types_test.cpp +++ b/eval/src/tests/eval/node_types/node_types_test.cpp @@ -18,6 +18,14 @@ struct TypeSpecExtractor : public vespalib::eval::SymbolExtractor { } }; +void print_errors(const NodeTypes &types) { + if (!types.errors().empty()) { + for (const auto &msg: types.errors()) { + fprintf(stderr, "type error: %s\n", msg.c_str()); + } + } +} + void verify(const vespalib::string &type_expr, const vespalib::string &type_spec) { auto function = Function::parse(type_expr, TypeSpecExtractor()); if (!EXPECT_TRUE(!function->has_error())) { @@ -29,11 +37,7 @@ void verify(const vespalib::string &type_expr, const vespalib::string &type_spec input_types.push_back(ValueType::from_spec(function->param_name(i))); } NodeTypes types(*function, input_types); - if (!types.errors().empty()) { - for (const auto &msg: types.errors()) { - fprintf(stderr, "type error: %s\n", msg.c_str()); - } - } + print_errors(types); ValueType expected_type = ValueType::from_spec(type_spec); ValueType actual_type = types.get_type(function->root()); EXPECT_EQUAL(expected_type, actual_type); @@ -306,4 +310,38 @@ TEST("require that empty type repo works as expected") { EXPECT_FALSE(types.all_types_are_double()); } +TEST("require that types for a subtree can be exported") { + auto function = Function::parse("(1+2)+3"); + const auto &root = function->root(); + ASSERT_EQUAL(root.num_children(), 2u); + const auto &n_1_2 = root.get_child(0); + const auto &n_3 = root.get_child(1); + ASSERT_EQUAL(n_1_2.num_children(), 2u); + const auto &n_1 = n_1_2.get_child(0); + const auto &n_2 = n_1_2.get_child(1); + NodeTypes all_types(*function, {}); + NodeTypes some_types = all_types.export_types(n_1_2); + EXPECT_EQUAL(all_types.errors().size(), 0u); + EXPECT_EQUAL(some_types.errors().size(), 0u); + for (const auto node: {&root, &n_3}) { + EXPECT_TRUE(all_types.get_type(*node).is_double()); + EXPECT_TRUE(some_types.get_type(*node).is_error()); + } + for (const auto node: {&n_1_2, &n_1, &n_2}) { + EXPECT_TRUE(all_types.get_type(*node).is_double()); + EXPECT_TRUE(some_types.get_type(*node).is_double()); + } +} + +TEST("require that export_types produces an error for missing types") { + auto fun1 = Function::parse("1+2"); + auto fun2 = Function::parse("1+2"); + NodeTypes fun1_types(*fun1, {}); + NodeTypes bad_export = fun1_types.export_types(fun2->root()); + EXPECT_EQUAL(bad_export.errors().size(), 1u); + print_errors(bad_export); + EXPECT_TRUE(fun1_types.get_type(fun1->root()).is_double()); + EXPECT_TRUE(bad_export.get_type(fun2->root()).is_error()); +} + TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/eval/src/tests/tensor/dense_replace_type_function/dense_replace_type_function_test.cpp b/eval/src/tests/tensor/dense_replace_type_function/dense_replace_type_function_test.cpp index 0533b1c92b7..732fc9c3e69 100644 --- a/eval/src/tests/tensor/dense_replace_type_function/dense_replace_type_function_test.cpp +++ b/eval/src/tests/tensor/dense_replace_type_function/dense_replace_type_function_test.cpp @@ -23,7 +23,7 @@ struct ChildMock : Leaf { bool is_mutable; ChildMock(const ValueType &type) : Leaf(type), is_mutable(true) {} bool result_is_mutable() const override { return is_mutable; } - InterpretedFunction::Instruction compile_self(Stash &) const override { abort(); } + InterpretedFunction::Instruction compile_self(const TensorEngine &, Stash &) const override { abort(); } }; struct Fixture { @@ -43,7 +43,7 @@ struct Fixture { { my_fun.push_children(children); state.stack.push_back(*my_value); - my_fun.compile_self(state.stash).perform(state); + my_fun.compile_self(engine, state.stash).perform(state); ASSERT_EQUAL(children.size(), 1u); ASSERT_EQUAL(state.stack.size(), 1u); ASSERT_TRUE(!new_type.is_error()); diff --git a/eval/src/vespa/eval/eval/basic_nodes.cpp b/eval/src/vespa/eval/eval/basic_nodes.cpp index 9d2b3a619fb..bc7202bebc1 100644 --- a/eval/src/vespa/eval/eval/basic_nodes.cpp +++ b/eval/src/vespa/eval/eval/basic_nodes.cpp @@ -25,7 +25,7 @@ struct Frame { double Node::get_const_value() const { assert(is_const()); - InterpretedFunction function(SimpleTensorEngine::ref(), *this, 0, NodeTypes()); + InterpretedFunction function(SimpleTensorEngine::ref(), *this, NodeTypes()); NoParams no_params; InterpretedFunction::Context ctx(function); return function.eval(ctx, no_params).as_double(); diff --git a/eval/src/vespa/eval/eval/compile_tensor_function.cpp b/eval/src/vespa/eval/eval/compile_tensor_function.cpp index ac36720895f..18ef59507d8 100644 --- a/eval/src/vespa/eval/eval/compile_tensor_function.cpp +++ b/eval/src/vespa/eval/eval/compile_tensor_function.cpp @@ -32,10 +32,11 @@ struct Frame { }; struct ProgramCompiler { + const TensorEngine &engine; Stash &stash; std::vector<Frame> stack; std::vector<Instruction> prog; - ProgramCompiler(Stash &stash_in) : stash(stash_in), stack(), prog() {} + ProgramCompiler(const TensorEngine &engine_in, Stash &stash_in) : engine(engine_in), stash(stash_in), stack(), prog() {} void append(const std::vector<Instruction> &other_prog) { prog.insert(prog.end(), other_prog.begin(), other_prog.end()); @@ -43,9 +44,9 @@ struct ProgramCompiler { void open(const TensorFunction &node) { if (auto if_node = as<tensor_function::If>(node)) { - append(compile_tensor_function(if_node->cond(), stash)); - auto true_prog = compile_tensor_function(if_node->true_child(), stash); - auto false_prog = compile_tensor_function(if_node->false_child(), stash); + append(compile_tensor_function(engine, if_node->cond(), stash)); + auto true_prog = compile_tensor_function(engine, if_node->true_child(), stash); + auto false_prog = compile_tensor_function(engine, if_node->false_child(), stash); true_prog.emplace_back(op_skip, false_prog.size()); prog.emplace_back(op_skip_if_false, true_prog.size()); append(true_prog); @@ -56,7 +57,7 @@ struct ProgramCompiler { } void close(const TensorFunction &node) { - prog.push_back(node.compile_self(stash)); + prog.push_back(node.compile_self(engine, stash)); } std::vector<Instruction> compile(const TensorFunction &function) { @@ -75,8 +76,8 @@ struct ProgramCompiler { } // namespace vespalib::eval::<unnamed> -std::vector<Instruction> compile_tensor_function(const TensorFunction &function, Stash &stash) { - ProgramCompiler compiler(stash); +std::vector<Instruction> compile_tensor_function(const TensorEngine &engine, const TensorFunction &function, Stash &stash) { + ProgramCompiler compiler(engine, stash); return compiler.compile(function); } diff --git a/eval/src/vespa/eval/eval/compile_tensor_function.h b/eval/src/vespa/eval/eval/compile_tensor_function.h index 013d228c2f9..63f00fde053 100644 --- a/eval/src/vespa/eval/eval/compile_tensor_function.h +++ b/eval/src/vespa/eval/eval/compile_tensor_function.h @@ -10,7 +10,8 @@ namespace vespalib { class Stash; } namespace vespalib::eval { struct TensorFunction; +struct TensorEngine; -std::vector<InterpretedFunction::Instruction> compile_tensor_function(const TensorFunction &function, Stash &stash); +std::vector<InterpretedFunction::Instruction> compile_tensor_function(const TensorEngine &engine, const TensorFunction &function, Stash &stash); } // namespace vespalib::eval diff --git a/eval/src/vespa/eval/eval/interpreted_function.cpp b/eval/src/vespa/eval/eval/interpreted_function.cpp index ec28604fd87..121db4ffb6e 100644 --- a/eval/src/vespa/eval/eval/interpreted_function.cpp +++ b/eval/src/vespa/eval/eval/interpreted_function.cpp @@ -61,21 +61,19 @@ InterpretedFunction::Context::Context(const InterpretedFunction &ifun) InterpretedFunction::InterpretedFunction(const TensorEngine &engine, const TensorFunction &function) : _program(), _stash(), - _num_params(0), _tensor_engine(engine) { - _program = compile_tensor_function(function, _stash); + _program = compile_tensor_function(engine, function, _stash); } -InterpretedFunction::InterpretedFunction(const TensorEngine &engine, const nodes::Node &root, size_t num_params_in, const NodeTypes &types) +InterpretedFunction::InterpretedFunction(const TensorEngine &engine, const nodes::Node &root, const NodeTypes &types) : _program(), _stash(), - _num_params(num_params_in), _tensor_engine(engine) { const TensorFunction &plain_fun = make_tensor_function(engine, root, types, _stash); const TensorFunction &optimized = engine.optimize(plain_fun, _stash); - _program = compile_tensor_function(optimized, _stash); + _program = compile_tensor_function(engine, optimized, _stash); } InterpretedFunction::~InterpretedFunction() = default; diff --git a/eval/src/vespa/eval/eval/interpreted_function.h b/eval/src/vespa/eval/eval/interpreted_function.h index e638ccffcea..fb67fcb0b74 100644 --- a/eval/src/vespa/eval/eval/interpreted_function.h +++ b/eval/src/vespa/eval/eval/interpreted_function.h @@ -87,20 +87,18 @@ public: private: std::vector<Instruction> _program; Stash _stash; - size_t _num_params; const TensorEngine &_tensor_engine; public: typedef std::unique_ptr<InterpretedFunction> UP; // for testing; use with care; the tensor function must be kept alive InterpretedFunction(const TensorEngine &engine, const TensorFunction &function); - InterpretedFunction(const TensorEngine &engine, const nodes::Node &root, size_t num_params_in, const NodeTypes &types); + InterpretedFunction(const TensorEngine &engine, const nodes::Node &root, const NodeTypes &types); InterpretedFunction(const TensorEngine &engine, const Function &function, const NodeTypes &types) - : InterpretedFunction(engine, function.root(), function.num_params(), types) {} + : InterpretedFunction(engine, function.root(), types) {} InterpretedFunction(InterpretedFunction &&rhs) = default; ~InterpretedFunction(); size_t program_size() const { return _program.size(); } - size_t num_params() const { return _num_params; } const Value &eval(Context &ctx, const LazyParams ¶ms) const; double estimate_cost_us(const std::vector<double> ¶ms, double budget = 5.0) const; static Function::Issues detect_issues(const Function &function); diff --git a/eval/src/vespa/eval/eval/make_tensor_function.cpp b/eval/src/vespa/eval/eval/make_tensor_function.cpp index bbf6cadbac2..f503532c1f9 100644 --- a/eval/src/vespa/eval/eval/make_tensor_function.cpp +++ b/eval/src/vespa/eval/eval/make_tensor_function.cpp @@ -122,13 +122,13 @@ struct TensorFunctionBuilder : public NodeVisitor, public NodeTraverser { } void make_lambda(const TensorLambda &node) { - InterpretedFunction my_fun(tensor_engine, node.lambda().root(), node.type().dimensions().size(), types); if (node.bindings().empty()) { - NoParams no_params; - TensorSpec spec = tensor_function::Lambda::create_spec_impl(node.type(), no_params, node.bindings(), my_fun); + NoParams no_bound_params; + InterpretedFunction my_fun(tensor_engine, node.lambda().root(), types); + TensorSpec spec = tensor_function::Lambda::create_spec_impl(node.type(), no_bound_params, node.bindings(), my_fun); make_const(node, *stash.create<Value::UP>(tensor_engine.from_spec(spec))); } else { - stack.push_back(tensor_function::lambda(node.type(), node.bindings(), std::move(my_fun), stash)); + stack.push_back(tensor_function::lambda(node.type(), node.bindings(), node.lambda(), types.export_types(node.lambda().root()), stash)); } } diff --git a/eval/src/vespa/eval/eval/node_types.cpp b/eval/src/vespa/eval/eval/node_types.cpp index 5fe441b7a4e..468b9a58655 100644 --- a/eval/src/vespa/eval/eval/node_types.cpp +++ b/eval/src/vespa/eval/eval/node_types.cpp @@ -297,6 +297,26 @@ TypeResolver::TypeResolver(const std::vector<ValueType> ¶ms_in, TypeResolver::~TypeResolver() {} +struct TypeExporter : public NodeTraverser { + const std::map<const Node *, ValueType> &parent_type_map; + std::map<const Node *, ValueType> &exported_type_map; + size_t missing_cnt; + TypeExporter(const std::map<const Node *, ValueType> &parent_type_map_in, + std::map<const Node *, ValueType> &exported_type_map_out) + : parent_type_map(parent_type_map_in), + exported_type_map(exported_type_map_out), + missing_cnt(0) {} + bool open(const Node &) override { return true; } + void close(const Node &node) override { + auto pos = parent_type_map.find(&node); + if (pos != parent_type_map.end()) { + exported_type_map.emplace(&node, pos->second); + } else { + ++missing_cnt; + } + } +}; + } // namespace vespalib::eval::nodes::<unnamed> } // namespace vespalib::eval::nodes @@ -317,6 +337,18 @@ NodeTypes::NodeTypes(const Function &function, const std::vector<ValueType> &inp NodeTypes::~NodeTypes() = default; +NodeTypes +NodeTypes::export_types(const nodes::Node &root) const +{ + NodeTypes exported_types; + nodes::TypeExporter exporter(_type_map, exported_types._type_map); + root.traverse(exporter); + if (exporter.missing_cnt > 0) { + exported_types._errors.push_back(fmt("[export]: %zu nodes had missing types", exporter.missing_cnt)); + } + return exported_types; +} + const ValueType & NodeTypes::get_type(const nodes::Node &node) const { diff --git a/eval/src/vespa/eval/eval/node_types.h b/eval/src/vespa/eval/eval/node_types.h index c072915ffb1..72332564409 100644 --- a/eval/src/vespa/eval/eval/node_types.h +++ b/eval/src/vespa/eval/eval/node_types.h @@ -26,9 +26,12 @@ private: std::vector<vespalib::string> _errors; public: NodeTypes(); + NodeTypes(NodeTypes &&rhs) = default; + NodeTypes &operator=(NodeTypes &&rhs) = default; NodeTypes(const Function &function, const std::vector<ValueType> &input_types); ~NodeTypes(); const std::vector<vespalib::string> &errors() const { return _errors; } + NodeTypes export_types(const nodes::Node &root) const; const ValueType &get_type(const nodes::Node &node) const; template <typename F> void each(F &&f) const { diff --git a/eval/src/vespa/eval/eval/tensor_function.cpp b/eval/src/vespa/eval/eval/tensor_function.cpp index 889738a201d..9b7968f5092 100644 --- a/eval/src/vespa/eval/eval/tensor_function.cpp +++ b/eval/src/vespa/eval/eval/tensor_function.cpp @@ -135,8 +135,8 @@ void op_tensor_create(State &state, uint64_t param) { } void op_tensor_lambda(State &state, uint64_t param) { - const Lambda &self = unwrap_param<Lambda>(param); - TensorSpec spec = self.create_spec(*state.params); + const Lambda::Self &self = unwrap_param<Lambda::Self>(param); + TensorSpec spec = self.parent.create_spec(*state.params, self.fun); const Value &result = *state.stash.create<Value::UP>(state.engine.from_spec(spec)); state.stack.emplace_back(result); } @@ -243,7 +243,7 @@ Op2::visit_children(vespalib::ObjectVisitor &visitor) const //----------------------------------------------------------------------------- Instruction -ConstValue::compile_self(Stash &) const +ConstValue::compile_self(const TensorEngine &, Stash &) const { return Instruction(op_load_const, wrap_param<Value>(_value)); } @@ -262,7 +262,7 @@ ConstValue::visit_self(vespalib::ObjectVisitor &visitor) const //----------------------------------------------------------------------------- Instruction -Inject::compile_self(Stash &) const +Inject::compile_self(const TensorEngine &, Stash &) const { return Instruction::fetch_param(_param_idx); } @@ -277,7 +277,7 @@ Inject::visit_self(vespalib::ObjectVisitor &visitor) const //----------------------------------------------------------------------------- Instruction -Reduce::compile_self(Stash &stash) const +Reduce::compile_self(const TensorEngine &, Stash &stash) const { ReduceParams ¶ms = stash.create<ReduceParams>(_aggr, _dimensions); return Instruction(op_tensor_reduce, wrap_param<ReduceParams>(params)); @@ -294,7 +294,7 @@ Reduce::visit_self(vespalib::ObjectVisitor &visitor) const //----------------------------------------------------------------------------- Instruction -Map::compile_self(Stash &) const +Map::compile_self(const TensorEngine &, Stash &) const { if (result_type().is_double()) { return Instruction(op_double_map, to_param(_function)); @@ -312,7 +312,7 @@ Map::visit_self(vespalib::ObjectVisitor &visitor) const //----------------------------------------------------------------------------- Instruction -Join::compile_self(Stash &) const +Join::compile_self(const TensorEngine &, Stash &) const { if (result_type().is_double()) { if (_function == operation::Mul::f) { @@ -336,7 +336,7 @@ Join::visit_self(vespalib::ObjectVisitor &visitor) const //----------------------------------------------------------------------------- Instruction -Merge::compile_self(Stash &) const +Merge::compile_self(const TensorEngine &, Stash &) const { return Instruction(op_tensor_merge, to_param(_function)); } @@ -351,7 +351,7 @@ Merge::visit_self(vespalib::ObjectVisitor &visitor) const //----------------------------------------------------------------------------- Instruction -Concat::compile_self(Stash &) const +Concat::compile_self(const TensorEngine &, Stash &) const { return Instruction(op_tensor_concat, wrap_param<vespalib::string>(_dimension)); } @@ -374,7 +374,7 @@ Create::push_children(std::vector<Child::CREF> &children) const } Instruction -Create::compile_self(Stash &) const +Create::compile_self(const TensorEngine &, Stash &) const { return Instruction(op_tensor_create, wrap_param<Create>(*this)); } @@ -436,9 +436,11 @@ Lambda::create_spec_impl(const ValueType &type, const LazyParams ¶ms, const } InterpretedFunction::Instruction -Lambda::compile_self(Stash &) const +Lambda::compile_self(const TensorEngine &engine, Stash &stash) const { - return Instruction(op_tensor_lambda, wrap_param<Lambda>(*this)); + InterpretedFunction fun(engine, _lambda->root(), _lambda_types); + Self &self = stash.create<Self>(*this, std::move(fun)); + return Instruction(op_tensor_lambda, wrap_param<Self>(self)); } void @@ -471,7 +473,7 @@ Peek::push_children(std::vector<Child::CREF> &children) const } Instruction -Peek::compile_self(Stash &) const +Peek::compile_self(const TensorEngine &, Stash &) const { return Instruction(op_tensor_peek, wrap_param<Peek>(*this)); } @@ -500,7 +502,7 @@ Peek::visit_children(vespalib::ObjectVisitor &visitor) const //----------------------------------------------------------------------------- Instruction -Rename::compile_self(Stash &stash) const +Rename::compile_self(const TensorEngine &, Stash &stash) const { RenameParams ¶ms = stash.create<RenameParams>(_from, _to); return Instruction(op_tensor_rename, wrap_param<RenameParams>(params)); @@ -524,7 +526,7 @@ If::push_children(std::vector<Child::CREF> &children) const } Instruction -If::compile_self(Stash &) const +If::compile_self(const TensorEngine &, Stash &) const { // 'if' is handled directly by compile_tensor_function to enable // lazy-evaluation of true/false sub-expressions. @@ -578,8 +580,8 @@ const Node &create(const ValueType &type, const std::map<TensorSpec::Address,Nod return stash.create<Create>(type, spec); } -const Node &lambda(const ValueType &type, const std::vector<size_t> &bindings, InterpretedFunction function, Stash &stash) { - return stash.create<Lambda>(type, bindings, std::move(function)); +const Node &lambda(const ValueType &type, const std::vector<size_t> &bindings, const Function &function, NodeTypes node_types, Stash &stash) { + return stash.create<Lambda>(type, bindings, function, std::move(node_types)); } const Node &peek(const Node ¶m, const std::map<vespalib::string, std::variant<TensorSpec::Label, Node::CREF>> &spec, Stash &stash) { diff --git a/eval/src/vespa/eval/eval/tensor_function.h b/eval/src/vespa/eval/eval/tensor_function.h index c4e7384abcd..2cc70f50b15 100644 --- a/eval/src/vespa/eval/eval/tensor_function.h +++ b/eval/src/vespa/eval/eval/tensor_function.h @@ -101,9 +101,10 @@ struct TensorFunction * the value stack during execution. * * @return instruction representing the operation of this node + * @param engine the tensor engine used for evaluation * @param stash heterogeneous object store **/ - virtual InterpretedFunction::Instruction compile_self(Stash &stash) const = 0; + virtual InterpretedFunction::Instruction compile_self(const TensorEngine &engine, Stash &stash) const = 0; // for debug dumping vespalib::string as_string() const; @@ -185,7 +186,7 @@ public: ConstValue(const Value &value_in) : Leaf(value_in.type()), _value(value_in) {} const Value &value() const { return _value; } bool result_is_mutable() const override { return false; } - InterpretedFunction::Instruction compile_self(Stash &stash) const final override; + InterpretedFunction::Instruction compile_self(const TensorEngine &engine, Stash &stash) const final override; void visit_self(vespalib::ObjectVisitor &visitor) const override; }; @@ -201,7 +202,7 @@ public: : Leaf(result_type_in), _param_idx(param_idx_in) {} size_t param_idx() const { return _param_idx; } bool result_is_mutable() const override { return false; } - InterpretedFunction::Instruction compile_self(Stash &stash) const final override; + InterpretedFunction::Instruction compile_self(const TensorEngine &engine, Stash &stash) const final override; void visit_self(vespalib::ObjectVisitor &visitor) const override; }; @@ -222,7 +223,7 @@ public: Aggr aggr() const { return _aggr; } const std::vector<vespalib::string> &dimensions() const { return _dimensions; } bool result_is_mutable() const override { return true; } - InterpretedFunction::Instruction compile_self(Stash &stash) const final override; + InterpretedFunction::Instruction compile_self(const TensorEngine &engine, Stash &stash) const final override; void visit_self(vespalib::ObjectVisitor &visitor) const override; }; @@ -240,7 +241,7 @@ public: : Op1(result_type_in, child_in), _function(function_in) {} map_fun_t function() const { return _function; } bool result_is_mutable() const override { return true; } - InterpretedFunction::Instruction compile_self(Stash &stash) const override; + InterpretedFunction::Instruction compile_self(const TensorEngine &engine, Stash &stash) const override; void visit_self(vespalib::ObjectVisitor &visitor) const override; }; @@ -259,7 +260,7 @@ public: : Op2(result_type_in, lhs_in, rhs_in), _function(function_in) {} join_fun_t function() const { return _function; } bool result_is_mutable() const override { return true; } - InterpretedFunction::Instruction compile_self(Stash &stash) const override; + InterpretedFunction::Instruction compile_self(const TensorEngine &engine, Stash &stash) const override; void visit_self(vespalib::ObjectVisitor &visitor) const override; }; @@ -278,7 +279,7 @@ public: : Op2(result_type_in, lhs_in, rhs_in), _function(function_in) {} join_fun_t function() const { return _function; } bool result_is_mutable() const override { return true; } - InterpretedFunction::Instruction compile_self(Stash &stash) const override; + InterpretedFunction::Instruction compile_self(const TensorEngine &engine, Stash &stash) const override; void visit_self(vespalib::ObjectVisitor &visitor) const override; }; @@ -297,7 +298,7 @@ public: : Op2(result_type_in, lhs_in, rhs_in), _dimension(dimension_in) {} const vespalib::string &dimension() const { return _dimension; } bool result_is_mutable() const override { return true; } - InterpretedFunction::Instruction compile_self(Stash &stash) const final override; + InterpretedFunction::Instruction compile_self(const TensorEngine &engine, Stash &stash) const final override; void visit_self(vespalib::ObjectVisitor &visitor) const override; }; @@ -318,7 +319,7 @@ public: } const std::map<TensorSpec::Address, Child> &spec() const { return _spec; } bool result_is_mutable() const override { return true; } - InterpretedFunction::Instruction compile_self(Stash &stash) const final override; + InterpretedFunction::Instruction compile_self(const TensorEngine &engine, Stash &stash) const final override; void push_children(std::vector<Child::CREF> &children) const final override; void visit_children(vespalib::ObjectVisitor &visitor) const final override; }; @@ -328,16 +329,26 @@ public: class Lambda : public Node { using Super = Node; +public: + struct Self { + const Lambda &parent; + InterpretedFunction fun; + Self(const Lambda &parent_in, InterpretedFunction fun_in) + : parent(parent_in), fun(std::move(fun_in)) {} + }; private: std::vector<size_t> _bindings; - InterpretedFunction _lambda; + std::shared_ptr<Function const> _lambda; + NodeTypes _lambda_types; public: - Lambda(const ValueType &result_type_in, const std::vector<size_t> &bindings_in, InterpretedFunction lambda_in) - : Node(result_type_in), _bindings(bindings_in), _lambda(std::move(lambda_in)) {} + Lambda(const ValueType &result_type_in, const std::vector<size_t> &bindings_in, const Function &lambda_in, NodeTypes lambda_types_in) + : Node(result_type_in), _bindings(bindings_in), _lambda(lambda_in.shared_from_this()), _lambda_types(std::move(lambda_types_in)) {} static TensorSpec create_spec_impl(const ValueType &type, const LazyParams ¶ms, const std::vector<size_t> &bind, const InterpretedFunction &fun); - TensorSpec create_spec(const LazyParams ¶ms) const { return create_spec_impl(result_type(), params, _bindings, _lambda); } + TensorSpec create_spec(const LazyParams ¶ms, const InterpretedFunction &fun) const { + return create_spec_impl(result_type(), params, _bindings, fun); + } bool result_is_mutable() const override { return true; } - InterpretedFunction::Instruction compile_self(Stash &stash) const final override; + InterpretedFunction::Instruction compile_self(const TensorEngine &engine, Stash &stash) const final override; void push_children(std::vector<Child::CREF> &children) const final override; void visit_self(vespalib::ObjectVisitor &visitor) const override; }; @@ -372,7 +383,7 @@ public: const std::map<vespalib::string, MyLabel> &spec() const { return _spec; } const ValueType ¶m_type() const { return _param.get().result_type(); } bool result_is_mutable() const override { return true; } - InterpretedFunction::Instruction compile_self(Stash &stash) const final override; + InterpretedFunction::Instruction compile_self(const TensorEngine &engine, Stash &stash) const final override; void push_children(std::vector<Child::CREF> &children) const final override; void visit_children(vespalib::ObjectVisitor &visitor) const final override; }; @@ -394,7 +405,7 @@ public: const std::vector<vespalib::string> &from() const { return _from; } const std::vector<vespalib::string> &to() const { return _to; } bool result_is_mutable() const override { return true; } - InterpretedFunction::Instruction compile_self(Stash &stash) const final override; + InterpretedFunction::Instruction compile_self(const TensorEngine &engine, Stash &stash) const final override; void visit_self(vespalib::ObjectVisitor &visitor) const override; }; @@ -420,7 +431,7 @@ public: return (true_child().result_is_mutable() && false_child().result_is_mutable()); } - InterpretedFunction::Instruction compile_self(Stash &stash) const final override; + InterpretedFunction::Instruction compile_self(const TensorEngine &engine, Stash &stash) const final override; void visit_children(vespalib::ObjectVisitor &visitor) const final override; }; @@ -434,7 +445,7 @@ const Node &join(const Node &lhs, const Node &rhs, join_fun_t function, Stash &s const Node &merge(const Node &lhs, const Node &rhs, join_fun_t function, Stash &stash); const Node &concat(const Node &lhs, const Node &rhs, const vespalib::string &dimension, Stash &stash); const Node &create(const ValueType &type, const std::map<TensorSpec::Address, Node::CREF> &spec, Stash &stash); -const Node &lambda(const ValueType &type, const std::vector<size_t> &bindings, InterpretedFunction function, Stash &stash); +const Node &lambda(const ValueType &type, const std::vector<size_t> &bindings, const Function &function, NodeTypes node_types, Stash &stash); const Node &peek(const Node ¶m, const std::map<vespalib::string, std::variant<TensorSpec::Label, Node::CREF>> &spec, Stash &stash); const Node &rename(const Node &child, const std::vector<vespalib::string> &from, const std::vector<vespalib::string> &to, Stash &stash); const Node &if_node(const Node &cond, const Node &true_child, const Node &false_child, Stash &stash); diff --git a/eval/src/vespa/eval/tensor/default_tensor_engine.cpp b/eval/src/vespa/eval/tensor/default_tensor_engine.cpp index a9e1ad84eb7..b16241fe5e5 100644 --- a/eval/src/vespa/eval/tensor/default_tensor_engine.cpp +++ b/eval/src/vespa/eval/tensor/default_tensor_engine.cpp @@ -14,6 +14,7 @@ #include "dense/dense_fast_rename_optimizer.h" #include "dense/dense_add_dimension_optimizer.h" #include "dense/dense_remove_dimension_optimizer.h" +#include "dense/dense_lambda_peek_optimizer.h" #include "dense/dense_inplace_join_function.h" #include "dense/dense_inplace_map_function.h" #include "dense/vector_from_doubles_function.h" @@ -267,6 +268,7 @@ DefaultTensorEngine::optimize(const TensorFunction &expr, Stash &stash) const const Child &child = nodes.back(); child.set(VectorFromDoublesFunction::optimize(child.get(), stash)); child.set(DenseTensorCreateFunction::optimize(child.get(), stash)); + child.set(DenseLambdaPeekOptimizer::optimize(child.get(), stash)); child.set(DenseTensorPeekFunction::optimize(child.get(), stash)); child.set(DenseDotProductFunction::optimize(child.get(), stash)); child.set(DenseXWProductFunction::optimize(child.get(), stash)); diff --git a/eval/src/vespa/eval/tensor/dense/CMakeLists.txt b/eval/src/vespa/eval/tensor/dense/CMakeLists.txt index 635a49cb4a9..9e4c9857bd1 100644 --- a/eval/src/vespa/eval/tensor/dense/CMakeLists.txt +++ b/eval/src/vespa/eval/tensor/dense/CMakeLists.txt @@ -14,6 +14,7 @@ vespa_add_library(eval_tensor_dense OBJECT dense_tensor_address_mapper.cpp dense_tensor_cells_iterator.cpp dense_tensor_create_function.cpp + dense_lambda_peek_optimizer.cpp dense_tensor_modify.cpp dense_tensor_peek_function.cpp dense_tensor_reduce.cpp diff --git a/eval/src/vespa/eval/tensor/dense/dense_dot_product_function.cpp b/eval/src/vespa/eval/tensor/dense/dense_dot_product_function.cpp index 2fe89861e9f..c9ff57e4a65 100644 --- a/eval/src/vespa/eval/tensor/dense/dense_dot_product_function.cpp +++ b/eval/src/vespa/eval/tensor/dense/dense_dot_product_function.cpp @@ -11,6 +11,7 @@ namespace vespalib::tensor { using eval::ValueType; using eval::TensorFunction; +using eval::TensorEngine; using eval::as; using eval::Aggr; using namespace eval::tensor_function; @@ -71,7 +72,7 @@ DenseDotProductFunction::DenseDotProductFunction(const eval::TensorFunction &lhs } eval::InterpretedFunction::Instruction -DenseDotProductFunction::compile_self(Stash &) const +DenseDotProductFunction::compile_self(const TensorEngine &, Stash &) const { auto op = my_select(lhs().result_type().cell_type(), rhs().result_type().cell_type()); return eval::InterpretedFunction::Instruction(op); diff --git a/eval/src/vespa/eval/tensor/dense/dense_dot_product_function.h b/eval/src/vespa/eval/tensor/dense/dense_dot_product_function.h index 1d8f749689b..1ee6baff2a5 100644 --- a/eval/src/vespa/eval/tensor/dense/dense_dot_product_function.h +++ b/eval/src/vespa/eval/tensor/dense/dense_dot_product_function.h @@ -16,7 +16,7 @@ private: public: DenseDotProductFunction(const eval::TensorFunction &lhs_in, const eval::TensorFunction &rhs_in); - eval::InterpretedFunction::Instruction compile_self(Stash &stash) const override; + eval::InterpretedFunction::Instruction compile_self(const eval::TensorEngine &engine, Stash &stash) const override; bool result_is_mutable() const override { return true; } static bool compatible_types(const ValueType &res, const ValueType &lhs, const ValueType &rhs); static const eval::TensorFunction &optimize(const eval::TensorFunction &expr, Stash &stash); diff --git a/eval/src/vespa/eval/tensor/dense/dense_inplace_join_function.cpp b/eval/src/vespa/eval/tensor/dense/dense_inplace_join_function.cpp index 990215cbbe8..2107c7661f2 100644 --- a/eval/src/vespa/eval/tensor/dense/dense_inplace_join_function.cpp +++ b/eval/src/vespa/eval/tensor/dense/dense_inplace_join_function.cpp @@ -10,6 +10,7 @@ namespace vespalib::tensor { using eval::Value; using eval::ValueType; using eval::TensorFunction; +using eval::TensorEngine; using eval::as; using namespace eval::tensor_function; @@ -74,7 +75,7 @@ DenseInplaceJoinFunction::~DenseInplaceJoinFunction() } eval::InterpretedFunction::Instruction -DenseInplaceJoinFunction::compile_self(Stash &) const +DenseInplaceJoinFunction::compile_self(const TensorEngine &, Stash &) const { auto op = my_select(lhs().result_type().cell_type(), rhs().result_type().cell_type(), _write_left); diff --git a/eval/src/vespa/eval/tensor/dense/dense_inplace_join_function.h b/eval/src/vespa/eval/tensor/dense/dense_inplace_join_function.h index 83acee6d31f..acd1a2d716b 100644 --- a/eval/src/vespa/eval/tensor/dense/dense_inplace_join_function.h +++ b/eval/src/vespa/eval/tensor/dense/dense_inplace_join_function.h @@ -25,7 +25,7 @@ public: ~DenseInplaceJoinFunction(); bool write_left() const { return _write_left; } bool result_is_mutable() const override { return true; } - eval::InterpretedFunction::Instruction compile_self(Stash &stash) const override; + eval::InterpretedFunction::Instruction compile_self(const eval::TensorEngine &engine, Stash &stash) const override; void visit_self(vespalib::ObjectVisitor &visitor) const override; static const eval::TensorFunction &optimize(const eval::TensorFunction &expr, Stash &stash); }; diff --git a/eval/src/vespa/eval/tensor/dense/dense_inplace_map_function.cpp b/eval/src/vespa/eval/tensor/dense/dense_inplace_map_function.cpp index e086eab0b13..62434073f8e 100644 --- a/eval/src/vespa/eval/tensor/dense/dense_inplace_map_function.cpp +++ b/eval/src/vespa/eval/tensor/dense/dense_inplace_map_function.cpp @@ -9,6 +9,7 @@ namespace vespalib::tensor { using eval::Value; using eval::ValueType; using eval::TensorFunction; +using eval::TensorEngine; using eval::as; using namespace eval::tensor_function; @@ -42,7 +43,7 @@ DenseInplaceMapFunction::~DenseInplaceMapFunction() } eval::InterpretedFunction::Instruction -DenseInplaceMapFunction::compile_self(Stash &) const +DenseInplaceMapFunction::compile_self(const TensorEngine &, Stash &) const { auto op = select_1<MyInplaceMapOp>(result_type().cell_type()); return eval::InterpretedFunction::Instruction(op, (uint64_t)function()); diff --git a/eval/src/vespa/eval/tensor/dense/dense_inplace_map_function.h b/eval/src/vespa/eval/tensor/dense/dense_inplace_map_function.h index acc4504176a..52122f4d8dc 100644 --- a/eval/src/vespa/eval/tensor/dense/dense_inplace_map_function.h +++ b/eval/src/vespa/eval/tensor/dense/dense_inplace_map_function.h @@ -18,7 +18,7 @@ public: map_fun_t function_in); ~DenseInplaceMapFunction(); bool result_is_mutable() const override { return true; } - eval::InterpretedFunction::Instruction compile_self(Stash &stash) const override; + eval::InterpretedFunction::Instruction compile_self(const eval::TensorEngine &engine, Stash &stash) const override; static const eval::TensorFunction &optimize(const eval::TensorFunction &expr, Stash &stash); }; diff --git a/eval/src/vespa/eval/tensor/dense/dense_lambda_peek_optimizer.cpp b/eval/src/vespa/eval/tensor/dense/dense_lambda_peek_optimizer.cpp new file mode 100644 index 00000000000..14954a77834 --- /dev/null +++ b/eval/src/vespa/eval/tensor/dense/dense_lambda_peek_optimizer.cpp @@ -0,0 +1,13 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "dense_lambda_peek_optimizer.h" + +namespace vespalib::tensor { + +const eval::TensorFunction & +DenseLambdaPeekOptimizer::optimize(const eval::TensorFunction &expr, Stash &) +{ + return expr; +} + +} diff --git a/eval/src/vespa/eval/tensor/dense/dense_lambda_peek_optimizer.h b/eval/src/vespa/eval/tensor/dense/dense_lambda_peek_optimizer.h new file mode 100644 index 00000000000..7d2a0efde76 --- /dev/null +++ b/eval/src/vespa/eval/tensor/dense/dense_lambda_peek_optimizer.h @@ -0,0 +1,18 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/eval/eval/tensor_function.h> + +namespace vespalib::tensor { + +/** + * Tensor lambda optimizer for creating a new dense tensor based on + * peeking cells of a single existing tensor. This can represent a + * wide area of operations (reshape, gather, slice). + **/ +struct DenseLambdaPeekOptimizer { + static const eval::TensorFunction &optimize(const eval::TensorFunction &expr, Stash &stash); +}; + +} diff --git a/eval/src/vespa/eval/tensor/dense/dense_matmul_function.cpp b/eval/src/vespa/eval/tensor/dense/dense_matmul_function.cpp index 1feecc31e51..695e0fddd08 100644 --- a/eval/src/vespa/eval/tensor/dense/dense_matmul_function.cpp +++ b/eval/src/vespa/eval/tensor/dense/dense_matmul_function.cpp @@ -13,6 +13,7 @@ namespace vespalib::tensor { using eval::ValueType; using eval::TensorFunction; +using eval::TensorEngine; using eval::as; using eval::Aggr; using namespace eval::tensor_function; @@ -194,7 +195,7 @@ DenseMatMulFunction::DenseMatMulFunction(const eval::ValueType &result_type, DenseMatMulFunction::~DenseMatMulFunction() = default; eval::InterpretedFunction::Instruction -DenseMatMulFunction::compile_self(Stash &stash) const +DenseMatMulFunction::compile_self(const TensorEngine &, Stash &stash) const { Self &self = stash.create<Self>(result_type(), _lhs_size, _common_size, _rhs_size); auto op = my_select(lhs().result_type().cell_type(), rhs().result_type().cell_type(), diff --git a/eval/src/vespa/eval/tensor/dense/dense_matmul_function.h b/eval/src/vespa/eval/tensor/dense/dense_matmul_function.h index f0b6d8b6c19..88d7b9f37e0 100644 --- a/eval/src/vespa/eval/tensor/dense/dense_matmul_function.h +++ b/eval/src/vespa/eval/tensor/dense/dense_matmul_function.h @@ -50,7 +50,7 @@ public: bool lhs_common_inner() const { return _lhs_common_inner; } bool rhs_common_inner() const { return _rhs_common_inner; } - eval::InterpretedFunction::Instruction compile_self(Stash &stash) const override; + eval::InterpretedFunction::Instruction compile_self(const eval::TensorEngine &engine, Stash &stash) const override; void visit_self(vespalib::ObjectVisitor &visitor) const override; static const eval::TensorFunction &optimize(const eval::TensorFunction &expr, Stash &stash); }; diff --git a/eval/src/vespa/eval/tensor/dense/dense_replace_type_function.cpp b/eval/src/vespa/eval/tensor/dense/dense_replace_type_function.cpp index b81b0f2c876..e2dc2241555 100644 --- a/eval/src/vespa/eval/tensor/dense/dense_replace_type_function.cpp +++ b/eval/src/vespa/eval/tensor/dense/dense_replace_type_function.cpp @@ -9,6 +9,7 @@ namespace vespalib::tensor { using eval::Value; using eval::ValueType; using eval::TensorFunction; +using eval::TensorEngine; using eval::as; using namespace eval::tensor_function; @@ -38,7 +39,7 @@ DenseReplaceTypeFunction::~DenseReplaceTypeFunction() } eval::InterpretedFunction::Instruction -DenseReplaceTypeFunction::compile_self(Stash &) const +DenseReplaceTypeFunction::compile_self(const TensorEngine &, Stash &) const { return eval::InterpretedFunction::Instruction(my_replace_type_op, (uint64_t)&(result_type())); } diff --git a/eval/src/vespa/eval/tensor/dense/dense_replace_type_function.h b/eval/src/vespa/eval/tensor/dense/dense_replace_type_function.h index 4ad1f4c1cee..22c3886022d 100644 --- a/eval/src/vespa/eval/tensor/dense/dense_replace_type_function.h +++ b/eval/src/vespa/eval/tensor/dense/dense_replace_type_function.h @@ -16,7 +16,7 @@ public: DenseReplaceTypeFunction(const eval::ValueType &result_type, const eval::TensorFunction &child); ~DenseReplaceTypeFunction(); - eval::InterpretedFunction::Instruction compile_self(Stash &stash) const override; + eval::InterpretedFunction::Instruction compile_self(const eval::TensorEngine &engine, Stash &stash) const override; bool result_is_mutable() const override { return child().result_is_mutable(); } static const DenseReplaceTypeFunction &create_compact(const eval::ValueType &result_type, const eval::TensorFunction &child, diff --git a/eval/src/vespa/eval/tensor/dense/dense_tensor_create_function.cpp b/eval/src/vespa/eval/tensor/dense/dense_tensor_create_function.cpp index b072f334f13..3533ab20175 100644 --- a/eval/src/vespa/eval/tensor/dense/dense_tensor_create_function.cpp +++ b/eval/src/vespa/eval/tensor/dense/dense_tensor_create_function.cpp @@ -12,6 +12,7 @@ using eval::DoubleValue; using eval::ValueType; using eval::TensorSpec; using eval::TensorFunction; +using eval::TensorEngine; using Child = eval::TensorFunction::Child; using eval::as; using namespace eval::tensor_function; @@ -68,7 +69,7 @@ DenseTensorCreateFunction::push_children(std::vector<Child::CREF> &target) const } eval::InterpretedFunction::Instruction -DenseTensorCreateFunction::compile_self(Stash &) const +DenseTensorCreateFunction::compile_self(const TensorEngine &, Stash &) const { static_assert(sizeof(uint64_t) == sizeof(&_self)); auto op = select_1<MyTensorCreateOp>(result_type().cell_type()); diff --git a/eval/src/vespa/eval/tensor/dense/dense_tensor_create_function.h b/eval/src/vespa/eval/tensor/dense/dense_tensor_create_function.h index 6d262f48aa6..d471658fba0 100644 --- a/eval/src/vespa/eval/tensor/dense/dense_tensor_create_function.h +++ b/eval/src/vespa/eval/tensor/dense/dense_tensor_create_function.h @@ -25,7 +25,7 @@ public: ~DenseTensorCreateFunction(); const eval::ValueType &result_type() const override { return _self.result_type; } void push_children(std::vector<Child::CREF> &children) const override; - eval::InterpretedFunction::Instruction compile_self(Stash &stash) const override; + eval::InterpretedFunction::Instruction compile_self(const eval::TensorEngine &engine, Stash &stash) const override; bool result_is_mutable() const override { return true; } static const eval::TensorFunction &optimize(const eval::TensorFunction &expr, Stash &stash); }; diff --git a/eval/src/vespa/eval/tensor/dense/dense_tensor_peek_function.cpp b/eval/src/vespa/eval/tensor/dense/dense_tensor_peek_function.cpp index 07a2f08c423..6dc081cba1c 100644 --- a/eval/src/vespa/eval/tensor/dense/dense_tensor_peek_function.cpp +++ b/eval/src/vespa/eval/tensor/dense/dense_tensor_peek_function.cpp @@ -12,6 +12,7 @@ using eval::DoubleValue; using eval::ValueType; using eval::TensorSpec; using eval::TensorFunction; +using eval::TensorEngine; using Child = eval::TensorFunction::Child; using eval::as; using namespace eval::tensor_function; @@ -67,7 +68,7 @@ DenseTensorPeekFunction::push_children(std::vector<Child::CREF> &target) const } eval::InterpretedFunction::Instruction -DenseTensorPeekFunction::compile_self(Stash &) const +DenseTensorPeekFunction::compile_self(const TensorEngine &, Stash &) const { static_assert(sizeof(uint64_t) == sizeof(&_spec)); auto op = select_1<MyTensorPeekOp>(_children[0].get().result_type().cell_type()); diff --git a/eval/src/vespa/eval/tensor/dense/dense_tensor_peek_function.h b/eval/src/vespa/eval/tensor/dense/dense_tensor_peek_function.h index 150bab5211f..8ed672f95b0 100644 --- a/eval/src/vespa/eval/tensor/dense/dense_tensor_peek_function.h +++ b/eval/src/vespa/eval/tensor/dense/dense_tensor_peek_function.h @@ -26,7 +26,7 @@ public: ~DenseTensorPeekFunction(); const eval::ValueType &result_type() const override { return eval::DoubleValue::double_type(); } void push_children(std::vector<Child::CREF> &children) const override; - eval::InterpretedFunction::Instruction compile_self(Stash &stash) const override; + eval::InterpretedFunction::Instruction compile_self(const eval::TensorEngine &engine, Stash &stash) const override; bool result_is_mutable() const override { return true; } static const eval::TensorFunction &optimize(const eval::TensorFunction &expr, Stash &stash); }; diff --git a/eval/src/vespa/eval/tensor/dense/dense_xw_product_function.cpp b/eval/src/vespa/eval/tensor/dense/dense_xw_product_function.cpp index fb126b03d19..a0d63a1ce1e 100644 --- a/eval/src/vespa/eval/tensor/dense/dense_xw_product_function.cpp +++ b/eval/src/vespa/eval/tensor/dense/dense_xw_product_function.cpp @@ -13,6 +13,7 @@ namespace vespalib::tensor { using eval::ValueType; using eval::TensorFunction; +using eval::TensorEngine; using eval::as; using eval::Aggr; using namespace eval::tensor_function; @@ -156,7 +157,7 @@ DenseXWProductFunction::DenseXWProductFunction(const eval::ValueType &result_typ } eval::InterpretedFunction::Instruction -DenseXWProductFunction::compile_self(Stash &stash) const +DenseXWProductFunction::compile_self(const TensorEngine &, Stash &stash) const { Self &self = stash.create<Self>(result_type(), _vector_size, _result_size); auto op = my_select(lhs().result_type().cell_type(), diff --git a/eval/src/vespa/eval/tensor/dense/dense_xw_product_function.h b/eval/src/vespa/eval/tensor/dense/dense_xw_product_function.h index d7c39fa45a2..9f05222fff6 100644 --- a/eval/src/vespa/eval/tensor/dense/dense_xw_product_function.h +++ b/eval/src/vespa/eval/tensor/dense/dense_xw_product_function.h @@ -44,7 +44,7 @@ public: size_t result_size() const { return _result_size; } bool common_inner() const { return _common_inner; } - eval::InterpretedFunction::Instruction compile_self(Stash &stash) const override; + eval::InterpretedFunction::Instruction compile_self(const eval::TensorEngine &engine, Stash &stash) const override; void visit_self(vespalib::ObjectVisitor &visitor) const override; static const eval::TensorFunction &optimize(const eval::TensorFunction &expr, Stash &stash); }; diff --git a/eval/src/vespa/eval/tensor/dense/vector_from_doubles_function.cpp b/eval/src/vespa/eval/tensor/dense/vector_from_doubles_function.cpp index 4ef678a1de9..7a4b5917f00 100644 --- a/eval/src/vespa/eval/tensor/dense/vector_from_doubles_function.cpp +++ b/eval/src/vespa/eval/tensor/dense/vector_from_doubles_function.cpp @@ -9,6 +9,7 @@ namespace vespalib::tensor { using eval::Value; using eval::ValueType; using eval::TensorFunction; +using eval::TensorEngine; using Child = eval::TensorFunction::Child; using eval::as; using namespace eval::tensor_function; @@ -90,7 +91,7 @@ VectorFromDoublesFunction::push_children(std::vector<Child::CREF> &target) const } eval::InterpretedFunction::Instruction -VectorFromDoublesFunction::compile_self(Stash &) const +VectorFromDoublesFunction::compile_self(const TensorEngine &, Stash &) const { return eval::InterpretedFunction::Instruction(my_vector_from_doubles_op, (uint64_t)&_self); } diff --git a/eval/src/vespa/eval/tensor/dense/vector_from_doubles_function.h b/eval/src/vespa/eval/tensor/dense/vector_from_doubles_function.h index 378c9026f84..28346c4cb3b 100644 --- a/eval/src/vespa/eval/tensor/dense/vector_from_doubles_function.h +++ b/eval/src/vespa/eval/tensor/dense/vector_from_doubles_function.h @@ -30,7 +30,7 @@ public: return _self.resultType.dimensions()[0].name; } size_t size() const { return _self.resultSize; } - eval::InterpretedFunction::Instruction compile_self(Stash &stash) const override; + eval::InterpretedFunction::Instruction compile_self(const eval::TensorEngine &engine, Stash &stash) const override; bool result_is_mutable() const override { return true; } static const eval::TensorFunction &optimize(const eval::TensorFunction &expr, Stash &stash); }; diff --git a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java index f392b597e3b..7c947a539b6 100644 --- a/flags/src/main/java/com/yahoo/vespa/flags/Flags.java +++ b/flags/src/main/java/com/yahoo/vespa/flags/Flags.java @@ -210,12 +210,6 @@ public class Flags { "Whether the endpoint certificate maintainer should backfill missing certificate data from cameo", "Takes effect on next scheduled run of maintainer - set to \"disable\", \"dryrun\" or \"enable\""); - public static final UnboundBooleanFlag USE_NEW_ATHENZ_FILTER = defineFeatureFlag( - "use-new-athenz-filter", false, - "Use new Athenz filter that supports access-tokens", - "Takes effect at redeployment", - APPLICATION_ID); - public static final UnboundStringFlag DOCKER_IMAGE_REPO = defineStringFlag( "docker-image-repo", "", "Override default docker image repo. Docker image version will be Vespa version.", diff --git a/messagebus/src/vespa/messagebus/messenger.cpp b/messagebus/src/vespa/messagebus/messenger.cpp index 36211d8ec38..5313c4adcbb 100644 --- a/messagebus/src/vespa/messagebus/messenger.cpp +++ b/messagebus/src/vespa/messagebus/messenger.cpp @@ -246,13 +246,13 @@ Messenger::start() void Messenger::deliverMessage(Message::UP msg, IMessageHandler &handler) { - handler.handleMessage(std::move(msg)); + enqueue(std::make_unique<MessageTask>(std::move(msg), handler)); } void Messenger::deliverReply(Reply::UP reply, IReplyHandler &handler) { - handler.handleReply(std::move(reply)); + enqueue(std::make_unique<ReplyTask>(std::move(reply), handler)); } void diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp index faa67b9bece..de3be2ffa01 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.cpp @@ -17,8 +17,6 @@ #include <vespa/fnet/scheduler.h> #include <vespa/fnet/transport.h> #include <vespa/fnet/frt/supervisor.h> -#include <vespa/vespalib/util/singleexecutor.h> -#include <vespa/vespalib/util/lambdatask.h> #include <vespa/fastos/thread.h> #include <thread> @@ -80,17 +78,6 @@ struct TargetPoolTask : public FNET_Task { } }; -std::unique_ptr<vespalib::SyncableThreadExecutor> -createExecutor(RPCNetworkParams::OptimizeFor optimizeFor) { - switch (optimizeFor) { - case RPCNetworkParams::OptimizeFor::LATENCY: - return std::make_unique<vespalib::ThreadStackExecutor>(1, 0x10000); - case RPCNetworkParams::OptimizeFor::THROUGHPUT: - default: - return std::make_unique<vespalib::SingleExecutor>(100); - } -} - } RPCNetwork::SendContext::SendContext(RPCNetwork &net, const Message &msg, @@ -120,16 +107,8 @@ RPCNetwork::SendContext::handleVersion(const vespalib::Version *version) } } if (shouldSend) { - if (_net.allowDispatchForEncode()) { - auto rejected = _net.getEncodeExecutor(true).execute(vespalib::makeLambdaTask([this]() { - _net.send(*this); - delete this; - })); - assert (!rejected); - } else { - _net.send(*this); - delete this; - } + _net.send(*this); + delete this; } } @@ -137,7 +116,7 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _owner(nullptr), _ident(params.getIdentity()), _threadPool(std::make_unique<FastOS_ThreadPool>(128000, 0)), - _transport(std::make_unique<FNET_Transport>(params.getNumThreads())), + _transport(std::make_unique<FNET_Transport>()), _orb(std::make_unique<FRT_Supervisor>(_transport.get())), _scheduler(*_transport->GetScheduler()), _slobrokCfgFactory(std::make_unique<slobrok::ConfiguratorFactory>(params.getSlobrokConfig())), @@ -147,8 +126,7 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : _targetPool(std::make_unique<RPCTargetPool>(params.getConnectionExpireSecs())), _targetPoolTask(std::make_unique<TargetPoolTask>(_scheduler, *_targetPool)), _servicePool(std::make_unique<RPCServicePool>(*_mirror, 4096)), - _singleEncodeExecutor(createExecutor(params.getOptimizeFor())), - _singleDecodeExecutor(createExecutor(params.getOptimizeFor())), + _executor(std::make_unique<vespalib::ThreadStackExecutor>(params.getNumThreads(), 65536)), _sendV1(std::make_unique<RPCSendV1>()), _sendV2(std::make_unique<RPCSendV2>()), _sendAdapters(), @@ -158,6 +136,7 @@ RPCNetwork::RPCNetwork(const RPCNetworkParams ¶ms) : { _transport->SetMaxInputBufferSize(params.getMaxInputBufferSize()); _transport->SetMaxOutputBufferSize(params.getMaxOutputBufferSize()); + _transport->SetTCPNoDelay(params.getTcpNoDelay()); } RPCNetwork::~RPCNetwork() @@ -427,8 +406,7 @@ void RPCNetwork::sync() { SyncTask task(_scheduler); - _singleEncodeExecutor->sync(); - _singleDecodeExecutor->sync(); + _executor->sync(); task.await(); } @@ -437,10 +415,8 @@ RPCNetwork::shutdown() { _transport->ShutDown(true); _threadPool->Close(); - _singleEncodeExecutor->shutdown(); - _singleDecodeExecutor->shutdown(); - _singleEncodeExecutor->sync(); - _singleDecodeExecutor->sync(); + _executor->shutdown(); + _executor->sync(); } void diff --git a/messagebus/src/vespa/messagebus/network/rpcnetwork.h b/messagebus/src/vespa/messagebus/network/rpcnetwork.h index a510aae9014..a8eb514387c 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetwork.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetwork.h @@ -65,8 +65,7 @@ private: std::unique_ptr<RPCTargetPool> _targetPool; std::unique_ptr<FNET_Task> _targetPoolTask; std::unique_ptr<RPCServicePool> _servicePool; - std::unique_ptr<vespalib::SyncableThreadExecutor> _singleEncodeExecutor; - std::unique_ptr<vespalib::SyncableThreadExecutor> _singleDecodeExecutor; + std::unique_ptr<vespalib::ThreadStackExecutor> _executor; std::unique_ptr<RPCSendAdapter> _sendV1; std::unique_ptr<RPCSendAdapter> _sendV2; SendAdapterMap _sendAdapters; @@ -74,6 +73,7 @@ private: bool _allowDispatchForEncode; bool _allowDispatchForDecode; + /** * Resolves and assigns a service address for the given recipient using the * given address. This is called by the {@link @@ -224,8 +224,7 @@ public: const slobrok::api::IMirrorAPI &getMirror() const override; CompressionConfig getCompressionConfig() { return _compressionConfig; } void invoke(FRT_RPCRequest *req); - vespalib::Executor & getEncodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleEncodeExecutor : *_singleEncodeExecutor; } - vespalib::Executor & getDecodeExecutor(bool requireSequencing) const { return requireSequencing ? *_singleDecodeExecutor : *_singleDecodeExecutor; } + vespalib::Executor & getExecutor() const { return *_executor; } bool allowDispatchForEncode() const { return _allowDispatchForEncode; } bool allowDispatchForDecode() const { return _allowDispatchForDecode; } diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp index 482a46b2564..5bf277a8ee6 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.cpp @@ -14,8 +14,8 @@ RPCNetworkParams::RPCNetworkParams(config::ConfigUri configUri) : _listenPort(0), _maxInputBufferSize(256*1024), _maxOutputBufferSize(256*1024), - _numThreads(1), - _optimizeFor(OptimizeFor::LATENCY), + _numThreads(4), + _tcpNoDelay(true), _dispatchOnEncode(true), _dispatchOnDecode(false), _connectionExpireSecs(600), diff --git a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h index a4b752f46d4..140f81c611c 100644 --- a/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h +++ b/messagebus/src/vespa/messagebus/network/rpcnetworkparams.h @@ -12,10 +12,21 @@ namespace mbus { * held by this class. This class has reasonable default values for each parameter. */ class RPCNetworkParams { -public: - enum class OptimizeFor { LATENCY, THROUGHPUT}; +private: using CompressionConfig = vespalib::compression::CompressionConfig; + Identity _identity; + config::ConfigUri _slobrokConfig; + int _listenPort; + uint32_t _maxInputBufferSize; + uint32_t _maxOutputBufferSize; + uint32_t _numThreads; + bool _tcpNoDelay; + bool _dispatchOnEncode; + bool _dispatchOnDecode; + double _connectionExpireSecs; + CompressionConfig _compressionConfig; +public: RPCNetworkParams(); RPCNetworkParams(config::ConfigUri configUri); ~RPCNetworkParams(); @@ -96,12 +107,12 @@ public: uint32_t getNumThreads() const { return _numThreads; } - RPCNetworkParams &setOptimizeFor(OptimizeFor tcpNoDelay) { - _optimizeFor = tcpNoDelay; + RPCNetworkParams &setTcpNoDelay(bool tcpNoDelay) { + _tcpNoDelay = tcpNoDelay; return *this; } - OptimizeFor getOptimizeFor() const { return _optimizeFor; } + bool getTcpNoDelay() const { return _tcpNoDelay; } /** * Returns the number of seconds before an idle network connection expires. @@ -187,18 +198,6 @@ public: } uint32_t getDispatchOnEncode() const { return _dispatchOnEncode; } -private: - Identity _identity; - config::ConfigUri _slobrokConfig; - int _listenPort; - uint32_t _maxInputBufferSize; - uint32_t _maxOutputBufferSize; - uint32_t _numThreads; - OptimizeFor _optimizeFor; - bool _dispatchOnEncode; - bool _dispatchOnDecode; - double _connectionExpireSecs; - CompressionConfig _compressionConfig; }; } diff --git a/messagebus/src/vespa/messagebus/network/rpcsend.cpp b/messagebus/src/vespa/messagebus/network/rpcsend.cpp index d217c7964d6..2422638dc05 100644 --- a/messagebus/src/vespa/messagebus/network/rpcsend.cpp +++ b/messagebus/src/vespa/messagebus/network/rpcsend.cpp @@ -148,14 +148,7 @@ RPCSend::send(RoutingNode &recipient, const vespalib::Version &version, void RPCSend::RequestDone(FRT_RPCRequest *req) { - if ( _net->allowDispatchForDecode()) { - auto rejected = _net->getDecodeExecutor(true).execute(makeLambdaTask([this, req]() { - doRequestDone(req); - })); - assert (!rejected); - } else { - doRequestDone(req); - } + doRequestDone(req); } void @@ -228,13 +221,13 @@ void RPCSend::handleReply(Reply::UP reply) { const IProtocol * protocol = _net->getOwner().getProtocol(reply->getProtocol()); - if (protocol && _net->allowDispatchForEncode()) { - auto rejected = _net->getEncodeExecutor(protocol->requireSequencing()).execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable { + if (!protocol || protocol->requireSequencing() || !_net->allowDispatchForEncode()) { + doHandleReply(protocol, std::move(reply)); + } else { + auto rejected = _net->getExecutor().execute(makeLambdaTask([this, protocol, reply = std::move(reply)]() mutable { doHandleReply(protocol, std::move(reply)); })); assert (!rejected); - } else { - doHandleReply(protocol, std::move(reply)); } } @@ -273,13 +266,13 @@ RPCSend::invoke(FRT_RPCRequest *req) vespalib::string(params->getProtocol()).c_str(), _serverIdent.c_str()))); return; } - if (_net->allowDispatchForDecode()) { - auto rejected = _net->getDecodeExecutor(protocol->requireSequencing()).execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable { + if (protocol->requireSequencing() || !_net->allowDispatchForDecode()) { + doRequest(req, protocol, std::move(params)); + } else { + auto rejected = _net->getExecutor().execute(makeLambdaTask([this, req, protocol, params = std::move(params)]() mutable { doRequest(req, protocol, std::move(params)); })); assert (!rejected); - } else { - doRequest(req, protocol, std::move(params)); } } diff --git a/metrics/CMakeLists.txt b/metrics/CMakeLists.txt index 6cf1eadd6f7..6f854fed7c6 100644 --- a/metrics/CMakeLists.txt +++ b/metrics/CMakeLists.txt @@ -9,6 +9,7 @@ vespa_define_module( LIBS src/vespa/metrics + src/vespa/metrics/common TESTS src/tests diff --git a/metrics/src/vespa/metrics/CMakeLists.txt b/metrics/src/vespa/metrics/CMakeLists.txt index 96156dc84b0..0d7eeba3601 100644 --- a/metrics/src/vespa/metrics/CMakeLists.txt +++ b/metrics/src/vespa/metrics/CMakeLists.txt @@ -20,6 +20,7 @@ vespa_add_library(metrics valuemetric.cpp valuemetricvalues.cpp xmlwriter.cpp + $<TARGET_OBJECTS:metrics_common> INSTALL lib64 DEPENDS diff --git a/metrics/src/vespa/metrics/common/CMakeLists.txt b/metrics/src/vespa/metrics/common/CMakeLists.txt new file mode 100644 index 00000000000..50183655dad --- /dev/null +++ b/metrics/src/vespa/metrics/common/CMakeLists.txt @@ -0,0 +1,6 @@ +# Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(metrics_common OBJECT + SOURCES + memory_usage_metrics.cpp + DEPENDS +) diff --git a/metrics/src/vespa/metrics/common/memory_usage_metrics.cpp b/metrics/src/vespa/metrics/common/memory_usage_metrics.cpp new file mode 100644 index 00000000000..0c38e567749 --- /dev/null +++ b/metrics/src/vespa/metrics/common/memory_usage_metrics.cpp @@ -0,0 +1,28 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "memory_usage_metrics.h" +#include <vespa/vespalib/util/memoryusage.h> + +namespace metrics { + +MemoryUsageMetrics::MemoryUsageMetrics(metrics::MetricSet* parent) + : MetricSet("memory_usage", {}, "The memory usage for a given component", parent), + _allocated_bytes("allocated_bytes", {}, "The number of allocated bytes", this), + _used_bytes("used_bytes", {}, "The number of used bytes (<= allocated_bytes)", this), + _dead_bytes("dead_bytes", {}, "The number of dead bytes (<= used_bytes)", this), + _on_hold_bytes("onhold_bytes", {}, "The number of bytes on hold", this) +{ +} + +MemoryUsageMetrics::~MemoryUsageMetrics() = default; + +void +MemoryUsageMetrics::update(const vespalib::MemoryUsage& usage) +{ + _allocated_bytes.set(usage.allocatedBytes()); + _used_bytes.set(usage.usedBytes()); + _dead_bytes.set(usage.deadBytes()); + _on_hold_bytes.set(usage.allocatedBytesOnHold()); +} + +} diff --git a/metrics/src/vespa/metrics/common/memory_usage_metrics.h b/metrics/src/vespa/metrics/common/memory_usage_metrics.h new file mode 100644 index 00000000000..7030db8c163 --- /dev/null +++ b/metrics/src/vespa/metrics/common/memory_usage_metrics.h @@ -0,0 +1,26 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/metrics/metrics.h> + +namespace vespalib { class MemoryUsage; } + +namespace metrics { + +/** + * Metric set for memory usage metrics. + */ +class MemoryUsageMetrics : public metrics::MetricSet { + metrics::LongValueMetric _allocated_bytes; + metrics::LongValueMetric _used_bytes; + metrics::LongValueMetric _dead_bytes; + metrics::LongValueMetric _on_hold_bytes; + +public: + explicit MemoryUsageMetrics(metrics::MetricSet* parent); + ~MemoryUsageMetrics() override; + void update(const vespalib::MemoryUsage& usage); +}; + +} // namespace metrics diff --git a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/yum/Yum.java b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/yum/Yum.java index 5f7fbdd0d69..4fde82f6fd6 100644 --- a/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/yum/Yum.java +++ b/node-admin/src/main/java/com/yahoo/vespa/hosted/node/admin/task/util/yum/Yum.java @@ -101,9 +101,12 @@ public class Yum { boolean modified = false; if (!alreadyLocked) { - terminal.newCommandLine(context) - .add("yum", "versionlock", "add", targetVersionLockName) - .execute(); + CommandLine commandLine = terminal.newCommandLine(context).add("yum", "versionlock", "add"); + // If the targetVersionLockName refers to a package in a by-default-disabled repo, + // we must enable the repo unless targetVersionLockName is already installed. + // The other versionlock commands (list, delete) does not require --enablerepo. + for (String repo : repos) commandLine.add("--enablerepo=" + repo); + commandLine.add(targetVersionLockName).execute(); modified = true; } @@ -121,7 +124,6 @@ public class Yum { var installCommand = terminal.newCommandLine(context).add("yum", "install"); for (String repo : repos) installCommand.add("--enablerepo=" + repo); installCommand.add("--assumeyes", yumPackage.toName()); - String output = installCommand.executeSilently().getUntrimmedOutput(); if (NOTHING_TO_DO_PATTERN.matcher(output).find()) { diff --git a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/yum/YumTest.java b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/yum/YumTest.java index c7e2885a907..f4034b38495 100644 --- a/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/yum/YumTest.java +++ b/node-admin/src/test/java/com/yahoo/vespa/hosted/node/admin/task/util/yum/YumTest.java @@ -180,15 +180,18 @@ public class YumTest { terminal.expectCommand("yum versionlock delete \"0:package-1-0.1-8.el7.*\" 2>&1"); - terminal.expectCommand("yum versionlock add \"0:package-1-0.10-654.el7.*\" 2>&1"); + terminal.expectCommand("yum versionlock add --enablerepo=somerepo \"0:package-1-0.10-654.el7.*\" 2>&1"); terminal.expectCommand( - "yum install --assumeyes 0:package-1-0.10-654.el7 2>&1", + "yum install --enablerepo=somerepo --assumeyes 0:package-1-0.10-654.el7 2>&1", 0, "Nothing to do\n"); - assertTrue(yum.installFixedVersion(taskContext, YumPackageName.fromString("0:package-1-0.10-654.el7"))); + assertTrue(yum.installFixedVersion( + taskContext, + YumPackageName.fromString("0:package-1-0.10-654.el7"), + "somerepo")); } @Test diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/NodeList.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/NodeList.java index bdae658f76e..93e5b160524 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/NodeList.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/NodeList.java @@ -50,6 +50,11 @@ public class NodeList implements Iterable<Node> { return filter(node -> node.allocation().get().membership().retired()); } + /** Returns the subset of nodes which are removable */ + public NodeList removable() { + return filter(node -> node.allocation().get().isRemovable()); + } + /** Returns the subset of nodes having exactly the given resources */ public NodeList resources(NodeResources resources) { return filter(node -> node.flavor().resources().equals(resources)); } diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/ResourceIterator.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/ResourceIterator.java index bc14ca1779c..b7d5995884e 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/ResourceIterator.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/autoscale/ResourceIterator.java @@ -134,7 +134,12 @@ public class ResourceIterator { } } - return allocation.realResources().withVcpu(cpu).withMemoryGb(memory).withDiskGb(disk); + // Combine the scaled resource values computed here + // and the currently combined values of non-scaled resources + return new NodeResources(cpu, memory, disk, + cluster.minResources().nodeResources().bandwidthGbps(), + cluster.minResources().nodeResources().diskSpeed(), + cluster.minResources().nodeResources().storageType()); } private double clusterUsage(Resource resource, double load) { diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java index c92f7889496..d96282f1722 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeAllocation.java @@ -100,7 +100,6 @@ class NodeAllocation { List<Node> accepted = new ArrayList<>(); for (PrioritizableNode node : nodesPrioritized) { Node offered = node.node; - if (offered.allocation().isPresent()) { ClusterMembership membership = offered.allocation().get().membership(); if ( ! offered.allocation().get().owner().equals(application)) continue; // wrong application diff --git a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeRepositoryProvisioner.java b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeRepositoryProvisioner.java index d03aa0cac91..37d0e9bbfb8 100644 --- a/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeRepositoryProvisioner.java +++ b/node-repository/src/main/java/com/yahoo/vespa/hosted/provision/provisioning/NodeRepositoryProvisioner.java @@ -161,12 +161,22 @@ public class NodeRepositoryProvisioner implements Provisioner { ClusterSpec.Id clusterId, Capacity requested) { List<Node> nodes = NodeList.copyOf(nodeRepository.getNodes(applicationId, Node.State.active)) - .cluster(clusterId).not().retired().asList(); - if (nodes.size() < 1) return Optional.empty(); + .cluster(clusterId) + .not().retired() + .not().removable() + .asList(); + if (nodes.isEmpty()) return Optional.empty(); long groups = nodes.stream().map(node -> node.allocation().get().membership().cluster().group()).distinct().count(); - var resources = new ClusterResources(nodes.size(), (int)groups, nodes.get(0).flavor().resources()); - if ( ! resources.isWithin(requested.minResources(), requested.maxResources())) return Optional.empty(); - return Optional.of(resources); + + // To allow non-numeric settings to be updated without resetting to the min target, we need to use + // the non-numeric settings of the current min limit with the current numeric settings + NodeResources nodeResources = nodes.get(0).allocation().get().requestedResources() + .with(requested.minResources().nodeResources().diskSpeed()) + .with(requested.maxResources().nodeResources().storageType()); + var currentResources = new ClusterResources(nodes.size(), (int)groups, nodeResources); + if ( ! currentResources.isWithin(requested.minResources(), requested.maxResources())) return Optional.empty(); + + return Optional.of(currentResources); } private void logIfDownscaled(int targetNodes, int actualNodes, ClusterSpec cluster, ProvisionLogger logger) { diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/AutoscalingTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/AutoscalingTest.java index 8bfb17c0bd4..497a2a31ce5 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/AutoscalingTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/AutoscalingTest.java @@ -31,16 +31,18 @@ public class AutoscalingTest { @Test public void testAutoscalingSingleContentGroup() { - NodeResources resources = new NodeResources(3, 100, 100, 1); - ClusterResources min = new ClusterResources( 2, 1, new NodeResources(1, 1, 1, 1)); - ClusterResources max = new ClusterResources(20, 1, new NodeResources(100, 1000, 1000, 1)); - AutoscalingTester tester = new AutoscalingTester(resources); + NodeResources hostResources = new NodeResources(3, 100, 100, 1); + ClusterResources min = new ClusterResources( 2, 1, + new NodeResources(1, 1, 1, 1, NodeResources.DiskSpeed.any)); + ClusterResources max = new ClusterResources(20, 1, + new NodeResources(100, 1000, 1000, 1, NodeResources.DiskSpeed.any)); + AutoscalingTester tester = new AutoscalingTester(hostResources); ApplicationId application1 = tester.applicationId("application1"); ClusterSpec cluster1 = tester.clusterSpec(ClusterSpec.Type.content, "cluster1"); // deploy - tester.deploy(application1, cluster1, 5, 1, resources); + tester.deploy(application1, cluster1, 5, 1, hostResources); assertTrue("No measurements -> No change", tester.autoscale(application1, cluster1.id(), min, max).isEmpty()); @@ -69,6 +71,35 @@ public class AutoscalingTest { tester.autoscale(application1, cluster1.id(), min, max)); } + @Test + public void testAutoscalingHandlesDiskSettingChanges() { + NodeResources hostResources = new NodeResources(3, 100, 100, 1, NodeResources.DiskSpeed.slow); + AutoscalingTester tester = new AutoscalingTester(hostResources); + + ApplicationId application1 = tester.applicationId("application1"); + ClusterSpec cluster1 = tester.clusterSpec(ClusterSpec.Type.content, "cluster1"); + + // deploy with slow + tester.deploy(application1, cluster1, 5, 1, hostResources); + tester.nodeRepository().getNodes(application1).stream() + .allMatch(n -> n.allocation().get().requestedResources().diskSpeed() == NodeResources.DiskSpeed.slow); + + tester.addMeasurements(Resource.cpu, 0.25f, 1f, 120, application1); + // Changing min and max from slow to any + ClusterResources min = new ClusterResources( 2, 1, + new NodeResources(1, 1, 1, 1, NodeResources.DiskSpeed.any)); + ClusterResources max = new ClusterResources(20, 1, + new NodeResources(100, 1000, 1000, 1, NodeResources.DiskSpeed.any)); + AllocatableClusterResources scaledResources = tester.assertResources("Scaling up since resource usage is too high", + 15, 1, 1.3, 28.6, 28.6, + tester.autoscale(application1, cluster1.id(), min, max)); + assertEquals("Disk speed from min/max is used", + NodeResources.DiskSpeed.any, scaledResources.realResources().diskSpeed()); + tester.deploy(application1, cluster1, scaledResources); + tester.nodeRepository().getNodes(application1).stream() + .allMatch(n -> n.allocation().get().requestedResources().diskSpeed() == NodeResources.DiskSpeed.any); + } + /** We prefer fewer nodes for container clusters as (we assume) they all use the same disk and memory */ @Test public void testAutoscalingSingleContainerGroup() { diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/AutoscalingTester.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/AutoscalingTester.java index ebc4d158ded..aed262b6c96 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/AutoscalingTester.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/autoscale/AutoscalingTester.java @@ -206,6 +206,8 @@ class AutoscalingTester { flavor.minMainMemoryAvailableGb(resources.memoryGb()); flavor.minDiskAvailableGb(resources.diskGb()); flavor.bandwidth(resources.bandwidthGbps() * 1000); + flavor.fastDisk(resources.diskSpeed().compatibleWith(NodeResources.DiskSpeed.fast)); + flavor.remoteStorage(resources.storageType().compatibleWith(NodeResources.StorageType.remote)); return flavor; } diff --git a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTest.java b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTest.java index bd8be5063fd..8fc73d68785 100644 --- a/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTest.java +++ b/node-repository/src/test/java/com/yahoo/vespa/hosted/provision/provisioning/ProvisioningTest.java @@ -354,6 +354,17 @@ public class ProvisioningTest { assertTrue(state.allHosts.stream().allMatch(host -> host.requestedResources().get().diskSpeed() == NodeResources.DiskSpeed.fast)); assertTrue(tester.nodeRepository().getNodes(application).stream().allMatch(node -> node.allocation().get().requestedResources().diskSpeed() == NodeResources.DiskSpeed.fast)); } + + { + // Go back to any + SystemState state = prepare(application, 0, 0, 5, 3, + defaultResources.justNumbers(), + tester); + assertEquals(8, state.allHosts.size()); + tester.activate(application, state.allHosts); + assertTrue(state.allHosts.stream().allMatch(host -> host.requestedResources().get().diskSpeed() == NodeResources.DiskSpeed.any)); + assertTrue(tester.nodeRepository().getNodes(application).stream().allMatch(node -> node.allocation().get().requestedResources().diskSpeed() == NodeResources.DiskSpeed.any)); + } } @Test diff --git a/searchcore/src/vespa/searchcore/proton/metrics/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/metrics/CMakeLists.txt index 6077a6ddd87..8a1dd7ea101 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/metrics/CMakeLists.txt @@ -12,7 +12,6 @@ vespa_add_library(searchcore_proton_metrics STATIC job_tracker.cpp job_tracked_flush_target.cpp job_tracked_flush_task.cpp - memory_usage_metrics.cpp metrics_engine.cpp resource_usage_metrics.cpp sessionmanager_metrics.cpp diff --git a/searchcore/src/vespa/searchcore/proton/metrics/memory_usage_metrics.cpp b/searchcore/src/vespa/searchcore/proton/metrics/memory_usage_metrics.cpp deleted file mode 100644 index 1f687d14969..00000000000 --- a/searchcore/src/vespa/searchcore/proton/metrics/memory_usage_metrics.cpp +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "memory_usage_metrics.h" -#include <vespa/vespalib/util/memoryusage.h> - -namespace proton { - -MemoryUsageMetrics::MemoryUsageMetrics(metrics::MetricSet *parent) - : metrics::MetricSet("memory_usage", {}, "The memory usage for a given component", parent), - _allocatedBytes("allocated_bytes", {}, "The number of allocated bytes", this), - _usedBytes("used_bytes", {}, "The number of used bytes (<= allocatedbytes)", this), - _deadBytes("dead_bytes", {}, "The number of dead bytes (<= usedbytes)", this), - _onHoldBytes("onhold_bytes", {}, "The number of bytes on hold", this) -{ -} - -MemoryUsageMetrics::~MemoryUsageMetrics() {} - -void -MemoryUsageMetrics::update(const vespalib::MemoryUsage &usage) -{ - _allocatedBytes.set(usage.allocatedBytes()); - _usedBytes.set(usage.usedBytes()); - _deadBytes.set(usage.deadBytes()); - _onHoldBytes.set(usage.allocatedBytesOnHold()); -} - -} diff --git a/searchcore/src/vespa/searchcore/proton/metrics/memory_usage_metrics.h b/searchcore/src/vespa/searchcore/proton/metrics/memory_usage_metrics.h index 89177c3a359..e82e55848a5 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/memory_usage_metrics.h +++ b/searchcore/src/vespa/searchcore/proton/metrics/memory_usage_metrics.h @@ -2,27 +2,12 @@ #pragma once -#include <vespa/metrics/metrics.h> +#include <vespa/metrics/common/memory_usage_metrics.h> namespace vespalib { class MemoryUsage; } namespace proton { -/** - * Metric set for memory usage metrics. - */ -class MemoryUsageMetrics : public metrics::MetricSet -{ -private: - metrics::LongValueMetric _allocatedBytes; - metrics::LongValueMetric _usedBytes; - metrics::LongValueMetric _deadBytes; - metrics::LongValueMetric _onHoldBytes; - -public: - MemoryUsageMetrics(metrics::MetricSet *parent); - ~MemoryUsageMetrics(); - void update(const vespalib::MemoryUsage &usage); -}; +using MemoryUsageMetrics = metrics::MemoryUsageMetrics; } // namespace proton diff --git a/searchlib/src/apps/vespa-ranking-expression-analyzer/vespa-ranking-expression-analyzer.cpp b/searchlib/src/apps/vespa-ranking-expression-analyzer/vespa-ranking-expression-analyzer.cpp index 30177dbe693..65b27dd411a 100644 --- a/searchlib/src/apps/vespa-ranking-expression-analyzer/vespa-ranking-expression-analyzer.cpp +++ b/searchlib/src/apps/vespa-ranking-expression-analyzer/vespa-ranking-expression-analyzer.cpp @@ -153,7 +153,7 @@ struct FunctionInfo { size_t get_path_len(const TreeList &trees) const { size_t path = 0; for (const Node *tree: trees) { - InterpretedFunction ifun(DefaultTensorEngine::ref(), *tree, params.size(), NodeTypes()); + InterpretedFunction ifun(DefaultTensorEngine::ref(), *tree, NodeTypes()); InterpretedFunction::Context ctx(ifun); SimpleParams fun_params(params); ifun.eval(ctx, fun_params); diff --git a/storage/src/tests/distributor/bucketdbmetricupdatertest.cpp b/storage/src/tests/distributor/bucketdbmetricupdatertest.cpp index 1008d3ee4f2..e0c3cf161bb 100644 --- a/storage/src/tests/distributor/bucketdbmetricupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbmetricupdatertest.cpp @@ -4,6 +4,7 @@ #include <vespa/storage/distributor/distributormetricsset.h> #include <vespa/storage/distributor/idealstatemetricsset.h> #include <vespa/storage/config/config-stor-distributormanager.h> +#include <vespa/vespalib/util/memoryusage.h> #include <vespa/vespalib/gtest/gtest.h> #include <string> #include <sstream> @@ -100,6 +101,40 @@ TEST_F(BucketDBMetricUpdaterTest, doc_and_byte_counts_are_updated) { EXPECT_EQ(34, dms.bytesStored.getLast()); } +TEST_F(BucketDBMetricUpdaterTest, bucket_db_memory_usage_metrics_are_updated) { + BucketDBMetricUpdater metric_updater; + IdealStateMetricSet ims; + DistributorMetricSet dms(_loadTypes); + + vespalib::MemoryUsage mem_usage; + mem_usage.incAllocatedBytes(1000); + mem_usage.incDeadBytes(700); + metric_updater.update_db_memory_usage(mem_usage, true); + + mem_usage.incAllocatedBytes(500); + mem_usage.incDeadBytes(100); + metric_updater.update_db_memory_usage(mem_usage, false); + + metric_updater.completeRound(false); + metric_updater.getLastCompleteStats().propagateMetrics(ims, dms); + + auto* m = dms.mutable_dbs.memory_usage.getMetric("allocated_bytes"); + ASSERT_TRUE(m != nullptr); + EXPECT_EQ(m->getLongValue("last"), 1000); + + m = dms.mutable_dbs.memory_usage.getMetric("dead_bytes"); + ASSERT_TRUE(m != nullptr); + EXPECT_EQ(m->getLongValue("last"), 700); + + m = dms.read_only_dbs.memory_usage.getMetric("allocated_bytes"); + ASSERT_TRUE(m != nullptr); + EXPECT_EQ(m->getLongValue("last"), 1500); + + m = dms.read_only_dbs.memory_usage.getMetric("dead_bytes"); + ASSERT_TRUE(m != nullptr); + EXPECT_EQ(m->getLongValue("last"), 800); +} + TEST_F(BucketDBMetricUpdaterTest, buckets_with_too_few_and_too_many_copies) { BucketDBMetricUpdater metricUpdater; IdealStateMetricSet ims; diff --git a/storage/src/tests/storageserver/communicationmanagertest.cpp b/storage/src/tests/storageserver/communicationmanagertest.cpp index 8444319b395..6657a9f1600 100644 --- a/storage/src/tests/storageserver/communicationmanagertest.cpp +++ b/storage/src/tests/storageserver/communicationmanagertest.cpp @@ -158,7 +158,6 @@ TEST_F(CommunicationManagerTest, commands_are_dequeued_in_fifo_order) { storConfig.getConfigId()); DummyStorageLink *storageLink = new DummyStorageLink(); storage.push_back(std::unique_ptr<StorageLink>(storageLink)); - storage.open(); // Message dequeing does not start before we invoke `open` on the storage // link chain, so we enqueue messages in randomized priority order before @@ -169,6 +168,7 @@ TEST_F(CommunicationManagerTest, commands_are_dequeued_in_fifo_order) { for (auto pri : pris) { storage.enqueue(createDummyCommand(pri)); } + storage.open(); storageLink->waitForMessages(pris.size(), MESSAGE_WAIT_TIME_SEC); for (size_t i = 0; i < pris.size(); ++i) { @@ -191,12 +191,12 @@ TEST_F(CommunicationManagerTest, replies_are_dequeued_in_fifo_order) { storConfig.getConfigId()); DummyStorageLink *storageLink = new DummyStorageLink(); storage.push_back(std::unique_ptr<StorageLink>(storageLink)); - storage.open(); std::vector<api::StorageMessage::Priority> pris{200, 0, 255, 128}; for (auto pri : pris) { storage.enqueue(createDummyCommand(pri)->makeReply()); } + storage.open(); storageLink->waitForMessages(pris.size(), MESSAGE_WAIT_TIME_SEC); // Want FIFO order for replies, not priority-sorted order. diff --git a/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp b/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp index 66d44a655e0..85f5c8c5be9 100644 --- a/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp +++ b/storage/src/vespa/storage/bucketdb/btree_bucket_database.cpp @@ -526,6 +526,12 @@ void BTreeBucketDatabase::print(std::ostream& out, bool verbose, (void)indent; } +vespalib::MemoryUsage BTreeBucketDatabase::memory_usage() const noexcept { + auto mem_usage = _tree.getMemoryUsage(); + mem_usage.merge(_store.getMemoryUsage()); + return mem_usage; +} + BTreeBucketDatabase::ReadGuardImpl::ReadGuardImpl(const BTreeBucketDatabase& db) : _db(&db), _guard(_db->_generation_handler.takeGuard()), diff --git a/storage/src/vespa/storage/bucketdb/btree_bucket_database.h b/storage/src/vespa/storage/bucketdb/btree_bucket_database.h index 1f2b25814a8..8898b0c395a 100644 --- a/storage/src/vespa/storage/bucketdb/btree_bucket_database.h +++ b/storage/src/vespa/storage/bucketdb/btree_bucket_database.h @@ -91,6 +91,8 @@ public: std::unique_ptr<ReadGuard> acquire_read_guard() const override { return std::make_unique<ReadGuardImpl>(*this); } + + vespalib::MemoryUsage memory_usage() const noexcept override; }; } diff --git a/storage/src/vespa/storage/bucketdb/bucketdatabase.h b/storage/src/vespa/storage/bucketdb/bucketdatabase.h index 46aaaa997d9..2dbcdd194ef 100644 --- a/storage/src/vespa/storage/bucketdb/bucketdatabase.h +++ b/storage/src/vespa/storage/bucketdb/bucketdatabase.h @@ -7,6 +7,7 @@ #include <vespa/vespalib/util/printable.h> #include <vespa/storage/bucketdb/bucketinfo.h> #include <vespa/document/bucket/bucketid.h> +#include <vespa/vespalib/util/memoryusage.h> namespace storage { @@ -250,6 +251,8 @@ public: virtual std::unique_ptr<ReadGuard> acquire_read_guard() const { return std::unique_ptr<ReadGuard>(); } + + [[nodiscard]] virtual vespalib::MemoryUsage memory_usage() const noexcept = 0; }; template <typename BucketInfoType> diff --git a/storage/src/vespa/storage/bucketdb/mapbucketdatabase.cpp b/storage/src/vespa/storage/bucketdb/mapbucketdatabase.cpp index 463e4a4b8ce..edb808da294 100644 --- a/storage/src/vespa/storage/bucketdb/mapbucketdatabase.cpp +++ b/storage/src/vespa/storage/bucketdb/mapbucketdatabase.cpp @@ -594,4 +594,30 @@ void MapBucketDatabase::ReadGuardImpl::find_parents_and_self(const document::Buc _db->getParents(bucket, entries); } +namespace { + +template <typename T> +size_t allocated_by_vec(const T& vec) noexcept { + return (vec.capacity() * sizeof(typename T::value_type)); +} + +template <typename T> +size_t used_by_vec(const T& vec) noexcept { + return (vec.size() * sizeof(typename T::value_type)); +} + +} + +vespalib::MemoryUsage MapBucketDatabase::memory_usage() const noexcept { + // We don't have a concept of hold lists here, nor do we know the exact size of the + // entries on our free list (these wrap a secondary replica vector allocation). + // So we fudge the numbers a bit, returning a lower bound approximation only. + // That's OK since this is a legacy database that's on the way out anyway. + vespalib::MemoryUsage mem_usage; + mem_usage.incAllocatedBytes(allocated_by_vec(_values) + allocated_by_vec(_db)); + mem_usage.incUsedBytes(used_by_vec(_values) + used_by_vec(_db)); + mem_usage.incDeadBytes(allocated_by_vec(_free) + allocated_by_vec(_freeValues)); + return mem_usage; +} + } // storage diff --git a/storage/src/vespa/storage/bucketdb/mapbucketdatabase.h b/storage/src/vespa/storage/bucketdb/mapbucketdatabase.h index 9fe5e2d7740..e41b797a321 100644 --- a/storage/src/vespa/storage/bucketdb/mapbucketdatabase.h +++ b/storage/src/vespa/storage/bucketdb/mapbucketdatabase.h @@ -30,6 +30,7 @@ public: void print(std::ostream& out, bool verbose, const std::string& indent) const override; std::unique_ptr<ReadGuard> acquire_read_guard() const override; + vespalib::MemoryUsage memory_usage() const noexcept override; private: struct E { E() : value(-1), e_0(-1), e_1(-1) {}; diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp index a593cc913a8..431c90b27f2 100644 --- a/storage/src/vespa/storage/common/storagelink.cpp +++ b/storage/src/vespa/storage/common/storagelink.cpp @@ -123,9 +123,9 @@ void StorageLink::sendDown(const StorageMessage::SP& msg) default: LOG(error, "Link %s trying to send %s down while in state %s", toString().c_str(), msg->toString().c_str(), stateToString(getState())); - return; + assert(false); } - assert(msg); + assert(msg.get()); LOG(spam, "Storage Link %s to handle %s", toString().c_str(), msg->toString().c_str()); if (isBottom()) { LOG(spam, "Storage link %s at bottom of chain got message %s.", toString().c_str(), msg->toString().c_str()); @@ -165,9 +165,9 @@ void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg) default: LOG(error, "Link %s trying to send %s up while in state %s", toString().c_str(), msg->toString(true).c_str(), stateToString(getState())); - return; + assert(false); } - assert(msg); + assert(msg.get()); if (isTop()) { ostringstream ost; ost << "Unhandled message at top of chain " << *msg << "."; diff --git a/storage/src/vespa/storage/config/stor-communicationmanager.def b/storage/src/vespa/storage/config/stor-communicationmanager.def index 4536ea97855..8f5b22aa7fa 100644 --- a/storage/src/vespa/storage/config/stor-communicationmanager.def +++ b/storage/src/vespa/storage/config/stor-communicationmanager.def @@ -29,9 +29,9 @@ mbus.compress.type enum {NONE, LZ4, ZSTD} default=LZ4 ## TTL for rpc target cache mbus.rpctargetcache.ttl double default = 600 -## Number of threads for network. +## Number of threads for mbus threadpool ## Any value below 1 will be 1. -mbus.num_threads int default=1 +mbus.num_threads int default=4 mbus.optimize_for enum {LATENCY, THROUGHPUT} default = LATENCY @@ -42,4 +42,4 @@ mbus.dispatch_on_encode bool default=true ## Enable to use above thread pool for decoding replies ## False will use network(fnet) thread ## Todo: Change default once verified in large scale deployment. -mbus.dispatch_on_decode bool default=true +mbus.dispatch_on_decode bool default=false diff --git a/storage/src/vespa/storage/distributor/bucketdb/bucketdbmetricupdater.cpp b/storage/src/vespa/storage/distributor/bucketdb/bucketdbmetricupdater.cpp index c211e775326..51eda0f948b 100644 --- a/storage/src/vespa/storage/distributor/bucketdb/bucketdbmetricupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdb/bucketdbmetricupdater.cpp @@ -132,6 +132,8 @@ BucketDBMetricUpdater::Stats::propagateMetrics( { distributorMetrics.docsStored.set(_docCount); distributorMetrics.bytesStored.set(_byteCount); + distributorMetrics.mutable_dbs.memory_usage.update(_mutable_db_mem_usage); + distributorMetrics.read_only_dbs.memory_usage.update(_read_only_db_mem_usage); idealStateMetrics.buckets_toofewcopies.set(_tooFewCopies); idealStateMetrics.buckets_toomanycopies.set(_tooManyCopies); @@ -145,4 +147,10 @@ BucketDBMetricUpdater::reset() resetStats(); } +void BucketDBMetricUpdater::update_db_memory_usage(const vespalib::MemoryUsage& mem_usage, bool is_mutable_db) { + auto& target = (is_mutable_db ? _workingStats._mutable_db_mem_usage + : _workingStats._read_only_db_mem_usage); + target.merge(mem_usage); +} + } // storage::distributor diff --git a/storage/src/vespa/storage/distributor/bucketdb/bucketdbmetricupdater.h b/storage/src/vespa/storage/distributor/bucketdb/bucketdbmetricupdater.h index 7ef8479866f..766307f49c2 100644 --- a/storage/src/vespa/storage/distributor/bucketdb/bucketdbmetricupdater.h +++ b/storage/src/vespa/storage/distributor/bucketdb/bucketdbmetricupdater.h @@ -4,7 +4,7 @@ #include <vespa/storage/bucketdb/bucketdatabase.h> #include <vespa/storage/config/config-stor-distributormanager.h> - +#include <vespa/vespalib/util/memoryusage.h> #include <unordered_map> namespace storage::distributor { @@ -22,10 +22,12 @@ public: uint64_t _tooManyCopies; uint64_t _noTrusted; uint64_t _totalBuckets; + vespalib::MemoryUsage _mutable_db_mem_usage; + vespalib::MemoryUsage _read_only_db_mem_usage; Stats(); Stats(const Stats &rhs); - ~Stats() { } + ~Stats() = default; Stats &operator=(const Stats &rhs) = default; @@ -63,7 +65,6 @@ private: public: BucketDBMetricUpdater(); - ~BucketDBMetricUpdater(); void setMinimumReplicaCountingMode(ReplicaCountingMode mode) noexcept { @@ -98,6 +99,8 @@ public: return _lastCompleteStats; } + void update_db_memory_usage(const vespalib::MemoryUsage& mem_usage, bool is_mutable_db); + private: void updateMinReplicationStats(const BucketDatabase::Entry& entry, uint32_t trustedCopies); diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 988af43a7be..3cca2847671 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -13,6 +13,7 @@ #include <vespa/storage/common/global_bucket_space_distribution_converter.h> #include <vespa/storageframework/generic/status/xmlstatusreporter.h> #include <vespa/document/bucket/fixed_bucket_spaces.h> +#include <vespa/vespalib/util/memoryusage.h> #include <vespa/log/log.h> LOG_SETUP(".distributor-main"); @@ -768,6 +769,16 @@ Distributor::updateInternalMetricsForCompletedScan() _must_send_updated_host_info = true; } _bucketSpacesStats = std::move(new_space_stats); + update_bucket_db_memory_usage_stats(); +} + +void Distributor::update_bucket_db_memory_usage_stats() { + for (auto& space : *_bucketSpaceRepo) { + _bucketDBMetricUpdater.update_db_memory_usage(space.second->getBucketDatabase().memory_usage(), true); + } + for (auto& space : *_readOnlyBucketSpaceRepo) { + _bucketDBMetricUpdater.update_db_memory_usage(space.second->getBucketDatabase().memory_usage(), false); + } } void diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index ac6e306a4fb..36e34592cba 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -226,6 +226,7 @@ private: * Takes metric lock. */ void updateInternalMetricsForCompletedScan(); + void update_bucket_db_memory_usage_stats(); void scanAllBuckets(); MaintenanceScanner::ScanResult scanNextBucket(); void enableNextConfig(); diff --git a/storage/src/vespa/storage/distributor/distributormetricsset.cpp b/storage/src/vespa/storage/distributor/distributormetricsset.cpp index 8266aeb29cd..70ab5229311 100644 --- a/storage/src/vespa/storage/distributor/distributormetricsset.cpp +++ b/storage/src/vespa/storage/distributor/distributormetricsset.cpp @@ -2,11 +2,19 @@ #include "distributormetricsset.h" #include <vespa/metrics/loadmetric.hpp> #include <vespa/metrics/summetric.hpp> +#include <vespa/vespalib/util/memoryusage.h> namespace storage::distributor { using metrics::MetricSet; +BucketDbMetrics::BucketDbMetrics(const vespalib::string& db_type, metrics::MetricSet* owner) + : metrics::MetricSet("bucket_db", {{"bucket_db_type", db_type}}, "", owner), + memory_usage(this) +{} + +BucketDbMetrics::~BucketDbMetrics() = default; + DistributorMetricSet::DistributorMetricSet(const metrics::LoadTypeSet& lt) : MetricSet("distributor", {{"distributor"}}, ""), puts(lt, PersistenceOperationMetricSet("puts"), this), @@ -41,7 +49,9 @@ DistributorMetricSet::DistributorMetricSet(const metrics::LoadTypeSet& lt) bytesStored("bytesstored", {{"logdefault"},{"yamasdefault"}}, "Number of bytes stored in all buckets controlled by " - "this distributor", this) + "this distributor", this), + mutable_dbs("mutable", this), + read_only_dbs("read_only", this) { docsStored.logOnlyIfSet(); bytesStored.logOnlyIfSet(); diff --git a/storage/src/vespa/storage/distributor/distributormetricsset.h b/storage/src/vespa/storage/distributor/distributormetricsset.h index d9c0711fd14..ce4025d8311 100644 --- a/storage/src/vespa/storage/distributor/distributormetricsset.h +++ b/storage/src/vespa/storage/distributor/distributormetricsset.h @@ -5,12 +5,21 @@ #include "update_metric_set.h" #include "visitormetricsset.h" #include <vespa/metrics/metrics.h> +#include <vespa/metrics/common/memory_usage_metrics.h> #include <vespa/documentapi/loadtypes/loadtypeset.h> +namespace vespalib { class MemoryUsage; } + namespace storage::distributor { -class DistributorMetricSet : public metrics::MetricSet -{ +struct BucketDbMetrics : metrics::MetricSet { + BucketDbMetrics(const vespalib::string& db_type, metrics::MetricSet* owner); + ~BucketDbMetrics() override; + + metrics::MemoryUsageMetrics memory_usage; +}; + +class DistributorMetricSet : public metrics::MetricSet { public: metrics::LoadMetric<PersistenceOperationMetricSet> puts; metrics::LoadMetric<UpdateMetricSet> updates; @@ -29,6 +38,8 @@ public: metrics::DoubleAverageMetric recoveryModeTime; metrics::LongValueMetric docsStored; metrics::LongValueMetric bytesStored; + BucketDbMetrics mutable_dbs; + BucketDbMetrics read_only_dbs; explicit DistributorMetricSet(const metrics::LoadTypeSet& lt); ~DistributorMetricSet() override; diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index aff2b0f624f..fa2b0cda018 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -14,16 +14,17 @@ #include <vespa/storageapi/message/state.h> #include <vespa/storageframework/generic/clock/timer.h> #include <vespa/vespalib/stllike/asciistream.h> -#include <vespa/vespalib/util/stringfmt.h> -#include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/vespalib/stllike/hash_map.hpp> +#include <vespa/vespalib/util/stringfmt.h> #include <vespa/log/bufferedlogger.h> +#include <vespa/document/bucket/fixed_bucket_spaces.h> +#include <vespa/documentapi/messagebus/messages/getdocumentreply.h> + LOG_SETUP(".communication.manager"); using vespalib::make_string; using document::FixedBucketSpaces; -using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig; namespace storage { @@ -280,17 +281,6 @@ struct PlaceHolderBucketResolver : public BucketResolver { } }; -mbus::RPCNetworkParams::OptimizeFor -convert(CommunicationManagerConfig::Mbus::OptimizeFor optimizeFor) { - switch (optimizeFor) { - case CommunicationManagerConfig::Mbus::OptimizeFor::LATENCY: - return mbus::RPCNetworkParams::OptimizeFor::LATENCY; - case CommunicationManagerConfig::Mbus::OptimizeFor::THROUGHPUT: - default: - return mbus::RPCNetworkParams::OptimizeFor::THROUGHPUT; - } -} - } CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, const config::ConfigUri & configUri) @@ -300,6 +290,7 @@ CommunicationManager::CommunicationManager(StorageComponentRegister& compReg, co _listener(), _eventQueue(), _mbus(), + _count(0), _configUri(configUri), _closed(false), _docApiConverter(configUri, std::make_shared<PlaceHolderBucketResolver>()) @@ -424,7 +415,7 @@ void CommunicationManager::configure(std::unique_ptr<CommunicationManagerConfig> params.setNumThreads(std::max(1, config->mbus.numThreads)); params.setDispatchOnDecode(config->mbus.dispatchOnDecode); params.setDispatchOnEncode(config->mbus.dispatchOnEncode); - params.setOptimizeFor(convert(config->mbus.optimizeFor)); + params.setTcpNoDelay(config->mbus.optimizeFor == CommunicationManagerConfig::Mbus::OptimizeFor::LATENCY); params.setIdentity(mbus::Identity(_component.getIdentity())); if (config->mbusport != -1) { @@ -489,8 +480,8 @@ void CommunicationManager::enqueue(std::shared_ptr<api::StorageMessage> msg) { assert(msg); - LOG(spam, "Process storage message %s, priority %d", msg->toString().c_str(), msg->getPriority()); - process(msg); + LOG(spam, "Enq storage message %s, priority %d", msg->toString().c_str(), msg->getPriority()); + _eventQueue.enqueue(std::move(msg)); } bool diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index 8983dbdf057..c08ad214768 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -116,7 +116,7 @@ private: void process(const std::shared_ptr<api::StorageMessage>& msg); - using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig; + using CommunicationManagerConfig= vespa::config::content::core::StorCommunicationmanagerConfig; using BucketspacesConfig = vespa::config::content::core::BucketspacesConfig; void configureMessageBusLimits(const CommunicationManagerConfig& cfg); @@ -133,6 +133,7 @@ private: std::unique_ptr<mbus::RPCMessageBus> _mbus; std::unique_ptr<mbus::DestinationSession> _messageBusSession; std::unique_ptr<mbus::SourceSession> _sourceSession; + uint32_t _count; vespalib::Lock _messageBusSentLock; std::map<api::StorageMessage::Id, std::shared_ptr<api::StorageCommand> > _messageBusSent; |