diff options
4 files changed, 30 insertions, 25 deletions
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp index 619c3151295..8f728c39bb8 100644 --- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp +++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp @@ -1048,28 +1048,28 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_consistent_get_reply_timestamps_do } TEST_F(TwoPhaseUpdateOperationTest, fast_path_not_restarted_if_replica_set_altered_between_get_send_and_receive) { - setupDistributor(3, 3, "storage:3 distributor:1"); - getConfig().set_update_fast_path_restart_enabled(true); - - std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas. - DistributorMessageSenderStub sender; - cb->start(sender, framework::MilliSecTime(0)); - - // Replica set changes between time of Get requests sent and - // responses received. This may happen e.g. if concurrent mutations - // to the same bucket create a new replica. If this happens, we - // must not send the Update operations verbatim, as they will - // be started with the _current_ replica set, not the one that - // was present during the Get request. - BucketId bucket(0x400000000000cac4); // Always the same in the test. - addNodesToBucketDB(bucket, "0=1/2/3,1=2/3/4,2=3/3/3"); - - Timestamp old_timestamp = 500; - ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true)); - replyToGet(*cb, sender, 0, old_timestamp); - replyToGet(*cb, sender, 1, old_timestamp); - - ASSERT_EQ("Put => 1,Put => 2,Put => 0", sender.getCommands(true, false, 2)); + setupDistributor(3, 3, "storage:3 distributor:1"); + getConfig().set_update_fast_path_restart_enabled(true); + + std::shared_ptr<TwoPhaseUpdateOperation> cb(sendUpdate("0=1/2/3,1=2/3/4")); // Inconsistent replicas. + DistributorMessageSenderStub sender; + cb->start(sender, framework::MilliSecTime(0)); + + // Replica set changes between time of Get requests sent and + // responses received. This may happen e.g. if concurrent mutations + // to the same bucket create a new replica. If this happens, we + // must not send the Update operations verbatim, as they will + // be started with the _current_ replica set, not the one that + // was present during the Get request. + BucketId bucket(0x400000000000cac4); // Always the same in the test. + addNodesToBucketDB(bucket, "0=1/2/3,1=2/3/4,2=3/3/3"); + + Timestamp old_timestamp = 500; + ASSERT_EQ("Get => 0,Get => 1", sender.getCommands(true)); + replyToGet(*cb, sender, 0, old_timestamp); + replyToGet(*cb, sender, 1, old_timestamp); + + ASSERT_EQ("Put => 1,Put => 2,Put => 0", sender.getCommands(true, false, 2)); } // XXX currently differs in behavior from content nodes in that updates for diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def index f292b60a70e..e4a002ae81f 100644 --- a/storage/src/vespa/storage/config/stor-distributormanager.def +++ b/storage/src/vespa/storage/config/stor-distributormanager.def @@ -213,4 +213,4 @@ use_btree_database bool default=false restart ## content nodes, the two-phase update path reverts back to the regular fast path. ## Since all replicas of the document were in sync, applying the update in-place ## shall be considered safe. -restart_with_fast_update_path_if_all_get_timestamps_are_consistent bool default=true +restart_with_fast_update_path_if_all_get_timestamps_are_consistent bool default=false diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp index a209c54cc47..47cacafee80 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp @@ -35,6 +35,7 @@ TwoPhaseUpdateOperation::TwoPhaseUpdateOperation( _bucketSpace(bucketSpace), _sendState(SendState::NONE_SENT), _mode(Mode::FAST_PATH), + _fast_path_repair_source_node(0xffff), _replySent(false) { document::BucketIdFactory idFactory; @@ -52,7 +53,7 @@ struct IntermediateMessageSender : DistributorMessageSender { std::shared_ptr<api::StorageReply> _reply; IntermediateMessageSender(SentMessageMap& mm, std::shared_ptr<Operation> cb, DistributorMessageSender & fwd); - ~IntermediateMessageSender(); + ~IntermediateMessageSender() override; void sendCommand(const std::shared_ptr<api::StorageCommand>& cmd) override { msgMap.insert(cmd->getMsgId(), callback); @@ -312,11 +313,12 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorMessageSender& sender, LOG(debug, "Update(%s) fast path: was inconsistent!", _updateCmd->getDocumentId().toString().c_str()); _updateReply = intermediate._reply; + _fast_path_repair_source_node = bestNode.second; document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), bestNode.first); auto cmd = std::make_shared<api::GetCommand>(bucket, _updateCmd->getDocumentId(), "[all]"); copyMessageSettings(*_updateCmd, *cmd); - sender.sendToNode(lib::NodeType::STORAGE, bestNode.second, cmd); + sender.sendToNode(lib::NodeType::STORAGE, _fast_path_repair_source_node, cmd); transitionTo(SendState::GETS_SENT); } } @@ -325,6 +327,8 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorMessageSender& sender, // PUTs are done. addTraceFromReply(*intermediate._reply); sendReplyWithResult(sender, intermediate._reply->getResult()); + LOG(warning, "Forced convergence of '%s' using document from node %u", + _updateCmd->getDocumentId().toString().c_str(), _fast_path_repair_source_node); } } } diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h index 44c463666fd..f9e32c31a9c 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h @@ -138,6 +138,7 @@ private: mbus::TraceNode _trace; document::BucketId _updateDocBucketId; std::vector<std::pair<document::BucketId, uint16_t>> _replicas_at_get_send_time; + uint16_t _fast_path_repair_source_node; bool _replySent; }; |