aboutsummaryrefslogtreecommitdiffstats
path: root/searchcorespi/src
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
commit72231250ed81e10d66bfe70701e64fa5fe50f712 (patch)
tree2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /searchcorespi/src
Publish
Diffstat (limited to 'searchcorespi/src')
-rw-r--r--searchcorespi/src/.gitignore4
-rw-r--r--searchcorespi/src/testlist.txt1
-rw-r--r--searchcorespi/src/tests/plugin/.gitignore5
-rw-r--r--searchcorespi/src/tests/plugin/CMakeLists.txt29
-rw-r--r--searchcorespi/src/tests/plugin/DESC1
-rw-r--r--searchcorespi/src/tests/plugin/FILES1
-rw-r--r--searchcorespi/src/tests/plugin/empty.cpp1
-rw-r--r--searchcorespi/src/tests/plugin/factoryregistry_test.cpp63
-rw-r--r--searchcorespi/src/tests/plugin/plugin.cpp77
-rw-r--r--searchcorespi/src/tests/plugin/plugin_test.cpp32
-rw-r--r--searchcorespi/src/vespa/searchcorespi/.gitignore3
-rw-r--r--searchcorespi/src/vespa/searchcorespi/CMakeLists.txt9
-rw-r--r--searchcorespi/src/vespa/searchcorespi/flush/.gitignore2
-rw-r--r--searchcorespi/src/vespa/searchcorespi/flush/CMakeLists.txt6
-rw-r--r--searchcorespi/src/vespa/searchcorespi/flush/closureflushtask.h47
-rw-r--r--searchcorespi/src/vespa/searchcorespi/flush/flushstats.cpp16
-rw-r--r--searchcorespi/src/vespa/searchcorespi/flush/flushstats.h28
-rw-r--r--searchcorespi/src/vespa/searchcorespi/flush/flushtask.h20
-rw-r--r--searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h186
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/.gitignore2
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/CMakeLists.txt25
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/activediskindexes.cpp34
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/activediskindexes.h30
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/diskindexcleaner.cpp113
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/diskindexcleaner.h26
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/eventlogger.cpp71
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/eventlogger.h25
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/fakeindexsearchable.h43
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp145
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h63
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/fusionspec.h24
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/i_thread_service.h34
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/idiskindex.h33
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/iindexcollection.cpp45
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/iindexcollection.h50
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h56
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/iindexmanager.cpp17
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/iindexmanager.h178
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/imemoryindex.h87
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/index_manager_explorer.cpp28
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/index_manager_explorer.h25
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexcollection.cpp223
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexcollection.h73
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexdisklayout.cpp62
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexdisklayout.h35
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp86
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h40
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp107
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h36
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp1238
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h412
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexmaintainerconfig.cpp31
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexmaintainerconfig.h74
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.cpp28
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.h59
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexmanagerconfig.cpp20
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexmanagerconfig.h41
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexreadutilities.cpp91
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexreadutilities.h23
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexsearchable.cpp35
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexsearchable.h77
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexwriteutilities.cpp189
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/indexwriteutilities.h46
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/isearchableindexcollection.cpp31
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/isearchableindexcollection.h33
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h74
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp220
-rw-r--r--searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h110
-rw-r--r--searchcorespi/src/vespa/searchcorespi/plugin/.gitignore2
-rw-r--r--searchcorespi/src/vespa/searchcorespi/plugin/CMakeLists.txt7
-rw-r--r--searchcorespi/src/vespa/searchcorespi/plugin/factoryloader.cpp33
-rw-r--r--searchcorespi/src/vespa/searchcorespi/plugin/factoryloader.h26
-rw-r--r--searchcorespi/src/vespa/searchcorespi/plugin/factoryregistry.cpp57
-rw-r--r--searchcorespi/src/vespa/searchcorespi/plugin/factoryregistry.h50
-rw-r--r--searchcorespi/src/vespa/searchcorespi/plugin/iindexmanagerfactory.h76
75 files changed, 5430 insertions, 0 deletions
diff --git a/searchcorespi/src/.gitignore b/searchcorespi/src/.gitignore
new file mode 100644
index 00000000000..49bfd9b85d3
--- /dev/null
+++ b/searchcorespi/src/.gitignore
@@ -0,0 +1,4 @@
+/Makefile.ini
+/config_command.sh
+/project.dsw
+/searchcorespi.mak
diff --git a/searchcorespi/src/testlist.txt b/searchcorespi/src/testlist.txt
new file mode 100644
index 00000000000..df4c0bebe2c
--- /dev/null
+++ b/searchcorespi/src/testlist.txt
@@ -0,0 +1 @@
+tests/plugin
diff --git a/searchcorespi/src/tests/plugin/.gitignore b/searchcorespi/src/tests/plugin/.gitignore
new file mode 100644
index 00000000000..e49000038ad
--- /dev/null
+++ b/searchcorespi/src/tests/plugin/.gitignore
@@ -0,0 +1,5 @@
+Makefile
+.depend
+*_test
+searchcorespi_factoryregistry_test_app
+searchcorespi_plugin_test_app
diff --git a/searchcorespi/src/tests/plugin/CMakeLists.txt b/searchcorespi/src/tests/plugin/CMakeLists.txt
new file mode 100644
index 00000000000..ae70cc2d7f7
--- /dev/null
+++ b/searchcorespi/src/tests/plugin/CMakeLists.txt
@@ -0,0 +1,29 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(searchcorespi_plugin_test_app
+ SOURCES
+ plugin_test.cpp
+ DEPENDS
+ searchcorespi
+)
+vespa_add_test(
+ NAME searchcorespi_plugin_test_app
+ COMMAND searchcorespi_plugin_test_app
+ ENVIRONMENT "LD_LIBRARY_PATH=."
+)
+vespa_add_executable(searchcorespi_factoryregistry_test_app
+ SOURCES
+ factoryregistry_test.cpp
+ DEPENDS
+ searchcorespi
+)
+vespa_add_test(NAME searchcorespi_factoryregistry_test_app COMMAND searchcorespi_factoryregistry_test_app)
+vespa_add_library(searchcorespi_tplugin
+ SOURCES
+ plugin.cpp
+ DEPENDS
+)
+vespa_add_library(searchcorespi_illegal-plugin
+ SOURCES
+ empty.cpp
+ DEPENDS
+)
diff --git a/searchcorespi/src/tests/plugin/DESC b/searchcorespi/src/tests/plugin/DESC
new file mode 100644
index 00000000000..80d66100e85
--- /dev/null
+++ b/searchcorespi/src/tests/plugin/DESC
@@ -0,0 +1 @@
+Test of factory plugin interface.
diff --git a/searchcorespi/src/tests/plugin/FILES b/searchcorespi/src/tests/plugin/FILES
new file mode 100644
index 00000000000..1b658927feb
--- /dev/null
+++ b/searchcorespi/src/tests/plugin/FILES
@@ -0,0 +1 @@
+plugin_test.cpp
diff --git a/searchcorespi/src/tests/plugin/empty.cpp b/searchcorespi/src/tests/plugin/empty.cpp
new file mode 100644
index 00000000000..d84fabdd5a2
--- /dev/null
+++ b/searchcorespi/src/tests/plugin/empty.cpp
@@ -0,0 +1 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
diff --git a/searchcorespi/src/tests/plugin/factoryregistry_test.cpp b/searchcorespi/src/tests/plugin/factoryregistry_test.cpp
new file mode 100644
index 00000000000..16cb03a5496
--- /dev/null
+++ b/searchcorespi/src/tests/plugin/factoryregistry_test.cpp
@@ -0,0 +1,63 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Unit tests for factoryregistry.
+
+#include <vespa/log/log.h>
+LOG_SETUP("factoryregistry_test");
+#include <vespa/fastos/fastos.h>
+
+#include <vespa/searchcorespi/plugin/factoryregistry.h>
+#include <vespa/searchcorespi/plugin/iindexmanagerfactory.h>
+#include <vespa/vespalib/stllike/string.h>
+#include <vespa/vespalib/testkit/testapp.h>
+
+using vespalib::string;
+using namespace searchcorespi;
+
+namespace {
+
+struct MyFactory : IIndexManagerFactory {
+
+ virtual IIndexManager::UP createIndexManager(const IndexManagerConfig &,
+ const index::IndexMaintainerConfig &,
+ const index::IndexMaintainerContext &) {
+ return IIndexManager::UP();
+ }
+ virtual config::ConfigKeySet getConfigKeys(
+ const string &,
+ const search::index::Schema &,
+ const config::ConfigInstance &) {
+ return config::ConfigKeySet();
+ }
+};
+
+const string name = "factory";
+
+TEST("require that factories can be added and removed") {
+ FactoryRegistry registry;
+ EXPECT_FALSE(registry.isRegistered(name));
+ registry.add(name, IIndexManagerFactory::SP(new MyFactory));
+ EXPECT_TRUE(registry.get(name).get());
+ EXPECT_TRUE(registry.isRegistered(name));
+ registry.remove(name);
+ EXPECT_EXCEPTION(registry.get(name), vespalib::IllegalArgumentException,
+ "No factory is registered with the name");
+}
+
+TEST("require that two factories with the same name cannot be added") {
+ FactoryRegistry registry;
+ registry.add(name, IIndexManagerFactory::SP(new MyFactory));
+ EXPECT_EXCEPTION(
+ registry.add(name, IIndexManagerFactory::SP(new MyFactory)),
+ vespalib::IllegalArgumentException,
+ "A factory is already registered with the same name");
+}
+
+TEST("require that a non-existent factory cannot be removed") {
+ FactoryRegistry registry;
+ EXPECT_EXCEPTION(registry.remove(name), vespalib::IllegalArgumentException,
+ "No factory is registered with the name");
+}
+
+} // namespace
+
+TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/searchcorespi/src/tests/plugin/plugin.cpp b/searchcorespi/src/tests/plugin/plugin.cpp
new file mode 100644
index 00000000000..ecd8cc892d9
--- /dev/null
+++ b/searchcorespi/src/tests/plugin/plugin.cpp
@@ -0,0 +1,77 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/searchcorespi/plugin/iindexmanagerfactory.h>
+
+using namespace search;
+using namespace search::index;
+using namespace vespalib;
+using namespace config;
+
+namespace searchcorespi {
+class IndexManager : public searchcorespi::IIndexManager
+{
+public:
+
+ typedef search::SerialNum SerialNum;
+ typedef search::index::Schema Schema;
+ typedef document::Document Document;
+ using OnWriteDoneType =
+ const std::shared_ptr<search::IDestructorCallback> &;
+ virtual void putDocument(uint32_t, const Document &, SerialNum) override { }
+ virtual void removeDocument(uint32_t, SerialNum) override { }
+ virtual void commit(SerialNum, OnWriteDoneType) override { }
+ virtual void heartBeat(SerialNum ) {}
+ virtual SerialNum getCurrentSerialNum() const { return 0; }
+ virtual SerialNum getFlushedSerialNum() const { return 0; }
+ virtual IndexSearchable::SP getSearchable() const {
+ IndexSearchable::SP s;
+ return s;
+ }
+ virtual SearchableStats getSearchableStats() const {
+ SearchableStats s;
+ return s;
+ }
+ virtual searchcorespi::IFlushTarget::List getFlushTargets() {
+ searchcorespi::IFlushTarget::List l;
+ return l;
+ }
+ virtual void setSchema(const Schema & , const Schema &) { }
+ virtual void wipeHistory(SerialNum , const Schema &) { }
+};
+
+class IndexManagerFactory : public searchcorespi::IIndexManagerFactory
+{
+public:
+ virtual IIndexManager::UP createIndexManager(const IndexManagerConfig &managerCfg,
+ const index::IndexMaintainerConfig &maintainerConfig,
+ const index::IndexMaintainerContext &maintainerContext);
+
+ virtual ConfigKeySet getConfigKeys(const string &configId,
+ const Schema &schema,
+ const ConfigInstance &rootConfig);
+};
+
+IIndexManager::UP
+IndexManagerFactory::createIndexManager(const IndexManagerConfig &,
+ const index::IndexMaintainerConfig &,
+ const index::IndexMaintainerContext &)
+{
+ return IIndexManager::UP(new IndexManager());
+}
+
+ConfigKeySet
+IndexManagerFactory::getConfigKeys(const string &,
+ const Schema &,
+ const ConfigInstance &)
+{
+ ConfigKeySet keys;
+ return keys;
+}
+
+}
+
+searchcorespi::IIndexManagerFactory *
+createIndexManagerFactory()
+{
+ return new searchcorespi::IndexManagerFactory();
+}
+
diff --git a/searchcorespi/src/tests/plugin/plugin_test.cpp b/searchcorespi/src/tests/plugin/plugin_test.cpp
new file mode 100644
index 00000000000..34692b4ed7b
--- /dev/null
+++ b/searchcorespi/src/tests/plugin/plugin_test.cpp
@@ -0,0 +1,32 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/vespalib/testkit/testapp.h>
+#include <vespa/searchcorespi/plugin/factoryloader.h>
+#include <vespa/log/log.h>
+LOG_SETUP("factory_test");
+
+using namespace searchcorespi;
+
+namespace {
+TEST("require that plugins can be loaded.") {
+ FactoryLoader fl;
+ IIndexManagerFactory::UP f = fl.create("searchcorespi_tplugin");
+ ASSERT_TRUE(f.get());
+}
+
+TEST("require that non-existent plugin causes failure") {
+ FactoryLoader fl;
+ EXPECT_EXCEPTION(fl.create("no-such-plugin"),
+ vespalib::IllegalArgumentException,
+ "cannot open shared object file");
+}
+
+TEST("require that missing factory function causes failure") {
+ FactoryLoader fl;
+ EXPECT_EXCEPTION(fl.create("searchcorespi_illegal-plugin"),
+ vespalib::IllegalArgumentException,
+ "Failed locating symbol 'createIndexManagerFactory'");
+}
+} // namespace
+
+TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/searchcorespi/src/vespa/searchcorespi/.gitignore b/searchcorespi/src/vespa/searchcorespi/.gitignore
new file mode 100644
index 00000000000..9d6ecd51398
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/.gitignore
@@ -0,0 +1,3 @@
+/.depend
+/Makefile
+/libsearchcorespi.so.5.1
diff --git a/searchcorespi/src/vespa/searchcorespi/CMakeLists.txt b/searchcorespi/src/vespa/searchcorespi/CMakeLists.txt
new file mode 100644
index 00000000000..a28c17c5c94
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/CMakeLists.txt
@@ -0,0 +1,9 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_library(searchcorespi
+ SOURCES
+ $<TARGET_OBJECTS:searchcorespi_flush>
+ $<TARGET_OBJECTS:searchcorespi_index>
+ $<TARGET_OBJECTS:searchcorespi_plugin>
+ INSTALL lib64
+ DEPENDS
+)
diff --git a/searchcorespi/src/vespa/searchcorespi/flush/.gitignore b/searchcorespi/src/vespa/searchcorespi/flush/.gitignore
new file mode 100644
index 00000000000..7e7c0fe7fae
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/flush/.gitignore
@@ -0,0 +1,2 @@
+/.depend
+/Makefile
diff --git a/searchcorespi/src/vespa/searchcorespi/flush/CMakeLists.txt b/searchcorespi/src/vespa/searchcorespi/flush/CMakeLists.txt
new file mode 100644
index 00000000000..1f565cb7aac
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/flush/CMakeLists.txt
@@ -0,0 +1,6 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_library(searchcorespi_flush OBJECT
+ SOURCES
+ flushstats.cpp
+ DEPENDS
+)
diff --git a/searchcorespi/src/vespa/searchcorespi/flush/closureflushtask.h b/searchcorespi/src/vespa/searchcorespi/flush/closureflushtask.h
new file mode 100644
index 00000000000..0cd653770fb
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/flush/closureflushtask.h
@@ -0,0 +1,47 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "flushtask.h"
+#include <vespa/vespalib/util/closure.h>
+
+namespace searchcorespi {
+
+class ClosureFlushTask : public FlushTask
+{
+ std::unique_ptr<vespalib::Closure> _closure;
+ search::SerialNum _flushSerial;
+
+public:
+ ClosureFlushTask(std::unique_ptr<vespalib::Closure> closure,
+ search::SerialNum flushSerial)
+ : _closure(std::move(closure)),
+ _flushSerial(flushSerial)
+ {
+ }
+
+ virtual search::SerialNum
+ getFlushSerial() const
+ {
+ return _flushSerial;
+ }
+
+ virtual void
+ run()
+ {
+ _closure->call();
+ }
+};
+
+/**
+ * Wraps a Closure as a FlushTask
+ **/
+static inline FlushTask::UP
+makeFlushTask(std::unique_ptr<vespalib::Closure> closure,
+ search::SerialNum flushSerial)
+{
+ return FlushTask::UP(new ClosureFlushTask(std::move(closure),
+ flushSerial));
+}
+
+} // namespace searchcorespi
+
diff --git a/searchcorespi/src/vespa/searchcorespi/flush/flushstats.cpp b/searchcorespi/src/vespa/searchcorespi/flush/flushstats.cpp
new file mode 100644
index 00000000000..29318809335
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/flush/flushstats.cpp
@@ -0,0 +1,16 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP(".searchcorespi.flush.flushstats");
+
+#include "flushstats.h"
+
+namespace searchcorespi {
+
+FlushStats::FlushStats() :
+ _path(),
+ _pathElementsToLog(6)
+{
+}
+
+} // namespace searchcorespi
diff --git a/searchcorespi/src/vespa/searchcorespi/flush/flushstats.h b/searchcorespi/src/vespa/searchcorespi/flush/flushstats.h
new file mode 100644
index 00000000000..b11712624ab
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/flush/flushstats.h
@@ -0,0 +1,28 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/vespalib/stllike/string.h>
+
+namespace searchcorespi {
+
+/**
+ * Class with stats for what has been flushed.
+ */
+class FlushStats
+{
+private:
+ vespalib::string _path; // path to data flushed
+ size_t _pathElementsToLog;
+
+public:
+ FlushStats();
+
+ void setPath(const vespalib::string & path) { _path = path; }
+ void setPathElementsToLog(size_t numElems) { _pathElementsToLog = numElems; }
+
+ const vespalib::string & getPath() const { return _path; }
+ size_t getPathElementsToLog() const { return _pathElementsToLog; }
+};
+
+} // namespace searchcorespi
+
diff --git a/searchcorespi/src/vespa/searchcorespi/flush/flushtask.h b/searchcorespi/src/vespa/searchcorespi/flush/flushtask.h
new file mode 100644
index 00000000000..cfcd6fd914d
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/flush/flushtask.h
@@ -0,0 +1,20 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/vespalib/util/executor.h>
+#include <vespa/vespalib/util/closure.h>
+#include <vespa/searchlib/common/serialnum.h>
+
+namespace searchcorespi {
+
+class FlushTask : public vespalib::Executor::Task
+{
+public:
+ typedef std::unique_ptr<FlushTask> UP;
+
+ virtual search::SerialNum
+ getFlushSerial() const = 0;
+};
+
+} // namespace searchcorespi
+
diff --git a/searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h b/searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h
new file mode 100644
index 00000000000..724ffe5e2bd
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/flush/iflushtarget.h
@@ -0,0 +1,186 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "flushstats.h"
+#include <string>
+#include <vespa/searchlib/common/serialnum.h>
+#include <vespa/vespalib/util/executor.h>
+#include "flushtask.h"
+
+namespace searchcorespi {
+
+/**
+ * This abstract class represents a flushable object whose memory usage is
+ * described by getApproxMemoryGain(): getBefore() bytes are used now, and
+ * this is expected to be reduced to getAfter() bytes if the target is flushed.
+ */
+class IFlushTarget
+{
+public:
+ /**
+ * The flush types that a flush target can represent.
+ */
+ enum class Type {
+ FLUSH,
+ SYNC,
+ GC,
+ OTHER
+ };
+
+ /**
+ * The component types that a flush target can be used for.
+ */
+ enum class Component {
+ ATTRIBUTE,
+ INDEX,
+ DOCUMENT_STORE,
+ OTHER
+ };
+
+private:
+ vespalib::string _name;
+ Type _type;
+ Component _component;
+
+public:
+ template<typename T>
+ class Gain {
+ public:
+ Gain() : _before(0), _after(0) { }
+ Gain(T before, T after) : _before(before), _after(after) { }
+ T getBefore() const { return _before; }
+ T getAfter() const { return _after; }
+ T gain() const { return _before - _after; }
+ double gainRate() const { return (_before != 0) ? double(gain())/_before : 0;}
+ Gain & operator += (const Gain & b) { _before += b.getBefore(); _after += b.getAfter(); return *this; }
+ static Gain noGain(size_t currentSize) { return Gain(currentSize, currentSize); }
+ private:
+ T _before;
+ T _after;
+ };
+ typedef Gain<int64_t> MemoryGain;
+ typedef Gain<int64_t> DiskGain;
+ typedef search::SerialNum SerialNum;
+ typedef fastos::TimeStamp Time;
+
+ /**
+ * Convenience typedefs.
+ */
+ typedef std::shared_ptr<IFlushTarget> SP;
+ typedef std::vector<SP> List;
+ typedef FlushTask Task;
+
+ /**
+ * Constructs a new instance of this class.
+ *
+ * @param name The handler-wide unique name of this target.
+ */
+ IFlushTarget(const vespalib::string &name)
+ : _name(name),
+ _type(Type::OTHER),
+ _component(Component::OTHER)
+ {
+ }
+
+ /**
+ * Constructs a new instance of this class.
+ *
+ * @param name The handler-wide unique name of this target.
+ * @param type The flush type of this target.
+ * @param component The component type of this target.
+ */
+ IFlushTarget(const vespalib::string &name,
+ const Type &type,
+ const Component &component)
+ : _name(name),
+ _type(type),
+ _component(component)
+ {
+ }
+
+ /**
+ * Virtual destructor required for inheritance.
+ */
+ virtual ~IFlushTarget() { }
+
+ /**
+ * Returns the handler-wide unique name of this target.
+ *
+ * @return The name of this.
+ */
+ const vespalib::string & getName() const { return _name; }
+
+ /**
+ * Returns the flush type of this target.
+ */
+ Type getType() const { return _type; }
+
+ /**
+ * Returns the component type of this target.
+ */
+ Component getComponent() const { return _component; }
+
+ /**
+ * Returns the approximate memory gain of this target, in bytes.
+ *
+ * @return The gain
+ */
+ virtual MemoryGain getApproxMemoryGain() const = 0;
+
+ /**
+ * Returns the approximate disk gain of this target, in bytes.
+ *
+ * @return The gain
+ */
+ virtual DiskGain getApproxDiskGain() const = 0;
+
+ /**
+ * Returns the approximate amount of bytes this target writes to disk if flushed.
+ */
+ virtual uint64_t getApproxBytesToWriteToDisk() const = 0;
+
+ /**
+ * Returns the last serial number for the transaction applied to
+ * target before it was flushed to disk. The transaction log can
+ * not be pruned beyond this.
+ *
+ * @return The last serial number represented in flushed target
+ */
+ virtual SerialNum getFlushedSerialNum() const = 0;
+
+ /**
+ * Returns the time of last flush.
+ *
+ * @return The last flush time.
+ */
+ virtual Time getLastFlushTime() const = 0;
+
+ /**
+ * Returns whether the target itself is in urgent need of a flush.
+ *
+ * @return true if an urgent flush is needed
+ */
+ virtual bool needUrgentFlush() const { return false; }
+
+ /**
+ * Initiates the flushing of temporary memory. This method must perform
+ * everything required to allow another thread to complete the flush. This
+ * method is called by the flush scheduler thread.
+ *
+ * @param currentSerial The current transaction serial number.
+ * @return The task used to complete the flush.
+ */
+ virtual Task::UP initFlush(SerialNum currentSerial) = 0;
+
+ /**
+ * Returns the stats for the last completed flush operation
+ * for this flush target.
+ *
+ * @return The stats for the last flush.
+ */
+ virtual FlushStats getLastFlushStats() const = 0;
+
+};
+
+} // namespace searchcorespi
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/.gitignore b/searchcorespi/src/vespa/searchcorespi/index/.gitignore
new file mode 100644
index 00000000000..7e7c0fe7fae
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/.gitignore
@@ -0,0 +1,2 @@
+/.depend
+/Makefile
diff --git a/searchcorespi/src/vespa/searchcorespi/index/CMakeLists.txt b/searchcorespi/src/vespa/searchcorespi/index/CMakeLists.txt
new file mode 100644
index 00000000000..65b006f8ea9
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/CMakeLists.txt
@@ -0,0 +1,25 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_library(searchcorespi_index OBJECT
+ SOURCES
+ activediskindexes.cpp
+ diskindexcleaner.cpp
+ eventlogger.cpp
+ fusionrunner.cpp
+ iindexmanager.cpp
+ iindexcollection.cpp
+ index_manager_explorer.cpp
+ indexcollection.cpp
+ indexdisklayout.cpp
+ indexflushtarget.cpp
+ indexfusiontarget.cpp
+ indexmaintainer.cpp
+ indexmaintainerconfig.cpp
+ indexmaintainercontext.cpp
+ indexmanagerconfig.cpp
+ indexreadutilities.cpp
+ indexsearchable.cpp
+ indexwriteutilities.cpp
+ warmupindexcollection.cpp
+ isearchableindexcollection.cpp
+ DEPENDS
+)
diff --git a/searchcorespi/src/vespa/searchcorespi/index/activediskindexes.cpp b/searchcorespi/src/vespa/searchcorespi/index/activediskindexes.cpp
new file mode 100644
index 00000000000..73bda4d94c8
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/activediskindexes.cpp
@@ -0,0 +1,34 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP(".searchcorespi.index.activediskindexes");
+
+#include "activediskindexes.h"
+
+using std::set;
+using vespalib::string;
+using vespalib::LockGuard;
+
+namespace searchcorespi {
+namespace index {
+
+void ActiveDiskIndexes::setActive(const string &index) {
+ LockGuard lock(_lock);
+ _active.insert(index);
+}
+
+void ActiveDiskIndexes::notActive(const string & index) {
+ LockGuard lock(_lock);
+ set<string>::iterator it = _active.find(index);
+ assert(it != _active.end());
+ _active.erase(it);
+}
+
+bool ActiveDiskIndexes::isActive(const string &index) const {
+ LockGuard lock(_lock);
+ return _active.find(index) != _active.end();
+}
+
+} // namespace index
+} // namespace searchcorespi
diff --git a/searchcorespi/src/vespa/searchcorespi/index/activediskindexes.h b/searchcorespi/src/vespa/searchcorespi/index/activediskindexes.h
new file mode 100644
index 00000000000..98caeb1e4b0
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/activediskindexes.h
@@ -0,0 +1,30 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/stllike/string.h>
+#include <vespa/vespalib/util/sync.h>
+#include <set>
+
+namespace searchcorespi {
+namespace index {
+
+/**
+ * Class used to keep track of the set of active disk indexes in an index maintainer.
+ * The index directories are used as identifiers.
+ */
+class ActiveDiskIndexes {
+ std::multiset<vespalib::string> _active;
+ vespalib::Lock _lock;
+
+public:
+ typedef std::shared_ptr<ActiveDiskIndexes> SP;
+
+ void setActive(const vespalib::string & index);
+ void notActive(const vespalib::string & index);
+ bool isActive(const vespalib::string & index) const;
+};
+
+} // namespace index
+} // namespace searchcorespi
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/diskindexcleaner.cpp b/searchcorespi/src/vespa/searchcorespi/index/diskindexcleaner.cpp
new file mode 100644
index 00000000000..68e7853c820
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/diskindexcleaner.cpp
@@ -0,0 +1,113 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP(".searchcorespi.index.diskindexcleaner");
+
+#include "activediskindexes.h"
+#include "diskindexcleaner.h"
+#include <algorithm>
+#include <sstream>
+#include <vector>
+
+using std::istringstream;
+using vespalib::string;
+using std::vector;
+
+namespace searchcorespi {
+namespace index {
+namespace {
+vector<string> readIndexes(const string &base_dir) {
+ vector<string> indexes;
+ FastOS_DirectoryScan dir_scan(base_dir.c_str());
+ while (dir_scan.ReadNext()) {
+ string name = dir_scan.GetName();
+ if (!dir_scan.IsDirectory() || name.find("index.") != 0) {
+ continue;
+ }
+ indexes.push_back(name);
+ }
+ return indexes;
+}
+
+bool isValidIndex(const string &index_dir) {
+ FastOS_File serial_file((index_dir + "/serial.dat").c_str());
+ return serial_file.OpenReadOnlyExisting();
+}
+
+uint32_t findLastFusionId(const string &base_dir,
+ const vector<string> &indexes) {
+ uint32_t fusion_id = 0;
+ const string prefix = "index.fusion.";
+ for (size_t i = 0; i < indexes.size(); ++i) {
+ if (indexes[i].find(prefix) != 0) {
+ continue;
+ }
+ if (!isValidIndex(base_dir + "/" + indexes[i])) {
+ continue;
+ }
+
+ uint32_t new_id = 0;
+ istringstream ist(indexes[i].substr(prefix.size()));
+ ist >> new_id;
+ fusion_id = std::max(fusion_id, new_id);
+ }
+ return fusion_id;
+}
+
+void removeDir(const string &dir) {
+ LOG(debug, "Removing index dir '%s'", dir.c_str());
+ FastOS_FileInterface::EmptyAndRemoveDirectory(dir.c_str());
+}
+
+bool isOldIndex(const string &index, uint32_t last_fusion_id) {
+ string::size_type pos = index.rfind(".");
+ istringstream ist(index.substr(pos + 1));
+ uint32_t id = last_fusion_id;
+ ist >> id;
+ if (id < last_fusion_id) {
+ return true;
+ } else if (id == last_fusion_id) {
+ return index.find("flush") != string::npos;
+ }
+ return false;
+}
+
+void removeOld(const string &base_dir, const vector<string> &indexes,
+ const ActiveDiskIndexes &active_indexes) {
+ uint32_t last_fusion_id = findLastFusionId(base_dir, indexes);
+ for (size_t i = 0; i < indexes.size(); ++i) {
+ const string index_dir = base_dir + "/" + indexes[i];
+ if (isOldIndex(indexes[i], last_fusion_id) &&
+ !active_indexes.isActive(index_dir))
+ {
+ removeDir(index_dir);
+ }
+ }
+}
+
+void removeInvalid(const string &base_dir, const vector<string> &indexes) {
+ for (size_t i = 0; i < indexes.size(); ++i) {
+ const string index_dir = base_dir + "/" + indexes[i];
+ if (!isValidIndex(index_dir)) {
+ LOG(debug, "Found invalid index dir '%s'", index_dir.c_str());
+ removeDir(index_dir);
+ }
+ }
+}
+} // namespace
+
+void DiskIndexCleaner::clean(const string &base_dir,
+ const ActiveDiskIndexes &active_indexes) {
+ vector<string> indexes = readIndexes(base_dir);
+ removeOld(base_dir, indexes, active_indexes);
+ removeInvalid(base_dir, indexes);
+}
+
+void DiskIndexCleaner::removeOldIndexes(
+ const string &base_dir, const ActiveDiskIndexes &active_indexes) {
+ vector<string> indexes = readIndexes(base_dir);
+ removeOld(base_dir, indexes, active_indexes);
+}
+} // namespace index
+} // namespace searchcorespi
diff --git a/searchcorespi/src/vespa/searchcorespi/index/diskindexcleaner.h b/searchcorespi/src/vespa/searchcorespi/index/diskindexcleaner.h
new file mode 100644
index 00000000000..f8b98ffdf0d
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/diskindexcleaner.h
@@ -0,0 +1,26 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/stllike/string.h>
+
+namespace searchcorespi {
+namespace index {
+class ActiveDiskIndexes;
+
+/**
+ * Utility class used to clean and remove index directories.
+ */
+struct DiskIndexCleaner {
+ /**
+ * Removes old index directories that are not active, and then removes
+ * index directories that are not valid.
+ *
+ * An index is old when its id is lower than the most recent fusion id,
+ * or when it is a flush index with id equal to the most recent fusion id.
+ *
+ * @param index_dir the base directory containing the index directories.
+ * @param active_indexes used to skip directories that are still active.
+ */
+ static void clean(const vespalib::string &index_dir,
+ const ActiveDiskIndexes& active_indexes);
+ /**
+ * Removes old index directories that are not active, without checking
+ * the validity of the remaining ones.
+ *
+ * @param index_dir the base directory containing the index directories.
+ * @param active_indexes used to skip directories that are still active.
+ */
+ static void removeOldIndexes(const vespalib::string &index_dir,
+ const ActiveDiskIndexes& active_indexes);
+};
+
+} // namespace index
+} // namespace searchcorespi
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/eventlogger.cpp b/searchcorespi/src/vespa/searchcorespi/index/eventlogger.cpp
new file mode 100644
index 00000000000..c5e4a382233
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/eventlogger.cpp
@@ -0,0 +1,71 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP(".searchcorespi.index.eventlogger");
+
+#include "eventlogger.h"
+#include <vespa/searchlib/util/logutil.h>
+
+using vespalib::JSONStringer;
+using search::util::LogUtil;
+
+namespace searchcorespi {
+namespace index {
+
+/**
+ * Emits a "diskindex.load.start" state event whose JSON payload holds a
+ * description of the index directory under the key "input".
+ */
+void
+EventLogger::diskIndexLoadStart(const vespalib::string &indexDir)
+{
+    JSONStringer json;
+    json.beginObject();
+    {
+        json.appendKey("input");
+        LogUtil::logDir(json, indexDir, 6);
+    }
+    json.endObject();
+    EV_STATE("diskindex.load.start", json.toString().c_str());
+}
+
+/**
+ * Emits a "diskindex.load.complete" state event whose JSON payload holds
+ * the elapsed time in milliseconds and a description of the index
+ * directory under the key "input".
+ */
+void
+EventLogger::diskIndexLoadComplete(const vespalib::string &indexDir,
+                                   int64_t elapsedTimeMs)
+{
+    JSONStringer json;
+    json.beginObject();
+    {
+        json.appendKey("time.elapsed.ms");
+        json.appendInt64(elapsedTimeMs);
+        json.appendKey("input");
+        LogUtil::logDir(json, indexDir, 6);
+    }
+    json.endObject();
+    EV_STATE("diskindex.load.complete", json.toString().c_str());
+}
+
+/**
+ * Emits a "fusion.start" state event whose JSON payload lists the source
+ * directories under "inputs" and the fusion directory under "output".
+ */
+void
+EventLogger::diskFusionStart(const std::vector<vespalib::string> &sources,
+                             const vespalib::string &fusionDir)
+{
+    JSONStringer json;
+    json.beginObject();
+    {
+        json.appendKey("inputs");
+        json.beginArray();
+        for (const vespalib::string &source : sources) {
+            LogUtil::logDir(json, source, 6);
+        }
+        json.endArray();
+
+        json.appendKey("output");
+        LogUtil::logDir(json, fusionDir, 6);
+    }
+    json.endObject();
+    EV_STATE("fusion.start", json.toString().c_str());
+}
+
+/**
+ * Emits a "fusion.complete" state event whose JSON payload holds the
+ * elapsed time in milliseconds and a description of the fusion directory
+ * under the key "output".
+ */
+void
+EventLogger::diskFusionComplete(const vespalib::string &fusionDir,
+                                int64_t elapsedTimeMs)
+{
+    JSONStringer json;
+    json.beginObject();
+    {
+        json.appendKey("time.elapsed.ms");
+        json.appendInt64(elapsedTimeMs);
+        json.appendKey("output");
+        LogUtil::logDir(json, fusionDir, 6);
+    }
+    json.endObject();
+    EV_STATE("fusion.complete", json.toString().c_str());
+}
+
+} // namespace index
+} // namespace searchcorespi
diff --git a/searchcorespi/src/vespa/searchcorespi/index/eventlogger.h b/searchcorespi/src/vespa/searchcorespi/index/eventlogger.h
new file mode 100644
index 00000000000..ce2fd340fdd
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/eventlogger.h
@@ -0,0 +1,25 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/vespalib/stllike/string.h>
+#include <vector>
+
+namespace searchcorespi {
+namespace index {
+
+/**
+ * Class used to log various events related to disk index handling.
+ * All functions are static; each one emits a single state event with a
+ * JSON payload.
+ **/
+struct EventLogger {
+ // Logs that loading of the disk index in indexDir has started.
+ static void diskIndexLoadStart(const vespalib::string &indexDir);
+ // Logs that loading of the disk index in indexDir completed after
+ // elapsedTimeMs milliseconds.
+ static void diskIndexLoadComplete(const vespalib::string &indexDir,
+ int64_t elapsedTimeMs);
+ // Logs that fusion of the given source directories into fusionDir has
+ // started.
+ static void diskFusionStart(const std::vector<vespalib::string> &sources,
+ const vespalib::string &fusionDir);
+ // Logs that fusion into fusionDir completed after elapsedTimeMs
+ // milliseconds.
+ static void diskFusionComplete(const vespalib::string &fusionDir,
+ int64_t elapsedTimeMs);
+};
+
+} // namespace index
+} // namespace searchcorespi
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/fakeindexsearchable.h b/searchcorespi/src/vespa/searchcorespi/index/fakeindexsearchable.h
new file mode 100644
index 00000000000..0db49c5f2df
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/fakeindexsearchable.h
@@ -0,0 +1,43 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "indexsearchable.h"
+#include <vespa/searchlib/queryeval/fake_searchable.h>
+
+namespace searchcorespi {
+
+/**
+ * A fake index searchable used for unit testing.
+ *
+ * Wraps a search::queryeval::FakeSearchable and forwards blueprint
+ * creation to it.
+ */
+class FakeIndexSearchable : public IndexSearchable {
+private:
+ // The wrapped fake that blueprint creation is delegated to.
+ search::queryeval::FakeSearchable _fake;
+
+public:
+ FakeIndexSearchable()
+ : _fake()
+ {
+ }
+
+ /**
+ * Gives mutable access to the wrapped fake, e.g. so that a test can
+ * set it up before searching.
+ */
+ search::queryeval::FakeSearchable &getFake() { return _fake; }
+
+ /**
+ * Implements IndexSearchable.
+ * The attribute context argument is ignored; the remaining arguments
+ * are forwarded to the wrapped fake.
+ */
+ virtual Blueprint::UP
+ createBlueprint(const IRequestContext & requestContext,
+ const FieldSpec &field,
+ const Node &term,
+ const IAttributeContext &)
+ {
+ return _fake.createBlueprint(requestContext, field, term);
+ }
+
+ /**
+ * Returns default-constructed stats.
+ */
+ virtual search::SearchableStats getSearchableStats() const {
+ return search::SearchableStats();
+ }
+};
+
+} // namespace searchcorespi
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp
new file mode 100644
index 00000000000..13bef302083
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.cpp
@@ -0,0 +1,145 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP(".searchcorespi.index.fusionrunner");
+
+#include "fusionrunner.h"
+#include "eventlogger.h"
+#include "fusionspec.h"
+#include <vespa/searchlib/common/serialnumfileheadercontext.h>
+#include <vespa/searchlib/attribute/fixedsourceselector.h>
+#include <vespa/searchlib/queryeval/isourceselector.h>
+#include <vespa/searchlib/util/dirtraverse.h>
+#include <vespa/vespalib/util/jsonwriter.h>
+
+using search::FixedSourceSelector;
+using search::TuneFileAttributes;
+using search::TuneFileIndexing;
+using search::common::FileHeaderContext;
+using search::common::SerialNumFileHeaderContext;
+using search::index::Schema;
+using search::queryeval::ISourceSelector;
+using search::diskindex::SelectorArray;
+using search::SerialNum;
+using std::vector;
+using vespalib::string;
+using vespalib::JSONStringer;
+
+namespace searchcorespi {
+namespace index {
+
+// Creates a fusion runner for the indexes below base_dir.
+// The schema and the attribute tuning are stored in value members, while
+// fileHeaderContext is kept as a reference member, so the caller must
+// keep that object alive for as long as this runner is used.
+FusionRunner::FusionRunner(const string &base_dir,
+ const Schema &schema,
+ const TuneFileAttributes &tuneFileAttributes,
+ const FileHeaderContext &fileHeaderContext)
+ : _diskLayout(base_dir),
+ _schema(schema),
+ _tuneFileAttributes(tuneFileAttributes),
+ _fileHeaderContext(fileHeaderContext)
+{
+}
+
+namespace {
+
+/**
+ * Loads the source selector stored under selector_name and appends one
+ * entry per document id in [0, docIdLimit) to selector_array, translating
+ * each stored source through id_map.
+ *
+ * If the base id of the loaded selector differs from base_id, a clone
+ * adjusted by the difference is used instead.
+ *
+ * @param selector_name name of the selector to load.
+ * @param selector_array output array that entries are appended to.
+ * @param id_map maps a source stored in the selector to the value written
+ *               to selector_array; every stored source must be a valid index.
+ * @param base_id base id the selector sources should be relative to.
+ */
+void readSelectorArray(const string &selector_name, SelectorArray &selector_array,
+                       const vector<uint8_t> &id_map, uint32_t base_id) {
+    FixedSourceSelector::UP loaded = FixedSourceSelector::load(selector_name);
+    const auto loaded_base_id = loaded->getBaseId();
+    if (base_id != loaded_base_id) {
+        loaded = loaded->cloneAndSubtract("tmp_for_fusion",
+                                          base_id - loaded_base_id);
+    }
+
+    const uint32_t doc_id_limit = loaded->getDocIdLimit();
+    selector_array.reserve(doc_id_limit);
+    ISourceSelector::Iterator::UP source_it = loaded->createIterator();
+    for (uint32_t doc_id = 0; doc_id != doc_id_limit; ++doc_id) {
+        const search::queryeval::Source source = source_it->getSource(doc_id);
+        assert(source < id_map.size());
+        selector_array.push_back(id_map[source]);
+    }
+}
+
+/**
+ * Writes the source selector for a newly created fusion index.
+ *
+ * The selector uses source 0 as default source, has that source set for
+ * highest_doc_id, and uses fusion_id as base id. It is saved under the
+ * selector file name of the fusion directory for fusion_id.
+ *
+ * @return true if the selector was saved, false (after logging a warning)
+ *         otherwise.
+ */
+bool
+writeFusionSelector(const IndexDiskLayout &diskLayout, uint32_t fusion_id,
+                    uint32_t highest_doc_id,
+                    const TuneFileAttributes &tuneFileAttributes,
+                    const FileHeaderContext &fileHeaderContext)
+{
+    const search::queryeval::Source default_source = 0;
+    FixedSourceSelector selector(default_source, "fusion_selector");
+    selector.setSource(highest_doc_id, default_source);
+    selector.setBaseId(fusion_id);
+
+    const string file_name =
+        IndexDiskLayout::getSelectorFileName(diskLayout.getFusionDir(fusion_id));
+    const bool saved = selector.extractSaveInfo(file_name)->save(
+            tuneFileAttributes, fileHeaderContext);
+    if (saved) {
+        return true;
+    }
+    LOG(warning,
+        "Unable to write source selector data for fusion.%u.", fusion_id);
+    return false;
+}
+} // namespace
+
+/**
+ * Runs fusion on the indexes named by fusion_spec.
+ *
+ * The id of the new fusion index is the last flush id in the spec. The
+ * inputs are the previous fusion index (if last_fusion_id is non-zero)
+ * followed by the flush indexes in spec order. The selector of the last
+ * flush index is read and remapped to positions in that input list.
+ *
+ * @return the id of the new fusion index, or 0 if the spec has no flush
+ *         ids, the fusion itself fails, or the selector cannot be written.
+ */
+uint32_t
+FusionRunner::fuse(const FusionSpec &fusion_spec,
+                   SerialNum lastSerialNum,
+                   IIndexMaintainerOperations &operations)
+{
+    const vector<uint32_t> &flush_ids = fusion_spec.flush_ids;
+    if (flush_ids.empty()) {
+        return 0;
+    }
+    const uint32_t base_id = fusion_spec.last_fusion_id;
+    const uint32_t fusion_id = flush_ids.back();
+    const string fusion_dir = _diskLayout.getFusionDir(fusion_id);
+
+    // id_map translates a source id relative to base_id into the position
+    // of the corresponding directory in 'sources'.
+    vector<string> sources;
+    vector<uint8_t> id_map(fusion_id + 1);
+    if (base_id != 0) {
+        id_map[0] = sources.size();
+        sources.push_back(_diskLayout.getFusionDir(base_id));
+    }
+    for (const uint32_t flush_id : flush_ids) {
+        id_map[flush_id - base_id] = sources.size();
+        sources.push_back(_diskLayout.getFlushDir(flush_id));
+    }
+
+    if (LOG_WOULD_LOG(event)) {
+        EventLogger::diskFusionStart(sources, fusion_dir);
+    }
+    FastOS_Time timer;
+    timer.SetNow();
+
+    // The selector of the newest flush index is the one that gets remapped.
+    const string selector_name =
+        IndexDiskLayout::getSelectorFileName(_diskLayout.getFlushDir(fusion_id));
+    SelectorArray selector_array;
+    readSelectorArray(selector_name, selector_array, id_map, base_id);
+
+    const bool fused = operations.runFusion(_schema, fusion_dir, sources,
+                                            selector_array, lastSerialNum);
+    if (!fused) {
+        return 0;
+    }
+
+    const uint32_t highest_doc_id = selector_array.size() - 1;
+    SerialNumFileHeaderContext fileHeaderContext(_fileHeaderContext,
+                                                 lastSerialNum);
+    const bool selector_written =
+        writeFusionSelector(_diskLayout, fusion_id, highest_doc_id,
+                            _tuneFileAttributes, fileHeaderContext);
+    if (!selector_written) {
+        return 0;
+    }
+
+    if (LOG_WOULD_LOG(event)) {
+        EventLogger::diskFusionComplete(
+                fusion_dir, static_cast<int64_t>(timer.MilliSecsToNow()));
+    }
+    return fusion_id;
+}
+
+} // namespace index
+} // namespace searchcorespi
diff --git a/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h
new file mode 100644
index 00000000000..097b76bc4cc
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/fusionrunner.h
@@ -0,0 +1,63 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "iindexmaintaineroperations.h"
+#include "indexdisklayout.h"
+#include <vespa/searchcommon/common/schema.h>
+#include <vespa/searchlib/common/tunefileinfo.h>
+
+namespace search
+{
+
+namespace common
+{
+
+class FileHeaderContext;
+
+}
+
+}
+
+namespace searchcorespi {
+namespace index {
+class FusionSpec;
+
+/**
+ * FusionRunner runs fusion on a set of disk indexes, specified as a
+ * vector of ids. The disk indexes must be stored in directories named
+ * "index.flush.<id>" within the base dir, and the fusioned indexes
+ * will be stored similarly in directories named "index.fusion.<id>".
+ **/
+class FusionRunner {
+ // Disk layout for the base dir given at construction.
+ const IndexDiskLayout _diskLayout;
+ // Copy of the schema given at construction; passed on to the fusion.
+ const search::index::Schema _schema;
+ // Copy of the tuning used when writing the fusion selector.
+ const search::TuneFileAttributes _tuneFileAttributes;
+ // Reference to a caller-owned object; it must outlive this runner.
+ const search::common::FileHeaderContext &_fileHeaderContext;
+
+public:
+ /**
+ * Create a FusionRunner that operates on indexes stored in the
+ * base dir.
+ **/
+ FusionRunner(const vespalib::string &base_dir,
+ const search::index::Schema &schema,
+ const search::TuneFileAttributes &tuneFileAttributes,
+ const search::common::FileHeaderContext &fileHeaderContext);
+
+ /**
+ * Combine the indexes specified by the ids by running fusion.
+ *
+ * @param fusion_spec the specification on which indexes to run fusion on.
+ * @param lastSerialNum the serial number of the last flushed index part of the fusion spec.
+ * @param operations interface used for running the actual fusion.
+ * @return the id of the fusioned disk index, or 0 if the spec contains
+ * no flush ids, the fusion fails, or the fusion selector
+ * cannot be written.
+ **/
+ uint32_t fuse(const FusionSpec &fusion_spec,
+ search::SerialNum lastSerialNum,
+ IIndexMaintainerOperations &operations);
+};
+
+} // namespace index
+} // namespace searchcorespi
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/fusionspec.h b/searchcorespi/src/vespa/searchcorespi/index/fusionspec.h
new file mode 100644
index 00000000000..5e31698a7fc
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/fusionspec.h
@@ -0,0 +1,24 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vector>
+
+namespace searchcorespi {
+namespace index {
+
+/**
+ * Specifies a set of disk index ids for fusion.
+ *
+ * Note: All ids in FusionSpec are absolute ids.
+ **/
+struct FusionSpec {
+ // Id of the previous fusion index; 0 is treated as "no previous fusion".
+ uint32_t last_fusion_id;
+ // Ids of the flush indexes to fuse; the last one becomes the id of the
+ // resulting fusion index.
+ std::vector<uint32_t> flush_ids;
+
+ // Creates an empty spec: no previous fusion and no flush ids.
+ FusionSpec() : last_fusion_id(0), flush_ids() {}
+};
+
+} // namespace index
+} // namespace searchcorespi
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/i_thread_service.h b/searchcorespi/src/vespa/searchcorespi/index/i_thread_service.h
new file mode 100644
index 00000000000..ffa160f7c96
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/i_thread_service.h
@@ -0,0 +1,34 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <boost/noncopyable.hpp>
+#include <vespa/vespalib/util/runnable.h>
+#include <vespa/vespalib/util/threadexecutor.h>
+
+namespace searchcorespi {
+namespace index {
+
+/**
+ * Interface for a single thread used for write tasks.
+ * Extends vespalib::ThreadExecutor and is not copyable.
+ */
+struct IThreadService : public boost::noncopyable,
+ public vespalib::ThreadExecutor
+{
+ virtual ~IThreadService() {}
+
+ /**
+ * Run the given runnable in the underlying thread and wait until it is done.
+ */
+ virtual void run(vespalib::Runnable &runnable) = 0;
+
+ /**
+ * Returns whether the current thread is the underlying thread.
+ */
+ virtual bool isCurrentThread() const = 0;
+
+};
+
+} // namespace index
+} // namespace searchcorespi
+
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/idiskindex.h b/searchcorespi/src/vespa/searchcorespi/index/idiskindex.h
new file mode 100644
index 00000000000..7495939ee91
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/idiskindex.h
@@ -0,0 +1,33 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/searchcorespi/index/indexsearchable.h>
+#include <vespa/searchcommon/common/schema.h>
+#include <vespa/vespalib/stllike/string.h>
+
+namespace searchcorespi {
+namespace index {
+
+/**
+ * Interface for a disk index as seen from an index maintainer.
+ */
+struct IDiskIndex : public IndexSearchable {
+ // Shared pointer type used when handing out disk indexes.
+ typedef std::shared_ptr<IDiskIndex> SP;
+ virtual ~IDiskIndex() {}
+
+ /**
+ * Returns the directory in which this disk index exists.
+ */
+ virtual const vespalib::string &getIndexDir() const = 0;
+
+ /**
+ * Returns the schema used by this disk index.
+ * Note that the schema should be part of the index on disk.
+ */
+ virtual const search::index::Schema &getSchema() const = 0;
+};
+
+} // namespace index
+} // namespace searchcorespi
+
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/iindexcollection.cpp b/searchcorespi/src/vespa/searchcorespi/index/iindexcollection.cpp
new file mode 100644
index 00000000000..961e908f1bb
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/iindexcollection.cpp
@@ -0,0 +1,45 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/searchcorespi/index/iindexcollection.h>
+#include <vespa/searchcorespi/index/idiskindex.h>
+#include <vespa/vespalib/stllike/asciistream.h>
+#include <typeinfo>
+
+namespace searchcorespi {
+
+using index::IDiskIndex;
+
+/**
+ * Builds a human-readable description of this collection.
+ *
+ * The first line describes the source selector (address, base id, doc id
+ * limit and default source). It is followed by the number of sources and
+ * a brace-enclosed, comma-separated list with one entry per source:
+ * source id, address of the searchable and, in parentheses, either the
+ * index directory (for disk indexes) or the implementation-defined type
+ * name of the searchable.
+ */
+vespalib::string IIndexCollection::toString() const
+{
+    vespalib::asciistream s;
+    s << "selector : " << &getSourceSelector() << "(baseId=" << getSourceSelector().getBaseId()
+      << ", docidlimit=" << getSourceSelector().getDocIdLimit()
+      << ", defaultsource=" << uint32_t(getSourceSelector().getDefaultSource())
+      << ")\n";
+#if 0
+    search::queryeval::ISourceSelector::Iterator::UP it = getSourceSelector().createIterator();
+    s << "{";
+    for (size_t i(0), m(getSourceSelector().getDocIdLimit()); i < m; i++) {
+        s << uint32_t(it->getSource(i)) << ' ';
+    }
+    s << "}\n";
+#endif
+    s << getSourceCount() << " {";
+    // The loop body is simply skipped when there are no sources.
+    for (size_t i(0), m(getSourceCount()); i < m; i++) {
+        if (i != 0) {
+            s << ", ";
+        }
+        const IndexSearchable & is(getSearchable(i));
+        s << getSourceId(i) << " : " << &is << "(";
+        // Cast once and reuse the result instead of casting a second time.
+        const IDiskIndex *diskIndex = dynamic_cast<const IDiskIndex *>(&is);
+        if (diskIndex != nullptr) {
+            s << diskIndex->getIndexDir().c_str();
+        } else {
+            s << typeid(is).name();
+        }
+        s << ")";
+    }
+    s << "}";
+    return s.str();
+}
+
+}
diff --git a/searchcorespi/src/vespa/searchcorespi/index/iindexcollection.h b/searchcorespi/src/vespa/searchcorespi/index/iindexcollection.h
new file mode 100644
index 00000000000..e4b90f11bac
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/iindexcollection.h
@@ -0,0 +1,50 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "indexsearchable.h"
+#include <vespa/searchlib/queryeval/isourceselector.h>
+#include <vespa/searchlib/util/searchable_stats.h>
+
+namespace searchcorespi {
+
+/**
+ * Interface for a set of index searchables with source ids,
+ * and a source selector for determining which index searchable to use for each document.
+ */
+class IIndexCollection {
+protected:
+ typedef search::queryeval::ISourceSelector ISourceSelector;
+public:
+ typedef std::unique_ptr<IIndexCollection> UP;
+ typedef std::shared_ptr<IIndexCollection> SP;
+
+ virtual ~IIndexCollection() {}
+
+ /**
+ * Returns the source selector used to determine which index to use for each document.
+ */
+ virtual const ISourceSelector &getSourceSelector() const = 0;
+
+ /**
+ * Returns the number of sources (index searchables) for this collection.
+ */
+ virtual size_t getSourceCount() const = 0;
+
+ /**
+ * Returns the index searchable for source i (i in the range [0, getSourceCount()>).
+ */
+ virtual IndexSearchable &getSearchable(uint32_t i) const = 0;
+
+ /**
+ * Returns the source id for source i (i in the range [0, getSourceCount()>).
+ * The source id is used for this source in the source selector.
+ */
+ virtual uint32_t getSourceId(uint32_t i) const = 0;
+
+ /**
+ * Returns a human-readable description of the source selector and of
+ * each source in this collection.
+ */
+ virtual vespalib::string toString() const;
+
+};
+
+} // namespace searchcorespi
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h b/searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h
new file mode 100644
index 00000000000..2b8a3ac3965
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/iindexmaintaineroperations.h
@@ -0,0 +1,56 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "idiskindex.h"
+#include "imemoryindex.h"
+#include <vespa/searchcommon/common/schema.h>
+#include <vespa/searchlib/common/serialnum.h>
+#include <vespa/searchlib/diskindex/docidmapper.h>
+
+namespace searchcorespi {
+namespace index {
+
+/**
+ * Interface for operations needed by an index maintainer.
+ */
+struct IIndexMaintainerOperations {
+ virtual ~IIndexMaintainerOperations() {}
+
+ /**
+ * Creates a new memory index using the given schema.
+ */
+ virtual IMemoryIndex::SP createMemoryIndex(const search::index::Schema &schema) = 0;
+
+ /**
+ * Loads a disk index from the given directory.
+ */
+ virtual IDiskIndex::SP loadDiskIndex(const vespalib::string &indexDir) = 0;
+
+ /**
+ * Reloads the given disk index and returns a new instance.
+ */
+ virtual IDiskIndex::SP reloadDiskIndex(const IDiskIndex &oldIndex) = 0;
+
+ /**
+ * Runs fusion on a given set of input disk indexes to create a fusioned output disk index.
+ * The selector array contains a source for all local document ids ([0, docIdLimit>)
+ * in the range [0, sources.size()> and is used to determine in which input disk index
+ * a document is located.
+ *
+ * @param schema the schema of the resulting fusioned disk index.
+ * @param outputDir the output directory of the fusioned disk index.
+ * @param sources the directories of the input disk indexes.
+ * @param selectorArray the array specifying in which input disk index a document is located.
+ * @param lastSerialNum the serial number of the last operation in the last input disk index.
+ * @return true if fusion succeeded; FusionRunner::fuse treats false as a failed fusion.
+ */
+ virtual bool runFusion(const search::index::Schema &schema,
+ const vespalib::string &outputDir,
+ const std::vector<vespalib::string> &sources,
+ const search::diskindex::SelectorArray &selectorArray,
+ search::SerialNum lastSerialNum) = 0;
+};
+
+} // namespace index
+} // namespace searchcorespi
+
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/iindexmanager.cpp b/searchcorespi/src/vespa/searchcorespi/index/iindexmanager.cpp
new file mode 100644
index 00000000000..b6d350ad52e
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/iindexmanager.cpp
@@ -0,0 +1,17 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/searchcorespi/index/iindexmanager.h>
+
+namespace searchcorespi {
+
+/**
+ * Default implementation: intentionally does nothing. Both arguments are
+ * ignored.
+ */
+void
+IIndexManager::wipeHistory(SerialNum wipeSerial, const Schema &historyFields)
+{
+    static_cast<void>(wipeSerial);
+    static_cast<void>(historyFields);
+}
+
+// Out-of-line so that the destructor is defined in this translation unit.
+IIndexManager::Reconfigurer::~Reconfigurer() = default;
+
+} // namespace searchcorespi
diff --git a/searchcorespi/src/vespa/searchcorespi/index/iindexmanager.h b/searchcorespi/src/vespa/searchcorespi/index/iindexmanager.h
new file mode 100644
index 00000000000..e774629c787
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/iindexmanager.h
@@ -0,0 +1,178 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/document/fieldvalue/document.h>
+#include <vespa/searchcommon/common/schema.h>
+#include <vespa/searchcorespi/flush/flushstats.h>
+#include <vespa/searchcorespi/flush/iflushtarget.h>
+#include <vespa/searchlib/common/serialnum.h>
+#include <vespa/vespalib/util/closure.h>
+#include "indexsearchable.h"
+
+namespace search
+{
+
+class IDestructorCallback;
+
+}
+
+namespace searchcorespi {
+
+/**
+ * Interface for an index manager:
+ * - Keeps track of a set of indexes (i.e. both memory indexes and disk indexes).
+ * - Documents can be inserted, updated or removed in/from the active memory index.
+ * - Enables search across all the indexes.
+ * - Manages the set of indexes through flush targets to the flush engine (i.e. flushing of memory
+ * indexes and fusion of disk indexes).
+ *
+ * A key item in this interface is the <em>lid</em>, which is a local document id assigned to each document.
+ * This is a numeric id in the range [0...Maximum number of documents concurrently in this DB>.
+ * Local document id 0 is reserved. Another key item is the <em>serialnumber</em> which is the
+ * serial number an operation was given when first seen by the searchcore. This is a monotonically
+ * increasing number used for sequencing operations and figuring out how up to date components are.
+ * Used during restart/replay and for deciding when to flush. Both the lid and the serial number
+ * are persisted alongside an operation to ensure correct playback during recovery.
+ */
+class IIndexManager {
+protected:
+ typedef document::Document Document;
+ typedef search::SerialNum SerialNum;
+ typedef search::index::Schema Schema;
+
+public:
+ // Callback handle passed to commit(); see the onWriteDone parameter there.
+ using OnWriteDoneType = const std::shared_ptr<search::IDestructorCallback> &;
+ /**
+ * Interface used to signal when index manager has been reconfigured.
+ */
+ struct Reconfigurer {
+ virtual ~Reconfigurer();
+ /**
+ * Reconfigure index manager and infrastructure around it while system is in a quiescent state.
+ */
+ virtual bool reconfigure(vespalib::Closure0<bool>::UP closure) = 0;
+ };
+
+ typedef std::unique_ptr<IIndexManager> UP;
+ typedef std::shared_ptr<IIndexManager> SP;
+
+ virtual ~IIndexManager() {}
+
+ /**
+ * Inserts a document into the index. This method is async, caller
+ * must either wait for notification about write done or sync
+ * indexFieldWriter executor in threading service to get sync
+ * behavior.
+ *
+ * If the inserted document id already exists the old version must
+ * be removed before inserting the new.
+ *
+ * @param lid The local document id for the document.
+ *
+ * @param doc The document to insert.
+ *
+ * @param serialNum The unique monotonically increasing serial number
+ * for this operation.
+ **/
+ virtual void putDocument(uint32_t lid, const Document &doc, SerialNum serialNum) = 0;
+
+ /**
+ * Removes the given document from the index. This method is
+ * async, caller must either wait for notification about write
+ * done or sync indexFieldWriter executor in threading service to
+ * get sync behavior.
+ *
+ * @param lid The local document id for the document.
+ *
+ * @param serialNum The unique monotonically increasing serial number
+ * for this operation.
+ **/
+ virtual void removeDocument(uint32_t lid, SerialNum serialNum) = 0;
+
+ /**
+ * Commits the document puts and removes since the last commit,
+ * making them searchable. This method is async, caller must
+ * either wait for notification about write done or sync
+ * indexFieldWriter executor in threading service to get sync
+ * behavior.
+ *
+ * @param serialNum The unique monotonically increasing serial number
+ * for this operation.
+ *
+ * @param onWriteDone shared object that notifies write done when
+ * destructed.
+ **/
+ virtual void commit(SerialNum serialNum, OnWriteDoneType onWriteDone) = 0;
+
+ /**
+ * This method is called on a regular basis to update each component with what is the highest
+ * serial number for any component. This is for all components to be able to correctly tell its age.
+ *
+ * @param serialNum The serial number of the last known operation.
+ */
+ virtual void heartBeat(SerialNum serialNum) = 0;
+
+ /**
+ * Returns the current serial number of the index.
+ * This should also reflect any heart beats.
+ *
+ * @return current serial number of the component.
+ **/
+ virtual SerialNum getCurrentSerialNum() const = 0;
+
+ /**
+ * Returns the serial number of the last flushed index.
+ *
+ * @return the serial number of the last flushed index.
+ **/
+ virtual SerialNum getFlushedSerialNum() const = 0;
+
+ /**
+ * Returns the searchable that will give the correct search view of the index manager.
+ * Normally switched every time underlying index structures are changed in a way that can not be
+ * handled in a thread safe way without locking. For instance flushing of memory index or
+ * starting using a new schema.
+ *
+ * @return the current searchable.
+ **/
+ virtual IndexSearchable::SP getSearchable() const = 0;
+
+ /**
+ * Returns searchable stats for this index manager.
+ *
+ * @return statistics gathered about underlying memory and disk indexes.
+ */
+ virtual search::SearchableStats getSearchableStats() const = 0;
+
+ /**
+ * Returns the list of all flush targets contained in this index manager.
+ *
+ * @return The list of flushable items in this component.
+ **/
+ virtual IFlushTarget::List getFlushTargets() = 0;
+
+ /**
+ * Sets the new schema and new fusion schema to be used by this index manager.
+ * The fusion schema is the union of the new schema and the history schema.
+ * The history schema keeps track of removed fields that have not been completely wiped yet.
+ * By using the fusion schema during fusion we ensure that removed fields are taken into the
+ * fusioned disk index to support the case where they are later re-applied.
+ *
+ * @param schema The new schema to start using.
+ * @param fusionSchema The new fusion schema to start using.
+ **/
+ virtual void setSchema(const Schema &schema, const Schema &fusionSchema) = 0;
+
+ /**
+ * Wipes remains of removed fields from this index manager as specified in the history schema.
+ * This can for instance be removing these fields from disk indexes.
+ * The default implementation does nothing.
+ *
+ * @param wipeSerial The serial number of this wipe operation.
+ * @param historyFields The schema specifying which fields we should wipe away.
+ **/
+ virtual void wipeHistory(SerialNum wipeSerial, const Schema &historyFields);
+};
+
+} // namespace searchcorespi
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/imemoryindex.h b/searchcorespi/src/vespa/searchcorespi/index/imemoryindex.h
new file mode 100644
index 00000000000..fabaf730cb7
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/imemoryindex.h
@@ -0,0 +1,87 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/document/fieldvalue/document.h>
+#include <vespa/searchcommon/common/schema.h>
+#include <vespa/searchcorespi/index/indexsearchable.h>
+#include <vespa/searchlib/common/serialnum.h>
+#include <vespa/searchlib/util/memoryusage.h>
+#include <vespa/vespalib/stllike/string.h>
+
+namespace search
+{
+
+class IDestructorCallback;
+
+}
+
+namespace searchcorespi {
+namespace index {
+
+/**
+ * Interface for a memory index as seen from an index maintainer.
+ */
+struct IMemoryIndex : public searchcorespi::IndexSearchable {
+ typedef std::shared_ptr<IMemoryIndex> SP;
+ // Callback handle passed to commit().
+ using OnWriteDoneType =
+ const std::shared_ptr<search::IDestructorCallback> &;
+ virtual ~IMemoryIndex() {}
+
+ /**
+ * Returns true if this memory index has received any document insert operations.
+ */
+ virtual bool hasReceivedDocumentInsert() const = 0;
+
+ /**
+ * Returns the memory usage of this memory index.
+ */
+ virtual search::MemoryUsage getMemoryUsage() const = 0;
+
+ /**
+ * Returns the memory usage of an empty version of this memory index.
+ */
+ virtual uint64_t getStaticMemoryFootprint() const = 0;
+
+ /**
+ * Inserts the given document into this memory index.
+ * If the document already exists it should be removed first.
+ *
+ * @param lid the local document id.
+ * @param doc the document to insert.
+ */
+ virtual void insertDocument(uint32_t lid, const document::Document &doc) = 0;
+
+ /**
+ * Removes the given document from this memory index.
+ *
+ * @param lid the local document id.
+ */
+ virtual void removeDocument(uint32_t lid) = 0;
+
+ /**
+ * Commits the inserts and removes since the last commit, making them searchable.
+ *
+ * @param onWriteDone callback handle for this commit.
+ * NOTE(review): presumably signals write done when destructed, as
+ * documented for IIndexManager::commit - confirm.
+ **/
+ virtual void commit(OnWriteDoneType onWriteDone) = 0;
+
+ /**
+ * Flushes this memory index to disk as a disk index.
+ * After a flush it should be possible to load an IDiskIndex from the flush directory.
+ * Note that the schema used when constructing the memory index should be flushed as well
+ * since an IDiskIndex should be able to return the schema used by the disk index.
+ *
+ * @param flushDir the directory in which to save the flushed index.
+ * @param docIdLimit the largest local document id used + 1
+ * @param serialNum the serial number of the last operation to the memory index.
+ */
+ virtual void flushToDisk(const vespalib::string &flushDir,
+ uint32_t docIdLimit,
+ search::SerialNum serialNum) = 0;
+
+ // NOTE(review): undocumented in the original. Presumably wipes the
+ // removed fields described by the given schema from this memory index
+ // (compare IIndexManager::wipeHistory) - confirm against implementations.
+ virtual void wipeHistory(const search::index::Schema &schema) = 0;
+ // NOTE(review): undocumented in the original; the exact meaning of the
+ // returned "wipe time" schema is not visible here - confirm.
+ virtual search::index::Schema::SP getWipeTimeSchema() const = 0;
+};
+
+} // namespace index
+} // namespace searchcorespi
+
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/index_manager_explorer.cpp b/searchcorespi/src/vespa/searchcorespi/index/index_manager_explorer.cpp
new file mode 100644
index 00000000000..f3695f2bd52
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/index_manager_explorer.cpp
@@ -0,0 +1,28 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP(".searchcorespi.index.index_manager_explorer");
+#include "index_manager_explorer.h"
+
+#include <vespa/vespalib/data/slime/cursor.h>
+
+using vespalib::slime::Cursor;
+using vespalib::slime::Inserter;
+
+namespace searchcorespi {
+
+// Keeps a shared handle to the index manager whose state is to be explored.
+IndexManagerExplorer::IndexManagerExplorer(IIndexManager::SP mgr)
+    : _mgr{std::move(mgr)}
+{}
+
+// Inserts one object with a single field, "lastSerialNum", read from the index manager.
+void
+IndexManagerExplorer::get_state(const Inserter &inserter, bool /* full */) const
+{
+    Cursor &state = inserter.insertObject();  // created first, then filled in
+    state.setLong("lastSerialNum", _mgr->getCurrentSerialNum());
+}
+
+} // namespace searchcorespi
diff --git a/searchcorespi/src/vespa/searchcorespi/index/index_manager_explorer.h b/searchcorespi/src/vespa/searchcorespi/index/index_manager_explorer.h
new file mode 100644
index 00000000000..1f26d2e5c36
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/index_manager_explorer.h
@@ -0,0 +1,25 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "iindexmanager.h"
+#include <vespa/vespalib/net/state_explorer.h>
+
+namespace searchcorespi {
+
+/**
+ * State explorer for an index manager; shares ownership of the manager it is constructed with.
+ */
+class IndexManagerExplorer : public vespalib::StateExplorer
+{
+private:
+ IIndexManager::SP _mgr; // the explored index manager (shared ownership)
+
+public:
+ IndexManagerExplorer(IIndexManager::SP mgr); // mgr is stored in _mgr
+
+ // Implements vespalib::StateExplorer
+ virtual void get_state(const vespalib::slime::Inserter &inserter, bool full) const override;
+};
+
+} // namespace searchcorespi
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexcollection.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexcollection.cpp
new file mode 100644
index 00000000000..0284c4a682e
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexcollection.cpp
@@ -0,0 +1,223 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP(".searchcorespi.index.indexcollection");
+
+#include "indexcollection.h"
+#include <vespa/searchlib/queryeval/isourceselector.h>
+#include <vespa/searchlib/queryeval/create_blueprint_visitor_helper.h>
+#include <vespa/searchlib/queryeval/intermediate_blueprints.h>
+#include <vespa/searchlib/queryeval/leaf_blueprints.h>
+
+using namespace search::queryeval;
+using namespace search::query;
+using search::attribute::IAttributeContext;
+
+namespace searchcorespi {
+
+// Creates an empty collection that shares ownership of 'selector'.
+IndexCollection::IndexCollection(const ISourceSelector::SP & selector)
+    : _source_selector(selector), _sources()
+{
+}
+
+// Creates a collection using 'selector' that mirrors every (id, searchable) pair and the current index of 'sources'.
+IndexCollection::IndexCollection(const ISourceSelector::SP & selector,
+                                 const ISearchableIndexCollection &sources)
+    : _source_selector(selector), _sources()
+{
+    for (size_t idx = 0, count = sources.getSourceCount(); idx < count; ++idx) {
+        append(sources.getSourceId(idx), sources.getSearchableSP(idx));
+    }
+    setCurrentIndex(sources.getCurrentIndex());
+}
+
+// Registers 'docId' in the source selector under the current index; the collection must be valid().
+void IndexCollection::setSource(uint32_t docId)
+{
+    assert(valid());
+    _source_selector->setSource(docId, getCurrentIndex());
+}
+
+// Builds a new collection on 'selector' with 'new_source' as source 0, followed by every source of
+// 'fsc' whose id is greater than 'id_diff', renumbered to (id - id_diff). Other sources are dropped.
+ISearchableIndexCollection::UP
+IndexCollection::replaceAndRenumber(const ISourceSelector::SP & selector,
+                                    const ISearchableIndexCollection &fsc,
+                                    uint32_t id_diff,
+                                    const IndexSearchable::SP &new_source)
+{
+    ISearchableIndexCollection::UP renumbered(new IndexCollection(selector));
+    renumbered->append(0, new_source);
+    for (size_t pos = 0; pos < fsc.getSourceCount(); ++pos) {
+        if (fsc.getSourceId(pos) <= id_diff) { continue; }
+        renumbered->append(fsc.getSourceId(pos) - id_diff, fsc.getSearchableSP(pos));
+    }
+    return renumbered;
+}
+
+// Adds 'fs' under source id 'id' at the end of the source list; existing ids are not checked.
+void IndexCollection::append(uint32_t id, const IndexSearchable::SP &fs)
+{
+    _sources.emplace_back(id, fs);
+}
+
+// Returns the shared pointer of the source at position 'i' (the index is not range checked).
+IndexSearchable::SP IndexCollection::getSearchableSP(uint32_t i) const
+{
+    return _sources[i].source_wrapper;
+}
+
+// Replaces the first source whose id equals 'id' with 'fs'; if absent, logs a warning and appends 'fs'.
+void IndexCollection::replace(uint32_t id, const IndexSearchable::SP &fs)
+{
+    for (SourceWithId &entry : _sources) {
+        if (entry.id == id) {
+            entry.source_wrapper = fs;
+            return;
+        }
+    }
+    LOG(warning, "Tried to replace Searchable %" PRIu32 ", but it wasn't there.", id); // id is unsigned, so not %d
+    append(id, fs);
+}
+
+// Gives read access to the source selector held by this collection.
+const ISourceSelector &IndexCollection::getSourceSelector() const
+{
+    return *_source_selector;
+}
+
+// Number of sources currently held.
+size_t IndexCollection::getSourceCount() const
+{
+    return _sources.size();
+}
+
+// Returns the searchable at position 'i' by reference (the index is not range checked).
+IndexSearchable &IndexCollection::getSearchable(uint32_t i) const
+{
+    return *_sources[i].source_wrapper;
+}
+
+// Returns the source id stored at position 'i' (the index is not range checked).
+uint32_t IndexCollection::getSourceId(uint32_t i) const
+{
+    return _sources[i].id;
+}
+
+// Accumulates (via SearchableStats::add) the stats of every source in this collection.
+search::SearchableStats IndexCollection::getSearchableStats() const
+{
+    search::SearchableStats total;
+    for (const SourceWithId &entry : _sources) {
+        total.add(entry.source_wrapper->getSearchableStats());
+    }
+    return total;
+}
+
+namespace {
+
+// Collects per-index blueprints into a lazily created SourceBlenderBlueprint.
+struct Mixer {
+    const ISourceSelector &_selector;
+    std::unique_ptr<SourceBlenderBlueprint> _blender;
+
+    Mixer(const ISourceSelector &selector) : _selector(selector), _blender() {}
+
+    // Adds one index blueprint as a child; the blender is created on first use.
+    void addIndex(Blueprint::UP index) {
+        if (!_blender) {
+            _blender.reset(new SourceBlenderBlueprint(_selector));
+        }
+        _blender->addChild(std::move(index));
+    }
+
+    // Releases the blender to the caller, or yields an EmptyBlueprint if nothing was added.
+    Blueprint::UP mix() {
+        if (_blender) { return Blueprint::UP(_blender.release()); }
+        return Blueprint::UP(new EmptyBlueprint());
+    }
+};
+
+class CreateBlueprintVisitor : public search::query::QueryVisitor { // builds one blended blueprint for a visited term node
+private:
+ const IIndexCollection &_indexes;
+ const FieldSpecList &_fields;
+ const IAttributeContext &_attrCtx;
+ const IRequestContext &_requestContext;
+ Blueprint::UP _result; // stays empty unless visitTerm() ran
+
+ template <typename NodeType>
+ void visitTerm(NodeType &n) { // asks every index for a blueprint, tags it with its source id and blends them
+ Mixer mixer(_indexes.getSourceSelector());
+ for (size_t i = 0; i < _indexes.getSourceCount(); ++i) {
+ Blueprint::UP blueprint = _indexes.getSearchable(i).createBlueprint(_requestContext, _fields, n, _attrCtx);
+ blueprint->setSourceId(_indexes.getSourceId(i));
+ mixer.addIndex(std::move(blueprint));
+ }
+ _result = mixer.mix(); // an EmptyBlueprint when the collection has no sources
+ }
+
+ virtual void visit(And &) { } // intermediate nodes: nothing is built here, _result is left as it is
+ virtual void visit(AndNot &) { }
+ virtual void visit(Or &) { }
+ virtual void visit(WeakAnd &) { }
+ virtual void visit(Equiv &) { }
+ virtual void visit(Rank &) { }
+ virtual void visit(Near &) { }
+ virtual void visit(ONear &) { }
+
+ virtual void visit(WeightedSetTerm &n) { visitTerm(n); } // all node types below are handled by visitTerm()
+ virtual void visit(DotProduct &n) { visitTerm(n); }
+ virtual void visit(WandTerm &n) { visitTerm(n); }
+ virtual void visit(Phrase &n) { visitTerm(n); }
+ virtual void visit(NumberTerm &n) { visitTerm(n); }
+ virtual void visit(LocationTerm &n) { visitTerm(n); }
+ virtual void visit(PrefixTerm &n) { visitTerm(n); }
+ virtual void visit(RangeTerm &n) { visitTerm(n); }
+ virtual void visit(StringTerm &n) { visitTerm(n); }
+ virtual void visit(SubstringTerm &n) { visitTerm(n); }
+ virtual void visit(SuffixTerm &n) { visitTerm(n); }
+ virtual void visit(PredicateQuery &n) { visitTerm(n); }
+ virtual void visit(RegExpTerm &n) { visitTerm(n); }
+
+public:
+ CreateBlueprintVisitor(const IIndexCollection &indexes, // all arguments are kept by reference and must outlive the visitor
+ const FieldSpecList &fields,
+ const IAttributeContext &attrCtx,
+ const IRequestContext & requestContext)
+ : _indexes(indexes),
+ _fields(fields),
+ _attrCtx(attrCtx),
+ _requestContext(requestContext),
+ _result() {}
+
+ Blueprint::UP getResult() { return std::move(_result); } // moves the result out, leaving _result empty
+};
+
+}
+
+// Single-field convenience overload: wraps 'field' in a FieldSpecList and delegates to the list overload.
+Blueprint::UP
+IndexCollection::createBlueprint(const IRequestContext & requestContext,
+                                 const FieldSpec &field,
+                                 const Node &term, const IAttributeContext &attrCtx)
+{
+    FieldSpecList singleField;
+    singleField.add(field);
+    return createBlueprint(requestContext, singleField, term, attrCtx);
+}
+
+// Lets a CreateBlueprintVisitor visit 'term' and returns whatever blueprint the visitor produced.
+Blueprint::UP
+IndexCollection::createBlueprint(const IRequestContext & requestContext,
+                                 const FieldSpecList &fields,
+                                 const Node &term, const IAttributeContext &attrCtx)
+{
+    CreateBlueprintVisitor blueprintBuilder(*this, fields, attrCtx, requestContext);
+    const_cast<Node &>(term).accept(blueprintBuilder);  // accept() is called on a non-const node
+    return blueprintBuilder.getResult();
+}
+
+} // namespace searchcorespi
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexcollection.h b/searchcorespi/src/vespa/searchcorespi/index/indexcollection.h
new file mode 100644
index 00000000000..223b36fce99
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexcollection.h
@@ -0,0 +1,73 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "isearchableindexcollection.h"
+#include <vespa/searchlib/queryeval/isourceselector.h>
+#include <vespa/searchlib/util/searchable_stats.h>
+#include <memory>
+#include <set>
+#include <utility>
+#include <vector>
+
+namespace searchcorespi {
+
+/**
+ * Holds a set of index searchables with source ids, and a source selector for
+ * determining which index to use for each document.
+ */
+class IndexCollection : public ISearchableIndexCollection
+{
+ struct SourceWithId { // one entry of the collection: a source id paired with its searchable
+ uint32_t id;
+ IndexSearchable::SP source_wrapper;
+
+ SourceWithId(uint32_t id_in,
+ const IndexSearchable::SP &source_in)
+ : id(id_in), source_wrapper(source_in) {}
+ SourceWithId() : id(0), source_wrapper() {}
+ };
+
+ // Selector shared across memory dumps, replaced on disk fusion operations
+ ISourceSelector::SP _source_selector;
+ std::vector<SourceWithId> _sources; // kept in insertion order; positions are what the 'i' parameters below index
+
+public:
+ IndexCollection(const ISourceSelector::SP & selector); // starts with no sources
+ IndexCollection(const ISourceSelector::SP & selector, // copies all sources and the current index from 'sources'
+ const ISearchableIndexCollection &sources);
+
+ virtual void append(uint32_t id, const IndexSearchable::SP &source); // adds at the end, no duplicate-id check
+ virtual void replace(uint32_t id, const IndexSearchable::SP &source); // replaces first entry with 'id'; appends with a warning if absent
+ virtual IndexSearchable::SP getSearchableSP(uint32_t i) const; // 'i' is a position, not a source id
+ virtual void setSource(uint32_t docId); // maps docId to the current index in the selector
+
+
+ // Implements IIndexCollection
+ virtual const ISourceSelector &getSourceSelector() const;
+ virtual size_t getSourceCount() const;
+ virtual IndexSearchable &getSearchable(uint32_t i) const; // 'i' is a position, not a source id
+ virtual uint32_t getSourceId(uint32_t i) const; // source id stored at position 'i'
+
+ // Implements IndexSearchable
+ virtual Blueprint::UP
+ createBlueprint(const IRequestContext & requestContext,
+ const FieldSpec &field,
+ const Node &term,
+ const IAttributeContext &attrCtx);
+ virtual Blueprint::UP
+ createBlueprint(const IRequestContext & requestContext,
+ const FieldSpecList &fields,
+ const Node &term,
+ const IAttributeContext &attrCtx);
+ virtual search::SearchableStats getSearchableStats() const; // aggregated over all sources
+
+ static ISearchableIndexCollection::UP replaceAndRenumber( // new_source becomes id 0; sources with id > id_diff are kept as (id - id_diff)
+ const ISourceSelector::SP & selector,
+ const ISearchableIndexCollection &fsc,
+ uint32_t id_diff,
+ const IndexSearchable::SP &new_source);
+};
+
+} // namespace searchcorespi
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexdisklayout.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexdisklayout.cpp
new file mode 100644
index 00000000000..f886f7eb536
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexdisklayout.cpp
@@ -0,0 +1,62 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP(".searchcorespi.index.indexdisklayout");
+
+#include "indexdisklayout.h"
+#include <sstream>
+
+namespace searchcorespi {
+namespace index {
+
+// Name prefix of flush directories (see getFlushDir()).
+const vespalib::string IndexDiskLayout::FlushDirPrefix("index.flush.");
+
+// Name prefix of fusion directories (see getFusionDir()).
+const vespalib::string IndexDiskLayout::FusionDirPrefix("index.fusion.");
+
+// NOTE(review): presumably the tag under which a serial number is stored in file headers - confirm.
+const vespalib::string IndexDiskLayout::SerialNumTag("Serial num");
+
+// Stores a copy of 'baseDir', the directory under which flush and fusion directories are named.
+IndexDiskLayout::IndexDiskLayout(const vespalib::string &baseDir)
+    : _baseDir(baseDir)
+{}
+
+// Returns "<baseDir>/index.flush.<sourceId>".
+vespalib::string IndexDiskLayout::getFlushDir(uint32_t sourceId) const
+{
+    std::ostringstream path;
+    path << _baseDir << '/' << FlushDirPrefix << sourceId;
+    return path.str();
+}
+
+// Returns "<baseDir>/index.fusion.<sourceId>".
+vespalib::string IndexDiskLayout::getFusionDir(uint32_t sourceId) const
+{
+    std::ostringstream path;
+    path << _baseDir << '/' << FusionDirPrefix << sourceId;
+    return path.str();
+}
+
+// Path of the serial number file inside index directory 'dir'.
+vespalib::string IndexDiskLayout::getSerialNumFileName(const vespalib::string &dir)
+{
+    return dir + "/serial.dat";
+}
+
+// Path of the schema file inside index directory 'dir'.
+vespalib::string IndexDiskLayout::getSchemaFileName(const vespalib::string &dir)
+{
+    return dir + "/schema.txt";
+}
+
+// Path (without extension) of the selector file inside index directory 'dir'.
+vespalib::string IndexDiskLayout::getSelectorFileName(const vespalib::string &dir)
+{
+    return dir + "/selector";
+}
+
+} // namespace index
+} // namespace searchcorespi
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexdisklayout.h b/searchcorespi/src/vespa/searchcorespi/index/indexdisklayout.h
new file mode 100644
index 00000000000..0c010db2cc5
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexdisklayout.h
@@ -0,0 +1,35 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/vespalib/stllike/string.h>
+
+namespace searchcorespi {
+namespace index {
+
+/**
+ * Utility class used to get static aspects of the disk layout (i.e. directory and file names)
+ * needed by the index maintainer.
+ */
+class IndexDiskLayout {
+public:
+ static const vespalib::string FlushDirPrefix; // "index.flush."
+ static const vespalib::string FusionDirPrefix; // "index.fusion."
+ static const vespalib::string SerialNumTag; // "Serial num"
+
+private:
+ vespalib::string _baseDir; // directory under which flush and fusion directories are named
+
+public:
+ IndexDiskLayout(const vespalib::string &baseDir);
+ vespalib::string getFlushDir(uint32_t sourceId) const; // <baseDir>/index.flush.<sourceId>
+ vespalib::string getFusionDir(uint32_t sourceId) const; // <baseDir>/index.fusion.<sourceId>
+
+ static vespalib::string getSerialNumFileName(const vespalib::string &dir); // <dir>/serial.dat
+ static vespalib::string getSchemaFileName(const vespalib::string &dir); // <dir>/schema.txt
+ static vespalib::string getSelectorFileName(const vespalib::string &dir); // <dir>/selector
+};
+
+} // namespace index
+} // namespace searchcorespi
+
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp
new file mode 100644
index 00000000000..7b2088aeffe
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.cpp
@@ -0,0 +1,86 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP(".searchcorespi.index.indexflushtarget");
+
+#include "indexflushtarget.h"
+#include <vespa/vespalib/util/closuretask.h>
+
+using vespalib::makeClosure;
+
+namespace searchcorespi {
+namespace index {
+
+IndexFlushTarget::IndexFlushTarget(IndexMaintainer &indexMaintainer)
+ : IFlushTarget("memoryindex.flush", Type::FLUSH, Component::INDEX),
+ _indexMaintainer(indexMaintainer),
+ _flushStats(indexMaintainer.getFlushStats()), // stats and counters below are sampled once, here
+ _numFrozenMemoryIndexes(indexMaintainer.getNumFrozenMemoryIndexes()),
+ _maxFrozenMemoryIndexes(indexMaintainer.getMaxFrozenMemoryIndexes()),
+ _lastStats()
+{
+ _lastStats.setPathElementsToLog(7); // NOTE(review): the choice of 7 path elements is not explained here
+}
+
+// Memory gain built from the before/after byte counts of the flush stats sampled at construction.
+IFlushTarget::MemoryGain IndexFlushTarget::getApproxMemoryGain() const
+{
+    return MemoryGain(_flushStats.memory_before_bytes, _flushStats.memory_after_bytes);
+}
+
+// This target always reports a (0, 0) disk gain.
+IFlushTarget::DiskGain IndexFlushTarget::getApproxDiskGain() const
+{
+    return DiskGain(0, 0);
+}
+
+
+// Reports an urgent flush when the number of frozen memory indexes exceeds the allowed maximum.
+// Both counters were sampled from the IndexMaintainer when this target was constructed.
+bool IndexFlushTarget::needUrgentFlush(void) const
+{
+    const bool tooManyFrozen = (_maxFrozenMemoryIndexes < _numFrozenMemoryIndexes);
+    // The flushed serial number is fetched for the debug log line only.
+    const SerialNum lastFlushed = _indexMaintainer.getFlushedSerialNum();
+    LOG(debug,
+        "Num frozen: %" PRIu32 " Urgent: %d, flushedSerial=%" PRIu64,
+        _numFrozenMemoryIndexes, static_cast<int>(tooManyFrozen), lastFlushed);
+    return tooManyFrozen;
+}
+
+
+// Forwards to the index maintainer's last flush time.
+IFlushTarget::Time IndexFlushTarget::getLastFlushTime() const
+{
+    return _indexMaintainer.getLastFlushTime();
+}
+
+// Forwards to the index maintainer's flushed serial number.
+IFlushTarget::SerialNum IndexFlushTarget::getFlushedSerialNum() const
+{
+    return _indexMaintainer.getFlushedSerialNum();
+}
+
+// Delegates to the index maintainer, handing it a pointer to this target's last-flush stats.
+IFlushTarget::Task::UP IndexFlushTarget::initFlush(SerialNum serialNum)
+{
+    // the target must live until this task is done (handled by flush engine).
+    return _indexMaintainer.initFlush(serialNum, &_lastStats);
+}
+
+
+// Estimates the bytes a flush will write as the expected memory reduction (before - after),
+// or 0 when the stats do not predict any reduction.
+uint64_t IndexFlushTarget::getApproxBytesToWriteToDisk() const
+{
+    MemoryGain estimate(_flushStats.memory_before_bytes,
+                        _flushStats.memory_after_bytes);
+    if (estimate.getBefore() <= estimate.getAfter()) {
+        return 0;
+    }
+    return estimate.getBefore() - estimate.getAfter();
+}
+
+
+} // namespace index
+} // namespace searchcorespi
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h
new file mode 100644
index 00000000000..fa4b2ad85fb
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexflushtarget.h
@@ -0,0 +1,40 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/searchcorespi/flush/iflushtarget.h>
+#include "indexmaintainer.h"
+
+namespace searchcorespi {
+namespace index {
+
+/**
+ * Flush target for flushing a memory index in an IndexMaintainer.
+ **/
+class IndexFlushTarget : public IFlushTarget {
+private:
+ IndexMaintainer &_indexMaintainer; // not owned; must outlive this target
+ IndexMaintainer::FlushStats _flushStats; // copy taken in the constructor
+ uint32_t _numFrozenMemoryIndexes; // sampled in the constructor
+ uint32_t _maxFrozenMemoryIndexes; // sampled in the constructor
+ FlushStats _lastStats; // returned by getLastFlushStats(); its address is handed to initFlush()
+
+public:
+ IndexFlushTarget(IndexMaintainer &indexMaintainer);
+
+ // Implements IFlushTarget
+ virtual MemoryGain getApproxMemoryGain() const;
+ virtual DiskGain getApproxDiskGain() const;
+ virtual SerialNum getFlushedSerialNum() const;
+ virtual Time getLastFlushTime() const;
+
+ virtual bool
+ needUrgentFlush() const; // true when more memory indexes are frozen than the maximum allows
+
+ virtual Task::UP initFlush(SerialNum currentSerial);
+ virtual FlushStats getLastFlushStats() const { return _lastStats; }
+ virtual uint64_t getApproxBytesToWriteToDisk() const override;
+};
+
+} // namespace index
+} // namespace searchcorespi
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp
new file mode 100644
index 00000000000..412c5b8c4a2
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.cpp
@@ -0,0 +1,107 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP(".searchcorespi.index.indexfusiontarget");
+
+#include "indexfusiontarget.h"
+#include "fusionspec.h"
+
+namespace searchcorespi {
+namespace index {
+
+using search::SerialNum;
+namespace {
+
+// Flush task that runs a fusion on the index maintainer and stores the output directory in the stats.
+class Fusioner : public FlushTask {
+private:
+    IndexMaintainer &_indexMaintainer;
+    FlushStats      &_stats;
+    SerialNum        _serialNum;
+
+public:
+    Fusioner(IndexMaintainer &indexMaintainer, FlushStats &stats, SerialNum serialNum)
+        : _indexMaintainer(indexMaintainer), _stats(stats), _serialNum(serialNum) {}
+
+    virtual void run() {
+        // the target must live until this task is done (handled by flush engine).
+        vespalib::string fusionOutputDir = _indexMaintainer.doFusion(_serialNum);
+        _stats.setPath(fusionOutputDir);
+    }
+
+    // Zero means that no tls syncing is needed
+    virtual SerialNum getFlushSerial(void) const { return 0u; }
+};
+
+}
+IndexFusionTarget::IndexFusionTarget(IndexMaintainer &indexMaintainer)
+ : IFlushTarget("memoryindex.fusion", Type::GC, Component::INDEX),
+ _indexMaintainer(indexMaintainer),
+ _fusionStats(indexMaintainer.getFusionStats()), // fusion stats are sampled once, here
+ _lastStats()
+{
+ _lastStats.setPathElementsToLog(7); // NOTE(review): the choice of 7 path elements is not explained here
+ LOG(debug, "New target, Num flushed: %d, Disk usage: %" PRIu64, _fusionStats.numUnfused, _fusionStats.diskUsage); // NOTE(review): %d assumes numUnfused is int-sized and signed - confirm against FusionStats
+}
+
+// This target always reports a (0, 0) memory gain.
+IFlushTarget::MemoryGain IndexFusionTarget::getApproxMemoryGain() const
+{
+    return MemoryGain(0, 0);
+}
+
+// Estimates the disk gain of a fusion: 10% of the current disk usage for each unfused index
+// beyond the first, capped at the current usage, and zero when fusion cannot run.
+// The result is expressed as DiskGain(usage before, usage before - estimated gain).
+IFlushTarget::DiskGain
+IndexFusionTarget::getApproxDiskGain() const
+{
+    const uint64_t usageBefore = _fusionStats.diskUsage;
+    // (numUnfused - 1) goes through int and max(0, ...) so that a count of zero gives a factor of 0.
+    const int extraIndexes = std::max(0, static_cast<int>(_fusionStats.numUnfused - 1));
+    uint64_t estimatedGain = static_cast<uint64_t>(0.1 * (usageBefore * extraIndexes));
+    estimatedGain = std::min(estimatedGain, usageBefore);
+    if (!_fusionStats._canRunFusion) {
+        estimatedGain = 0;
+    }
+    return DiskGain(usageBefore, usageBefore - estimatedGain);
+}
+
+// Urgent only when fusion can run and numUnfused exceeds maxFlushed (both from the sampled stats).
+bool IndexFusionTarget::needUrgentFlush() const
+{
+    const bool urgent = _fusionStats._canRunFusion &&
+                        (_fusionStats.numUnfused > _fusionStats.maxFlushed);
+    LOG(debug, "Num flushed: %d Urgent: %d", _fusionStats.numUnfused, urgent);
+    return urgent;
+}
+
+// Always reports the current system clock time as the last flush time.
+IFlushTarget::Time IndexFusionTarget::getLastFlushTime() const
+{
+    return fastos::ClockSystem::now();
+}
+
+// Reports the maintainer's current serial number instead of a fusion-specific one.
+IFlushTarget::SerialNum IndexFusionTarget::getFlushedSerialNum() const
+{
+    // Lack of fusion operation doesn't prevent transaction log
+    // pruning.
+    return _indexMaintainer.getCurrentSerialNum();
+}
+
+// Creates a Fusioner task that reports into this target's last-flush stats.
+IFlushTarget::Task::UP IndexFusionTarget::initFlush(SerialNum serialNum)
+{
+    return Task::UP(new Fusioner(_indexMaintainer, _lastStats, serialNum));
+}
+
+// Uses the sampled disk usage as the estimate of bytes a fusion writes.
+uint64_t IndexFusionTarget::getApproxBytesToWriteToDisk() const
+{
+    return _fusionStats.diskUsage;
+}
+
+
+} // namespace index
+} // namespace searchcorespi
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h
new file mode 100644
index 00000000000..6b67a150cec
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexfusiontarget.h
@@ -0,0 +1,36 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/searchcorespi/flush/iflushtarget.h>
+#include "indexmaintainer.h"
+
+namespace searchcorespi {
+namespace index {
+
+/**
+ * Flush target for doing fusion on disk indexes in an IndexMaintainer.
+ **/
+class IndexFusionTarget : public IFlushTarget {
+private:
+ IndexMaintainer &_indexMaintainer; // not owned; must outlive this target
+ IndexMaintainer::FusionStats _fusionStats; // copy taken in the constructor
+ FlushStats _lastStats; // returned by getLastFlushStats(); updated by the task created in initFlush()
+
+public:
+ IndexFusionTarget(IndexMaintainer &indexMaintainer);
+
+ // Implements IFlushTarget
+ virtual MemoryGain getApproxMemoryGain() const;
+ virtual DiskGain getApproxDiskGain() const;
+ virtual SerialNum getFlushedSerialNum() const;
+ virtual Time getLastFlushTime() const;
+ virtual bool needUrgentFlush() const;
+
+ virtual Task::UP initFlush(SerialNum currentSerial);
+ virtual FlushStats getLastFlushStats() const { return _lastStats; }
+ virtual uint64_t getApproxBytesToWriteToDisk() const override;
+};
+
+} // namespace index
+} // namespace searchcorespi
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp
new file mode 100644
index 00000000000..c8c7fd180a9
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp
@@ -0,0 +1,1238 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP(".searchcorespi.index.indexmaintainer");
+
+#include "indexmaintainer.h"
+#include "diskindexcleaner.h"
+#include "eventlogger.h"
+#include "fusionrunner.h"
+#include "indexflushtarget.h"
+#include "indexfusiontarget.h"
+#include "indexreadutilities.h"
+#include "indexwriteutilities.h"
+#include <vespa/searchlib/common/serialnumfileheadercontext.h>
+#include <vespa/searchlib/attribute/fixedsourceselector.h>
+#include <vespa/searchlib/common/fileheadercontext.h>
+#include <vespa/searchlib/queryeval/isourceselector.h>
+#include <vespa/searchlib/util/dirtraverse.h>
+#include <vespa/searchlib/util/filekit.h>
+#include <vespa/vespalib/util/autoclosurecaller.h>
+#include <vespa/vespalib/util/closure.h>
+#include <vespa/vespalib/util/closuretask.h>
+#include <sstream>
+#include <vector>
+#include <vespa/searchcorespi/flush/closureflushtask.h>
+
+using document::Document;
+using search::FixedSourceSelector;
+using search::TuneFileAttributes;
+using search::index::Schema;
+using search::common::FileHeaderContext;
+using search::queryeval::ISourceSelector;
+using search::queryeval::Source;
+using search::SerialNum;
+using std::ostringstream;
+using vespalib::makeClosure;
+using vespalib::makeTask;
+using vespalib::string;
+using vespalib::Closure0;
+using vespalib::Executor;
+using vespalib::LockGuard;
+using vespalib::Runnable;
+
+namespace searchcorespi {
+namespace index {
+
+namespace
+{
+
+class ReconfigRunnable : public Runnable // runs a reconfigure closure through a Reconfigurer and stores the bool outcome
+{
+public:
+ bool &_result; // written by run(); refers to a caller-provided variable
+ IIndexManager::Reconfigurer &_reconfigurer;
+ Closure0<bool>::UP _closure; // moved away by run(), so run() is meant to be called once
+
+ ReconfigRunnable(bool &result,
+ IIndexManager::Reconfigurer &reconfigurer,
+ Closure0<bool>::UP closure)
+ : _result(result),
+ _reconfigurer(reconfigurer),
+ _closure(std::move(closure))
+ { }
+
+ virtual void run() {
+ _result = _reconfigurer.reconfigure(std::move(_closure));
+ }
+};
+
+class ReconfigRunnableTask : public Executor::Task { // executor task variant of ReconfigRunnable that ignores the reconfigure result
+private:
+ IIndexManager::Reconfigurer &_reconfigurer;
+ Closure0<bool>::UP _closure; // moved away by run(), so run() is meant to be called once
+public:
+ ReconfigRunnableTask(IIndexManager::Reconfigurer &reconfigurer, Closure0<bool>::UP closure) :
+ _reconfigurer(reconfigurer),
+ _closure(std::move(closure))
+ { }
+ virtual void run() {
+ _reconfigurer.reconfigure(std::move(_closure)); // return value intentionally not used
+ }
+};
+
+SerialNum noSerialNumHigh = std::numeric_limits<SerialNum>::max(); // largest SerialNum; passed as the wipe serial when flushMemoryIndex() updates a flushed index's schema
+
+
+class DiskIndexWithDestructorClosure : public IDiskIndex { // IDiskIndex decorator: forwards every call to the wrapped index and carries a closure
+private:
+ vespalib::AutoClosureCaller _caller; // NOTE(review): presumably invokes the closure when this wrapper is destroyed - confirm in AutoClosureCaller
+ IDiskIndex::SP _index; // the wrapped disk index
+
+public:
+ DiskIndexWithDestructorClosure(const IDiskIndex::SP &index,
+ vespalib::Closure::UP closure)
+ : _caller(std::move(closure)),
+ _index(index)
+ { }
+ const IDiskIndex &getWrapped() const { return *_index; } // access to the undecorated index (used when reloading)
+
+ /**
+ * Implements searchcorespi::IndexSearchable by forwarding to the wrapped index.
+ */
+ virtual Blueprint::UP
+ createBlueprint(const IRequestContext & requestContext,
+ const FieldSpec &field,
+ const Node &term,
+ const IAttributeContext &attrCtx)
+ {
+ return _index->createBlueprint(requestContext, field, term, attrCtx);
+ }
+ virtual Blueprint::UP
+ createBlueprint(const IRequestContext & requestContext,
+ const FieldSpecList &fields,
+ const Node &term,
+ const IAttributeContext &attrCtx)
+ {
+ return _index->createBlueprint(requestContext, fields, term, attrCtx);
+ }
+ virtual search::SearchableStats getSearchableStats() const { return _index->getSearchableStats(); }
+
+ /**
+ * Implements IDiskIndex by forwarding to the wrapped index.
+ */
+ virtual const vespalib::string &getIndexDir() const { return _index->getIndexDir(); }
+ virtual const search::index::Schema &getSchema() const { return _index->getSchema(); }
+
+};
+
+} // namespace
+
+// Hands out the next id and advances the counter (post-increment of _next_id).
+uint32_t IndexMaintainer::getNewAbsoluteId()
+{
+    return _next_id++;
+}
+
+// Flush directory for 'sourceId', as defined by the disk layout.
+string IndexMaintainer::getFlushDir(uint32_t sourceId) const
+{
+    return _layout.getFlushDir(sourceId);
+}
+
+// Fusion directory for 'sourceId', as defined by the disk layout.
+string IndexMaintainer::getFusionDir(uint32_t sourceId) const
+{
+    return _layout.getFusionDir(sourceId);
+}
+
+bool
+IndexMaintainer::reopenDiskIndexes(ISearchableIndexCollection &coll)
+{ // Reloads every disk index in 'coll' whose on-disk schema differs from its loaded schema; returns true if any was reloaded.
+ bool hasReopenedAnything(false);
+ assert(_ctx.getThreadingService().master().isCurrentThread()); // master thread only
+ uint32_t count = coll.getSourceCount();
+ for (uint32_t i = 0; i < count; ++i) {
+ IndexSearchable &is = coll.getSearchable(i);
+ const DiskIndexWithDestructorClosure *const d =
+ dynamic_cast<const DiskIndexWithDestructorClosure *>(&is);
+ if (d == NULL) {
+ continue; // not a disk index
+ }
+ const string indexDir = d->getIndexDir();
+ vespalib::string schemaName = IndexDiskLayout::getSchemaFileName(indexDir);
+ Schema oldSchema; // despite the name, this is the schema currently stored on disk
+ if (!oldSchema.loadFromFile(schemaName)) {
+ LOG(error, "Could not open schema '%s'", schemaName.c_str()); // NOTE(review): execution continues, so the comparison below uses whatever oldSchema holds after the failed load - confirm this is intended
+ }
+ if (oldSchema != d->getSchema()) {
+ IDiskIndex::SP newIndex(reloadDiskIndex(*d));
+ coll.replace(coll.getSourceId(i), newIndex); // same source id, new index object
+ hasReopenedAnything = true;
+ }
+ }
+ return hasReopenedAnything;
+}
+
+// Takes _schemaUpdateLock and then delegates the on-disk schema update to IndexWriteUtilities.
+void IndexMaintainer::updateDiskIndexSchema(const vespalib::string &indexDir,
+                                            const Schema &schema,
+                                            SerialNum wipeSerial)
+{
+    // Called by a flush worker thread OR document db executor thread
+    LockGuard schemaUpdateGuard(_schemaUpdateLock);
+    IndexWriteUtilities::updateDiskIndexSchema(indexDir, schema, wipeSerial);
+}
+
+// Pushes 'schema' to every source in 'coll' (must run in the master thread):
+//  - disk indexes get their on-disk schema updated with 'wipeSerial',
+//  - memory indexes get wipeHistory(schema),
+//  - any other kind of source is left untouched.
+void
+IndexMaintainer::updateIndexSchemas(IIndexCollection &coll,
+                                    const Schema &schema,
+                                    SerialNum wipeSerial)
+{
+    assert(_ctx.getThreadingService().master().isCurrentThread());
+    const uint32_t sourceCount = coll.getSourceCount();
+    for (uint32_t idx = 0; idx < sourceCount; ++idx) {
+        IndexSearchable &searchable = coll.getSearchable(idx);
+        if (const DiskIndexWithDestructorClosure *disk =
+                dynamic_cast<const DiskIndexWithDestructorClosure *>(&searchable)) {
+            updateDiskIndexSchema(disk->getIndexDir(), schema, wipeSerial);
+        } else if (IMemoryIndex *memory = dynamic_cast<IMemoryIndex *>(&searchable)) {
+            memory->wipeHistory(schema);
+        }
+    }
+}
+
+void
+IndexMaintainer::updateActiveFusionWipeTimeSchema(const Schema &schema)
+{ // Narrows the active fusion's wipe-time schema by intersecting it with 'schema'; does nothing when no fusion is active.
+ assert(_ctx.getThreadingService().master().isCurrentThread()); // master thread only
+ for (;;) { // retried until the snapshot taken below is still current when the result is stored
+ Schema::SP activeFusionSchema;
+ Schema::SP activeFusionWipeTimeSchema;
+ Schema::SP newActiveFusionWipeTimeSchema;
+ { // snapshot both schema pointers under _state_lock
+ LockGuard lock(_state_lock);
+ activeFusionSchema = _activeFusionSchema;
+ activeFusionWipeTimeSchema = _activeFusionWipeTimeSchema;
+ }
+ if (activeFusionSchema.get() == NULL)
+ return; // No active fusion
+ if (activeFusionWipeTimeSchema.get() == NULL) { // first update: start from the fusion schema itself
+ Schema::UP newSchema = Schema::intersect(*activeFusionSchema, schema);
+ newActiveFusionWipeTimeSchema.reset(newSchema.release());
+ } else { // later updates: keep narrowing the existing wipe-time schema
+ Schema::UP newSchema = Schema::intersect(*activeFusionWipeTimeSchema, schema);
+ newActiveFusionWipeTimeSchema.reset(newSchema.release());
+ }
+ { // the intersection was computed without locks; store it only if neither pointer changed meanwhile
+ LockGuard slock(_state_lock);
+ LockGuard ilock(_index_update_lock);
+ if (activeFusionSchema.get() == _activeFusionSchema.get() &&
+ activeFusionWipeTimeSchema.get() == _activeFusionWipeTimeSchema.get())
+ {
+ _activeFusionWipeTimeSchema = newActiveFusionWipeTimeSchema;
+ break;
+ }
+ } // otherwise fall through and retry with a fresh snapshot
+ }
+}
+
+// Marks 'indexDir' as no longer active and then triggers removal of old disk indexes.
+void IndexMaintainer::deactivateDiskIndexes(vespalib::string indexDir)
+{
+    _active_indexes->notActive(indexDir);
+    removeOldDiskIndexes();
+}
+
+IDiskIndex::SP
+IndexMaintainer::loadDiskIndex(const string &indexDir)
+{ // Loads the disk index in 'indexDir' and wraps it so that deactivateDiskIndexes(indexDir) is bound to the wrapper.
+ // Called by a flush worker thread OR CTOR (in document db init executor thread)
+ if (LOG_WOULD_LOG(event)) {
+ EventLogger::diskIndexLoadStart(indexDir);
+ }
+ FastOS_Time timer;
+ timer.SetNow();
+ _active_indexes->setActive(indexDir); // marked active before the load starts
+ IDiskIndex::SP retval(new DiskIndexWithDestructorClosure
+ (_operations.loadDiskIndex(indexDir),
+ makeClosure(this, &IndexMaintainer::deactivateDiskIndexes, indexDir)));
+ if (LOG_WOULD_LOG(event)) {
+ EventLogger::diskIndexLoadComplete(indexDir, (int64_t)timer.MilliSecsToNow()); // elapsed load time in milliseconds
+ }
+ return retval;
+}
+
+IDiskIndex::SP
+IndexMaintainer::reloadDiskIndex(const IDiskIndex &oldIndex)
+{ // Reloads the index wrapped by 'oldIndex' and returns it in a new wrapper bound to deactivateDiskIndexes(indexDir).
+ // Called by a flush worker thread OR document db executor thread
+ const string indexDir = oldIndex.getIndexDir();
+ if (LOG_WOULD_LOG(event)) {
+ EventLogger::diskIndexLoadStart(indexDir);
+ }
+ FastOS_Time timer;
+ timer.SetNow();
+ _active_indexes->setActive(indexDir); // marked active (again) before the reload starts
+ const IDiskIndex &wrappedDiskIndex =
+ (dynamic_cast<const DiskIndexWithDestructorClosure &>(oldIndex)).getWrapped(); // reference cast: throws std::bad_cast if oldIndex is not a wrapper
+ IDiskIndex::SP retval(new DiskIndexWithDestructorClosure
+ (_operations.reloadDiskIndex(wrappedDiskIndex),
+ makeClosure(this, &IndexMaintainer::deactivateDiskIndexes, indexDir)));
+ if (LOG_WOULD_LOG(event)) {
+ EventLogger::diskIndexLoadComplete(indexDir, (int64_t)timer.MilliSecsToNow()); // elapsed load time in milliseconds
+ }
+ return retval;
+}
+
+// Writes 'memoryIndex' to the flush directory of 'indexId', applies its wipe-time schema (if it
+// has one) to the flushed index, and loads the result as a disk index.
+IDiskIndex::SP
+IndexMaintainer::flushMemoryIndex(IMemoryIndex &memoryIndex, uint32_t indexId,
+                                  uint32_t docIdLimit, SerialNum serialNum)
+{
+    // Called by a flush worker thread
+    const string targetDir = getFlushDir(indexId);
+    memoryIndex.flushToDisk(targetDir, docIdLimit, serialNum);
+    const Schema::SP wipeTimeSchema(memoryIndex.getWipeTimeSchema());
+    if (wipeTimeSchema.get() != NULL) {
+        updateDiskIndexSchema(targetDir, *wipeTimeSchema, noSerialNumHigh);
+    }
+    return loadDiskIndex(targetDir);
+}
+
+// Appends the last fusion index (if any) as source 0, then each flushed index as (flush id - fusion id).
+ISearchableIndexCollection::UP
+IndexMaintainer::loadDiskIndexes(const FusionSpec &spec, ISearchableIndexCollection::UP sourceList)
+{
+    // Called by CTOR (in document db init executor thread)
+    const uint32_t fusionId = spec.last_fusion_id;
+    if (fusionId != 0) {
+        sourceList->append(0, loadDiskIndex(getFusionDir(fusionId)));
+    }
+    for (size_t pos = 0; pos < spec.flush_ids.size(); ++pos) {
+        const uint32_t flushId = spec.flush_ids[pos];
+        sourceList->append(flushId - fusionId, loadDiskIndex(getFlushDir(flushId)));
+    }
+    return sourceList;
+}
+
+namespace {
+
+/*
+ * Follows the chain of WarmupIndexCollection wrappers starting at 'is' and
+ * returns the first collection that is not a warmup wrapper. With 'warn'
+ * set, an info message is logged for every wrapper passed on the way.
+ * 'newSearchLock' is only taken as a parameter; it is not used to lock here.
+ */
+ISearchableIndexCollection::SP
+getLeaf(const LockGuard &newSearchLock, const ISearchableIndexCollection::SP & is, bool warn=false)
+{
+    (void) newSearchLock;
+    ISearchableIndexCollection::SP current = is;
+    while (const auto *warming = dynamic_cast<const WarmupIndexCollection *>(current.get())) {
+        if (warn) {
+            LOG(info, "Already warming up an index '%s'. Start using it immediately."
+                " This is an indication that you have configured your warmup interval too long.",
+                current->toString().c_str());
+        }
+        // Copy the next link before releasing our reference to the wrapper.
+        ISearchableIndexCollection::SP next = warming->getNextIndexCollection();
+        current = std::move(next);
+    }
+    return current;
+}
+
+}
+
+/*
+ * Builds a new source collection from the current leaf collection, puts
+ * 'source' in place at 'sourceId' and swaps the result in.
+ *
+ * Caller must hold _state_lock (SL).
+ */
+void
+IndexMaintainer::replaceSource(uint32_t sourceId, const IndexSearchable::SP &source)
+{
+    assert(_ctx.getThreadingService().master().isCurrentThread());
+    LockGuard searchGuard(_new_search_lock);
+    ISearchableIndexCollection::UP updated = createNewSourceCollection(searchGuard);
+    updated->replace(sourceId, source);
+    swapInNewIndex(searchGuard, std::move(updated), *source);
+}
+
+/*
+ * Installs 'indexes' as the active source list. When a warmup time is
+ * configured and 'source' is a disk index, the new collection is first
+ * wrapped in a WarmupIndexCollection together with the current leaf.
+ *
+ * Caller must hold _state_lock (SL) and _new_search_lock (NSL), the latter
+ * passed as guard.
+ */
+void
+IndexMaintainer::swapInNewIndex(LockGuard & guard,
+                                ISearchableIndexCollection::SP indexes,
+                                IndexSearchable & source)
+{
+    assert(indexes->valid());
+    (void) guard;
+    if (_diskIndexWarmupTime > 0) {
+        IDiskIndex *diskSource = dynamic_cast<IDiskIndex *>(&source);
+        if (diskSource == nullptr) {
+            LOG(debug, "No warmup needed as it is a memory index that is mapped in.");
+        } else {
+            LOG(debug, "Warming up a disk index.");
+            indexes = std::make_shared<WarmupIndexCollection>
+                      (_diskIndexWarmupTime,
+                       getLeaf(guard, _source_list, true),
+                       indexes,
+                       *diskSource,
+                       _ctx.getWarmupExecutor(),
+                       *this);
+        }
+    }
+    LOG(debug, "Replacing indexcollection :\n%s\nwith\n%s", _source_list->toString().c_str(), indexes->toString().c_str());
+    assert(indexes->valid());
+    _source_list = std::move(indexes);
+}
+
+/*
+ * Builds a new source collection from the current leaf collection, appends
+ * 'source' under 'sourceId' and swaps the result in.
+ *
+ * Caller must hold _state_lock (SL).
+ */
+void
+IndexMaintainer::appendSource(uint32_t sourceId, const IndexSearchable::SP &source)
+{
+    assert(_ctx.getThreadingService().master().isCurrentThread());
+    LockGuard searchGuard(_new_search_lock);
+    ISearchableIndexCollection::UP extended = createNewSourceCollection(searchGuard);
+    extended->append(sourceId, source);
+    swapInNewIndex(searchGuard, std::move(extended), *source);
+}
+
+ISearchableIndexCollection::UP
+IndexMaintainer::createNewSourceCollection(const LockGuard &newSearchLock)
+{
+    // The new collection is built from the current leaf, i.e. the source
+    // list with any warmup wrappers skipped.
+    const ISearchableIndexCollection::SP leaf = getLeaf(newSearchLock, _source_list);
+    ISearchableIndexCollection::UP fresh(new IndexCollection(_selector, *leaf));
+    return fresh;
+}
+
+bool
+IndexMaintainer::doneInitFlush(FlushArgs *args, IMemoryIndex::SP *new_index)
+{
+ /*
+  * Called by initFlush via reconfigurer.
+  *
+  * Records in *args what is needed to flush the current memory index
+  * (index, absolute id, source list, serial number), hands the frozen
+  * memory indexes over to *args, and makes *new_index the current memory
+  * index. Always returns true.
+  */
+ assert(_ctx.getThreadingService().master().isCurrentThread());
+ LockGuard state_lock(_state_lock);
+ args->old_index = _current_index;
+ args->old_absolute_id = _current_index_id + _last_fusion_id; // relative id + last fusion id
+ args->old_source_list = _source_list;
+ string selector_name = IndexDiskLayout::getSelectorFileName(getFlushDir(args->old_absolute_id));
+ args->flush_serial_num = _current_serial_num;
+ {
+ LockGuard lock(_index_update_lock);
+ // Handover of extra memory indexes to flush
+ args->_extraIndexes = _frozenMemoryIndexes;
+ _frozenMemoryIndexes.clear();
+ }
+
+ LOG(debug, "Flushing. Id = %u. Serial num = %llu",
+ args->old_absolute_id, (unsigned long long) args->flush_serial_num);
+ {
+ LockGuard lock(_index_update_lock);
+ if (!_current_index->hasReceivedDocumentInsert() &&
+ _source_selector_changes == 0) {
+ args->_skippedEmptyLast = true; // Skip flush of empty memory index
+ }
+
+ if (!args->_skippedEmptyLast) {
+ // Keep on using same source selector with extended valid range
+ args->save_info = getSourceSelector().extractSaveInfo(selector_name);
+ // XXX: Overflow issue in source selector
+ _current_index_id = getNewAbsoluteId() - _last_fusion_id; // new relative id for the new memory index
+ assert(_current_index_id < ISourceSelector::SOURCE_LIMIT);
+ _source_selector_changes = 0;
+ }
+ _current_index = *new_index;
+ }
+ if (args->_skippedEmptyLast) { // id unchanged: the new index takes the old one's slot
+ replaceSource(_current_index_id, _current_index);
+ } else { // new id: the old index stays in the collection until flushed
+ appendSource(_current_index_id, _current_index);
+ }
+ _source_list->setCurrentIndex(_current_index_id);
+ return true;
+}
+
+void
+IndexMaintainer::doFlush(FlushArgs args)
+{
+    // Runs on a flush worker thread.
+    FlushIds flushedIds; // absolute ids of the indexes written by this flush
+
+    // Frozen memory indexes first, then the last memory index unless it
+    // was skipped as empty.
+    flushFrozenMemoryIndexes(args, flushedIds);
+    const bool flushLast = !args._skippedEmptyLast;
+    if (flushLast) {
+        flushLastMemoryIndex(args, flushedIds);
+    }
+
+    assert(!flushedIds.empty());
+    if (args.stats != nullptr) {
+        updateFlushStats(args);
+    }
+
+    scheduleFusion(flushedIds);
+}
+
+void
+IndexMaintainer::flushFrozenMemoryIndexes(FlushArgs &args, FlushIds &flushIds)
+{
+    // Runs on a flush worker thread.
+    for (FrozenMemoryIndexRef &frozenRef : args._extraIndexes) {
+        // Frozen indexes must be older than the last memory index and must
+        // be flushed in increasing id order.
+        assert(frozenRef._absoluteId < args.old_absolute_id);
+        assert(flushIds.empty() || flushIds.back() < frozenRef._absoluteId);
+
+        FlushArgs frozenArgs;
+        frozenArgs.old_index = frozenRef._index;
+        frozenArgs.flush_serial_num = frozenRef._serialNum;
+        frozenArgs.old_absolute_id = frozenRef._absoluteId;
+
+        auto &frozenSaveInfo = *frozenRef._saveInfo;
+        const uint32_t docIdLimit = frozenSaveInfo.getHeader()._docIdLimit;
+        flushMemoryIndex(frozenArgs, docIdLimit, frozenSaveInfo, flushIds);
+
+        // Release the memory index and save info held by the frozen entry.
+        frozenRef._index.reset();
+        frozenRef._saveInfo.reset();
+    }
+}
+
+void
+IndexMaintainer::flushLastMemoryIndex(FlushArgs &args, FlushIds &flushIds)
+{
+    // Runs on a flush worker thread.
+    // The doc id limit is taken from the save info captured for this flush.
+    auto &lastSaveInfo = *args.save_info;
+    const uint32_t docIdLimit = lastSaveInfo.getHeader()._docIdLimit;
+    flushMemoryIndex(args, docIdLimit, lastSaveInfo, flushIds);
+}
+
+void
+IndexMaintainer::updateFlushStats(const FlushArgs &args)
+{
+    // Runs on a flush worker thread.
+    // Reports the flush directory of the last memory index, or of the last
+    // frozen index when the last memory index was skipped as empty.
+    uint32_t reportedId = args.old_absolute_id;
+    if (args._skippedEmptyLast) {
+        assert(!args._extraIndexes.empty());
+        reportedId = args._extraIndexes.back()._absoluteId;
+    }
+    const vespalib::string flushDir(getFlushDir(reportedId));
+    args.stats->setPath(flushDir);
+}
+
+void
+IndexMaintainer::flushMemoryIndex(FlushArgs &args,
+                                  uint32_t docIdLimit,
+                                  FixedSourceSelector::SaveInfo &saveInfo,
+                                  FlushIds &flushIds)
+{
+    // Runs on a flush worker thread.
+    // Snapshot the change generations and wipe-time schema before any disk
+    // work; doneFlush() compares them against the live values to decide
+    // whether the operation must be retried.
+    const ChangeGens gensAtStart = getChangeGens();
+    IMemoryIndex &indexToFlush = *args.old_index;
+    const Schema::SP schemaAtStart = indexToFlush.getWipeTimeSchema();
+    const uint32_t absoluteId = args.old_absolute_id;
+    const SerialNum flushSerial = args.flush_serial_num;
+
+    IDiskIndex::SP flushedIndex = flushMemoryIndex(indexToFlush, absoluteId,
+                                                   docIdLimit, flushSerial);
+    IndexWriteUtilities::writeSourceSelector(saveInfo, absoluteId,
+                                             getAttrTune(), _ctx.getFileHeaderContext(),
+                                             flushSerial);
+    IndexWriteUtilities::writeSerialNum(flushSerial,
+                                        getFlushDir(absoluteId),
+                                        _ctx.getFileHeaderContext());
+
+    // The memory index is now on disk and open as a disk index; switch it
+    // in through the reconfigurer.
+    args._changeGens = gensAtStart;
+    args._wtSchema = schemaAtStart;
+    reconfigureAfterFlush(args, flushedIndex);
+
+    flushIds.push_back(args.old_absolute_id);
+}
+
+
+void
+IndexMaintainer::reconfigureAfterFlush(FlushArgs &args, IDiskIndex::SP &diskIndex)
+{
+    // Runs on a flush worker thread.
+    // Keeps asking the reconfigurer to run doneFlush() until it succeeds.
+    // After a failed attempt the change generations and wipe-time schema
+    // are re-read, the on-disk schema is updated when a wipe-time schema
+    // is set, and the disk index is reloaded before trying again.
+    bool switchedIn = false;
+    while (!switchedIn) {
+        Closure0<bool>::UP attempt(makeClosure(this, &IndexMaintainer::doneFlush,
+                                               &args, &diskIndex));
+        switchedIn = reconfigure(std::move(attempt));
+        if (!switchedIn) {
+            const ChangeGens latestGens = getChangeGens();
+            const Schema::SP latestSchema = args.old_index->getWipeTimeSchema();
+            const string indexDir = getFlushDir(args.old_absolute_id);
+            if (latestSchema.get() != nullptr) {
+                updateDiskIndexSchema(indexDir, *latestSchema, noSerialNumHigh);
+            }
+            IDiskIndex::SP reloaded = reloadDiskIndex(*diskIndex);
+            diskIndex = reloaded;
+            args._changeGens = latestGens;
+            args._wtSchema = latestSchema;
+        }
+    }
+}
+
+
+bool
+IndexMaintainer::doneFlush(FlushArgs *args, IDiskIndex::SP *disk_index) {
+    // Invoked by doFlush through the reconfigurer, on the master thread.
+    assert(_ctx.getThreadingService().master().isCurrentThread());
+    LockGuard stateGuard(_state_lock);
+    IMemoryIndex &flushedMemoryIndex = *args->old_index;
+    // The flush must be retried if the change generations or the wipe-time
+    // schema moved since the worker took its snapshot.
+    if (args->_changeGens != getChangeGens() ||
+        args->_wtSchema.get() != flushedMemoryIndex.getWipeTimeSchema().get())
+    {
+        return false; // Must retry operation
+    }
+    _flush_serial_num = std::max(_flush_serial_num, args->flush_serial_num);
+    fastos::TimeStamp dirTime = search::FileKit::getModificationTime((*disk_index)->getIndexDir());
+    if (dirTime.time() > _lastFlushTime.time()) {
+        _lastFlushTime = dirTime;
+    }
+    // Swap the flushed memory index for its disk index, under the same
+    // relative source id.
+    const uint32_t relativeId = args->old_absolute_id - _last_fusion_id;
+    replaceSource(relativeId, *disk_index);
+    return true;
+}
+
+void
+IndexMaintainer::scheduleFusion(const FlushIds &flushIds)
+{
+    // Runs on a flush worker thread.
+    // Adds the newly flushed ids to the pending fusion spec.
+    LOG(debug, "Scheduled fusion for id %u.", flushIds.back());
+    LockGuard fusionGuard(_fusion_lock);
+    auto &pendingIds = _fusion_spec.flush_ids;
+    pendingIds.insert(pendingIds.end(), flushIds.begin(), flushIds.end());
+}
+
+bool
+IndexMaintainer::canRunFusion(const FusionSpec &spec) const
+{
+    // Fusion is possible with at least two flushed indexes, or with one
+    // flushed index plus an existing fused index.
+    const size_t numFlushed = spec.flush_ids.size();
+    if (numFlushed > 1) {
+        return true;
+    }
+    return (numFlushed == 1) && (spec.last_fusion_id != 0);
+}
+
+bool
+IndexMaintainer::doneFusion(FusionArgs *args, IDiskIndex::SP *new_index)
+{
+ /*
+  * Called by runFusion via reconfigurer.
+  *
+  * Switches in the fused disk index *new_index: the source selector is
+  * replaced by a clone shifted by the distance between the new and the
+  * previous fusion id, the current index id is shifted by the same amount,
+  * and a renumbered source collection is swapped in. Returns false,
+  * without changing anything, when the change generations or the active
+  * fusion wipe-time schema differ from the values recorded in *args.
+  */
+ assert(_ctx.getThreadingService().master().isCurrentThread());
+ LockGuard state_lock(_state_lock);
+ if (args->_changeGens != getChangeGens()) {
+ return false; // Must retry operation
+ }
+ if (args->_wtSchema.get() != getActiveFusionWipeTimeSchema().get()) {
+ return false; // Must retry operation
+ }
+ args->_old_source_list = _source_list; // delays destruction
+ uint32_t id_diff = args->_new_fusion_id - _last_fusion_id; // amount the relative ids shift by
+ ostringstream ost;
+ ost << "sourceselector_fusion(" << args->_new_fusion_id << ")";
+ {
+ LockGuard lock(_index_update_lock);
+
+ // make new source selector with shifted values.
+ _selector.reset(getSourceSelector().cloneAndSubtract(ost.str(), id_diff).release());
+ _source_selector_changes = 0;
+ _current_index_id -= id_diff;
+ _last_fusion_id = args->_new_fusion_id;
+ _selector->setBaseId(_last_fusion_id);
+ _activeFusionSchema.reset(); // this fusion is no longer active
+ _activeFusionWipeTimeSchema.reset();
+ }
+
+ ISearchableIndexCollection::SP currentLeaf;
+ {
+ LockGuard lock(_new_search_lock);
+ currentLeaf = getLeaf(lock, _source_list);
+ }
+ ISearchableIndexCollection::UP fsc =
+ IndexCollection::replaceAndRenumber(_selector, *currentLeaf, id_diff, *new_index);
+ fsc->setCurrentIndex(_current_index_id);
+
+ {
+ LockGuard lock(_new_search_lock);
+ swapInNewIndex(lock, std::move(fsc), **new_index);
+ }
+ return true;
+}
+
+bool
+IndexMaintainer::makeSureAllRemainingWarmupIsDone(ISearchableIndexCollection::SP keepAlive)
+{
+    // Scheduled by warmupDone() through the reconfigurer; warmupDone() does
+    // not wait for this to finish.
+    assert(_ctx.getThreadingService().master().isCurrentThread());
+    ISearchableIndexCollection::SP warmedUp;
+    {
+        LockGuard stateGuard(_state_lock);
+        const bool stillCurrent = (keepAlive.get() == _source_list.get());
+        if (stillCurrent) {
+            // Replace the warmup wrapper with the collection it leads to.
+            LockGuard searchGuard(_new_search_lock);
+            warmedUp = getLeaf(searchGuard, _source_list, false);
+            _source_list = warmedUp;
+        }
+    }
+    if (warmedUp) {
+        LOG(info, "New index warmed up and switched in : %s", warmedUp->toString().c_str());
+    }
+    LOG(info, "Sync warmupExecutor.");
+    _ctx.getWarmupExecutor().sync();
+    LOG(info, "Now the keep alive of the warmupindexcollection should be gone.");
+    return true;
+}
+
+void
+IndexMaintainer::warmupDone(ISearchableIndexCollection::SP current)
+{
+    // Runs on a search thread.
+    LockGuard searchGuard(_new_search_lock);
+    if (current.get() != _source_list.get()) {
+        LOG(warning, "There has arrived a new IndexCollection while replacing the active index. "
+                     "It can theoretically happen, but not very likely, so logging this as a warning.");
+        return;
+    }
+    // Still the active collection: hand the switch-over to the master thread.
+    auto finishWarmup = makeClosure(this, &IndexMaintainer::makeSureAllRemainingWarmupIsDone, current);
+    Executor::Task::UP switchTask(new ReconfigRunnableTask(_ctx.getReconfigurer(), std::move(finishWarmup)));
+    _ctx.getThreadingService().master().execute(std::move(switchTask));
+}
+
+
+void
+IndexMaintainer::doneSetSchema(SetSchemaArgs &args, IMemoryIndex::SP &newIndex)
+{ /*
+   * Installs the schemas from 'args' and makes 'newIndex' the current
+   * memory index. The previous schema, fusion schema, memory index and
+   * source list are stored in 'args'. Unless the previous memory index
+   * has received no document insert, it is added to the frozen memory
+   * indexes together with its save info and serial number.
+   */
+ assert(_ctx.getThreadingService().master().isCurrentThread()); // with idle index executor
+ LockGuard state_lock(_state_lock);
+ typedef FixedSourceSelector::SaveInfo SaveInfo;
+ args._oldSchema = _schema; // Delay destruction
+ args._oldFusionSchema = _fusionSchema;
+ args._oldIndex = _current_index; // Delay destruction
+ args._oldSourceList = _source_list; // Delay destruction
+ uint32_t oldAbsoluteId = _current_index_id + _last_fusion_id; // relative id + last fusion id
+ string selectorName = IndexDiskLayout::getSelectorFileName(getFlushDir(oldAbsoluteId));
+ SerialNum freezeSerialNum = _current_serial_num;
+ bool dropEmptyLast = false;
+ SaveInfo::UP saveInfo;
+
+ LOG(info, "Making new schema. Id = %u. Serial num = %llu", oldAbsoluteId, (unsigned long long) freezeSerialNum);
+ {
+ LockGuard lock(_index_update_lock);
+ _schema = args._newSchema;
+ _fusionSchema = args._newFusionSchema;
+ // NOTE(review): unlike doneInitFlush, _source_selector_changes is neither checked nor reset here - confirm this is intended
+ if (!_current_index->hasReceivedDocumentInsert()) {
+ dropEmptyLast = true; // Skip flush of empty memory index
+ }
+
+ if (!dropEmptyLast) {
+ // Keep on using same source selector with extended valid range
+ saveInfo = getSourceSelector().extractSaveInfo(selectorName);
+ // XXX: Overflow issue in source selector
+ _current_index_id = getNewAbsoluteId() - _last_fusion_id; // new relative id for the new memory index
+ assert(_current_index_id < ISourceSelector::SOURCE_LIMIT);
+ // Extra index to flush next time flushing is performed
+ _frozenMemoryIndexes.emplace_back(args._oldIndex, freezeSerialNum, std::move(saveInfo), oldAbsoluteId);
+ }
+ _current_index = newIndex;
+ }
+ if (dropEmptyLast) { // id unchanged: the new index takes the old one's slot
+ replaceSource(_current_index_id, _current_index);
+ } else { // new id: the frozen index stays in the collection
+ appendSource(_current_index_id, _current_index);
+ }
+ _source_list->setCurrentIndex(_current_index_id);
+}
+
+
+bool
+IndexMaintainer::doneWipeHistory(WipeHistoryArgs &args)
+{
+    assert(_ctx.getThreadingService().master().isCurrentThread()); // with idle index executor
+    LockGuard stateGuard(_state_lock);
+    LockGuard searchGuard(_new_search_lock);
+    // The new source list is installed only if the active one is still the
+    // list it was built from.
+    const bool unchanged = (args._old_source_list.get() == _source_list.get());
+    if (unchanged) {
+        _source_list = args._new_source_list;
+    }
+    return unchanged; // false: flush or fusion had started/completed, must retry
+}
+
+
+Schema
+IndexMaintainer::getSchema() const
+{
+    // Returns a copy of the schema, taken under the index update lock.
+    LockGuard updateGuard(_index_update_lock);
+    return _schema;
+}
+
+Schema::SP
+IndexMaintainer::getActiveFusionWipeTimeSchema() const
+{
+    // Returns the shared pointer (possibly empty), read under the index
+    // update lock.
+    LockGuard updateGuard(_index_update_lock);
+    return _activeFusionWipeTimeSchema;
+}
+
+TuneFileAttributes
+IndexMaintainer::getAttrTune()
+{
+    // Returns a copy of the attribute file tuning; no lock is taken.
+    return _tuneFileAttributes;
+}
+
+IndexMaintainer::ChangeGens
+IndexMaintainer::getChangeGens()
+{
+    // Returns a copy of the change generations, taken under the index
+    // update lock.
+    LockGuard updateGuard(_index_update_lock);
+    return _changeGens;
+}
+
+bool
+IndexMaintainer::reconfigure(Closure0<bool>::UP closure)
+{
+    // Runs on a flush engine worker thread.
+    // The closure is wrapped in a ReconfigRunnable and handed to the master
+    // thread service; the runnable reports back through 'succeeded'.
+    bool succeeded = false;
+    ReconfigRunnable masterJob(succeeded, _ctx.getReconfigurer(), std::move(closure));
+    _ctx.getThreadingService().master().run(masterJob);
+    return succeeded;
+}
+
+IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config,
+ const IndexMaintainerContext &ctx,
+ IIndexMaintainerOperations &operations)
+ : _base_dir(config.getBaseDir()),
+ _diskIndexWarmupTime(config.getDiskIndexWarmupTime()),
+ _active_indexes(new ActiveDiskIndexes()),
+ _layout(config.getBaseDir()),
+ _schema(config.getSchema()),
+ _fusionSchema(config.getFusionSchema()),
+ _activeFusionSchema(),
+ _activeFusionWipeTimeSchema(),
+ _source_selector_changes(0),
+ _selector(),
+ _source_list(),
+ _last_fusion_id(),
+ _next_id(),
+ _current_index_id(),
+ _current_index(operations.createMemoryIndex(_schema)),
+ _current_serial_num(0),
+ _flush_serial_num(0),
+ _lastFlushTime(),
+ _frozenMemoryIndexes(),
+ _state_lock(),
+ _index_update_lock(),
+ _new_search_lock(),
+ _remove_lock(),
+ _fusion_spec(),
+ _fusion_lock(),
+ _maxFlushed(config.getMaxFlushed()),
+ _maxFrozen(10),
+ _changeGens(),
+ _schemaUpdateLock(),
+ _tuneFileAttributes(config.getTuneFileAttributes()),
+ _ctx(ctx),
+ _operations(operations)
+{
+ /*
+  * Called by document db init executor thread.
+  *
+  * Cleans the base directory, reads the fusion spec found there, restores
+  * the serial number, flush time and source selector from the latest
+  * index directory (or starts with a fresh selector when there is none),
+  * loads the disk indexes named by the spec and appends the new, empty
+  * memory index as the current index.
+  */
+ _changeGens.bumpWipeGen();
+ DiskIndexCleaner::clean(_base_dir, *_active_indexes);
+ FusionSpec spec = IndexReadUtilities::readFusionSpec(_base_dir);
+ _next_id = 1 + (spec.flush_ids.empty() ? spec.last_fusion_id : spec.flush_ids.back()); // one past the highest id in the spec
+ _last_fusion_id = spec.last_fusion_id;
+
+ if (_next_id > 1) { // the spec names at least one index
+ string latest_index_dir = spec.flush_ids.empty()
+ ? getFusionDir(_next_id - 1)
+ : getFlushDir(_next_id - 1);
+
+ _flush_serial_num = IndexReadUtilities::readSerialNum(latest_index_dir);
+ _lastFlushTime = search::FileKit::getModificationTime(latest_index_dir);
+ _current_serial_num = _flush_serial_num;
+ const string selector = IndexDiskLayout::getSelectorFileName(latest_index_dir);
+ _selector.reset(FixedSourceSelector::load(selector).release());
+ } else { // no index in the spec: start with a new selector
+ _flush_serial_num = 0;
+ _selector.reset(new FixedSourceSelector(0, "sourceselector", 1));
+ }
+ uint32_t baseId(_selector->getBaseId());
+ if (_last_fusion_id != baseId) { // loaded selector has a different base id than the last fusion id
+ assert(_last_fusion_id > baseId);
+ uint32_t id_diff = _last_fusion_id - baseId;
+ ostringstream ost;
+ ost << "sourceselector_fusion(" << _last_fusion_id << ")";
+ _selector.reset(getSourceSelector().cloneAndSubtract(ost.str(), id_diff).release());
+ assert(_last_fusion_id == _selector->getBaseId());
+ }
+ _current_index_id = getNewAbsoluteId() - _last_fusion_id; // relative id of the new memory index
+ assert(_current_index_id < ISourceSelector::SOURCE_LIMIT);
+ ISearchableIndexCollection::UP sourceList(loadDiskIndexes(spec, ISearchableIndexCollection::UP(new IndexCollection(_selector))));
+ LOG(debug, "Index manager created with flushed serial num %" PRIu64, _flush_serial_num);
+ sourceList->append(_current_index_id, _current_index);
+ sourceList->setCurrentIndex(_current_index_id);
+ _source_list.reset(sourceList.release());
+ _fusion_spec = spec;
+}
+
+IndexMaintainer::~IndexMaintainer()
+{ // Explicit release order: source list, frozen memory indexes, then selector.
+ _source_list.reset();
+ _frozenMemoryIndexes.clear();
+ _selector.reset(); // NOTE(review): presumably released last because the collections reference the selector - confirm
+}
+
+FlushTask::UP
+IndexMaintainer::initFlush(SerialNum serialNum, searchcorespi::FlushStats * stats)
+{
+    // Runs on the master thread while the flush engine scheduler thread waits.
+    assert(_ctx.getThreadingService().master().isCurrentThread());
+    {
+        LockGuard updateGuard(_index_update_lock);
+        if (_current_serial_num < serialNum) {
+            _current_serial_num = serialNum;
+        }
+    }
+
+    IMemoryIndex::SP freshIndex(_operations.createMemoryIndex(getSchema()));
+    FlushArgs flushArgs;
+    flushArgs.stats = stats;
+    scheduleCommit();
+    // Wait until every index thread task touching the memory index is done.
+    _ctx.getThreadingService().sync();
+    // Switch memory index through the reconfigurer.
+    Closure0<bool>::UP switchIndex(makeClosure(this, &IndexMaintainer::doneInitFlush, &flushArgs, &freshIndex));
+    const bool success = _ctx.getReconfigurer().reconfigure(std::move(switchIndex));
+    assert(success);
+    (void) success;
+    const bool nothingToFlush = flushArgs._skippedEmptyLast && flushArgs._extraIndexes.empty();
+    if (nothingToFlush) {
+        // The memory index was empty and no frozen indexes are waiting.
+        LockGuard stateGuard(_state_lock);
+        _flush_serial_num = _current_serial_num;
+        _lastFlushTime = fastos::ClockSystem::now();
+        LOG(debug, "No memory index to flush. Update serial number and flush time to current: "
+            "flushSerialNum(%" PRIu64 "), lastFlushTime(%f)",
+            _flush_serial_num, _lastFlushTime.sec());
+        return FlushTask::UP();
+    }
+    // Read the serial number before flushArgs is moved into the closure.
+    const SerialNum flushedSerial = flushArgs.flush_serial_num;
+    return makeFlushTask(makeClosure(this, &IndexMaintainer::doFlush, std::move(flushArgs)), flushedSerial);
+}
+
+FusionSpec
+IndexMaintainer::getFusionSpec()
+{
+    // Only used by unit tests; returns a copy taken under the fusion lock.
+    LockGuard fusionGuard(_fusion_lock);
+    return _fusion_spec;
+}
+
+string
+IndexMaintainer::doFusion(SerialNum serialNum)
+{
+    // Runs on a flush engine worker thread.
+    //
+    // The serial number is advanced first so that it also moves for
+    // something that does not receive any data. Notes carried over from
+    // the original author:
+    // XXX: Wrong, and will cause data loss.
+    // XXX: Missing locking.
+    // XXX: Claims to have flushed memory index when starting fusion.
+    {
+        LockGuard updateGuard(_index_update_lock);
+        if (_current_serial_num < serialNum) {
+            _current_serial_num = serialNum;
+        }
+    }
+
+    // Take over the pending flush ids; ids flushed while fusion runs
+    // accumulate in _fusion_spec again.
+    FusionSpec pending;
+    {
+        LockGuard fusionGuard(_fusion_lock);
+        if (!canRunFusion(_fusion_spec)) {
+            return "";
+        }
+        pending = _fusion_spec;
+        _fusion_spec.flush_ids.clear();
+    }
+
+    const uint32_t fusedId = runFusion(pending);
+
+    LockGuard fusionGuard(_fusion_lock);
+    const bool failed = (fusedId == pending.last_fusion_id);
+    if (failed) { // Error running fusion.
+        LOG(warning, "Fusion failed for id %u.", pending.flush_ids.back());
+        // Restore the spec: the ids we took, followed by any ids flushed
+        // in the meantime.
+        pending.flush_ids.insert(pending.flush_ids.end(),
+                                 _fusion_spec.flush_ids.begin(),
+                                 _fusion_spec.flush_ids.end());
+        _fusion_spec.flush_ids.swap(pending.flush_ids);
+    } else {
+        _fusion_spec.last_fusion_id = fusedId;
+    }
+    return getFusionDir(fusedId);
+}
+
+
+uint32_t
+IndexMaintainer::runFusion(const FusionSpec &fusion_spec)
+{
+ /*
+  * Called by a flush engine worker thread.
+  *
+  * Runs fusion over the indexes named by fusion_spec, loads the result as
+  * a disk index and switches it in through the reconfigurer, retrying the
+  * switch until doneFusion accepts it. Returns the new fusion id, or
+  * fusion_spec.last_fusion_id when fusion failed. fusion_spec.flush_ids
+  * must be non-empty, since back() is used on it.
+  */
+ FusionArgs args;
+ TuneFileAttributes tuneFileAttributes(getAttrTune());
+ {
+ LockGuard slock(_state_lock);
+ LockGuard ilock(_index_update_lock);
+ _activeFusionSchema.reset(new Schema(_fusionSchema)); // marks a fusion as active
+ _activeFusionWipeTimeSchema.reset();
+ args._schema = _fusionSchema;
+ }
+ FastOS_StatInfo statInfo;
+ string lastFlushDir(getFlushDir(fusion_spec.flush_ids.back()));
+ string lastSerialFile = IndexDiskLayout::getSerialNumFileName(lastFlushDir);
+ SerialNum serialNum = 0; // stays 0 when the serial number file is absent
+ if (FastOS_File::Stat(lastSerialFile.c_str(), &statInfo)) {
+ serialNum = IndexReadUtilities::readSerialNum(lastFlushDir);
+ }
+ FusionRunner fusion_runner(_base_dir, args._schema, tuneFileAttributes, _ctx.getFileHeaderContext());
+ uint32_t new_fusion_id = fusion_runner.fuse(fusion_spec, serialNum, _operations);
+ bool ok = (new_fusion_id != 0); // a fusion id of 0 is treated as failure
+ if (ok) {
+ ok = IndexWriteUtilities::copySerialNumFile(getFlushDir(fusion_spec.flush_ids.back()),
+ getFusionDir(new_fusion_id));
+ }
+ if (!ok) {
+ LOG(error, "Fusion failed.");
+ string fail_dir = getFusionDir(fusion_spec.flush_ids.back()); // NOTE(review): assumes the fusion output dir is named after the last flush id - confirm
+ FastOS_FileInterface::EmptyAndRemoveDirectory(fail_dir.c_str());
+ {
+ LockGuard slock(_state_lock);
+ LockGuard ilock(_index_update_lock);
+ _activeFusionSchema.reset();
+ _activeFusionWipeTimeSchema.reset();
+ }
+ return fusion_spec.last_fusion_id; // doFusion compares against this value to detect failure
+ }
+
+ const string new_fusion_dir = getFusionDir(new_fusion_id);
+ Schema::SP wtSchema = getActiveFusionWipeTimeSchema();
+ if (wtSchema.get() != NULL) {
+ updateDiskIndexSchema(new_fusion_dir, *wtSchema, noSerialNumHigh);
+ }
+ ChangeGens changeGens = getChangeGens();
+ IDiskIndex::SP new_index(loadDiskIndex(new_fusion_dir));
+
+ // Post processing after fusion operation has completed and new disk
+ // index has been opened.
+
+ args._new_fusion_id = new_fusion_id;
+ args._changeGens = changeGens;
+ args._wtSchema = wtSchema;
+ for (;;) {
+ // Call reconfig closure for this change
+ Closure0<bool>::UP closure( makeClosure(this, &IndexMaintainer::doneFusion, &args, &new_index));
+ bool success = reconfigure(std::move(closure));
+ if (success) {
+ break;
+ }
+ // doneFusion rejected the switch: refresh the snapshot, update the schema on disk and reload
+ changeGens = getChangeGens();
+ wtSchema = getActiveFusionWipeTimeSchema();
+ if (wtSchema.get() != NULL) {
+ updateDiskIndexSchema(new_fusion_dir, *wtSchema, noSerialNumHigh);
+ }
+ IDiskIndex::SP diskIndex2;
+ diskIndex2 = reloadDiskIndex(*new_index);
+ new_index = diskIndex2;
+ args._changeGens = changeGens;
+ args._wtSchema = wtSchema;
+ }
+ removeOldDiskIndexes();
+
+ return new_fusion_id;
+}
+
+void
+IndexMaintainer::removeOldDiskIndexes()
+{
+    // Removal runs under _remove_lock.
+    LockGuard removeGuard(_remove_lock);
+    DiskIndexCleaner::removeOldIndexes(_base_dir, *_active_indexes);
+}
+
+IndexMaintainer::FlushStats
+IndexMaintainer::getFlushStats() const
+{
+    // Called by flush engine scheduler thread (from getFlushTargets())
+    //
+    // Estimates the cost of a flush: memory held before and after, bytes
+    // written to disk, and CPU time required. The current memory index,
+    // every frozen memory index and the source selector are counted.
+    FlushStats stats;
+    uint64_t source_selector_bytes;
+    uint32_t numFrozen = 0;
+    bool selectorUnchanged = false;
+    {
+        LockGuard lock(_index_update_lock);
+        source_selector_bytes = _selector->getDocIdLimit() * sizeof(Source);
+        stats.memory_before_bytes += _current_index->getMemoryUsage().allocatedBytes() + source_selector_bytes;
+        stats.memory_after_bytes += _current_index->getStaticMemoryFootprint() + source_selector_bytes;
+        numFrozen = _frozenMemoryIndexes.size();
+        for (const FrozenMemoryIndexRef & frozen : _frozenMemoryIndexes) {
+            stats.memory_before_bytes += frozen._index->getMemoryUsage().allocatedBytes() + source_selector_bytes;
+        }
+        // _source_selector_changes is modified under _index_update_lock
+        // (putDocument, removeDocument, ...), so it is sampled while the
+        // lock is still held instead of being read unsynchronized below.
+        selectorUnchanged = (_source_selector_changes == 0);
+    }
+
+    if (selectorUnchanged && stats.memory_after_bytes >= stats.memory_before_bytes) {
+        // Nothing is written if the index is empty.
+        stats.disk_write_bytes = 0;
+        stats.cpu_time_required = 0;
+    } else {
+        stats.disk_write_bytes = stats.memory_before_bytes + source_selector_bytes - stats.memory_after_bytes;
+        stats.cpu_time_required = source_selector_bytes * 3 * (1 + numFrozen) + stats.disk_write_bytes;
+    }
+    return stats;
+}
+
+IndexMaintainer::FusionStats
+IndexMaintainer::getFusionStats() const
+{
+    // Runs on the flush engine scheduler thread (from getFlushTargets()).
+    FusionStats stats;
+
+    // Copy the source list pointer under the lock; query it afterwards.
+    IndexSearchable::SP sources;
+    {
+        LockGuard searchGuard(_new_search_lock);
+        sources = _source_list;
+    }
+    stats.diskUsage = sources->getSearchableStats().sizeOnDisk();
+    stats.maxFlushed = _maxFlushed;
+    {
+        LockGuard fusionGuard(_fusion_lock);
+        // An existing fused index counts as one more unfused source.
+        const uint32_t fusedCount = (_fusion_spec.last_fusion_id != 0) ? 1 : 0;
+        stats.numUnfused = _fusion_spec.flush_ids.size() + fusedCount;
+        stats._canRunFusion = canRunFusion(_fusion_spec);
+    }
+    LOG(debug, "Get fusion stats. Disk usage: %" PRIu64 ", maxflushed: %d", stats.diskUsage, stats.maxFlushed);
+    return stats;
+}
+
+uint32_t
+IndexMaintainer::getNumFrozenMemoryIndexes() const
+{
+    // Runs on the flush engine scheduler thread (from getFlushTargets()).
+    // The frozen list is read under the index update lock.
+    LockGuard updateGuard(_index_update_lock);
+    return _frozenMemoryIndexes.size();
+}
+
+void
+IndexMaintainer::putDocument(uint32_t lid, const Document &doc, SerialNum serialNum)
+{
+    // Index thread only. Inserts the document into the current memory
+    // index and points the source selector for 'lid' at that index.
+    assert(_ctx.getThreadingService().index().isCurrentThread());
+    LockGuard updateGuard(_index_update_lock);
+    try {
+        _current_index->insertDocument(lid, doc);
+    } catch (const vespalib::IllegalStateException & e) {
+        // Log the offending document, then rethrow with it in the message.
+        vespalib::string message = "Failed inserting document :\n" + doc.toXml(" ") + "\n";
+        LOG(error, "%s", message.c_str());
+        throw vespalib::IllegalStateException(message, e, VESPA_STRLOC);
+    }
+    _selector->setSource(lid, _current_index_id);
+    _source_list->setSource(lid);
+    ++_source_selector_changes;
+    _current_serial_num = serialNum;
+}
+
+void
+IndexMaintainer::removeDocument(uint32_t lid, SerialNum serialNum)
+{
+    // Index thread only. The remove is recorded in the current memory
+    // index, and the source selector for 'lid' is pointed at that index.
+    assert(_ctx.getThreadingService().index().isCurrentThread());
+    LockGuard updateGuard(_index_update_lock);
+    _current_index->removeDocument(lid);
+    _selector->setSource(lid, _current_index_id);
+    _source_list->setSource(lid);
+    ++_source_selector_changes;
+    _current_serial_num = serialNum;
+}
+
+void
+IndexMaintainer::scheduleCommit()
+{
+    // Master thread only: queues commit() on the index thread.
+    assert(_ctx.getThreadingService().master().isCurrentThread());
+    auto &&indexThread = _ctx.getThreadingService().index();
+    indexThread.execute(
+            makeTask(makeClosure<IndexMaintainer *, IndexMaintainer, void>(this, &IndexMaintainer::commit)));
+}
+
+void
+IndexMaintainer::commit()
+{
+    // Only reached through scheduleCommit(); runs on the index thread.
+    assert(_ctx.getThreadingService().index().isCurrentThread());
+    LockGuard updateGuard(_index_update_lock);
+    // Commit without a completion callback; the caller waits by calling
+    // _ctx.getThreadingService().sync().
+    std::shared_ptr<search::IDestructorCallback> noCallback;
+    _current_index->commit(noCallback);
+}
+
+void
+IndexMaintainer::commit(SerialNum serialNum, OnWriteDoneType onWriteDone)
+{
+    // Index thread only: records the serial number and commits the current
+    // memory index, passing 'onWriteDone' along.
+    assert(_ctx.getThreadingService().index().isCurrentThread());
+    LockGuard updateGuard(_index_update_lock);
+    _current_serial_num = serialNum;
+    _current_index->commit(onWriteDone);
+}
+
+void
+IndexMaintainer::heartBeat(SerialNum serialNum)
+{
+    // Index thread only: just records the serial number.
+    assert(_ctx.getThreadingService().index().isCurrentThread());
+    LockGuard updateGuard(_index_update_lock);
+    _current_serial_num = serialNum;
+}
+
+IFlushTarget::List
+IndexMaintainer::getFlushTargets()
+{
+    // Runs on the flush engine scheduler thread.
+    // Two targets are exposed, in this order: memory index flush, then
+    // disk index fusion.
+    IFlushTarget::SP flushTarget(new IndexFlushTarget(*this));
+    IFlushTarget::SP fusionTarget(new IndexFusionTarget(*this));
+    IFlushTarget::List targets;
+    targets.push_back(std::move(flushTarget));
+    targets.push_back(std::move(fusionTarget));
+    return targets;
+}
+
+void
+IndexMaintainer::setSchema(const Schema & schema, const Schema & fusionSchema)
+{
+    // Master thread only.
+    assert(_ctx.getThreadingService().master().isCurrentThread());
+    IMemoryIndex::SP freshIndex(_operations.createMemoryIndex(schema));
+
+    SetSchemaArgs schemaArgs;
+    schemaArgs._newSchema = schema;
+    schemaArgs._newFusionSchema = fusionSchema;
+
+    scheduleCommit();
+    // Wait until every index thread task touching the memory index is done;
+    // everything should be quiet after that.
+    _ctx.getThreadingService().sync();
+    doneSetSchema(schemaArgs, freshIndex);
+    // The source collection has changed at this point; the caller must
+    // reconfigure further as appropriate.
+}
+
+void
+IndexMaintainer::wipeHistory(SerialNum wipeSerial, const Schema &historyFields)
+{ /*
+   * Master thread only. Works in four steps:
+   *  1. Sets the fusion schema to the current schema.
+   *  2. Updates the schemas of the indexes in the source collection with
+   *     the union of the current schema and historyFields, repeating
+   *     until neither the schema nor the source collection changed
+   *     during a pass.
+   *  3. Bumps the wipe generation.
+   *  4. Builds a new source collection and reopens its disk indexes; when
+   *     reopenDiskIndexes returns true the collection is switched in via
+   *     doneWipeHistory, retrying from a fresh collection if that fails.
+   */
+ assert(_ctx.getThreadingService().master().isCurrentThread());
+ {
+ LockGuard state_lock(_state_lock);
+ LockGuard lock(_index_update_lock);
+ _fusionSchema = _schema;
+ }
+ for (;;) {
+ const Schema before_schema = getSchema();
+ IIndexCollection::SP before_coll = getSourceCollection();
+
+ Schema::UP schema = Schema::make_union(before_schema, historyFields);
+ updateIndexSchemas(*before_coll, *schema, wipeSerial);
+ updateActiveFusionWipeTimeSchema(*schema);
+
+ const Schema after_schema(getSchema());
+ IIndexCollection::SP after_coll = getSourceCollection();
+ if (before_schema == after_schema &&
+ before_coll.get() == after_coll.get())
+ {
+ break; // nothing changed during this pass
+ }
+ }
+ {
+ LockGuard state_lock(_state_lock);
+ LockGuard lock(_index_update_lock);
+ _changeGens.bumpWipeGen();
+ }
+ for (bool success(false); !success;) {
+ WipeHistoryArgs args;
+ {
+ LockGuard state_lock(_state_lock);
+ args._old_source_list = _source_list;
+ args._new_source_list.reset(new IndexCollection(_selector, *args._old_source_list));
+ }
+ if (reopenDiskIndexes(*args._new_source_list)) {
+ _ctx.getThreadingService().sync();
+ // Everything should be quiet now.
+ success = doneWipeHistory(args);
+ } else { // the new collection is not switched in
+ success = true;
+ }
+ }
+}
+
+} // namespace index
+} // namespace searchcorespi
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h
new file mode 100644
index 00000000000..6e3b6fa9bdb
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h
@@ -0,0 +1,412 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "activediskindexes.h"
+#include "fusionspec.h"
+#include "idiskindex.h"
+#include "iindexmaintaineroperations.h"
+#include "indexdisklayout.h"
+#include "indexmaintainerconfig.h"
+#include "indexmaintainercontext.h"
+#include "imemoryindex.h"
+#include "warmupindexcollection.h"
+#include "ithreadingservice.h"
+#include <boost/noncopyable.hpp>
+#include <vespa/searchcorespi/index/iindexmanager.h>
+#include <vespa/searchcorespi/index/indexsearchable.h>
+#include <vespa/searchcorespi/index/indexcollection.h>
+#include <vespa/searchcorespi/flush/iflushtarget.h>
+#include <vespa/searchcorespi/flush/flushstats.h>
+#include <vespa/searchlib/attribute/fixedsourceselector.h>
+#include <vespa/searchlib/common/serialnum.h>
+#include <vespa/vespalib/util/sync.h>
+#include <memory>
+#include <vector>
+
+namespace document {
+class Document;
+}
+
+namespace search {
+namespace common {
+class FileHeaderContext;
+}
+}
+
+namespace searchcorespi {
+namespace index {
+
+/**
+ * The IndexMaintainer provides a holistic view of a set of disk and
+ * memory indexes. It allows updating the active memory index, enables search
+ * across all indexes, and manages the set of indexes through flushing
+ * of memory indexes and fusion of disk indexes.
+ */
+class IndexMaintainer : public boost::noncopyable,
+ public IIndexManager,
+ public IWarmupDone
+{
+ /**
+ * Extra memory that is frozen but not yet flushed.
+ *
+ * Plain record that keeps a memory index together with the serial number,
+ * source selector save info and absolute id it was constructed with.
+ */
+ class FrozenMemoryIndexRef
+ {
+ public:
+ typedef search::FixedSourceSelector::SaveInfo SaveInfo;
+ typedef search::SerialNum SerialNum;
+ typedef std::shared_ptr<SaveInfo> SaveInfoSP;
+
+ IMemoryIndex::SP _index;
+ SerialNum _serialNum;
+ SaveInfoSP _saveInfo;
+ uint32_t _absoluteId;
+
+ // Takes over ownership of saveInfo: the pointer is released from the
+ // SaveInfo::UP argument and stored in the shared pointer _saveInfo.
+ FrozenMemoryIndexRef(const IMemoryIndex::SP &index,
+ SerialNum serialNum,
+ SaveInfo::UP saveInfo,
+ uint32_t absoluteId)
+ : _index(index),
+ _serialNum(serialNum),
+ _saveInfo(saveInfo.release()),
+ _absoluteId(absoluteId)
+ { }
+ };
+
+ /**
+ * Change generation counters.
+ *
+ * Currently holds a single counter, the wipe generation, which starts at 0
+ * and is incremented by bumpWipeGen(). Two instances compare equal when
+ * their wipe generations are equal.
+ * NOTE(review): presumably copied into FlushArgs/FusionArgs so a later step
+ * can detect that a wipe happened in between - confirm in the .cpp file.
+ */
+ class ChangeGens
+ {
+ public:
+ uint32_t _wipeGen;
+
+ ChangeGens() : _wipeGen(0) { }
+ void bumpWipeGen(void) { ++_wipeGen; }
+ bool operator==(const ChangeGens &rhs) const { return _wipeGen == rhs._wipeGen; }
+ bool operator!=(const ChangeGens &rhs) const { return _wipeGen != rhs._wipeGen; }
+ };
+
+ typedef std::vector<uint32_t> FlushIds;
+ typedef std::vector<FrozenMemoryIndexRef> FrozenMemoryIndexRefs;
+ typedef search::queryeval::ISourceSelector ISourceSelector;
+ const vespalib::string _base_dir;
+ const double _diskIndexWarmupTime;
+ ActiveDiskIndexes::SP _active_indexes;
+ IndexDiskLayout _layout;
+ Schema _schema; // Protected by SL + IUL
+ Schema _fusionSchema; // Protected by SL + IUL
+ Schema::SP _activeFusionSchema; // Protected by SL + IUL
+ // Protected by SL + IUL
+ Schema::SP _activeFusionWipeTimeSchema;
+ uint32_t _source_selector_changes; // Protected by IUL
+ // _selector is protected by SL + IUL
+ ISourceSelector::SP _selector;
+ ISearchableIndexCollection::SP _source_list; // Protected by SL + NSL
+ uint32_t _last_fusion_id; // Protected by SL + IUL
+ uint32_t _next_id; // Protected by SL + IUL
+ uint32_t _current_index_id; // Protected by SL + IUL
+ IMemoryIndex::SP _current_index; // Protected by SL + IUL
+ SerialNum _current_serial_num;// Protected by IUL
+ SerialNum _flush_serial_num; // Protected by SL
+ fastos::TimeStamp _lastFlushTime; // Protected by SL
+ // Extra frozen memory indexes. This list is empty unless new
+ // memory index has been added by force (due to config change or
+ // data structure limitations).
+ // Protected by SL + IUL
+ FrozenMemoryIndexRefs _frozenMemoryIndexes;
+ /*
+ * Locks protecting state.
+ *
+ * Note about locking:
+ *
+ * A variable can be protected by multiple locks, e.g. SL + NSL.
+ * To change the variable, all of these locks must be held.
+ * To read the variable, holding any of these locks is sufficient.
+ *
+ * In the example above, getting NSL typically has lower latency, but
+ * fewer variables can be retrieved. Getting SL has a higher latency
+ * and allows a snapshot of multiple variables depending on each other
+ * to be retrieved.
+ *
+ * Flush threads typically perform some setup (take SL, copy a
+ * relevant portion of variables to an args class (FlushArgs,
+ * FusionArgs, SetSchemaArgs) and perform some disk io) that takes
+ * time before scheduling a state change task performed by the
+ * document db master thread.
+ *
+ * The scheduled state change task will fail if state changed too much
+ * after setup by the flush thread. The flush thread must then retry
+ * with an updated setup.
+ *
+ * Things get more complicated when handling multiple kinds of overlapping
+ * flush operations, e.g. dump from memory to disk, fusion, schema changes
+ * and wipe of old fields, since this will trigger more retries for some
+ * of the operations.
+ */
+ vespalib::Lock _state_lock; // Outer lock (SL)
+ vespalib::Lock _index_update_lock; // Inner lock (IUL)
+ vespalib::Lock _new_search_lock; // Inner lock (NSL)
+ vespalib::Lock _remove_lock; // Lock for removing indexes.
+ // Protected by SL + IUL
+ FusionSpec _fusion_spec; // Protected by FL
+ vespalib::Lock _fusion_lock; // Fusion spec lock (FL)
+ uint32_t _maxFlushed;
+ uint32_t _maxFrozen;
+ ChangeGens _changeGens; // Protected by SL + IUL
+ vespalib::Lock _schemaUpdateLock; // Serialize rewrite of schema
+ const search::TuneFileAttributes _tuneFileAttributes;
+ const IndexMaintainerContext _ctx;
+ IIndexMaintainerOperations &_operations;
+
+ search::FixedSourceSelector & getSourceSelector() { return static_cast<search::FixedSourceSelector &>(*_selector); }
+ const search::FixedSourceSelector & getSourceSelector() const { return static_cast<const search::FixedSourceSelector &>(*_selector); }
+ uint32_t getNewAbsoluteId();
+ vespalib::string getFlushDir(uint32_t sourceId) const;
+ vespalib::string getFusionDir(uint32_t sourceId) const;
+
+ /**
+ * Will reopen diskindexes if necessary due to schema changes.
+ * @param coll Indexcollection that will be updated with reloaded index.
+ * @return true if any reload has been performed
+ */
+ bool reopenDiskIndexes(ISearchableIndexCollection &coll);
+
+ void
+ updateDiskIndexSchema(const vespalib::string &indexDir,
+ const Schema &schema,
+ SerialNum wipeSerial);
+
+ void
+ updateIndexSchemas(IIndexCollection &coll,
+ const Schema &schema,
+ SerialNum wipeSerial);
+
+ void updateActiveFusionWipeTimeSchema(const Schema &schema);
+ void deactivateDiskIndexes(vespalib::string indexDir);
+ IDiskIndex::SP loadDiskIndex(const vespalib::string &indexDir);
+ IDiskIndex::SP reloadDiskIndex(const IDiskIndex &oldIndex);
+
+ IDiskIndex::SP
+ flushMemoryIndex(IMemoryIndex &memoryIndex,
+ uint32_t indexId,
+ uint32_t docIdLimit,
+ SerialNum serialNum);
+
+ ISearchableIndexCollection::UP loadDiskIndexes(const FusionSpec &spec, ISearchableIndexCollection::UP sourceList);
+ void replaceSource(uint32_t sourceId, const IndexSearchable::SP &source);
+ void appendSource(uint32_t sourceId, const IndexSearchable::SP &source);
+ void swapInNewIndex(vespalib::LockGuard & guard, ISearchableIndexCollection::SP indexes, IndexSearchable & source);
+ ISearchableIndexCollection::UP createNewSourceCollection(const vespalib::LockGuard &newSearchLock);
+
+ /**
+ * State carried between the steps of a flush (see doneInitFlush(),
+ * doFlush() and doneFlush(), which all take a FlushArgs).
+ * The default constructor leaves all pointers empty, counters at zero and
+ * stats set to NULL.
+ */
+ struct FlushArgs {
+ IMemoryIndex::SP old_index; // Last memory index
+ uint32_t old_absolute_id;
+ ISearchableIndexCollection::SP old_source_list; // Delays destruction
+ search::FixedSourceSelector::SaveInfo::SP save_info;
+ SerialNum flush_serial_num;
+ FlushStats * stats; // Raw pointer, NULL by default
+ bool _skippedEmptyLast; // Don't flush empty memory index
+
+ // Extra indexes to flush before flushing last frozen memory index
+ // They are flushed before old_index. This list is empty unless
+ // new memory index has been added by force (due to config change
+ // or data structure limitations).
+ FrozenMemoryIndexRefs _extraIndexes;
+ ChangeGens _changeGens;
+ Schema::SP _wtSchema;
+
+ FlushArgs(void)
+ : old_index(),
+ old_absolute_id(0),
+ old_source_list(),
+ save_info(),
+ flush_serial_num(),
+ stats(NULL),
+ _skippedEmptyLast(false),
+ _extraIndexes(),
+ _changeGens(),
+ _wtSchema()
+ {
+ }
+ };
+
+ bool doneInitFlush(FlushArgs *args, IMemoryIndex::SP *new_index);
+ void doFlush(FlushArgs args);
+ void flushFrozenMemoryIndexes(FlushArgs &args, FlushIds &flushIds);
+ void flushLastMemoryIndex(FlushArgs &args, FlushIds &flushIds);
+ void updateFlushStats(const FlushArgs &args);
+ void flushMemoryIndex(FlushArgs &args, uint32_t docIdLimit,
+ search::FixedSourceSelector::SaveInfo &saveInfo, FlushIds &flushIds);
+ void reconfigureAfterFlush(FlushArgs &args, IDiskIndex::SP &diskIndex);
+ bool doneFlush(FlushArgs *args, IDiskIndex::SP *disk_index);
+
+
+ /**
+ * State handed to doneFusion(). Default construction gives fusion id 0,
+ * default-constructed change generations and schema, and empty pointers.
+ */
+ class FusionArgs
+ {
+ public:
+ uint32_t _new_fusion_id;
+ ChangeGens _changeGens;
+ Schema _schema;
+ Schema::SP _wtSchema;
+ ISearchableIndexCollection::SP _old_source_list; // Delays destruction
+
+ FusionArgs()
+ : _new_fusion_id(0u),
+ _changeGens(),
+ _schema(),
+ _wtSchema(),
+ _old_source_list()
+ { }
+ };
+
+ IFlushTarget::SP getFusionTarget();
+ void scheduleFusion(const FlushIds &flushIds);
+ bool canRunFusion(const FusionSpec &spec) const;
+ bool doneFusion(FusionArgs *args, IDiskIndex::SP *new_index);
+
+ /**
+ * State handed to doneSetSchema(): the new and old variants of the schema
+ * and the fusion schema, plus the old memory index and old source list.
+ * All members are default-constructed.
+ */
+ class SetSchemaArgs
+ {
+ public:
+ Schema _newSchema;
+ Schema _newFusionSchema;
+ Schema _oldSchema;
+ Schema _oldFusionSchema;
+ IMemoryIndex::SP _oldIndex;
+ ISearchableIndexCollection::SP _oldSourceList; // Delays destruction
+
+ SetSchemaArgs(void)
+ : _newSchema(),
+ _newFusionSchema(),
+ _oldSchema(),
+ _oldFusionSchema(),
+ _oldIndex(),
+ _oldSourceList()
+ { }
+ };
+
+ void doneSetSchema(SetSchemaArgs &args, IMemoryIndex::SP &newIndex);
+
+ /**
+ * State handed to doneWipeHistory(): the source list that was current when
+ * the attempt started and the replacement list built from it.
+ * Both pointers are empty after default construction.
+ */
+ class WipeHistoryArgs
+ {
+ public:
+ ISearchableIndexCollection::SP _old_source_list;
+ ISearchableIndexCollection::SP _new_source_list;
+
+ WipeHistoryArgs()
+ : _old_source_list(),
+ _new_source_list()
+ { }
+ };
+
+ bool doneWipeHistory(WipeHistoryArgs &args);
+ Schema getSchema(void) const;
+ Schema::SP getActiveFusionWipeTimeSchema(void) const;
+ search::TuneFileAttributes getAttrTune(void);
+ ChangeGens getChangeGens(void);
+
+ /*
+ * Schedule document db executor task to use reconfigurer to
+ * reconfigure index manager with closure as argument. Wait for
+ * result.
+ */
+ bool reconfigure(vespalib::Closure0<bool>::UP closure);
+ virtual void warmupDone(ISearchableIndexCollection::SP current);
+ bool makeSureAllRemainingWarmupIsDone(ISearchableIndexCollection::SP keepAlive);
+ void scheduleCommit();
+ void commit();
+
+public:
+ IndexMaintainer(const IndexMaintainerConfig &config,
+ const IndexMaintainerContext &context,
+ IIndexMaintainerOperations &operations);
+ ~IndexMaintainer();
+
+ /**
+ * Starts a new MemoryIndex, and dumps the previous one to disk.
+ * Updates flush stats when finished if specified.
+ **/
+ FlushTask::UP initFlush(SerialNum serialNum, FlushStats * stats);
+ FusionSpec getFusionSpec();
+
+ /**
+ * Runs fusion for any available specs and return the output fusion directory.
+ */
+ vespalib::string doFusion(SerialNum serialNum);
+ uint32_t runFusion(const FusionSpec &fusion_spec);
+ void removeOldDiskIndexes();
+
+ /**
+ * Result type of getFlushStats(). All counters are zero after construction.
+ * NOTE(review): the unit of cpu_time_required is not visible here - confirm.
+ */
+ struct FlushStats
+ {
+ FlushStats() :
+ memory_before_bytes(0),
+ memory_after_bytes(0),
+ disk_write_bytes(0),
+ cpu_time_required(0)
+ { }
+
+ uint64_t memory_before_bytes;
+ uint64_t memory_after_bytes;
+ uint64_t disk_write_bytes;
+ uint64_t cpu_time_required;
+ };
+
+ /**
+ * Result type of getFusionStats(). Counters are zero and _canRunFusion is
+ * false after construction.
+ */
+ struct FusionStats
+ {
+ FusionStats()
+ : diskUsage(0),
+ maxFlushed(0),
+ numUnfused(0),
+ _canRunFusion(false)
+ { }
+
+ uint64_t diskUsage;
+ uint32_t maxFlushed;
+ uint32_t numUnfused;
+ bool _canRunFusion;
+ };
+
+ /**
+ * Calculates an approximation of the cost of performing a flush.
+ **/
+ FlushStats getFlushStats() const;
+ FusionStats getFusionStats() const;
+ const vespalib::string & getBaseDir() const { return _base_dir; }
+ uint32_t getNumFrozenMemoryIndexes() const;
+ uint32_t getMaxFrozenMemoryIndexes() const { return _maxFrozen; }
+
+ fastos::TimeStamp getLastFlushTime() const { return _lastFlushTime; }
+
+ // Implements IIndexManager
+ void putDocument(uint32_t lid, const Document &doc, SerialNum serialNum) override;
+ void removeDocument(uint32_t lid, SerialNum serialNum) override;
+ void commit(SerialNum serialNum, OnWriteDoneType onWriteDone) override;
+ void heartBeat(search::SerialNum serialNum) override;
+
+ // Returns _current_serial_num as-is.
+ // NOTE(review): the member is marked "Protected by IUL" but is read here
+ // without taking a lock - confirm that callers only use the thread that writes it.
+ SerialNum getCurrentSerialNum() const override {
+ return _current_serial_num;
+ }
+
+ // Returns _flush_serial_num as-is.
+ // NOTE(review): the member is marked "Protected by SL" but is read here
+ // without taking a lock - confirm that this is safe for all callers.
+ SerialNum getFlushedSerialNum() const override {
+ return _flush_serial_num;
+ }
+
+ // Returns a shared pointer to the current source list, copied while
+ // holding the new search lock (NSL).
+ IIndexCollection::SP getSourceCollection() const {
+ vespalib::LockGuard lock(_new_search_lock);
+ return _source_list;
+ }
+
+ // Returns the current source list as an IndexSearchable, copied while
+ // holding the new search lock (NSL).
+ searchcorespi::IndexSearchable::SP getSearchable() const override {
+ vespalib::LockGuard lock(_new_search_lock);
+ return _source_list;
+ }
+
+ // Returns the stats reported by the current source list. The call is made
+ // while holding the new search lock (NSL); _source_list is dereferenced
+ // without a null check.
+ search::SearchableStats getSearchableStats() const override {
+ vespalib::LockGuard lock(_new_search_lock);
+ return _source_list->getSearchableStats();
+ }
+
+ IFlushTarget::List getFlushTargets() override;
+ void setSchema(const Schema & schema, const Schema & fusionSchema) override ;
+ void wipeHistory(SerialNum wipeSerial, const Schema &historyFields) override;
+};
+
+} // namespace index
+} // namespace searchcorespi
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainerconfig.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainerconfig.cpp
new file mode 100644
index 00000000000..e2838434a1b
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainerconfig.cpp
@@ -0,0 +1,31 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP(".searchcorespi.index.indexmaintainerconfig");
+
+#include "indexmaintainerconfig.h"
+
+using search::index::Schema;
+using search::TuneFileAttributes;
+
+namespace searchcorespi {
+namespace index {
+
+/**
+ * Stores a copy of every argument in the corresponding member; no validation
+ * or other work is done.
+ */
+IndexMaintainerConfig::IndexMaintainerConfig(const vespalib::string &baseDir,
+ double diskIndexWarmupTime,
+ size_t maxFlushed,
+ const Schema &schema,
+ const Schema &fusionSchema,
+ const TuneFileAttributes &tuneFileAttributes)
+ : _baseDir(baseDir),
+ _diskIndexWarmupTime(diskIndexWarmupTime),
+ _maxFlushed(maxFlushed),
+ _schema(schema),
+ _fusionSchema(fusionSchema),
+ _tuneFileAttributes(tuneFileAttributes)
+{
+}
+
+} // namespace index
+} // namespace searchcorespi
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainerconfig.h b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainerconfig.h
new file mode 100644
index 00000000000..7ee3e0337cc
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainerconfig.h
@@ -0,0 +1,74 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/searchcommon/common/schema.h>
+#include <vespa/searchlib/common/tunefileinfo.h>
+#include <vespa/vespalib/stllike/string.h>
+
+namespace searchcorespi {
+namespace index {
+
+/**
+ * Class that keeps the config used when constructing an index maintainer.
+ * All members are const copies of the constructor arguments.
+ */
+class IndexMaintainerConfig {
+private:
+ const vespalib::string _baseDir;
+ const double _diskIndexWarmupTime;
+ const size_t _maxFlushed;
+ const search::index::Schema _schema;
+ const search::index::Schema _fusionSchema;
+ const search::TuneFileAttributes _tuneFileAttributes;
+
+public:
+ IndexMaintainerConfig(const vespalib::string &baseDir,
+ double diskIndexWarmupTime,
+ size_t maxFlushed,
+ const search::index::Schema &schema,
+ const search::index::Schema &fusionSchema,
+ const search::TuneFileAttributes &tuneFileAttributes);
+
+ /**
+ * Returns the base directory in which the maintainer will store its indexes.
+ */
+ const vespalib::string &getBaseDir() const {
+ return _baseDir;
+ }
+
+ /**
+ * Returns the disk index warmup time given at construction.
+ * NOTE(review): the unit is not visible here (presumably seconds) - confirm.
+ */
+ double getDiskIndexWarmupTime() const {
+ return _diskIndexWarmupTime;
+ }
+
+ /**
+ * Returns the initial schema containing all current index fields.
+ */
+ const search::index::Schema &getSchema() const {
+ return _schema;
+ }
+
+ /**
+ * Returns the initial fusion schema containing all index fields that have been in the system.
+ * This includes all current fields and removed fields that have not been wiped yet.
+ * This schema is used during fusion to make sure that removed fields are transferred into the
+ * fused index in case they are re-introduced later on.
+ */
+ const search::index::Schema &getFusionSchema() const {
+ return _fusionSchema;
+ }
+
+ /**
+ * Returns the specification on how to read/write attribute vector data files.
+ */
+ const search::TuneFileAttributes &getTuneFileAttributes() const {
+ return _tuneFileAttributes;
+ }
+
+ /**
+ * Returns the maxFlushed value given at construction.
+ */
+ size_t getMaxFlushed() const {
+ return _maxFlushed;
+ }
+};
+
+} // namespace index
+} // namespace searchcorespi
+
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.cpp
new file mode 100644
index 00000000000..d18920f5c7a
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.cpp
@@ -0,0 +1,28 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP(".searchcorespi.index.indexmaintainercontext");
+
+#include "indexmaintainercontext.h"
+
+using search::common::FileHeaderContext;
+using search::TuneFileAttributes;
+using searchcorespi::IIndexManager;
+
+namespace searchcorespi {
+namespace index {
+
+/**
+ * Binds the reference members to the given objects. Nothing is copied, so the
+ * arguments must stay alive for as long as the context is used.
+ */
+IndexMaintainerContext::IndexMaintainerContext(IThreadingService &threadingService,
+ IIndexManager::Reconfigurer &reconfigurer,
+ const FileHeaderContext &fileHeaderContext,
+ vespalib::ThreadExecutor & warmupExecutor)
+ : _threadingService(threadingService),
+ _reconfigurer(reconfigurer),
+ _fileHeaderContext(fileHeaderContext),
+ _warmupExecutor(warmupExecutor)
+{
+}
+
+} // namespace index
+} // namespace searchcorespi
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.h b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.h
new file mode 100644
index 00000000000..ee8087bef99
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainercontext.h
@@ -0,0 +1,59 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "ithreadingservice.h"
+#include <vespa/searchcorespi/index/iindexmanager.h>
+#include <vespa/searchlib/common/tunefileinfo.h>
+#include <vespa/searchlib/common/fileheadercontext.h>
+#include <vespa/vespalib/util/threadexecutor.h>
+
+namespace searchcorespi {
+namespace index {
+
+/**
+ * Class that keeps the long-lived context used by an index maintainer.
+ * All members are references to objects owned elsewhere.
+ */
+class IndexMaintainerContext {
+private:
+ IThreadingService &_threadingService;
+ searchcorespi::IIndexManager::Reconfigurer &_reconfigurer;
+ const search::common::FileHeaderContext &_fileHeaderContext;
+ vespalib::ThreadExecutor & _warmupExecutor;
+
+public:
+ IndexMaintainerContext(IThreadingService &threadingService,
+ searchcorespi::IIndexManager::Reconfigurer &reconfigurer,
+ const search::common::FileHeaderContext &fileHeaderContext,
+ vespalib::ThreadExecutor & warmupExecutor);
+
+ /**
+ * Returns the threading service that encapsulates the thread model used for writing.
+ */
+ IThreadingService &getThreadingService() const {
+ return _threadingService;
+ }
+
+ /**
+ * Returns the reconfigurer used to signal when the index maintainer has changed.
+ */
+ searchcorespi::IIndexManager::Reconfigurer &getReconfigurer() const {
+ return _reconfigurer;
+ }
+
+ /**
+ * Returns the context used to insert extra tags into file headers before writing them.
+ */
+ const search::common::FileHeaderContext &getFileHeaderContext() const {
+ return _fileHeaderContext;
+ }
+
+ /**
+ * @return The executor that should be used for warmup.
+ */
+ vespalib::ThreadExecutor & getWarmupExecutor() const { return _warmupExecutor; }
+};
+
+} // namespace index
+} // namespace searchcorespi
+
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmanagerconfig.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmanagerconfig.cpp
new file mode 100644
index 00000000000..830e4f68c45
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexmanagerconfig.cpp
@@ -0,0 +1,20 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP(".searchcorespi.index.indexmanagerconfig");
+
+#include "indexmanagerconfig.h"
+
+namespace searchcorespi {
+
+/**
+ * Stores the given values in the corresponding members; configSnapshot is
+ * used to initialize the _configSnapshot member directly.
+ */
+IndexManagerConfig::IndexManagerConfig(const vespalib::string &configId,
+ const config::ConfigSnapshot &configSnapshot,
+ size_t numSearcherThreads)
+ : _configId(configId),
+ _configSnapshot(configSnapshot),
+ _numSearcherThreads(numSearcherThreads)
+{
+}
+
+} // namespace searchcorespi
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmanagerconfig.h b/searchcorespi/src/vespa/searchcorespi/index/indexmanagerconfig.h
new file mode 100644
index 00000000000..7019a10668a
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexmanagerconfig.h
@@ -0,0 +1,41 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/config/retriever/configsnapshot.h>
+#include <vespa/vespalib/stllike/string.h>
+
+namespace searchcorespi {
+
+/**
+ * Class that keeps the config used when constructing an index manager.
+ *
+ * Note that the config snapshot is held by reference, not copied: the
+ * snapshot passed to the constructor must outlive this object.
+ */
+class IndexManagerConfig {
+private:
+ vespalib::string _configId;
+ const config::ConfigSnapshot &_configSnapshot;
+ size_t _numSearcherThreads;
+
+public:
+ IndexManagerConfig(const vespalib::string &configId,
+ const config::ConfigSnapshot &configSnapshot,
+ size_t numSearcherThreads);
+
+ /**
+ * Returns the config id used to retrieve the configs from the config snapshot instance.
+ */
+ const vespalib::string &getConfigId() const { return _configId; }
+
+ /**
+ * Returns the snapshot containing configs to be used by the index manager.
+ */
+ const config::ConfigSnapshot &getConfigSnapshot() const { return _configSnapshot; }
+
+ /**
+ * Returns the number of searcher threads that are used to query the index manager.
+ */
+ size_t getNumSearcherThreads() const { return _numSearcherThreads; }
+};
+
+} // namespace searchcorespi
+
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexreadutilities.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexreadutilities.cpp
new file mode 100644
index 00000000000..d2c76612833
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexreadutilities.cpp
@@ -0,0 +1,91 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP(".searchcorespi.index.indexreadutilities");
+
+#include "indexreadutilities.h"
+#include "indexdisklayout.h"
+#include <vespa/fastlib/io/bufferedfile.h>
+#include <vespa/vespalib/data/fileheader.h>
+#include <set>
+#include <vector>
+
+using search::SerialNum;
+using vespalib::FileHeader;
+
+namespace searchcorespi {
+namespace index {
+
+namespace {
+
+/**
+ * Collects the names of the index directories found directly below baseDir.
+ *
+ * Every directory whose name starts with the flush dir prefix is appended to
+ * flushDirs. A directory whose name starts with the fusion dir prefix is
+ * stored in fusionDir; if fusionDir is already non-empty at that point a
+ * warning is logged and the value is overwritten.
+ *
+ * Assumes that cleanup has removed all obsolete index dirs.
+ **/
+void
+scanForIndexes(const vespalib::string &baseDir,
+ std::vector<vespalib::string> &flushDirs,
+ vespalib::string &fusionDir)
+{
+ for (FastOS_DirectoryScan scan(baseDir.c_str()); scan.ReadNext(); ) {
+ if (scan.IsDirectory()) {
+ const vespalib::string entry(scan.GetName());
+ const bool isFlushDir = (entry.find(IndexDiskLayout::FlushDirPrefix) == 0);
+ const bool isFusionDir = (entry.find(IndexDiskLayout::FusionDirPrefix) == 0);
+ if (isFlushDir) {
+ flushDirs.push_back(entry);
+ }
+ if (isFusionDir) {
+ if (!fusionDir.empty()) {
+ // Should never happen, since we run cleanup before load.
+ LOG(warning, "Base directory '%s' contains multiple fusion indexes",
+ baseDir.c_str());
+ }
+ fusionDir = entry;
+ }
+ }
+ }
+}
+
+}
+
+/**
+ * Builds a FusionSpec from the index directories found in baseDir.
+ *
+ * The numeric suffix of the fusion directory name (0 if there is no fusion
+ * directory) becomes last_fusion_id; the numeric suffixes of the flush
+ * directory names become flush_ids, sorted ascending and without duplicates.
+ * Suffixes are parsed with atoi().
+ */
+FusionSpec
+IndexReadUtilities::readFusionSpec(const vespalib::string &baseDir)
+{
+ vespalib::string fusionDirName;
+ std::vector<vespalib::string> flushDirNames;
+ scanForIndexes(baseDir, flushDirNames, fusionDirName);
+
+ // The set both orders the ids and drops duplicates.
+ std::set<uint32_t> orderedFlushIds;
+ for (const vespalib::string &dirName : flushDirNames) {
+ const vespalib::string suffix = dirName.substr(IndexDiskLayout::FlushDirPrefix.size());
+ const uint32_t flushId = atoi(suffix.c_str());
+ orderedFlushIds.insert(flushId);
+ }
+
+ const uint32_t lastFusionId = fusionDirName.empty()
+ ? 0
+ : atoi(fusionDirName.substr(IndexDiskLayout::FusionDirPrefix.size()).c_str());
+
+ FusionSpec spec;
+ spec.last_fusion_id = lastFusionId;
+ spec.flush_ids.assign(orderedFlushIds.begin(), orderedFlushIds.end());
+ return spec;
+}
+
+/**
+ * Reads the serial number stored in the serial num file of an index directory.
+ *
+ * @param dir the index directory; the file name is obtained from
+ *            IndexDiskLayout::getSerialNumFileName(dir).
+ * @return the integer value of the serial num tag in the file header, or 0 if
+ *         the header has no such tag.
+ */
+SerialNum
+IndexReadUtilities::readSerialNum(const vespalib::string &dir)
+{
+ const vespalib::string fileName = IndexDiskLayout::getSerialNumFileName(dir);
+ Fast_BufferedFile file;
+ // NOTE(review): no result of ReadOpen() is checked here - confirm how a
+ // missing or unreadable file is reported.
+ file.ReadOpen(fileName.c_str());
+
+ FileHeader fileHeader;
+ fileHeader.readFile(file);
+ if (fileHeader.hasTag(IndexDiskLayout::SerialNumTag)) {
+ return fileHeader.getTag(IndexDiskLayout::SerialNumTag).asInteger();
+ }
+ return 0;
+}
+
+} // namespace index
+} // namespace searchcorespi
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexreadutilities.h b/searchcorespi/src/vespa/searchcorespi/index/indexreadutilities.h
new file mode 100644
index 00000000000..9169f845c1b
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexreadutilities.h
@@ -0,0 +1,23 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "fusionspec.h"
+#include <vespa/searchlib/common/serialnum.h>
+#include <vespa/vespalib/stllike/string.h>
+
+namespace searchcorespi {
+namespace index {
+
+/**
+ * Utility class with functions to read aspects of an index from disk.
+ * Used by the index maintainer.
+ */
+struct IndexReadUtilities {
+ // Scans baseDir for flush and fusion index directories and returns the ids found.
+ static FusionSpec readFusionSpec(const vespalib::string &baseDir);
+ // Returns the serial number stored in index directory dir, or 0 if none is stored.
+ static search::SerialNum readSerialNum(const vespalib::string &dir);
+};
+
+} // namespace index
+} // namespace searchcorespi
+
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexsearchable.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexsearchable.cpp
new file mode 100644
index 00000000000..0f670d31855
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexsearchable.cpp
@@ -0,0 +1,35 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP(".searchcorespi.index.indexsearchable");
+#include "indexsearchable.h"
+#include <vespa/searchlib/queryeval/intermediate_blueprints.h>
+#include <vespa/searchlib/queryeval/leaf_blueprints.h>
+
+using namespace search::queryeval;
+
+namespace searchcorespi {
+
+/**
+ * Default multi-field implementation.
+ *
+ * Returns an EmptyBlueprint when fields is empty, the single-field blueprint
+ * when there is exactly one field, and otherwise an OrBlueprint with one child
+ * per field, added in field order.
+ */
+IndexSearchable::Blueprint::UP
+IndexSearchable::createBlueprint(const IRequestContext & requestContext,
+ const FieldSpecList &fields,
+ const Node &term,
+ const IAttributeContext &attrCtx)
+{
+ // No fields at all: hand back an empty blueprint.
+ if (fields.empty()) {
+ return Blueprint::UP(new EmptyBlueprint());
+ }
+ // Exactly one field: no combining node needed, use the single-field overload.
+ if (fields.size() == 1) {
+ return createBlueprint(requestContext, fields[0], term, attrCtx);
+ }
+ // Several fields: the result owns the OR node, orNode is only used to add children.
+ OrBlueprint *orNode = new OrBlueprint();
+ Blueprint::UP combined(orNode);
+ size_t fieldIdx = 0;
+ while (fieldIdx < fields.size()) {
+ orNode->addChild(createBlueprint(requestContext, fields[fieldIdx], term, attrCtx));
+ ++fieldIdx;
+ }
+ return combined;
+}
+
+} // namespace searchcorespi
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexsearchable.h b/searchcorespi/src/vespa/searchcorespi/index/indexsearchable.h
new file mode 100644
index 00000000000..2d4d7cd9674
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexsearchable.h
@@ -0,0 +1,77 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/searchcommon/attribute/iattributecontext.h>
+#include <vespa/searchlib/query/tree/node.h>
+#include <vespa/searchlib/queryeval/field_spec.h>
+#include <vespa/searchlib/queryeval/blueprint.h>
+#include <vespa/searchlib/queryeval/irequestcontext.h>
+#include <vespa/searchlib/util/searchable_stats.h>
+
+namespace searchcorespi {
+
+/**
+ * Abstract class extended by components to expose content that can be
+ * searched by a query term. An IndexSearchable component supports searching
+ * in one or more named fields. The Blueprint created by a Searchable
+ * is an intermediate query representation that is later used to
+ * create the actual search iterators used to produce matches.
+ *
+ * The class is a specialized version of search::queryeval::Searchable
+ * that lets the components access a per query attribute context that exposes
+ * attribute vectors that can be utilized during query evaluation.
+ **/
+class IndexSearchable
+{
+protected:
+ // Short names for the query evaluation types used in the signatures below.
+ typedef search::queryeval::IRequestContext IRequestContext;
+ typedef search::queryeval::FieldSpec FieldSpec;
+ typedef search::queryeval::FieldSpecList FieldSpecList;
+ typedef search::query::Node Node;
+ typedef search::attribute::IAttributeContext IAttributeContext;
+ typedef search::queryeval::Blueprint Blueprint;
+public:
+ typedef std::shared_ptr<IndexSearchable> SP;
+
+ IndexSearchable() {}
+
+ virtual ~IndexSearchable() {}
+
+ /**
+ * Create a blueprint searching a single field.
+ *
+ * @return blueprint
+ * @param requestContext the request context for the query
+ *                       (NOTE(review): its exact role is not visible here)
+ * @param field the field to search
+ * @param term the query tree term
+ * @param attrCtx the per query attribute context
+ **/
+ virtual Blueprint::UP
+ createBlueprint(const IRequestContext & requestContext,
+ const FieldSpec &field,
+ const Node &term,
+ const IAttributeContext &attrCtx) = 0;
+
+ /**
+ * Create a blueprint searching a set of fields. The default
+ * implementation of this function will create blueprints for
+ * individual fields and combine them with an OR blueprint.
+ * As special cases, the default implementation returns an empty
+ * blueprint for an empty field set and the plain single-field
+ * blueprint when the set holds exactly one field.
+ *
+ * @return blueprint
+ * @param requestContext the request context for the query, passed on
+ *                       to the single-field overload
+ * @param fields the set of fields to search
+ * @param term the query tree term
+ * @param attrCtx the per query attribute context
+ **/
+ virtual Blueprint::UP
+ createBlueprint(const IRequestContext & requestContext,
+ const FieldSpecList &fields,
+ const Node &term,
+ const IAttributeContext &attrCtx);
+
+ /**
+ * Returns the searchable stats for this index searchable.
+ */
+ virtual search::SearchableStats getSearchableStats() const = 0;
+};
+
+} // namespace searchcorespi
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexwriteutilities.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexwriteutilities.cpp
new file mode 100644
index 00000000000..731e233d289
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexwriteutilities.cpp
@@ -0,0 +1,189 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/fastos/fastos.h>
+#include <vespa/log/log.h>
+LOG_SETUP(".searchcorespi.index.indexwriteutilities");
+
+#include "indexwriteutilities.h"
+#include "indexdisklayout.h"
+#include "indexreadutilities.h"
+#include <vespa/searchlib/common/serialnumfileheadercontext.h>
+#include <vespa/searchlib/index/schemautil.h>
+#include <vespa/vespalib/data/fileheader.h>
+#include <vespa/vespalib/util/exception.h>
+#include <sstream>
+
+using search::FixedSourceSelector;
+using search::TuneFileAttributes;
+using search::common::FileHeaderContext;
+using search::common::SerialNumFileHeaderContext;
+using search::index::Schema;
+using search::index::SchemaUtil;
+using search::SerialNum;
+using vespalib::IllegalStateException;
+using vespalib::FileHeader;
+
+namespace searchcorespi {
+namespace index {
+
+namespace {
+
+// Sentinel serial number (the largest representable SerialNum).
+// updateDiskIndexSchema() skips its serial number comparison when the
+// wipeSerial argument equals this value.
+SerialNum noSerialNumHigh = std::numeric_limits<SerialNum>::max();
+
+}
+
+/**
+ * Writes the serial number file for the index in 'dir'.
+ *
+ * The header is written to a '.tmp' sibling file, fsynced and closed, and
+ * only then renamed onto the final file name.
+ *
+ * @param serialNum          serial number stored as a header tag
+ * @param dir                index directory that owns the serial number file
+ * @param fileHeaderContext  context used to add the common header tags
+ * @throws IllegalStateException if writing, syncing or renaming fails
+ */
+void
+IndexWriteUtilities::writeSerialNum(SerialNum serialNum,
+                                    const vespalib::string &dir,
+                                    const FileHeaderContext &fileHeaderContext)
+{
+    const vespalib::string finalName = IndexDiskLayout::getSerialNumFileName(dir);
+    const vespalib::string scratchName = finalName + ".tmp";
+
+    SerialNumFileHeaderContext taggedContext(fileHeaderContext, serialNum);
+    Fast_BufferedFile out;
+    out.WriteOpen(scratchName.c_str());
+    FileHeader header;
+    taggedContext.addTags(header, finalName);
+    header.putTag(FileHeader::Tag(IndexDiskLayout::SerialNumTag, serialNum));
+    const bool written = (header.writeFile(out) >= header.getSize());
+    // The sync is attempted even when the write came up short.
+    const bool synced = out.Sync();
+    if (!synced) {
+        LOG(error, "Unable to fsync '%s'", scratchName.c_str());
+    }
+    out.Close();
+
+    // Only publish the scratch file when it was fully written and synced.
+    bool renamed = false;
+    if (written && synced) {
+        FastOS_File scratch(scratchName.c_str());
+        renamed = scratch.Rename(finalName.c_str());
+    }
+    if (!renamed) {
+        std::ostringstream msg;
+        msg << "Unable to write serial number to '" << dir << "'.";
+        throw IllegalStateException(msg.str());
+    }
+}
+
+/**
+ * Copies the serial number file from one index directory to another.
+ *
+ * The copy is made to a '.tmp' file in the destination directory, fsynced,
+ * closed and finally renamed to the real serial number file name.
+ *
+ * @param sourceDir index directory to copy from
+ * @param destDir   index directory to copy to
+ * @return true on success, false (after logging an error) on any failure
+ */
+bool
+IndexWriteUtilities::copySerialNumFile(const vespalib::string &sourceDir,
+                                       const vespalib::string &destDir)
+{
+    const vespalib::string srcName = IndexDiskLayout::getSerialNumFileName(sourceDir);
+    const vespalib::string dstName = IndexDiskLayout::getSerialNumFileName(destDir);
+    const vespalib::string scratchName = dstName + ".tmp";
+
+    if (!FastOS_FileInterface::CopyFile(srcName.c_str(), scratchName.c_str())) {
+        LOG(error, "Unable to copy file '%s'", srcName.c_str());
+        return false;
+    }
+    // Reopen the copy so that it can be fsynced before it is published.
+    FastOS_File scratch(scratchName.c_str());
+    if (!scratch.OpenReadWrite()) {
+        LOG(error, "Unable to open '%s' for fsync", scratchName.c_str());
+        return false;
+    }
+    if (!scratch.Sync()) {
+        LOG(error, "Unable to fsync '%s'", scratchName.c_str());
+        return false;
+    }
+    scratch.Close();
+    const bool renamed = scratch.Rename(dstName.c_str());
+    if (!renamed) {
+        LOG(error, "Unable to rename file '%s' to '%s'",
+            scratchName.c_str(), dstName.c_str());
+    }
+    return renamed;
+}
+
+/**
+ * Saves a source selector snapshot, tagging the file header with serialNum.
+ *
+ * @param saveInfo            snapshot to save
+ * @param sourceId            source id, only used in the error message
+ * @param tuneFileAttributes  file tuning passed on to the save operation
+ * @param fileHeaderContext   context used to add the common header tags
+ * @param serialNum           serial number added to the file header
+ * @throws IllegalStateException if the save operation reports failure
+ */
+void
+IndexWriteUtilities::writeSourceSelector(FixedSourceSelector::SaveInfo &
+                                         saveInfo,
+                                         uint32_t sourceId,
+                                         const TuneFileAttributes &
+                                         tuneFileAttributes,
+                                         const FileHeaderContext &
+                                         fileHeaderContext,
+                                         SerialNum serialNum)
+{
+    SerialNumFileHeaderContext taggedContext(fileHeaderContext, serialNum);
+    if (saveInfo.save(tuneFileAttributes, taggedContext)) {
+        return;
+    }
+    std::ostringstream msg;
+    msg << "Flush of sourceselector failed. Source id = " << sourceId;
+    throw IllegalStateException(msg.str());
+}
+
+/**
+ * Shrinks the schema stored in a disk index directory to the intersection
+ * of the stored schema and the given schema.
+ *
+ * Nothing is written when the stored schema cannot be loaded or validated,
+ * when the intersection equals the stored schema, or when wipeSerial is a
+ * real serial number and the serial number stored in the index is already
+ * at or beyond it.
+ *
+ * The new schema is first saved to '<schema>.tmp'. Unless '<schema>.orig'
+ * already exists, the current schema file is hard linked to it, and the
+ * temporary file is then renamed over the schema file. Failures are logged;
+ * nothing is thrown by this function itself.
+ *
+ * @param indexDir    disk index directory
+ * @param schema      schema to intersect the stored schema with
+ * @param wipeSerial  serial number of the schema change, or the maximum
+ *                    SerialNum value to skip the serial number comparison
+ */
+void
+IndexWriteUtilities::updateDiskIndexSchema(const vespalib::string &indexDir,
+                                           const Schema &schema,
+                                           SerialNum wipeSerial)
+{
+    vespalib::string schemaName = IndexDiskLayout::getSchemaFileName(indexDir);
+    Schema oldSchema;
+    if (!oldSchema.loadFromFile(schemaName)) {
+        LOG(error, "Could not open schema '%s'",
+            schemaName.c_str());
+        return;
+    }
+    if (!SchemaUtil::validateSchema(oldSchema)) {
+        LOG(error, "Could not validate schema loaded from '%s'",
+            schemaName.c_str());
+        return;
+    }
+    Schema::UP newSchema = Schema::intersect(oldSchema, schema);
+    if (*newSchema == oldSchema) {
+        return;
+    }
+    if (wipeSerial != noSerialNumHigh) {
+        SerialNum oldSerial = IndexReadUtilities::readSerialNum(indexDir);
+        if (oldSerial >= wipeSerial) {
+            return;
+        }
+    }
+    vespalib::string schemaTmpName = schemaName + ".tmp";
+    vespalib::string schemaOrigName = schemaName + ".orig";
+    if (!newSchema->saveToFile(schemaTmpName)) {
+        LOG(error, "Could not save schema to '%s'",
+            schemaTmpName.c_str());
+        // Do not continue: renaming a missing or partially written
+        // temporary file would replace the intact schema file.
+        return;
+    }
+    // XXX: FastOS layer violation
+    FastOS_StatInfo statInfo;
+    bool statres;
+    statres = FastOS_File::Stat(schemaOrigName.c_str(), &statInfo);
+    if (!statres) {
+        if (statInfo._error != FastOS_StatInfo::FileNotFound) {
+            LOG(error, "Failed to stat orig schema '%s': %s",
+                schemaOrigName.c_str(),
+                FastOS_File::getLastErrorString().c_str());
+        }
+        // Keep the first schema around as '.orig' by hard linking it
+        // before the schema file name is pointed at the new content.
+        int linkres = ::link(schemaName.c_str(), schemaOrigName.c_str());
+        if (linkres != 0) {
+            LOG(error, "Could not link '%s' to '%s': %s",
+                schemaOrigName.c_str(),
+                schemaName.c_str(),
+                FastOS_File::getLastErrorString().c_str());
+        }
+    }
+    // XXX: FastOS layer violation
+    int renameres = ::rename(schemaTmpName.c_str(), schemaName.c_str());
+    if (renameres != 0) {
+        // Capture errno before any other call can overwrite it.
+        int error = errno;
+        std::string errString = FastOS_File::getErrorString(error);
+        LOG(error, "Could not rename '%s' to '%s': %s",
+            schemaTmpName.c_str(),
+            schemaName.c_str(),
+            errString.c_str());
+    }
+}
+
+} // namespace index
+} // namespace searchcorespi
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexwriteutilities.h b/searchcorespi/src/vespa/searchcorespi/index/indexwriteutilities.h
new file mode 100644
index 00000000000..c08b850bd6f
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/indexwriteutilities.h
@@ -0,0 +1,46 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/searchcommon/common/schema.h>
+#include <vespa/searchlib/attribute/fixedsourceselector.h>
+#include <vespa/searchlib/common/tunefileinfo.h>
+#include <vespa/searchlib/common/fileheadercontext.h>
+#include <vespa/searchlib/common/serialnum.h>
+#include <vespa/vespalib/stllike/string.h>
+
+namespace searchcorespi {
+namespace index {
+
+/**
+ * Utility class with functions to write aspects of an index to disk.
+ * Used by the index maintainer.
+ */
+struct IndexWriteUtilities
+{
+ /**
+ * Writes the serial number file in 'dir' via a temporary file that is
+ * renamed into place. Throws vespalib::IllegalStateException on failure.
+ */
+ static void
+ writeSerialNum(search::SerialNum serialNum,
+ const vespalib::string &dir,
+ const search::common::FileHeaderContext &fileHeaderContext);
+
+ /**
+ * Copies the serial number file from sourceDir to destDir via a
+ * temporary file. Returns false (after logging) on failure.
+ */
+ static bool
+ copySerialNumFile(const vespalib::string &sourceDir,
+ const vespalib::string &destDir);
+
+ /**
+ * Saves the given source selector snapshot with serialNum added to the
+ * file header. Throws vespalib::IllegalStateException if saving fails;
+ * sourceId is only used in the exception message.
+ */
+ static void
+ writeSourceSelector(search::FixedSourceSelector::SaveInfo &saveInfo,
+ uint32_t sourceId,
+ const search::TuneFileAttributes &tuneFileAttributes,
+ const search::common::FileHeaderContext &
+ fileHeaderContext,
+ search::SerialNum serialNum);
+
+ /**
+ * Replaces the schema stored in indexDir with the intersection of the
+ * stored schema and 'schema', unless they are already equal or the
+ * serial number stored in the index is at or beyond wipeSerial.
+ * Errors are logged.
+ */
+ static void
+ updateDiskIndexSchema(const vespalib::string &indexDir,
+ const search::index::Schema &schema,
+ search::SerialNum wipeSerial);
+};
+
+} // namespace index
+} // namespace searchcorespi
+
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/isearchableindexcollection.cpp b/searchcorespi/src/vespa/searchcorespi/index/isearchableindexcollection.cpp
new file mode 100644
index 00000000000..484a5137a81
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/isearchableindexcollection.cpp
@@ -0,0 +1,31 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/searchcorespi/index/isearchableindexcollection.h>
+
+namespace searchcorespi {
+
+using search::queryeval::ISourceSelector;
+
+// Records 'id' as the current index. The id must be below
+// ISourceSelector::SOURCE_LIMIT (asserted).
+void
+ISearchableIndexCollection::setCurrentIndex(uint32_t id)
+{
+ assert( id < ISourceSelector::SOURCE_LIMIT);
+
+ _currentIndex = id;
+}
+
+// Returns the current index. Must only be called when valid() is true
+// (asserted), since the stored value starts out as -1.
+uint32_t
+ISearchableIndexCollection::getCurrentIndex() const
+{
+ assert( valid() );
+
+ return _currentIndex;
+}
+
+// True when the current index is strictly greater than 0 and below
+// ISourceSelector::SOURCE_LIMIT.
+// NOTE(review): setCurrentIndex() accepts id 0 while this rejects it;
+// confirm that 0 is intentionally treated as "not set".
+bool
+ISearchableIndexCollection::valid() const
+{
+ return (_currentIndex > 0) && (_currentIndex < ISourceSelector::SOURCE_LIMIT);
+}
+
+}
diff --git a/searchcorespi/src/vespa/searchcorespi/index/isearchableindexcollection.h b/searchcorespi/src/vespa/searchcorespi/index/isearchableindexcollection.h
new file mode 100644
index 00000000000..665f08e0e04
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/isearchableindexcollection.h
@@ -0,0 +1,33 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "iindexcollection.h"
+#include "indexsearchable.h"
+
+namespace searchcorespi {
+
+/**
+ * Interface to both an IndexCollection and to an IndexSearchable
+ */
+class ISearchableIndexCollection : public IIndexCollection,
+ public IndexSearchable
+{
+public:
+ // Starts without a current index (-1); valid() is false until
+ // setCurrentIndex() has been called with a non-zero id.
+ ISearchableIndexCollection() : _currentIndex(-1) { }
+ typedef std::unique_ptr<ISearchableIndexCollection> UP;
+ typedef std::shared_ptr<ISearchableIndexCollection> SP;
+ // Source management, implemented by the concrete collection.
+ virtual void append(uint32_t id, const IndexSearchable::SP &source) = 0;
+ virtual void replace(uint32_t id, const IndexSearchable::SP &source) = 0;
+ virtual IndexSearchable::SP getSearchableSP(uint32_t i) const = 0;
+ virtual void setSource(uint32_t docId) = 0;
+
+ // Sets the current index; id must be below ISourceSelector::SOURCE_LIMIT.
+ void setCurrentIndex(uint32_t id);
+ // Returns the current index; asserts valid().
+ uint32_t getCurrentIndex() const;
+ // True when the current index is in the range <0, SOURCE_LIMIT>.
+ bool valid() const;
+private:
+ // Signed so that -1 can represent "not set".
+ int32_t _currentIndex;
+};
+
+} // namespace searchcorespi
+
diff --git a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h
new file mode 100644
index 00000000000..ec68b374487
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h
@@ -0,0 +1,74 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "i_thread_service.h"
+#include <boost/noncopyable.hpp>
+#include <vespa/vespalib/util/runnable.h>
+#include <vespa/vespalib/util/executor.h>
+#include <vespa/vespalib/util/syncable.h>
+#include <vespa/searchlib/common/isequencedtaskexecutor.h>
+
+namespace searchcorespi {
+namespace index {
+
+/**
+ * Interface for the thread model used for write tasks.
+ *
+ * We have multiple write threads:
+ *
+ * 1. The master write thread used for the majority of write tasks.
+ *
+ * 2. The index write thread used for doing changes to the memory
+ * index, either directly (for data not bound to a field) or via
+ * index field inverter executor or index field writer executor.
+ *
+ * 3. The index field inverter executor is used to populate field
+ * inverters with data from document fields. Scheduled tasks for
+ * the same field are executed in sequence.
+ *
+ * 4. The index field writer executor is used to sort data in field
+ * inverters before pushing the data to the memory field indexes.
+ * Scheduled tasks for the same field are executed in sequence.
+ *
+ * The master write thread is always the one giving tasks to the index
+ * write thread.
+ *
+ * The index write thread extracts fields from documents and gives
+ * tasks to the index field inverter executor and the index field
+ * writer executor.
+ *
+ * The index field inverter executor and index field writer executor
+ * are separate to allow for double buffering, i.e. populate one set
+ * of field inverters using the index field inverter executor while
+ * another set of field inverters are handled by the index field
+ * writer executor.
+ *
+ * We might decide to allow index field inverter tasks to schedule
+ * tasks to the index field writer executor, so draining logic needs
+ * to sync index field inverter executor before syncing index field
+ * writer executor.
+ */
+struct IThreadingService : public boost::noncopyable,
+ public vespalib::Syncable
+{
+ virtual ~IThreadingService() {}
+
+ /**
+ * Returns a reference to the master write thread.
+ */
+ virtual IThreadService &master() = 0;
+
+ /**
+ * Returns a reference to the index write thread.
+ */
+ virtual IThreadService &index() = 0;
+
+ /**
+ * Returns a reference to the index field inverter executor.
+ */
+ virtual search::ISequencedTaskExecutor &indexFieldInverter() = 0;
+
+ /**
+ * Returns a reference to the index field writer executor.
+ */
+ virtual search::ISequencedTaskExecutor &indexFieldWriter() = 0;
+
+ /**
+ * Returns a reference to the attribute field writer executor.
+ * NOTE(review): this executor is not covered by the thread model
+ * description of this interface; presumably it sequences attribute
+ * write tasks per field - confirm and document.
+ */
+ virtual search::ISequencedTaskExecutor &attributeFieldWriter() = 0;
+};
+
+} // namespace index
+} // namespace searchcorespi
diff --git a/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp
new file mode 100644
index 00000000000..7d630a2c668
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.cpp
@@ -0,0 +1,220 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/searchcorespi/index/warmupindexcollection.h>
+#include <vespa/searchcorespi/index/idiskindex.h>
+#include <vespa/vespalib/util/closuretask.h>
+#include <vespa/searchlib/fef/matchdatalayout.h>
+#include <vespa/searchlib/query/tree/termnodes.h>
+#include <vespa/log/log.h>
+
+LOG_SETUP(".searchcorespi.index.warmupindexcollection");
+
+namespace searchcorespi {
+
+using search::query::StringBase;
+using search::queryeval::Blueprint;
+using search::fef::MatchDataLayout;
+using search::queryeval::SearchIterator;
+using search::queryeval::ISourceSelector;
+using vespalib::makeTask;
+using vespalib::makeClosure;
+using index::IDiskIndex;
+using fastos::TimeStamp;
+using fastos::ClockSystem;
+
+// Sets up a warmup period of 'warmupSeconds' starting now. The warmup end
+// time is computed once here; a value of 0 is later used to mean "warmup
+// done". The current index is taken over from 'next' when 'next' is valid,
+// otherwise a warning is logged and the current index is left unset.
+WarmupIndexCollection::WarmupIndexCollection(double warmupSeconds,
+ ISearchableIndexCollection::SP prev,
+ ISearchableIndexCollection::SP next,
+ IndexSearchable & warmup,
+ vespalib::ThreadExecutor & executor,
+ IWarmupDone & warmupDone) :
+ _prev(prev),
+ _next(next),
+ _warmup(warmup),
+ _executor(executor),
+ _warmupDone(warmupDone),
+ _warmupEndTime(ClockSystem::now() + TimeStamp::Seconds(warmupSeconds)),
+ _handledTerms()
+{
+ if (next->valid()) {
+ setCurrentIndex(next->getCurrentIndex());
+ } else {
+ LOG(warning, "Next index is not valid, Dangerous !! : %s", next->toString().c_str());
+ }
+ LOG(debug, "For %g seconds I will warm up %s.", warmupSeconds, typeid(_warmup).name());
+ LOG(debug, "%s", toString().c_str());
+}
+
+// Forwards setSource() to both the previous and the next collection.
+// Both must be valid (asserted).
+void
+WarmupIndexCollection::setSource(uint32_t docId)
+{
+ assert(_prev->valid());
+ assert(_next->valid());
+ _prev->setSource(docId);
+ _next->setSource(docId);
+}
+
+/**
+ * Describes this collection on three lines: the index being warmed up
+ * (its index directory when it is a disk index, otherwise its type name),
+ * followed by the descriptions of the next and previous collections.
+ */
+vespalib::string
+WarmupIndexCollection::toString() const
+{
+    vespalib::asciistream out;
+    out << "warmup : ";
+    const IDiskIndex *diskIndex = dynamic_cast<const IDiskIndex *>(&_warmup);
+    if (diskIndex == NULL) {
+        out << typeid(_warmup).name();
+    } else {
+        out << diskIndex->getIndexDir();
+    }
+    out << "\n";
+    out << "next : " << _next->toString() << "\n";
+    out << "prev : " << _prev->toString() << "\n";
+    return out.str();
+}
+
+// A non-zero warmup end time here means warmup never completed, which is
+// logged. The executor is then synced before members are destroyed.
+// NOTE(review): presumably the sync is there so that queued WarmupTasks,
+// which hold a reference to this object, have finished - confirm.
+WarmupIndexCollection::~WarmupIndexCollection()
+{
+ if (_warmupEndTime != 0) {
+ LOG(info,
+ "Warmup aborted due to new state change or application shutdown");
+ }
+ _executor.sync();
+}
+
+// Delegates to the next collection.
+const ISourceSelector &
+WarmupIndexCollection::getSourceSelector() const
+{
+ return _next->getSourceSelector();
+}
+
+// Delegates to the next collection.
+size_t
+WarmupIndexCollection::getSourceCount() const
+{
+ return _next->getSourceCount();
+}
+
+// Delegates to the next collection.
+IndexSearchable &
+WarmupIndexCollection::getSearchable(uint32_t i) const
+{
+ return _next->getSearchable(i);
+}
+
+// Delegates to the next collection.
+uint32_t
+WarmupIndexCollection::getSourceId(uint32_t i) const
+{
+ return _next->getSourceId(i);
+}
+
+// Hands the task to the executor while the warmup end time has not been
+// reached. After that the task is dropped, and the first caller to see a
+// non-zero end time under the lock resets it to 0 and reports completion
+// through IWarmupDone.
+void
+WarmupIndexCollection::fireWarmup(Task::UP task)
+{
+ fastos::TimeStamp now(fastos::ClockSystem::now());
+ if (now < _warmupEndTime) {
+ _executor.execute(std::move(task));
+ } else {
+ vespalib::LockGuard guard(_lock);
+ if (_warmupEndTime != 0) {
+ _warmupEndTime = 0;
+ // Release the lock before calling out to the listener.
+ guard.unlock();
+ LOG(info, "Done warming up. Posting WarmupDoneTask");
+ _warmupDone.warmupDone(shared_from_this());
+ }
+ }
+}
+
+/**
+ * Records a (field, term) pair and tells whether it was already recorded.
+ *
+ * Only string terms are tracked; any other kind of term is reported as
+ * already handled.
+ *
+ * @return true if the pair was seen before (or is not tracked), false if
+ *         this call recorded it for the first time
+ */
+bool
+WarmupIndexCollection::handledBefore(uint32_t fieldId, const Node &term)
+{
+    const StringBase *stringTerm = dynamic_cast<const StringBase *>(&term);
+    if (stringTerm == NULL) {
+        return true;
+    }
+    const vespalib::string &text = stringTerm->getTerm();
+    vespalib::LockGuard guard(_lock);
+    TermMap::insert_result inserted = _handledTerms[fieldId].insert(text);
+    return !inserted.second;
+}
+// Single field variant. Once warmup is done (end time 0) the blueprint is
+// created by the next collection. During warmup the blueprint is created
+// by the previous collection, and a warmup task is fired the first time a
+// (field, term) pair is seen.
+Blueprint::UP
+WarmupIndexCollection::createBlueprint(const IRequestContext & requestContext,
+ const FieldSpec &field,
+ const Node &term,
+ const IAttributeContext &attrCtx)
+{
+ if ( _warmupEndTime == 0) {
+ // warmup done
+ return _next->createBlueprint(requestContext, field, term, attrCtx);
+ }
+ if ( ! handledBefore(field.getFieldId(), term) ) {
+ // The warmup task gets its own match data layout and field spec.
+ MatchDataLayout mdl;
+ FieldSpec fs(field.getName(), field.getFieldId(), mdl.allocTermField(field.getFieldId()), field.isFilter());
+ Task::UP task(new WarmupTask(mdl.createMatchData(), *this));
+ static_cast<WarmupTask &>(*task).createBlueprint(fs, term, attrCtx);
+ fireWarmup(std::move(task));
+ }
+ return _prev->createBlueprint(requestContext, field, term, attrCtx);
+}
+
+/**
+ * Field list variant of createBlueprint().
+ *
+ * Once warmup is done (end time 0) the blueprint is created by the next
+ * collection. During warmup the blueprint is created by the previous
+ * collection, and one warmup task covering all the given fields is fired
+ * if the term has not been seen before for at least one of them.
+ *
+ * @param requestContext the request context for the query
+ * @param fields         the set of fields to search
+ * @param term           the query tree term
+ * @param attrCtx        the per query attribute context
+ * @return blueprint from the next collection when warmup is done,
+ *         otherwise from the previous collection
+ */
+Blueprint::UP
+WarmupIndexCollection::createBlueprint(const IRequestContext & requestContext,
+                                       const FieldSpecList &fields,
+                                       const Node &term,
+                                       const IAttributeContext &attrCtx)
+{
+    if ( _warmupEndTime == 0) {
+        // warmup done
+        return _next->createBlueprint(requestContext, fields, term, attrCtx);
+    }
+    MatchDataLayout mdl;
+    FieldSpecList fsl;
+    bool needWarmUp(false);
+    for(size_t i(0); i < fields.size(); i++) {
+        const FieldSpec & f(fields[i]);
+        FieldSpec fs(f.getName(), f.getFieldId(), mdl.allocTermField(f.getFieldId()), f.isFilter());
+        fsl.add(fs);
+        // Record the term for every field, not just the first unhandled
+        // one. The warmup task covers all fields in fsl, so leaving the
+        // remaining fields unrecorded would trigger redundant warmup
+        // tasks for the same term.
+        if ( ! handledBefore(fs.getFieldId(), term)) {
+            needWarmUp = true;
+        }
+    }
+    if (needWarmUp) {
+        Task::UP task(new WarmupTask(mdl.createMatchData(), *this));
+        static_cast<WarmupTask &>(*task).createBlueprint(fsl, term, attrCtx);
+        fireWarmup(std::move(task));
+    }
+    return _prev->createBlueprint(requestContext, fields, term, attrCtx);
+}
+
+// Stats are taken from the previous collection.
+search::SearchableStats
+WarmupIndexCollection::getSearchableStats() const
+{
+ return _prev->getSearchableStats();
+}
+
+// Delegates to the next collection; the previous collection is untouched.
+void
+WarmupIndexCollection::append(uint32_t id, const IndexSearchable::SP &source)
+{
+ _next->append(id, source);
+}
+
+// Delegates to the next collection; the previous collection is untouched.
+void
+WarmupIndexCollection::replace(uint32_t id, const IndexSearchable::SP &source)
+{
+ _next->replace(id, source);
+}
+
+// Delegates to the next collection.
+IndexSearchable::SP
+WarmupIndexCollection::getSearchableSP(uint32_t i) const
+{
+ return _next->getSearchableSP(i);
+}
+
+/**
+ * Runs the warmup query: fetches postings for the blueprint and walks the
+ * resulting iterator over all its hits. Does nothing except log when the
+ * warmup period has already ended.
+ */
+void
+WarmupIndexCollection::WarmupTask::run()
+{
+    if (_warmup._warmupEndTime == 0) {
+        LOG(debug, "Warmup has finished, ignoring task.");
+        return;
+    }
+    LOG(debug, "Warming up %s", _bluePrint->asString().c_str());
+    _bluePrint->fetchPostings(true);
+    SearchIterator::UP hits(_bluePrint->createSearch(*_matchData, true));
+    hits->initFullRange();
+    // Step through every hit; the results themselves are not used.
+    hits->seek(0);
+    while (!hits->isAtEnd()) {
+        hits->seek(hits->getDocId()+1);
+    }
+}
+
+}
diff --git a/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h
new file mode 100644
index 00000000000..7457b676f31
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/index/warmupindexcollection.h
@@ -0,0 +1,110 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/searchcorespi/index/isearchableindexcollection.h>
+#include <vespa/vespalib/util/threadexecutor.h>
+#include <vespa/searchlib/queryeval/fake_requestcontext.h>
+
+namespace searchcorespi {
+
+// Callback interface notified when a warmup period has ended.
+class IWarmupDone {
+public:
+ virtual ~IWarmupDone() { }
+ // Called with the collection whose warmup period has ended.
+ virtual void warmupDone(ISearchableIndexCollection::SP current) = 0;
+};
+/**
+ * Index collection that holds a reference to the active one and a new one that
+ * is to be warmed up.
+ */
+class WarmupIndexCollection : public ISearchableIndexCollection,
+                              public std::enable_shared_from_this<WarmupIndexCollection>
+{
+public:
+    typedef std::shared_ptr<WarmupIndexCollection> SP;
+    /**
+     * @param warmupSeconds length of the warmup period, counted from construction
+     * @param prev          collection that serves queries during warmup
+     * @param next          collection that takes over when warmup is done
+     * @param warmup        the index searchable being warmed up
+     * @param executor      executor that runs the warmup tasks
+     * @param warmupDone    listener notified when the warmup period has ended
+     */
+    WarmupIndexCollection(double warmupSeconds,
+                          ISearchableIndexCollection::SP prev,
+                          ISearchableIndexCollection::SP next,
+                          IndexSearchable & warmup,
+                          vespalib::ThreadExecutor & executor,
+                          IWarmupDone & warmupDone);
+    ~WarmupIndexCollection();
+    // Implements IIndexCollection
+    const ISourceSelector &getSourceSelector() const override;
+    size_t getSourceCount() const override;
+    IndexSearchable &getSearchable(uint32_t i) const override;
+    uint32_t getSourceId(uint32_t i) const override;
+
+    // Implements IndexSearchable
+    Blueprint::UP
+    createBlueprint(const IRequestContext & requestContext,
+                    const FieldSpec &field,
+                    const Node &term,
+                    const IAttributeContext &attrCtx) override;
+    // Overrides the virtual field list variant in IndexSearchable; marked
+    // override like the other virtuals so a signature drift is caught.
+    Blueprint::UP
+    createBlueprint(const IRequestContext & requestContext,
+                    const FieldSpecList &fields,
+                    const Node &term,
+                    const IAttributeContext &attrCtx) override;
+    search::SearchableStats getSearchableStats() const override;
+
+    // Implements ISearchableIndexCollection
+    void append(uint32_t id, const IndexSearchable::SP &source) override;
+    void replace(uint32_t id, const IndexSearchable::SP &source) override;
+    IndexSearchable::SP getSearchableSP(uint32_t i) const override;
+    void setSource(uint32_t docId) override;
+
+    const ISearchableIndexCollection::SP & getNextIndexCollection() const { return _next; }
+    vespalib::string toString() const override;
+private:
+    typedef search::fef::MatchData MatchData;
+    typedef search::queryeval::FakeRequestContext FakeRequestContext;
+    typedef vespalib::Executor::Task Task;
+    /**
+     * Executor task that evaluates one blueprint to completion.
+     * The blueprint must be set with createBlueprint() before the task
+     * is executed.
+     */
+    class WarmupTask : public Task {
+    public:
+        WarmupTask(MatchData::UP md, WarmupIndexCollection & warmup) :
+            _warmup(warmup),
+            _matchData(std::move(md)),
+            _bluePrint(),
+            _requestContext()
+        { }
+        // Builds the blueprint for a single field through the owning
+        // collection, using the task's own request context.
+        WarmupTask &
+        createBlueprint(const FieldSpec &field,
+                        const Node &term,
+                        const IAttributeContext &attrCtx)
+        {
+            _bluePrint = _warmup.createBlueprint(_requestContext, field, term, attrCtx);
+            return *this;
+        }
+        // Builds the blueprint for a set of fields through the owning
+        // collection, using the task's own request context.
+        WarmupTask &
+        createBlueprint(const FieldSpecList &fields,
+                        const Node &term,
+                        const IAttributeContext &attrCtx)
+        {
+            _bluePrint = _warmup.createBlueprint(_requestContext, fields, term, attrCtx);
+            return *this;
+        }
+    private:
+        void run() override;
+        WarmupIndexCollection & _warmup;       // owning collection, not the warmed up index
+        MatchData::UP           _matchData;
+        Blueprint::UP           _bluePrint;
+        FakeRequestContext      _requestContext;
+    };
+
+    // Executes the task while warmup is ongoing, otherwise ends the warmup.
+    void fireWarmup(Task::UP task);
+    // Records a (field, term) pair; returns true if it was seen before.
+    bool handledBefore(uint32_t fieldId, const Node &term);
+
+    ISearchableIndexCollection::SP _prev;
+    ISearchableIndexCollection::SP _next;
+    IndexSearchable              & _warmup;
+    vespalib::ThreadExecutor     & _executor;
+    IWarmupDone                  & _warmupDone;
+    // End of the warmup period; 0 means warmup is done.
+    fastos::TimeStamp              _warmupEndTime;
+    // Guards _handledTerms and the reset of _warmupEndTime.
+    vespalib::Lock                 _lock;
+    typedef vespalib::hash_set<vespalib::string> TermMap;
+    typedef vespalib::hash_map<uint32_t, TermMap> FieldTermMap;
+    // Terms already handled, per field id.
+    FieldTermMap                   _handledTerms;
+};
+
+} // namespace searchcorespi
diff --git a/searchcorespi/src/vespa/searchcorespi/plugin/.gitignore b/searchcorespi/src/vespa/searchcorespi/plugin/.gitignore
new file mode 100644
index 00000000000..7e7c0fe7fae
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/plugin/.gitignore
@@ -0,0 +1,2 @@
+/.depend
+/Makefile
diff --git a/searchcorespi/src/vespa/searchcorespi/plugin/CMakeLists.txt b/searchcorespi/src/vespa/searchcorespi/plugin/CMakeLists.txt
new file mode 100644
index 00000000000..b0dd8095850
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/plugin/CMakeLists.txt
@@ -0,0 +1,7 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_library(searchcorespi_plugin OBJECT
+ SOURCES
+ factoryregistry.cpp
+ factoryloader.cpp
+ DEPENDS
+)
diff --git a/searchcorespi/src/vespa/searchcorespi/plugin/factoryloader.cpp b/searchcorespi/src/vespa/searchcorespi/plugin/factoryloader.cpp
new file mode 100644
index 00000000000..d36705c4d05
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/plugin/factoryloader.cpp
@@ -0,0 +1,33 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/searchcorespi/plugin/factoryloader.h>
+#include <vespa/vespalib/util/exceptions.h>
+
+using vespalib::stringref;
+using vespalib::make_string;
+using vespalib::IllegalArgumentException;
+
+namespace searchcorespi {
+
+// Starts with an empty library pool; libraries are loaded by create().
+FactoryLoader::FactoryLoader() :
+ _libraries()
+{
+}
+
+// Nothing to do explicitly; the library pool member cleans up after itself.
+FactoryLoader::~FactoryLoader()
+{
+}
+
+/**
+ * Loads the library named by 'factory', looks up the
+ * 'createIndexManagerFactory' symbol in it and calls that function to
+ * create the factory.
+ *
+ * @throws IllegalArgumentException if the symbol is not found in the library
+ */
+IIndexManagerFactory::UP
+FactoryLoader::create(const stringref & factory)
+{
+    typedef IIndexManagerFactory* (*FactoryFunc)();
+    _libraries.loadLibrary(factory);
+    const FastOS_DynamicLibrary &library = *_libraries.get(factory);
+    FactoryFunc entryPoint = reinterpret_cast<FactoryFunc>(library.GetSymbol("createIndexManagerFactory"));
+    if (entryPoint != NULL) {
+        return IIndexManagerFactory::UP(entryPoint());
+    }
+    throw IllegalArgumentException(make_string("Failed locating symbol 'createIndexManagerFactory' in library '%s' for factory '%s'.", library.GetLibName(), factory.c_str()));
+}
+
+}
diff --git a/searchcorespi/src/vespa/searchcorespi/plugin/factoryloader.h b/searchcorespi/src/vespa/searchcorespi/plugin/factoryloader.h
new file mode 100644
index 00000000000..a97ba9c1c82
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/plugin/factoryloader.h
@@ -0,0 +1,26 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/searchcorespi/plugin/iindexmanagerfactory.h>
+#include <vespa/vespalib/util/librarypool.h>
+
+namespace searchcorespi {
+
+class FactoryLoader
+{
+public:
+ FactoryLoader();
+ ~FactoryLoader();
+ /**
+ * Will load the library containing the factory. It will then locate the 'createIndexManagerFactory'
+ * symbol and run it to create the factory.
+ * @param factory the name of the library. Like 'vesparise'.
+ * @return the factory that is created.
+ * @throws vespalib::IllegalArgumentException if the symbol is not found in the library.
+ */
+ IIndexManagerFactory::UP create(const vespalib::stringref & factory);
+private:
+ // Pool of the libraries loaded by create().
+ vespalib::LibraryPool _libraries;
+};
+
+} // namespace searchcorespi
+
diff --git a/searchcorespi/src/vespa/searchcorespi/plugin/factoryregistry.cpp b/searchcorespi/src/vespa/searchcorespi/plugin/factoryregistry.cpp
new file mode 100644
index 00000000000..03ed39c8d30
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/plugin/factoryregistry.cpp
@@ -0,0 +1,57 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/searchcorespi/plugin/factoryregistry.h>
+
+using vespalib::LockGuard;
+using vespalib::IllegalArgumentException;
+using vespalib::stringref;
+using vespalib::string;
+
+namespace searchcorespi {
+
+// Creates an empty registry.
+FactoryRegistry::FactoryRegistry()
+{
+}
+
+// Nothing to do explicitly; the registered shared pointers are released
+// together with the map.
+FactoryRegistry::~FactoryRegistry()
+{
+}
+
+// Registers 'factory' under 'uniqueName' while holding the lock.
+// Throws IllegalArgumentException if the name is already taken.
+void FactoryRegistry::add(const stringref & uniqueName, const IIndexManagerFactory::SP & factory)
+{
+    LockGuard guard(_lock);
+    if (_registry.find(uniqueName) != _registry.end()) {
+        throw IllegalArgumentException("A factory is already registered with the same name as '" + uniqueName + "'.", VESPA_STRLOC);
+    }
+    _registry[uniqueName] = factory;
+}
+
+// Unregisters the factory known as 'uniqueName' while holding the lock.
+// Throws IllegalArgumentException if no such factory is registered.
+void FactoryRegistry::remove(const stringref & uniqueName)
+{
+    LockGuard guard(_lock);
+    const bool known = (_registry.find(uniqueName) != _registry.end());
+    if (!known) {
+        throw IllegalArgumentException("No factory is registered with the name of '" + uniqueName + "'.", VESPA_STRLOC);
+    }
+    _registry.erase(uniqueName);
+}
+
+// Looks up the factory registered as 'uniqueName' while holding the lock.
+// Throws IllegalArgumentException if no such factory is registered.
+// The returned reference points into the registry itself.
+const IIndexManagerFactory::SP &
+FactoryRegistry::get(const stringref & uniqueName) const
+{
+    LockGuard guard(_lock);
+    Registry::const_iterator pos = _registry.find(uniqueName);
+    if (pos != _registry.end()) {
+        return pos->second;
+    }
+    throw IllegalArgumentException("No factory is registered with the name of '" + uniqueName + "'.", VESPA_STRLOC);
+}
+
+// Tells, while holding the lock, whether a factory is registered under
+// 'uniqueName'.
+bool
+FactoryRegistry::isRegistered(const vespalib::stringref & uniqueName) const
+{
+    LockGuard guard(_lock);
+    return _registry.find(uniqueName) != _registry.end();
+}
+
+}
diff --git a/searchcorespi/src/vespa/searchcorespi/plugin/factoryregistry.h b/searchcorespi/src/vespa/searchcorespi/plugin/factoryregistry.h
new file mode 100644
index 00000000000..3e59eb5a852
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/plugin/factoryregistry.h
@@ -0,0 +1,50 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/searchcorespi/plugin/iindexmanagerfactory.h>
+
+namespace searchcorespi {
+
+/**
+ * Registry of index manager factories, keyed by a unique name.
+ * Every operation takes the registry lock for its duration.
+ */
+class FactoryRegistry
+{
+public:
+ FactoryRegistry();
+ ~FactoryRegistry();
+ /**
+ * This will register the plugged in factory under its official
+ * name. The plugin should call this method when it is loaded. E.g.
+ * by using either '__attribute__((constructor))' or using a
+ * global static object that will use its constructor to register
+ * the factory.
+ * @param uniqueName This is a name that is unique over all IndexManager factories.
+ * @param factory The factory instance for producing IndexManagers.
+ * @throws vespalib::IllegalArgumentException if a factory is already registered under that name.
+ */
+ void add(const vespalib::stringref & uniqueName, const IIndexManagerFactory::SP & factory);
+ /**
+ * Will unregister a factory. Should be called when a sharedlibrary is being unloaded.
+ * @param uniqueName Unique name of factory to remove from registry.
+ * @throws vespalib::IllegalArgumentException if no factory is registered under that name.
+ */
+ void remove(const vespalib::stringref & uniqueName);
+ /**
+ * This method will fetch a factory given its unique name.
+ * @param uniqueName The name of the factory to return.
+ * @return The factory.
+ * @throws vespalib::IllegalArgumentException if no factory is registered under that name.
+ */
+ const IIndexManagerFactory::SP & get(const vespalib::stringref & uniqueName) const;
+ /**
+ * Returns true if a factory with the given name has been registered.
+ */
+ bool isRegistered(const vespalib::stringref & uniqueName) const;
+
+private:
+ typedef std::map<vespalib::string, IIndexManagerFactory::SP> Registry;
+ Registry _registry;
+ // Guards _registry.
+ vespalib::Lock _lock;
+};
+
+} // namespace searchcorespi
+
diff --git a/searchcorespi/src/vespa/searchcorespi/plugin/iindexmanagerfactory.h b/searchcorespi/src/vespa/searchcorespi/plugin/iindexmanagerfactory.h
new file mode 100644
index 00000000000..982f48cc96a
--- /dev/null
+++ b/searchcorespi/src/vespa/searchcorespi/plugin/iindexmanagerfactory.h
@@ -0,0 +1,76 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/searchcorespi/index/iindexmanager.h>
+#include <vespa/searchcorespi/index/indexmaintainerconfig.h>
+#include <vespa/searchcorespi/index/indexmaintainercontext.h>
+#include <vespa/searchcorespi/index/indexmanagerconfig.h>
+#include <vespa/config/configgen/configinstance.h>
+#include <vespa/config/retriever/configsnapshot.h>
+#include <vespa/config/retriever/configkeyset.h>
+
+namespace searchcorespi {
+
+/**
+ * Interface for an index manager factory. Every provider of an index manager is supposed to provide a
+ * factory for producing them. It is given an index manager config, an index maintainer config and a maintainer context.
+ * The factory implementation must pick the config it needs and return an IIndexManager instance.
+ * The factory is registered via FactoryRegistry::add() (NOTE(review): original text named registerFactory(); confirm).
+ */
+class IIndexManagerFactory
+{
+public:
+ typedef std::shared_ptr<IIndexManagerFactory> SP;
+ typedef std::unique_ptr<IIndexManagerFactory> UP;
+
+ virtual ~IIndexManagerFactory() {}
+
+ /**
+ * This method will be called by a document db when it needs to create an index manager that
+ * uses an index maintainer (with source selector) in its implementation.
+ * The factory implementation must use RTTI to figure out which configs are which.
+ * It should receive all configs it needs, but it is wise to sanity-check them.
+ *
+ * @param managerConfig The config that will be used to construct an index manager.
+ * Note that if the factory used a different config id when populating the
+ * ConfigKeySet compared to the one in this config instance, it must
+ * also override the config id when fetching from the config snapshot.
+ * The root config received in the @ref getConfigKeys() call will also be
+ * part of the config snapshot in this config instance.
+ * @param maintainerConfig The config needed to construct an index maintainer.
+ * @param maintainerContext The context object used by an index maintainer during its lifetime.
+ * @return The index manager created, or a null pointer if none was created, e.g. if configs are not as expected.
+ */
+ virtual IIndexManager::UP createIndexManager(const IndexManagerConfig &managerConfig,
+ const index::IndexMaintainerConfig &maintainerConfig,
+ const index::IndexMaintainerContext &maintainerContext) = 0;
+
+ /**
+ * The factory must return the set of config keys for the configs it requires.
+ * This lets the searchcore fetch all needed configs in a pluggable way.
+ *
+ * @param configId The config id to use when generating the config keys.
+ * @param schema This is the initial index schema to be used.
+ * @param rootConfig This is a config instance that is the root config for the factory.
+ * Based on this config it must be able to tell if it needs any other config,
+ * and in that case provide the config keys.
+ * @return The set containing keys for all configs required.
+ */
+ virtual config::ConfigKeySet getConfigKeys(const vespalib::string &configId,
+ const search::index::Schema &schema,
+ const config::ConfigInstance &rootConfig) = 0;
+};
+
+} // namespace searchcorespi
+
+extern "C" {
+/**
+ * This is a function that each shared library must have in order to provide a factory.
+ * This will be called by the one loading the library.
+ * @return The created factory; the caller takes ownership of the returned raw pointer.
+ */
+searchcorespi::IIndexManagerFactory * createIndexManagerFactory();
+
+}
+
+