diff options
14 files changed, 152 insertions, 56 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java b/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java index 900061949f4..0b461b10963 100644 --- a/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java +++ b/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java @@ -16,6 +16,7 @@ import com.yahoo.jdisc.Response; import com.yahoo.jdisc.application.BindingMatch; import com.yahoo.jdisc.application.UriPattern; import com.yahoo.slime.Cursor; +import com.yahoo.text.StringUtilities; import com.yahoo.vespa.config.server.ApplicationRepository; import com.yahoo.vespa.config.server.application.ApplicationReindexing; import com.yahoo.vespa.config.server.application.ClusterReindexing; @@ -35,6 +36,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.stream.Stream; import static java.util.Map.Entry.comparingByKey; @@ -86,7 +88,7 @@ public class ApplicationHandler extends HttpHandler { if (isReindexingRequest(request)) { applicationRepository.modifyReindexing(applicationId, reindexing -> reindexing.enabled(false)); - return new JSONResponse(Response.Status.OK); + return createMessageResponse("Reindexing disabled"); } if (applicationRepository.delete(applicationId)) @@ -211,27 +213,20 @@ public class ApplicationHandler extends HttpHandler { } if (isReindexRequest(request)) { - triggerReindexing(request, applicationId); - return new JSONResponse(Response.Status.OK); + return triggerReindexing(request, applicationId); } if (isReindexingRequest(request)) { applicationRepository.modifyReindexing(applicationId, reindexing -> reindexing.enabled(true)); - return new JSONResponse(Response.Status.OK); + return createMessageResponse("Reindexing enabled"); } throw new NotFoundException("Illegal POST request '" + request.getUri() + "'"); } - private void triggerReindexing(HttpRequest request, ApplicationId applicationId) { - List<String> clusters = Optional.ofNullable(request.getProperty("clusterId")).stream() - .flatMap(value -> Stream.of(value.split(","))) - .filter(cluster -> ! cluster.isBlank()) - .collect(toList()); - List<String> types = Optional.ofNullable(request.getProperty("documentType")).stream() - .flatMap(value -> Stream.of(value.split(","))) - .filter(type -> ! type.isBlank()) - .collect(toList()); + private HttpResponse triggerReindexing(HttpRequest request, ApplicationId applicationId) { + Set<String> clusters = StringUtilities.split(request.getProperty("clusterId")); + Set<String> types = StringUtilities.split(request.getProperty("documentType")); Instant now = applicationRepository.clock().instant(); applicationRepository.modifyReindexing(applicationId, reindexing -> { if (clusters.isEmpty()) @@ -245,6 +240,14 @@ public class ApplicationHandler extends HttpHandler { reindexing = reindexing.withReady(cluster, type, now); return reindexing; }); + + String message = "Reindexing " + + (clusters.isEmpty() ? "" + : (types.isEmpty() ? "" + : "document types " + String.join(", ", types) + " in ") + + "clusters " + String.join(", ", clusters) + " of ") + + "application " + applicationId; + return createMessageResponse(message); } private HttpResponse getReindexingStatus(ApplicationId applicationId) { @@ -488,4 +491,8 @@ public class ApplicationHandler extends HttpHandler { } + private static JSONResponse createMessageResponse(String message) { + return new JSONResponse(Response.Status.OK) { { object.setString("message", message); } }; + } + } diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java index f2558fa50b8..a05a4a5559b 100644 --- a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java +++ b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java @@ -79,7 +79,7 @@ public class ApplicationHandlerTest { private final static TenantName mytenantName = TenantName.from("mytenant"); private final static ApplicationId myTenantApplicationId = ApplicationId.from(mytenantName, ApplicationName.defaultName(), InstanceName.defaultName()); - private final static ApplicationId applicationId = ApplicationId.from(TenantName.defaultName(), ApplicationName.defaultName(), InstanceName.defaultName()); + private final static ApplicationId applicationId = ApplicationId.defaultId(); private final static MockTesterClient testerClient = new MockTesterClient(); private static final MockLogRetriever logRetriever = new MockLogRetriever(); private static final Version vespaVersion = Version.fromString("7.8.9"); @@ -219,32 +219,32 @@ public class ApplicationHandlerTest { database.readReindexingStatus(applicationId).orElseThrow()); clock.advance(Duration.ofSeconds(1)); - reindex(applicationId, ""); + reindex(applicationId, "", "{\"message\":\"Reindexing application default.default\"}"); expected = expected.withReady(clock.instant()); assertEquals(expected, database.readReindexingStatus(applicationId).orElseThrow()); clock.advance(Duration.ofSeconds(1)); expected = expected.withReady(clock.instant()); - reindex(applicationId, "?clusterId="); + reindex(applicationId, "?clusterId=", "{\"message\":\"Reindexing application default.default\"}"); assertEquals(expected, database.readReindexingStatus(applicationId).orElseThrow()); clock.advance(Duration.ofSeconds(1)); expected = expected.withReady(clock.instant()); - reindex(applicationId, "?documentType=moo"); + reindex(applicationId, "?documentType=moo", "{\"message\":\"Reindexing application default.default\"}"); assertEquals(expected, database.readReindexingStatus(applicationId).orElseThrow()); clock.advance(Duration.ofSeconds(1)); - reindex(applicationId, "?clusterId=foo,boo"); + reindex(applicationId, "?clusterId=foo,boo", "{\"message\":\"Reindexing clusters foo, boo of application default.default\"}"); expected = expected.withReady("foo", clock.instant()) .withReady("boo", clock.instant()); assertEquals(expected, database.readReindexingStatus(applicationId).orElseThrow()); clock.advance(Duration.ofSeconds(1)); - reindex(applicationId, "?clusterId=foo,boo&documentType=bar,baz"); + reindex(applicationId, "?clusterId=foo,boo&documentType=bar,baz", "{\"message\":\"Reindexing document types bar, baz in clusters foo, boo of application default.default\"}"); expected = expected.withReady("foo", "bar", clock.instant()) .withReady("foo", "baz", clock.instant()) .withReady("boo", "bar", clock.instant()) @@ -252,12 +252,12 @@ public class ApplicationHandlerTest { assertEquals(expected, database.readReindexingStatus(applicationId).orElseThrow()); - reindexing(applicationId, DELETE); + reindexing(applicationId, DELETE, "{\"message\":\"Reindexing disabled\"}"); expected = expected.enabled(false); assertEquals(expected, database.readReindexingStatus(applicationId).orElseThrow()); - reindexing(applicationId, POST); + reindexing(applicationId, POST, "{\"message\":\"Reindexing enabled\"}"); expected = expected.enabled(true); assertEquals(expected, database.readReindexingStatus(applicationId).orElseThrow()); @@ -607,13 +607,9 @@ public class ApplicationHandlerTest { reindexing(application, method, expectedBody, 200); } - private void reindexing(ApplicationId application, Method method) throws IOException { - reindexing(application, method, null); - } - - private void reindex(ApplicationId application, String query) throws IOException { + private void reindex(ApplicationId application, String query, String message) throws IOException { String reindexUrl = toUrlPath(application, Zone.defaultZone(), true) + "/reindex" + query; - assertHttpStatusCodeAndMessage(createApplicationHandler().handle(createTestRequest(reindexUrl, POST)), 200, ""); + assertHttpStatusCodeAndMessage(createApplicationHandler().handle(createTestRequest(reindexUrl, POST)), 200, message); } private void restart(ApplicationId application, Zone zone) throws IOException { diff --git a/default_build_settings.cmake b/default_build_settings.cmake index 11a2c50aac1..d8bbf21605a 100644 --- a/default_build_settings.cmake +++ b/default_build_settings.cmake @@ -31,7 +31,7 @@ endfunction() function(setup_vespa_default_build_settings_centos_8) message("-- Setting up default build settings for centos 8") set(DEFAULT_EXTRA_INCLUDE_DIRECTORY "${VESPA_DEPS}/include" "/usr/include/openblas" PARENT_SCOPE) - set(DEFAULT_VESPA_LLVM_VERSION "9" PARENT_SCOPE) + set(DEFAULT_VESPA_LLVM_VERSION "10" PARENT_SCOPE) endfunction() function(setup_vespa_default_build_settings_darwin) diff --git a/dist/vespa.spec b/dist/vespa.spec index 6f4c8c65091..b4bbb13e7d6 100644 --- a/dist/vespa.spec +++ b/dist/vespa.spec @@ -64,7 +64,11 @@ BuildRequires: vespa-libzstd-devel >= 1.4.5-2 %endif %if 0%{?el8} BuildRequires: cmake >= 3.11.4-3 +%if 0%{?centos} +BuildRequires: llvm-devel >= 10.0.1 +%else BuildRequires: llvm-devel >= 9.0.1 +%endif BuildRequires: boost-devel >= 1.66 BuildRequires: openssl-devel BuildRequires: vespa-gtest >= 1.8.1-1 @@ -173,8 +177,13 @@ Requires: vespa-zstd >= 1.4.5-2 %define _extra_include_directory /usr/include/llvm7.0;%{_vespa_deps_prefix}/include;/usr/include/openblas %endif %if 0%{?el8} +%if 0%{?centos} +Requires: llvm-libs >= 10.0.1 +%define _vespa_llvm_version 10 +%else Requires: llvm-libs >= 9.0.1 %define _vespa_llvm_version 9 +%endif Requires: openssl-libs Requires: vespa-lz4 >= 1.9.2-2 Requires: vespa-onnxruntime = 1.4.0 diff --git a/fbench/src/fbench/fbench.cpp b/fbench/src/fbench/fbench.cpp index efac34409cc..b2bdc69eca4 100644 --- a/fbench/src/fbench/fbench.cpp +++ b/fbench/src/fbench/fbench.cpp @@ -291,7 +291,7 @@ FBench::Usage() printf(" [-s seconds] [-q queryFilePattern] [-o outputFilePattern]\n"); printf(" [-r restartLimit] [-m maxLineSize] [-k] <hostname> <port>\n\n"); printf(" -H <str> : append extra header to each get request.\n"); - printf(" -A <str> : assign autority. <str> should be hostname:port format. Overrides Host: header sent.\n"); + printf(" -A <str> : assign authority. <str> should be hostname:port format. Overrides Host: header sent.\n"); printf(" -P : use POST for requests instead of GET.\n"); printf(" -a <str> : append string to each query\n"); printf(" -n <num> : run with <num> parallel clients [10]\n"); diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp index 520e5c31cfb..e566f16dcdd 100644 --- a/storage/src/tests/distributor/externaloperationhandlertest.cpp +++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp @@ -610,10 +610,8 @@ TEST_F(OperationHandlerSequencingTest, put_with_bucket_lock_tas_token_is_rejecte auto put = makePutCommand("testdoctype1", "id:foo:testdoctype1:n=1:bar"); put->setCondition(bucket_lock_bypass_tas_condition("foo")); ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected(put)); - // TODO determine most appropriate error code here. Want to fail the bucket but - // not the entire visitor operation. Will likely need to be revisited (heh!) soon. - EXPECT_EQ("ReturnCode(REJECTED, Operation expects a read-for-write bucket lock to be present, " - "but none currently exists)", + EXPECT_EQ("ReturnCode(TEST_AND_SET_CONDITION_FAILED, Operation expects a read-for-write bucket " + "lock to be present, but none currently exists)", _sender.reply(0)->getResult().toString()); } @@ -625,8 +623,8 @@ TEST_F(OperationHandlerSequencingTest, put_with_bucket_lock_tas_token_is_rejecte Operation::SP op; ASSERT_NO_FATAL_FAILURE(start_operation_verify_not_rejected(makeUpdateCommand("testdoctype1", _dummy_id), op)); ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected(std::move(put))); - EXPECT_EQ("ReturnCode(REJECTED, Operation expects a read-for-write bucket lock to be present, " - "but none currently exists)", + EXPECT_EQ("ReturnCode(TEST_AND_SET_CONDITION_FAILED, Operation expects a read-for-write bucket " + "lock to be present, but none currently exists)", _sender.reply(0)->getResult().toString()); } diff --git a/storage/src/tests/visiting/visitortest.cpp b/storage/src/tests/visiting/visitortest.cpp index b362e368b9b..a8c317655ca 100644 --- a/storage/src/tests/visiting/visitortest.cpp +++ b/storage/src/tests/visiting/visitortest.cpp @@ -1,21 +1,22 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/config/common/exceptions.h> #include <vespa/document/fieldvalue/intfieldvalue.h> #include <vespa/document/fieldvalue/stringfieldvalue.h> #include <vespa/document/test/make_bucket_space.h> -#include <vespa/storageapi/message/datagram.h> -#include <vespa/storageapi/message/persistence.h> +#include <vespa/documentapi/messagebus/messages/putdocumentmessage.h> +#include <vespa/documentapi/messagebus/messages/removedocumentmessage.h> +#include <vespa/documentapi/messagebus/messages/visitor.h> +#include <vespa/storage/common/reindexing_constants.h> #include <vespa/storage/persistence/filestorage/filestormanager.h> #include <vespa/storage/visiting/visitormanager.h> +#include <vespa/storageapi/message/datagram.h> +#include <vespa/storageapi/message/persistence.h> #include <tests/common/testhelper.h> #include <tests/common/teststorageapp.h> #include <tests/common/dummystoragelink.h> #include <tests/storageserver/testvisitormessagesession.h> -#include <vespa/documentapi/messagebus/messages/putdocumentmessage.h> -#include <vespa/documentapi/messagebus/messages/removedocumentmessage.h> -#include <vespa/documentapi/messagebus/messages/visitor.h> #include <vespa/vespalib/io/fileutil.h> -#include <vespa/config/common/exceptions.h> #include <vespa/vespalib/gtest/gtest.h> #include <thread> #include <sys/stat.h> @@ -681,8 +682,7 @@ VisitorTest::sendInitialCreateVisitorAndGetIterRound() { GetIterCommand::SP getIterCmd; ASSERT_NO_FATAL_FAILURE(fetchSingleCommand<GetIterCommand>(*_bottom, getIterCmd)); - sendGetIterReply(*getIterCmd, api::ReturnCode(api::ReturnCode::OK), - 1, true); + sendGetIterReply(*getIterCmd, api::ReturnCode(api::ReturnCode::OK), 1, true); } } @@ -729,8 +729,7 @@ TEST_F(VisitorTest, no_visitor_notification_for_transient_failures) { } TEST_F(VisitorTest, notification_sent_if_transient_error_retried_many_times) { - constexpr size_t retries( - Visitor::TRANSIENT_ERROR_RETRIES_BEFORE_NOTIFY); + constexpr size_t retries = Visitor::TRANSIENT_ERROR_RETRIES_BEFORE_NOTIFY; ASSERT_NO_FATAL_FAILURE(initializeTest()); sendInitialCreateVisitorAndGetIterRound(); @@ -765,7 +764,6 @@ VisitorTest::doCompleteVisitingSession( const std::shared_ptr<api::CreateVisitorCommand>& cmd, std::shared_ptr<api::CreateVisitorReply>& reply_out) { - initializeTest(); _top->sendDown(cmd); sendCreateIteratorReply(); @@ -795,6 +793,7 @@ VisitorTest::doCompleteVisitingSession( } TEST_F(VisitorTest, no_mbus_tracing_if_trace_level_is_zero) { + ASSERT_NO_FATAL_FAILURE(initializeTest()); std::shared_ptr<api::CreateVisitorCommand> cmd(makeCreateVisitor()); cmd->getTrace().setLevel(0); std::shared_ptr<api::CreateVisitorReply> reply; @@ -803,6 +802,7 @@ TEST_F(VisitorTest, no_mbus_tracing_if_trace_level_is_zero) { } TEST_F(VisitorTest, reply_contains_trace_if_trace_level_above_zero) { + ASSERT_NO_FATAL_FAILURE(initializeTest()); std::shared_ptr<api::CreateVisitorCommand> cmd(makeCreateVisitor()); cmd->getTrace().setLevel(1); cmd->getTrace().trace(1,"at least one trace."); @@ -887,4 +887,65 @@ TEST_F(VisitorTest, test_visitor_invokes_weak_read_consistency_iteration) { "testvisitor", spi::ReadConsistency::WEAK); } +struct ReindexingVisitorTest : VisitorTest { + void respond_with_docs_from_persistence() { + sendCreateIteratorReply(); + GetIterCommand::SP get_iter_cmd; + // Reply to GetIter with a single doc and bucket completed + ASSERT_NO_FATAL_FAILURE(fetchSingleCommand<GetIterCommand>(*_bottom, get_iter_cmd)); + sendGetIterReply(*get_iter_cmd, api::ReturnCode(api::ReturnCode::OK), 1, true); + } + + void respond_to_client_put(api::ReturnCode::Result result) { + // Reply to the Put from "client" back to the visitor + std::vector<document::Document::SP> docs; + std::vector<document::DocumentId> doc_ids; + std::vector<std::string> info_messages; + getMessagesAndReply(1, getSession(0), docs, doc_ids, info_messages, result); + } + + void complete_visitor() { + DestroyIteratorCommand::SP destroy_iter_cmd; + ASSERT_NO_FATAL_FAILURE(fetchSingleCommand<DestroyIteratorCommand>(*_bottom, destroy_iter_cmd)); + } +}; + +TEST_F(ReindexingVisitorTest, puts_are_sent_with_tas_condition) { + ASSERT_NO_FATAL_FAILURE(initializeTest()); + auto cmd = makeCreateVisitor(VisitorOptions().withVisitorType("reindexingvisitor")); + cmd->getParameters().set(reindexing_bucket_lock_visitor_parameter_key(), "foobar"); + _top->sendDown(cmd); + + ASSERT_NO_FATAL_FAILURE(respond_with_docs_from_persistence()); + auto& session = getSession(0); + session.waitForMessages(1); + + ASSERT_EQ(session.sentMessages.size(), 1u); + auto* put_cmd = dynamic_cast<documentapi::PutDocumentMessage*>(session.sentMessages.front().get()); + ASSERT_TRUE(put_cmd); + auto token_str = vespalib::make_string("%s=foobar", reindexing_bucket_lock_bypass_prefix()); + EXPECT_EQ(put_cmd->getCondition().getSelection(), token_str); + + ASSERT_NO_FATAL_FAILURE(respond_to_client_put(api::ReturnCode::OK)); + ASSERT_NO_FATAL_FAILURE(complete_visitor()); + + ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::OK)); + ASSERT_TRUE(waitUntilNoActiveVisitors()); +} + + +TEST_F(ReindexingVisitorTest, tas_responses_fail_the_visitor_and_are_rewritten_to_aborted) { + ASSERT_NO_FATAL_FAILURE(initializeTest()); + auto cmd = makeCreateVisitor(VisitorOptions().withVisitorType("reindexingvisitor")); + cmd->getParameters().set(reindexing_bucket_lock_visitor_parameter_key(), "foobar"); + _top->sendDown(cmd); + + ASSERT_NO_FATAL_FAILURE(respond_with_docs_from_persistence()); + ASSERT_NO_FATAL_FAILURE(respond_to_client_put(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED)); + ASSERT_NO_FATAL_FAILURE(complete_visitor()); + + ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::ABORTED, -1, -1)); + ASSERT_TRUE(waitUntilNoActiveVisitors()); +} + } // namespace storage diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp index 7c468457430..c7466c5f420 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp @@ -301,12 +301,12 @@ bool ExternalOperationHandler::onPut(const std::shared_ptr<api::PutCommand>& cmd cmd->setCondition(documentapi::TestAndSetCondition()); // Must clear TaS or the backend will reject the op allow = true; } else { - bounce_with_result(*cmd, api::ReturnCode(api::ReturnCode::REJECTED, + bounce_with_result(*cmd, api::ReturnCode(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED, "Expected bucket lock token did not match actual lock token")); return true; } } else { - bounce_with_result(*cmd, api::ReturnCode(api::ReturnCode::REJECTED, + bounce_with_result(*cmd, api::ReturnCode(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED, "Operation expects a read-for-write bucket lock to be present, " "but none currently exists")); return true; diff --git a/storage/src/vespa/storage/visiting/reindexing_visitor.cpp b/storage/src/vespa/storage/visiting/reindexing_visitor.cpp index 2d96876b641..5577bcd00ae 100644 --- a/storage/src/vespa/storage/visiting/reindexing_visitor.cpp +++ b/storage/src/vespa/storage/visiting/reindexing_visitor.cpp @@ -19,7 +19,8 @@ void ReindexingVisitor::handleDocuments(const document::BucketId& /*bucketId*/, HitCounter& hitCounter) { auto lock_token = make_lock_access_token(); - LOG(debug, "Visitor %s handling block of %zu documents.", _id.c_str(), entries.size()); + LOG(debug, "ReindexingVisitor %s handling block of %zu documents. Using access token '%s'", + _id.c_str(), entries.size(), lock_token.c_str()); for (auto& entry : entries) { if (entry->isRemove()) { // We don't reindex removed documents, as that would be very silly. @@ -34,6 +35,15 @@ void ReindexingVisitor::handleDocuments(const document::BucketId& /*bucketId*/, } } +bool ReindexingVisitor::remap_docapi_message_error_code(api::ReturnCode& in_out_code) { + if (in_out_code.getResult() == api::ReturnCode::TEST_AND_SET_CONDITION_FAILED) { + in_out_code = api::ReturnCode(api::ReturnCode::ABORTED, "Got TaS failure from upstream, indicating visitor is " + "outdated. Aborting session to allow client to retry"); + return true; + } + return Visitor::remap_docapi_message_error_code(in_out_code); +} + vespalib::string ReindexingVisitor::make_lock_access_token() const { vespalib::string prefix = reindexing_bucket_lock_bypass_prefix(); vespalib::stringref passed_token = visitor_parameters().get( diff --git a/storage/src/vespa/storage/visiting/reindexing_visitor.h b/storage/src/vespa/storage/visiting/reindexing_visitor.h index 815ef7e67fa..0fdba607134 100644 --- a/storage/src/vespa/storage/visiting/reindexing_visitor.h +++ b/storage/src/vespa/storage/visiting/reindexing_visitor.h @@ -21,6 +21,7 @@ public: private: void handleDocuments(const document::BucketId&, std::vector<spi::DocEntry::UP>&, HitCounter&) override; + bool remap_docapi_message_error_code(api::ReturnCode& in_out_code) override; vespalib::string make_lock_access_token() const; }; diff --git a/storage/src/vespa/storage/visiting/visitor.cpp b/storage/src/vespa/storage/visiting/visitor.cpp index c846e0357b4..e69721e8177 100644 --- a/storage/src/vespa/storage/visiting/visitor.cpp +++ b/storage/src/vespa/storage/visiting/visitor.cpp @@ -607,6 +607,11 @@ Visitor::visitor_parameters() const noexcept { return _initiatingCmd->getParameters(); } +bool +Visitor::remap_docapi_message_error_code(api::ReturnCode& in_out_code) { + return in_out_code.isCriticalForVisitor(); +} + void Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& metrics) { @@ -640,8 +645,7 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met metrics.visitorDestinationFailureReplies.inc(); if (message->getType() == documentapi::DocumentProtocol::MESSAGE_VISITORINFO) { - LOG(debug, "Aborting visitor as we failed to talk to " - "controller: %s", + LOG(debug, "Aborting visitor as we failed to talk to controller: %s", reply->getError(0).toString().c_str()); api::ReturnCode returnCode( static_cast<api::ReturnCode::Result>( @@ -655,7 +659,8 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met api::ReturnCode returnCode( static_cast<api::ReturnCode::Result>(reply->getError(0).getCode()), reply->getError(0).getMessage()); - if (returnCode.isCriticalForVisitor()) { + const bool should_fail = remap_docapi_message_error_code(returnCode); + if (should_fail) { // Abort - something is wrong with target. fail(returnCode, true); close(); diff --git a/storage/src/vespa/storage/visiting/visitor.h b/storage/src/vespa/storage/visiting/visitor.h index 6666308a0b9..d6b4c4b9381 100644 --- a/storage/src/vespa/storage/visiting/visitor.h +++ b/storage/src/vespa/storage/visiting/visitor.h @@ -350,6 +350,12 @@ protected: bool addBoundedTrace(uint32_t level, const vespalib::string& message); const vdslib::Parameters& visitor_parameters() const noexcept; + + // Possibly modifies the ReturnCode parameter in-place if its return code should + // be changed based on visitor subclass-specific behavior. + // Returns true if the visitor itself should be failed (aborted) with the + // error code, false if the DocumentAPI message should be retried later. + [[nodiscard]] virtual bool remap_docapi_message_error_code(api::ReturnCode& in_out_code); public: Visitor(StorageComponent& component); virtual ~Visitor(); diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/JobMetrics.java b/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/JobMetrics.java index a43e2156025..483057a828d 100644 --- a/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/JobMetrics.java +++ b/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/JobMetrics.java @@ -22,7 +22,7 @@ public class JobMetrics { /** Record a run for given job */ public void recordRunOf(String job) { - incompleteRuns.compute(job, (ignored, run) -> run == null ? 1 : ++run); + incompleteRuns.merge(job, 1L, Long::sum); } /** Record successful run of given job */ diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/ZooKeeperRunner.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/ZooKeeperRunner.java index 5eeb31c31bb..71ac4bbf62f 100644 --- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/ZooKeeperRunner.java +++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/ZooKeeperRunner.java @@ -26,6 +26,7 @@ import static com.yahoo.vespa.zookeeper.Configurator.zookeeperServerHostnames; public class ZooKeeperRunner implements Runnable { private static final Logger log = java.util.logging.Logger.getLogger(ZooKeeperRunner.class.getName()); + private static final Duration shutdownTimeout = Duration.ofSeconds(10); private final ExecutorService executorService; private final ZookeeperServerConfig zookeeperServerConfig; @@ -40,10 +41,12 @@ public class ZooKeeperRunner implements Runnable { } void shutdown() { + log.log(Level.INFO, "Triggering shutdown"); executorService.shutdownNow(); + log.log(Level.INFO, "Shutdown triggered"); try { - if (!executorService.awaitTermination(10000, TimeUnit.MILLISECONDS)) { - log.log(Level.WARNING, "Failed to shut down within timeout"); + if (!executorService.awaitTermination(shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS)) { + log.log(Level.WARNING, "Failed to shut down within " + shutdownTimeout); } } catch (InterruptedException e) { log.log(Level.INFO, "Interrupted waiting for executor to complete", e); @@ -71,7 +74,7 @@ public class ZooKeeperRunner implements Runnable { log.log(Level.INFO, "Failed interrupting task", e); } } - } while (Instant.now().isBefore(end)); + } while (Instant.now().isBefore(end) && !executorService.isShutdown()); } private void startServer(Path path) { |