diff options
26 files changed, 122 insertions, 467 deletions
diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/PathGroup.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/PathGroup.java index 5c11dfc2a55..ced3d201f6f 100644 --- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/PathGroup.java +++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/PathGroup.java @@ -58,13 +58,25 @@ enum PathGroup { "/application/v4/tenant/{tenant}/application/", "/application/v4/tenant/{tenant}/cost", "/application/v4/tenant/{tenant}/cost/{date}", - "/routing/v1/status/tenant/{tenant}/{*}", - "/billing/v1/tenant/{tenant}/{*}"), + "/routing/v1/status/tenant/{tenant}/{*}"), tenantKeys(Matcher.tenant, PathPrefix.api, "/application/v4/tenant/{tenant}/key/"), + + billingToken(Matcher.tenant, + PathPrefix.api, + "/billing/v1/tenant/{tenant}/token"), + + billingInstrument(Matcher.tenant, + PathPrefix.api, + "/billing/v1/tenant/{tenant}/instrument/{*}"), + + billingList(Matcher.tenant, + PathPrefix.api, + "/billing/v1/tenant/{tenant}/billing/{*}"), + applicationKeys(Matcher.tenant, Matcher.application, PathPrefix.api, diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/Policy.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/Policy.java index cfe8d247e54..0afa0668a00 100644 --- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/Policy.java +++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/Policy.java @@ -142,7 +142,32 @@ enum Policy { /** Access to /payment/notification */ paymentProcessor(Privilege.grant(Action.create) .on(PathGroup.paymentProcessor) - .in(SystemName.PublicCd)); + .in(SystemName.PublicCd)), + + /** Read your own instrument information */ + paymentInstrumentRead(Privilege.grant(Action.read) + .on(PathGroup.billingInstrument) + .in(SystemName.PublicCd)), + + /** Ability to update tenant payment instrument */ + paymentInstrumentUpdate(Privilege.grant(Action.update) + .on(PathGroup.billingInstrument) + .in(SystemName.PublicCd)), + + /** Ability to remove your own payment instrument */ + paymentInstrumentDelete(Privilege.grant(Action.delete) + .on(PathGroup.billingInstrument) + .in(SystemName.PublicCd)), + + /** Get the token to view instrument form */ + paymentInstrumentCreate(Privilege.grant(Action.read) + .on(PathGroup.billingToken) + .in(SystemName.PublicCd)), + + /** Read the generated bills */ + billingInformationRead(Privilege.grant(Action.read) + .on(PathGroup.billingList) + .in(SystemName.PublicCd)); private final Set<Privilege> privileges; diff --git a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/RoleDefinition.java b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/RoleDefinition.java index c05936ee593..438e79bcc4f 100644 --- a/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/RoleDefinition.java +++ b/controller-api/src/main/java/com/yahoo/vespa/hosted/controller/api/role/RoleDefinition.java @@ -43,7 +43,10 @@ public enum RoleDefinition { reader(Policy.tenantRead, Policy.applicationRead, Policy.deploymentRead, - Policy.publicRead), + Policy.publicRead, + Policy.paymentInstrumentRead, + Policy.paymentInstrumentDelete, + Policy.billingInformationRead), /** User — the dev.ops. role for normal Vespa tenant users */ developer(Policy.applicationCreate, @@ -52,12 +55,20 @@ public enum RoleDefinition { Policy.applicationOperations, Policy.developmentDeployment, Policy.keyManagement, - Policy.submission), + Policy.submission, + Policy.paymentInstrumentRead, + Policy.paymentInstrumentDelete, + Policy.billingInformationRead), /** Admin — the administrative function for user management etc. */ administrator(Policy.tenantUpdate, Policy.tenantManager, - Policy.applicationManager), + Policy.applicationManager, + Policy.paymentInstrumentRead, + Policy.paymentInstrumentUpdate, + Policy.paymentInstrumentDelete, + Policy.paymentInstrumentCreate, + Policy.billingInformationRead), /** Headless — the application specific role identified by deployment keys for production */ headless(Policy.submission), diff --git a/controller-api/src/test/java/com/yahoo/vespa/hosted/controller/api/role/RoleTest.java b/controller-api/src/test/java/com/yahoo/vespa/hosted/controller/api/role/RoleTest.java index 57b4af9d16c..2da93c5ceca 100644 --- a/controller-api/src/test/java/com/yahoo/vespa/hosted/controller/api/role/RoleTest.java +++ b/controller-api/src/test/java/com/yahoo/vespa/hosted/controller/api/role/RoleTest.java @@ -6,8 +6,10 @@ import com.yahoo.config.provision.SystemName; import com.yahoo.config.provision.TenantName; import org.junit.Test; +import java.awt.event.AdjustmentEvent; import java.net.URI; import java.util.List; +import java.util.stream.Stream; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -19,6 +21,7 @@ public class RoleTest { private static final Enforcer mainEnforcer = new Enforcer(SystemName.main); private static final Enforcer publicEnforcer = new Enforcer(SystemName.Public); + private static final Enforcer publicCdEnforcer = new Enforcer(SystemName.PublicCd); @Test public void operator_membership() { @@ -143,4 +146,46 @@ public class RoleTest { } } + @Test + public void payment_instrument() { + URI paymentInstrumentUri = URI.create("/billing/v1/tenant/t1/instrument/foobar"); + URI tenantPaymentInstrumentUri = URI.create("/billing/v1/tenant/t1/instrument"); + URI tokenUri = URI.create("/billing/v1/tenant/t1/token"); + + Role user = Role.reader(TenantName.from("t1")); + assertTrue(publicCdEnforcer.allows(user, Action.read, paymentInstrumentUri)); + assertTrue(publicCdEnforcer.allows(user, Action.delete, paymentInstrumentUri)); + assertFalse(publicCdEnforcer.allows(user, Action.update, tenantPaymentInstrumentUri)); + assertFalse(publicCdEnforcer.allows(user, Action.read, tokenUri)); + + Role developer = Role.developer(TenantName.from("t1")); + assertTrue(publicCdEnforcer.allows(developer, Action.read, paymentInstrumentUri)); + assertTrue(publicCdEnforcer.allows(developer, Action.delete, paymentInstrumentUri)); + assertFalse(publicCdEnforcer.allows(developer, Action.update, tenantPaymentInstrumentUri)); + assertFalse(publicCdEnforcer.allows(developer, Action.read, tokenUri)); + + Role admin = Role.administrator(TenantName.from("t1")); + assertTrue(publicCdEnforcer.allows(admin, Action.read, paymentInstrumentUri)); + assertTrue(publicCdEnforcer.allows(admin, Action.delete, paymentInstrumentUri)); + assertTrue(publicCdEnforcer.allows(admin, Action.update, tenantPaymentInstrumentUri)); + assertTrue(publicCdEnforcer.allows(admin, Action.read, tokenUri)); + } + + @Test + public void billing() { + URI billing = URI.create("/billing/v1/tenant/t1/billing"); + + Role user = Role.reader(TenantName.from("t1")); + Role developer = Role.developer(TenantName.from("t1")); + Role admin = Role.administrator(TenantName.from("t1")); + + Stream.of(user, developer, admin).forEach(role -> { + assertTrue(publicCdEnforcer.allows(role, Action.read, billing)); + assertFalse(publicCdEnforcer.allows(role, Action.update, billing)); + assertFalse(publicCdEnforcer.allows(role, Action.delete, billing)); + assertFalse(publicCdEnforcer.allows(role, Action.create, billing)); + }); + + } + } diff --git a/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp b/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp index 5c8b5b029d2..298e29cbecb 100644 --- a/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp +++ b/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp @@ -300,8 +300,6 @@ feedDocs(PersistenceProvider& spi, EXPECT_TRUE(!result.hasError()); docs.push_back(DocAndTimestamp(doc, Timestamp(1000 + i))); } - spi.flush(bucket, context); - EXPECT_EQ(Result(), Result(spi.flush(bucket, context))); return docs; } @@ -354,8 +352,6 @@ TEST_F(ConformanceTest, testBasics) Result(), Result(spi->remove(bucket, Timestamp(3), doc1->getId(), context))); - EXPECT_EQ(Result(), Result(spi->flush(bucket, context))); - // Iterate first without removes, then with. for (int iterPass = 0; iterPass < 2; ++iterPass) { bool includeRemoves = (iterPass == 1); @@ -441,11 +437,8 @@ TEST_F(ConformanceTest, testListBuckets) spi->createBucket(bucket3, context); spi->put(bucket1, Timestamp(1), doc1, context); - spi->flush(bucket1, context); spi->put(bucket2, Timestamp(2), doc2, context); - spi->flush(bucket2, context); spi->put(bucket3, Timestamp(3), doc3, context); - spi->flush(bucket3, context); { BucketIdListResult result = spi->listBuckets(makeBucketSpace(), PartitionId(1)); @@ -482,7 +475,6 @@ TEST_F(ConformanceTest, testBucketInfo) spi->put(bucket, Timestamp(2), doc2, context); const BucketInfo info1 = spi->getBucketInfo(bucket).getBucketInfo(); - spi->flush(bucket, context); { EXPECT_EQ(1, (int)info1.getDocumentCount()); @@ -492,7 +484,6 @@ TEST_F(ConformanceTest, testBucketInfo) spi->put(bucket, Timestamp(3), doc1, context); const BucketInfo info2 = spi->getBucketInfo(bucket).getBucketInfo(); - spi->flush(bucket, context); { EXPECT_EQ(2, (int)info2.getDocumentCount()); @@ -503,7 +494,6 @@ TEST_F(ConformanceTest, testBucketInfo) spi->put(bucket, Timestamp(4), doc1, context); const BucketInfo info3 = spi->getBucketInfo(bucket).getBucketInfo(); - spi->flush(bucket, context); { EXPECT_EQ(2, (int)info3.getDocumentCount()); @@ -514,7 +504,6 @@ TEST_F(ConformanceTest, testBucketInfo) spi->remove(bucket, Timestamp(5), doc1->getId(), context); const BucketInfo info4 = spi->getBucketInfo(bucket).getBucketInfo(); - spi->flush(bucket, context); { EXPECT_EQ(1, (int)info4.getDocumentCount()); @@ -544,7 +533,6 @@ TEST_F(ConformanceTest, testOrderIndependentBucketInfo) { spi->put(bucket, Timestamp(2), doc1, context); spi->put(bucket, Timestamp(3), doc2, context); - spi->flush(bucket, context); const BucketInfo info(spi->getBucketInfo(bucket).getBucketInfo()); checksumOrdered = info.getChecksum(); @@ -563,7 +551,6 @@ TEST_F(ConformanceTest, testOrderIndependentBucketInfo) // Swap order of puts spi->put(bucket, Timestamp(3), doc2, context); spi->put(bucket, Timestamp(2), doc1, context); - spi->flush(bucket, context); const BucketInfo info(spi->getBucketInfo(bucket).getBucketInfo()); checksumUnordered = info.getChecksum(); @@ -589,7 +576,6 @@ TEST_F(ConformanceTest, testPut) { const BucketInfo info = spi->getBucketInfo(bucket).getBucketInfo(); - spi->flush(bucket, context); EXPECT_EQ(1, (int)info.getDocumentCount()); EXPECT_TRUE(info.getEntryCount() >= info.getDocumentCount()); @@ -615,7 +601,6 @@ TEST_F(ConformanceTest, testPutNewDocumentVersion) Result result = spi->put(bucket, Timestamp(3), doc1, context); { const BucketInfo info = spi->getBucketInfo(bucket).getBucketInfo(); - spi->flush(bucket, context); EXPECT_EQ(1, (int)info.getDocumentCount()); EXPECT_TRUE(info.getEntryCount() >= info.getDocumentCount()); @@ -627,7 +612,6 @@ TEST_F(ConformanceTest, testPutNewDocumentVersion) result = spi->put(bucket, Timestamp(4), doc2, context); { const BucketInfo info = spi->getBucketInfo(bucket).getBucketInfo(); - spi->flush(bucket, context); EXPECT_EQ(1, (int)info.getDocumentCount()); EXPECT_TRUE(info.getEntryCount() >= info.getDocumentCount()); @@ -666,7 +650,6 @@ TEST_F(ConformanceTest, testPutOlderDocumentVersion) Result result = spi->put(bucket, Timestamp(5), doc1, context); const BucketInfo info1 = spi->getBucketInfo(bucket).getBucketInfo(); - spi->flush(bucket, context); { EXPECT_EQ(1, (int)info1.getDocumentCount()); EXPECT_TRUE(info1.getEntryCount() >= info1.getDocumentCount()); @@ -678,7 +661,6 @@ TEST_F(ConformanceTest, testPutOlderDocumentVersion) result = spi->put(bucket, Timestamp(4), doc2, context); { const BucketInfo info2 = spi->getBucketInfo(bucket).getBucketInfo(); - spi->flush(bucket, context); EXPECT_EQ(1, (int)info2.getDocumentCount()); EXPECT_TRUE(info2.getEntryCount() >= info1.getDocumentCount()); @@ -712,7 +694,6 @@ TEST_F(ConformanceTest, testPutDuplicate) BucketChecksum checksum; { const BucketInfo info = spi->getBucketInfo(bucket).getBucketInfo(); - spi->flush(bucket, context); EXPECT_EQ(1, (int)info.getDocumentCount()); checksum = info.getChecksum(); } @@ -721,7 +702,6 @@ TEST_F(ConformanceTest, testPutDuplicate) { const BucketInfo info = spi->getBucketInfo(bucket).getBucketInfo(); - spi->flush(bucket, context); EXPECT_EQ(1, (int)info.getDocumentCount()); EXPECT_EQ(checksum, info.getChecksum()); } @@ -746,7 +726,6 @@ TEST_F(ConformanceTest, testRemove) { const BucketInfo info = spi->getBucketInfo(bucket).getBucketInfo(); - spi->flush(bucket, context); EXPECT_EQ(1, (int)info.getDocumentCount()); EXPECT_TRUE(info.getChecksum() != 0); @@ -764,7 +743,6 @@ TEST_F(ConformanceTest, testRemove) { const BucketInfo info = spi->getBucketInfo(bucket).getBucketInfo(); - spi->flush(bucket, context); EXPECT_EQ(0, (int)info.getDocumentCount()); EXPECT_EQ(0, (int)info.getChecksum()); @@ -791,7 +769,6 @@ TEST_F(ConformanceTest, testRemove) context); { const BucketInfo info = spi->getBucketInfo(bucket).getBucketInfo(); - spi->flush(bucket, context); EXPECT_EQ(0, (int)info.getDocumentCount()); EXPECT_EQ(0, (int)info.getChecksum()); @@ -799,7 +776,6 @@ TEST_F(ConformanceTest, testRemove) } Result result4 = spi->put(bucket, Timestamp(9), doc1, context); - spi->flush(bucket, context); EXPECT_TRUE(!result4.hasError()); @@ -809,7 +785,6 @@ TEST_F(ConformanceTest, testRemove) context); { const BucketInfo info = spi->getBucketInfo(bucket).getBucketInfo(); - spi->flush(bucket, context); EXPECT_EQ(0, (int)info.getDocumentCount()); EXPECT_EQ(0, (int)info.getChecksum()); @@ -847,7 +822,6 @@ TEST_F(ConformanceTest, testRemoveMerge) Timestamp(10), removeId, context); - spi->flush(bucket, context); EXPECT_EQ(Result::ErrorType::NONE, removeResult.getErrorCode()); EXPECT_EQ(false, removeResult.wasFound()); } @@ -875,7 +849,6 @@ TEST_F(ConformanceTest, testRemoveMerge) Timestamp(11), removeId, context); - spi->flush(bucket, context); EXPECT_EQ(Result::ErrorType::NONE, removeResult.getErrorCode()); EXPECT_EQ(false, removeResult.wasFound()); } @@ -903,7 +876,6 @@ TEST_F(ConformanceTest, testRemoveMerge) Timestamp(7), removeId, context); - spi->flush(bucket, context); EXPECT_EQ(Result::ErrorType::NONE, removeResult.getErrorCode()); EXPECT_EQ(false, removeResult.wasFound()); } @@ -946,7 +918,6 @@ TEST_F(ConformanceTest, testUpdate) { UpdateResult result = spi->update(bucket, Timestamp(3), update, context); - spi->flush(bucket, context); EXPECT_EQ(Result(), Result(result)); EXPECT_EQ(Timestamp(0), result.getExistingTimestamp()); } @@ -955,7 +926,6 @@ TEST_F(ConformanceTest, testUpdate) { UpdateResult result = spi->update(bucket, Timestamp(4), update, context); - spi->flush(bucket, context); EXPECT_EQ(Result::ErrorType::NONE, result.getErrorCode()); EXPECT_EQ(Timestamp(3), result.getExistingTimestamp()); @@ -975,7 +945,6 @@ TEST_F(ConformanceTest, testUpdate) } spi->remove(bucket, Timestamp(5), doc1->getId(), context); - spi->flush(bucket, context); { GetResult result = spi->get(bucket, @@ -991,7 +960,6 @@ TEST_F(ConformanceTest, testUpdate) { UpdateResult result = spi->update(bucket, Timestamp(6), update, context); - spi->flush(bucket, context); EXPECT_EQ(Result::ErrorType::NONE, result.getErrorCode()); EXPECT_EQ(Timestamp(0), result.getExistingTimestamp()); @@ -1009,7 +977,6 @@ TEST_F(ConformanceTest, testUpdate) // Document does not exist (and therefore its condition cannot match by definition), // but since CreateIfNonExistent is set it should be auto-created anyway. UpdateResult result = spi->update(bucket, Timestamp(7), update, context); - spi->flush(bucket, context); EXPECT_EQ(Result::ErrorType::NONE, result.getErrorCode()); EXPECT_EQ(Timestamp(7), result.getExistingTimestamp()); } @@ -1044,7 +1011,6 @@ TEST_F(ConformanceTest, testGet) } spi->put(bucket, Timestamp(3), doc1, context); - spi->flush(bucket, context); { GetResult result = spi->get(bucket, document::AllFields(), @@ -1054,7 +1020,6 @@ TEST_F(ConformanceTest, testGet) } spi->remove(bucket, Timestamp(4), doc1->getId(), context); - spi->flush(bucket, context); { GetResult result = spi->get(bucket, document::AllFields(), @@ -1168,7 +1133,6 @@ TEST_F(ConformanceTest, testIterateAllDocsNewestVersionOnly) spi->put(b, newTimestamp, newDoc, context); newDocs.push_back(DocAndTimestamp(newDoc, newTimestamp)); } - spi->flush(b, context); CreateIteratorResult iter(createIterator(*spi, b, createSelection(""))); @@ -1249,7 +1213,6 @@ TEST_F(ConformanceTest, testIterateMatchTimestampRange) DocAndTimestamp(doc, Timestamp(1000 + i))); } } - spi->flush(b, context); Selection sel = Selection(DocumentSelection("")); sel.setFromTimestamp(fromTimestamp); @@ -1295,7 +1258,6 @@ TEST_F(ConformanceTest, testIterateExplicitTimestampSubset) Timestamp(2000), docsToVisit.front().doc->getId(), context) .wasFound()); - spi->flush(b, context); timestampsToVisit.push_back(Timestamp(2000)); removes.insert(docsToVisit.front().doc->getId().toString()); @@ -1339,7 +1301,6 @@ TEST_F(ConformanceTest, testIterateRemoves) nonRemovedDocs.push_back(docs[i]); } } - spi->flush(b, context); // First, test iteration without removes { @@ -1387,7 +1348,6 @@ TEST_F(ConformanceTest, testIterateMatchSelection) DocAndTimestamp(doc, Timestamp(1000 + i))); } } - spi->flush(b, context); CreateIteratorResult iter( createIterator(*spi, @@ -1416,7 +1376,6 @@ TEST_F(ConformanceTest, testIterationRequiringDocumentIdOnlyMatching) // remove entry for it regardless. EXPECT_TRUE( !spi->remove(b, Timestamp(2000), removedId, context).wasFound()); - spi->flush(b, context); Selection sel(createSelection("id == '" + removedId.toString() + "'")); @@ -1530,7 +1489,6 @@ TEST_F(ConformanceTest, testDeleteBucket) spi->createBucket(bucket, context); spi->put(bucket, Timestamp(3), doc1, context); - spi->flush(bucket, context); spi->deleteBucket(bucket, context); testDeleteBucketPostCondition(spi, bucket, *doc1); @@ -1586,8 +1544,6 @@ TEST_F(ConformanceTest, testSplitNormalCase) spi->put(bucketC, Timestamp(i + 1), doc1, context); } - spi->flush(bucketC, context); - spi->split(bucketC, bucketA, bucketB, context); testSplitNormalCasePostCondition(spi, bucketA, bucketB, bucketC, testDocMan); @@ -1657,28 +1613,23 @@ TEST_F(ConformanceTest, testSplitTargetExists) spi->put(bucketC, Timestamp(i + 1), doc1, context); } - spi->flush(bucketC, context); for (uint32_t i = 10; i < 20; ++i) { Document::SP doc1 = testDocMan.createRandomDocumentAtLocation(0x06, i); spi->put(bucketB, Timestamp(i + 1), doc1, context); } - spi->flush(bucketB, context); EXPECT_TRUE(!spi->getBucketInfo(bucketB).getBucketInfo().isActive()); for (uint32_t i = 10; i < 20; ++i) { Document::SP doc1 = testDocMan.createRandomDocumentAtLocation(0x06, i); spi->put(bucketC, Timestamp(i + 1), doc1, context); } - spi->flush(bucketC, context); for (uint32_t i = 20; i < 25; ++i) { Document::SP doc1 = testDocMan.createRandomDocumentAtLocation(0x06, i); spi->put(bucketB, Timestamp(i + 1), doc1, context); } - spi->flush(bucketB, context); - spi->split(bucketC, bucketA, bucketB, context); testSplitTargetExistsPostCondition(spi, bucketA, bucketB, bucketC, testDocMan); @@ -1745,8 +1696,6 @@ TEST_F(ConformanceTest, testSplitSingleDocumentInSource) Document::SP doc = testDocMan.createRandomDocumentAtLocation(0x06, 0); spi->put(source, Timestamp(1), doc, context); - spi->flush(source, context); - spi->split(source, target1, target2, context); testSplitSingleDocumentInSourcePostCondition( spi, source, target1, target2, testDocMan); @@ -1804,7 +1753,6 @@ ConformanceTest::createAndPopulateJoinSourceBuckets( source1.getBucketId().getId(), i)); spi.put(source1, Timestamp(i + 1), doc, context); } - spi.flush(source1, context); for (uint32_t i = 10; i < 20; ++i) { Document::SP doc( @@ -1812,7 +1760,6 @@ ConformanceTest::createAndPopulateJoinSourceBuckets( source2.getBucketId().getId(), i)); spi.put(source2, Timestamp(i + 1), doc, context); } - spi.flush(source2, context); } void @@ -1912,21 +1859,17 @@ TEST_F(ConformanceTest, testJoinTargetExists) spi->put(bucketA, Timestamp(i + 1), doc1, context); } - spi->flush(bucketA, context); for (uint32_t i = 10; i < 20; ++i) { Document::SP doc1 = testDocMan.createRandomDocumentAtLocation(0x06, i); spi->put(bucketB, Timestamp(i + 1), doc1, context); } - spi->flush(bucketB, context); for (uint32_t i = 20; i < 30; ++i) { Document::SP doc1 = testDocMan.createRandomDocumentAtLocation(0x06, i); spi->put(bucketC, Timestamp(i + 1), doc1, context); } - spi->flush(bucketC, context); - spi->join(bucketA, bucketB, bucketC, context); testJoinTargetExistsPostCondition(spi, bucketA, bucketB, bucketC, testDocMan); @@ -1991,7 +1934,6 @@ ConformanceTest::populateBucket(const Bucket& b, location, i); spi.put(b, Timestamp(i + 1), doc1, context); } - spi.flush(b, context); } TEST_F(ConformanceTest, testJoinOneBucket) @@ -2150,7 +2092,6 @@ TEST_F(ConformanceTest, testMaintain) spi->createBucket(bucket, context); spi->put(bucket, Timestamp(3), doc1, context); - spi->flush(bucket, context); EXPECT_EQ(Result::ErrorType::NONE, spi->maintain(bucket, LOW).getErrorCode()); @@ -2229,7 +2170,6 @@ TEST_F(SingleDocTypeConformanceTest, testBucketActivationSplitAndJoin) spi->createBucket(bucketC, context); spi->put(bucketC, Timestamp(1), doc1, context); spi->put(bucketC, Timestamp(2), doc2, context); - spi->flush(bucketC, context); spi->setActiveState(bucketC, BucketInfo::ACTIVE); EXPECT_TRUE(spi->getBucketInfo(bucketC).getBucketInfo().isActive()); @@ -2304,14 +2244,11 @@ TEST_F(ConformanceTest, testRemoveEntry) spi->createBucket(bucket, context); spi->put(bucket, Timestamp(3), doc1, context); - spi->flush(bucket, context); BucketInfo info1 = spi->getBucketInfo(bucket).getBucketInfo(); { spi->put(bucket, Timestamp(4), doc2, context); - spi->flush(bucket, context); spi->removeEntry(bucket, Timestamp(4), context); - spi->flush(bucket, context); BucketInfo info2 = spi->getBucketInfo(bucket).getBucketInfo(); EXPECT_EQ(info1, info2); } @@ -2319,9 +2256,7 @@ TEST_F(ConformanceTest, testRemoveEntry) // Test case where there exists a previous version of the document. { spi->put(bucket, Timestamp(5), doc1, context); - spi->flush(bucket, context); spi->removeEntry(bucket, Timestamp(5), context); - spi->flush(bucket, context); BucketInfo info2 = spi->getBucketInfo(bucket).getBucketInfo(); EXPECT_EQ(info1, info2); } @@ -2329,14 +2264,11 @@ TEST_F(ConformanceTest, testRemoveEntry) // Test case where the newest document version after removeEntrying is a remove. { spi->remove(bucket, Timestamp(6), doc1->getId(), context); - spi->flush(bucket, context); BucketInfo info2 = spi->getBucketInfo(bucket).getBucketInfo(); EXPECT_EQ(uint32_t(0), info2.getDocumentCount()); spi->put(bucket, Timestamp(7), doc1, context); - spi->flush(bucket, context); spi->removeEntry(bucket, Timestamp(7), context); - spi->flush(bucket, context); BucketInfo info3 = spi->getBucketInfo(bucket).getBucketInfo(); EXPECT_EQ(info2, info3); } @@ -2395,9 +2327,6 @@ TEST_F(ConformanceTest, testBucketSpaces) spi->put(bucket01, Timestamp(4), doc2, context); spi->put(bucket11, Timestamp(5), doc3, context); spi->put(bucket12, Timestamp(6), doc4, context); - spi->flush(bucket01, context); - spi->flush(bucket11, context); - spi->flush(bucket12, context); // Check bucket lists assertBucketList(*spi, bucketSpace0, partId, { bucketId1 }); assertBucketList(*spi, bucketSpace1, partId, { bucketId1, bucketId2 }); diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h index 557a9ec2edd..cbd76951004 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h @@ -41,11 +41,6 @@ public: Result removeEntry(const Bucket&, Timestamp, Context&) override { return Result(); } /** - * Default impl is getBucketInfo(); - */ - Result flush(const Bucket&, Context&) override { return Result(); } - - /** * Default impl is remove(). */ RemoveResult removeIfFound(const Bucket&, Timestamp, const DocumentId&, Context&) override; diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h index 857630e2606..c70d5e3f1c3 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h @@ -202,30 +202,6 @@ struct PersistenceProvider virtual UpdateResult update(const Bucket&, Timestamp timestamp, const DocumentUpdateSP& update, Context&) = 0; /** - * The service layer may choose to batch certain commands. This means that - * the service layer will lock the bucket only once, then perform several - * commands, and finally get the bucket info from the bucket, and then - * flush it. This can be used to improve performance by caching the - * modifications, and persisting them to disk only when flush is called. - * The service layer guarantees that after one of these operations, flush() - * is called, regardless of whether the operation succeeded or not, before - * another bucket is processed in the same worker thead. The following - * operations can be batched and have the guarantees - * above: - * - put - * - get - * - remove (all versions) - * - update - * - revert - * - join - * <p/> - * A provider may of course choose to not sync to disk at flush time either, - * but then data may be more prone to being lost on node issues, and the - * provider must figure out when to flush its cache itself. - */ - virtual Result flush(const Bucket&, Context&) = 0; - - /** * Retrieves the latest version of the document specified by the * document id. If no versions were found, or the document was removed, * the result should be successful, but contain no document (see GetResult). diff --git a/searchcore/src/apps/proton/downpersistence.cpp b/searchcore/src/apps/proton/downpersistence.cpp index 4b911eb6d2b..d4ec9cee395 100644 --- a/searchcore/src/apps/proton/downpersistence.cpp +++ b/searchcore/src/apps/proton/downpersistence.cpp @@ -98,12 +98,6 @@ UpdateResult DownPersistence::update(const Bucket&, Timestamp, errorResult.getErrorMessage()); } -Result -DownPersistence::flush(const Bucket&, Context&) -{ - return errorResult; -} - GetResult DownPersistence::get(const Bucket&, const document::FieldSet&, const DocumentId&, Context&) const diff --git a/searchcore/src/apps/proton/downpersistence.h b/searchcore/src/apps/proton/downpersistence.h index 0a602c4467e..8cdac7aaa1b 100644 --- a/searchcore/src/apps/proton/downpersistence.h +++ b/searchcore/src/apps/proton/downpersistence.h @@ -37,7 +37,6 @@ public: RemoveResult removeIfFound(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) override; Result removeEntry(const Bucket&, Timestamp, Context&) override; UpdateResult update(const Bucket&, Timestamp timestamp, const DocumentUpdateSP& update, Context&) override; - Result flush(const Bucket&, Context&) override; GetResult get(const Bucket&, const document::FieldSet& fieldSet, const DocumentId& id, Context&) const override; CreateIteratorResult createIterator(const Bucket&, const document::FieldSet& fieldSet, diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp index 05e51993f49..fb80c25bfb7 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp @@ -145,15 +145,6 @@ PersistenceProviderWrapper::get(const spi::Bucket& bucket, return _spi.get(bucket, fieldSet, id, context); } -spi::Result -PersistenceProviderWrapper::flush(const spi::Bucket& bucket, - spi::Context& context) -{ - LOG_SPI("flush(" << bucket << ")"); - CHECK_ERROR(spi::Result, FAIL_FLUSH); - return _spi.flush(bucket, context); -} - spi::CreateIteratorResult PersistenceProviderWrapper::createIterator(const spi::Bucket& bucket, const document::FieldSet& fields, diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h index 511ced02118..9bd3653e8a1 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h @@ -36,7 +36,6 @@ public: FAIL_REPLACE_WITH_REMOVE = 1 << 6, FAIL_UPDATE = 1 << 7, FAIL_REVERT = 1 << 8, - FAIL_FLUSH = 1 << 9, FAIL_CREATE_ITERATOR = 1 << 10, FAIL_ITERATE = 1 << 11, FAIL_DESTROY_ITERATOR = 1 << 12, @@ -44,7 +43,7 @@ public: FAIL_SPLIT = 1 << 14, FAIL_JOIN = 1 << 15, FAIL_CREATE_BUCKET = 1 << 16, - FAIL_BUCKET_PERSISTENCE = FAIL_PUT|FAIL_REMOVE|FAIL_UPDATE|FAIL_REVERT|FAIL_FLUSH, + FAIL_BUCKET_PERSISTENCE = FAIL_PUT|FAIL_REMOVE|FAIL_UPDATE|FAIL_REVERT, FAIL_ALL_OPERATIONS = 0xffff, // TODO: add more as needed }; @@ -55,7 +54,7 @@ private: uint32_t _failureMask; public: PersistenceProviderWrapper(spi::PersistenceProvider& spi); - ~PersistenceProviderWrapper(); + ~PersistenceProviderWrapper() override; /** * Explicitly set result to anything != NONE to have all operations @@ -96,7 +95,6 @@ public: spi::GetResult get(const spi::Bucket&, const document::FieldSet&, const spi::DocumentId&, spi::Context&) const override ; - spi::Result flush(const spi::Bucket&, spi::Context&) override; spi::CreateIteratorResult createIterator(const spi::Bucket&, const document::FieldSet&, const spi::Selection&, spi::IncludedVersions versions, spi::Context&) override; diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 4576f8a08f8..2cb390fca86 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -698,19 +698,6 @@ TEST_F(FileStorManagerTest, handler_pause) { ASSERT_EQ(30, filestorHandler.getNextMessage(0, stripeId).second->getPriority()); } -namespace { - -uint64_t getPutTime(api::StorageMessage::SP& msg) -{ - if (!msg.get()) { - return (uint64_t)-1; - } - - return static_cast<api::PutCommand*>(msg.get())->getTimestamp(); -}; - -} - TEST_F(FileStorManagerTest, remap_split) { // Setup a filestorthread to test DummyStorageLink top; @@ -768,63 +755,6 @@ TEST_F(FileStorManagerTest, remap_split) { filestorHandler.dumpQueue(0)); } -TEST_F(FileStorManagerTest, handler_multi) { - // Setup a filestorthread to test - DummyStorageLink top; - DummyStorageLink *dummyManager; - top.push_back(std::unique_ptr<StorageLink>( - dummyManager = new DummyStorageLink)); - top.open(); - ForwardingMessageSender messageSender(*dummyManager); - // Since we fake time with small numbers, we need to make sure we dont - // compact them away, as they will seem to be from 1970 - - documentapi::LoadTypeSet loadTypes("raw:"); - FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); - metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1, 1); - - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); - filestorHandler.setGetNextMessageTimeout(50); - uint32_t stripeId = filestorHandler.getNextStripeId(0); - - std::string content("Here is some content which is in all documents"); - - Document::SP doc1(createDocument(content, "id:footype:testdoctype1:n=1234:bar").release()); - - Document::SP doc2(createDocument(content, "id:footype:testdoctype1:n=4567:bar").release()); - - document::BucketIdFactory factory; - document::BucketId bucket1(16, factory.getBucketId(doc1->getId()).getRawId()); - document::BucketId bucket2(16, factory.getBucketId(doc2->getId()).getRawId()); - - // Populate bucket with the given data - for (uint32_t i = 1; i < 10; i++) { - filestorHandler.schedule( - api::StorageMessage::SP(new api::PutCommand(makeDocumentBucket(bucket1), doc1, i)), 0); - filestorHandler.schedule( - api::StorageMessage::SP(new api::PutCommand(makeDocumentBucket(bucket2), doc2, i + 10)), 0); - } - - { - FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, stripeId); - ASSERT_EQ(1, getPutTime(lock.second)); - - lock = filestorHandler.getNextMessage(0, stripeId, lock); - ASSERT_EQ(2, getPutTime(lock.second)); - - lock = filestorHandler.getNextMessage(0, stripeId, lock); - ASSERT_EQ(3, getPutTime(lock.second)); - } - - { - FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, stripeId); - ASSERT_EQ(11, getPutTime(lock.second)); - - lock = filestorHandler.getNextMessage(0, stripeId, lock); - ASSERT_EQ(12, getPutTime(lock.second)); - } -} - TEST_F(FileStorManagerTest, handler_timeout) { // Setup a filestorthread to test DummyStorageLink top; diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index 0b2baab5652..7e7a76b95a8 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -626,8 +626,7 @@ MergeHandlerTest::createDummyGetBucketDiff(int timestampOffset, } TEST_F(MergeHandlerTest, spi_flush_guard) { - PersistenceProviderWrapper providerWrapper( - getPersistenceProvider()); + PersistenceProviderWrapper providerWrapper(getPersistenceProvider()); MergeHandler handler(providerWrapper, getEnv()); providerWrapper.setResult( @@ -635,8 +634,7 @@ TEST_F(MergeHandlerTest, spi_flush_guard) { setUpChain(MIDDLE); // Fail applying unrevertable remove - providerWrapper.setFailureMask( - PersistenceProviderWrapper::FAIL_REMOVE); + providerWrapper.setFailureMask(PersistenceProviderWrapper::FAIL_REMOVE); providerWrapper.clearOperationLog(); try { @@ -645,11 +643,6 @@ TEST_F(MergeHandlerTest, spi_flush_guard) { } catch (const std::runtime_error& e) { EXPECT_TRUE(std::string(e.what()).find("Failed remove") != std::string::npos); } - // Test that we always flush after applying diff locally, even when - // errors are encountered. - const std::vector<std::string>& opLog(providerWrapper.getOperationLog()); - ASSERT_FALSE(opLog.empty()); - EXPECT_EQ("flush(Bucket(0x40000000000004d2, partition 0))", opLog.back()); } TEST_F(MergeHandlerTest, bucket_not_found_in_db) { @@ -901,7 +894,6 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) { { PersistenceProviderWrapper::FAIL_ITERATE, "iterate" }, { PersistenceProviderWrapper::FAIL_PUT, "Failed put" }, { PersistenceProviderWrapper::FAIL_REMOVE, "Failed remove" }, - { PersistenceProviderWrapper::FAIL_FLUSH, "Failed flush" }, }; typedef ExpectedExceptionSpec* ExceptionIterator; @@ -1058,7 +1050,6 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_reply_spi_failures) { { PersistenceProviderWrapper::FAIL_ITERATE, "iterate" }, { PersistenceProviderWrapper::FAIL_PUT, "Failed put" }, { PersistenceProviderWrapper::FAIL_REMOVE, "Failed remove" }, - { PersistenceProviderWrapper::FAIL_FLUSH, "Failed flush" }, }; typedef ExpectedExceptionSpec* ExceptionIterator; diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp index be276dd7f9d..3754a82e7ae 100644 --- a/storage/src/tests/persistence/persistencequeuetest.cpp +++ b/storage/src/tests/persistence/persistencequeuetest.cpp @@ -168,19 +168,4 @@ TEST_F(PersistenceQueueTest, exclusive_locked_operation_not_started_if_exclusive ASSERT_FALSE(lock1.first.get()); } -TEST_F(PersistenceQueueTest, operation_batching_not_allowed_across_different_lock_modes) { - Fixture f(*this); - - f.filestorHandler->schedule(createPut(1234, 0), _disk); - f.filestorHandler->schedule(createGet(1234), _disk); - - auto lock0 = f.filestorHandler->getNextMessage(_disk, f.stripeId); - ASSERT_TRUE(lock0.first); - ASSERT_TRUE(lock0.second); - EXPECT_EQ(api::LockingRequirements::Exclusive, lock0.first->lockingRequirements()); - - f.filestorHandler->getNextMessage(_disk, f.stripeId, lock0); - ASSERT_FALSE(lock0.second); -} - } // namespace storage diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp index e32fc056413..25c0a36a7f5 100644 --- a/storage/src/tests/persistence/persistencetestutils.cpp +++ b/storage/src/tests/persistence/persistencetestutils.cpp @@ -159,7 +159,6 @@ PersistenceTestUtils::doPutOnDisk( getPersistenceProvider().put(spi::Bucket(b), timestamp, doc, context); - getPersistenceProvider().flush(b, context); return doc; } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp index 543accc80ae..44cd8f0ab0c 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp @@ -60,12 +60,6 @@ FileStorHandler::getNextMessage(uint16_t disk, uint32_t stripeId) return _impl->getNextMessage(disk, stripeId); } -FileStorHandler::LockedMessage & -FileStorHandler::getNextMessage(uint16_t disk, uint32_t stripeId, LockedMessage& lck) -{ - return _impl->getNextMessage(disk, stripeId, lck); -} - FileStorHandler::BucketLockInterface::SP FileStorHandler::lock(const document::Bucket& bucket, uint16_t disk, api::LockingRequirements lockReq) { diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index db294d7c39f..af0a52b2fa0 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -121,11 +121,6 @@ public: LockedMessage getNextMessage(uint16_t disk, uint32_t stripeId); /** - * Returns the next message for the same bucket. - */ - LockedMessage & getNextMessage(uint16_t disk, uint32_t stripeId, LockedMessage& lock); - - /** * Lock a bucket. By default, each file stor thread has the locks of all * buckets in their area of responsibility. If they need to access buckets * outside of their area, they can call this to make sure the thread diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index bfd5233d017..5ad37d585ef 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -319,25 +319,6 @@ FileStorHandlerImpl::getNextStripeId(uint32_t disk) { return _diskInfo[disk].getNextStripeId(); } - -FileStorHandler::LockedMessage & -FileStorHandlerImpl::getNextMessage(uint16_t diskId, uint32_t stripeId, FileStorHandler::LockedMessage& lck) -{ - document::Bucket bucket(lck.first->getBucket()); - - LOG(spam, "Disk %d retrieving message for buffered bucket %s", diskId, bucket.getBucketId().toString().c_str()); - - assert(diskId < _diskInfo.size()); - Disk& disk(_diskInfo[diskId]); - - if (disk.isClosed()) { - lck.second.reset(); - return lck; - } - - return disk.getNextMessage(stripeId, lck); -} - bool FileStorHandlerImpl::tryHandlePause(uint16_t disk) const { @@ -977,49 +958,6 @@ FileStorHandlerImpl::Stripe::getNextMessage(uint32_t timeout, Disk & disk) return {}; // No message fetched. } -FileStorHandler::LockedMessage & -FileStorHandlerImpl::Stripe::getNextMessage(FileStorHandler::LockedMessage& lck) -{ - const document::Bucket & bucket = lck.second->getBucket(); - vespalib::MonitorGuard guard(_lock); - BucketIdx& idx = bmi::get<2>(_queue); - std::pair<BucketIdx::iterator, BucketIdx::iterator> range = idx.equal_range(bucket); - - // No more for this bucket. - if (range.first == range.second) { - lck.second.reset(); - return lck; - } - - api::StorageMessage & m(*range.first->_command); - // For now, don't allow batching of operations across lock requirement modes. - // We might relax this requirement later once we're 100% sure it can't trigger - // any unfortunate edge cases. - if (lck.first->lockingRequirements() != m.lockingRequirements()) { - lck.second.reset(); - return lck; - } - - std::chrono::milliseconds waitTime(uint64_t(range.first->_timer.stop(_metrics->averageQueueWaitingTime[m.getLoadType()]))); - - if (!messageTimedOutInQueue(m, waitTime)) { - std::shared_ptr<api::StorageMessage> msg = std::move(range.first->_command); - idx.erase(range.first); - lck.second.swap(msg); - guard.broadcast(); - } else { - std::shared_ptr<api::StorageReply> msgReply = static_cast<api::StorageCommand&>(m).makeReply(); - idx.erase(range.first); - guard.broadcast(); - guard.unlock(); - msgReply->setResult(api::ReturnCode(api::ReturnCode::TIMEOUT, "Message waited too long in storage queue")); - _messageSender.sendReply(msgReply); - - lck.second.reset(); - } - return lck; -} - FileStorHandler::LockedMessage FileStorHandlerImpl::Stripe::getMessage(vespalib::MonitorGuard & guard, PriorityIdx & idx, PriorityIdx::iterator iter) { diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index da4d242a4c9..7a4f9000e82 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -129,7 +129,6 @@ public: void failOperations(const document::Bucket & bucket, const api::ReturnCode & code); FileStorHandler::LockedMessage getNextMessage(uint32_t timeout, Disk & disk); - FileStorHandler::LockedMessage & getNextMessage(FileStorHandler::LockedMessage& lock); void dumpQueue(std::ostream & os) const; void dumpActiveHtml(std::ostream & os) const; void dumpQueueHtml(std::ostream & os) const; @@ -185,9 +184,6 @@ public: FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId, uint32_t timeout) { return _stripes[stripeId].getNextMessage(timeout, *this); } - FileStorHandler::LockedMessage & getNextMessage(uint32_t stripeId, FileStorHandler::LockedMessage & lck) { - return _stripes[stripeId].getNextMessage(lck); - } std::shared_ptr<FileStorHandler::BucketLockInterface> lock(const document::Bucket & bucket, api::LockingRequirements lockReq) { return stripe(bucket).lock(bucket, lockReq); @@ -253,8 +249,6 @@ public: FileStorHandler::LockedMessage getNextMessage(uint16_t disk, uint32_t stripeId); - FileStorHandler::LockedMessage & getNextMessage(uint16_t disk, uint32_t stripeId, FileStorHandler::LockedMessage& lock); - enum Operation { MOVE, SPLIT, JOIN }; void remapQueue(const RemapInfo& source, RemapInfo& target, Operation op); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index ea6f5e03a80..6eab44923f7 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -802,7 +802,7 @@ void FileStorManager::onFlush(bool downwards) LOG(debug, "Flushed _filestorHandler->flush(!downwards);"); for (uint32_t i = 0; i < _disks.size(); ++i) { for (uint32_t j = 0; j < _disks[i].size(); ++j) { - if (_disks[i][j].get() != NULL) { + if (_disks[i][j]) { _disks[i][j]->flush(); LOG(debug, "flushed disk[%d][%d]", i, j); } diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index e5e358cbb60..d47ed28f636 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -91,40 +91,6 @@ public: } }; -class FlushGuard -{ - spi::PersistenceProvider& _spi; - spi::Bucket _bucket; - spi::Context& _context; - bool _hasFlushed; -public: - FlushGuard(spi::PersistenceProvider& spi, - const spi::Bucket& bucket, - spi::Context& context) - : _spi(spi), - _bucket(bucket), - _context(context), - _hasFlushed(false) - {} - ~FlushGuard() - { - if (!_hasFlushed) { - LOG(debug, "Auto-flushing %s", _bucket.toString().c_str()); - spi::Result result =_spi.flush(_bucket, _context); - if (result.hasError()) { - LOG(debug, "Flush %s failed: %s", - _bucket.toString().c_str(), - result.toString().c_str()); - } - } - } - void flush() { - LOG(debug, "Flushing %s", _bucket.toString().c_str()); - _hasFlushed = true; - checkResult(_spi.flush(_bucket, _context), _bucket, "flush"); - } -}; - struct IndirectDocEntryTimestampPredicate { bool operator()(const spi::DocEntry::UP& e1, @@ -607,8 +573,6 @@ MergeHandler::applyDiffLocally( std::vector<spi::DocEntry::UP> entries; populateMetaData(bucket, MAX_TIMESTAMP, entries, context); - FlushGuard flushGuard(_spi, bucket, context); - std::shared_ptr<const document::DocumentTypeRepo> repo(_env._component.getTypeRepo()); assert(repo.get() != nullptr); @@ -702,8 +666,6 @@ MergeHandler::applyDiffLocally( LOG(debug, "Merge(%s): Applied %u entries locally from ApplyBucketDiff.", bucket.toString().c_str(), addedCount); - flushGuard.flush(); - spi::BucketInfoResult infoResult(_spi.getBucketInfo(bucket)); if (infoResult.getErrorCode() != spi::Result::ErrorType::NONE) { LOG(warning, "Failed to get bucket info for %s: %s", diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index e7123164659..aaf62a85e87 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -607,10 +607,6 @@ PersistenceThread::handleJoinBuckets(api::JoinBucketsCommand& cmd, spi::Context if (!checkForError(result, *tracker)) { return tracker; } - result = _spi.flush(spi::Bucket(destBucket, spi::PartitionId(_env._partition)), context); - if (!checkForError(result, *tracker)) { - return tracker; - } uint64_t lastModified = 0; for (uint32_t i = 0; i < cmd.getSourceBuckets().size(); i++) { document::Bucket srcBucket(destBucket.getBucketSpace(), cmd.getSourceBuckets()[i]); @@ -895,114 +891,24 @@ bool hasBucketInfo(const api::StorageMessage& msg) } void -PersistenceThread::flushAllReplies( - const document::Bucket& bucket, - std::vector<std::unique_ptr<MessageTracker> >& replies) -{ - if (replies.empty()) { - return; - } - - try { - if (replies.size() > 1) { - _env._metrics.batchingSize.addValue(replies.size()); - } -#ifdef ENABLE_BUCKET_OPERATION_LOGGING - { - size_t nputs = 0, nremoves = 0, nother = 0; - for (size_t i = 0; i < replies.size(); ++i) { - if (dynamic_cast<api::PutReply*>(replies[i]->getReply().get())) - { - ++nputs; - } else if (dynamic_cast<api::RemoveReply*>( - replies[i]->getReply().get())) - { - ++nremoves; - } else { - ++nother; - } - } - LOG_BUCKET_OPERATION( - bucket.getBucketId(), - vespalib::make_string( - "flushing %zu operations (%zu puts, %zu removes, " - "%zu other)", - replies.size(), nputs, nremoves, nother)); - } -#endif - spi::Bucket b(bucket, spi::PartitionId(_env._partition)); - // Flush is not used for anything currentlu, and the context is not correct either when batching is done - // So just faking it here. - spi::Context dummyContext(documentapi::LoadType::DEFAULT, 0, 0); - spi::Result result = _spi.flush(b, dummyContext); - uint32_t errorCode = _env.convertErrorCode(result); - if (errorCode != 0) { - for (uint32_t i = 0; i < replies.size(); ++i) { - replies[i]->getReply()->setResult(api::ReturnCode((api::ReturnCode::Result)errorCode, result.getErrorMessage())); - } - } - } catch (std::exception& e) { - for (uint32_t i = 0; i < replies.size(); ++i) { - replies[i]->getReply()->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, e.what())); - } - } - - for (uint32_t i = 0; i < replies.size(); ++i) { - LOG(spam, "Sending reply up (batched): %s %" PRIu64, - replies[i]->getReply()->toString().c_str(), replies[i]->getReply()->getMsgId()); - _env._fileStorHandler.sendReply(replies[i]->getReply()); - } - - replies.clear(); -} - -void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock) -{ +PersistenceThread::processLockedMessage(FileStorHandler::LockedMessage & lock) { std::vector<MessageTracker::UP> trackers; document::Bucket bucket = lock.first->getBucket(); - while (lock.second) { - LOG(debug, "Inside while loop %d, nodeIndex %d, ptr=%p", _env._partition, _env._nodeIndex, lock.second.get()); - std::shared_ptr<api::StorageMessage> msg(lock.second); - bool batchable = isBatchable(*msg); - - // If the next operation wasn't batchable, we should flush - // everything that came before. - if (!batchable) { - flushAllReplies(bucket, trackers); - } + LOG(debug, "Partition %d, nodeIndex %d, ptr=%p", _env._partition, _env._nodeIndex, lock.second.get()); + api::StorageMessage & msg(*lock.second); - std::unique_ptr<MessageTracker> tracker = processMessage(*msg); - if (!tracker || !tracker->getReply()) { - // Was a reply - break; - } - - if (hasBucketInfo(*msg)) { + std::unique_ptr<MessageTracker> tracker = processMessage(msg); + if (tracker && tracker->getReply()) { + if (hasBucketInfo(msg)) { if (tracker->getReply()->getResult().success()) { _env.setBucketInfo(*tracker, bucket); } } - if (batchable) { - LOG(spam, "Adding reply %s to batch for bucket %s", - tracker->getReply()->toString().c_str(), bucket.getBucketId().toString().c_str()); - - trackers.push_back(std::move(tracker)); - - if (trackers.back()->getReply()->getResult().success()) { - _env._fileStorHandler.getNextMessage(_env._partition, _stripeId, lock); - } else { - break; - } - } else { - LOG(spam, "Sending reply up: %s %" PRIu64, - tracker->getReply()->toString().c_str(), tracker->getReply()->getMsgId()); - _env._fileStorHandler.sendReply(tracker->getReply()); - break; - } + LOG(spam, "Sending reply up: %s %" PRIu64, + tracker->getReply()->toString().c_str(), tracker->getReply()->getMsgId()); + _env._fileStorHandler.sendReply(tracker->getReply()); } - - flushAllReplies(bucket, trackers); } void @@ -1016,7 +922,7 @@ PersistenceThread::run(framework::ThreadHandle& thread) FileStorHandler::LockedMessage lock(_env._fileStorHandler.getNextMessage(_env._partition, _stripeId)); if (lock.first) { - processMessages(lock); + processLockedMessage(lock); } vespalib::MonitorGuard flushMonitorGuard(_flushMonitor); diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h index e410843c1be..56414835b7b 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.h +++ b/storage/src/vespa/storage/persistence/persistencethread.h @@ -22,7 +22,7 @@ public: PersistenceThread(ServiceLayerComponentRegister&, const config::ConfigUri & configUri, spi::PersistenceProvider& provider, FileStorHandler& filestorHandler, FileStorThreadMetrics& metrics, uint16_t deviceIndex); - ~PersistenceThread(); + ~PersistenceThread() override; /** Waits for current operation to be finished. */ void flush() override; @@ -75,15 +75,13 @@ private: void handleReply(api::StorageReply&); MessageTracker::UP processMessage(api::StorageMessage& msg); - void processMessages(FileStorHandler::LockedMessage & lock); + void processLockedMessage(FileStorHandler::LockedMessage & lock); // Thread main loop void run(framework::ThreadHandle&) override; bool checkForError(const spi::Result& response, MessageTracker& tracker); spi::Bucket getBucket(const DocumentId& id, const document::Bucket &bucket) const; - void flushAllReplies(const document::Bucket& bucket, std::vector<MessageTracker::UP>& trackers); - friend class TestAndSetHelper; bool tasConditionExists(const api::TestAndSetCommand & cmd); bool tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker & tracker, diff --git a/storage/src/vespa/storage/persistence/processallhandler.cpp b/storage/src/vespa/storage/persistence/processallhandler.cpp index 5b94a3da027..f37c6723933 100644 --- a/storage/src/vespa/storage/persistence/processallhandler.cpp +++ b/storage/src/vespa/storage/persistence/processallhandler.cpp @@ -83,9 +83,9 @@ MessageTracker::UP ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, spi::Context& context) { - MessageTracker::UP tracker(new MessageTracker( + auto tracker = std::make_unique<MessageTracker>( _env._metrics.removeLocation[cmd.getLoadType()], - _env._component.getClock())); + _env._component.getClock()); LOG(debug, "RemoveLocation(%s): using selection '%s'", cmd.getBucketId().toString().c_str(), @@ -99,13 +99,8 @@ ProcessAllHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, processor, spi::NEWEST_DOCUMENT_ONLY, context); - spi::Result result = _spi.flush(bucket, context); - uint32_t code = _env.convertErrorCode(result); - if (code == 0) { - tracker->setReply(std::make_shared<api::RemoveLocationReply>(cmd, processor._n_removed)); - } else { - tracker->fail(code, result.getErrorMessage()); - } + + tracker->setReply(std::make_shared<api::RemoveLocationReply>(cmd, processor._n_removed)); return tracker; } @@ -114,9 +109,9 @@ MessageTracker::UP ProcessAllHandler::handleStatBucket(api::StatBucketCommand& cmd, spi::Context& context) { - MessageTracker::UP tracker(new MessageTracker( + auto tracker = std::make_unique<MessageTracker>( _env._metrics.statBucket[cmd.getLoadType()], - _env._component.getClock())); + _env._component.getClock()); std::ostringstream ost; ost << "Persistence bucket " << cmd.getBucketId() diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp index fe947bff4d5..76c420e76e6 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp @@ -118,12 +118,6 @@ ProviderErrorWrapper::get(const spi::Bucket& bucket, return checkResult(_impl.get(bucket, fieldSet, docId, context)); } -spi::Result -ProviderErrorWrapper::flush(const spi::Bucket& bucket, spi::Context& context) -{ - return checkResult(_impl.flush(bucket, context)); -} - spi::CreateIteratorResult ProviderErrorWrapper::createIterator(const spi::Bucket& bucket, const document::FieldSet& fieldSet, diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h index 3b5ace90d13..292eb004223 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h @@ -52,7 +52,6 @@ public: spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override; spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, const spi::DocumentUpdateSP&, spi::Context&) override; spi::GetResult get(const spi::Bucket&, const document::FieldSet&, const document::DocumentId&, spi::Context&) const override; - spi::Result flush(const spi::Bucket&, spi::Context&) override; spi::CreateIteratorResult createIterator(const spi::Bucket&, const document::FieldSet&, const spi::Selection&, spi::IncludedVersions versions, spi::Context&) override; spi::IterateResult iterate(spi::IteratorId, uint64_t maxByteSize, spi::Context&) const override; |