diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /searchcorespi |
Publish
Diffstat (limited to 'searchcorespi')
79 files changed, 5475 insertions, 0 deletions
diff --git a/searchcorespi/.gitignore b/searchcorespi/.gitignore new file mode 100644 index 00000000000..a9b20e8992d --- /dev/null +++ b/searchcorespi/.gitignore @@ -0,0 +1,2 @@ +Makefile +Testing diff --git a/searchcorespi/CMakeLists.txt b/searchcorespi/CMakeLists.txt new file mode 100644 index 00000000000..201a6ec29c2 --- /dev/null +++ b/searchcorespi/CMakeLists.txt @@ -0,0 +1,25 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_define_module( + DEPENDS + fastos + fnet + vespalog + vespalib + searchlib + searchcommon + metrics + config_cloudconfig + fastlib_fast + configdefinitions + document + persistencetypes + + LIBS + src/vespa/searchcorespi + src/vespa/searchcorespi/flush + src/vespa/searchcorespi/index + src/vespa/searchcorespi/plugin + + TESTS + src/tests/plugin +) diff --git a/searchcorespi/OWNERS b/searchcorespi/OWNERS new file mode 100644 index 00000000000..7066165775a --- /dev/null +++ b/searchcorespi/OWNERS @@ -0,0 +1,3 @@ +tegge +geirst +balder diff --git a/searchcorespi/src/.gitignore b/searchcorespi/src/.gitignore new file mode 100644 index 00000000000..49bfd9b85d3 --- /dev/null +++ b/searchcorespi/src/.gitignore @@ -0,0 +1,4 @@ +/Makefile.ini +/config_command.sh +/project.dsw +/searchcorespi.mak diff --git a/searchcorespi/src/testlist.txt b/searchcorespi/src/testlist.txt new file mode 100644 index 00000000000..df4c0bebe2c --- /dev/null +++ b/searchcorespi/src/testlist.txt @@ -0,0 +1 @@ +tests/plugin diff --git a/searchcorespi/src/tests/plugin/.gitignore b/searchcorespi/src/tests/plugin/.gitignore new file mode 100644 index 00000000000..e49000038ad --- /dev/null +++ b/searchcorespi/src/tests/plugin/.gitignore @@ -0,0 +1,5 @@ +Makefile +.depend +*_test +searchcorespi_factoryregistry_test_app +searchcorespi_plugin_test_app diff --git a/searchcorespi/src/tests/plugin/CMakeLists.txt b/searchcorespi/src/tests/plugin/CMakeLists.txt new file mode 100644 index 00000000000..ae70cc2d7f7 --- /dev/null +++ b/searchcorespi/src/tests/plugin/CMakeLists.txt @@ -0,0 +1,29 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(searchcorespi_plugin_test_app + SOURCES + plugin_test.cpp + DEPENDS + searchcorespi +) +vespa_add_test( + NAME searchcorespi_plugin_test_app + COMMAND searchcorespi_plugin_test_app + ENVIRONMENT "LD_LIBRARY_PATH=." +) +vespa_add_executable(searchcorespi_factoryregistry_test_app + SOURCES + factoryregistry_test.cpp + DEPENDS + searchcorespi +) +vespa_add_test(NAME searchcorespi_factoryregistry_test_app COMMAND searchcorespi_factoryregistry_test_app) +vespa_add_library(searchcorespi_tplugin + SOURCES + plugin.cpp + DEPENDS +) +vespa_add_library(searchcorespi_illegal-plugin + SOURCES + empty.cpp + DEPENDS +) diff --git a/searchcorespi/src/tests/plugin/DESC b/searchcorespi/src/tests/plugin/DESC new file mode 100644 index 00000000000..80d66100e85 --- /dev/null +++ b/searchcorespi/src/tests/plugin/DESC @@ -0,0 +1 @@ +Test of factory plugin interface. diff --git a/searchcorespi/src/tests/plugin/FILES b/searchcorespi/src/tests/plugin/FILES new file mode 100644 index 00000000000..1b658927feb --- /dev/null +++ b/searchcorespi/src/tests/plugin/FILES @@ -0,0 +1 @@ +plugin_test.cpp diff --git a/searchcorespi/src/tests/plugin/empty.cpp b/searchcorespi/src/tests/plugin/empty.cpp new file mode 100644 index 00000000000..d84fabdd5a2 --- /dev/null +++ b/searchcorespi/src/tests/plugin/empty.cpp @@ -0,0 +1 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. diff --git a/searchcorespi/src/tests/plugin/factoryregistry_test.cpp b/searchcorespi/src/tests/plugin/factoryregistry_test.cpp new file mode 100644 index 00000000000..16cb03a5496 --- /dev/null +++ b/searchcorespi/src/tests/plugin/factoryregistry_test.cpp @@ -0,0 +1,63 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Unit tests for factoryregistry. + +#include <vespa/log/log.h> +LOG_SETUP("factoryregistry_test"); +#include <vespa/fastos/fastos.h> + +#include <vespa/searchcorespi/plugin/factoryregistry.h> +#include <vespa/searchcorespi/plugin/iindexmanagerfactory.h> +#include <vespa/vespalib/stllike/string.h> +#include <vespa/vespalib/testkit/testapp.h> + +using vespalib::string; +using namespace searchcorespi; + +namespace { + +struct MyFactory : IIndexManagerFactory { + + virtual IIndexManager::UP createIndexManager(const IndexManagerConfig &, + const index::IndexMaintainerConfig &, + const index::IndexMaintainerContext &) { + return IIndexManager::UP(); + } + virtual config::ConfigKeySet getConfigKeys( + const string &, + const search::index::Schema &, + const config::ConfigInstance &) { + return config::ConfigKeySet(); + } +}; + +const string name = "factory"; + +TEST("require that factories can be added and removed") { + FactoryRegistry registry; + EXPECT_FALSE(registry.isRegistered(name)); + registry.add(name, IIndexManagerFactory::SP(new MyFactory)); + EXPECT_TRUE(registry.get(name).get()); + EXPECT_TRUE(registry.isRegistered(name)); + registry.remove(name); + EXPECT_EXCEPTION(registry.get(name), vespalib::IllegalArgumentException, + "No factory is registered with the name"); +} + +TEST("require that two factories with the same name cannot be added") { + FactoryRegistry registry; + registry.add(name, IIndexManagerFactory::SP(new MyFactory)); + EXPECT_EXCEPTION( + registry.add(name, IIndexManagerFactory::SP(new MyFactory)), + vespalib::IllegalArgumentException, + "A factory is already registered with the same name"); +} + +TEST("require that a non-existent factory cannot be removed") { + FactoryRegistry registry; + EXPECT_EXCEPTION(registry.remove(name), vespalib::IllegalArgumentException, + "No factory is registered with the name"); +} + +} // namespace + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchcorespi/src/tests/plugin/plugin.cpp b/searchcorespi/src/tests/plugin/plugin.cpp new file mode 100644 index 00000000000..ecd8cc892d9 --- /dev/null +++ b/searchcorespi/src/tests/plugin/plugin.cpp @@ -0,0 +1,77 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/searchcorespi/plugin/iindexmanagerfactory.h> + +using namespace search; +using namespace search::index; +using namespace vespalib; +using namespace config; + +namespace searchcorespi { +class IndexManager : public searchcorespi::IIndexManager +{ +public: + + typedef search::SerialNum SerialNum; + typedef search::index::Schema Schema; + typedef document::Document Document; + using OnWriteDoneType = + const std::shared_ptr<search::IDestructorCallback> &; + virtual void putDocument(uint32_t, const Document &, SerialNum) override { } + virtual void removeDocument(uint32_t, SerialNum) override { } + virtual void commit(SerialNum, OnWriteDoneType) override { } + virtual void heartBeat(SerialNum ) {} + virtual SerialNum getCurrentSerialNum() const { return 0; } + virtual SerialNum getFlushedSerialNum() const { return 0; } + virtual IndexSearchable::SP getSearchable() const { + IndexSearchable::SP s; + return s; + } + virtual SearchableStats getSearchableStats() const { + SearchableStats s; + return s; + } + virtual searchcorespi::IFlushTarget::List getFlushTargets() { + searchcorespi::IFlushTarget::List l; + return l; + } + virtual void setSchema(const Schema & , const Schema &) { } + virtual void wipeHistory(SerialNum , const Schema &) { } +}; + +class IndexManagerFactory : public searchcorespi::IIndexManagerFactory +{ +public: + virtual IIndexManager::UP createIndexManager(const IndexManagerConfig &managerCfg, + const index::IndexMaintainerConfig &maintainerConfig, + const index::IndexMaintainerContext &maintainerContext); + + virtual ConfigKeySet getConfigKeys(const string &configId, + const Schema &schema, + const ConfigInstance &rootConfig); +}; + +IIndexManager::UP +IndexManagerFactory::createIndexManager(const IndexManagerConfig &, + const index::IndexMaintainerConfig &, + const index::IndexMaintainerContext &) +{ + return IIndexManager::UP(new IndexManager()); +} + +ConfigKeySet +IndexManagerFactory::getConfigKeys(const string &, + const Schema &, + const ConfigInstance &) +{ + ConfigKeySet keys; + return keys; +} + +} + +searchcorespi::IIndexManagerFactory * +createIndexManagerFactory() +{ + return new searchcorespi::IndexManagerFactory(); +} + diff --git a/searchcorespi/src/tests/plugin/plugin_test.cpp b/searchcorespi/src/tests/plugin/plugin_test.cpp new file mode 100644 index 00000000000..34692b4ed7b --- /dev/null +++ b/searchcorespi/src/tests/plugin/plugin_test.cpp @@ -0,0 +1,32 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/vespalib/testkit/testapp.h> +#include <vespa/searchcorespi/plugin/factoryloader.h> +#include <vespa/log/log.h> +LOG_SETUP("factory_test"); + +using namespace searchcorespi; + +namespace { +TEST("require that plugins can be loaded.") { + FactoryLoader fl; + IIndexManagerFactory::UP f = fl.create("searchcorespi_tplugin"); + ASSERT_TRUE(f.get()); +} + +TEST("require that non-existent plugin causes failure") { + FactoryLoader fl; + EXPECT_EXCEPTION(fl.create("no-such-plugin"), + vespalib::IllegalArgumentException, + "cannot open shared object file"); +} + +TEST("require that missing factory function causes failure") { + FactoryLoader fl; + EXPECT_EXCEPTION(fl.create("searchcorespi_illegal-plugin"), + vespalib::IllegalArgumentException, + "Failed locating symbol 'createIndexManagerFactory'"); +} +} // namespace + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchcorespi/src/vespa/searchcorespi/.gitignore b/searchcorespi/src/vespa/searchcorespi/.gitignore new file mode 100644 index 00000000000..9d6ecd51398 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/.gitignore @@ -0,0 +1,3 @@ +/.depend +/Makefile +/libsearchcorespi.so.5.1 diff --git a/searchcorespi/src/vespa/searchcorespi/CMakeLists.txt b/searchcorespi/src/vespa/searchcorespi/CMakeLists.txt new file mode 100644 index 00000000000..a28c17c5c94 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(searchcorespi + SOURCES + $<TARGET_OBJECTS:searchcorespi_flush> + $<TARGET_OBJECTS:searchcorespi_index> + $<TARGET_OBJECTS:searchcorespi_plugin> + INSTALL lib64 + DEPENDS +) diff --git a/searchcorespi/src/vespa/searchcorespi/flush/.gitignore b/searchcorespi/src/vespa/searchcorespi/flush/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/flush/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/searchcorespi/src/vespa/searchcorespi/flush/CMakeLists.txt b/searchcorespi/src/vespa/searchcorespi/flush/CMakeLists.txt new file mode 100644 index 00000000000..1f565cb7aac --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/flush/CMakeLists.txt @@ -0,0 +1,6 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(searchcorespi_flush OBJECT + SOURCES + flushstats.cpp + DEPENDS +) diff --git a/searchcorespi/src/vespa/searchcorespi/flush/closureflushtask.h b/searchcorespi/src/vespa/searchcorespi/flush/closureflushtask.h new file mode 100644 index 00000000000..0cd653770fb --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/flush/closureflushtask.h @@ -0,0 +1,47 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "flushtask.h" +#include <vespa/vespalib/util/closure.h> + +namespace searchcorespi { + +class ClosureFlushTask : public FlushTask +{ + std::unique_ptr<vespalib::Closure> _closure; + search::SerialNum _flushSerial; + +public: + ClosureFlushTask(std::unique_ptr<vespalib::Closure> closure, + search::SerialNum flushSerial) + : _closure(std::move(closure)), + _flushSerial(flushSerial) + { + } + + virtual search::SerialNum + getFlushSerial() const + { + return _flushSerial; + } + + virtual void + run() + { + _closure->call(); + } +}; + +/** + * Wraps a Closure as a FlushTask + **/ +static inline FlushTask::UP +makeFlushTask(std::unique_ptr<vespalib::Closure> closure, + search::SerialNum flushSerial) +{ + return FlushTask::UP(new ClosureFlushTask(std::move(closure), + flushSerial)); +} + +} // namespace searchcorespi + diff --git a/searchcorespi/src/vespa/searchcorespi/flush/flushstats.cpp b/searchcorespi/src/vespa/searchcorespi/flush/flushstats.cpp new file mode 100644 index 00000000000..29318809335 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/flush/flushstats.cpp @@ -0,0 +1,16 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP(".searchcorespi.flush.flushstats"); + +#include "flushstats.h" + +namespace searchcorespi { + +FlushStats::FlushStats() : + _path(), + _pathElementsToLog(6) +{ +} + +} // namespace searchcorespi diff --git a/searchcorespi/src/vespa/searchcorespi/flush/flushstats.h b/searchcorespi/src/vespa/searchcorespi/flush/flushstats.h new file mode 100644 index 00000000000..b11712624ab --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/flush/flushstats.h @@ -0,0 +1,28 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/vespalib/stllike/string.h> + +namespace searchcorespi { + +/** + * Class with stats for what have been flushed. + */ +class FlushStats +{ +private: + vespalib::string _path; // path to data flushed + size_t _pathElementsToLog; + +public: + FlushStats(); + + void setPath(const vespalib::string & path) { _path = path; } + void setPathElementsToLog(size_t numElems) { _pathElementsToLog = numElems; } + + const vespalib::string & getPath() const { return _path; } + size_t getPathElementsToLog() const { return _pathElementsToLog; } +}; + +} // namespace searchcorespi + diff --git a/searchcorespi/src/vespa/searchcorespi/flush/flushtask.h b/searchcorespi/src/vespa/searchcorespi/flush/flushtask.h new file mode 100644 index 00000000000..cfcd6fd914d --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/flush/flushtask.h @@ -0,0 +1,20 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/vespalib/util/executor.h> +#include <vespa/vespalib/util/closure.h> +#include <vespa/searchlib/common/serialnum.h> + +namespace searchcorespi { + +class FlushTask : public vespalib::Executor::Task +{ +public: + typedef std::unique_ptr<FlushTask> UP; + + virtual search::SerialNum + getFlushSerial() const = 0; +}; + +} // namespace searchcorespi + diff --git a/searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h b/searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h new file mode 100644 index 00000000000..724ffe5e2bd --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h @@ -0,0 +1,186 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "flushstats.h" +#include <string> +#include <vespa/searchlib/common/serialnum.h> +#include <vespa/vespalib/util/executor.h> +#include "flushtask.h" + +namespace searchcorespi { + +/** + * This abstract class represents a flushable object that uses + * getApproxBytesBeforeFlush() bytes of memory, that will be reduced to + * getApproxBytesAfterFlush() if flushed. + */ +class IFlushTarget +{ +public: + /** + * The flush types that a flush target can represent. + */ + enum class Type { + FLUSH, + SYNC, + GC, + OTHER + }; + + /** + * The component types that a flush target can be used for. + */ + enum class Component { + ATTRIBUTE, + INDEX, + DOCUMENT_STORE, + OTHER + }; + +private: + vespalib::string _name; + Type _type; + Component _component; + +public: + template<typename T> + class Gain { + public: + Gain() : _before(0), _after(0) { } + Gain(T before, T after) : _before(before), _after(after) { } + T getBefore() const { return _before; } + T getAfter() const { return _after; } + T gain() const { return _before - _after; } + double gainRate() const { return (_before != 0) ? double(gain())/_before : 0;} + Gain & operator += (const Gain & b) { _before += b.getBefore(); _after += b.getAfter(); return *this; } + static Gain noGain(size_t currentSize) { return Gain(currentSize, currentSize); } + private: + T _before; + T _after; + }; + typedef Gain<int64_t> MemoryGain; + typedef Gain<int64_t> DiskGain; + typedef search::SerialNum SerialNum; + typedef fastos::TimeStamp Time; + + /** + * Convenience typedefs. + */ + typedef std::shared_ptr<IFlushTarget> SP; + typedef std::vector<SP> List; + typedef FlushTask Task; + + /** + * Constructs a new instance of this class. + * + * @param name The handler-wide unique name of this target. + */ + IFlushTarget(const vespalib::string &name) + : _name(name), + _type(Type::OTHER), + _component(Component::OTHER) + { + } + + /** + * Constructs a new instance of this class. + * + * @param name The handler-wide unique name of this target. + * @param type The flush type of this target. + * @param component The component type of this target. + */ + IFlushTarget(const vespalib::string &name, + const Type &type, + const Component &component) + : _name(name), + _type(type), + _component(component) + { + } + + /** + * Virtual destructor required for inheritance. + */ + virtual ~IFlushTarget() { } + + /** + * Returns the handler-wide unique name of this target. + * + * @return The name of this. + */ + const vespalib::string & getName() const { return _name; } + + /** + * Returns the flush type of this target. + */ + Type getType() const { return _type; } + + /** + * Returns the component type of this target. + */ + Component getComponent() const { return _component; } + + /** + * Returns the approximate memory gain of this target, in bytes. + * + * @return The gain + */ + virtual MemoryGain getApproxMemoryGain() const = 0; + + /** + * Returns the approximate memory gain of this target, in bytes. + * + * @return The gain + */ + virtual DiskGain getApproxDiskGain() const = 0; + + /** + * Returns the approximate amount of bytes this target writes to disk if flushed. + */ + virtual uint64_t getApproxBytesToWriteToDisk() const = 0; + + /** + * Returns the last serial number for the transaction applied to + * target before it was flushed to disk. The transaction log can + * not be pruned beyond this. + * + * @return The last serial number represented in flushed target + */ + virtual SerialNum getFlushedSerialNum() const = 0; + + /** + * Returns the time of last flush. + * + * @return The last flush time. + */ + virtual Time getLastFlushTime() const = 0; + + /** + * Return if the traget itself is in bad need for a flush. + * + * @return true if an urgent flush is needed + */ + virtual bool needUrgentFlush() const { return false; } + + /** + * Initiates the flushing of temporary memory. This method must perform + * everything required to allow another thread to complete the flush. This + * method is called by the flush scheduler thread. + * + * @param currentSerial The current transaction serial number. + * @return The task used to complete the flush. + */ + virtual Task::UP initFlush(SerialNum currentSerial) = 0; + + /** + * Returns the stats for the last completed flush operation + * for this flush target. + * + * @return The stats for the last flush. + */ + virtual FlushStats getLastFlushStats() const = 0; + +}; + +} // namespace searchcorespi + diff --git a/searchcorespi/src/vespa/searchcorespi/index/.gitignore b/searchcorespi/src/vespa/searchcorespi/index/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/searchcorespi/src/vespa/searchcorespi/index/CMakeLists.txt b/searchcorespi/src/vespa/searchcorespi/index/CMakeLists.txt new file mode 100644 index 00000000000..65b006f8ea9 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/CMakeLists.txt @@ -0,0 +1,25 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(searchcorespi_index OBJECT + SOURCES + activediskindexes.cpp + diskindexcleaner.cpp + eventlogger.cpp + fusionrunner.cpp + iindexmanager.cpp + iindexcollection.cpp + index_manager_explorer.cpp + indexcollection.cpp + indexdisklayout.cpp + indexflushtarget.cpp + indexfusiontarget.cpp + indexmaintainer.cpp + indexmaintainerconfig.cpp + indexmaintainercontext.cpp + indexmanagerconfig.cpp + indexreadutilities.cpp + indexsearchable.cpp + indexwriteutilities.cpp + warmupindexcollection.cpp + isearchableindexcollection.cpp + DEPENDS +) diff --git a/searchcorespi/src/vespa/searchcorespi/index/activediskindexes.cpp b/searchcorespi/src/vespa/searchcorespi/index/activediskindexes.cpp new file mode 100644 index 00000000000..73bda4d94c8 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/activediskindexes.cpp @@ -0,0 +1,34 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP(".searchcorespi.index.activediskindexes"); + +#include "activediskindexes.h" + +using std::set; +using vespalib::string; +using vespalib::LockGuard; + +namespace searchcorespi { +namespace index { + +void ActiveDiskIndexes::setActive(const string &index) { + LockGuard lock(_lock); + _active.insert(index); +} + +void ActiveDiskIndexes::notActive(const string & index) { + LockGuard lock(_lock); + set<string>::iterator it = _active.find(index); + assert(it != _active.end()); + _active.erase(it); +} + +bool ActiveDiskIndexes::isActive(const string &index) const { + LockGuard lock(_lock); + return _active.find(index) != _active.end(); +} + +} // namespace index +} // namespace searchcorespi diff --git a/searchcorespi/src/vespa/searchcorespi/index/activediskindexes.h b/searchcorespi/src/vespa/searchcorespi/index/activediskindexes.h new file mode 100644 index 00000000000..98caeb1e4b0 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/activediskindexes.h @@ -0,0 +1,30 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/stllike/string.h> +#include <vespa/vespalib/util/sync.h> +#include <set> + +namespace searchcorespi { +namespace index { + +/** + * Class used to keep track of the set of active disk indexes in an index maintainer. + * The index directories are used as identifiers. + */ +class ActiveDiskIndexes { + std::multiset<vespalib::string> _active; + vespalib::Lock _lock; + +public: + typedef std::shared_ptr<ActiveDiskIndexes> SP; + + void setActive(const vespalib::string & index); + void notActive(const vespalib::string & index); + bool isActive(const vespalib::string & index) const; +}; + +} // namespace index +} // namespace searchcorespi + diff --git a/searchcorespi/src/vespa/searchcorespi/index/diskindexcleaner.cpp b/searchcorespi/src/vespa/searchcorespi/index/diskindexcleaner.cpp new file mode 100644 index 00000000000..68e7853c820 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/diskindexcleaner.cpp @@ -0,0 +1,113 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP(".searchcorespi.index.diskindexcleaner"); + +#include "activediskindexes.h" +#include "diskindexcleaner.h" +#include <algorithm> +#include <sstream> +#include <vector> + +using std::istringstream; +using vespalib::string; +using std::vector; + +namespace searchcorespi { +namespace index { +namespace { +vector<string> readIndexes(const string &base_dir) { + vector<string> indexes; + FastOS_DirectoryScan dir_scan(base_dir.c_str()); + while (dir_scan.ReadNext()) { + string name = dir_scan.GetName(); + if (!dir_scan.IsDirectory() || name.find("index.") != 0) { + continue; + } + indexes.push_back(name); + } + return indexes; +} + +bool isValidIndex(const string &index_dir) { + FastOS_File serial_file((index_dir + "/serial.dat").c_str()); + return serial_file.OpenReadOnlyExisting(); +} + +uint32_t findLastFusionId(const string &base_dir, + const vector<string> &indexes) { + uint32_t fusion_id = 0; + const string prefix = "index.fusion."; + for (size_t i = 0; i < indexes.size(); ++i) { + if (indexes[i].find(prefix) != 0) { + continue; + } + if (!isValidIndex(base_dir + "/" + indexes[i])) { + continue; + } + + uint32_t new_id = 0; + istringstream ist(indexes[i].substr(prefix.size())); + ist >> new_id; + fusion_id = std::max(fusion_id, new_id); + } + return fusion_id; +} + +void removeDir(const string &dir) { + LOG(debug, "Removing index dir '%s'", dir.c_str()); + FastOS_FileInterface::EmptyAndRemoveDirectory(dir.c_str()); +} + +bool isOldIndex(const string &index, uint32_t last_fusion_id) { + string::size_type pos = index.rfind("."); + istringstream ist(index.substr(pos + 1)); + uint32_t id = last_fusion_id; + ist >> id; + if (id < last_fusion_id) { + return true; + } else if (id == last_fusion_id) { + return index.find("flush") != string::npos; + } + return false; +} + +void removeOld(const string &base_dir, const vector<string> &indexes, + const ActiveDiskIndexes &active_indexes) { + uint32_t last_fusion_id = findLastFusionId(base_dir, indexes); + for (size_t i = 0; i < indexes.size(); ++i) { + const string index_dir = base_dir + "/" + indexes[i]; + if (isOldIndex(indexes[i], last_fusion_id) && + !active_indexes.isActive(index_dir)) + { + removeDir(index_dir); + } + } +} + +void removeInvalid(const string &base_dir, const vector<string> &indexes) { + for (size_t i = 0; i < indexes.size(); ++i) { + const string index_dir = base_dir + "/" + indexes[i]; + if (!isValidIndex(index_dir)) { + LOG(debug, "Found invalid index dir '%s'", index_dir.c_str()); + removeDir(index_dir); + } + } +} +} // namespace + +void DiskIndexCleaner::clean(const string &base_dir, + const ActiveDiskIndexes &active_indexes) { + vector<string> indexes = readIndexes(base_dir); + removeOld(base_dir, indexes, active_indexes); + removeInvalid(base_dir, indexes); +} + +void DiskIndexCleaner::removeOldIndexes( + const string &base_dir, const ActiveDiskIndexes &active_indexes) { + vector<string> indexes = readIndexes(base_dir); + removeOld(base_dir, indexes, active_indexes); +} +} // namespace index +} // namespace searchcorespi diff --git a/searchcorespi/src/vespa/searchcorespi/index/diskindexcleaner.h b/searchcorespi/src/vespa/searchcorespi/index/diskindexcleaner.h new file mode 100644 index 00000000000..f8b98ffdf0d --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/diskindexcleaner.h @@ -0,0 +1,26 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/stllike/string.h> + +namespace searchcorespi { +namespace index { +class ActiveDiskIndexes; + +/** + * Utility class used to clean and remove index directories. + */ +struct DiskIndexCleaner { + /** + * Deletes all indexes with id lower than the most recent fusion id. + */ + static void clean(const vespalib::string &index_dir, + const ActiveDiskIndexes& active_indexes); + static void removeOldIndexes(const vespalib::string &index_dir, + const ActiveDiskIndexes& active_indexes); +}; + +} // namespace index +} // namespace searchcorespi + diff --git a/searchcorespi/src/vespa/searchcorespi/index/eventlogger.cpp b/searchcorespi/src/vespa/searchcorespi/index/eventlogger.cpp new file mode 100644 index 00000000000..c5e4a382233 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/eventlogger.cpp @@ -0,0 +1,71 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP(".searchcorespi.index.eventlogger"); + +#include "eventlogger.h" +#include <vespa/searchlib/util/logutil.h> + +using vespalib::JSONStringer; +using search::util::LogUtil; + +namespace searchcorespi { +namespace index { + +void +EventLogger::diskIndexLoadStart(const vespalib::string &indexDir) +{ + JSONStringer jstr; + jstr.beginObject(); + jstr.appendKey("input"); + LogUtil::logDir(jstr, indexDir, 6); + jstr.endObject(); + EV_STATE("diskindex.load.start", jstr.toString().c_str()); +} + +void +EventLogger::diskIndexLoadComplete(const vespalib::string &indexDir, + int64_t elapsedTimeMs) +{ + JSONStringer jstr; + jstr.beginObject(); + jstr.appendKey("time.elapsed.ms").appendInt64(elapsedTimeMs); + jstr.appendKey("input"); + LogUtil::logDir(jstr, indexDir, 6); + jstr.endObject(); + EV_STATE("diskindex.load.complete", jstr.toString().c_str()); +} + +void +EventLogger::diskFusionStart(const std::vector<vespalib::string> &sources, + const vespalib::string &fusionDir) +{ + JSONStringer jstr; + jstr.beginObject(); + jstr.appendKey("inputs"); + jstr.beginArray(); + for (size_t i = 0; i < sources.size(); ++i) { + LogUtil::logDir(jstr, sources[i], 6); + } + jstr.endArray(); + jstr.appendKey("output"); + LogUtil::logDir(jstr, fusionDir, 6); + jstr.endObject(); + EV_STATE("fusion.start", jstr.toString().c_str()); +} + +void +EventLogger::diskFusionComplete(const vespalib::string &fusionDir, + int64_t elapsedTimeMs) +{ + JSONStringer jstr; + jstr.beginObject(); + jstr.appendKey("time.elapsed.ms").appendInt64(elapsedTimeMs); + jstr.appendKey("output"); + LogUtil::logDir(jstr, fusionDir, 6); + jstr.endObject(); + EV_STATE("fusion.complete", jstr.toString().c_str()); +} + +} // namespace index +} // namespace searchcorespi diff --git a/searchcorespi/src/vespa/searchcorespi/index/eventlogger.h b/searchcorespi/src/vespa/searchcorespi/index/eventlogger.h new file mode 100644 index 00000000000..ce2fd340fdd --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/eventlogger.h @@ -0,0 +1,25 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/vespalib/stllike/string.h> +#include <vector> + +namespace searchcorespi { +namespace index { + +/** + * Class used to log various events related to disk index handling. + **/ +struct EventLogger { + static void diskIndexLoadStart(const vespalib::string &indexDir); + static void diskIndexLoadComplete(const vespalib::string &indexDir, + int64_t elapsedTimeMs); + static void diskFusionStart(const std::vector<vespalib::string> &sources, + const vespalib::string &fusionDir); + static void diskFusionComplete(const vespalib::string &fusionDir, + int64_t elapsedTimeMs); +}; + +} // namespace index +} // namespace searchcorespi + diff --git a/searchcorespi/src/vespa/searchcorespi/index/fakeindexsearchable.h b/searchcorespi/src/vespa/searchcorespi/index/fakeindexsearchable.h new file mode 100644 index 00000000000..0db49c5f2df --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/fakeindexsearchable.h @@ -0,0 +1,43 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "indexsearchable.h" +#include <vespa/searchlib/queryeval/fake_searchable.h> + +namespace searchcorespi { + +/** + * A fake index searchable used for unit testing. + */ +class FakeIndexSearchable : public IndexSearchable { +private: + search::queryeval::FakeSearchable _fake; + +public: + FakeIndexSearchable() + : _fake() + { + } + + search::queryeval::FakeSearchable &getFake() { return _fake; } + + /** + * Implements IndexSearchable + */ + virtual Blueprint::UP + createBlueprint(const IRequestContext & requestContext, + const FieldSpec &field, + const Node &term, + const IAttributeContext &) + { + return _fake.createBlueprint(requestContext, field, term); + } + + virtual search::SearchableStats getSearchableStats() const { + return search::SearchableStats(); + } +}; + +} // namespace searchcorespi + diff --git a/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp new file mode 100644 index 00000000000..13bef302083 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp @@ -0,0 +1,145 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP(".searchcorespi.index.fusionrunner"); + +#include "fusionrunner.h" +#include "eventlogger.h" +#include "fusionspec.h" +#include <vespa/searchlib/common/serialnumfileheadercontext.h> +#include <vespa/searchlib/attribute/fixedsourceselector.h> +#include <vespa/searchlib/queryeval/isourceselector.h> +#include <vespa/searchlib/util/dirtraverse.h> +#include <vespa/vespalib/util/jsonwriter.h> + +using search::FixedSourceSelector; +using search::TuneFileAttributes; +using search::TuneFileIndexing; +using search::common::FileHeaderContext; +using search::common::SerialNumFileHeaderContext; +using search::index::Schema; +using search::queryeval::ISourceSelector; +using search::diskindex::SelectorArray; +using search::SerialNum; +using std::vector; +using vespalib::string; +using vespalib::JSONStringer; + +namespace searchcorespi { +namespace index { + +FusionRunner::FusionRunner(const string &base_dir, + const Schema &schema, + const TuneFileAttributes &tuneFileAttributes, + const FileHeaderContext &fileHeaderContext) + : _diskLayout(base_dir), + _schema(schema), + _tuneFileAttributes(tuneFileAttributes), + _fileHeaderContext(fileHeaderContext) +{ +} + +namespace { + +void readSelectorArray(const string &selector_name, SelectorArray &selector_array, + const vector<uint8_t> &id_map, uint32_t base_id) { + FixedSourceSelector::UP selector = + FixedSourceSelector::load(selector_name); + if (base_id != selector->getBaseId()) { + selector = selector->cloneAndSubtract( + "tmp_for_fusion", base_id - selector->getBaseId()); + } + + const uint32_t num_docs = selector->getDocIdLimit(); + selector_array.reserve(num_docs); + ISourceSelector::Iterator::UP it = selector->createIterator(); + for (uint32_t i = 0; i < num_docs; ++i) { + search::queryeval::Source source = it->getSource(i); + assert(source < id_map.size()); + selector_array.push_back(id_map[source]); + } +} + +bool +writeFusionSelector(const IndexDiskLayout &diskLayout, uint32_t fusion_id, + uint32_t highest_doc_id, + const TuneFileAttributes &tuneFileAttributes, + const FileHeaderContext &fileHeaderContext) +{ + const search::queryeval::Source default_source = 0; + FixedSourceSelector fusion_selector(default_source, "fusion_selector"); + fusion_selector.setSource(highest_doc_id, default_source); + fusion_selector.setBaseId(fusion_id); + string selector_name = + IndexDiskLayout::getSelectorFileName(diskLayout.getFusionDir(fusion_id)); + if (!fusion_selector.extractSaveInfo(selector_name)->save( + tuneFileAttributes, fileHeaderContext)) { + LOG(warning, + "Unable to write source selector data for fusion.%u.", fusion_id); + return false; + } + return true; +} +} // namespace + +uint32_t +FusionRunner::fuse(const FusionSpec &fusion_spec, + SerialNum lastSerialNum, + IIndexMaintainerOperations &operations) +{ + const vector<uint32_t> &ids = fusion_spec.flush_ids; + if (ids.empty()) { + return 0; + } + const uint32_t fusion_id = ids.back(); + const string fusion_dir = _diskLayout.getFusionDir(fusion_id); + + vector<string> sources; + vector<uint8_t> id_map(fusion_id + 1); + if (fusion_spec.last_fusion_id != 0) { + id_map[0] = sources.size(); + sources.push_back( + _diskLayout.getFusionDir(fusion_spec.last_fusion_id)); + } + for (size_t i = 0; i < ids.size(); ++i) { + id_map[ids[i] - fusion_spec.last_fusion_id] = sources.size(); + sources.push_back(_diskLayout.getFlushDir(ids[i])); + } + + if (LOG_WOULD_LOG(event)) { + EventLogger::diskFusionStart(sources, + fusion_dir); + } + FastOS_Time timer; + timer.SetNow(); + + const string selector_name = + IndexDiskLayout::getSelectorFileName(_diskLayout.getFlushDir(fusion_id)); + SelectorArray selector_array; + readSelectorArray(selector_name, selector_array, + id_map, fusion_spec.last_fusion_id); + + if (!operations.runFusion(_schema, fusion_dir, sources, selector_array, + lastSerialNum)) { + return 0; + } + + const uint32_t highest_doc_id = selector_array.size() - 1; + SerialNumFileHeaderContext fileHeaderContext(_fileHeaderContext, + lastSerialNum); + if (!writeFusionSelector(_diskLayout, fusion_id, highest_doc_id, + _tuneFileAttributes, + fileHeaderContext)) { + return 0; + } + + if (LOG_WOULD_LOG(event)) { + EventLogger::diskFusionComplete(fusion_dir, + (int64_t)timer.MilliSecsToNow()); + } + return fusion_id; +} + +} // namespace index +} // namespace searchcorespi diff --git a/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h new file mode 100644 index 00000000000..097b76bc4cc --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h @@ -0,0 +1,63 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "iindexmaintaineroperations.h" +#include "indexdisklayout.h" +#include <vespa/searchcommon/common/schema.h> +#include <vespa/searchlib/common/tunefileinfo.h> + +namespace search +{ + +namespace common +{ + +class FileHeaderContext; + +} + +} + +namespace searchcorespi { +namespace index { +class FusionSpec; + +/** + * FusionRunner runs fusion on a set of disk indexes, specified as a + * vector of ids. The disk indexes must be stored in directories named + * "index.flush.<id>" within the base dir, and the fusioned indexes + * will be stored similarly in directories named "index.fusion.<id>". + **/ +class FusionRunner { + const IndexDiskLayout _diskLayout; + const search::index::Schema _schema; + const search::TuneFileAttributes _tuneFileAttributes; + const search::common::FileHeaderContext &_fileHeaderContext; + +public: + /** + * Create a FusionRunner that operates on indexes stored in the + * base dir. + **/ + FusionRunner(const vespalib::string &base_dir, + const search::index::Schema &schema, + const search::TuneFileAttributes &tuneFileAttributes, + const search::common::FileHeaderContext &fileHeaderContext); + + /** + * Combine the indexes specified by the ids by running fusion. + * + * @param fusion_spec the specification on which indexes to run fusion on. + * @param lastSerialNum the serial number of the last flushed index part of the fusion spec. + * @param operations interface used for running the actual fusion. + * @return the id of the fusioned disk index + **/ + uint32_t fuse(const FusionSpec &fusion_spec, + search::SerialNum lastSerialNum, + IIndexMaintainerOperations &operations); +}; + +} // namespace index +} // namespace searchcorespi + diff --git a/searchcorespi/src/vespa/searchcorespi/index/fusionspec.h b/searchcorespi/src/vespa/searchcorespi/index/fusionspec.h new file mode 100644 index 00000000000..5e31698a7fc --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/fusionspec.h @@ -0,0 +1,24 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vector> + +namespace searchcorespi { +namespace index { + +/** + * Specifies a set of disk index ids for fusion. + * + * Note: All ids in FusionSpec are absolute ids. + **/ +struct FusionSpec { + uint32_t last_fusion_id; + std::vector<uint32_t> flush_ids; + + FusionSpec() : last_fusion_id(0), flush_ids() {} +}; + +} // namespace index +} // namespace searchcorespi + diff --git a/searchcorespi/src/vespa/searchcorespi/index/i_thread_service.h b/searchcorespi/src/vespa/searchcorespi/index/i_thread_service.h new file mode 100644 index 00000000000..ffa160f7c96 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/i_thread_service.h @@ -0,0 +1,34 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <boost/noncopyable.hpp> +#include <vespa/vespalib/util/runnable.h> +#include <vespa/vespalib/util/threadexecutor.h> + +namespace searchcorespi { +namespace index { + +/** + * Interface for a single thread used for write tasks. + */ +struct IThreadService : public boost::noncopyable, + public vespalib::ThreadExecutor +{ + virtual ~IThreadService() {} + + /** + * Run the given runnable in the underlying thread and wait until its done. + */ + virtual void run(vespalib::Runnable &runnable) = 0; + + /** + * Returns whether the current thread is the underlying thread. + */ + virtual bool isCurrentThread() const = 0; + +}; + +} // namespace index +} // namespace searchcorespi + + diff --git a/searchcorespi/src/vespa/searchcorespi/index/idiskindex.h b/searchcorespi/src/vespa/searchcorespi/index/idiskindex.h new file mode 100644 index 00000000000..7495939ee91 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/idiskindex.h @@ -0,0 +1,33 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/searchcorespi/index/indexsearchable.h> +#include <vespa/searchcommon/common/schema.h> +#include <vespa/vespalib/stllike/string.h> + +namespace searchcorespi { +namespace index { + +/** + * Interface for a disk index as seen from an index maintainer. + */ +struct IDiskIndex : public IndexSearchable { + typedef std::shared_ptr<IDiskIndex> SP; + virtual ~IDiskIndex() {} + + /** + * Returns the directory in which this disk index exists. + */ + virtual const vespalib::string &getIndexDir() const = 0; + + /** + * Returns the schema used by this disk index. + * Note that the schema should be part of the index on disk. + */ + virtual const search::index::Schema &getSchema() const = 0; +}; + +} // namespace index +} // namespace searchcorespi + + diff --git a/searchcorespi/src/vespa/searchcorespi/index/iindexcollection.cpp b/searchcorespi/src/vespa/searchcorespi/index/iindexcollection.cpp new file mode 100644 index 00000000000..961e908f1bb --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/iindexcollection.cpp @@ -0,0 +1,45 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/searchcorespi/index/iindexcollection.h> +#include <vespa/searchcorespi/index/idiskindex.h> +#include <vespa/vespalib/stllike/asciistream.h> + +namespace searchcorespi { + +using index::IDiskIndex; + +vespalib::string IIndexCollection::toString() const +{ + vespalib::asciistream s; + s << "selector : " << &getSourceSelector() << "(baseId=" << getSourceSelector().getBaseId() + << ", docidlimit=" << getSourceSelector().getDocIdLimit() + << ", defaultsource=" << uint32_t(getSourceSelector().getDefaultSource()) + << ")\n"; +#if 0 + search::queryeval::ISourceSelector::Iterator::UP it = getSourceSelector().createIterator(); + s << "{"; + for (size_t i(0), m(getSourceSelector().getDocIdLimit()); i < m; i++) { + s << uint32_t(it->getSource(i)) << ' '; + } + s << "}\n"; +#endif + s << getSourceCount() << " {"; + if (getSourceCount() > 0) { + for (size_t i(0), m(getSourceCount()); i < m; i++) { + if (i != 0) { + s << ", "; + } + const IndexSearchable & is(getSearchable(i)); + s << getSourceId(i) << " : " << &is << "("; + if (dynamic_cast<const IDiskIndex *>(&is) != NULL) { + s << dynamic_cast<const IDiskIndex &>(is).getIndexDir().c_str(); + } else { + s << typeid(is).name(); + } + s << ")"; + } + } + s << "}"; + return s.str(); +} + +} diff --git a/searchcorespi/src/vespa/searchcorespi/index/iindexcollection.h b/searchcorespi/src/vespa/searchcorespi/index/iindexcollection.h new file mode 100644 index 00000000000..e4b90f11bac --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/iindexcollection.h @@ -0,0 +1,50 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "indexsearchable.h" +#include <vespa/searchlib/queryeval/isourceselector.h> +#include <vespa/searchlib/util/searchable_stats.h> + +namespace searchcorespi { + +/** + * Interface for a set of index searchables with source ids, + * and a source selector for determining which index searchable to use for each document. + */ +class IIndexCollection { +protected: + typedef search::queryeval::ISourceSelector ISourceSelector; +public: + typedef std::unique_ptr<IIndexCollection> UP; + typedef std::shared_ptr<IIndexCollection> SP; + + virtual ~IIndexCollection() {} + + /** + * Returns the source selector used to determine which index to use for each document. + */ + virtual const ISourceSelector &getSourceSelector() const = 0; + + /** + * Returns the number sources (index searchables) for this collection. + */ + virtual size_t getSourceCount() const = 0; + + /** + * Returns the index searchable for source i (i in the range [0, getSourceCount()>). + */ + virtual IndexSearchable &getSearchable(uint32_t i) const = 0; + + /** + * Returns the source id for source i (i in the range [0, getSourceCount()>). + * The source id is used for this source in the source selector. + */ + virtual uint32_t getSourceId(uint32_t i) const = 0; + + virtual vespalib::string toString() const; + +}; + +} // namespace searchcorespi + diff --git a/searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h b/searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h new file mode 100644 index 00000000000..2b8a3ac3965 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h @@ -0,0 +1,56 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "idiskindex.h" +#include "imemoryindex.h" +#include <vespa/searchcommon/common/schema.h> +#include <vespa/searchlib/common/serialnum.h> +#include <vespa/searchlib/diskindex/docidmapper.h> + +namespace searchcorespi { +namespace index { + +/** + * Interface for operations needed by an index maintainer. + */ +struct IIndexMaintainerOperations { + virtual ~IIndexMaintainerOperations() {} + + /** + * Creates a new memory index using the given schema. + */ + virtual IMemoryIndex::SP createMemoryIndex(const search::index::Schema &schema) = 0; + + /** + * Loads a disk index from the given directory. + */ + virtual IDiskIndex::SP loadDiskIndex(const vespalib::string &indexDir) = 0; + + /** + * Reloads the given disk index and returns a new instance. + */ + virtual IDiskIndex::SP reloadDiskIndex(const IDiskIndex &oldIndex) = 0; + + /** + * Runs fusion on a given set of input disk indexes to create a fusioned output disk index. + * The selector array contains a source for all local document ids ([0, docIdLimit>) + * in the range [0, sources.size()> and is used to determine in which input disk index + * a document is located. + * + * @param schema the schema of the resulting fusioned disk index. + * @param outputDir the output directory of the fusioned disk index. + * @param sources the directories of the input disk indexes. + * @param selectorArray the array specifying in which input disk index a document is located. + * @param lastSerialNum the serial number of the last operation in the last input disk index. + */ + virtual bool runFusion(const search::index::Schema &schema, + const vespalib::string &outputDir, + const std::vector<vespalib::string> &sources, + const search::diskindex::SelectorArray &selectorArray, + search::SerialNum lastSerialNum) = 0; +}; + +} // namespace index +} // namespace searchcorespi + + diff --git a/searchcorespi/src/vespa/searchcorespi/index/iindexmanager.cpp b/searchcorespi/src/vespa/searchcorespi/index/iindexmanager.cpp new file mode 100644 index 00000000000..b6d350ad52e --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/iindexmanager.cpp @@ -0,0 +1,17 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/searchcorespi/index/iindexmanager.h> + +namespace searchcorespi { + +void +IIndexManager::wipeHistory(SerialNum wipeSerial, const Schema &historyFields) +{ + (void) wipeSerial; + (void) historyFields; +} + +IIndexManager::Reconfigurer::~Reconfigurer() +{ +} + +} // namespace searchcorespi diff --git a/searchcorespi/src/vespa/searchcorespi/index/iindexmanager.h b/searchcorespi/src/vespa/searchcorespi/index/iindexmanager.h new file mode 100644 index 00000000000..e774629c787 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/iindexmanager.h @@ -0,0 +1,178 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/document/fieldvalue/document.h> +#include <vespa/searchcommon/common/schema.h> +#include <vespa/searchcorespi/flush/flushstats.h> +#include <vespa/searchcorespi/flush/iflushtarget.h> +#include <vespa/searchlib/common/serialnum.h> +#include <vespa/vespalib/util/closure.h> +#include "indexsearchable.h" + +namespace search +{ + +class IDestructorCallback; + +} + +namespace searchcorespi { + +/** + * Interface for an index manager: + * - Keeps track of a set of indexes (i.e. both memory indexes and disk indexes). + * - Documents can be inserted, updated or removed in/from the active memory index. + * - Enables search across all the indexes. + * - Manages the set of indexes through flush targets to the flush engine (i.e. flushing of memory + * indexes and fusion of disk indexes). + * + * Key items in this interface is <em>lid</em> which is a local document id assigned to each document. + * This is a numeric id in the range [0...Maximum number of documents concurrently in this DB>. + * Local document id 0 is reserved. Another key item is the <em>serialnumber</em> which is the + * serial number an operation was given when first seen by the searchcore. This is a monotonic + * increasing number used for sequencing operations and figuring out how up to date componets are. + * Used during restart/replay and for deciding when to flush. Both the lid and the serial number + * are persisted alongside an operation to ensure correct playback during recovery. + */ +class IIndexManager { +protected: + typedef document::Document Document; + typedef search::SerialNum SerialNum; + typedef search::index::Schema Schema; + +public: + using OnWriteDoneType = const std::shared_ptr<search::IDestructorCallback> &; + /** + * Interface used to signal when index manager has been reconfigured. + */ + struct Reconfigurer { + virtual ~Reconfigurer(); + /** + * Reconfigure index manager and infrastructure around it while system is in a quiescent state. + */ + virtual bool reconfigure(vespalib::Closure0<bool>::UP closure) = 0; + }; + + typedef std::unique_ptr<IIndexManager> UP; + typedef std::shared_ptr<IIndexManager> SP; + + virtual ~IIndexManager() {} + + /** + * Inserts a document into the index. This method is async, caller + * must either wait for notification about write done or sync + * indexFieldWriter executor in threading service to get sync + * behavior. + * + * If the inserted document id already exist the old version must + * be removed before inserting the new. + * + * @param lid The local document id for the document. + * + * @param doc The document to insert. + * + * @param serialNum The unique monotoninc increasing serial number + * for this operation. + **/ + virtual void putDocument(uint32_t lid, const Document &doc, SerialNum serialNum) = 0; + + /** + * Removes the given document from the index. This method is + * async, caller must either wait for notification about write + * done or sync indexFieldWriter executor in threading service to + * get sync behavior. + * + * @param lid The local document id for the document. + * + * @param serialNum The unique monotoninc increasing serial number + * for this operation. + **/ + virtual void removeDocument(uint32_t lid, SerialNum serialNum) = 0; + + /** + * Commits the document puts and removes since the last commit, + * making them searchable. This method is async, caller must + * either wait for notification about write done or sync + * indexFieldWriter executor in threading service to get sync + * behavior. + * + * @param serialNum The unique monotoninc increasing serial number + * for this operation. + * + * @param onWriteDone shared object that notifies write done when + * destructed. + **/ + virtual void commit(SerialNum serialNum, OnWriteDoneType onWriteDone) = 0; + + /** + * This method is called on a regular basis to update each component with what is the highest + * serial number for any component. This is for all components to be able to correctly tell its age. + * + * @param serialNum The serial number of the last known operation. + */ + virtual void heartBeat(SerialNum serialNum) = 0; + + /** + * Returns the current serial number of the index. + * This should also reflect any heart beats. + * + * @return current serial number of the component. + **/ + virtual SerialNum getCurrentSerialNum() const = 0; + + /** + * Returns the serial number of the last flushed index. + * + * @return the serial number of the last flushed index. + **/ + virtual SerialNum getFlushedSerialNum() const = 0; + + /** + * Returns the searchable that will give the correct search view of the index manager. + * Normally switched everytime underlying index structures are changed in a way that can not be + * handled in a thread safe way without locking. For instance flushing of memory index or + * starting using a new schema. + * + * @return the current searchable. + **/ + virtual IndexSearchable::SP getSearchable() const = 0; + + /** + * Returns searchable stats for this index manager. + * + * @return statistics gathered about underlying memory and disk indexes. + */ + virtual search::SearchableStats getSearchableStats() const = 0; + + /** + * Returns the list of all flush targets contained in this index manager. + * + * @return The list of flushable items in this component. + **/ + virtual IFlushTarget::List getFlushTargets() = 0; + + /** + * Sets the new schema and new fusion schema to be used by this index manager. + * The fusion schema is the union of the new schema and the history schema. + * The history schema keeps track of removed fields that have not been completely wiped yet. + * By using the fusion schema during fusion we ensure that removed fields are taken into the + * fusioned disk index to support the case where they are later re-applied. + * + * @param schema The new schema to start using. + * @param fusionSchema The new fusion schema to start using. + **/ + virtual void setSchema(const Schema &schema, const Schema &fusionSchema) = 0; + + /** + * Wipes remains of removed fields from this index manager as specified in the history schema. + * This can for instance be removing these fields from disk indexes. + * The default implementation does nothing. + * + * @param wipeSerial The serial number of this wipe operation. + * @param historyFields The schema specifying which fields we should wipe away. + **/ + virtual void wipeHistory(SerialNum wipeSerial, const Schema &historyFields); +}; + +} // namespace searchcorespi + diff --git a/searchcorespi/src/vespa/searchcorespi/index/imemoryindex.h b/searchcorespi/src/vespa/searchcorespi/index/imemoryindex.h new file mode 100644 index 00000000000..fabaf730cb7 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/imemoryindex.h @@ -0,0 +1,87 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/document/fieldvalue/document.h> +#include <vespa/searchcommon/common/schema.h> +#include <vespa/searchcorespi/index/indexsearchable.h> +#include <vespa/searchlib/common/serialnum.h> +#include <vespa/searchlib/util/memoryusage.h> +#include <vespa/vespalib/stllike/string.h> + +namespace search +{ + +class IDestructorCallback; + +} + +namespace searchcorespi { +namespace index { + +/** + * Interface for a memory index as seen from an index maintainer. + */ +struct IMemoryIndex : public searchcorespi::IndexSearchable { + typedef std::shared_ptr<IMemoryIndex> SP; + using OnWriteDoneType = + const std::shared_ptr<search::IDestructorCallback> &; + virtual ~IMemoryIndex() {} + + /** + * Returns true if this memory index has received any document insert operations. + */ + virtual bool hasReceivedDocumentInsert() const = 0; + + /** + * Returns the memory usage of this memory index. + */ + virtual search::MemoryUsage getMemoryUsage() const = 0; + + /** + * Returns the memory usage of an empty version of this memory index. + */ + virtual uint64_t getStaticMemoryFootprint() const = 0; + + /** + * Inserts the given document into this memory index. + * If the document already exists it should be removed first. + * + * @param lid the local document id. + * @param doc the document to insert. + */ + virtual void insertDocument(uint32_t lid, const document::Document &doc) = 0; + + /** + * Removes the given document from this memory index. + * + * @param lid the local document id. + */ + virtual void removeDocument(uint32_t lid) = 0; + + /** + * Commits the inserts and removes since the last commit, making them searchable. + **/ + virtual void commit(OnWriteDoneType onWriteDone) = 0; + + /** + * Flushes this memory index to disk as a disk index. + * After a flush it should be possible to load a IDiskIndex from the flush directory. + * Note that the schema used when constructing the memory index should be flushed as well + * since a IDiskIndex should be able to return the schema used by the disk index. + * + * @param flushDir the directory in which to save the flushed index. + * @param docIdLimit the largest local document id used + 1 + * @param serialNum the serial number of the last operation to the memory index. + */ + virtual void flushToDisk(const vespalib::string &flushDir, + uint32_t docIdLimit, + search::SerialNum serialNum) = 0; + + virtual void wipeHistory(const search::index::Schema &schema) = 0; + virtual search::index::Schema::SP getWipeTimeSchema() const = 0; +}; + +} // namespace index +} // namespace searchcorespi + + diff --git a/searchcorespi/src/vespa/searchcorespi/index/index_manager_explorer.cpp b/searchcorespi/src/vespa/searchcorespi/index/index_manager_explorer.cpp new file mode 100644 index 00000000000..f3695f2bd52 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/index_manager_explorer.cpp @@ -0,0 +1,28 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP(".searchcorespi.index.index_manager_explorer"); +#include "index_manager_explorer.h" + +#include <vespa/vespalib/data/slime/cursor.h> + +using vespalib::slime::Cursor; +using vespalib::slime::Inserter; + +namespace searchcorespi { + +IndexManagerExplorer::IndexManagerExplorer(IIndexManager::SP mgr) + : _mgr(std::move(mgr)) +{ +} + +void +IndexManagerExplorer::get_state(const Inserter &inserter, bool full) const +{ + (void) full; + Cursor &object = inserter.insertObject(); + object.setLong("lastSerialNum", _mgr->getCurrentSerialNum()); +} + +} // namespace searchcorespi diff --git a/searchcorespi/src/vespa/searchcorespi/index/index_manager_explorer.h b/searchcorespi/src/vespa/searchcorespi/index/index_manager_explorer.h new file mode 100644 index 00000000000..1f26d2e5c36 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/index_manager_explorer.h @@ -0,0 +1,25 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "iindexmanager.h" +#include <vespa/vespalib/net/state_explorer.h> + +namespace searchcorespi { + +/** + * Class used to explore the state of an index manager. + */ +class IndexManagerExplorer : public vespalib::StateExplorer +{ +private: + IIndexManager::SP _mgr; + +public: + IndexManagerExplorer(IIndexManager::SP mgr); + + // Implements vespalib::StateExplorer + virtual void get_state(const vespalib::slime::Inserter &inserter, bool full) const override; +}; + +} // namespace searchcorespi diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexcollection.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexcollection.cpp new file mode 100644 index 00000000000..0284c4a682e --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/indexcollection.cpp @@ -0,0 +1,223 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP(".searchcorespi.index.indexcollection"); + +#include "indexcollection.h" +#include <vespa/searchlib/queryeval/isourceselector.h> +#include <vespa/searchlib/queryeval/create_blueprint_visitor_helper.h> +#include <vespa/searchlib/queryeval/intermediate_blueprints.h> +#include <vespa/searchlib/queryeval/leaf_blueprints.h> + +using namespace search::queryeval; +using namespace search::query; +using search::attribute::IAttributeContext; + +namespace searchcorespi { + +IndexCollection::IndexCollection(const ISourceSelector::SP & selector) + : _source_selector(selector), + _sources() +{ +} + +IndexCollection::IndexCollection(const ISourceSelector::SP & selector, + const ISearchableIndexCollection &sources) + : _source_selector(selector), + _sources() +{ + for (size_t i(0), m(sources.getSourceCount()); i < m; i++) { + append(sources.getSourceId(i), sources.getSearchableSP(i)); + } + setCurrentIndex(sources.getCurrentIndex()); +} + +void +IndexCollection::setSource(uint32_t docId) +{ + assert( valid() ); + _source_selector->setSource(docId, getCurrentIndex()); +} + +ISearchableIndexCollection::UP +IndexCollection::replaceAndRenumber(const ISourceSelector::SP & selector, + const ISearchableIndexCollection &fsc, + uint32_t id_diff, + const IndexSearchable::SP &new_source) +{ + ISearchableIndexCollection::UP new_fsc(new IndexCollection(selector)); + new_fsc->append(0, new_source); + for (size_t i = 0; i < fsc.getSourceCount(); ++i) { + if (fsc.getSourceId(i) > id_diff) { + new_fsc->append(fsc.getSourceId(i) - id_diff, + fsc.getSearchableSP(i)); + } + } + return new_fsc; +} + +void +IndexCollection::append(uint32_t id, const IndexSearchable::SP &fs) +{ + _sources.push_back(SourceWithId(id, fs)); +} + +IndexSearchable::SP +IndexCollection::getSearchableSP(uint32_t i) const +{ + return _sources[i].source_wrapper; +} + +void +IndexCollection::replace(uint32_t id, const IndexSearchable::SP &fs) +{ + for (size_t i = 0; i < _sources.size(); ++i) { + if (_sources[i].id == id) { + _sources[i].source_wrapper = fs; + return; + } + } + LOG(warning, "Tried to replace Searchable %d, but it wasn't there.", id); + append(id, fs); +} + +const ISourceSelector & +IndexCollection::getSourceSelector() const +{ + return *_source_selector; +} + +size_t +IndexCollection::getSourceCount() const +{ + return _sources.size(); +} + +IndexSearchable & +IndexCollection::getSearchable(uint32_t i) const +{ + return *_sources[i].source_wrapper; +} + +uint32_t +IndexCollection::getSourceId(uint32_t i) const +{ + return _sources[i].id; +} + +search::SearchableStats +IndexCollection::getSearchableStats() const +{ + search::SearchableStats stats; + for (size_t i = 0; i < _sources.size(); ++i) { + stats.add(_sources[i].source_wrapper->getSearchableStats()); + } + return stats; +} + +namespace { + +struct Mixer { + const ISourceSelector &_selector; + std::unique_ptr<SourceBlenderBlueprint> _blender; + + Mixer(const ISourceSelector &selector) + : _selector(selector), _blender() {} + + void addIndex(Blueprint::UP index) { + if (_blender.get() == NULL) { + _blender.reset(new SourceBlenderBlueprint(_selector)); + } + _blender->addChild(std::move(index)); + } + + Blueprint::UP mix() { + if (_blender.get() == NULL) { + return Blueprint::UP(new EmptyBlueprint()); + } + return Blueprint::UP(_blender.release()); + } +}; + +class CreateBlueprintVisitor : public search::query::QueryVisitor { +private: + const IIndexCollection &_indexes; + const FieldSpecList &_fields; + const IAttributeContext &_attrCtx; + const IRequestContext &_requestContext; + Blueprint::UP _result; + + template <typename NodeType> + void visitTerm(NodeType &n) { + Mixer mixer(_indexes.getSourceSelector()); + for (size_t i = 0; i < _indexes.getSourceCount(); ++i) { + Blueprint::UP blueprint = _indexes.getSearchable(i).createBlueprint(_requestContext, _fields, n, _attrCtx); + blueprint->setSourceId(_indexes.getSourceId(i)); + mixer.addIndex(std::move(blueprint)); + } + _result = mixer.mix(); + } + + virtual void visit(And &) { } + virtual void visit(AndNot &) { } + virtual void visit(Or &) { } + virtual void visit(WeakAnd &) { } + virtual void visit(Equiv &) { } + virtual void visit(Rank &) { } + virtual void visit(Near &) { } + virtual void visit(ONear &) { } + + virtual void visit(WeightedSetTerm &n) { visitTerm(n); } + virtual void visit(DotProduct &n) { visitTerm(n); } + virtual void visit(WandTerm &n) { visitTerm(n); } + virtual void visit(Phrase &n) { visitTerm(n); } + virtual void visit(NumberTerm &n) { visitTerm(n); } + virtual void visit(LocationTerm &n) { visitTerm(n); } + virtual void visit(PrefixTerm &n) { visitTerm(n); } + virtual void visit(RangeTerm &n) { visitTerm(n); } + virtual void visit(StringTerm &n) { visitTerm(n); } + virtual void visit(SubstringTerm &n) { visitTerm(n); } + virtual void visit(SuffixTerm &n) { visitTerm(n); } + virtual void visit(PredicateQuery &n) { visitTerm(n); } + virtual void visit(RegExpTerm &n) { visitTerm(n); } + +public: + CreateBlueprintVisitor(const IIndexCollection &indexes, + const FieldSpecList &fields, + const IAttributeContext &attrCtx, + const IRequestContext & requestContext) + : _indexes(indexes), + _fields(fields), + _attrCtx(attrCtx), + _requestContext(requestContext), + _result() {} + + Blueprint::UP getResult() { return std::move(_result); } +}; + +} + +Blueprint::UP +IndexCollection::createBlueprint(const IRequestContext & requestContext, + const FieldSpec &field, + const Node &term, + const IAttributeContext &attrCtx) +{ + FieldSpecList fields; + fields.add(field); + return createBlueprint(requestContext, fields, term, attrCtx); +} + +Blueprint::UP +IndexCollection::createBlueprint(const IRequestContext & requestContext, + const FieldSpecList &fields, + const Node &term, + const IAttributeContext &attrCtx) +{ + CreateBlueprintVisitor visitor(*this, fields, attrCtx, requestContext); + const_cast<Node &>(term).accept(visitor); + return visitor.getResult(); +} + +} // namespace searchcorespi diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexcollection.h b/searchcorespi/src/vespa/searchcorespi/index/indexcollection.h new file mode 100644 index 00000000000..223b36fce99 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/indexcollection.h @@ -0,0 +1,73 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "isearchableindexcollection.h" +#include <vespa/searchlib/queryeval/isourceselector.h> +#include <vespa/searchlib/util/searchable_stats.h> +#include <memory> +#include <set> +#include <utility> +#include <vector> + +namespace searchcorespi { + +/** + * Holds a set of index searchables with source ids, and a source selector for + * determining which index to use for each document. + */ +class IndexCollection : public ISearchableIndexCollection +{ + struct SourceWithId { + uint32_t id; + IndexSearchable::SP source_wrapper; + + SourceWithId(uint32_t id_in, + const IndexSearchable::SP &source_in) + : id(id_in), source_wrapper(source_in) {} + SourceWithId() : id(0), source_wrapper() {} + }; + + // Selector shared across memory dumps, replaced on disk fusion operations + ISourceSelector::SP _source_selector; + std::vector<SourceWithId> _sources; + +public: + IndexCollection(const ISourceSelector::SP & selector); + IndexCollection(const ISourceSelector::SP & selector, + const ISearchableIndexCollection &sources); + + virtual void append(uint32_t id, const IndexSearchable::SP &source); + virtual void replace(uint32_t id, const IndexSearchable::SP &source); + virtual IndexSearchable::SP getSearchableSP(uint32_t i) const; + virtual void setSource(uint32_t docId); + + + // Implements IIndexCollection + virtual const ISourceSelector &getSourceSelector() const; + virtual size_t getSourceCount() const; + virtual IndexSearchable &getSearchable(uint32_t i) const; + virtual uint32_t getSourceId(uint32_t i) const; + + // Implements IndexSearchable + virtual Blueprint::UP + createBlueprint(const IRequestContext & requestContext, + const FieldSpec &field, + const Node &term, + const IAttributeContext &attrCtx); + virtual Blueprint::UP + createBlueprint(const IRequestContext & requestContext, + const FieldSpecList &fields, + const Node &term, + const IAttributeContext &attrCtx); + virtual search::SearchableStats getSearchableStats() const; + + static ISearchableIndexCollection::UP replaceAndRenumber( + const ISourceSelector::SP & selector, + const ISearchableIndexCollection &fsc, + uint32_t id_diff, + const IndexSearchable::SP &new_source); +}; + +} // namespace searchcorespi + diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexdisklayout.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexdisklayout.cpp new file mode 100644 index 00000000000..f886f7eb536 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/indexdisklayout.cpp @@ -0,0 +1,62 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP(".searchcorespi.index.indexdisklayout"); + +#include "indexdisklayout.h" +#include <sstream> + +namespace searchcorespi { +namespace index { + +const vespalib::string +IndexDiskLayout::FlushDirPrefix = vespalib::string("index.flush."); + +const vespalib::string +IndexDiskLayout::FusionDirPrefix = vespalib::string("index.fusion."); + +const vespalib::string +IndexDiskLayout::SerialNumTag = vespalib::string("Serial num"); + +IndexDiskLayout::IndexDiskLayout(const vespalib::string &baseDir) + : _baseDir(baseDir) +{ +} + +vespalib::string +IndexDiskLayout::getFlushDir(uint32_t sourceId) const +{ + std::ostringstream ost; + ost << _baseDir << "/" << FlushDirPrefix << sourceId; + return ost.str(); +} + +vespalib::string +IndexDiskLayout::getFusionDir(uint32_t sourceId) const +{ + std::ostringstream ost; + ost << _baseDir << "/" << FusionDirPrefix << sourceId; + return ost.str(); +} + +vespalib::string +IndexDiskLayout::getSerialNumFileName(const vespalib::string &dir) +{ + return dir + "/serial.dat"; +} + +vespalib::string +IndexDiskLayout::getSchemaFileName(const vespalib::string &dir) +{ + return dir + "/schema.txt"; +} + +vespalib::string +IndexDiskLayout::getSelectorFileName(const vespalib::string &dir) +{ + return dir + "/selector"; +} + +} // namespace index +} // namespace searchcorespi + diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexdisklayout.h b/searchcorespi/src/vespa/searchcorespi/index/indexdisklayout.h new file mode 100644 index 00000000000..0c010db2cc5 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/indexdisklayout.h @@ -0,0 +1,35 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/vespalib/stllike/string.h> + +namespace searchcorespi { +namespace index { + +/** + * Utility class used to get static aspects of the disk layout (i.e directory and file names) + * needed by the index maintainer. + */ +class IndexDiskLayout { +public: + static const vespalib::string FlushDirPrefix; + static const vespalib::string FusionDirPrefix; + static const vespalib::string SerialNumTag; + +private: + vespalib::string _baseDir; + +public: + IndexDiskLayout(const vespalib::string &baseDir); + vespalib::string getFlushDir(uint32_t sourceId) const; + vespalib::string getFusionDir(uint32_t sourceId) const; + + static vespalib::string getSerialNumFileName(const vespalib::string &dir); + static vespalib::string getSchemaFileName(const vespalib::string &dir); + static vespalib::string getSelectorFileName(const vespalib::string &dir); +}; + +} // namespace index +} // namespace searchcorespi + + diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp new file mode 100644 index 00000000000..7b2088aeffe --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp @@ -0,0 +1,86 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP(".searchcorespi.index.indexflushtarget"); + +#include "indexflushtarget.h" +#include <vespa/vespalib/util/closuretask.h> + +using vespalib::makeClosure; + +namespace searchcorespi { +namespace index { + +IndexFlushTarget::IndexFlushTarget(IndexMaintainer &indexMaintainer) + : IFlushTarget("memoryindex.flush", Type::FLUSH, Component::INDEX), + _indexMaintainer(indexMaintainer), + _flushStats(indexMaintainer.getFlushStats()), + _numFrozenMemoryIndexes(indexMaintainer.getNumFrozenMemoryIndexes()), + _maxFrozenMemoryIndexes(indexMaintainer.getMaxFrozenMemoryIndexes()), + _lastStats() +{ + _lastStats.setPathElementsToLog(7); +} + +IFlushTarget::MemoryGain +IndexFlushTarget::getApproxMemoryGain() const +{ + return MemoryGain(_flushStats.memory_before_bytes, _flushStats.memory_after_bytes); +} + +IFlushTarget::DiskGain +IndexFlushTarget::getApproxDiskGain() const +{ + return DiskGain(0, 0); +} + + +bool +IndexFlushTarget::needUrgentFlush(void) const +{ + bool urgent = _numFrozenMemoryIndexes > _maxFrozenMemoryIndexes; + SerialNum flushedSerial = _indexMaintainer.getFlushedSerialNum(); + LOG(debug, + "Num frozen: %" PRIu32 " Urgent: %d, flushedSerial=%" PRIu64, + _numFrozenMemoryIndexes, + static_cast<int>(urgent), + flushedSerial); + return urgent; +} + + +IFlushTarget::Time +IndexFlushTarget::getLastFlushTime() const +{ + return _indexMaintainer.getLastFlushTime(); +} + +IFlushTarget::SerialNum +IndexFlushTarget::getFlushedSerialNum() const +{ + return _indexMaintainer.getFlushedSerialNum(); +} + +IFlushTarget::Task::UP +IndexFlushTarget::initFlush(SerialNum serialNum) +{ + // the target must live until this task is done (handled by flush engine). + return _indexMaintainer.initFlush(serialNum, &_lastStats); +} + + +uint64_t +IndexFlushTarget::getApproxBytesToWriteToDisk() const +{ + MemoryGain gain(_flushStats.memory_before_bytes, + _flushStats.memory_after_bytes); + if (gain.getAfter() < gain.getBefore()) { + return gain.getBefore() - gain.getAfter(); + } else { + return 0; + } +} + + +} // namespace index +} // namespace searchcorespi diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h new file mode 100644 index 00000000000..fa4b2ad85fb --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h @@ -0,0 +1,40 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/searchcorespi/flush/iflushtarget.h> +#include "indexmaintainer.h" + +namespace searchcorespi { +namespace index { + +/** + * Flush target for flushing a memory index in an IndexMaintainer. + **/ +class IndexFlushTarget : public IFlushTarget { +private: + IndexMaintainer &_indexMaintainer; + IndexMaintainer::FlushStats _flushStats; + uint32_t _numFrozenMemoryIndexes; + uint32_t _maxFrozenMemoryIndexes; + FlushStats _lastStats; + +public: + IndexFlushTarget(IndexMaintainer &indexMaintainer); + + // Implements IFlushTarget + virtual MemoryGain getApproxMemoryGain() const; + virtual DiskGain getApproxDiskGain() const; + virtual SerialNum getFlushedSerialNum() const; + virtual Time getLastFlushTime() const; + + virtual bool + needUrgentFlush() const; + + virtual Task::UP initFlush(SerialNum currentSerial); + virtual FlushStats getLastFlushStats() const { return _lastStats; } + virtual uint64_t getApproxBytesToWriteToDisk() const override; +}; + +} // namespace index +} // namespace searchcorespi + diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp new file mode 100644 index 00000000000..412c5b8c4a2 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp @@ -0,0 +1,107 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP(".searchcorespi.index.indexfusiontarget"); + +#include "indexfusiontarget.h" +#include "fusionspec.h" + +namespace searchcorespi { +namespace index { + +using search::SerialNum; +namespace { + +class Fusioner : public FlushTask { +private: + IndexMaintainer &_indexMaintainer; + FlushStats &_stats; + SerialNum _serialNum; +public: + Fusioner(IndexMaintainer &indexMaintainer, FlushStats &stats, SerialNum serialNum) : + _indexMaintainer(indexMaintainer), _stats(stats), _serialNum(serialNum) {} + virtual void run() { + vespalib::string outputFusionDir = _indexMaintainer.doFusion(_serialNum); + // the target must live until this task is done (handled by flush engine). + _stats.setPath(outputFusionDir); + } + + virtual SerialNum + getFlushSerial(void) const + { + return 0u; // Zero means that no tls syncing is needed + } +}; + +} +IndexFusionTarget::IndexFusionTarget(IndexMaintainer &indexMaintainer) + : IFlushTarget("memoryindex.fusion", Type::GC, Component::INDEX), + _indexMaintainer(indexMaintainer), + _fusionStats(indexMaintainer.getFusionStats()), + _lastStats() +{ + _lastStats.setPathElementsToLog(7); + LOG(debug, "New target, Num flushed: %d, Disk usage: %" PRIu64, _fusionStats.numUnfused, _fusionStats.diskUsage); +} + +IFlushTarget::MemoryGain +IndexFusionTarget::getApproxMemoryGain() const +{ + return MemoryGain(0, 0); +} + +IFlushTarget::DiskGain +IndexFusionTarget::getApproxDiskGain() const +{ + uint64_t diskUsageBefore = _fusionStats.diskUsage; + uint64_t diskUsageGain = + static_cast<uint64_t>((0.1 * + (diskUsageBefore * + std::max(0, + static_cast<int> + (_fusionStats.numUnfused - 1) + )))); + diskUsageGain = std::min(diskUsageGain, diskUsageBefore); + if (!_fusionStats._canRunFusion) + diskUsageGain = 0; + return DiskGain(diskUsageBefore, diskUsageBefore - diskUsageGain); +} + +bool +IndexFusionTarget::needUrgentFlush() const +{ + bool urgent = _fusionStats.numUnfused > _fusionStats.maxFlushed && + _fusionStats._canRunFusion; + LOG(debug, "Num flushed: %d Urgent: %d", _fusionStats.numUnfused, urgent); + return urgent; +} + +IFlushTarget::Time +IndexFusionTarget::getLastFlushTime() const +{ + return fastos::ClockSystem::now(); +} + +IFlushTarget::SerialNum +IndexFusionTarget::getFlushedSerialNum() const +{ + // Lack of fusion operation doesn't prevent transaction log + // pruning. + return _indexMaintainer.getCurrentSerialNum(); +} + +IFlushTarget::Task::UP +IndexFusionTarget::initFlush(SerialNum serialNum) +{ + return Task::UP(new Fusioner(_indexMaintainer, _lastStats, serialNum)); +} + +uint64_t +IndexFusionTarget::getApproxBytesToWriteToDisk() const +{ + return _fusionStats.diskUsage; +} + + +} // namespace index +} // namespace searchcorespi diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h new file mode 100644 index 00000000000..6b67a150cec --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h @@ -0,0 +1,36 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/searchcorespi/flush/iflushtarget.h> +#include "indexmaintainer.h" + +namespace searchcorespi { +namespace index { + +/** + * Flush target for doing fusion on disk indexes in an IndexMaintainer. + **/ +class IndexFusionTarget : public IFlushTarget { +private: + IndexMaintainer &_indexMaintainer; + IndexMaintainer::FusionStats _fusionStats; + FlushStats _lastStats; + +public: + IndexFusionTarget(IndexMaintainer &indexMaintainer); + + // Implements IFlushTarget + virtual MemoryGain getApproxMemoryGain() const; + virtual DiskGain getApproxDiskGain() const; + virtual SerialNum getFlushedSerialNum() const; + virtual Time getLastFlushTime() const; + virtual bool needUrgentFlush() const; + + virtual Task::UP initFlush(SerialNum currentSerial); + virtual FlushStats getLastFlushStats() const { return _lastStats; } + virtual uint64_t getApproxBytesToWriteToDisk() const override; +}; + +} // namespace index +} // namespace searchcorespi + diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp new file mode 100644 index 00000000000..c8c7fd180a9 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp @@ -0,0 +1,1238 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP(".searchcorespi.index.indexmaintainer"); + +#include "indexmaintainer.h" +#include "diskindexcleaner.h" +#include "eventlogger.h" +#include "fusionrunner.h" +#include "indexflushtarget.h" +#include "indexfusiontarget.h" +#include "indexreadutilities.h" +#include "indexwriteutilities.h" +#include <vespa/searchlib/common/serialnumfileheadercontext.h> +#include <vespa/searchlib/attribute/fixedsourceselector.h> +#include <vespa/searchlib/common/fileheadercontext.h> +#include <vespa/searchlib/queryeval/isourceselector.h> +#include <vespa/searchlib/util/dirtraverse.h> +#include <vespa/searchlib/util/filekit.h> +#include <vespa/vespalib/util/autoclosurecaller.h> +#include <vespa/vespalib/util/closure.h> +#include <vespa/vespalib/util/closuretask.h> +#include <sstream> +#include <vector> +#include <vespa/searchcorespi/flush/closureflushtask.h> + +using document::Document; +using search::FixedSourceSelector; +using search::TuneFileAttributes; +using search::index::Schema; +using search::common::FileHeaderContext; +using search::queryeval::ISourceSelector; +using search::queryeval::Source; +using search::SerialNum; +using std::ostringstream; +using vespalib::makeClosure; +using vespalib::makeTask; +using vespalib::string; +using vespalib::Closure0; +using vespalib::Executor; +using vespalib::LockGuard; +using vespalib::Runnable; + +namespace searchcorespi { +namespace index { + +namespace +{ + +class ReconfigRunnable : public Runnable +{ +public: + bool &_result; + IIndexManager::Reconfigurer &_reconfigurer; + Closure0<bool>::UP _closure; + + ReconfigRunnable(bool &result, + IIndexManager::Reconfigurer &reconfigurer, + Closure0<bool>::UP closure) + : _result(result), + _reconfigurer(reconfigurer), + _closure(std::move(closure)) + { } + + virtual void run() { + _result = _reconfigurer.reconfigure(std::move(_closure)); + } +}; + +class ReconfigRunnableTask : public Executor::Task { +private: + IIndexManager::Reconfigurer &_reconfigurer; + Closure0<bool>::UP _closure; +public: + ReconfigRunnableTask(IIndexManager::Reconfigurer &reconfigurer, Closure0<bool>::UP closure) : + _reconfigurer(reconfigurer), + _closure(std::move(closure)) + { } + virtual void run() { + _reconfigurer.reconfigure(std::move(_closure)); + } +}; + +SerialNum noSerialNumHigh = std::numeric_limits<SerialNum>::max(); + + +class DiskIndexWithDestructorClosure : public IDiskIndex { +private: + vespalib::AutoClosureCaller _caller; + IDiskIndex::SP _index; + +public: + DiskIndexWithDestructorClosure(const IDiskIndex::SP &index, + vespalib::Closure::UP closure) + : _caller(std::move(closure)), + _index(index) + { } + const IDiskIndex &getWrapped() const { return *_index; } + + /** + * Implements searchcorespi::IndexSearchable + */ + virtual Blueprint::UP + createBlueprint(const IRequestContext & requestContext, + const FieldSpec &field, + const Node &term, + const IAttributeContext &attrCtx) + { + return _index->createBlueprint(requestContext, field, term, attrCtx); + } + virtual Blueprint::UP + createBlueprint(const IRequestContext & requestContext, + const FieldSpecList &fields, + const Node &term, + const IAttributeContext &attrCtx) + { + return _index->createBlueprint(requestContext, fields, term, attrCtx); + } + virtual search::SearchableStats getSearchableStats() const { return _index->getSearchableStats(); } + + /** + * Implements IDiskIndex + */ + virtual const vespalib::string &getIndexDir() const { return _index->getIndexDir(); } + virtual const search::index::Schema &getSchema() const { return _index->getSchema(); } + +}; + +} // namespace + +uint32_t +IndexMaintainer::getNewAbsoluteId() +{ + return _next_id++; +} + +string +IndexMaintainer::getFlushDir(uint32_t sourceId) const +{ + return _layout.getFlushDir(sourceId); +} + +string +IndexMaintainer::getFusionDir(uint32_t sourceId) const +{ + return _layout.getFusionDir(sourceId); +} + +bool +IndexMaintainer::reopenDiskIndexes(ISearchableIndexCollection &coll) +{ + bool hasReopenedAnything(false); + assert(_ctx.getThreadingService().master().isCurrentThread()); + uint32_t count = coll.getSourceCount(); + for (uint32_t i = 0; i < count; ++i) { + IndexSearchable &is = coll.getSearchable(i); + const DiskIndexWithDestructorClosure *const d = + dynamic_cast<const DiskIndexWithDestructorClosure *>(&is); + if (d == NULL) { + continue; // not a disk index + } + const string indexDir = d->getIndexDir(); + vespalib::string schemaName = IndexDiskLayout::getSchemaFileName(indexDir); + Schema oldSchema; + if (!oldSchema.loadFromFile(schemaName)) { + LOG(error, "Could not open schema '%s'", schemaName.c_str()); + } + if (oldSchema != d->getSchema()) { + IDiskIndex::SP newIndex(reloadDiskIndex(*d)); + coll.replace(coll.getSourceId(i), newIndex); + hasReopenedAnything = true; + } + } + return hasReopenedAnything; +} + +void +IndexMaintainer::updateDiskIndexSchema(const vespalib::string &indexDir, + const Schema &schema, + SerialNum wipeSerial) +{ + // Called by a flush worker thread OR document db executor thread + LockGuard lock(_schemaUpdateLock); + IndexWriteUtilities::updateDiskIndexSchema(indexDir, schema, wipeSerial); +} + +void +IndexMaintainer::updateIndexSchemas(IIndexCollection &coll, + const Schema &schema, + SerialNum wipeSerial) +{ + assert(_ctx.getThreadingService().master().isCurrentThread()); + uint32_t count = coll.getSourceCount(); + for (uint32_t i = 0; i < count; ++i) { + IndexSearchable &is = coll.getSearchable(i); + const DiskIndexWithDestructorClosure *const d = + dynamic_cast<const DiskIndexWithDestructorClosure *>(&is); + if (d == NULL) { + IMemoryIndex *const m = dynamic_cast<IMemoryIndex *>(&is); + if (m != NULL) { + m->wipeHistory(schema); + } + continue; + } + updateDiskIndexSchema(d->getIndexDir(), schema, wipeSerial); + } +} + +void +IndexMaintainer::updateActiveFusionWipeTimeSchema(const Schema &schema) +{ + assert(_ctx.getThreadingService().master().isCurrentThread()); + for (;;) { + Schema::SP activeFusionSchema; + Schema::SP activeFusionWipeTimeSchema; + Schema::SP newActiveFusionWipeTimeSchema; + { + LockGuard lock(_state_lock); + activeFusionSchema = _activeFusionSchema; + activeFusionWipeTimeSchema = _activeFusionWipeTimeSchema; + } + if (activeFusionSchema.get() == NULL) + return; // No active fusion + if (activeFusionWipeTimeSchema.get() == NULL) { + Schema::UP newSchema = Schema::intersect(*activeFusionSchema, schema); + newActiveFusionWipeTimeSchema.reset(newSchema.release()); + } else { + Schema::UP newSchema = Schema::intersect(*activeFusionWipeTimeSchema, schema); + newActiveFusionWipeTimeSchema.reset(newSchema.release()); + } + { + LockGuard slock(_state_lock); + LockGuard ilock(_index_update_lock); + if (activeFusionSchema.get() == _activeFusionSchema.get() && + activeFusionWipeTimeSchema.get() == _activeFusionWipeTimeSchema.get()) + { + _activeFusionWipeTimeSchema = newActiveFusionWipeTimeSchema; + break; + } + } + } +} + +void +IndexMaintainer::deactivateDiskIndexes(vespalib::string indexDir) +{ + _active_indexes->notActive(indexDir); + removeOldDiskIndexes(); +} + +IDiskIndex::SP +IndexMaintainer::loadDiskIndex(const string &indexDir) +{ + // Called by a flush worker thread OR CTOR (in document db init executor thread) + if (LOG_WOULD_LOG(event)) { + EventLogger::diskIndexLoadStart(indexDir); + } + FastOS_Time timer; + timer.SetNow(); + _active_indexes->setActive(indexDir); + IDiskIndex::SP retval(new DiskIndexWithDestructorClosure + (_operations.loadDiskIndex(indexDir), + makeClosure(this, &IndexMaintainer::deactivateDiskIndexes, indexDir))); + if (LOG_WOULD_LOG(event)) { + EventLogger::diskIndexLoadComplete(indexDir, (int64_t)timer.MilliSecsToNow()); + } + return retval; +} + +IDiskIndex::SP +IndexMaintainer::reloadDiskIndex(const IDiskIndex &oldIndex) +{ + // Called by a flush worker thread OR document db executor thread + const string indexDir = oldIndex.getIndexDir(); + if (LOG_WOULD_LOG(event)) { + EventLogger::diskIndexLoadStart(indexDir); + } + FastOS_Time timer; + timer.SetNow(); + _active_indexes->setActive(indexDir); + const IDiskIndex &wrappedDiskIndex = + (dynamic_cast<const DiskIndexWithDestructorClosure &>(oldIndex)).getWrapped(); + IDiskIndex::SP retval(new DiskIndexWithDestructorClosure + (_operations.reloadDiskIndex(wrappedDiskIndex), + makeClosure(this, &IndexMaintainer::deactivateDiskIndexes, indexDir))); + if (LOG_WOULD_LOG(event)) { + EventLogger::diskIndexLoadComplete(indexDir, (int64_t)timer.MilliSecsToNow()); + } + return retval; +} + +IDiskIndex::SP +IndexMaintainer::flushMemoryIndex(IMemoryIndex &memoryIndex, + uint32_t indexId, + uint32_t docIdLimit, + SerialNum serialNum) +{ + // Called by a flush worker thread + const string flushDir = getFlushDir(indexId); + memoryIndex.flushToDisk(flushDir, docIdLimit, serialNum); + Schema::SP wtSchema(memoryIndex.getWipeTimeSchema()); + if (wtSchema.get() != NULL) { + updateDiskIndexSchema(flushDir, *wtSchema, noSerialNumHigh); + } + return loadDiskIndex(flushDir); +} + +ISearchableIndexCollection::UP +IndexMaintainer::loadDiskIndexes(const FusionSpec &spec, ISearchableIndexCollection::UP sourceList) +{ + // Called by CTOR (in document db init executor thread) + uint32_t fusion_id = spec.last_fusion_id; + if (fusion_id != 0) { + sourceList->append(0, loadDiskIndex(getFusionDir(fusion_id))); + } + for (size_t i = 0; i < spec.flush_ids.size(); ++i) { + const uint32_t id = spec.flush_ids[i]; + const uint32_t relative_id = id - fusion_id; + sourceList->append(relative_id, loadDiskIndex(getFlushDir(id))); + } + return sourceList; +} + +namespace { + +ISearchableIndexCollection::SP +getLeaf(const LockGuard &newSearchLock, const ISearchableIndexCollection::SP & is, bool warn=false) +{ + if (dynamic_cast<const WarmupIndexCollection *>(is.get()) != NULL) { + if (warn) { + LOG(info, "Already warming up an index '%s'. Start using it immediately." + " This is an indication that you have configured your warmup interval too long.", + is->toString().c_str()); + } + const WarmupIndexCollection & wic(dynamic_cast<const WarmupIndexCollection &>(*is)); + return getLeaf(newSearchLock, wic.getNextIndexCollection(), warn); + } else { + return is; + } +} + +} + +/* + * Caller must hold _state_lock (SL). + */ +void +IndexMaintainer::replaceSource(uint32_t sourceId, const IndexSearchable::SP &source) +{ + assert(_ctx.getThreadingService().master().isCurrentThread()); + LockGuard lock(_new_search_lock); + ISearchableIndexCollection::UP indexes = createNewSourceCollection(lock); + indexes->replace(sourceId, source); + swapInNewIndex(lock, std::move(indexes), *source); +} + +/* + * Caller must hold _state_lock (SL) and _new_search_lock (NSL), the latter + * passed as guard. + */ +void +IndexMaintainer::swapInNewIndex(LockGuard & guard, + ISearchableIndexCollection::SP indexes, + IndexSearchable & source) +{ + assert(indexes->valid()); + (void) guard; + if (_diskIndexWarmupTime > 0) { + if (dynamic_cast<const IDiskIndex *>(&source) != NULL) { + LOG(debug, "Warming up a disk index."); + indexes = std::make_shared<WarmupIndexCollection> + (_diskIndexWarmupTime, + getLeaf(guard, _source_list, true), + indexes, + static_cast<IDiskIndex &>(source), + _ctx.getWarmupExecutor(), + *this); + } else { + LOG(debug, "No warmup needed as it is a memory index that is mapped in."); + } + } + LOG(debug, "Replacing indexcollection :\n%s\nwith\n%s", _source_list->toString().c_str(), indexes->toString().c_str()); + assert(indexes->valid()); + _source_list = std::move(indexes); +} + +/* + * Caller must hold _state_lock (SL). + */ +void +IndexMaintainer::appendSource(uint32_t sourceId, const IndexSearchable::SP &source) +{ + assert(_ctx.getThreadingService().master().isCurrentThread()); + LockGuard lock(_new_search_lock); + ISearchableIndexCollection::UP indexes = createNewSourceCollection(lock); + indexes->append(sourceId, source); + swapInNewIndex(lock, std::move(indexes), *source); +} + +ISearchableIndexCollection::UP +IndexMaintainer::createNewSourceCollection(const LockGuard &newSearchLock) +{ + ISearchableIndexCollection::SP currentLeaf(getLeaf(newSearchLock, _source_list)); + return ISearchableIndexCollection::UP(new IndexCollection(_selector, *currentLeaf)); +} + +bool +IndexMaintainer::doneInitFlush(FlushArgs *args, IMemoryIndex::SP *new_index) +{ + // Called by initFlush via reconfigurer + assert(_ctx.getThreadingService().master().isCurrentThread()); + LockGuard state_lock(_state_lock); + args->old_index = _current_index; + args->old_absolute_id = _current_index_id + _last_fusion_id; + args->old_source_list = _source_list; + string selector_name = IndexDiskLayout::getSelectorFileName(getFlushDir(args->old_absolute_id)); + args->flush_serial_num = _current_serial_num; + { + LockGuard lock(_index_update_lock); + // Handover of extra memory indexes to flush + args->_extraIndexes = _frozenMemoryIndexes; + _frozenMemoryIndexes.clear(); + } + + LOG(debug, "Flushing. Id = %u. Serial num = %llu", + args->old_absolute_id, (unsigned long long) args->flush_serial_num); + { + LockGuard lock(_index_update_lock); + if (!_current_index->hasReceivedDocumentInsert() && + _source_selector_changes == 0) { + args->_skippedEmptyLast = true; // Skip flush of empty memory index + } + + if (!args->_skippedEmptyLast) { + // Keep on using same source selector with extended valid range + args->save_info = getSourceSelector().extractSaveInfo(selector_name); + // XXX: Overflow issue in source selector + _current_index_id = getNewAbsoluteId() - _last_fusion_id; + assert(_current_index_id < ISourceSelector::SOURCE_LIMIT); + _source_selector_changes = 0; + } + _current_index = *new_index; + } + if (args->_skippedEmptyLast) { + replaceSource(_current_index_id, _current_index); + } else { + appendSource(_current_index_id, _current_index); + } + _source_list->setCurrentIndex(_current_index_id); + return true; +} + +void +IndexMaintainer::doFlush(FlushArgs args) +{ + // Called by a flush worker thread + FlushIds flushIds; // Absolute ids of flushed indexes + + flushFrozenMemoryIndexes(args, flushIds); + + if (!args._skippedEmptyLast) { + flushLastMemoryIndex(args, flushIds); + } + + assert(!flushIds.empty()); + if (args.stats != NULL) { + updateFlushStats(args); + } + + scheduleFusion(flushIds); +} + +void +IndexMaintainer::flushFrozenMemoryIndexes(FlushArgs &args, FlushIds &flushIds) +{ + // Called by a flush worker thread + for (FrozenMemoryIndexRef & frozen : args._extraIndexes) { + assert(frozen._absoluteId < args.old_absolute_id); + assert(flushIds.empty() || flushIds.back() < frozen._absoluteId); + + FlushArgs eArgs; + eArgs.old_index = frozen._index; + eArgs.flush_serial_num = frozen._serialNum; + eArgs.old_absolute_id = frozen._absoluteId; + const uint32_t docIdLimit = frozen._saveInfo->getHeader()._docIdLimit; + + flushMemoryIndex(eArgs, docIdLimit, *frozen._saveInfo, flushIds); + + // Drop references to old memory index and old save info. + frozen._index.reset(); + frozen._saveInfo.reset(); + } +} + +void +IndexMaintainer::flushLastMemoryIndex(FlushArgs &args, FlushIds &flushIds) +{ + // Called by a flush worker thread + const uint32_t docIdLimit = args.save_info->getHeader()._docIdLimit; + flushMemoryIndex(args, docIdLimit, *args.save_info, flushIds); +} + +void +IndexMaintainer::updateFlushStats(const FlushArgs &args) +{ + // Called by a flush worker thread + vespalib::string flushDir; + if (!args._skippedEmptyLast) { + flushDir = getFlushDir(args.old_absolute_id); + } else { + assert(!args._extraIndexes.empty()); + flushDir = getFlushDir(args._extraIndexes.back()._absoluteId); + } + args.stats->setPath(flushDir); +} + +void +IndexMaintainer::flushMemoryIndex(FlushArgs &args, + uint32_t docIdLimit, + FixedSourceSelector::SaveInfo &saveInfo, + FlushIds &flushIds) +{ + // Called by a flush worker thread + ChangeGens changeGens = getChangeGens(); + IMemoryIndex &memoryIndex = *args.old_index; + Schema::SP wtSchema = memoryIndex.getWipeTimeSchema(); + IDiskIndex::SP diskIndex = flushMemoryIndex(memoryIndex, args.old_absolute_id, + docIdLimit, args.flush_serial_num); + IndexWriteUtilities::writeSourceSelector(saveInfo, args.old_absolute_id, + getAttrTune(), _ctx.getFileHeaderContext(), + args.flush_serial_num); + IndexWriteUtilities::writeSerialNum(args.flush_serial_num, + getFlushDir(args.old_absolute_id), + _ctx.getFileHeaderContext()); + + // Post processing after memory index has been written to disk and + // opened as disk index. + args._changeGens = changeGens; + args._wtSchema = wtSchema; + reconfigureAfterFlush(args, diskIndex); + + flushIds.push_back(args.old_absolute_id); +} + + +void +IndexMaintainer::reconfigureAfterFlush(FlushArgs &args, IDiskIndex::SP &diskIndex) +{ + // Called by a flush worker thread + for (;;) { + // Call reconfig closure for this change + Closure0<bool>::UP closure(makeClosure(this, &IndexMaintainer::doneFlush, + &args, &diskIndex)); + if (reconfigure(std::move(closure))) { + return; + } + ChangeGens changeGens = getChangeGens(); + Schema::SP wtSchema = args.old_index->getWipeTimeSchema(); + const string indexDir = getFlushDir(args.old_absolute_id); + if (wtSchema.get() != NULL) { + updateDiskIndexSchema(indexDir, *wtSchema, noSerialNumHigh); + } + IDiskIndex::SP reloadedDiskIndex = reloadDiskIndex(*diskIndex); + diskIndex = reloadedDiskIndex; + args._changeGens = changeGens; + args._wtSchema = wtSchema; + } +} + + +bool +IndexMaintainer::doneFlush(FlushArgs *args, IDiskIndex::SP *disk_index) { + // Called by doFlush via reconfigurer + assert(_ctx.getThreadingService().master().isCurrentThread()); + LockGuard state_lock(_state_lock); + IMemoryIndex &memoryIndex = *args->old_index; + if (args->_changeGens != getChangeGens()) { + return false; // Must retry operation + } + if (args->_wtSchema.get() != memoryIndex.getWipeTimeSchema().get()) { + return false; // Must retry operation + } + _flush_serial_num = std::max(_flush_serial_num, args->flush_serial_num); + fastos::TimeStamp timeStamp = search::FileKit::getModificationTime((*disk_index)->getIndexDir()); + _lastFlushTime = timeStamp.time() > _lastFlushTime.time() ? timeStamp : _lastFlushTime; + const uint32_t old_id = args->old_absolute_id - _last_fusion_id; + replaceSource(old_id, *disk_index); + return true; +} + +void +IndexMaintainer::scheduleFusion(const FlushIds &flushIds) +{ + // Called by a flush worker thread + LOG(debug, "Scheduled fusion for id %u.", flushIds.back()); + LockGuard guard(_fusion_lock); + for (uint32_t id : flushIds) { + _fusion_spec.flush_ids.push_back(id); + } +} + +bool +IndexMaintainer::canRunFusion(const FusionSpec &spec) const +{ + return spec.flush_ids.size() > 1 || + (spec.flush_ids.size() > 0 && spec.last_fusion_id != 0); +} + +bool +IndexMaintainer::doneFusion(FusionArgs *args, IDiskIndex::SP *new_index) +{ + // Called by runFusion via reconfigurer + assert(_ctx.getThreadingService().master().isCurrentThread()); + LockGuard state_lock(_state_lock); + if (args->_changeGens != getChangeGens()) { + return false; // Must retry operation + } + if (args->_wtSchema.get() != getActiveFusionWipeTimeSchema().get()) { + return false; // Must retry operation + } + args->_old_source_list = _source_list; // delays destruction + uint32_t id_diff = args->_new_fusion_id - _last_fusion_id; + ostringstream ost; + ost << "sourceselector_fusion(" << args->_new_fusion_id << ")"; + { + LockGuard lock(_index_update_lock); + + // make new source selector with shifted values. + _selector.reset(getSourceSelector().cloneAndSubtract(ost.str(), id_diff).release()); + _source_selector_changes = 0; + _current_index_id -= id_diff; + _last_fusion_id = args->_new_fusion_id; + _selector->setBaseId(_last_fusion_id); + _activeFusionSchema.reset(); + _activeFusionWipeTimeSchema.reset(); + } + + ISearchableIndexCollection::SP currentLeaf; + { + LockGuard lock(_new_search_lock); + currentLeaf = getLeaf(lock, _source_list); + } + ISearchableIndexCollection::UP fsc = + IndexCollection::replaceAndRenumber(_selector, *currentLeaf, id_diff, *new_index); + fsc->setCurrentIndex(_current_index_id); + + { + LockGuard lock(_new_search_lock); + swapInNewIndex(lock, std::move(fsc), **new_index); + } + return true; +} + +bool +IndexMaintainer::makeSureAllRemainingWarmupIsDone(ISearchableIndexCollection::SP keepAlive) +{ + // called by warmupDone via reconfigurer, warmupDone() doesn't wait for us + assert(_ctx.getThreadingService().master().isCurrentThread()); + ISearchableIndexCollection::SP warmIndex; + { + LockGuard state_lock(_state_lock); + if (keepAlive.get() == _source_list.get()) { + LockGuard lock(_new_search_lock); + warmIndex = (getLeaf(lock, _source_list, false)); + _source_list = warmIndex; + } + } + if (warmIndex) { + LOG(info, "New index warmed up and switched in : %s", warmIndex->toString().c_str()); + } + LOG(info, "Sync warmupExecutor."); + _ctx.getWarmupExecutor().sync(); + LOG(info, "Now the keep alive of the warmupindexcollection should be gone."); + return true; +} + +void +IndexMaintainer::warmupDone(ISearchableIndexCollection::SP current) +{ + // Called by a search thread + LockGuard lock(_new_search_lock); + if (current.get() == _source_list.get()) { + auto makeSure = makeClosure(this, &IndexMaintainer::makeSureAllRemainingWarmupIsDone, current); + Executor::Task::UP task(new ReconfigRunnableTask(_ctx.getReconfigurer(), std::move(makeSure))); + _ctx.getThreadingService().master().execute(std::move(task)); + } else { + LOG(warning, "There has arrived a new IndexCollection while replacing the active index. " + "It can theoretically happen, but not very likely, so logging this as a warning."); + } +} + + +void +IndexMaintainer::doneSetSchema(SetSchemaArgs &args, IMemoryIndex::SP &newIndex) +{ + assert(_ctx.getThreadingService().master().isCurrentThread()); // with idle index executor + LockGuard state_lock(_state_lock); + typedef FixedSourceSelector::SaveInfo SaveInfo; + args._oldSchema = _schema; // Delay destruction + args._oldFusionSchema = _fusionSchema; + args._oldIndex = _current_index; // Delay destruction + args._oldSourceList = _source_list; // Delay destruction + uint32_t oldAbsoluteId = _current_index_id + _last_fusion_id; + string selectorName = IndexDiskLayout::getSelectorFileName(getFlushDir(oldAbsoluteId)); + SerialNum freezeSerialNum = _current_serial_num; + bool dropEmptyLast = false; + SaveInfo::UP saveInfo; + + LOG(info, "Making new schema. Id = %u. Serial num = %llu", oldAbsoluteId, (unsigned long long) freezeSerialNum); + { + LockGuard lock(_index_update_lock); + _schema = args._newSchema; + _fusionSchema = args._newFusionSchema; + if (!_current_index->hasReceivedDocumentInsert()) { + dropEmptyLast = true; // Skip flush of empty memory index + } + + if (!dropEmptyLast) { + // Keep on using same source selector with extended valid range + saveInfo = getSourceSelector().extractSaveInfo(selectorName); + // XXX: Overflow issue in source selector + _current_index_id = getNewAbsoluteId() - _last_fusion_id; + assert(_current_index_id < ISourceSelector::SOURCE_LIMIT); + // Extra index to flush next time flushing is performed + _frozenMemoryIndexes.emplace_back(args._oldIndex, freezeSerialNum, std::move(saveInfo), oldAbsoluteId); + } + _current_index = newIndex; + } + if (dropEmptyLast) { + replaceSource(_current_index_id, _current_index); + } else { + appendSource(_current_index_id, _current_index); + } + _source_list->setCurrentIndex(_current_index_id); +} + + +bool +IndexMaintainer::doneWipeHistory(WipeHistoryArgs &args) +{ + assert(_ctx.getThreadingService().master().isCurrentThread()); // with idle index executor + LockGuard state_lock(_state_lock); + LockGuard lock(_new_search_lock); + if (args._old_source_list.get() != _source_list.get()) { + return false; // Flush or fusion had started/completed, must retry + } + _source_list = args._new_source_list; + return true; +} + + +Schema +IndexMaintainer::getSchema(void) const +{ + LockGuard lock(_index_update_lock); + return _schema; +} + +Schema::SP +IndexMaintainer::getActiveFusionWipeTimeSchema(void) const +{ + LockGuard lock(_index_update_lock); + return _activeFusionWipeTimeSchema; +} + +TuneFileAttributes +IndexMaintainer::getAttrTune(void) +{ + return _tuneFileAttributes; +} + +IndexMaintainer::ChangeGens +IndexMaintainer::getChangeGens(void) +{ + LockGuard lock(_index_update_lock); + return _changeGens; +} + +bool +IndexMaintainer::reconfigure(Closure0<bool>::UP closure) +{ + // Called by a flush engine worker thread + bool result = false; + ReconfigRunnable runnable(result, _ctx.getReconfigurer(), std::move(closure)); + _ctx.getThreadingService().master().run(runnable); + return result; +} + +IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config, + const IndexMaintainerContext &ctx, + IIndexMaintainerOperations &operations) + : _base_dir(config.getBaseDir()), + _diskIndexWarmupTime(config.getDiskIndexWarmupTime()), + _active_indexes(new ActiveDiskIndexes()), + _layout(config.getBaseDir()), + _schema(config.getSchema()), + _fusionSchema(config.getFusionSchema()), + _activeFusionSchema(), + _activeFusionWipeTimeSchema(), + _source_selector_changes(0), + _selector(), + _source_list(), + _last_fusion_id(), + _next_id(), + _current_index_id(), + _current_index(operations.createMemoryIndex(_schema)), + _current_serial_num(0), + _flush_serial_num(0), + _lastFlushTime(), + _frozenMemoryIndexes(), + _state_lock(), + _index_update_lock(), + _new_search_lock(), + _remove_lock(), + _fusion_spec(), + _fusion_lock(), + _maxFlushed(config.getMaxFlushed()), + _maxFrozen(10), + _changeGens(), + _schemaUpdateLock(), + _tuneFileAttributes(config.getTuneFileAttributes()), + _ctx(ctx), + _operations(operations) +{ + // Called by document db init executor thread + _changeGens.bumpWipeGen(); + DiskIndexCleaner::clean(_base_dir, *_active_indexes); + FusionSpec spec = IndexReadUtilities::readFusionSpec(_base_dir); + _next_id = 1 + (spec.flush_ids.empty() ? spec.last_fusion_id : spec.flush_ids.back()); + _last_fusion_id = spec.last_fusion_id; + + if (_next_id > 1) { + string latest_index_dir = spec.flush_ids.empty() + ? getFusionDir(_next_id - 1) + : getFlushDir(_next_id - 1); + + _flush_serial_num = IndexReadUtilities::readSerialNum(latest_index_dir); + _lastFlushTime = search::FileKit::getModificationTime(latest_index_dir); + _current_serial_num = _flush_serial_num; + const string selector = IndexDiskLayout::getSelectorFileName(latest_index_dir); + _selector.reset(FixedSourceSelector::load(selector).release()); + } else { + _flush_serial_num = 0; + _selector.reset(new FixedSourceSelector(0, "sourceselector", 1)); + } + uint32_t baseId(_selector->getBaseId()); + if (_last_fusion_id != baseId) { + assert(_last_fusion_id > baseId); + uint32_t id_diff = _last_fusion_id - baseId; + ostringstream ost; + ost << "sourceselector_fusion(" << _last_fusion_id << ")"; + _selector.reset(getSourceSelector().cloneAndSubtract(ost.str(), id_diff).release()); + assert(_last_fusion_id == _selector->getBaseId()); + } + _current_index_id = getNewAbsoluteId() - _last_fusion_id; + assert(_current_index_id < ISourceSelector::SOURCE_LIMIT); + ISearchableIndexCollection::UP sourceList(loadDiskIndexes(spec, ISearchableIndexCollection::UP(new IndexCollection(_selector)))); + LOG(debug, "Index manager created with flushed serial num %" PRIu64, _flush_serial_num); + sourceList->append(_current_index_id, _current_index); + sourceList->setCurrentIndex(_current_index_id); + _source_list.reset(sourceList.release()); + _fusion_spec = spec; +} + +IndexMaintainer::~IndexMaintainer() +{ + _source_list.reset(); + _frozenMemoryIndexes.clear(); + _selector.reset(); +} + +FlushTask::UP +IndexMaintainer::initFlush(SerialNum serialNum, searchcorespi::FlushStats * stats) +{ + assert(_ctx.getThreadingService().master().isCurrentThread()); // while flush engine scheduler thread waits + { + LockGuard lock(_index_update_lock); + _current_serial_num = std::max(_current_serial_num, serialNum); + } + + IMemoryIndex::SP new_index(_operations.createMemoryIndex(getSchema())); + FlushArgs args; + args.stats = stats; + scheduleCommit(); + // Ensure that all index thread tasks accessing memory index have completed. + _ctx.getThreadingService().sync(); + // Call reconfig closure for this change + Closure0<bool>::UP closure( makeClosure(this, &IndexMaintainer::doneInitFlush, &args, &new_index)); + bool success = _ctx.getReconfigurer().reconfigure(std::move(closure)); + assert(success); + (void) success; + if (args._skippedEmptyLast && args._extraIndexes.empty()) { + // No memory index to flush, it was empty + LockGuard lock(_state_lock); + _flush_serial_num = _current_serial_num; + _lastFlushTime = fastos::ClockSystem::now(); + LOG(debug, "No memory index to flush. Update serial number and flush time to current: " + "flushSerialNum(%" PRIu64 "), lastFlushTime(%f)", + _flush_serial_num, _lastFlushTime.sec()); + return FlushTask::UP(); + } + SerialNum realSerialNum = args.flush_serial_num; + return makeFlushTask(makeClosure(this, &IndexMaintainer::doFlush, std::move(args)), realSerialNum); +} + +FusionSpec +IndexMaintainer::getFusionSpec() +{ + // Only called by unit test + LockGuard guard(_fusion_lock); + return _fusion_spec; +} + +string +IndexMaintainer::doFusion(SerialNum serialNum) +{ + // Called by a flush engine worker thread + + // Make sure to update serial num in case it is something that does not receive any data. + // XXX: Wrong, and will cause data loss. + // XXX: Missing locking. + // XXX: Claims to have flushed memory index when starting fusion. + { + LockGuard lock(_index_update_lock); + _current_serial_num = std::max(_current_serial_num, serialNum); + } + + FusionSpec spec; + { + LockGuard guard(_fusion_lock); + if (!canRunFusion(_fusion_spec)) + return ""; + spec = _fusion_spec; + _fusion_spec.flush_ids.clear(); + } + + uint32_t new_fusion_id = runFusion(spec); + + LockGuard lock(_fusion_lock); + if (new_fusion_id == spec.last_fusion_id) { // Error running fusion. + LOG(warning, "Fusion failed for id %u.", spec.flush_ids.back()); + // Restore fusion spec. + copy(_fusion_spec.flush_ids.begin(), _fusion_spec.flush_ids.end(), back_inserter(spec.flush_ids)); + _fusion_spec.flush_ids.swap(spec.flush_ids); + } else { + _fusion_spec.last_fusion_id = new_fusion_id; + } + return getFusionDir(new_fusion_id); +} + + +uint32_t +IndexMaintainer::runFusion(const FusionSpec &fusion_spec) +{ + // Called by a flush engine worker thread + FusionArgs args; + TuneFileAttributes tuneFileAttributes(getAttrTune()); + { + LockGuard slock(_state_lock); + LockGuard ilock(_index_update_lock); + _activeFusionSchema.reset(new Schema(_fusionSchema)); + _activeFusionWipeTimeSchema.reset(); + args._schema = _fusionSchema; + } + FastOS_StatInfo statInfo; + string lastFlushDir(getFlushDir(fusion_spec.flush_ids.back())); + string lastSerialFile = IndexDiskLayout::getSerialNumFileName(lastFlushDir); + SerialNum serialNum = 0; + if (FastOS_File::Stat(lastSerialFile.c_str(), &statInfo)) { + serialNum = IndexReadUtilities::readSerialNum(lastFlushDir); + } + FusionRunner fusion_runner(_base_dir, args._schema, tuneFileAttributes, _ctx.getFileHeaderContext()); + uint32_t new_fusion_id = fusion_runner.fuse(fusion_spec, serialNum, _operations); + bool ok = (new_fusion_id != 0); + if (ok) { + ok = IndexWriteUtilities::copySerialNumFile(getFlushDir(fusion_spec.flush_ids.back()), + getFusionDir(new_fusion_id)); + } + if (!ok) { + LOG(error, "Fusion failed."); + string fail_dir = getFusionDir(fusion_spec.flush_ids.back()); + FastOS_FileInterface::EmptyAndRemoveDirectory(fail_dir.c_str()); + { + LockGuard slock(_state_lock); + LockGuard ilock(_index_update_lock); + _activeFusionSchema.reset(); + _activeFusionWipeTimeSchema.reset(); + } + return fusion_spec.last_fusion_id; + } + + const string new_fusion_dir = getFusionDir(new_fusion_id); + Schema::SP wtSchema = getActiveFusionWipeTimeSchema(); + if (wtSchema.get() != NULL) { + updateDiskIndexSchema(new_fusion_dir, *wtSchema, noSerialNumHigh); + } + ChangeGens changeGens = getChangeGens(); + IDiskIndex::SP new_index(loadDiskIndex(new_fusion_dir)); + + // Post processing after fusion operation has completed and new disk + // index has been opened. + + args._new_fusion_id = new_fusion_id; + args._changeGens = changeGens; + args._wtSchema = wtSchema; + for (;;) { + // Call reconfig closure for this change + Closure0<bool>::UP closure( makeClosure(this, &IndexMaintainer::doneFusion, &args, &new_index)); + bool success = reconfigure(std::move(closure)); + if (success) { + break; + } + changeGens = getChangeGens(); + wtSchema = getActiveFusionWipeTimeSchema(); + if (wtSchema.get() != NULL) { + updateDiskIndexSchema(new_fusion_dir, *wtSchema, noSerialNumHigh); + } + IDiskIndex::SP diskIndex2; + diskIndex2 = reloadDiskIndex(*new_index); + new_index = diskIndex2; + args._changeGens = changeGens; + args._wtSchema = wtSchema; + } + removeOldDiskIndexes(); + + return new_fusion_id; +} + +void +IndexMaintainer::removeOldDiskIndexes() +{ + LockGuard slock(_remove_lock); + DiskIndexCleaner::removeOldIndexes(_base_dir, *_active_indexes); +} + +IndexMaintainer::FlushStats +IndexMaintainer::getFlushStats() const +{ + // Called by flush engine scheduler thread (from getFlushTargets()) + FlushStats stats; + uint64_t source_selector_bytes; + uint32_t numFrozen = 0; + { + LockGuard lock(_index_update_lock); + source_selector_bytes = _selector->getDocIdLimit() * sizeof(Source); + stats.memory_before_bytes += _current_index->getMemoryUsage().allocatedBytes() + source_selector_bytes; + stats.memory_after_bytes += _current_index->getStaticMemoryFootprint() + source_selector_bytes; + numFrozen = _frozenMemoryIndexes.size(); + for (const FrozenMemoryIndexRef & frozen : _frozenMemoryIndexes) { + stats.memory_before_bytes += frozen._index->getMemoryUsage().allocatedBytes() + source_selector_bytes; + } + } + + if (!_source_selector_changes && stats.memory_after_bytes >= stats.memory_before_bytes) { + // Nothing is written if the index is empty. + stats.disk_write_bytes = 0; + stats.cpu_time_required = 0; + } else { + stats.disk_write_bytes = stats.memory_before_bytes + source_selector_bytes - stats.memory_after_bytes; + stats.cpu_time_required = source_selector_bytes * 3 * (1 + numFrozen) + stats.disk_write_bytes; + } + return stats; +} + +IndexMaintainer::FusionStats +IndexMaintainer::getFusionStats() const +{ + // Called by flush engine scheduler thread (from getFlushTargets()) + FusionStats stats; + IndexSearchable::SP source_list; + + { + LockGuard lock(_new_search_lock); + source_list = _source_list; + } + stats.diskUsage = source_list->getSearchableStats().sizeOnDisk(); + stats.maxFlushed = _maxFlushed; + { + LockGuard guard(_fusion_lock); + stats.numUnfused = _fusion_spec.flush_ids.size() + ((_fusion_spec.last_fusion_id != 0) ? 1 : 0); + stats._canRunFusion = canRunFusion(_fusion_spec); + } + LOG(debug, "Get fusion stats. Disk usage: %" PRIu64 ", maxflushed: %d", stats.diskUsage, stats.maxFlushed); + return stats; +} + +uint32_t +IndexMaintainer::getNumFrozenMemoryIndexes(void) const +{ + // Called by flush engine scheduler thread (from getFlushTargets()) + LockGuard state_lock(_index_update_lock); + return _frozenMemoryIndexes.size(); +} + +void +IndexMaintainer::putDocument(uint32_t lid, const Document &doc, SerialNum serialNum) +{ + assert(_ctx.getThreadingService().index().isCurrentThread()); + LockGuard lock(_index_update_lock); + try { + _current_index->insertDocument(lid, doc); + } catch (const vespalib::IllegalStateException & e) { + vespalib::string s = "Failed inserting document :\n" + doc.toXml(" ") + "\n"; + LOG(error, "%s", s.c_str()); + throw vespalib::IllegalStateException(s, e, VESPA_STRLOC); + } + _selector->setSource(lid, _current_index_id); + _source_list->setSource(lid); + ++_source_selector_changes; + _current_serial_num = serialNum; +} + +void +IndexMaintainer::removeDocument(uint32_t lid, SerialNum serialNum) +{ + assert(_ctx.getThreadingService().index().isCurrentThread()); + LockGuard lock(_index_update_lock); + _current_index->removeDocument(lid); + _selector->setSource(lid, _current_index_id); + _source_list->setSource(lid); + ++_source_selector_changes; + _current_serial_num = serialNum; +} + +void +IndexMaintainer::scheduleCommit() +{ + assert(_ctx.getThreadingService().master().isCurrentThread()); + _ctx.getThreadingService().index(). + execute(makeTask(makeClosure<IndexMaintainer *, IndexMaintainer, void>(this, &IndexMaintainer::commit))); +} + +void +IndexMaintainer::commit() +{ + // only triggered via scheduleCommit() + assert(_ctx.getThreadingService().index().isCurrentThread()); + LockGuard lock(_index_update_lock); + _current_index->commit(std::shared_ptr<search::IDestructorCallback>()); + // caller calls _ctx.getThreadingService().sync() +} + +void +IndexMaintainer::commit(SerialNum serialNum, OnWriteDoneType onWriteDone) +{ + assert(_ctx.getThreadingService().index().isCurrentThread()); + LockGuard lock(_index_update_lock); + _current_serial_num = serialNum; + _current_index->commit(onWriteDone); +} + +void +IndexMaintainer::heartBeat(SerialNum serialNum) +{ + assert(_ctx.getThreadingService().index().isCurrentThread()); + LockGuard lock(_index_update_lock); + _current_serial_num = serialNum; +} + +IFlushTarget::List +IndexMaintainer::getFlushTargets(void) +{ + // Called by flush engine scheduler thread + IFlushTarget::List ret; + IFlushTarget::SP indexFlush(new IndexFlushTarget(*this)); + IFlushTarget::SP indexFusion(new IndexFusionTarget(*this)); + ret.push_back(indexFlush); + ret.push_back(indexFusion); + return ret; +} + +void +IndexMaintainer::setSchema(const Schema & schema, const Schema & fusionSchema) +{ + assert(_ctx.getThreadingService().master().isCurrentThread()); + IMemoryIndex::SP new_index(_operations.createMemoryIndex(schema)); + SetSchemaArgs args; + + args._newSchema = schema; + args._newFusionSchema = fusionSchema; + scheduleCommit(); + // Ensure that all index thread tasks accessing memory index have completed. + _ctx.getThreadingService().sync(); + // Everything should be quiet now. + doneSetSchema(args, new_index); + // Source collection has now changed, caller must reconfigure further + // as appropriate. +} + +void +IndexMaintainer::wipeHistory(SerialNum wipeSerial, const Schema &historyFields) +{ + assert(_ctx.getThreadingService().master().isCurrentThread()); + { + LockGuard state_lock(_state_lock); + LockGuard lock(_index_update_lock); + _fusionSchema = _schema; + } + for (;;) { + const Schema before_schema = getSchema(); + IIndexCollection::SP before_coll = getSourceCollection(); + + Schema::UP schema = Schema::make_union(before_schema, historyFields); + updateIndexSchemas(*before_coll, *schema, wipeSerial); + updateActiveFusionWipeTimeSchema(*schema); + + const Schema after_schema(getSchema()); + IIndexCollection::SP after_coll = getSourceCollection(); + if (before_schema == after_schema && + before_coll.get() == after_coll.get()) + { + break; + } + } + { + LockGuard state_lock(_state_lock); + LockGuard lock(_index_update_lock); + _changeGens.bumpWipeGen(); + } + for (bool success(false); !success;) { + WipeHistoryArgs args; + { + LockGuard state_lock(_state_lock); + args._old_source_list = _source_list; + args._new_source_list.reset(new IndexCollection(_selector, *args._old_source_list)); + } + if (reopenDiskIndexes(*args._new_source_list)) { + _ctx.getThreadingService().sync(); + // Everything should be quiet now. + success = doneWipeHistory(args); + } else { + success = true; + } + } +} + +} // namespace index +} // namespace searchcorespi diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h new file mode 100644 index 00000000000..6e3b6fa9bdb --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h @@ -0,0 +1,412 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "activediskindexes.h" +#include "fusionspec.h" +#include "idiskindex.h" +#include "iindexmaintaineroperations.h" +#include "indexdisklayout.h" +#include "indexmaintainerconfig.h" +#include "indexmaintainercontext.h" +#include "imemoryindex.h" +#include "warmupindexcollection.h" +#include "ithreadingservice.h" +#include <boost/noncopyable.hpp> +#include <vespa/searchcorespi/index/iindexmanager.h> +#include <vespa/searchcorespi/index/indexsearchable.h> +#include <vespa/searchcorespi/index/indexcollection.h> +#include <vespa/searchcorespi/flush/iflushtarget.h> +#include <vespa/searchcorespi/flush/flushstats.h> +#include <vespa/searchlib/attribute/fixedsourceselector.h> +#include <vespa/searchlib/common/serialnum.h> +#include <vespa/vespalib/util/sync.h> +#include <memory> +#include <vector> + +namespace document { +class Document; +} + +namespace search { +namespace common { +class FileHeaderContext; +} +} + +namespace searchcorespi { +namespace index { + +/** + * The IndexMaintainer provides a holistic view of a set of disk and + * memory indexes. It allows updating the active memory index, enables search + * across all indexes, and manages the set of indexes through flushing + * of memory indexes and fusion of disk indexes. + */ +class IndexMaintainer : public boost::noncopyable, + public IIndexManager, + public IWarmupDone +{ + /** + * Extra memory that is frozen but not yet flushed. + */ + class FrozenMemoryIndexRef + { + public: + typedef search::FixedSourceSelector::SaveInfo SaveInfo; + typedef search::SerialNum SerialNum; + typedef std::shared_ptr<SaveInfo> SaveInfoSP; + + IMemoryIndex::SP _index; + SerialNum _serialNum; + SaveInfoSP _saveInfo; + uint32_t _absoluteId; + + FrozenMemoryIndexRef(const IMemoryIndex::SP &index, + SerialNum serialNum, + SaveInfo::UP saveInfo, + uint32_t absoluteId) + : _index(index), + _serialNum(serialNum), + _saveInfo(saveInfo.release()), + _absoluteId(absoluteId) + { } + }; + + class ChangeGens + { + public: + uint32_t _wipeGen; + + ChangeGens() : _wipeGen(0) { } + void bumpWipeGen(void) { ++_wipeGen; } + bool operator==(const ChangeGens &rhs) const { return _wipeGen == rhs._wipeGen; } + bool operator!=(const ChangeGens &rhs) const { return _wipeGen != rhs._wipeGen; } + }; + + typedef std::vector<uint32_t> FlushIds; + typedef std::vector<FrozenMemoryIndexRef> FrozenMemoryIndexRefs; + typedef search::queryeval::ISourceSelector ISourceSelector; + const vespalib::string _base_dir; + const double _diskIndexWarmupTime; + ActiveDiskIndexes::SP _active_indexes; + IndexDiskLayout _layout; + Schema _schema; // Protected by SL + IUL + Schema _fusionSchema; // Protected by SL + IUL + Schema::SP _activeFusionSchema; // Protected by SL + IUL + // Protected by SL + IUL + Schema::SP _activeFusionWipeTimeSchema; + uint32_t _source_selector_changes; // Protected by IUL + // _selector is protected by SL + IUL + ISourceSelector::SP _selector; + ISearchableIndexCollection::SP _source_list; // Protected by SL + NSL + uint32_t _last_fusion_id; // Protected by SL + IUL + uint32_t _next_id; // Protected by SL + IUL + uint32_t _current_index_id; // Protected by SL + IUL + IMemoryIndex::SP _current_index; // Protected by SL + IUL + SerialNum _current_serial_num;// Protected by IUL + SerialNum _flush_serial_num; // Protected by SL + fastos::TimeStamp _lastFlushTime; // Protected by SL + // Extra frozen memory indexes. This list is empty unless new + // memory index has been added by force (due to config change or + // data structure limitations). + // Protected by SL + IUL + FrozenMemoryIndexRefs _frozenMemoryIndexes; + /* + * Locks protecting state. + * + * Note about locking: + * + * A variable can be protected by multiple locks, e.g. SL + NSL. + * To change the variable, all of these locks must be held. + * To read the variable, holding any of these locks is sufficient. + * + * In the example above, getting NSL typically has lower latency, but + * fewer variables can be retrieved. Getting SL has a higher latency + * and allows a snapshot of multiple variables depending on each other + * to be retrieved. + * + * Flush threads typically performs some setup (take SL, copy a + * relevant portion of variables to an args class (FlushArgs, + * FusionArgs, SetSchemaArgs) and perform some disk io) that takes + * time before scheduling a state change task performed by the + * document db master thread. + * + * The scheduled state change task will fail if state changed too much + * after setup by the flush thread. The flush thread must then retry + * with an updated setup. + * + * Things get more complicated when handling multiple kinds of overlapping + * flush operations, e.g. dump from memory to disk, fusion, schema changes + * and wipe of old fields, since this will trigger more retries for some + * of the operations. + */ + vespalib::Lock _state_lock; // Outer lock (SL) + vespalib::Lock _index_update_lock; // Inner lock (IUL) + vespalib::Lock _new_search_lock; // Inner lock (NSL) + vespalib::Lock _remove_lock; // Lock for removing indexes. + // Protected by SL + IUL + FusionSpec _fusion_spec; // Protected by FL + vespalib::Lock _fusion_lock; // Fusion spec lock (FL) + uint32_t _maxFlushed; + uint32_t _maxFrozen; + ChangeGens _changeGens; // Protected by SL + IUL + vespalib::Lock _schemaUpdateLock; // Serialize rewrite of schema + const search::TuneFileAttributes _tuneFileAttributes; + const IndexMaintainerContext _ctx; + IIndexMaintainerOperations &_operations; + + search::FixedSourceSelector & getSourceSelector() { return static_cast<search::FixedSourceSelector &>(*_selector); } + const search::FixedSourceSelector & getSourceSelector() const { return static_cast<const search::FixedSourceSelector &>(*_selector); } + uint32_t getNewAbsoluteId(); + vespalib::string getFlushDir(uint32_t sourceId) const; + vespalib::string getFusionDir(uint32_t sourceId) const; + + /** + * Will reopen diskindexes if necessary due to schema changes. + * @param coll Indexcollection that will be updated with reloaded index. + * @return true if any reload has been performed + */ + bool reopenDiskIndexes(ISearchableIndexCollection &coll); + + void + updateDiskIndexSchema(const vespalib::string &indexDir, + const Schema &schema, + SerialNum wipeSerial); + + void + updateIndexSchemas(IIndexCollection &coll, + const Schema &schema, + SerialNum wipeSerial); + + void updateActiveFusionWipeTimeSchema(const Schema &schema); + void deactivateDiskIndexes(vespalib::string indexDir); + IDiskIndex::SP loadDiskIndex(const vespalib::string &indexDir); + IDiskIndex::SP reloadDiskIndex(const IDiskIndex &oldIndex); + + IDiskIndex::SP + flushMemoryIndex(IMemoryIndex &memoryIndex, + uint32_t indexId, + uint32_t docIdLimit, + SerialNum serialNum); + + ISearchableIndexCollection::UP loadDiskIndexes(const FusionSpec &spec, ISearchableIndexCollection::UP sourceList); + void replaceSource(uint32_t sourceId, const IndexSearchable::SP &source); + void appendSource(uint32_t sourceId, const IndexSearchable::SP &source); + void swapInNewIndex(vespalib::LockGuard & guard, ISearchableIndexCollection::SP indexes, IndexSearchable & source); + ISearchableIndexCollection::UP createNewSourceCollection(const vespalib::LockGuard &newSearchLock); + + struct FlushArgs { + IMemoryIndex::SP old_index; // Last memory index + uint32_t old_absolute_id; + ISearchableIndexCollection::SP old_source_list; // Delays destruction + search::FixedSourceSelector::SaveInfo::SP save_info; + SerialNum flush_serial_num; + FlushStats * stats; + bool _skippedEmptyLast; // Don't flush empty memory index + + // Extra indexes to flush before flushing last frozen memory index + // They are flushed before old_index. This list is empty unless + // new memory index has been added by force (due to config change + // or data structure limitations). + FrozenMemoryIndexRefs _extraIndexes; + ChangeGens _changeGens; + Schema::SP _wtSchema; + + FlushArgs(void) + : old_index(), + old_absolute_id(0), + old_source_list(), + save_info(), + flush_serial_num(), + stats(NULL), + _skippedEmptyLast(false), + _extraIndexes(), + _changeGens(), + _wtSchema() + { + } + }; + + bool doneInitFlush(FlushArgs *args, IMemoryIndex::SP *new_index); + void doFlush(FlushArgs args); + void flushFrozenMemoryIndexes(FlushArgs &args, FlushIds &flushIds); + void flushLastMemoryIndex(FlushArgs &args, FlushIds &flushIds); + void updateFlushStats(const FlushArgs &args); + void flushMemoryIndex(FlushArgs &args, uint32_t docIdLimit, + search::FixedSourceSelector::SaveInfo &saveInfo, FlushIds &flushIds); + void reconfigureAfterFlush(FlushArgs &args, IDiskIndex::SP &diskIndex); + bool doneFlush(FlushArgs *args, IDiskIndex::SP *disk_index); + + + class FusionArgs + { + public: + uint32_t _new_fusion_id; + ChangeGens _changeGens; + Schema _schema; + Schema::SP _wtSchema; + ISearchableIndexCollection::SP _old_source_list; // Delays destruction + + FusionArgs() + : _new_fusion_id(0u), + _changeGens(), + _schema(), + _wtSchema(), + _old_source_list() + { } + }; + + IFlushTarget::SP getFusionTarget(); + void scheduleFusion(const FlushIds &flushIds); + bool canRunFusion(const FusionSpec &spec) const; + bool doneFusion(FusionArgs *args, IDiskIndex::SP *new_index); + + class SetSchemaArgs + { + public: + Schema _newSchema; + Schema _newFusionSchema; + Schema _oldSchema; + Schema _oldFusionSchema; + IMemoryIndex::SP _oldIndex; + ISearchableIndexCollection::SP _oldSourceList; // Delays destruction + + SetSchemaArgs(void) + : _newSchema(), + _newFusionSchema(), + _oldSchema(), + _oldFusionSchema(), + _oldIndex(), + _oldSourceList() + { } + }; + + void doneSetSchema(SetSchemaArgs &args, IMemoryIndex::SP &newIndex); + + class WipeHistoryArgs + { + public: + ISearchableIndexCollection::SP _old_source_list; + ISearchableIndexCollection::SP _new_source_list; + + WipeHistoryArgs() + : _old_source_list(), + _new_source_list() + { } + }; + + bool doneWipeHistory(WipeHistoryArgs &args); + Schema getSchema(void) const; + Schema::SP getActiveFusionWipeTimeSchema(void) const; + search::TuneFileAttributes getAttrTune(void); + ChangeGens getChangeGens(void); + + /* + * Schedule document db executor task to use reconfigurer to + * reconfigure index manager with closure as argument. Wait for + * result. + */ + bool reconfigure(vespalib::Closure0<bool>::UP closure); + virtual void warmupDone(ISearchableIndexCollection::SP current); + bool makeSureAllRemainingWarmupIsDone(ISearchableIndexCollection::SP keepAlive); + void scheduleCommit(); + void commit(); + +public: + IndexMaintainer(const IndexMaintainerConfig &config, + const IndexMaintainerContext &context, + IIndexMaintainerOperations &operations); + ~IndexMaintainer(); + + /** + * Starts a new MemoryIndex, and dumps the previous one to disk. + * Updates flush stats when finished if specified. + **/ + FlushTask::UP initFlush(SerialNum serialNum, FlushStats * stats); + FusionSpec getFusionSpec(); + + /** + * Runs fusion for any available specs and return the output fusion directory. + */ + vespalib::string doFusion(SerialNum serialNum); + uint32_t runFusion(const FusionSpec &fusion_spec); + void removeOldDiskIndexes(); + + struct FlushStats + { + FlushStats() : + memory_before_bytes(0), + memory_after_bytes(0), + disk_write_bytes(0), + cpu_time_required(0) + { } + + uint64_t memory_before_bytes; + uint64_t memory_after_bytes; + uint64_t disk_write_bytes; + uint64_t cpu_time_required; + }; + + struct FusionStats + { + FusionStats() + : diskUsage(0), + maxFlushed(0), + numUnfused(0), + _canRunFusion(false) + { } + + uint64_t diskUsage; + uint32_t maxFlushed; + uint32_t numUnfused; + bool _canRunFusion; + }; + + /** + * Calculates an approximation of the cost of performing a flush. + **/ + FlushStats getFlushStats() const; + FusionStats getFusionStats() const; + const vespalib::string & getBaseDir() const { return _base_dir; } + uint32_t getNumFrozenMemoryIndexes() const; + uint32_t getMaxFrozenMemoryIndexes() const { return _maxFrozen; } + + fastos::TimeStamp getLastFlushTime() const { return _lastFlushTime; } + + // Implements IIndexManager + void putDocument(uint32_t lid, const Document &doc, SerialNum serialNum) override; + void removeDocument(uint32_t lid, SerialNum serialNum) override; + void commit(SerialNum serialNum, OnWriteDoneType onWriteDone) override; + void heartBeat(search::SerialNum serialNum) override; + + SerialNum getCurrentSerialNum() const override { + return _current_serial_num; + } + + SerialNum getFlushedSerialNum() const override { + return _flush_serial_num; + } + + IIndexCollection::SP getSourceCollection() const { + vespalib::LockGuard lock(_new_search_lock); + return _source_list; + } + + searchcorespi::IndexSearchable::SP getSearchable() const override { + vespalib::LockGuard lock(_new_search_lock); + return _source_list; + } + + search::SearchableStats getSearchableStats() const override { + vespalib::LockGuard lock(_new_search_lock); + return _source_list->getSearchableStats(); + } + + IFlushTarget::List getFlushTargets() override; + void setSchema(const Schema & schema, const Schema & fusionSchema) override ; + void wipeHistory(SerialNum wipeSerial, const Schema &historyFields) override; +}; + +} // namespace index +} // namespace searchcorespi + diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainerconfig.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainerconfig.cpp new file mode 100644 index 00000000000..e2838434a1b --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainerconfig.cpp @@ -0,0 +1,31 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP(".searchcorespi.index.indexmaintainerconfig"); + +#include "indexmaintainerconfig.h" + +using search::index::Schema; +using search::TuneFileAttributes; + +namespace searchcorespi { +namespace index { + +IndexMaintainerConfig::IndexMaintainerConfig(const vespalib::string &baseDir, + double diskIndexWarmupTime, + size_t maxFlushed, + const Schema &schema, + const Schema &fusionSchema, + const TuneFileAttributes &tuneFileAttributes) + : _baseDir(baseDir), + _diskIndexWarmupTime(diskIndexWarmupTime), + _maxFlushed(maxFlushed), + _schema(schema), + _fusionSchema(fusionSchema), + _tuneFileAttributes(tuneFileAttributes) +{ +} + +} // namespace index +} // namespace searchcorespi + diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainerconfig.h b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainerconfig.h new file mode 100644 index 00000000000..7ee3e0337cc --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainerconfig.h @@ -0,0 +1,74 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/searchcommon/common/schema.h> +#include <vespa/searchlib/common/tunefileinfo.h> +#include <vespa/vespalib/stllike/string.h> + +namespace searchcorespi { +namespace index { + +/** + * Class that keeps the config used when constructing an index maintainer. + */ +class IndexMaintainerConfig { +private: + const vespalib::string _baseDir; + const double _diskIndexWarmupTime; + const size_t _maxFlushed; + const search::index::Schema _schema; + const search::index::Schema _fusionSchema; + const search::TuneFileAttributes _tuneFileAttributes; + +public: + IndexMaintainerConfig(const vespalib::string &baseDir, + double diskIndexWarmupTime, + size_t maxFlushed, + const search::index::Schema &schema, + const search::index::Schema &fusionSchema, + const search::TuneFileAttributes &tuneFileAttributes); + + /** + * Returns the base directory in which the maintainer will store its indexes. + */ + const vespalib::string &getBaseDir() const { + return _baseDir; + } + + double getDiskIndexWarmupTime() const { + return _diskIndexWarmupTime; + } + + /** + * Returns the initial schema containing all current index fields. + */ + const search::index::Schema &getSchema() const { + return _schema; + } + + /** + * Returns the initial fusion schema containing all index fields that has been in the system. + * This includes all current fields and removed fields that has not been wiped yet. + * This schema is used during fusion to make sure that removed fields are transferred into the + * fusioned index in case they are re-introduced later on. + */ + const search::index::Schema &getFusionSchema() const { + return _fusionSchema; + } + + /** + * Returns the specification on how to read/write attribute vector data files. + */ + const search::TuneFileAttributes &getTuneFileAttributes() const { + return _tuneFileAttributes; + } + + size_t getMaxFlushed() const { + return _maxFlushed; + } +}; + +} // namespace index +} // namespace searchcorespi + + diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.cpp new file mode 100644 index 00000000000..d18920f5c7a --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.cpp @@ -0,0 +1,28 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP(".searchcorespi.index.indexmaintainercontext"); + +#include "indexmaintainercontext.h" + +using search::common::FileHeaderContext; +using search::TuneFileAttributes; +using searchcorespi::IIndexManager; + +namespace searchcorespi { +namespace index { + +IndexMaintainerContext::IndexMaintainerContext(IThreadingService &threadingService, + IIndexManager::Reconfigurer &reconfigurer, + const FileHeaderContext &fileHeaderContext, + vespalib::ThreadExecutor & warmupExecutor) + : _threadingService(threadingService), + _reconfigurer(reconfigurer), + _fileHeaderContext(fileHeaderContext), + _warmupExecutor(warmupExecutor) +{ +} + +} // namespace index +} // namespace searchcorespi + diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.h b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.h new file mode 100644 index 00000000000..ee8087bef99 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.h @@ -0,0 +1,59 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "ithreadingservice.h" +#include <vespa/searchcorespi/index/iindexmanager.h> +#include <vespa/searchlib/common/tunefileinfo.h> +#include <vespa/searchlib/common/fileheadercontext.h> +#include <vespa/vespalib/util/threadexecutor.h> + +namespace searchcorespi { +namespace index { + +/** + * Class that keeps the long-lived context used by an index maintainer. + */ +class IndexMaintainerContext { +private: + IThreadingService &_threadingService; + searchcorespi::IIndexManager::Reconfigurer &_reconfigurer; + const search::common::FileHeaderContext &_fileHeaderContext; + vespalib::ThreadExecutor & _warmupExecutor; + +public: + IndexMaintainerContext(IThreadingService &threadingService, + searchcorespi::IIndexManager::Reconfigurer &reconfigurer, + const search::common::FileHeaderContext &fileHeaderContext, + vespalib::ThreadExecutor & warmupExecutor); + + /** + * Returns the treading service that encapsulates the thread model used for writing. + */ + IThreadingService &getThreadingService() const { + return _threadingService; + } + + /** + * Returns the reconfigurer used to signal when the index maintainer has changed. + */ + searchcorespi::IIndexManager::Reconfigurer &getReconfigurer() const { + return _reconfigurer; + } + + /** + * Returns the context used to insert extra tags into file headers before writing them. + */ + const search::common::FileHeaderContext &getFileHeaderContext() const { + return _fileHeaderContext; + } + + /** + * @return The executor that should be used for warmup. + */ + vespalib::ThreadExecutor & getWarmupExecutor() const { return _warmupExecutor; } +}; + +} // namespace index +} // namespace searchcorespi + + diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmanagerconfig.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmanagerconfig.cpp new file mode 100644 index 00000000000..830e4f68c45 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmanagerconfig.cpp @@ -0,0 +1,20 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP(".searchcorespi.index.indexmanagerconfig"); + +#include "indexmanagerconfig.h" + +namespace searchcorespi { + +IndexManagerConfig::IndexManagerConfig(const vespalib::string &configId, + const config::ConfigSnapshot &configSnapshot, + size_t numSearcherThreads) + : _configId(configId), + _configSnapshot(configSnapshot), + _numSearcherThreads(numSearcherThreads) +{ +} + +} // namespace searchcorespi + diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmanagerconfig.h b/searchcorespi/src/vespa/searchcorespi/index/indexmanagerconfig.h new file mode 100644 index 00000000000..7019a10668a --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmanagerconfig.h @@ -0,0 +1,41 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/config/retriever/configsnapshot.h> +#include <vespa/vespalib/stllike/string.h> + +namespace searchcorespi { + +/** + * Class that keeps the config used when constructing an index manager. + */ +class IndexManagerConfig { +private: + vespalib::string _configId; + const config::ConfigSnapshot &_configSnapshot; + size_t _numSearcherThreads; + +public: + IndexManagerConfig(const vespalib::string &configId, + const config::ConfigSnapshot &configSnapshot, + size_t numSearcherThreads); + + /** + * Returns the config id used to retrieve the configs from the config snapshot instance. + */ + const vespalib::string &getConfigId() const { return _configId; } + + /** + * Returns the snapshot containing configs to be used by the index manager. + */ + const config::ConfigSnapshot &getConfigSnapshot() const { return _configSnapshot; } + + /** + * Returns the number of searcher threads that are used to query the index manager. + */ + size_t getNumSearcherThreads() const { return _numSearcherThreads; } +}; + +} // namespace searchcorespi + + diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexreadutilities.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexreadutilities.cpp new file mode 100644 index 00000000000..d2c76612833 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/indexreadutilities.cpp @@ -0,0 +1,91 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP(".searchcorespi.index.indexreadutilities"); + +#include "indexreadutilities.h" +#include "indexdisklayout.h" +#include <vespa/fastlib/io/bufferedfile.h> +#include <vespa/vespalib/data/fileheader.h> +#include <set> +#include <vector> + +using search::SerialNum; +using vespalib::FileHeader; + +namespace searchcorespi { +namespace index { + +namespace { + +/** + * Assumes that cleanup has removed all obsolete index dirs. + **/ +void +scanForIndexes(const vespalib::string &baseDir, + std::vector<vespalib::string> &flushDirs, + vespalib::string &fusionDir) +{ + FastOS_DirectoryScan dirScan(baseDir.c_str()); + while (dirScan.ReadNext()) { + if (!dirScan.IsDirectory()) { + continue; + } + vespalib::string name = dirScan.GetName(); + if (name.find(IndexDiskLayout::FlushDirPrefix) == 0) { + flushDirs.push_back(name); + } + if (name.find(IndexDiskLayout::FusionDirPrefix) == 0) { + if (!fusionDir.empty()) { + // Should never happen, since we run cleanup before load. + LOG(warning, "Base directory '%s' contains multiple fusion indexes", + baseDir.c_str()); + } + fusionDir = name; + } + } +} + +} + +FusionSpec +IndexReadUtilities::readFusionSpec(const vespalib::string &baseDir) +{ + std::vector<vespalib::string> flushDirs; + vespalib::string fusionDir; + scanForIndexes(baseDir, flushDirs, fusionDir); + + uint32_t fusionId = 0; + if (!fusionDir.empty()) { + fusionId = atoi(fusionDir.substr(IndexDiskLayout::FusionDirPrefix.size()).c_str()); + } + std::set<uint32_t> flushIds; + for (size_t i = 0; i < flushDirs.size(); ++i) { + uint32_t id = atoi(flushDirs[i].substr(IndexDiskLayout::FlushDirPrefix.size()).c_str()); + flushIds.insert(id); + } + + FusionSpec fusionSpec; + fusionSpec.last_fusion_id = fusionId; + fusionSpec.flush_ids.assign(flushIds.begin(), flushIds.end()); + return fusionSpec; +} + +SerialNum +IndexReadUtilities::readSerialNum(const vespalib::string &dir) +{ + const vespalib::string fileName = IndexDiskLayout::getSerialNumFileName(dir); + Fast_BufferedFile file; + file.ReadOpen(fileName.c_str()); + + FileHeader fileHeader; + fileHeader.readFile(file); + if (fileHeader.hasTag(IndexDiskLayout::SerialNumTag)) { + return fileHeader.getTag(IndexDiskLayout::SerialNumTag).asInteger(); + } + return 0; +} + +} // namespace index +} // namespace searchcorespi + diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexreadutilities.h b/searchcorespi/src/vespa/searchcorespi/index/indexreadutilities.h new file mode 100644 index 00000000000..9169f845c1b --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/indexreadutilities.h @@ -0,0 +1,23 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "fusionspec.h" +#include <vespa/searchlib/common/serialnum.h> +#include <vespa/vespalib/stllike/string.h> + +namespace searchcorespi { +namespace index { + +/** + * Utility class with functions to read aspects of an index from disk. + * Used by the index maintainer. + */ +struct IndexReadUtilities { + static FusionSpec readFusionSpec(const vespalib::string &baseDir); + static search::SerialNum readSerialNum(const vespalib::string &dir); +}; + +} // namespace index +} // namespace searchcorespi + + diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexsearchable.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexsearchable.cpp new file mode 100644 index 00000000000..0f670d31855 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/indexsearchable.cpp @@ -0,0 +1,35 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP(".searchcorespi.index.indexsearchable"); +#include "indexsearchable.h" +#include <vespa/searchlib/queryeval/intermediate_blueprints.h> +#include <vespa/searchlib/queryeval/leaf_blueprints.h> + +using namespace search::queryeval; + +namespace searchcorespi { + +IndexSearchable::Blueprint::UP +IndexSearchable::createBlueprint(const IRequestContext & requestContext, + const FieldSpecList &fields, + const Node &term, + const IAttributeContext &attrCtx) +{ + if (fields.empty()) { + return Blueprint::UP(new EmptyBlueprint()); + } + if (fields.size() == 1) { + return createBlueprint(requestContext, fields[0], term, attrCtx); + } + OrBlueprint *b = new OrBlueprint(); + Blueprint::UP result(b); + for (size_t i = 0; i < fields.size(); ++i) { + b->addChild(createBlueprint(requestContext, fields[i], term, attrCtx)); + } + return result; +} + +} // namespace searchcorespi + diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexsearchable.h b/searchcorespi/src/vespa/searchcorespi/index/indexsearchable.h new file mode 100644 index 00000000000..2d4d7cd9674 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/indexsearchable.h @@ -0,0 +1,77 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/searchcommon/attribute/iattributecontext.h> +#include <vespa/searchlib/query/tree/node.h> +#include <vespa/searchlib/queryeval/field_spec.h> +#include <vespa/searchlib/queryeval/blueprint.h> +#include <vespa/searchlib/queryeval/irequestcontext.h> +#include <vespa/searchlib/util/searchable_stats.h> + +namespace searchcorespi { + +/** + * Abstract class extended by components to expose content that can be + * searched by a query term. A IndexSearchable component supports searching + * in one or more named fields. The Blueprint created by a Searchable + * is an intermediate query representation that is later used to + * create the actual search iterators used to produce matches. + * + * The class is a specialized version of search::queryeval::Searchable + * that let the components access a per query attribute context that expose + * attribute vectors that can be utilized during query evaluation. + **/ +class IndexSearchable +{ +protected: + typedef search::queryeval::IRequestContext IRequestContext; + typedef search::queryeval::FieldSpec FieldSpec; + typedef search::queryeval::FieldSpecList FieldSpecList; + typedef search::query::Node Node; + typedef search::attribute::IAttributeContext IAttributeContext; + typedef search::queryeval::Blueprint Blueprint; +public: + typedef std::shared_ptr<IndexSearchable> SP; + + IndexSearchable() {} + + virtual ~IndexSearchable() {} + + /** + * Create a blueprint searching a single field. + * + * @return blueprint + * @param field the field to search + * @param term the query tree term + * @param attrCtx the per query attribute context + **/ + virtual Blueprint::UP + createBlueprint(const IRequestContext & requestContext, + const FieldSpec &field, + const Node &term, + const IAttributeContext &attrCtx) = 0; + + /** + * Create a blueprint searching a set of fields. The default + * implementation of this function will create blueprints for + * individual fields and combine them with an OR blueprint. + * + * @return blueprint + * @param fields the set of fields to search + * @param term the query tree term + * @param attrCtx the per query attribute context + **/ + virtual Blueprint::UP + createBlueprint(const IRequestContext & requestContext, + const FieldSpecList &fields, + const Node &term, + const IAttributeContext &attrCtx); + + /** + * Returns the searchable stats for this index searchable. + */ + virtual search::SearchableStats getSearchableStats() const = 0; +}; + +} // namespace searchcorespi diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexwriteutilities.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexwriteutilities.cpp new file mode 100644 index 00000000000..731e233d289 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/indexwriteutilities.cpp @@ -0,0 +1,189 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/fastos/fastos.h> +#include <vespa/log/log.h> +LOG_SETUP(".searchcorespi.index.indexwriteutilities"); + +#include "indexwriteutilities.h" +#include "indexdisklayout.h" +#include "indexreadutilities.h" +#include <vespa/searchlib/common/serialnumfileheadercontext.h> +#include <vespa/searchlib/index/schemautil.h> +#include <vespa/vespalib/data/fileheader.h> +#include <vespa/vespalib/util/exception.h> +#include <sstream> + +using search::FixedSourceSelector; +using search::TuneFileAttributes; +using search::common::FileHeaderContext; +using search::common::SerialNumFileHeaderContext; +using search::index::Schema; +using search::index::SchemaUtil; +using search::SerialNum; +using vespalib::IllegalStateException; +using vespalib::FileHeader; + +namespace searchcorespi { +namespace index { + +namespace { + +SerialNum noSerialNumHigh = std::numeric_limits<SerialNum>::max(); + +} + +void +IndexWriteUtilities::writeSerialNum(SerialNum serialNum, + const vespalib::string &dir, + const FileHeaderContext &fileHeaderContext) +{ + const vespalib::string fileName = + IndexDiskLayout::getSerialNumFileName(dir); + const vespalib::string tmpFileName = fileName + ".tmp"; + + SerialNumFileHeaderContext snFileHeaderContext(fileHeaderContext, + serialNum); + Fast_BufferedFile file; + file.WriteOpen(tmpFileName.c_str()); + FileHeader fileHeader; + snFileHeaderContext.addTags(fileHeader, fileName); + fileHeader.putTag(FileHeader::Tag(IndexDiskLayout::SerialNumTag, + serialNum)); + bool ok = (fileHeader.writeFile(file) >= fileHeader.getSize()); + if (!file.Sync()) { + ok = false; + LOG(error, + "Unable to fsync '%s'", + tmpFileName.c_str()); + } + file.Close(); + + if (ok) { + FastOS_File renameFile(tmpFileName.c_str()); + ok &= renameFile.Rename(fileName.c_str()); + } + if (!ok) { + std::ostringstream msg; + msg << "Unable to write serial number to '" << dir << "'."; + throw IllegalStateException(msg.str()); + } +} + +bool +IndexWriteUtilities::copySerialNumFile(const vespalib::string &sourceDir, + const vespalib::string &destDir) +{ + vespalib::string source = IndexDiskLayout::getSerialNumFileName(sourceDir); + vespalib::string dest = IndexDiskLayout::getSerialNumFileName(destDir); + vespalib::string tmpDest = dest + ".tmp"; + if (!FastOS_FileInterface::CopyFile(source.c_str(), tmpDest.c_str())) { + LOG(error, "Unable to copy file '%s'", source.c_str()); + return false; + } + FastOS_File file(tmpDest.c_str()); + if (!file.OpenReadWrite()) { + LOG(error, + "Unable to open '%s' for fsync", + tmpDest.c_str()); + return false; + } + if (!file.Sync()) { + LOG(error, + "Unable to fsync '%s'", + tmpDest.c_str()); + return false; + } + file.Close(); + if (!file.Rename(dest.c_str())) { + LOG(error, + "Unable to rename file '%s' to '%s'", + tmpDest.c_str(), dest.c_str()); + return false; + } + return true; +} + +void +IndexWriteUtilities::writeSourceSelector(FixedSourceSelector::SaveInfo & + saveInfo, + uint32_t sourceId, + const TuneFileAttributes & + tuneFileAttributes, + const FileHeaderContext & + fileHeaderContext, + SerialNum serialNum) +{ + SerialNumFileHeaderContext snFileHeaderContext(fileHeaderContext, + serialNum); + if (!saveInfo.save(tuneFileAttributes, snFileHeaderContext)) { + std::ostringstream msg; + msg << "Flush of sourceselector failed. Source id = " << sourceId; + throw IllegalStateException(msg.str()); + } +} + +void +IndexWriteUtilities::updateDiskIndexSchema(const vespalib::string &indexDir, + const Schema &schema, + SerialNum wipeSerial) +{ + vespalib::string schemaName = IndexDiskLayout::getSchemaFileName(indexDir); + Schema oldSchema; + if (!oldSchema.loadFromFile(schemaName)) { + LOG(error, "Could not open schema '%s'", + schemaName.c_str()); + return; + } + if (!SchemaUtil::validateSchema(oldSchema)) { + LOG(error, "Could not validate schema loaded from '%s'", + schemaName.c_str()); + return; + } + Schema::UP newSchema = Schema::intersect(oldSchema, schema); + if (*newSchema == oldSchema) { + return; + } + if (wipeSerial != noSerialNumHigh) { + SerialNum oldSerial = IndexReadUtilities::readSerialNum(indexDir); + if (oldSerial >= wipeSerial) { + return; + } + } + vespalib::string schemaTmpName = schemaName + ".tmp"; + vespalib::string schemaOrigName = schemaName + ".orig"; + if (!newSchema->saveToFile(schemaTmpName)) { + LOG(error, "Could not save schema to '%s'", + schemaTmpName.c_str()); + } + // XXX: FastOS layer violation + FastOS_StatInfo statInfo; + bool statres; + statres = FastOS_File::Stat(schemaOrigName.c_str(), &statInfo); + if (!statres) { + if (statInfo._error != FastOS_StatInfo::FileNotFound) { + LOG(error, "Failed to stat orig schema '%s': %s", + schemaOrigName.c_str(), + FastOS_File::getLastErrorString().c_str()); + } + int linkres = ::link(schemaName.c_str(), schemaOrigName.c_str()); + if (linkres != 0) { + LOG(error, "Could not link '%s' to '%s': %s", + schemaOrigName.c_str(), + schemaName.c_str(), + FastOS_File::getLastErrorString().c_str()); + } + } + // XXX: FastOS layer violation + int renameres = ::rename(schemaTmpName.c_str(), schemaName.c_str()); + if (renameres != 0) { + int error = errno; + std::string errString = FastOS_File::getErrorString(error); + LOG(error, "Could not rename '%s' to '%s': %s", + schemaTmpName.c_str(), + schemaName.c_str(), + errString.c_str()); + } +} + +} // namespace index +} // namespace searchcorespi + diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexwriteutilities.h b/searchcorespi/src/vespa/searchcorespi/index/indexwriteutilities.h new file mode 100644 index 00000000000..c08b850bd6f --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/indexwriteutilities.h @@ -0,0 +1,46 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/searchcommon/common/schema.h> +#include <vespa/searchlib/attribute/fixedsourceselector.h> +#include <vespa/searchlib/common/tunefileinfo.h> +#include <vespa/searchlib/common/fileheadercontext.h> +#include <vespa/searchlib/common/serialnum.h> +#include <vespa/vespalib/stllike/string.h> + +namespace searchcorespi { +namespace index { + +/** + * Utility class with functions to write aspects of an index to disk. + * Used by the index maintainer. + */ +struct IndexWriteUtilities +{ + static void + writeSerialNum(search::SerialNum serialNum, + const vespalib::string &dir, + const search::common::FileHeaderContext &fileHeaderContext); + + static bool + copySerialNumFile(const vespalib::string &sourceDir, + const vespalib::string &destDir); + + static void + writeSourceSelector(search::FixedSourceSelector::SaveInfo &saveInfo, + uint32_t sourceId, + const search::TuneFileAttributes &tuneFileAttributes, + const search::common::FileHeaderContext & + fileHeaderContext, + search::SerialNum serialNum); + + static void + updateDiskIndexSchema(const vespalib::string &indexDir, + const search::index::Schema &schema, + search::SerialNum wipeSerial); +}; + +} // namespace index +} // namespace searchcorespi + + diff --git a/searchcorespi/src/vespa/searchcorespi/index/isearchableindexcollection.cpp b/searchcorespi/src/vespa/searchcorespi/index/isearchableindexcollection.cpp new file mode 100644 index 00000000000..484a5137a81 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/isearchableindexcollection.cpp @@ -0,0 +1,31 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/searchcorespi/index/isearchableindexcollection.h> + +namespace searchcorespi { + +using search::queryeval::ISourceSelector; + +void +ISearchableIndexCollection::setCurrentIndex(uint32_t id) +{ + assert( id < ISourceSelector::SOURCE_LIMIT); + + _currentIndex = id; +} + +uint32_t +ISearchableIndexCollection::getCurrentIndex() const +{ + assert( valid() ); + + return _currentIndex; +} + +bool +ISearchableIndexCollection::valid() const +{ + return (_currentIndex > 0) && (_currentIndex < ISourceSelector::SOURCE_LIMIT); +} + +} diff --git a/searchcorespi/src/vespa/searchcorespi/index/isearchableindexcollection.h b/searchcorespi/src/vespa/searchcorespi/index/isearchableindexcollection.h new file mode 100644 index 00000000000..665f08e0e04 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/isearchableindexcollection.h @@ -0,0 +1,33 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "iindexcollection.h" +#include "indexsearchable.h" + +namespace searchcorespi { + +/** + * Interface to both an IndexCollection and to an IndexSearchable + */ +class ISearchableIndexCollection : public IIndexCollection, + public IndexSearchable +{ +public: + ISearchableIndexCollection() : _currentIndex(-1) { } + typedef std::unique_ptr<ISearchableIndexCollection> UP; + typedef std::shared_ptr<ISearchableIndexCollection> SP; + virtual void append(uint32_t id, const IndexSearchable::SP &source) = 0; + virtual void replace(uint32_t id, const IndexSearchable::SP &source) = 0; + virtual IndexSearchable::SP getSearchableSP(uint32_t i) const = 0; + virtual void setSource(uint32_t docId) = 0; + + void setCurrentIndex(uint32_t id); + uint32_t getCurrentIndex() const; + bool valid() const; +private: + int32_t _currentIndex; +}; + +} // namespace searchcorespi + diff --git a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h new file mode 100644 index 00000000000..ec68b374487 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h @@ -0,0 +1,74 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "i_thread_service.h" +#include <boost/noncopyable.hpp> +#include <vespa/vespalib/util/runnable.h> +#include <vespa/vespalib/util/executor.h> +#include <vespa/vespalib/util/syncable.h> +#include <vespa/searchlib/common/isequencedtaskexecutor.h> + +namespace searchcorespi { +namespace index { + +/** + * Interface for the thread model used for write tasks. + * + * We have multiple write threads: + * + * 1. The master write thread used for the majority of write tasks. + * + * 2. The index write thread used for doing changes to the memory + * index, either directly (for data not bound to a field) or via + * index field inverter executor or index field writer executor. + * + * 3. The index field inverter executor is used to populate field + * inverters with data from document fields. Scheduled tasks for + * the same field are executed in sequence. + * + * 4. The index field writer executor is used to sort data in field + * inverters before pushing the data to the memory field indexes. + * Scheduled tasks for the same field are executed in sequence. + * + * The master write thread is always the one giving tasks to the index + * write thread. + * + * The index write thread extracts fields from documents and gives + * task to the index field inverter executor and the index field + * writer executor. + * + * The index field inverter executor and index field writer executor + * are separate to allow for double buffering, i.e. populate one set + * of field inverters using the index field inverter executor while + * another set of field inverters are handled by the index field + * writer executor. + * + * We might decide to allow index field inverter tasks to schedule + * tasks to the index field writer executor, so draining logic needs + * to sync index field inverter executor before syncing index field + * writer executor. + */ +struct IThreadingService : public boost::noncopyable, + public vespalib::Syncable +{ + virtual ~IThreadingService() {} + + /** + * Returns a reference to the master write thread. + */ + virtual IThreadService &master() = 0; + + /** + * Returns a reference to the index write thread. + */ + virtual IThreadService &index() = 0; + + virtual search::ISequencedTaskExecutor &indexFieldInverter() = 0; + + virtual search::ISequencedTaskExecutor &indexFieldWriter() = 0; + + virtual search::ISequencedTaskExecutor &attributeFieldWriter() = 0; +}; + +} // namespace index +} // namespace searchcorespi diff --git a/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp new file mode 100644 index 00000000000..7d630a2c668 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp @@ -0,0 +1,220 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/searchcorespi/index/warmupindexcollection.h> +#include <vespa/searchcorespi/index/idiskindex.h> +#include <vespa/vespalib/util/closuretask.h> +#include <vespa/searchlib/fef/matchdatalayout.h> +#include <vespa/searchlib/query/tree/termnodes.h> +#include <vespa/log/log.h> + +LOG_SETUP(".searchcorespi.index.warmupindexcollection"); + +namespace searchcorespi { + +using search::query::StringBase; +using search::queryeval::Blueprint; +using search::fef::MatchDataLayout; +using search::queryeval::SearchIterator; +using search::queryeval::ISourceSelector; +using vespalib::makeTask; +using vespalib::makeClosure; +using index::IDiskIndex; +using fastos::TimeStamp; +using fastos::ClockSystem; + +WarmupIndexCollection::WarmupIndexCollection(double warmupSeconds, + ISearchableIndexCollection::SP prev, + ISearchableIndexCollection::SP next, + IndexSearchable & warmup, + vespalib::ThreadExecutor & executor, + IWarmupDone & warmupDone) : + _prev(prev), + _next(next), + _warmup(warmup), + _executor(executor), + _warmupDone(warmupDone), + _warmupEndTime(ClockSystem::now() + TimeStamp::Seconds(warmupSeconds)), + _handledTerms() +{ + if (next->valid()) { + setCurrentIndex(next->getCurrentIndex()); + } else { + LOG(warning, "Next index is not valid, Dangerous !! : %s", next->toString().c_str()); + } + LOG(debug, "For %g seconds I will warm up %s.", warmupSeconds, typeid(_warmup).name()); + LOG(debug, "%s", toString().c_str()); +} + +void +WarmupIndexCollection::setSource(uint32_t docId) +{ + assert(_prev->valid()); + assert(_next->valid()); + _prev->setSource(docId); + _next->setSource(docId); +} + +vespalib::string +WarmupIndexCollection::toString() const +{ + vespalib::asciistream os; + os << "warmup : "; + if (dynamic_cast<const IDiskIndex *>(&_warmup) != NULL) { + os << static_cast<const IDiskIndex &>(_warmup).getIndexDir(); + } else { + os << typeid(_warmup).name(); + } + os << "\n"; + os << "next : " << _next->toString() << "\n"; + os << "prev : " << _prev->toString() << "\n"; + return os.str(); +} + +WarmupIndexCollection::~WarmupIndexCollection() +{ + if (_warmupEndTime != 0) { + LOG(info, + "Warmup aborted due to new state change or application shutdown"); + } + _executor.sync(); +} + +const ISourceSelector & +WarmupIndexCollection::getSourceSelector() const +{ + return _next->getSourceSelector(); +} + +size_t +WarmupIndexCollection::getSourceCount() const +{ + return _next->getSourceCount(); +} + +IndexSearchable & +WarmupIndexCollection::getSearchable(uint32_t i) const +{ + return _next->getSearchable(i); +} + +uint32_t +WarmupIndexCollection::getSourceId(uint32_t i) const +{ + return _next->getSourceId(i); +} + +void +WarmupIndexCollection::fireWarmup(Task::UP task) +{ + fastos::TimeStamp now(fastos::ClockSystem::now()); + if (now < _warmupEndTime) { + _executor.execute(std::move(task)); + } else { + vespalib::LockGuard guard(_lock); + if (_warmupEndTime != 0) { + _warmupEndTime = 0; + guard.unlock(); + LOG(info, "Done warming up. Posting WarmupDoneTask"); + _warmupDone.warmupDone(shared_from_this()); + } + } +} + +bool +WarmupIndexCollection::handledBefore(uint32_t fieldId, const Node &term) +{ + const StringBase * sb(dynamic_cast<const StringBase *>(&term)); + if (sb != NULL) { + const vespalib::string & s = sb->getTerm(); + vespalib::LockGuard guard(_lock); + TermMap::insert_result found = _handledTerms[fieldId].insert(s); + return ! found.second; + } + return true; +} +Blueprint::UP +WarmupIndexCollection::createBlueprint(const IRequestContext & requestContext, + const FieldSpec &field, + const Node &term, + const IAttributeContext &attrCtx) +{ + if ( _warmupEndTime == 0) { + // warmup done + return _next->createBlueprint(requestContext, field, term, attrCtx); + } + if ( ! handledBefore(field.getFieldId(), term) ) { + MatchDataLayout mdl; + FieldSpec fs(field.getName(), field.getFieldId(), mdl.allocTermField(field.getFieldId()), field.isFilter()); + Task::UP task(new WarmupTask(mdl.createMatchData(), *this)); + static_cast<WarmupTask &>(*task).createBlueprint(fs, term, attrCtx); + fireWarmup(std::move(task)); + } + return _prev->createBlueprint(requestContext, field, term, attrCtx); +} + +Blueprint::UP +WarmupIndexCollection::createBlueprint(const IRequestContext & requestContext, + const FieldSpecList &fields, + const Node &term, + const IAttributeContext &attrCtx) +{ + if ( _warmupEndTime == 0) { + // warmup done + return _next->createBlueprint(requestContext, fields, term, attrCtx); + } + MatchDataLayout mdl; + FieldSpecList fsl; + bool needWarmUp(false); + for(size_t i(0); i < fields.size(); i++) { + const FieldSpec & f(fields[i]); + FieldSpec fs(f.getName(), f.getFieldId(), mdl.allocTermField(f.getFieldId()), f.isFilter()); + fsl.add(fs); + needWarmUp = needWarmUp || ! handledBefore(fs.getFieldId(), term); + } + if (needWarmUp) { + Task::UP task(new WarmupTask(mdl.createMatchData(), *this)); + static_cast<WarmupTask &>(*task).createBlueprint(fsl, term, attrCtx); + fireWarmup(std::move(task)); + } + return _prev->createBlueprint(requestContext, fields, term, attrCtx); +} + +search::SearchableStats +WarmupIndexCollection::getSearchableStats() const +{ + return _prev->getSearchableStats(); +} + +void +WarmupIndexCollection::append(uint32_t id, const IndexSearchable::SP &source) +{ + _next->append(id, source); +} + +void +WarmupIndexCollection::replace(uint32_t id, const IndexSearchable::SP &source) +{ + _next->replace(id, source); +} + +IndexSearchable::SP +WarmupIndexCollection::getSearchableSP(uint32_t i) const +{ + return _next->getSearchableSP(i); +} + +void +WarmupIndexCollection::WarmupTask::run() +{ + if (_warmup._warmupEndTime != 0) { + LOG(debug, "Warming up %s", _bluePrint->asString().c_str()); + _bluePrint->fetchPostings(true); + SearchIterator::UP it(_bluePrint->createSearch(*_matchData, true)); + it->initFullRange(); + for (it->seek(0); !it->isAtEnd(); it->seek(it->getDocId()+1)) { + } + } else { + LOG(debug, "Warmup has finished, ignoring task."); + } +} + +} diff --git a/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h new file mode 100644 index 00000000000..7457b676f31 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h @@ -0,0 +1,110 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/searchcorespi/index/isearchableindexcollection.h> +#include <vespa/vespalib/util/threadexecutor.h> +#include <vespa/searchlib/queryeval/fake_requestcontext.h> + +namespace searchcorespi { + +class IWarmupDone { +public: + virtual ~IWarmupDone() { } + virtual void warmupDone(ISearchableIndexCollection::SP current) = 0; +}; +/** + * Index collection that holds a reference to the active one and a new one that + * is to be warmed up. + */ +class WarmupIndexCollection : public ISearchableIndexCollection, + public std::enable_shared_from_this<WarmupIndexCollection> +{ +public: + typedef std::shared_ptr<WarmupIndexCollection> SP; + WarmupIndexCollection(double warmupSeconds, + ISearchableIndexCollection::SP prev, + ISearchableIndexCollection::SP next, + IndexSearchable & warmup, + vespalib::ThreadExecutor & executor, + IWarmupDone & warmupDone); + ~WarmupIndexCollection(); + // Implements IIndexCollection + const ISourceSelector &getSourceSelector() const override; + size_t getSourceCount() const override; + IndexSearchable &getSearchable(uint32_t i) const override; + uint32_t getSourceId(uint32_t i) const override; + + // Implements IndexSearchable + Blueprint::UP + createBlueprint(const IRequestContext & requestContext, + const FieldSpec &field, + const Node &term, + const IAttributeContext &attrCtx) override; + Blueprint::UP + createBlueprint(const IRequestContext & requestContext, + const FieldSpecList &fields, + const Node &term, + const IAttributeContext &attrCtx); + search::SearchableStats getSearchableStats() const override; + + // Implements ISearchableIndexCollection + void append(uint32_t id, const IndexSearchable::SP &source) override; + void replace(uint32_t id, const IndexSearchable::SP &source) override; + IndexSearchable::SP getSearchableSP(uint32_t i) const override; + void setSource(uint32_t docId) override; + + const ISearchableIndexCollection::SP & getNextIndexCollection() const { return _next; } + vespalib::string toString() const override; +private: + typedef search::fef::MatchData MatchData; + typedef search::queryeval::FakeRequestContext FakeRequestContext; + typedef vespalib::Executor::Task Task; + class WarmupTask : public Task { + public: + WarmupTask(MatchData::UP md, WarmupIndexCollection & warmup) : + _warmup(warmup), + _matchData(std::move(md)), + _bluePrint(), + _requestContext() + { } + WarmupTask & + createBlueprint(const FieldSpec &field, + const Node &term, + const IAttributeContext &attrCtx) + { + _bluePrint = _warmup.createBlueprint(_requestContext, field, term, attrCtx); + return *this; + } + WarmupTask & + createBlueprint(const FieldSpecList &fields, + const Node &term, + const IAttributeContext &attrCtx) + { + _bluePrint = _warmup.createBlueprint(_requestContext, fields, term, attrCtx); + return *this; + } + private: + void run() override; + WarmupIndexCollection & _warmup; + MatchData::UP _matchData; + Blueprint::UP _bluePrint; + FakeRequestContext _requestContext; + }; + + void fireWarmup(Task::UP task); + bool handledBefore(uint32_t fieldId, const Node &term); + + ISearchableIndexCollection::SP _prev; + ISearchableIndexCollection::SP _next; + IndexSearchable & _warmup; + vespalib::ThreadExecutor & _executor; + IWarmupDone & _warmupDone; + fastos::TimeStamp _warmupEndTime; + vespalib::Lock _lock; + typedef vespalib::hash_set<vespalib::string> TermMap; + typedef vespalib::hash_map<uint32_t, TermMap> FieldTermMap; + FieldTermMap _handledTerms; +}; + +} // namespace searchcorespi diff --git a/searchcorespi/src/vespa/searchcorespi/plugin/.gitignore b/searchcorespi/src/vespa/searchcorespi/plugin/.gitignore new file mode 100644 index 00000000000..7e7c0fe7fae --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/plugin/.gitignore @@ -0,0 +1,2 @@ +/.depend +/Makefile diff --git a/searchcorespi/src/vespa/searchcorespi/plugin/CMakeLists.txt b/searchcorespi/src/vespa/searchcorespi/plugin/CMakeLists.txt new file mode 100644 index 00000000000..b0dd8095850 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/plugin/CMakeLists.txt @@ -0,0 +1,7 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_library(searchcorespi_plugin OBJECT + SOURCES + factoryregistry.cpp + factoryloader.cpp + DEPENDS +) diff --git a/searchcorespi/src/vespa/searchcorespi/plugin/factoryloader.cpp b/searchcorespi/src/vespa/searchcorespi/plugin/factoryloader.cpp new file mode 100644 index 00000000000..d36705c4d05 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/plugin/factoryloader.cpp @@ -0,0 +1,33 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/searchcorespi/plugin/factoryloader.h> +#include <vespa/vespalib/util/exceptions.h> + +using vespalib::stringref; +using vespalib::make_string; +using vespalib::IllegalArgumentException; + +namespace searchcorespi { + +FactoryLoader::FactoryLoader() : + _libraries() +{ +} + +FactoryLoader::~FactoryLoader() +{ +} + +IIndexManagerFactory::UP +FactoryLoader::create(const stringref & factory) +{ + typedef IIndexManagerFactory* (*FuncT)(); + _libraries.loadLibrary(factory); + const FastOS_DynamicLibrary & lib = *_libraries.get(factory); + FuncT registrationMethod = reinterpret_cast<FuncT>(lib.GetSymbol("createIndexManagerFactory")); + if (registrationMethod == NULL) { + throw IllegalArgumentException(make_string("Failed locating symbol 'createIndexManagerFactory' in library '%s' for factory '%s'.", lib.GetLibName(), factory.c_str())); + } + return IIndexManagerFactory::UP(registrationMethod()); +} + +} diff --git a/searchcorespi/src/vespa/searchcorespi/plugin/factoryloader.h b/searchcorespi/src/vespa/searchcorespi/plugin/factoryloader.h new file mode 100644 index 00000000000..a97ba9c1c82 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/plugin/factoryloader.h @@ -0,0 +1,26 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/searchcorespi/plugin/iindexmanagerfactory.h> +#include <vespa/vespalib/util/librarypool.h> + +namespace searchcorespi { + +class FactoryLoader +{ +public: + FactoryLoader(); + ~FactoryLoader(); + /** + * Will load the library containing the factory. It will then locate the 'createIndexManagerFactory' + * symbol and run it to create the factory. + * @param the name of the library. Like 'vesparise'. + * @return the factory that is created. + */ + IIndexManagerFactory::UP create(const vespalib::stringref & factory); +private: + vespalib::LibraryPool _libraries; +}; + +} // namespace searchcorespi + diff --git a/searchcorespi/src/vespa/searchcorespi/plugin/factoryregistry.cpp b/searchcorespi/src/vespa/searchcorespi/plugin/factoryregistry.cpp new file mode 100644 index 00000000000..03ed39c8d30 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/plugin/factoryregistry.cpp @@ -0,0 +1,57 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/searchcorespi/plugin/factoryregistry.h> + +using vespalib::LockGuard; +using vespalib::IllegalArgumentException; +using vespalib::stringref; +using vespalib::string; + +namespace searchcorespi { + +FactoryRegistry::FactoryRegistry() +{ +} + +FactoryRegistry::~FactoryRegistry() +{ +} + +void FactoryRegistry::add(const stringref & uniqueName, const IIndexManagerFactory::SP & factory) +{ + LockGuard guard(_lock); + if (_registry.find(uniqueName) == _registry.end()) { + _registry[uniqueName] = factory; + } else { + throw IllegalArgumentException("A factory is already registered with the same name as '" + uniqueName + "'.", VESPA_STRLOC); + } +} + +void FactoryRegistry::remove(const stringref & uniqueName) +{ + LockGuard guard(_lock); + if (_registry.find(uniqueName) == _registry.end()) { + throw IllegalArgumentException("No factory is registered with the name of '" + uniqueName + "'.", VESPA_STRLOC); + } + _registry.erase(uniqueName); +} + +const IIndexManagerFactory::SP & +FactoryRegistry::get(const stringref & uniqueName) const +{ + LockGuard guard(_lock); + Registry::const_iterator found = _registry.find(uniqueName); + if (found == _registry.end()) { + throw IllegalArgumentException("No factory is registered with the name of '" + uniqueName + "'.", VESPA_STRLOC); + } + return found->second; +} + +bool +FactoryRegistry::isRegistered(const vespalib::stringref & uniqueName) const +{ + LockGuard guard(_lock); + Registry::const_iterator found = _registry.find(uniqueName); + return found != _registry.end(); +} + +} diff --git a/searchcorespi/src/vespa/searchcorespi/plugin/factoryregistry.h b/searchcorespi/src/vespa/searchcorespi/plugin/factoryregistry.h new file mode 100644 index 00000000000..3e59eb5a852 --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/plugin/factoryregistry.h @@ -0,0 +1,50 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/searchcorespi/plugin/iindexmanagerfactory.h> + +namespace searchcorespi { + +/** + */ +class FactoryRegistry +{ +public: + FactoryRegistry(); + ~FactoryRegistry(); + /** + * This will register the plugged in factory under its official + * name. The plugin should call this method when it is loaded. E.g. + * by using either '__attribute__((constructor))' or using a + * global static object that will use its constructor to register + * the factory. + * @param uniqueName This is a name that is unique over all IndexManager factories. + * @param factory The factory instance for producing IndexManagers. + * @throws vespalib::IllegalArgument if factory is already registered. + */ + void add(const vespalib::stringref & uniqueName, const IIndexManagerFactory::SP & factory); + /** + * Will unregister a factory. Should be called when a sharedlibrary is being unloaded. + * @param uniqueName Unique name of factory to remove from registry. + * @throws vespalib::IllegalArgument if factory is already registered. + */ + void remove(const vespalib::stringref & uniqueName); + /** + * This method will fetch a factory given its unique name. + * @param name The name of the factory to return. + * @return The factory. + */ + const IIndexManagerFactory::SP & get(const vespalib::stringref & uniqueName) const; + /** + * Returns true if a factory with the given name has been registered. + */ + bool isRegistered(const vespalib::stringref & uniqueName) const; + +private: + typedef std::map<vespalib::string, IIndexManagerFactory::SP> Registry; + Registry _registry; + vespalib::Lock _lock; +}; + +} // namespace searchcorespi + diff --git a/searchcorespi/src/vespa/searchcorespi/plugin/iindexmanagerfactory.h b/searchcorespi/src/vespa/searchcorespi/plugin/iindexmanagerfactory.h new file mode 100644 index 00000000000..982f48cc96a --- /dev/null +++ b/searchcorespi/src/vespa/searchcorespi/plugin/iindexmanagerfactory.h @@ -0,0 +1,76 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/searchcorespi/index/iindexmanager.h> +#include <vespa/searchcorespi/index/indexmaintainerconfig.h> +#include <vespa/searchcorespi/index/indexmaintainercontext.h> +#include <vespa/searchcorespi/index/indexmanagerconfig.h> +#include <vespa/config/configgen/configinstance.h> +#include <vespa/config/retriever/configsnapshot.h> +#include <vespa/config/retriever/configkeyset.h> + +namespace searchcorespi { + +/** + * Interface for an index manager factory. Every provider of an index manager is supposed to provide a + * factory for producing them. It is given the basedir, the schema and a collection of configs. + * The factory implementation must pick the config it needs and return an IIndexManager instance. + * The factory is registered by using the registerFactory() method. + */ +class IIndexManagerFactory +{ +public: + typedef std::shared_ptr<IIndexManagerFactory> SP; + typedef std::unique_ptr<IIndexManagerFactory> UP; + + virtual ~IIndexManagerFactory() {} + + /** + * This method will be called by a document db when it needs to create an index manager that + * uses an index maintainer (with source selector) in its implementation. + * The factory implementation must use RTTI to figure out what configs are what. + * It should receive all configs it needs, but wise to do sanity checking. + * + * @param managerConfig The config that will be used to construct an index manager. + * Note that if the factory used a different config id when populating the + * ConfigKeySet compared to the one in this config instance, it must + * also override the config id when fetching from the config snapshot. + * The root config received in the @ref getConfigKeys() call will also be + * part of the config snapshot in this config instance. + * @param maintainerConfig The config needed to construct an index maintainer. + * @param maintainerContext The context object used by an index maintainer during its lifetime. + * @return The index manager created or NULL if not, fx if configs are not as expected. + */ + virtual IIndexManager::UP createIndexManager(const IndexManagerConfig &managerConfig, + const index::IndexMaintainerConfig &maintainerConfig, + const index::IndexMaintainerContext &maintainerContext) = 0; + + /** + * The factory must return the set of config keys that it will require the config from. + * This will facilitate that the searchcore can fetch all configs needed in a pluggable way. + * + * @param configId The config id to use when generating the config keys. + * @param schema This is the initial index schema to be used. + * @param rootConfig This is an config instance that is the root config for the factory. + * Based on this config it must be able to tell if it needs any other config, + * and in that case provide the config keys. + * @return The set containing keys for all configs required. + */ + virtual config::ConfigKeySet getConfigKeys(const vespalib::string &configId, + const search::index::Schema &schema, + const config::ConfigInstance &rootConfig) = 0; +}; + +} // namespace searchcorespi + +extern "C" { +/** + * This is a method that each shared library must have in order provide a factory. + * This will be called by the one loading the library. + * @return The created factory that the caller will take ownership of. + */ +searchcorespi::IIndexManagerFactory * createIndexManagerFactory(); + +} + + diff --git a/searchcorespi/testrun/.gitignore b/searchcorespi/testrun/.gitignore new file mode 100644 index 00000000000..1b1bf19bb72 --- /dev/null +++ b/searchcorespi/testrun/.gitignore @@ -0,0 +1,15 @@ +/test-report.html +/test-report.html.bottom +/test-report.html.entry +/test-report.html.summary +/test-report.html.top +/test.1.plugin.desc.file.txt +/test.1.plugin.file.plugin_test.cpp.txt +/test.1.plugin.files.html +/test.1.plugin.log.file.txt +/tmp.end-time +/tmp.plugin-time +/tmp.plugin.log-control +/tmp.start-time +/test.*.*.result +/Makefile |