summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@oath.com>2018-05-29 09:40:20 +0000
committerTor Egge <Tor.Egge@oath.com>2018-05-29 11:05:35 +0000
commit302146b7ca558f99b5d990abc75c2d0c2ebeeef3 (patch)
tree7793bcec9bf4f9c5256e6f271a85cffc8ca7e9c0
parent6cd4e8945facda874bf1ada7ea8694c2c633f9da (diff)
Perform fixup on put and update operations using non-current document type repo.
This ensures that document type used do access fields is the same for live feeding and replay.
-rw-r--r--searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp50
-rw-r--r--searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.cpp9
-rw-r--r--searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.h1
-rw-r--r--searchcore/src/vespa/searchcore/proton/feedoperation/updateoperation.cpp40
-rw-r--r--searchcore/src/vespa/searchcore/proton/feedoperation/updateoperation.h8
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp32
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/feedhandler.h8
7 files changed, 113 insertions, 35 deletions
diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
index 5c0edce0b94..153b1ae2867 100644
--- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
+++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp
@@ -2,6 +2,7 @@
#include <vespa/persistence/spi/result.h>
#include <vespa/document/update/assignvalueupdate.h>
+#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/searchcore/proton/bucketdb/bucketdbhandler.h>
#include <vespa/searchcore/proton/test/bucketfactory.h>
#include <vespa/searchcore/proton/common/feedtoken.h>
@@ -35,6 +36,7 @@ LOG_SETUP("feedhandler_test");
using document::BucketId;
using document::Document;
using document::DocumentId;
+using document::DocumentType;
using document::DocumentTypeRepo;
using document::DocumentUpdate;
using document::GlobalId;
@@ -181,7 +183,9 @@ struct MyFeedView : public test::DummyFeedView {
int prune_removed_count;
int update_count;
SerialNum update_serial;
- MyFeedView(const std::shared_ptr<const DocumentTypeRepo> &dtr);
+ const DocumentType *documentType;
+ MyFeedView(const std::shared_ptr<const DocumentTypeRepo> &dtr,
+ const DocTypeName &docTypeName);
~MyFeedView() override;
void resetPutLatch(uint32_t count) { putLatch.reset(new vespalib::CountDownLatch(count)); }
void preparePut(PutOperation &op) override {
@@ -203,6 +207,8 @@ struct MyFeedView : public test::DummyFeedView {
if (usePutRdz) {
putRdz.run();
}
+ EXPECT_EQUAL(_docTypeRepo.get(), putOp.getDocument()->getRepo());
+ EXPECT_EQUAL(documentType, &putOp.getDocument()->getType());
++put_count;
put_serial = putOp.getSerialNum();
metaStore.allocate(putOp.getDocument()->getId().getGlobalId());
@@ -216,6 +222,7 @@ struct MyFeedView : public test::DummyFeedView {
void handleUpdate(FeedToken token, const UpdateOperation &op) override {
(void) token;
+ EXPECT_EQUAL(documentType, &op.getUpdate()->getType());
++update_count;
update_serial = op.getSerialNum();
}
@@ -237,7 +244,7 @@ struct MyFeedView : public test::DummyFeedView {
}
};
-MyFeedView::MyFeedView(const std::shared_ptr<const DocumentTypeRepo> &dtr)
+MyFeedView::MyFeedView(const std::shared_ptr<const DocumentTypeRepo> &dtr, const DocTypeName &docTypeName)
: test::DummyFeedView(dtr),
putRdz(),
usePutRdz(false),
@@ -250,7 +257,8 @@ MyFeedView::MyFeedView(const std::shared_ptr<const DocumentTypeRepo> &dtr)
move_count(0),
prune_removed_count(0),
update_count(0),
- update_serial(0)
+ update_serial(0),
+ documentType(dtr->getDocumentType(docTypeName.getName()))
{}
MyFeedView::~MyFeedView() {}
@@ -294,6 +302,13 @@ struct DocumentContext {
}
};
+struct TwoFieldsSchemaContext : public SchemaContext {
+ TwoFieldsSchemaContext()
+ : SchemaContext()
+ {
+ addField("i2");
+ }
+};
struct UpdateContext {
DocumentUpdate::SP update;
@@ -433,7 +448,7 @@ struct FeedHandlerFixture
owner(),
_state(),
replayConfig(),
- feedView(schema.getRepo()),
+ feedView(schema.getRepo(), schema.getDocType()),
_bucketDB(),
_bucketDBHandler(_bucketDB),
handler(writeService, tlsSpec, schema.getDocType(), _state, owner,
@@ -714,15 +729,13 @@ TEST_F("require that update with same document type repo is ok", FeedHandlerFixt
TEST_F("require that update with different document type repo can be ok", FeedHandlerFixture)
{
- SchemaContext schema;
- schema.addField("i2");
+ TwoFieldsSchemaContext schema;
checkUpdate(f, schema, "i1", false, true);
}
TEST_F("require that update with different document type repo can be rejected", FeedHandlerFixture)
{
- SchemaContext schema;
- schema.addField("i2");
+ TwoFieldsSchemaContext schema;
checkUpdate(f, schema, "i2", true, true);
}
@@ -733,18 +746,31 @@ TEST_F("require that update with same document type repo is ok, fallback to crea
TEST_F("require that update with different document type repo can be ok, fallback to create document", FeedHandlerFixture)
{
- SchemaContext schema;
- schema.addField("i2");
+ TwoFieldsSchemaContext schema;
checkUpdate(f, schema, "i1", false, false);
}
TEST_F("require that update with different document type repo can be rejected, preventing fallback to create document", FeedHandlerFixture)
{
- SchemaContext schema;
- schema.addField("i2");
+ TwoFieldsSchemaContext schema;
checkUpdate(f, schema, "i2", true, false);
}
+TEST_F("require that put with different document type repo is ok", FeedHandlerFixture)
+{
+ TwoFieldsSchemaContext schema;
+ DocumentContext doc_context("doc:test:foo", *schema.builder);
+ auto op = std::make_unique<PutOperation>(doc_context.bucketId,
+ Timestamp(10), doc_context.doc);
+ FeedTokenContext token_context;
+ EXPECT_EQUAL(schema.getRepo().get(), op->getDocument()->getRepo());
+ EXPECT_NOT_EQUAL(f.schema.getRepo().get(), op->getDocument()->getRepo());
+ EXPECT_NOT_EQUAL(f.feedView.documentType, &op->getDocument()->getType());
+ f.handler.performOperation(std::move(token_context.token), std::move(op));
+ EXPECT_EQUAL(1, f.feedView.put_count);
+ EXPECT_EQUAL(1, f.tls_writer.store_count);
+}
+
} // namespace
TEST_MAIN()
diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.cpp b/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.cpp
index a6d34b696ca..efac7297c67 100644
--- a/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.cpp
+++ b/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.cpp
@@ -49,6 +49,15 @@ PutOperation::deserialize(vespalib::nbostream &is,
_serializedDocSize = oldSize - is.size();
}
+void
+PutOperation::deserializeDocument(const DocumentTypeRepo &repo)
+{
+ vespalib::nbostream stream;
+ _doc->serialize(stream);
+ auto fixedDoc = std::make_shared<Document>(repo, stream);
+ _doc = std::move(fixedDoc);
+}
+
vespalib::string
PutOperation::toString() const
{
diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.h b/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.h
index 7e9dee2cbc0..33330692fab 100644
--- a/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.h
+++ b/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.h
@@ -21,6 +21,7 @@ public:
virtual void serialize(vespalib::nbostream &os) const override;
virtual void deserialize(vespalib::nbostream &is,
const document::DocumentTypeRepo &repo) override;
+ void deserializeDocument(const document::DocumentTypeRepo &repo);
virtual vespalib::string toString() const override;
};
diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/updateoperation.cpp b/searchcore/src/vespa/searchcore/proton/feedoperation/updateoperation.cpp
index fa5bbee8e6d..beaf719dc5c 100644
--- a/searchcore/src/vespa/searchcore/proton/feedoperation/updateoperation.cpp
+++ b/searchcore/src/vespa/searchcore/proton/feedoperation/updateoperation.cpp
@@ -8,6 +8,7 @@ LOG_SETUP(".proton.feedoperation.updateoperation");
using document::BucketId;
+using document::DocumentType;
using document::DocumentTypeRepo;
using document::DocumentUpdate;
using storage::spi::Timestamp;
@@ -46,12 +47,9 @@ UpdateOperation::UpdateOperation(const BucketId &bucketId,
{
}
-
void
-UpdateOperation::serialize(vespalib::nbostream &os) const
+UpdateOperation::serializeUpdate(vespalib::nbostream &os) const
{
- assertValidBucketId(_upd->getId());
- DocumentOperation::serialize(os);
if (getType() == FeedOperation::UPDATE_42) {
_upd->serialize42(os);
} else {
@@ -59,19 +57,33 @@ UpdateOperation::serialize(vespalib::nbostream &os) const
}
}
+void
+UpdateOperation::deserializeUpdate(vespalib::nbostream &is, const document::DocumentTypeRepo &repo)
+{
+ document::ByteBuffer buf(is.peek(), is.size());
+ using Version = DocumentUpdate::SerializeVersion;
+ Version version = ((getType() == FeedOperation::UPDATE_42) ? Version::SERIALIZE_42 : Version::SERIALIZE_HEAD);
+ DocumentUpdate::SP update(std::make_shared<DocumentUpdate>(repo, buf, version));
+ is.adjustReadPos(buf.getPos());
+ _upd = std::move(update);
+}
+
+void
+UpdateOperation::serialize(vespalib::nbostream &os) const
+{
+ assertValidBucketId(_upd->getId());
+ DocumentOperation::serialize(os);
+ serializeUpdate(os);
+}
+
void
UpdateOperation::deserialize(vespalib::nbostream &is,
const DocumentTypeRepo &repo)
{
DocumentOperation::deserialize(is, repo);
- document::ByteBuffer buf(is.peek(), is.size());
- using Version = DocumentUpdate::SerializeVersion;
- Version version = ((getType() == FeedOperation::UPDATE_42) ? Version::SERIALIZE_42 : Version::SERIALIZE_HEAD);
try {
- DocumentUpdate::SP update(std::make_shared<DocumentUpdate>(repo, buf, version));
- is.adjustReadPos(buf.getPos());
- _upd = update;
+ deserializeUpdate(is, repo);
} catch (document::DocumentTypeNotFoundException &e) {
LOG(warning, "Failed deserialize update operation using unknown document type '%s'",
e.getDocumentTypeName().c_str());
@@ -80,6 +92,14 @@ UpdateOperation::deserialize(vespalib::nbostream &is,
}
}
+void
+UpdateOperation::deserializeUpdate(const DocumentTypeRepo &repo)
+{
+ vespalib::nbostream stream;
+ serializeUpdate(stream);
+ deserializeUpdate(stream, repo);
+}
+
vespalib::string UpdateOperation::toString() const {
return make_string("%s(%s, %s)",
((getType() == FeedOperation::UPDATE_42) ? "Update42" : "Update"),
diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/updateoperation.h b/searchcore/src/vespa/searchcore/proton/feedoperation/updateoperation.h
index 6e061f79f30..7886231af82 100644
--- a/searchcore/src/vespa/searchcore/proton/feedoperation/updateoperation.h
+++ b/searchcore/src/vespa/searchcore/proton/feedoperation/updateoperation.h
@@ -3,7 +3,10 @@
#include "documentoperation.h"
-namespace document { class DocumentUpdate; }
+namespace document {
+class DocumentTypeRepo;
+class DocumentUpdate;
+}
namespace proton {
@@ -16,6 +19,8 @@ private:
const document::BucketId &bucketId,
const storage::spi::Timestamp &timestamp,
const DocumentUpdateSP &upd);
+ void serializeUpdate(vespalib::nbostream &os) const;
+ void deserializeUpdate(vespalib::nbostream &is, const document::DocumentTypeRepo &repo);
public:
UpdateOperation();
UpdateOperation(Type type);
@@ -26,6 +31,7 @@ public:
const DocumentUpdateSP &getUpdate() const { return _upd; }
void serialize(vespalib::nbostream &os) const override;
void deserialize(vespalib::nbostream &is, const document::DocumentTypeRepo &repo) override;
+ void deserializeUpdate(const document::DocumentTypeRepo &repo);
virtual vespalib::string toString() const override;
static UpdateOperation makeOldUpdate(const document::BucketId &bucketId,
const storage::spi::Timestamp &timestamp,
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
index ad5372b4af6..897e58139c1 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp
@@ -96,6 +96,9 @@ void FeedHandler::performPut(FeedToken token, PutOperation &op) {
}
return;
}
+ if (_repo != op.getDocument()->getRepo()) {
+ op.deserializeDocument(*_repo);
+ }
storeOperation(op, token);
if (token) {
token->setResult(make_unique<Result>(), false);
@@ -344,6 +347,8 @@ FeedHandler::FeedHandler(IThreadingService &writeService,
_feedLock(),
_feedState(make_shared<InitState>(getDocTypeName())),
_activeFeedView(nullptr),
+ _repo(nullptr),
+ _documentType(nullptr),
_bucketDBHandler(nullptr),
_syncLock(),
_syncedSerialNum(0),
@@ -408,6 +413,14 @@ void FeedHandler::changeToNormalFeedState() {
changeFeedState(make_shared<NormalState>(*this));
}
+void
+FeedHandler::setActiveFeedView(IFeedView *feedView)
+{
+ _activeFeedView = feedView;
+ _repo = feedView->getDocumentTypeRepo().get();
+ _documentType = _repo->getDocumentType(_docTypeName.getName());
+}
+
bool
FeedHandler::isDoingReplay() const {
return _tlsMgr.isDoingReplay();
@@ -492,22 +505,17 @@ FeedHandler::considerWriteOperationForRejection(FeedToken & token, const FeedOpe
}
bool
-FeedHandler::considerUpdateOperationForRejection(FeedToken &token, const UpdateOperation &op)
+FeedHandler::considerUpdateOperationForRejection(FeedToken &token, UpdateOperation &op)
{
- const auto *repo = _activeFeedView->getDocumentTypeRepo().get();
const auto &update = *op.getUpdate();
/*
* Check if document types are equal. DocumentTypeRepoFactory::make returns
* the same document type repo if document type configs are equal, thus we
* can just perform a cheaper identity check here.
*/
- if (repo->getDocumentType(_docTypeName.getName()) != &update.getType()) {
+ if (_documentType != &update.getType()) {
try {
- vespalib::nbostream stream;
- op.serialize(stream);
- UpdateOperation checkOp(op.getType());
- vespalib::nbostream checkStream(stream.peek(), stream.size());
- checkOp.deserialize(stream, *repo);
+ op.deserializeUpdate(*_repo);
} catch (document::FieldNotFoundException &e) {
if (token) {
auto message = make_string("Update operation rejected for document '%s' of type '%s': 'Field not found'",
@@ -516,6 +524,14 @@ FeedHandler::considerUpdateOperationForRejection(FeedToken &token, const UpdateO
token->fail();
}
return true;
+ } catch (document::DocumentTypeNotFoundException &e) {
+ auto message = make_string("Update operation rejected for document '%s' of type '%s': 'Uknown document type', expected '%s'",
+ update.getId().toString().c_str(),
+ e.getDocumentTypeName().c_str(),
+ _docTypeName.toString().c_str());
+ token->setResult(make_unique<UpdateResult>(Result::TRANSIENT_ERROR, message), false);
+ token->fail();
+ return true;
}
}
return false;
diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
index faf909080d9..34f979b7115 100644
--- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
+++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h
@@ -90,6 +90,8 @@ private:
FeedStateSP _feedState;
// used by master write thread tasks
IFeedView *_activeFeedView;
+ const document::DocumentTypeRepo *_repo;
+ const document::DocumentType *_documentType;
bucketdb::IBucketDBHandler *_bucketDBHandler;
std::mutex _syncLock;
SerialNum _syncedSerialNum;
@@ -102,7 +104,7 @@ private:
void doHandleOperation(FeedToken token, FeedOperationUP op);
bool considerWriteOperationForRejection(FeedToken & token, const FeedOperation &op);
- bool considerUpdateOperationForRejection(FeedToken &token, const UpdateOperation &op);
+ bool considerUpdateOperationForRejection(FeedToken &token, UpdateOperation &op);
/**
* Delayed execution of feed operations against feed view, in
@@ -203,9 +205,7 @@ public:
* Update the active feed view.
* Always called by the master write thread so locking is not needed.
*/
- void setActiveFeedView(IFeedView *feedView) {
- _activeFeedView = feedView;
- }
+ void setActiveFeedView(IFeedView *feedView);
void setBucketDBHandler(bucketdb::IBucketDBHandler *bucketDBHandler) {
_bucketDBHandler = bucketDBHandler;