diff options
15 files changed, 41 insertions, 28 deletions
diff --git a/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp b/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp index 6f91b1a6f6f..19af6a699c4 100644 --- a/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp +++ b/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp @@ -38,7 +38,7 @@ TEST_F("require that attribute write thread is blocked while guard is held", Fix { ReadGuard::UP guard = f.accessor.takeGuard(); Gate gate; - f.writer->execute(f.writer->getExecutorId(f.attribute->getNamePrefix()), [&gate]() { gate.countDown(); }); + f.writer->execute(f.writer->getExecutorIdFromName(f.attribute->getNamePrefix()), [&gate]() { gate.countDown(); }); bool reachedZero = gate.await(100); EXPECT_FALSE(reachedZero); EXPECT_EQUAL(1u, gate.getCount()); diff --git a/searchcore/src/tests/proton/docsummary/docsummary.cpp b/searchcore/src/tests/proton/docsummary/docsummary.cpp index b6d6d2437d8..7d27c3b21f4 100644 --- a/searchcore/src/tests/proton/docsummary/docsummary.cpp +++ b/searchcore/src/tests/proton/docsummary/docsummary.cpp @@ -791,7 +791,7 @@ Test::requireThatAttributesAreUsed() search::AttributeVector *bjAttr = attributeManager->getWritableAttribute("bj"); auto bjTensorAttr = dynamic_cast<search::tensor::TensorAttribute *>(bjAttr); - attributeFieldWriter.execute(attributeFieldWriter.getExecutorId(bjAttr->getNamePrefix()), + attributeFieldWriter.execute(attributeFieldWriter.getExecutorIdFromName(bjAttr->getNamePrefix()), [&]() { bjTensorAttr->setTensor(3, *make_tensor(TensorSpec("tensor(x{},y{})") .add({{"x", "a"}, {"y", "b"}}, 4))); diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp index 9b54ae816e0..a91b60d5166 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attribute_writer.cpp @@ -281,7 +281,7 @@ public: FieldContext::FieldContext(ISequencedTaskExecutor &writer, AttributeVector *attr) : _name(attr->getName()), - _executorId(writer.getExecutorId(attr->getNamePrefix())), + _executorId(writer.getExecutorIdFromName(attr->getNamePrefix())), _attr(attr), _use_two_phase_put(use_two_phase_put_for_attribute(*attr)) { @@ -645,7 +645,7 @@ AttributeWriter::AttributeWriter(proton::IAttributeManager::SP mgr) void AttributeWriter::setupAttriuteMapping() { for (auto attr : getWritableAttributes()) { vespalib::stringref name = attr->getName(); - _attrMap[name] = AttrWithId(attr, _attributeFieldWriter.getExecutorId(attr->getNamePrefix())); + _attrMap[name] = AttrWithId(attr, _attributeFieldWriter.getExecutorIdFromName(attr->getNamePrefix())); } } diff --git a/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp b/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp index 319ae2dcad1..b5937438a9f 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/attributemanager.cpp @@ -63,7 +63,9 @@ std::shared_ptr<ShrinkLidSpaceFlushTarget> allocShrinker(const AttributeVector:: using Type = IFlushTarget::Type; using Component = IFlushTarget::Component; - auto shrinkwrap = std::make_shared<ThreadedCompactableLidSpace>(attr, attributeFieldWriter, attributeFieldWriter.getExecutorId(attr->getNamePrefix())); + auto shrinkwrap = std::make_shared<ThreadedCompactableLidSpace>(attr, attributeFieldWriter, + attributeFieldWriter.getExecutorIdFromName( + attr->getNamePrefix())); const vespalib::string &name = attr->getName(); auto dir = diskLayout.createAttributeDir(name); search::SerialNum shrinkSerialNum = estimateShrinkSerialNum(*attr); @@ -564,7 +566,7 @@ AttributeManager::asyncForEachAttribute(std::shared_ptr<IConstAttributeFunctor> continue; } AttributeVector::SP attrsp = attr.second.getAttribute(); - _attributeFieldWriter.execute(_attributeFieldWriter.getExecutorId(attrsp->getNamePrefix()), + _attributeFieldWriter.execute(_attributeFieldWriter.getExecutorIdFromName(attrsp->getNamePrefix()), [attrsp, func]() { (*func)(*attrsp); }); } } @@ -577,7 +579,7 @@ AttributeManager::asyncForAttribute(const vespalib::string &name, std::unique_pt } AttributeVector::SP attrsp = itr->second.getAttribute(); vespalib::string attrName = attrsp->getNamePrefix(); - _attributeFieldWriter.execute(_attributeFieldWriter.getExecutorId(attrName), + _attributeFieldWriter.execute(_attributeFieldWriter.getExecutorIdFromName(attrName), [attr=std::move(attrsp), func=std::move(func)]() { (*func)(*attr); }); } diff --git a/searchcore/src/vespa/searchcore/proton/attribute/exclusive_attribute_read_accessor.cpp b/searchcore/src/vespa/searchcore/proton/attribute/exclusive_attribute_read_accessor.cpp index 9543897407d..16f06bdde93 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/exclusive_attribute_read_accessor.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/exclusive_attribute_read_accessor.cpp @@ -51,7 +51,7 @@ ExclusiveAttributeReadAccessor::takeGuard() { GateSP entranceGate = std::make_shared<Gate>(); GateSP exitGate = std::make_shared<Gate>(); - _attributeFieldWriter.execute(_attributeFieldWriter.getExecutorId(_attribute->getNamePrefix()), + _attributeFieldWriter.execute(_attributeFieldWriter.getExecutorIdFromName(_attribute->getNamePrefix()), [this, entranceGate, exitGate]() { attributeWriteBlockingTask(_attribute, entranceGate, exitGate); }); entranceGate->await(); return std::make_unique<Guard>(*_attribute, exitGate); diff --git a/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp b/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp index 7c02ec66014..90d0c0cab3d 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/filter_attribute_manager.cpp @@ -196,7 +196,7 @@ FilterAttributeManager::asyncForEachAttribute(std::shared_ptr<IConstAttributeFun search::AttributeVector::SP attrsp = guard.getSP(); // Name must be extracted in document db master thread or attribute // writer thread - attributeFieldWriter.execute(attributeFieldWriter.getExecutorId(attrsp->getNamePrefix()), + attributeFieldWriter.execute(attributeFieldWriter.getExecutorIdFromName(attrsp->getNamePrefix()), [attrsp, func]() { (*func)(*attrsp); }); } } @@ -207,7 +207,7 @@ FilterAttributeManager::asyncForAttribute(const vespalib::string &name, std::uni if (!attr) { return; } vespalib::ISequencedTaskExecutor &attributeFieldWriter = getAttributeFieldWriter(); vespalib::string attrName = (*attr)->getNamePrefix(); - attributeFieldWriter.execute(attributeFieldWriter.getExecutorId(attrName), + attributeFieldWriter.execute(attributeFieldWriter.getExecutorIdFromName(attrName), [attr=std::move(attr), func=std::move(func)]() mutable { (*func)(**attr); }); diff --git a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp index 2b5f4b028dc..4edb64b861a 100644 --- a/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp +++ b/searchcore/src/vespa/searchcore/proton/attribute/flushableattribute.cpp @@ -230,7 +230,7 @@ FlushableAttribute::initFlush(SerialNum currentSerial) // Called by document db executor std::promise<IFlushTarget::Task::UP> promise; std::future<IFlushTarget::Task::UP> future = promise.get_future(); - _attributeFieldWriter.execute(_attributeFieldWriter.getExecutorId(_attr->getNamePrefix()), + _attributeFieldWriter.execute(_attributeFieldWriter.getExecutorIdFromName(_attr->getNamePrefix()), [&]() { promise.set_value(internalInitFlush(currentSerial)); }); return future.get(); } diff --git a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp index 7d5eabe7de8..57e284cc61b 100644 --- a/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp +++ b/searchcore/src/vespa/searchcore/proton/reference/gid_to_lid_change_listener.cpp @@ -11,7 +11,7 @@ GidToLidChangeListener::GidToLidChangeListener(vespalib::ISequencedTaskExecutor const vespalib::string &name, const vespalib::string &docTypeName) : _attributeFieldWriter(attributeFieldWriter), - _executorId(_attributeFieldWriter.getExecutorId(attr->getNamePrefix())), + _executorId(_attributeFieldWriter.getExecutorIdFromName(attr->getNamePrefix())), _attr(std::move(attr)), _refCount(refCount), _name(name), diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp index 10f3f6089e3..9c06ecd3c8f 100644 --- a/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp @@ -63,6 +63,8 @@ public: } }; +vespalib::stringref ZERO("0"); + TEST_F("testExecute", Fixture) { std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); @@ -118,8 +120,8 @@ TEST_F("require that task with same string component id are serialized", Fixture std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); auto test2 = [&]() { tv->modify(14, 42); }; - f._threads.execute(f._threads.getExecutorId("0"), [&]() { usleep(2000); tv->modify(0, 14); }); - f._threads.execute(f._threads.getExecutorId("0"), test2); + f._threads.execute(f._threads.getExecutorIdFromName(ZERO), [&]() { usleep(2000); tv->modify(0, 14); }); + f._threads.execute(f._threads.getExecutorIdFromName(ZERO), test2); tv->wait(2); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); @@ -136,8 +138,8 @@ int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int t for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) { std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); - f._threads.execute(f._threads.getExecutorId("0"), [&]() { usleep(2000); tv->modify(0, 14); }); - f._threads.execute(f._threads.getExecutorId(altComponentId), [&]() { tv->modify(14, 42); }); + f._threads.execute(f._threads.getExecutorIdFromName(ZERO), [&]() { usleep(2000); tv->modify(0, 14); }); + f._threads.execute(f._threads.getExecutorIdFromName(altComponentId), [&]() { tv->modify(14, 42); }); tv->wait(2); if (tv->_fail != 1) { continue; @@ -156,10 +158,10 @@ vespalib::string makeAltComponentId(Fixture &f) { int tryCnt = 0; char altComponentId[20]; - ISequencedTaskExecutor::ExecutorId executorId0 = f._threads.getExecutorId("0"); + ISequencedTaskExecutor::ExecutorId executorId0 = f._threads.getExecutorIdFromName(ZERO); for (tryCnt = 1; tryCnt < 100; ++tryCnt) { sprintf(altComponentId, "%d", tryCnt); - if (f._threads.getExecutorId(altComponentId) == executorId0) { + if (f._threads.getExecutorIdFromName(altComponentId) == executorId0) { break; } } diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp index 70d0f1c743d..6128386837d 100644 --- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp @@ -65,6 +65,8 @@ public: } }; +vespalib::stringref ZERO("0"); + TEST_F("testExecute", Fixture) { std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); @@ -120,8 +122,8 @@ TEST_F("require that task with same string component id are serialized", Fixture std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); auto test2 = [=]() { tv->modify(14, 42); }; - f._threads->execute(f._threads->getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); }); - f._threads->execute(f._threads->getExecutorId("0"), test2); + f._threads->execute(f._threads->getExecutorIdFromName(ZERO), [=]() { usleep(2000); tv->modify(0, 14); }); + f._threads->execute(f._threads->getExecutorIdFromName(ZERO), test2); tv->wait(2); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); @@ -138,8 +140,8 @@ int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int t for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) { std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); - f._threads->execute(f._threads->getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); }); - f._threads->execute(f._threads->getExecutorId(altComponentId), [=]() { tv->modify(14, 42); }); + f._threads->execute(f._threads->getExecutorIdFromName(ZERO), [=]() { usleep(2000); tv->modify(0, 14); }); + f._threads->execute(f._threads->getExecutorIdFromName(altComponentId), [=]() { tv->modify(14, 42); }); tv->wait(2); if (tv->_fail != 1) { continue; @@ -158,10 +160,10 @@ vespalib::string makeAltComponentId(Fixture &f) { int tryCnt = 0; char altComponentId[20]; - ISequencedTaskExecutor::ExecutorId executorId0 = f._threads->getExecutorId("0"); + ISequencedTaskExecutor::ExecutorId executorId0 = f._threads->getExecutorIdFromName(ZERO); for (tryCnt = 1; tryCnt < 100; ++tryCnt) { sprintf(altComponentId, "%d", tryCnt); - if (f._threads->getExecutorId(altComponentId) == executorId0) { + if (f._threads->getExecutorIdFromName(altComponentId) == executorId0) { break; } } diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp index 50bc3b020a8..3e87749c794 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp @@ -256,6 +256,11 @@ AdaptiveSequencedExecutor::~AdaptiveSequencedExecutor() assert(_worker_stack.empty()); } +ISequencedTaskExecutor::ExecutorId +AdaptiveSequencedExecutor::getExecutorId(uint64_t component) const { + return ExecutorId(component % _strands.size()); +} + void AdaptiveSequencedExecutor::executeTask(ExecutorId id, Task::UP task) { diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h index bc3457a72ef..a4d3ac97758 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h @@ -117,6 +117,7 @@ public: AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads, size_t max_waiting, size_t max_pending); ~AdaptiveSequencedExecutor() override; + ExecutorId getExecutorId(uint64_t component) const override; void executeTask(ExecutorId id, Task::UP task) override; void sync() override; void setTaskLimit(uint32_t task_limit) override; diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp index d05702cc85b..f8f1f64fac5 100644 --- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp @@ -23,7 +23,7 @@ ISequencedTaskExecutor::ISequencedTaskExecutor(uint32_t numExecutors) ISequencedTaskExecutor::~ISequencedTaskExecutor() = default; ISequencedTaskExecutor::ExecutorId -ISequencedTaskExecutor::getExecutorId(vespalib::stringref componentId) const { +ISequencedTaskExecutor::getExecutorIdFromName(vespalib::stringref componentId) const { vespalib::hash<vespalib::stringref> hashfun; return getExecutorId(hashfun(componentId)); } diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h index cd2a6c6f0d8..034e1520b8d 100644 --- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h @@ -37,10 +37,10 @@ public: * @param componentId component id * @return executor id */ - ExecutorId getExecutorId(uint64_t componentId) const; + virtual ExecutorId getExecutorId(uint64_t componentId) const; uint32_t getNumExecutors() const { return _numExecutors; } - ExecutorId getExecutorId(vespalib::stringref componentId) const; + ExecutorId getExecutorIdFromName(vespalib::stringref componentId) const; /** * Schedule a task to run after all previously scheduled tasks with diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp index a0c2f0ac237..0fd78d8dcf6 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp @@ -18,7 +18,8 @@ std::unique_ptr<ISequencedTaskExecutor> SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor optimize, uint32_t kindOfWatermark, duration reactionTime) { if (optimize == OptimizeFor::ADAPTIVE) { - return std::make_unique<AdaptiveSequencedExecutor>(threads, threads, kindOfWatermark, taskLimit); + size_t num_strands = std::min(taskLimit, threads*32); + return std::make_unique<AdaptiveSequencedExecutor>(num_strands, threads, kindOfWatermark, taskLimit); } else { auto executors = std::make_unique<std::vector<std::unique_ptr<SyncableThreadExecutor>>>(); executors->reserve(threads); |