From 446b338bd2985642837f79aee0a7e60b1436b472 Mon Sep 17 00:00:00 2001 From: Arnstein Ressem Date: Mon, 20 Mar 2017 21:56:18 +0100 Subject: Revert "Toregge/reduce wipe history arguments to index maintainer" --- searchcorespi/src/tests/plugin/plugin.cpp | 4 +- .../vespa/searchcorespi/index/iindexmanager.cpp | 3 +- .../src/vespa/searchcorespi/index/iindexmanager.h | 9 ++- .../vespa/searchcorespi/index/indexmaintainer.cpp | 88 +++++++++++++++------- .../vespa/searchcorespi/index/indexmaintainer.h | 21 +++++- 5 files changed, 87 insertions(+), 38 deletions(-) (limited to 'searchcorespi') diff --git a/searchcorespi/src/tests/plugin/plugin.cpp b/searchcorespi/src/tests/plugin/plugin.cpp index eff33e69464..0ebd22495f8 100644 --- a/searchcorespi/src/tests/plugin/plugin.cpp +++ b/searchcorespi/src/tests/plugin/plugin.cpp @@ -34,8 +34,8 @@ public: searchcorespi::IFlushTarget::List l; return l; } - virtual void setSchema(const Schema &, SerialNum) override { } - virtual void wipeHistory(SerialNum) override { } + virtual void setSchema(const Schema &) { } + virtual void wipeHistory(SerialNum , const Schema &) { } }; class IndexManagerFactory : public searchcorespi::IIndexManagerFactory diff --git a/searchcorespi/src/vespa/searchcorespi/index/iindexmanager.cpp b/searchcorespi/src/vespa/searchcorespi/index/iindexmanager.cpp index eb47aaf043e..b6d350ad52e 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/iindexmanager.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/iindexmanager.cpp @@ -4,9 +4,10 @@ namespace searchcorespi { void -IIndexManager::wipeHistory(SerialNum wipeSerial) +IIndexManager::wipeHistory(SerialNum wipeSerial, const Schema &historyFields) { (void) wipeSerial; + (void) historyFields; } IIndexManager::Reconfigurer::~Reconfigurer() diff --git a/searchcorespi/src/vespa/searchcorespi/index/iindexmanager.h b/searchcorespi/src/vespa/searchcorespi/index/iindexmanager.h index c2976699edb..b1e3e5515b5 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/iindexmanager.h +++ b/searchcorespi/src/vespa/searchcorespi/index/iindexmanager.h @@ -156,14 +156,17 @@ public: * * @param schema The new schema to start using. **/ - virtual void setSchema(const Schema &schema, SerialNum serialNum) = 0; + virtual void setSchema(const Schema &schema) = 0; /** - * Wipes remains of removed fields from this index manager. + * Wipes remains of removed fields from this index manager as specified in the history schema. + * This can for instance be removing these fields from disk indexes. + * The default implementation does nothing. * * @param wipeSerial The serial number of this wipe operation. + * @param historyFields The schema specifying which fields we should wipe away. **/ - virtual void wipeHistory(SerialNum wipeSerial); + virtual void wipeHistory(SerialNum wipeSerial, const Schema &historyFields); }; } // namespace searchcorespi diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp index c075e98e1fc..ce88cc1188b 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.cpp @@ -173,11 +173,11 @@ IndexMaintainer::reopenDiskIndexes(ISearchableIndexCollection &coll) } const string indexDir = d->getIndexDir(); vespalib::string schemaName = IndexDiskLayout::getSchemaFileName(indexDir); - Schema trimmedSchema; - if (!trimmedSchema.loadFromFile(schemaName)) { + Schema oldSchema; + if (!oldSchema.loadFromFile(schemaName)) { LOG(error, "Could not open schema '%s'", schemaName.c_str()); } - if (trimmedSchema != d->getSchema()) { + if (oldSchema != d->getSchema()) { IDiskIndex::SP newIndex(reloadDiskIndex(*d)); coll.replace(coll.getSourceId(i), newIndex); hasReopenedAnything = true; @@ -435,6 +435,16 @@ IndexMaintainer::FlushArgs::~FlushArgs() { } IndexMaintainer::FlushArgs::FlushArgs(FlushArgs &&) = default; IndexMaintainer::FlushArgs & IndexMaintainer::FlushArgs::operator=(FlushArgs &&) = default; +IndexMaintainer::WipeHistoryArgs::WipeHistoryArgs() + : _old_source_list(), + _new_source_list() +{ } + +IndexMaintainer::WipeHistoryArgs::WipeHistoryArgs(WipeHistoryArgs &&) = default; +IndexMaintainer::WipeHistoryArgs & IndexMaintainer::WipeHistoryArgs::operator=(WipeHistoryArgs &&) = default; + +IndexMaintainer::WipeHistoryArgs::~WipeHistoryArgs() { } + bool IndexMaintainer::doneInitFlush(FlushArgs *args, IMemoryIndex::SP *new_index) { @@ -758,6 +768,20 @@ IndexMaintainer::doneSetSchema(SetSchemaArgs &args, IMemoryIndex::SP &newIndex) } +bool +IndexMaintainer::doneWipeHistory(WipeHistoryArgs &args) +{ + assert(_ctx.getThreadingService().master().isCurrentThread()); // with idle index executor + LockGuard state_lock(_state_lock); + LockGuard lock(_new_search_lock); + if (args._old_source_list.get() != _source_list.get()) { + return false; // Flush or fusion had started/completed, must retry + } + _source_list = args._new_source_list; + return true; +} + + Schema IndexMaintainer::getSchema(void) const { @@ -867,7 +891,7 @@ IndexMaintainer::IndexMaintainer(const IndexMaintainerConfig &config, LOG(debug, "Index manager created with flushed serial num %" PRIu64, _flush_serial_num); sourceList->append(_current_index_id, _current_index); sourceList->setCurrentIndex(_current_index_id); - _source_list = std::move(sourceList); + _source_list.reset(sourceList.release()); _fusion_spec = spec; } @@ -1180,10 +1204,9 @@ IndexMaintainer::getFlushTargets(void) } void -IndexMaintainer::setSchema(const Schema & schema, SerialNum serialNum) +IndexMaintainer::setSchema(const Schema & schema) { assert(_ctx.getThreadingService().master().isCurrentThread()); - internalWipeHistory(schema, serialNum); IMemoryIndex::SP new_index(_operations.createMemoryIndex(schema, _current_serial_num)); SetSchemaArgs args; @@ -1198,37 +1221,46 @@ IndexMaintainer::setSchema(const Schema & schema, SerialNum serialNum) } void -IndexMaintainer::internalWipeHistory(const Schema &schema, SerialNum wipeSerial) +IndexMaintainer::wipeHistory(SerialNum wipeSerial, const Schema &historyFields) { assert(_ctx.getThreadingService().master().isCurrentThread()); - ISearchableIndexCollection::SP new_source_list; - IIndexCollection::SP coll = getSourceCollection(); - updateIndexSchemas(*coll, schema, wipeSerial); - updateActiveFusionWipeTimeSchema(schema); + for (;;) { + const Schema before_schema = getSchema(); + IIndexCollection::SP before_coll = getSourceCollection(); + + Schema::UP schema = Schema::make_union(before_schema, historyFields); + updateIndexSchemas(*before_coll, *schema, wipeSerial); + updateActiveFusionWipeTimeSchema(*schema); + + const Schema after_schema(getSchema()); + IIndexCollection::SP after_coll = getSourceCollection(); + if (before_schema == after_schema && + before_coll.get() == after_coll.get()) + { + break; + } + } { LockGuard state_lock(_state_lock); LockGuard lock(_index_update_lock); _changeGens.bumpWipeGen(); } - { - LockGuard state_lock(_state_lock); - new_source_list = std::make_shared(_selector, *_source_list); - } - if (reopenDiskIndexes(*new_source_list)) { - scheduleCommit(); - _ctx.getThreadingService().sync(); - // Everything should be quiet now. - LockGuard state_lock(_state_lock); - LockGuard lock(_new_search_lock); - _source_list = new_source_list; + for (bool success(false); !success;) { + WipeHistoryArgs args; + { + LockGuard state_lock(_state_lock); + args._old_source_list = _source_list; + args._new_source_list.reset(new IndexCollection(_selector, *args._old_source_list)); + } + if (reopenDiskIndexes(*args._new_source_list)) { + _ctx.getThreadingService().sync(); + // Everything should be quiet now. + success = doneWipeHistory(args); + } else { + success = true; + } } } -void -IndexMaintainer::wipeHistory(SerialNum wipeSerial) -{ - internalWipeHistory(getSchema(), wipeSerial); -} - } // namespace index } // namespace searchcorespi diff --git a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h index 34c25632012..c3cacd2cb25 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h +++ b/searchcorespi/src/vespa/searchcorespi/index/indexmaintainer.h @@ -95,7 +95,7 @@ class IndexMaintainer : public IIndexManager, uint32_t _source_selector_changes; // Protected by IUL // _selector is protected by SL + IUL ISourceSelector::SP _selector; - ISearchableIndexCollection::SP _source_list; // Protected by SL + NSL, only set by master thread + ISearchableIndexCollection::SP _source_list; // Protected by SL + NSL uint32_t _last_fusion_id; // Protected by SL + IUL uint32_t _next_id; // Protected by SL + IUL uint32_t _current_index_id; // Protected by SL + IUL @@ -272,6 +272,20 @@ class IndexMaintainer : public IIndexManager, void doneSetSchema(SetSchemaArgs &args, IMemoryIndex::SP &newIndex); + class WipeHistoryArgs + { + public: + ISearchableIndexCollection::SP _old_source_list; + ISearchableIndexCollection::SP _new_source_list; + + WipeHistoryArgs(); + WipeHistoryArgs(WipeHistoryArgs &&); + WipeHistoryArgs & operator=(WipeHistoryArgs &&); + + ~WipeHistoryArgs(); + }; + + bool doneWipeHistory(WipeHistoryArgs &args); Schema getSchema(void) const; Schema::SP getActiveFusionWipeTimeSchema() const; search::TuneFileAttributes getAttrTune(); @@ -287,7 +301,6 @@ class IndexMaintainer : public IIndexManager, bool makeSureAllRemainingWarmupIsDone(ISearchableIndexCollection::SP keepAlive); void scheduleCommit(); void commit(); - void internalWipeHistory(const Schema &schema, SerialNum wipeSerial); public: IndexMaintainer(const IndexMaintainer &) = delete; @@ -382,8 +395,8 @@ public: } IFlushTarget::List getFlushTargets() override; - void setSchema(const Schema & schema, SerialNum serialNum) override ; - void wipeHistory(SerialNum wipeSerial) override; + void setSchema(const Schema & schema) override ; + void wipeHistory(SerialNum wipeSerial, const Schema &historyFields) override; }; } // namespace index -- cgit v1.2.3