summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java33
-rw-r--r--configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java24
-rw-r--r--default_build_settings.cmake2
-rw-r--r--dist/vespa.spec9
-rw-r--r--fbench/src/fbench/fbench.cpp2
-rw-r--r--storage/src/tests/distributor/externaloperationhandlertest.cpp10
-rw-r--r--storage/src/tests/visiting/visitortest.cpp83
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.cpp4
-rw-r--r--storage/src/vespa/storage/visiting/reindexing_visitor.cpp12
-rw-r--r--storage/src/vespa/storage/visiting/reindexing_visitor.h1
-rw-r--r--storage/src/vespa/storage/visiting/visitor.cpp11
-rw-r--r--storage/src/vespa/storage/visiting/visitor.h6
-rw-r--r--vespajlib/src/main/java/com/yahoo/concurrent/maintenance/JobMetrics.java2
-rw-r--r--zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/ZooKeeperRunner.java9
14 files changed, 152 insertions, 56 deletions
diff --git a/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java b/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java
index 900061949f4..0b461b10963 100644
--- a/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java
+++ b/configserver/src/main/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandler.java
@@ -16,6 +16,7 @@ import com.yahoo.jdisc.Response;
import com.yahoo.jdisc.application.BindingMatch;
import com.yahoo.jdisc.application.UriPattern;
import com.yahoo.slime.Cursor;
+import com.yahoo.text.StringUtilities;
import com.yahoo.vespa.config.server.ApplicationRepository;
import com.yahoo.vespa.config.server.application.ApplicationReindexing;
import com.yahoo.vespa.config.server.application.ClusterReindexing;
@@ -35,6 +36,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.stream.Stream;
import static java.util.Map.Entry.comparingByKey;
@@ -86,7 +88,7 @@ public class ApplicationHandler extends HttpHandler {
if (isReindexingRequest(request)) {
applicationRepository.modifyReindexing(applicationId, reindexing -> reindexing.enabled(false));
- return new JSONResponse(Response.Status.OK);
+ return createMessageResponse("Reindexing disabled");
}
if (applicationRepository.delete(applicationId))
@@ -211,27 +213,20 @@ public class ApplicationHandler extends HttpHandler {
}
if (isReindexRequest(request)) {
- triggerReindexing(request, applicationId);
- return new JSONResponse(Response.Status.OK);
+ return triggerReindexing(request, applicationId);
}
if (isReindexingRequest(request)) {
applicationRepository.modifyReindexing(applicationId, reindexing -> reindexing.enabled(true));
- return new JSONResponse(Response.Status.OK);
+ return createMessageResponse("Reindexing enabled");
}
throw new NotFoundException("Illegal POST request '" + request.getUri() + "'");
}
- private void triggerReindexing(HttpRequest request, ApplicationId applicationId) {
- List<String> clusters = Optional.ofNullable(request.getProperty("clusterId")).stream()
- .flatMap(value -> Stream.of(value.split(",")))
- .filter(cluster -> ! cluster.isBlank())
- .collect(toList());
- List<String> types = Optional.ofNullable(request.getProperty("documentType")).stream()
- .flatMap(value -> Stream.of(value.split(",")))
- .filter(type -> ! type.isBlank())
- .collect(toList());
+ private HttpResponse triggerReindexing(HttpRequest request, ApplicationId applicationId) {
+ Set<String> clusters = StringUtilities.split(request.getProperty("clusterId"));
+ Set<String> types = StringUtilities.split(request.getProperty("documentType"));
Instant now = applicationRepository.clock().instant();
applicationRepository.modifyReindexing(applicationId, reindexing -> {
if (clusters.isEmpty())
@@ -245,6 +240,14 @@ public class ApplicationHandler extends HttpHandler {
reindexing = reindexing.withReady(cluster, type, now);
return reindexing;
});
+
+ String message = "Reindexing " +
+ (clusters.isEmpty() ? ""
+ : (types.isEmpty() ? ""
+ : "document types " + String.join(", ", types) + " in ") +
+ "clusters " + String.join(", ", clusters) + " of ") +
+ "application " + applicationId;
+ return createMessageResponse(message);
}
private HttpResponse getReindexingStatus(ApplicationId applicationId) {
@@ -488,4 +491,8 @@ public class ApplicationHandler extends HttpHandler {
}
+ private static JSONResponse createMessageResponse(String message) {
+ return new JSONResponse(Response.Status.OK) { { object.setString("message", message); } };
+ }
+
}
diff --git a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java
index f2558fa50b8..a05a4a5559b 100644
--- a/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java
+++ b/configserver/src/test/java/com/yahoo/vespa/config/server/http/v2/ApplicationHandlerTest.java
@@ -79,7 +79,7 @@ public class ApplicationHandlerTest {
private final static TenantName mytenantName = TenantName.from("mytenant");
private final static ApplicationId myTenantApplicationId = ApplicationId.from(mytenantName, ApplicationName.defaultName(), InstanceName.defaultName());
- private final static ApplicationId applicationId = ApplicationId.from(TenantName.defaultName(), ApplicationName.defaultName(), InstanceName.defaultName());
+ private final static ApplicationId applicationId = ApplicationId.defaultId();
private final static MockTesterClient testerClient = new MockTesterClient();
private static final MockLogRetriever logRetriever = new MockLogRetriever();
private static final Version vespaVersion = Version.fromString("7.8.9");
@@ -219,32 +219,32 @@ public class ApplicationHandlerTest {
database.readReindexingStatus(applicationId).orElseThrow());
clock.advance(Duration.ofSeconds(1));
- reindex(applicationId, "");
+ reindex(applicationId, "", "{\"message\":\"Reindexing application default.default\"}");
expected = expected.withReady(clock.instant());
assertEquals(expected,
database.readReindexingStatus(applicationId).orElseThrow());
clock.advance(Duration.ofSeconds(1));
expected = expected.withReady(clock.instant());
- reindex(applicationId, "?clusterId=");
+ reindex(applicationId, "?clusterId=", "{\"message\":\"Reindexing application default.default\"}");
assertEquals(expected,
database.readReindexingStatus(applicationId).orElseThrow());
clock.advance(Duration.ofSeconds(1));
expected = expected.withReady(clock.instant());
- reindex(applicationId, "?documentType=moo");
+ reindex(applicationId, "?documentType=moo", "{\"message\":\"Reindexing application default.default\"}");
assertEquals(expected,
database.readReindexingStatus(applicationId).orElseThrow());
clock.advance(Duration.ofSeconds(1));
- reindex(applicationId, "?clusterId=foo,boo");
+ reindex(applicationId, "?clusterId=foo,boo", "{\"message\":\"Reindexing clusters foo, boo of application default.default\"}");
expected = expected.withReady("foo", clock.instant())
.withReady("boo", clock.instant());
assertEquals(expected,
database.readReindexingStatus(applicationId).orElseThrow());
clock.advance(Duration.ofSeconds(1));
- reindex(applicationId, "?clusterId=foo,boo&documentType=bar,baz");
+ reindex(applicationId, "?clusterId=foo,boo&documentType=bar,baz", "{\"message\":\"Reindexing document types bar, baz in clusters foo, boo of application default.default\"}");
expected = expected.withReady("foo", "bar", clock.instant())
.withReady("foo", "baz", clock.instant())
.withReady("boo", "bar", clock.instant())
@@ -252,12 +252,12 @@ public class ApplicationHandlerTest {
assertEquals(expected,
database.readReindexingStatus(applicationId).orElseThrow());
- reindexing(applicationId, DELETE);
+ reindexing(applicationId, DELETE, "{\"message\":\"Reindexing disabled\"}");
expected = expected.enabled(false);
assertEquals(expected,
database.readReindexingStatus(applicationId).orElseThrow());
- reindexing(applicationId, POST);
+ reindexing(applicationId, POST, "{\"message\":\"Reindexing enabled\"}");
expected = expected.enabled(true);
assertEquals(expected,
database.readReindexingStatus(applicationId).orElseThrow());
@@ -607,13 +607,9 @@ public class ApplicationHandlerTest {
reindexing(application, method, expectedBody, 200);
}
- private void reindexing(ApplicationId application, Method method) throws IOException {
- reindexing(application, method, null);
- }
-
- private void reindex(ApplicationId application, String query) throws IOException {
+ private void reindex(ApplicationId application, String query, String message) throws IOException {
String reindexUrl = toUrlPath(application, Zone.defaultZone(), true) + "/reindex" + query;
- assertHttpStatusCodeAndMessage(createApplicationHandler().handle(createTestRequest(reindexUrl, POST)), 200, "");
+ assertHttpStatusCodeAndMessage(createApplicationHandler().handle(createTestRequest(reindexUrl, POST)), 200, message);
}
private void restart(ApplicationId application, Zone zone) throws IOException {
diff --git a/default_build_settings.cmake b/default_build_settings.cmake
index 11a2c50aac1..d8bbf21605a 100644
--- a/default_build_settings.cmake
+++ b/default_build_settings.cmake
@@ -31,7 +31,7 @@ endfunction()
function(setup_vespa_default_build_settings_centos_8)
message("-- Setting up default build settings for centos 8")
set(DEFAULT_EXTRA_INCLUDE_DIRECTORY "${VESPA_DEPS}/include" "/usr/include/openblas" PARENT_SCOPE)
- set(DEFAULT_VESPA_LLVM_VERSION "9" PARENT_SCOPE)
+ set(DEFAULT_VESPA_LLVM_VERSION "10" PARENT_SCOPE)
endfunction()
function(setup_vespa_default_build_settings_darwin)
diff --git a/dist/vespa.spec b/dist/vespa.spec
index 6f4c8c65091..b4bbb13e7d6 100644
--- a/dist/vespa.spec
+++ b/dist/vespa.spec
@@ -64,7 +64,11 @@ BuildRequires: vespa-libzstd-devel >= 1.4.5-2
%endif
%if 0%{?el8}
BuildRequires: cmake >= 3.11.4-3
+%if 0%{?centos}
+BuildRequires: llvm-devel >= 10.0.1
+%else
BuildRequires: llvm-devel >= 9.0.1
+%endif
BuildRequires: boost-devel >= 1.66
BuildRequires: openssl-devel
BuildRequires: vespa-gtest >= 1.8.1-1
@@ -173,8 +177,13 @@ Requires: vespa-zstd >= 1.4.5-2
%define _extra_include_directory /usr/include/llvm7.0;%{_vespa_deps_prefix}/include;/usr/include/openblas
%endif
%if 0%{?el8}
+%if 0%{?centos}
+Requires: llvm-libs >= 10.0.1
+%define _vespa_llvm_version 10
+%else
Requires: llvm-libs >= 9.0.1
%define _vespa_llvm_version 9
+%endif
Requires: openssl-libs
Requires: vespa-lz4 >= 1.9.2-2
Requires: vespa-onnxruntime = 1.4.0
diff --git a/fbench/src/fbench/fbench.cpp b/fbench/src/fbench/fbench.cpp
index efac34409cc..b2bdc69eca4 100644
--- a/fbench/src/fbench/fbench.cpp
+++ b/fbench/src/fbench/fbench.cpp
@@ -291,7 +291,7 @@ FBench::Usage()
printf(" [-s seconds] [-q queryFilePattern] [-o outputFilePattern]\n");
printf(" [-r restartLimit] [-m maxLineSize] [-k] <hostname> <port>\n\n");
printf(" -H <str> : append extra header to each get request.\n");
- printf(" -A <str> : assign autority. <str> should be hostname:port format. Overrides Host: header sent.\n");
+ printf(" -A <str> : assign authority. <str> should be hostname:port format. Overrides Host: header sent.\n");
printf(" -P : use POST for requests instead of GET.\n");
printf(" -a <str> : append string to each query\n");
printf(" -n <num> : run with <num> parallel clients [10]\n");
diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp
index 520e5c31cfb..e566f16dcdd 100644
--- a/storage/src/tests/distributor/externaloperationhandlertest.cpp
+++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp
@@ -610,10 +610,8 @@ TEST_F(OperationHandlerSequencingTest, put_with_bucket_lock_tas_token_is_rejecte
auto put = makePutCommand("testdoctype1", "id:foo:testdoctype1:n=1:bar");
put->setCondition(bucket_lock_bypass_tas_condition("foo"));
ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected(put));
- // TODO determine most appropriate error code here. Want to fail the bucket but
- // not the entire visitor operation. Will likely need to be revisited (heh!) soon.
- EXPECT_EQ("ReturnCode(REJECTED, Operation expects a read-for-write bucket lock to be present, "
- "but none currently exists)",
+ EXPECT_EQ("ReturnCode(TEST_AND_SET_CONDITION_FAILED, Operation expects a read-for-write bucket "
+ "lock to be present, but none currently exists)",
_sender.reply(0)->getResult().toString());
}
@@ -625,8 +623,8 @@ TEST_F(OperationHandlerSequencingTest, put_with_bucket_lock_tas_token_is_rejecte
Operation::SP op;
ASSERT_NO_FATAL_FAILURE(start_operation_verify_not_rejected(makeUpdateCommand("testdoctype1", _dummy_id), op));
ASSERT_NO_FATAL_FAILURE(start_operation_verify_rejected(std::move(put)));
- EXPECT_EQ("ReturnCode(REJECTED, Operation expects a read-for-write bucket lock to be present, "
- "but none currently exists)",
+ EXPECT_EQ("ReturnCode(TEST_AND_SET_CONDITION_FAILED, Operation expects a read-for-write bucket "
+ "lock to be present, but none currently exists)",
_sender.reply(0)->getResult().toString());
}
diff --git a/storage/src/tests/visiting/visitortest.cpp b/storage/src/tests/visiting/visitortest.cpp
index b362e368b9b..a8c317655ca 100644
--- a/storage/src/tests/visiting/visitortest.cpp
+++ b/storage/src/tests/visiting/visitortest.cpp
@@ -1,21 +1,22 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/config/common/exceptions.h>
#include <vespa/document/fieldvalue/intfieldvalue.h>
#include <vespa/document/fieldvalue/stringfieldvalue.h>
#include <vespa/document/test/make_bucket_space.h>
-#include <vespa/storageapi/message/datagram.h>
-#include <vespa/storageapi/message/persistence.h>
+#include <vespa/documentapi/messagebus/messages/putdocumentmessage.h>
+#include <vespa/documentapi/messagebus/messages/removedocumentmessage.h>
+#include <vespa/documentapi/messagebus/messages/visitor.h>
+#include <vespa/storage/common/reindexing_constants.h>
#include <vespa/storage/persistence/filestorage/filestormanager.h>
#include <vespa/storage/visiting/visitormanager.h>
+#include <vespa/storageapi/message/datagram.h>
+#include <vespa/storageapi/message/persistence.h>
#include <tests/common/testhelper.h>
#include <tests/common/teststorageapp.h>
#include <tests/common/dummystoragelink.h>
#include <tests/storageserver/testvisitormessagesession.h>
-#include <vespa/documentapi/messagebus/messages/putdocumentmessage.h>
-#include <vespa/documentapi/messagebus/messages/removedocumentmessage.h>
-#include <vespa/documentapi/messagebus/messages/visitor.h>
#include <vespa/vespalib/io/fileutil.h>
-#include <vespa/config/common/exceptions.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <thread>
#include <sys/stat.h>
@@ -681,8 +682,7 @@ VisitorTest::sendInitialCreateVisitorAndGetIterRound()
{
GetIterCommand::SP getIterCmd;
ASSERT_NO_FATAL_FAILURE(fetchSingleCommand<GetIterCommand>(*_bottom, getIterCmd));
- sendGetIterReply(*getIterCmd, api::ReturnCode(api::ReturnCode::OK),
- 1, true);
+ sendGetIterReply(*getIterCmd, api::ReturnCode(api::ReturnCode::OK), 1, true);
}
}
@@ -729,8 +729,7 @@ TEST_F(VisitorTest, no_visitor_notification_for_transient_failures) {
}
TEST_F(VisitorTest, notification_sent_if_transient_error_retried_many_times) {
- constexpr size_t retries(
- Visitor::TRANSIENT_ERROR_RETRIES_BEFORE_NOTIFY);
+ constexpr size_t retries = Visitor::TRANSIENT_ERROR_RETRIES_BEFORE_NOTIFY;
ASSERT_NO_FATAL_FAILURE(initializeTest());
sendInitialCreateVisitorAndGetIterRound();
@@ -765,7 +764,6 @@ VisitorTest::doCompleteVisitingSession(
const std::shared_ptr<api::CreateVisitorCommand>& cmd,
std::shared_ptr<api::CreateVisitorReply>& reply_out)
{
- initializeTest();
_top->sendDown(cmd);
sendCreateIteratorReply();
@@ -795,6 +793,7 @@ VisitorTest::doCompleteVisitingSession(
}
TEST_F(VisitorTest, no_mbus_tracing_if_trace_level_is_zero) {
+ ASSERT_NO_FATAL_FAILURE(initializeTest());
std::shared_ptr<api::CreateVisitorCommand> cmd(makeCreateVisitor());
cmd->getTrace().setLevel(0);
std::shared_ptr<api::CreateVisitorReply> reply;
@@ -803,6 +802,7 @@ TEST_F(VisitorTest, no_mbus_tracing_if_trace_level_is_zero) {
}
TEST_F(VisitorTest, reply_contains_trace_if_trace_level_above_zero) {
+ ASSERT_NO_FATAL_FAILURE(initializeTest());
std::shared_ptr<api::CreateVisitorCommand> cmd(makeCreateVisitor());
cmd->getTrace().setLevel(1);
cmd->getTrace().trace(1,"at least one trace.");
@@ -887,4 +887,65 @@ TEST_F(VisitorTest, test_visitor_invokes_weak_read_consistency_iteration) {
"testvisitor", spi::ReadConsistency::WEAK);
}
+struct ReindexingVisitorTest : VisitorTest {
+ void respond_with_docs_from_persistence() {
+ sendCreateIteratorReply();
+ GetIterCommand::SP get_iter_cmd;
+ // Reply to GetIter with a single doc and bucket completed
+ ASSERT_NO_FATAL_FAILURE(fetchSingleCommand<GetIterCommand>(*_bottom, get_iter_cmd));
+ sendGetIterReply(*get_iter_cmd, api::ReturnCode(api::ReturnCode::OK), 1, true);
+ }
+
+ void respond_to_client_put(api::ReturnCode::Result result) {
+ // Reply to the Put from "client" back to the visitor
+ std::vector<document::Document::SP> docs;
+ std::vector<document::DocumentId> doc_ids;
+ std::vector<std::string> info_messages;
+ getMessagesAndReply(1, getSession(0), docs, doc_ids, info_messages, result);
+ }
+
+ void complete_visitor() {
+ DestroyIteratorCommand::SP destroy_iter_cmd;
+ ASSERT_NO_FATAL_FAILURE(fetchSingleCommand<DestroyIteratorCommand>(*_bottom, destroy_iter_cmd));
+ }
+};
+
+TEST_F(ReindexingVisitorTest, puts_are_sent_with_tas_condition) {
+ ASSERT_NO_FATAL_FAILURE(initializeTest());
+ auto cmd = makeCreateVisitor(VisitorOptions().withVisitorType("reindexingvisitor"));
+ cmd->getParameters().set(reindexing_bucket_lock_visitor_parameter_key(), "foobar");
+ _top->sendDown(cmd);
+
+ ASSERT_NO_FATAL_FAILURE(respond_with_docs_from_persistence());
+ auto& session = getSession(0);
+ session.waitForMessages(1);
+
+ ASSERT_EQ(session.sentMessages.size(), 1u);
+ auto* put_cmd = dynamic_cast<documentapi::PutDocumentMessage*>(session.sentMessages.front().get());
+ ASSERT_TRUE(put_cmd);
+ auto token_str = vespalib::make_string("%s=foobar", reindexing_bucket_lock_bypass_prefix());
+ EXPECT_EQ(put_cmd->getCondition().getSelection(), token_str);
+
+ ASSERT_NO_FATAL_FAILURE(respond_to_client_put(api::ReturnCode::OK));
+ ASSERT_NO_FATAL_FAILURE(complete_visitor());
+
+ ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::OK));
+ ASSERT_TRUE(waitUntilNoActiveVisitors());
+}
+
+
+TEST_F(ReindexingVisitorTest, tas_responses_fail_the_visitor_and_are_rewritten_to_aborted) {
+ ASSERT_NO_FATAL_FAILURE(initializeTest());
+ auto cmd = makeCreateVisitor(VisitorOptions().withVisitorType("reindexingvisitor"));
+ cmd->getParameters().set(reindexing_bucket_lock_visitor_parameter_key(), "foobar");
+ _top->sendDown(cmd);
+
+ ASSERT_NO_FATAL_FAILURE(respond_with_docs_from_persistence());
+ ASSERT_NO_FATAL_FAILURE(respond_to_client_put(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED));
+ ASSERT_NO_FATAL_FAILURE(complete_visitor());
+
+ ASSERT_NO_FATAL_FAILURE(verifyCreateVisitorReply(api::ReturnCode::ABORTED, -1, -1));
+ ASSERT_TRUE(waitUntilNoActiveVisitors());
+}
+
} // namespace storage
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index 7c468457430..c7466c5f420 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -301,12 +301,12 @@ bool ExternalOperationHandler::onPut(const std::shared_ptr<api::PutCommand>& cmd
cmd->setCondition(documentapi::TestAndSetCondition()); // Must clear TaS or the backend will reject the op
allow = true;
} else {
- bounce_with_result(*cmd, api::ReturnCode(api::ReturnCode::REJECTED,
+ bounce_with_result(*cmd, api::ReturnCode(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED,
"Expected bucket lock token did not match actual lock token"));
return true;
}
} else {
- bounce_with_result(*cmd, api::ReturnCode(api::ReturnCode::REJECTED,
+ bounce_with_result(*cmd, api::ReturnCode(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED,
"Operation expects a read-for-write bucket lock to be present, "
"but none currently exists"));
return true;
diff --git a/storage/src/vespa/storage/visiting/reindexing_visitor.cpp b/storage/src/vespa/storage/visiting/reindexing_visitor.cpp
index 2d96876b641..5577bcd00ae 100644
--- a/storage/src/vespa/storage/visiting/reindexing_visitor.cpp
+++ b/storage/src/vespa/storage/visiting/reindexing_visitor.cpp
@@ -19,7 +19,8 @@ void ReindexingVisitor::handleDocuments(const document::BucketId& /*bucketId*/,
HitCounter& hitCounter)
{
auto lock_token = make_lock_access_token();
- LOG(debug, "Visitor %s handling block of %zu documents.", _id.c_str(), entries.size());
+ LOG(debug, "ReindexingVisitor %s handling block of %zu documents. Using access token '%s'",
+ _id.c_str(), entries.size(), lock_token.c_str());
for (auto& entry : entries) {
if (entry->isRemove()) {
// We don't reindex removed documents, as that would be very silly.
@@ -34,6 +35,15 @@ void ReindexingVisitor::handleDocuments(const document::BucketId& /*bucketId*/,
}
}
+bool ReindexingVisitor::remap_docapi_message_error_code(api::ReturnCode& in_out_code) {
+ if (in_out_code.getResult() == api::ReturnCode::TEST_AND_SET_CONDITION_FAILED) {
+ in_out_code = api::ReturnCode(api::ReturnCode::ABORTED, "Got TaS failure from upstream, indicating visitor is "
+ "outdated. Aborting session to allow client to retry");
+ return true;
+ }
+ return Visitor::remap_docapi_message_error_code(in_out_code);
+}
+
vespalib::string ReindexingVisitor::make_lock_access_token() const {
vespalib::string prefix = reindexing_bucket_lock_bypass_prefix();
vespalib::stringref passed_token = visitor_parameters().get(
diff --git a/storage/src/vespa/storage/visiting/reindexing_visitor.h b/storage/src/vespa/storage/visiting/reindexing_visitor.h
index 815ef7e67fa..0fdba607134 100644
--- a/storage/src/vespa/storage/visiting/reindexing_visitor.h
+++ b/storage/src/vespa/storage/visiting/reindexing_visitor.h
@@ -21,6 +21,7 @@ public:
private:
void handleDocuments(const document::BucketId&, std::vector<spi::DocEntry::UP>&, HitCounter&) override;
+ bool remap_docapi_message_error_code(api::ReturnCode& in_out_code) override;
vespalib::string make_lock_access_token() const;
};
diff --git a/storage/src/vespa/storage/visiting/visitor.cpp b/storage/src/vespa/storage/visiting/visitor.cpp
index c846e0357b4..e69721e8177 100644
--- a/storage/src/vespa/storage/visiting/visitor.cpp
+++ b/storage/src/vespa/storage/visiting/visitor.cpp
@@ -607,6 +607,11 @@ Visitor::visitor_parameters() const noexcept {
return _initiatingCmd->getParameters();
}
+bool
+Visitor::remap_docapi_message_error_code(api::ReturnCode& in_out_code) {
+ return in_out_code.isCriticalForVisitor();
+}
+
void
Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& metrics)
{
@@ -640,8 +645,7 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met
metrics.visitorDestinationFailureReplies.inc();
if (message->getType() == documentapi::DocumentProtocol::MESSAGE_VISITORINFO) {
- LOG(debug, "Aborting visitor as we failed to talk to "
- "controller: %s",
+ LOG(debug, "Aborting visitor as we failed to talk to controller: %s",
reply->getError(0).toString().c_str());
api::ReturnCode returnCode(
static_cast<api::ReturnCode::Result>(
@@ -655,7 +659,8 @@ Visitor::handleDocumentApiReply(mbus::Reply::UP reply, VisitorThreadMetrics& met
api::ReturnCode returnCode(
static_cast<api::ReturnCode::Result>(reply->getError(0).getCode()),
reply->getError(0).getMessage());
- if (returnCode.isCriticalForVisitor()) {
+ const bool should_fail = remap_docapi_message_error_code(returnCode);
+ if (should_fail) {
// Abort - something is wrong with target.
fail(returnCode, true);
close();
diff --git a/storage/src/vespa/storage/visiting/visitor.h b/storage/src/vespa/storage/visiting/visitor.h
index 6666308a0b9..d6b4c4b9381 100644
--- a/storage/src/vespa/storage/visiting/visitor.h
+++ b/storage/src/vespa/storage/visiting/visitor.h
@@ -350,6 +350,12 @@ protected:
bool addBoundedTrace(uint32_t level, const vespalib::string& message);
const vdslib::Parameters& visitor_parameters() const noexcept;
+
+ // Possibly modifies the ReturnCode parameter in-place if its return code should
+ // be changed based on visitor subclass-specific behavior.
+ // Returns true if the visitor itself should be failed (aborted) with the
+ // error code, false if the DocumentAPI message should be retried later.
+ [[nodiscard]] virtual bool remap_docapi_message_error_code(api::ReturnCode& in_out_code);
public:
Visitor(StorageComponent& component);
virtual ~Visitor();
diff --git a/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/JobMetrics.java b/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/JobMetrics.java
index a43e2156025..483057a828d 100644
--- a/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/JobMetrics.java
+++ b/vespajlib/src/main/java/com/yahoo/concurrent/maintenance/JobMetrics.java
@@ -22,7 +22,7 @@ public class JobMetrics {
/** Record a run for given job */
public void recordRunOf(String job) {
- incompleteRuns.compute(job, (ignored, run) -> run == null ? 1 : ++run);
+ incompleteRuns.merge(job, 1L, Long::sum);
}
/** Record successful run of given job */
diff --git a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/ZooKeeperRunner.java b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/ZooKeeperRunner.java
index 5eeb31c31bb..71ac4bbf62f 100644
--- a/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/ZooKeeperRunner.java
+++ b/zookeeper-server/zookeeper-server-common/src/main/java/com/yahoo/vespa/zookeeper/ZooKeeperRunner.java
@@ -26,6 +26,7 @@ import static com.yahoo.vespa.zookeeper.Configurator.zookeeperServerHostnames;
public class ZooKeeperRunner implements Runnable {
private static final Logger log = java.util.logging.Logger.getLogger(ZooKeeperRunner.class.getName());
+ private static final Duration shutdownTimeout = Duration.ofSeconds(10);
private final ExecutorService executorService;
private final ZookeeperServerConfig zookeeperServerConfig;
@@ -40,10 +41,12 @@ public class ZooKeeperRunner implements Runnable {
}
void shutdown() {
+ log.log(Level.INFO, "Triggering shutdown");
executorService.shutdownNow();
+ log.log(Level.INFO, "Shutdown triggered");
try {
- if (!executorService.awaitTermination(10000, TimeUnit.MILLISECONDS)) {
- log.log(Level.WARNING, "Failed to shut down within timeout");
+ if (!executorService.awaitTermination(shutdownTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
+ log.log(Level.WARNING, "Failed to shut down within " + shutdownTimeout);
}
} catch (InterruptedException e) {
log.log(Level.INFO, "Interrupted waiting for executor to complete", e);
@@ -71,7 +74,7 @@ public class ZooKeeperRunner implements Runnable {
log.log(Level.INFO, "Failed interrupting task", e);
}
}
- } while (Instant.now().isBefore(end));
+ } while (Instant.now().isBefore(end) && !executorService.isShutdown());
}
private void startServer(Path path) {