diff options
author | Tor Brede Vekterli <vekterli@oath.com> | 2018-03-20 13:22:39 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-03-20 13:22:39 +0100 |
commit | 76cd17602b493a13d1e993b1f72cfdaf7b627e0b (patch) | |
tree | 2d3bd1ee963de6107b80b183a23d03708764fa1a | |
parent | e31b9616921f9046a1fabbe9fca9cf3a5d535279 (diff) | |
parent | b9c175537bc743a0bf457d579d03b59867d1a1f1 (diff) |
Merge pull request #5378 from vespa-engine/balder/no-more-thread-priorities
Balder/no more thread priorities
23 files changed, 377 insertions, 1614 deletions
diff --git a/config-model/src/main/java/com/yahoo/vespa/model/content/PriorityMapping.java b/config-model/src/main/java/com/yahoo/vespa/model/content/PriorityMapping.java deleted file mode 100644 index e239c66f1cf..00000000000 --- a/config-model/src/main/java/com/yahoo/vespa/model/content/PriorityMapping.java +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.model.content; - -import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; -import com.yahoo.vespa.model.builder.xml.dom.ModelElement; - -import java.util.HashMap; -import java.util.Map; - -/** - * Created with IntelliJ IDEA. - * User: thomasg - * Date: 5/7/12 - * Time: 2:00 PM - * To change this template use File | Settings | File Templates. - */ -public class PriorityMapping { - ModelElement clusterXml; - Map<DocumentProtocol.Priority, Integer> priorityMappings = new HashMap<>(); - - public PriorityMapping(ModelElement clusterXml) { - this.clusterXml = clusterXml; - - int val = 50; - for (DocumentProtocol.Priority p : DocumentProtocol.Priority.values()) { - priorityMappings.put(p, val); - val += 10; - } - priorityMappings.put(DocumentProtocol.Priority.HIGHEST, 0); - priorityMappings.put(DocumentProtocol.Priority.LOWEST, 255); - } - - public int getPriorityMapping(String priorityName) { - return priorityMappings.get(Enum.valueOf(DocumentProtocol.Priority.class, priorityName)); - } - -} diff --git a/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java b/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java index b4faa6eeb7e..0cc62fb6680 100644 --- a/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java +++ b/config-model/src/main/java/com/yahoo/vespa/model/content/storagecluster/FileStorProducer.java @@ -3,12 +3,8 @@ package com.yahoo.vespa.model.content.storagecluster; import com.yahoo.vespa.config.content.StorFilestorConfig; import com.yahoo.vespa.model.builder.xml.dom.ModelElement; -import com.yahoo.vespa.model.content.PriorityMapping; import com.yahoo.vespa.model.content.cluster.ContentCluster; -import java.util.ArrayList; -import java.util.List; - /** * Serves stor-filestor for storage clusters. */ @@ -19,7 +15,7 @@ public class FileStorProducer implements StorFilestorConfig.Producer { return new FileStorProducer(parent, getThreads(clusterElem)); } - private List<StorFilestorConfig.Threads.Builder> getThreads(ModelElement clusterElem) { + private Integer getThreads(ModelElement clusterElem) { ModelElement tuning = clusterElem.getChild("tuning"); if (tuning == null) { return null; @@ -29,44 +25,34 @@ public class FileStorProducer implements StorFilestorConfig.Producer { return null; } - List<StorFilestorConfig.Threads.Builder> retVal = new ArrayList<>(); - - PriorityMapping mapping = new PriorityMapping(clusterElem); + Integer count = threads.getIntegerAttribute("count"); + if (count != null) { + return count; + } + // Backward compatible fallback + int numThreads = 0; for (ModelElement thread : threads.subElements("thread")) { - String priorityName = thread.getStringAttribute("lowest-priority"); - if (priorityName == null) { - priorityName = "LOWEST"; - } - - Integer count = thread.getIntegerAttribute("count"); - if (count == null) { - count = 1; - } - - for (int i = 0; i < count; ++i) { - retVal.add(new StorFilestorConfig.Threads.Builder().lowestpri(mapping.getPriorityMapping(priorityName))); - } + count = thread.getIntegerAttribute("count"); + numThreads += (count == null) ? 1 : count; } - return retVal; + return numThreads; } } - private List<StorFilestorConfig.Threads.Builder> threads; + private Integer numThreads; private ContentCluster cluster; - public FileStorProducer(ContentCluster parent, List<StorFilestorConfig.Threads.Builder> threads) { - this.threads = threads; + public FileStorProducer(ContentCluster parent, Integer numThreads) { + this.numThreads = numThreads; this.cluster = parent; } @Override public void getConfig(StorFilestorConfig.Builder builder) { - if (threads != null) { - for (StorFilestorConfig.Threads.Builder t : threads) { - builder.threads.add(t); - } + if (numThreads != null) { + builder.num_threads(numThreads); } builder.enable_multibit_split_optimalization(cluster.getPersistence().enableMultiLevelSplitting()); } diff --git a/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java b/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java index 0fae6c6d751..95da1516e80 100644 --- a/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java +++ b/config-model/src/test/java/com/yahoo/vespa/model/content/StorageClusterTest.java @@ -120,12 +120,7 @@ public class StorageClusterTest { StorFilestorConfig config = new StorFilestorConfig(builder); - assertEquals(4, config.threads().size()); - assertEquals(190, config.threads().get(0).lowestpri()); - assertEquals(190, config.threads().get(1).lowestpri()); - assertEquals(60, config.threads().get(2).lowestpri()); - assertEquals(255, config.threads().get(3).lowestpri()); - + assertEquals(4, config.num_threads()); assertEquals(true, config.enable_multibit_split_optimalization()); } @@ -145,7 +140,7 @@ public class StorageClusterTest { StorFilestorConfig config = new StorFilestorConfig(builder); - assertEquals(0, config.threads().size()); + assertEquals(6, config.num_threads()); } @Test diff --git a/configdefinitions/src/vespa/stor-filestor.def b/configdefinitions/src/vespa/stor-filestor.def index 80994bfb477..c02a9018064 100644 --- a/configdefinitions/src/vespa/stor-filestor.def +++ b/configdefinitions/src/vespa/stor-filestor.def @@ -1,79 +1,8 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. namespace=vespa.config.content -## Use the new storage core -use_new_core bool default=false restart - -## FILE LAYOUT PARAMETERS - -## Number of directories per level to spread files across -## DEPRECATED: see stor-memfilepersistence config instead -dir_spread int default=256 restart - -## Number of directory levels -## DEPRECATED: see stor-memfilepersistence config instead -dir_levels int default=1 restart - -## FILE SIZE PARAMETERS - -## Minimum number of meta data entries in one slotfile. When creating new -## files or resizing files, enforce it to contain at least this many meta -## entries. Set to 512 by default, using 20544 bytes total for metadata in -## new files. -## DEPRECATED: see stor-memfilepersistence config instead -minimum_file_meta_slots int default=512 - -## Maximum number of entries in one slotfile. File must be split before -## accepting more data -## -## Default ensure meta data is less than 512 kB. -## DEPRECATED: see stor-memfilepersistence config instead -maximum_file_meta_slots int default=13106 - -## Minimum size of header block. At least this amount of header space will -## be available in new or resized files. For 512 documents, this will be -## 200 bytes per document. -## DEPRECATED: see stor-memfilepersistence config instead -minimum_file_header_block_size int default=102848 - -## Maximum header block size (in bytes). Since the whole meta data list and -## header block needs to be read into memory for some operations, a limit -## can be set for the header block, to avoid consuming too much memory. -## -## Default is set high, as we dont configure it automatically right now, so we -## would rather people configure it down than up. -## DEPRECATED: see stor-memfilepersistence config instead -maximum_file_header_block_size int default=33554432 - -## Minimum size of a single slotfile. When creating or resizing files, they -## will never become smaller than this. Default of 1 MB, will be 1807 byte -## per doc if we use all 512 meta data entries set as default min meta -## entries. -## DEPRECATED: see stor-memfilepersistence config instead -minimum_file_size int default=1048576 - -## Maximum size of a single slotfile. File must be split before accepting -## more data. Will return file full errors. -## -## Default is set high, as we dont configure it automatically right now, so we -## would rather people configure it down than up. -## DEPRECATED: see stor-memfilepersistence config instead -maximum_file_size int default=268431360 - -## When creating new files, always create files as a multiplum of this size. -## Should be set to the block size on the disk, or possibly a multiplum of -## that, though we know no advantage of setting it to more than the block -## size. If you want more free space in files, you should rather adjust that -## with growfactor or min file sizes. -## DEPRECATED: see stor-memfilepersistence config instead -file_block_size int default=4096 - ## DETECT FAILURE PARAMETERS -## If persistence operations take more than this amount of milliseconds, it -## will be logged as a warning. -warn_on_slow_operations int default=5000 - ## After seeing given number of errors on a disk, storage will disable the ## disk and restart. If set to 0, storage will never disable disks. Note ## that if you get disk errors, which arent automatically fixed, this will @@ -92,75 +21,15 @@ fail_disk_after_error_count int default=1 restart ## during normal load. disk_operation_timeout int default=0 restart -## Timestamps provided for entries written are normally provided externally. -## To detect clock skew between nodes, storage will warn if timestamps look -## out of sync. Future time clearly indicates clock skew and thus the limit -## here is low. Time in the past might just indicate that the operation has -## been queued for a while, so the default value here is not very strict. -## Time is given in seconds. Note that there are valid usecases for -## inserting data with old timestamps, for instance when synchronizing two -## copies of the same data. Warnings should not be printed in these cases. -time_future_limit int default=5 -time_past_limit int default=3600 - -## Enabling debug verifications will make storage do extra verifications -## to find errors as soon as possible. These extra verifications will use up -## a lot of resources though, and should not be needed in normal operations. -## They are mostly to be used during test phases or when debugging problems. -## The value itself is a bit mask, where you can enable different kinds of -## verifications by setting given bits. -debug_verifications int default=0 restart - -## CONSISTENCY PARAMETERS - -## If true, fsync after all disk operations, to ensure no dirty OS file -## cache afterwards. This is only useful if using cached IO, which is not -## recommended to start with. -## DEPRECATED: see stor-memfilepersistence config instead -fsync_after_each_operation bool default=false restart - -## Time period to keep all updates (given in seconds). One can revert any -## operation done within this time. -## DEPRECATED: see stor-memfilepersistence config instead -revert_time_period int default=300 - -## If a remove is older than the reverttimeperiod, the document it is -## removing may be removed from the file. There are a few advantages to -## keeping removes or a bit longer though. If you use this copy to -## synchronize another copy of data, having the remove entry makes it easy -## to detect that you should delete this entry from the other data copy. -## This is useful for internal synchronization of files within VDS if you -## use multiple copies, or for partial recovery or BCP situations. To -## guarantuee consistency in such situations, a data destination that have -## been unavailable for longer than this amount of time, should have its -## data cleared before being set into the system again. This value is given -## in seconds, with the default being one week -## DEPRECATED: see stor-memfilepersistence config instead -keep_remove_time_period int default=604800 - -## Maximum number of entries to keep for a single document. Heavy resending -## within the revert time period may add loads of entries to a file, ruining -## performance. This setting sets a maximum number of entries for each -## document. This will make entries potentially live shorter than the revert -## time period to avoid a resending worst case. A value of 0 means infinite. -## DEPRECATED: see stor-memfilepersistence config instead -maximum_versions_of_single_document_stored int default=0 - ## PERFORMANCE PARAMETERS -## Number of threads to use for each mountpoint. VDS needs memory per thread -## to perform disk operations, so increasing this number will increase -## memory usage, but it will also make the disk queue on a given disk be -## able to be larger, such that the disk can choose operations to optimize -## seek time. -## See benchmarks for performance/memory tradeoff. -threads[].lowestpri int default=255 restart +## Number of threads to use for each mountpoint. +num_threads int default=6 restart -## Pause operations (and block new ones from starting) with priority -## lower than this value when executing operations with higher pri than -## min_priority_to_be_blocking -max_priority_to_block int default=255 restart -min_priority_to_be_blocking int default=0 restart +## When merging, if we find more than this number of documents that exist on all +## of the same copies, send a separate apply bucket diff with these entries +## to an optimized merge chain that guarantuees minimum data transfer. +common_merge_chain_optimalization_minimum_size int default=64 restart ## Chunksize to use while merging buckets between nodes. ## @@ -169,173 +38,6 @@ min_priority_to_be_blocking int default=0 restart ## while still reading 4k blocks from disk. bucket_merge_chunk_size int default=4190208 restart -## When reading a slotfile, one does not know the size of the meta data -## list, so one have to read a static amount of data, and possibly read more -## if one didnt read enough. This value needs to be at least 64 byte to read -## the initial header stating the true size of the meta data list. -## DEPRECATED: see stor-memfilepersistence config instead -initial_index_read int default=61440 - -## Similar to index read, when reading the document identifiers, one does -## not know the length of the name prior to reading. Thus, if you read less -## than the size, you will have to do an extra read to get the rest. If you -## use very long document identifiers you should increase this value to be -## larger than most of your identifiers. -## restart flag was added automatically and needs to be verified. -## DEPRECATED: see stor-memfilepersistence config instead -initial_name_read int default=512 restart - -## When we need to read (or write) multiple entries in a file where we can -## either read a big enough section to cover all of them. But at some -## distance between the areas we need, it becomes beneficial to do multiple -## reads rather than to read over them. This setting set how many sequential -## bytes we dont need that we allow to be read/written in order to join two -## logical IO operations together in the application. Setting this low might -## be ok if system calls are cheap and we manage to queue up next IO -## operation in time such that the disk dont need to spin an extra round. -## Setting it high will make the disk more likely to process them together, -## but the time used to read/write the gap might have been used to do -## something useful instead. -## DEPRECATED: see stor-memfilepersistence config instead -maximum_gap_to_read_through int default=65536 - -## Currently not in here as we dont have append functionality yet. Might -## improve performance in some cases. -## max_file_appends int default=0 - -## When writing with direct IO, we need to align blocks to 512b, and to -## avoid reading we write garbage after each doc to fill 512b block. When -## resizing or splitting the file we can realign the files such that we -## remove the gaps of existing data, as we will rewrite everything anyhow. -## If using very small documents this might improve your disk space -## utilization. For larger documents it doesnt really reduce much, so might -## be useful to turn it off to save CPU. -## DEPRECATED: see stor-memfilepersistence config instead -remove_file_gaps_on_rewrite bool default=true restart - -## The order we try to enforce in file body blocks. Visitors that only need -## to visit some of the data in a bucket will be able to read less if what -## it needs to read is located next to each other on disk. However, as we -## dont enforce order on write operations, this demands that resize/split -## operations do the resorting, which, if we cant do it all in memory is -## very expensive. ANY will not do any reordering. Timestamp will enforce -## timestamp order, which is fairly close to order that will normally be -## written anyway, so it should be cheap to reorder even if we cant do it -## all in memory. This might be useful if you often visit subsets based on -## time. RGID uses reverse GID, which stores data from one location -## together. This is useful if you want to visit data from one user from -## buckets that have many users often. This is a much more expensive sort -## though, and should only be done if we have enough memory. -## DEPRECATED: see stor-memfilepersistence config instead -body_block_order enum { ANY, TIMESTAMP, RGID } default=ANY restart - -## If set to true, we will refuse to do reordering of memory unless we have -## enough memory to do it all in memory. See body_block_order comments. -## DEPRECATED: see stor-memfilepersistence config instead -only_reorder_body_in_memory bool default=true restart - -## Whether or not we should verify checksums of all read data during regular -## operations like put, get & remove. Note that some operations, like merge -## and bucket integrity verify operations will still check checksums even if -## this is set false. -## DEPRECATED: see stor-memfilepersistence config instead -verify_checksum_on_regular_load bool default=true restart - -## For streaming search, visiting is very performance critical. Thus you can -## specifically disable checksum verification for visiting. -## DEPRECATED: see stor-memfilepersistence config instead -verify_checksum_on_visit bool default=true restart - -## Maximum size of index buffer that will be allowed to stay in memory and -## not being reduced back to this size after we no longer need it. -## DEPRECATED: see stor-memfilepersistence config instead -maximum_sustainable_index_buffer_size int default=1044480 restart - -## Maximum size of input buffer that will be allowed to stay in memory and -## not being reduced back to this size after we no longer need it. -## DEPRECATED: see stor-memfilepersistence config instead -maximum_sustainable_input_buffer_size int default=1044480 restart - -## Maximum size of output buffer that will be allowed to stay in memory and -## not being reduced back to this size after we no longer need it. -## DEPRECATED: see stor-memfilepersistence config instead -maximum_sustainable_output_buffer_size int default=1044480 restart - -## Whether to downsize index buffer immediately after usage if its above the -## maximum size. If not, it will not be resized down until someone requests -## to use it that needs less data in the buffer. Index buffer is used all -## the time so there should be little reason for immediately downsizing it. -## DEPRECATED: see stor-memfilepersistence config instead -downsize_index_buffer_immediately_after_use bool default=false restart - -## Whether to downsize input buffer immediately after usage if its above the -## maximum size. If not, it will not be resized down until someone requests -## to use it that needs less data in the buffer. Input buffer is not used -## that often, so downsizing it immediately might save some memory. -## DEPRECATED: see stor-memfilepersistence config instead -downsize_input_buffer_immediately_after_use bool default=true restart - -## Whether to downsize output buffer immediately after usage if its above -## the maximum size. If not, it will not be resized down until someone -## requests to use it that needs less data in the buffer. Input buffer is -## not used that often, so downsizing it immediately might save some memory. -## DEPRECATED: see stor-memfilepersistence config instead -downsize_output_buffer_immediately_after_use bool default=true restart - -## Minimum size of buffer used to write a continuous file. If maximum amount -## of memory is not available. At least this amount will be allocated. -## DEPRECATED: see stor-memfilepersistence config instead -minimum_continuous_file_write_buffer_size int default=1044480 restart - -## Maximum size of buffer used to write a continuous file. If set above max -## file size we will always write new files in one go, which probably makes -## for the least chance of getting fragmentation, but will also consume the -## most memory. Default of writing a MB at a time, should make for little -## performance loss because of disk seek time, and hopefully get little -## fragmentation while keeping memory usage down. -## DEPRECATED: see stor-memfilepersistence config instead -maximum_continuous_file_write_buffer_size int default=1044480 restart - -## Minimum amount of memory allocated to read source data during join. This -## amount of memory will be forced allocated. -## DEPRECATED: see stor-memfilepersistence config instead -minimum_join_source_body_read_buffer_size int default=1044480 restart - -## This sets the maximum size of the buffer used in join to read source -## data. Join uses the least IO if this buffer is as big as the body block -## of the source file. Due to the memory manager, each join might get that -## much memory though. -## DEPRECATED: see stor-memfilepersistence config instead -maximum_join_source_body_read_buffer_size int default=16773120 restart - -## Minimum amount of memory allocated to read source data during export. This -## amount of memory will be forced allocated. -## DEPRECATED: see stor-memfilepersistence config instead -minimum_export_source_body_read_buffer_size int default=1044480 restart - -## This sets the maximum size of the buffer used in export to read source -## data. Export uses the least IO if this buffer is as big as the body block -## of the source file. In addition, reordering of body block might not be -## feasibly unless the buffer is big enough to include the whole body block. -## DEPRECATED: see stor-memfilepersistence config instead -maximum_export_source_body_read_buffer_size int default=33550336 restart - -## Minimum size of buffer used to read data during defragmentation. -## DEPRECATED: see stor-memfilepersistence config instead -minimum_defrag_source_body_read_buffer_size int default=1044480 restart - -## This sets the maximum size of the buffer used in defragmentation to read -## source data. Defragmentation uses the least IO if this buffer is as big -## as the body block of the source file, but this might consume some memory. -## Defragmentation is not enabled by default. -## DEPRECATED: see stor-memfilepersistence config instead -maximum_defrag_source_body_read_buffer_size int default=1044480 restart - -## When merging, if we find more than this number of documents that exist on all -## of the same copies, send a separate apply bucket diff with these entries -## to an optimized merge chain that guarantuees minimum data transfer. -common_merge_chain_optimalization_minimum_size int default=64 restart - ## When merging, it is possible to send more metadata than needed in order to ## let local nodes in merge decide which entries fits best to add this time ## based on disk location. Toggle this option on to use it. Note that memory @@ -344,100 +46,7 @@ common_merge_chain_optimalization_minimum_size int default=64 restart ## fill all. enable_merge_local_node_choose_docs_optimalization bool default=true restart -## If set, when we need to cache the entire header, and we already have cached -## all the metadata, read the metadata to find max header position, and only -## read the part containing information. -## DEPRECATED: see stor-memfilepersistence config instead -read_only_used_header_part_when_possible bool default=true restart - -## Enable the slotfile read cache. The cache holds recent metadata and header -## blocks read from disks, and even if small, is very useful for localized -## access. -## DEPRECATED: see stor-memfilepersistence config instead -enable_slotfile_cache bool default=true restart - -## Let the slotfile cache the whole header on a header only operation not -## needing the entire header, if this amount of header only accesses needing -## part of the header has already happened. -## -## Set very high to begin with to never reduce performance. If you have heavy -## header only access to some files, you may get better performance by tuning -## this value. -## DEPRECATED: see stor-memfilepersistence config instead -slotfile_precache_header_after_access_count int default=512 restart - ## Whether or not to enable the multibit split optimalization. This is useful ## if splitting is expensive, but listing document identifiers is fairly cheap. ## This is true for memfile persistence layer, but not for vespa search. enable_multibit_split_optimalization bool default=true restart - -## STORAGE SPACE vs IO/CPU PERFORMANCE OPTIONS - -## If true, use direct IO, bypassing OS caches for disk access. This is very -## useful as VDS does a random distribution dividing load, so it is unlikely -## that the OS cache will ever hit, and thus it is a huge performance drain. -## DEPRECATED: see stor-memfilepersistence config instead -use_direct_io bool default=true restart - -## All IO operations will be aligned to this amount of bytes if direct IO is -## enabled. -## DEPRECATED: see stor-memfilepersistence config instead -block_alignment_size int default=512 restart - -## When a disk is fuller than this factor, we will not allow more data to be -## written to the system, unless this data is written in order to reduce -## storage consumption, such as resizing files to become smaller or add -## meta entries to write remove entries into. This value is set high as -## default as we expect a lot of users to have formatted their disks to -## already reserve 8% of the data to root user which is often default. We -## suggest using 0% reserved for root, and rather set this parameter lower -## to reserve space. That way, VDS have more space available in worst case -## in order to resize files to become smaller. -## DEPRECATED: see stor-memfilepersistence config instead -disk_full_factor double default=0.98 restart - -## The grow factor sets how much free space to add to a file when we resize -## it, whether we are making it smaller or larger. If the space we need -## after the operation triggering the resize is X, then the file will be -## resized to be of size X * growfactor. In a growing system with no -## deletes, a growfactor of 2 will make the files have 25% free space on -## average. Reducing it to 1.5 will reduce the average free space to 16.7%. -## DEPRECATED: see stor-memfilepersistence config instead -grow_factor double default=2.0 - -## If files fall below this fill rate, resize file to become smaller. -## Note that this parameter is tightly coupled with the growfactor. For -## instance, with a growfactor of 2.0, a file will contain 50 % free space -## after a resize. If the min fill rate then is 0.50, that means that if a -## single doc is deleted from this file, we need to resize it to -## become smaller straight away. -## DEPRECATED: see stor-memfilepersistence config instead -min_fill_rate double default=0.1 - -## Minimum part of defragmented space one need to reclaim to allow -## defragmentation of file. This value is given as a ratio of reclaimed -## space compared to the total size of the data block. -## Example: A body block of 100 MB, has 15 MB free space, with largest -## continuos free space of 5 MB. Gain of defragmentation will then be 0.1. -## DEPRECATED: see stor-memfilepersistence config instead -defrag_minimum_gain double default=1.0 restart - -## When creating/resizing slotfiles, one uses average document sizes to -## decide how much free space to add to metadata, header and body portions. -## This option can be used to allocate extra free space to meta data in -## order to reduce the chance of the file needing resize due to lack of free -## meta data entries. -## DEPRECATED: see stor-memfilepersistence config instead -overrepresent_meta_data_factor double default=1.2 - -## When creating/resizing slotfiles, one uses average document sizes to -## decide how much free space to add to metadata, header and body portions. -## This option can be used to allocate extra free space to header data in -## order to reduce the chance of the file needing resize due to lack of -## header block space. -## DEPRECATED: see stor-memfilepersistence config instead -overrepresent_header_block_factor double default=1.1 - -## Load types to use cache for. If empty, cache for all. -## DEPRECATED: see stor-memfilepersistence config instead -load_types_to_cache[] string restart diff --git a/storage/src/tests/common/testhelper.cpp b/storage/src/tests/common/testhelper.cpp index 296a9a3cc0f..f7da854be68 100644 --- a/storage/src/tests/common/testhelper.cpp +++ b/storage/src/tests/common/testhelper.cpp @@ -9,23 +9,6 @@ LOG_SETUP(".testhelper"); namespace storage { -namespace { - bool useNewStorageCore() { - if ( // Unit test directory - vespalib::fileExists("use_new_storage_core") || - // src/cpp directory - vespalib::fileExists("../use_new_storage_core") || - // Top build directory where storage-HEAD remains - vespalib::fileExists("../../../../use_new_storage_core")) - { - std::cerr << "Using new storage core for unit tests\n"; - return true; - } - return false; - } - bool newStorageCore(useNewStorageCore()); -} - void addStorageDistributionConfig(vdstestlib::DirConfig& dc) { vdstestlib::DirConfig::Config* config; @@ -124,18 +107,14 @@ vdstestlib::DirConfig getStandardConfig(bool storagenode, const std::string & ro config->set("minimum_file_meta_slots", "2"); config->set("minimum_file_header_block_size", "368"); config->set("minimum_file_size", "4096"); - config->set("threads[1]"); - config->set("threads[0].lowestpri 255"); + config->set("num_threads", "1"); config->set("dir_spread", "4"); config->set("dir_levels", "0"); - config->set("use_new_core", newStorageCore ? "true" : "false"); config->set("maximum_versions_of_single_document_stored", "0"); //config->set("enable_slotfile_cache", "false"); // Unit tests typically use fake low time values, so don't complain // about them or compact/delete them by default. Override in tests testing that // behavior - config->set("time_future_limit", "5"); - config->set("time_past_limit", "2000000000"); config->set("keep_remove_time_period", "2000000000"); config->set("revert_time_period", "2000000000"); // Don't want test to call exit() diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 8d03c9de54c..26bfeb41b42 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -7,17 +7,10 @@ #include <vespa/document/test/make_document_bucket.h> #include <vespa/storage/storageserver/statemanager.h> #include <vespa/storage/bucketdb/bucketmanager.h> -#include <vespa/vdslib/state/cluster_state_bundle.h> #include <vespa/storage/persistence/persistencethread.h> #include <vespa/storage/persistence/filestorage/filestormanager.h> #include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h> #include <vespa/document/update/assignvalueupdate.h> -#include <vespa/document/datatype/datatype.h> -#include <vespa/document/fieldvalue/document.h> -#include <vespa/document/datatype/documenttype.h> -#include <vespa/document/update/documentupdate.h> -#include <vespa/document/fieldvalue/rawfieldvalue.h> -#include <vespa/document/fieldvalue/stringfieldvalue.h> #include <vespa/document/select/parser.h> #include <vespa/vdslib/state/random.h> #include <vespa/storageapi/message/bucketsplitting.h> @@ -76,8 +69,6 @@ struct FileStorManagerTest : public CppUnit::TestFixture { void testFlush(); void testRemapSplit(); void testHandlerPriority(); - void testHandlerPriorityBlocking(); - void testHandlerPriorityPreempt(); void testHandlerMulti(); void testHandlerTimeout(); void testHandlerPause(); @@ -102,7 +93,6 @@ struct FileStorManagerTest : public CppUnit::TestFixture { void testMergeBucketImplicitCreateBucket(); void testNewlyCreatedBucketIsReady(); void testCreateBucketSetsActiveFlagInDatabaseAndReply(); - void testFileStorThreadLockingStressTest(); void testStateChange(); void testRepairNotifiesDistributorOnChange(); void testDiskMove(); @@ -113,8 +103,6 @@ struct FileStorManagerTest : public CppUnit::TestFixture { CPPUNIT_TEST(testFlush); CPPUNIT_TEST(testRemapSplit); CPPUNIT_TEST(testHandlerPriority); - CPPUNIT_TEST(testHandlerPriorityBlocking); - CPPUNIT_TEST(testHandlerPriorityPreempt); CPPUNIT_TEST(testHandlerMulti); CPPUNIT_TEST(testHandlerTimeout); CPPUNIT_TEST(testHandlerPause); @@ -146,21 +134,17 @@ struct FileStorManagerTest : public CppUnit::TestFixture { void createBucket(document::BucketId bid, uint16_t disk) { - spi::Context context(defaultLoadType, spi::Priority(0), - spi::Trace::TraceLevel(0)); - _node->getPersistenceProvider().createBucket( - makeSpiBucket(bid, spi::PartitionId(disk)), context); + spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0)); + _node->getPersistenceProvider().createBucket(makeSpiBucket(bid, spi::PartitionId(disk)), context); StorBucketDatabase::WrappedEntry entry( - _node->getStorageBucketDatabase().get(bid, "foo", - StorBucketDatabase::CREATE_IF_NONEXISTING)); + _node->getStorageBucketDatabase().get(bid, "foo", StorBucketDatabase::CREATE_IF_NONEXISTING)); entry->disk = disk; entry->info = api::BucketInfo(0, 0, 0, 0, 0, true, false); entry.write(); } - document::Document::UP createDocument( - const std::string& content, const std::string& id) + document::Document::UP createDocument(const std::string& content, const std::string& id) { return _node->getTestDocMan().createDocument(content, id); } @@ -265,15 +249,14 @@ std::unique_ptr<DiskThread> createThread(vdstestlib::DirConfig& config, spi::PersistenceProvider& provider, FileStorHandler& filestorHandler, FileStorThreadMetrics& metrics, - uint16_t deviceIndex, - uint8_t lowestPriority) + uint16_t deviceIndex) { (void) config; std::unique_ptr<DiskThread> disk; disk.reset(new PersistenceThread( node.getComponentRegister(), config.getConfigId(), provider, filestorHandler, metrics, - deviceIndex, lowestPriority)); + deviceIndex)); return disk; } @@ -629,39 +612,31 @@ FileStorManagerTest::testHandlerPriority() FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50); std::string content("Here is some content which is in all documents"); std::ostringstream uri; - Document::SP doc(createDocument( - content, "userdoc:footype:1234:bar").release()); + Document::SP doc(createDocument(content, "userdoc:footype:1234:bar").release()); document::BucketIdFactory factory; - document::BucketId bucket(16, factory.getBucketId( - doc->getId()).getRawId()); + document::BucketId bucket(16, factory.getBucketId(doc->getId()).getRawId()); // Populate bucket with the given data for (uint32_t i = 1; i < 6; i++) { - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(bucket), doc, 100)); - std::unique_ptr<api::StorageMessageAddress> address( - new api::StorageMessageAddress( - "storage", lib::NodeType::STORAGE, 3)); + auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), doc, 100); + auto address = std::make_shared<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3); cmd->setAddress(*address); cmd->setPriority(i * 15); filestorHandler.schedule(cmd, 0); } - CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0, 20).second->getPriority()); - CPPUNIT_ASSERT(filestorHandler.getNextMessage(0, 20).second.get() == NULL); - CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0, 50).second->getPriority()); - CPPUNIT_ASSERT_EQUAL(45, (int)filestorHandler.getNextMessage(0, 50).second->getPriority()); - CPPUNIT_ASSERT(filestorHandler.getNextMessage(0, 50).second.get() == NULL); - CPPUNIT_ASSERT_EQUAL(60, (int)filestorHandler.getNextMessage(0, 255).second->getPriority()); - CPPUNIT_ASSERT_EQUAL(75, (int)filestorHandler.getNextMessage(0, 255).second->getPriority()); + CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0).second->getPriority()); + CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0).second->getPriority()); + CPPUNIT_ASSERT_EQUAL(45, (int)filestorHandler.getNextMessage(0).second->getPriority()); + CPPUNIT_ASSERT_EQUAL(60, (int)filestorHandler.getNextMessage(0).second->getPriority()); + CPPUNIT_ASSERT_EQUAL(75, (int)filestorHandler.getNextMessage(0).second->getPriority()); } class MessagePusherThread : public document::Runnable @@ -678,11 +653,9 @@ public: void run() override { while (!_done) { document::BucketIdFactory factory; - document::BucketId bucket(16, factory.getBucketId( - _doc->getId()).getRawId()); + document::BucketId bucket(16, factory.getBucketId(_doc->getId()).getRawId()); - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(bucket), _doc, 100)); + auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), _doc, 100); _handler.schedule(cmd, 0); FastOS_Thread::Sleep(1); } @@ -694,7 +667,7 @@ public: MessagePusherThread::MessagePusherThread(FileStorHandler& handler, Document::SP doc) : _handler(handler), _doc(doc), _done(false), _threadDone(false) {} -MessagePusherThread::~MessagePusherThread() {} +MessagePusherThread::~MessagePusherThread() = default; class MessageFetchingThread : public document::Runnable { public: @@ -711,7 +684,7 @@ public: void run() override { while (!_done) { - FileStorHandler::LockedMessage msg = _handler.getNextMessage(0, 255); + FileStorHandler::LockedMessage msg = _handler.getNextMessage(0); if (msg.second.get()) { uint32_t originalConfig = _config.load(); _fetchedCount++; @@ -747,8 +720,7 @@ FileStorManagerTest::testHandlerPausedMultiThread() FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50); std::string content("Here is some content which is in all documents"); @@ -799,8 +771,7 @@ FileStorManagerTest::testHandlerPause() FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50); std::string content("Here is some content which is in all documents"); @@ -810,29 +781,26 @@ FileStorManagerTest::testHandlerPause() document::BucketIdFactory factory; document::BucketId bucket(16, factory.getBucketId( - doc->getId()).getRawId()); + doc->getId()).getRawId()); // Populate bucket with the given data for (uint32_t i = 1; i < 6; i++) { - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(bucket), doc, 100)); - std::unique_ptr<api::StorageMessageAddress> address( - new api::StorageMessageAddress( - "storage", lib::NodeType::STORAGE, 3)); + auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), doc, 100); + auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3); cmd->setAddress(*address); cmd->setPriority(i * 15); filestorHandler.schedule(cmd, 0); } - CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0, 255).second->getPriority()); + CPPUNIT_ASSERT_EQUAL(15, (int)filestorHandler.getNextMessage(0).second->getPriority()); { ResumeGuard guard = filestorHandler.pause(); (void)guard; - CPPUNIT_ASSERT(filestorHandler.getNextMessage(0, 255).second.get() == NULL); + CPPUNIT_ASSERT(filestorHandler.getNextMessage(0).second.get() == NULL); } - CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0, 255).second->getPriority()); + CPPUNIT_ASSERT_EQUAL(30, (int)filestorHandler.getNextMessage(0).second->getPriority()); } namespace { @@ -855,8 +823,7 @@ FileStorManagerTest::testRemapSplit() // Setup a filestorthread to test DummyStorageLink top; DummyStorageLink *dummyManager; - top.push_back(std::unique_ptr<StorageLink>( - dummyManager = new DummyStorageLink)); + top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink)); top.open(); ForwardingMessageSender messageSender(*dummyManager); // Since we fake time with small numbers, we need to make sure we dont @@ -867,7 +834,7 @@ FileStorManagerTest::testRemapSplit() metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50); std::string content("Here is some content which is in all documents"); @@ -882,10 +849,8 @@ FileStorManagerTest::testRemapSplit() // Populate bucket with the given data for (uint32_t i = 1; i < 4; i++) { - filestorHandler.schedule( - api::StorageMessage::SP(new api::PutCommand(makeDocumentBucket(bucket1), doc1, i)), 0); - filestorHandler.schedule( - api::StorageMessage::SP(new api::PutCommand(makeDocumentBucket(bucket2), doc2, i + 10)), 0); + filestorHandler.schedule(std::make_shared<api::PutCommand>(makeDocumentBucket(bucket1), doc1, i), 0); + filestorHandler.schedule(std::make_shared<api::PutCommand>(makeDocumentBucket(bucket2), doc2, i + 10), 0); } CPPUNIT_ASSERT_EQUAL(std::string("BucketId(0x40000000000004d2): Put(BucketId(0x40000000000004d2), userdoc:footype:1234:bar, timestamp 1, size 108) (priority: 127)\n" @@ -933,7 +898,7 @@ FileStorManagerTest::testHandlerMulti() metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50); std::string content("Here is some content which is in all documents"); @@ -943,10 +908,8 @@ FileStorManagerTest::testHandlerMulti() Document::SP doc2(createDocument(content, "userdoc:footype:4567:bar").release()); document::BucketIdFactory factory; - document::BucketId bucket1(16, factory.getBucketId( - doc1->getId()).getRawId()); - document::BucketId bucket2(16, factory.getBucketId( - doc2->getId()).getRawId()); + document::BucketId bucket1(16, factory.getBucketId(doc1->getId()).getRawId()); + document::BucketId bucket2(16, factory.getBucketId(doc2->getId()).getRawId()); // Populate bucket with the given data for (uint32_t i = 1; i < 10; i++) { @@ -957,21 +920,21 @@ FileStorManagerTest::testHandlerMulti() } { - FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, 255); + FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0); CPPUNIT_ASSERT_EQUAL((uint64_t)1, getPutTime(lock.second)); - lock = filestorHandler.getNextMessage(0, lock, 255); + lock = filestorHandler.getNextMessage(0, lock); CPPUNIT_ASSERT_EQUAL((uint64_t)2, getPutTime(lock.second)); - lock = filestorHandler.getNextMessage(0, lock, 255); + lock = filestorHandler.getNextMessage(0, lock); CPPUNIT_ASSERT_EQUAL((uint64_t)3, getPutTime(lock.second)); } { - FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, 255); + FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0); CPPUNIT_ASSERT_EQUAL((uint64_t)11, getPutTime(lock.second)); - lock = filestorHandler.getNextMessage(0, lock, 255); + lock = filestorHandler.getNextMessage(0, lock); CPPUNIT_ASSERT_EQUAL((uint64_t)12, getPutTime(lock.second)); } } @@ -984,8 +947,7 @@ FileStorManagerTest::testHandlerTimeout() // Setup a filestorthread to test DummyStorageLink top; DummyStorageLink *dummyManager; - top.push_back(std::unique_ptr<StorageLink>( - dummyManager = new DummyStorageLink)); + top.push_back(std::unique_ptr<StorageLink>(dummyManager = new DummyStorageLink)); top.open(); ForwardingMessageSender messageSender(*dummyManager); @@ -997,7 +959,7 @@ FileStorManagerTest::testHandlerTimeout() metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50); std::string content("Here is some content which is in all documents"); @@ -1036,7 +998,7 @@ FileStorManagerTest::testHandlerTimeout() FastOS_Thread::Sleep(51); for (;;) { - auto lock = filestorHandler.getNextMessage(0, 255); + auto lock = filestorHandler.getNextMessage(0); if (lock.first.get()) { CPPUNIT_ASSERT_EQUAL(uint8_t(200), lock.second->getPriority()); break; @@ -1050,208 +1012,6 @@ FileStorManagerTest::testHandlerTimeout() } void -FileStorManagerTest::testHandlerPriorityBlocking() -{ - TestName testName("testHandlerPriorityBlocking"); - // Setup a filestorthread to test - DummyStorageLink top; - DummyStorageLink *dummyManager; - top.push_back(std::unique_ptr<StorageLink>( - dummyManager = new DummyStorageLink)); - top.open(); - ForwardingMessageSender messageSender(*dummyManager); - // Since we fake time with small numbers, we need to make sure we dont - // compact them away, as they will seem to be from 1970 - - documentapi::LoadTypeSet loadTypes("raw:"); - FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); - metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 21, 21); - filestorHandler.setGetNextMessageTimeout(50); - - std::string content("Here is some content which is in all documents"); - std::ostringstream uri; - - document::BucketIdFactory factory; - - // Populate bucket with the given data - for (uint32_t i = 1; i < 6; i++) { - Document::SP doc(createDocument(content, vespalib::make_string("doc:foo:%d",i)).release()); - document::BucketId bucket(16, factory.getBucketId( - doc->getId()).getRawId()); - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(bucket), doc, 100)); - std::unique_ptr<api::StorageMessageAddress> address( - new api::StorageMessageAddress( - "storage", lib::NodeType::STORAGE, 3)); - cmd->setAddress(*address); - cmd->setPriority(i * 15); - filestorHandler.schedule(cmd, 0); - } - - { - FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0, 20); - CPPUNIT_ASSERT_EQUAL(15, (int)lock1.second->getPriority()); - - LOG(debug, "Waiting for request that should time out"); - FileStorHandler::LockedMessage lock2 = filestorHandler.getNextMessage(0, 30); - LOG(debug, "Got request that should time out"); - CPPUNIT_ASSERT(lock2.second.get() == NULL); - } - - { - FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0, 40); - CPPUNIT_ASSERT_EQUAL(30, (int)lock1.second->getPriority()); - - // New high-pri message comes in - Document::SP doc(createDocument(content, vespalib::make_string("doc:foo:%d", 100)).release()); - document::BucketId bucket(16, factory.getBucketId( - doc->getId()).getRawId()); - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(bucket), doc, 100)); - std::unique_ptr<api::StorageMessageAddress> address( - new api::StorageMessageAddress( - "storage", lib::NodeType::STORAGE, 3)); - cmd->setAddress(*address); - cmd->setPriority(15); - filestorHandler.schedule(cmd, 0); - - FileStorHandler::LockedMessage lock2 = filestorHandler.getNextMessage(0, 20); - CPPUNIT_ASSERT_EQUAL(15, (int)lock2.second->getPriority()); - - LOG(debug, "Waiting for request that should time out"); - FileStorHandler::LockedMessage lock3 = filestorHandler.getNextMessage(0, 255); - LOG(debug, "Got request that should time out"); - CPPUNIT_ASSERT(lock3.second.get() == NULL); - } - - { - FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0, 255); - CPPUNIT_ASSERT_EQUAL(45, (int)lock1.second->getPriority()); - - FileStorHandler::LockedMessage lock = filestorHandler.getNextMessage(0, 255); - CPPUNIT_ASSERT_EQUAL(60, (int)lock.second->getPriority()); - } - LOG(debug, "Test done"); -} - -class PausedThread : public document::Runnable { -private: - FileStorHandler& _handler; - -public: - bool pause; - bool done; - bool gotoperation; - - PausedThread(FileStorHandler& handler) - : _handler(handler), pause(false), done(false), gotoperation(false) {} - - void run() override { - FileStorHandler::LockedMessage msg = _handler.getNextMessage(0, 255); - gotoperation = true; - - while (!done) { - if (pause) { - _handler.pause(0, msg.second->getPriority()); - pause = false; - } - FastOS_Thread::Sleep(10); - } - - done = false; - }; -}; - -void -FileStorManagerTest::testHandlerPriorityPreempt() -{ - TestName testName("testHandlerPriorityPreempt"); - // Setup a filestorthread to test - DummyStorageLink top; - DummyStorageLink *dummyManager; - top.push_back(std::unique_ptr<StorageLink>( - dummyManager = new DummyStorageLink)); - top.open(); - ForwardingMessageSender messageSender(*dummyManager); - // Since we fake time with small numbers, we need to make sure we dont - // compact them away, as they will seem to be from 1970 - - documentapi::LoadTypeSet loadTypes("raw:"); - FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); - metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 21, 21); - filestorHandler.setGetNextMessageTimeout(50); - - std::string content("Here is some content which is in all documents"); - std::ostringstream uri; - - document::BucketIdFactory factory; - - { - Document::SP doc(createDocument(content, "doc:foo:1").release()); - document::BucketId bucket(16, factory.getBucketId( - doc->getId()).getRawId()); - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(bucket), doc, 100)); - std::unique_ptr<api::StorageMessageAddress> address( - new api::StorageMessageAddress( - "storage", lib::NodeType::STORAGE, 3)); - cmd->setAddress(*address); - cmd->setPriority(60); - filestorHandler.schedule(cmd, 0); - } - - PausedThread thread(filestorHandler); - FastOS_ThreadPool pool(512 * 1024); - thread.start(pool); - - while (!thread.gotoperation) { - FastOS_Thread::Sleep(10); - } - - { - Document::SP doc(createDocument(content, "doc:foo:2").release()); - document::BucketId bucket(16, factory.getBucketId( - doc->getId()).getRawId()); - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(bucket), doc, 100)); - std::unique_ptr<api::StorageMessageAddress> address( - new api::StorageMessageAddress( - "storage", lib::NodeType::STORAGE, 3)); - cmd->setAddress(*address); - cmd->setPriority(20); - filestorHandler.schedule(cmd, 0); - } - - { - FileStorHandler::LockedMessage lock1 = filestorHandler.getNextMessage(0, 20); - CPPUNIT_ASSERT_EQUAL(20, (int)lock1.second->getPriority()); - - thread.pause = true; - - for (uint32_t i = 0; i < 10; i++) { - CPPUNIT_ASSERT(thread.pause); - FastOS_Thread::Sleep(100); - } - } - - while (thread.pause) { - FastOS_Thread::Sleep(10); - } - - thread.done = true; - - while (thread.done) { - FastOS_Thread::Sleep(10); - } -} - -void FileStorManagerTest::testPriority() { TestName testName("testPriority"); @@ -1269,14 +1029,13 @@ FileStorManagerTest::testPriority() FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 2); - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0], 0, 25)); + filestorHandler, *metrics.disks[0]->threads[0], 0)); std::unique_ptr<DiskThread> thread2(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[1], 0, 255)); + filestorHandler, *metrics.disks[0]->threads[1], 0)); // Creating documents to test with. Different gids, 2 locations. std::vector<document::Document::SP > documents; @@ -1284,8 +1043,7 @@ FileStorManagerTest::testPriority() std::string content("Here is some content which is in all documents"); std::ostringstream uri; - uri << "userdoc:footype:" << (i % 3 == 0 ? 0x10001 : 0x0100001) - << ":mydoc-" << i; + uri << "userdoc:footype:" << (i % 3 == 0 ? 0x10001 : 0x0100001)<< ":mydoc-" << i; Document::SP doc(createDocument(content, uri.str()).release()); documents.push_back(doc); } @@ -1294,20 +1052,16 @@ FileStorManagerTest::testPriority() // Create buckets in separate, initial pass to avoid races with puts for (uint32_t i=0; i<documents.size(); ++i) { - document::BucketId bucket(16, factory.getBucketId( - documents[i]->getId()).getRawId()); + document::BucketId bucket(16, factory.getBucketId(documents[i]->getId()).getRawId()); - spi::Context context(defaultLoadType, spi::Priority(0), - spi::Trace::TraceLevel(0)); + spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0)); - _node->getPersistenceProvider().createBucket( - makeSpiBucket(bucket), context); + _node->getPersistenceProvider().createBucket(makeSpiBucket(bucket), context); } // Populate bucket with the given data for (uint32_t i=0; i<documents.size(); ++i) { - document::BucketId bucket(16, factory.getBucketId( - documents[i]->getId()).getRawId()); + document::BucketId bucket(16, factory.getBucketId(documents[i]->getId()).getRawId()); std::shared_ptr<api::PutCommand> cmd( new api::PutCommand(makeDocumentBucket(bucket), documents[i], 100 + i)); @@ -1342,9 +1096,8 @@ FileStorManagerTest::testPriority() CPPUNIT_ASSERT_EQUAL(uint64_t(documents.size()), metrics.disks[0]->threads[0]->operations.getValue() + metrics.disks[0]->threads[1]->operations.getValue()); - CPPUNIT_ASSERT(metrics.disks[0]->threads[0]->operations.getValue() <= 13); - // Closing file stor handler before threads are deleted, such that - // file stor threads getNextMessage calls returns. + // Closing file stor handler before threads are deleted, such that + // file stor threads getNextMessage calls returns. filestorHandler.close(); } @@ -1363,11 +1116,10 @@ FileStorManagerTest::testSplit1() documentapi::LoadTypeSet loadTypes("raw:"); FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0], 0, 255)); + filestorHandler, *metrics.disks[0]->threads[0], 0)); // Creating documents to test with. Different gids, 2 locations. std::vector<document::Document::SP > documents; for (uint32_t i=0; i<20; ++i) { @@ -1532,10 +1284,8 @@ FileStorManagerTest::testSplitSingleGroup() documentapi::LoadTypeSet loadTypes("raw:"); FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); - spi::Context context(defaultLoadType, spi::Priority(0), - spi::Trace::TraceLevel(0)); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); + spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0)); for (uint32_t j=0; j<1; ++j) { // Test this twice, once where all the data ends up in file with // splitbit set, and once where all the data ends up in file with @@ -1544,7 +1294,7 @@ FileStorManagerTest::testSplitSingleGroup() std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0], 0, 255)); + filestorHandler, *metrics.disks[0]->threads[0], 0)); // Creating documents to test with. Different gids, 2 locations. std::vector<document::Document::SP > documents; for (uint32_t i=0; i<20; ++i) { @@ -1673,11 +1423,10 @@ FileStorManagerTest::testSplitEmptyTargetWithRemappedOps() documentapi::LoadTypeSet loadTypes("raw:"); FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0], 0, 255)); + filestorHandler, *metrics.disks[0]->threads[0], 0)); document::BucketId source(16, 0x10001); @@ -1751,11 +1500,10 @@ FileStorManagerTest::testNotifyOnSplitSourceOwnershipChanged() documentapi::LoadTypeSet loadTypes("raw:"); FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0], 0, 255)); + filestorHandler, *metrics.disks[0]->threads[0], 0)); document::BucketId source(getFirstBucketNotOwnedByDistributor(0)); createBucket(source, 0); @@ -1801,21 +1549,18 @@ FileStorManagerTest::testJoin() documentapi::LoadTypeSet loadTypes("raw:"); FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), - filestorHandler, *metrics.disks[0]->threads[0], 0, 255)); + filestorHandler, *metrics.disks[0]->threads[0], 0)); // Creating documents to test with. Different gids, 2 locations. std::vector<document::Document::SP > documents; for (uint32_t i=0; i<20; ++i) { std::string content("Here is some content which is in all documents"); std::ostringstream uri; - uri << "userdoc:footype:" << (i % 3 == 0 ? 0x10001 : 0x0100001) - << ":mydoc-" << i; - Document::SP doc(createDocument( - content, uri.str()).release()); + uri << "userdoc:footype:" << (i % 3 == 0 ? 0x10001 : 0x0100001) << ":mydoc-" << i; + Document::SP doc(createDocument(content, uri.str()).release()); documents.push_back(doc); } document::BucketIdFactory factory; @@ -1826,36 +1571,26 @@ FileStorManagerTest::testJoin() { // Populate bucket with the given data for (uint32_t i=0; i<documents.size(); ++i) { - document::BucketId bucket(17, factory.getBucketId( - documents[i]->getId()).getRawId()); - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(bucket), documents[i], 100 + i)); - std::unique_ptr<api::StorageMessageAddress> address( - new api::StorageMessageAddress( - "storage", lib::NodeType::STORAGE, 3)); + document::BucketId bucket(17, factory.getBucketId(documents[i]->getId()).getRawId()); + auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bucket), documents[i], 100 + i); + auto address = std::make_unique<api::StorageMessageAddress>("storage", lib::NodeType::STORAGE, 3); cmd->setAddress(*address); filestorHandler.schedule(cmd, 0); filestorHandler.flush(true); CPPUNIT_ASSERT_EQUAL((size_t) 1, top.getNumReplies()); - std::shared_ptr<api::PutReply> reply( - std::dynamic_pointer_cast<api::PutReply>( - top.getReply(0))); + auto reply = std::dynamic_pointer_cast<api::PutReply>(top.getReply(0)); CPPUNIT_ASSERT(reply.get()); - CPPUNIT_ASSERT_EQUAL(ReturnCode(ReturnCode::OK), - reply->getResult()); + CPPUNIT_ASSERT_EQUAL(ReturnCode(ReturnCode::OK), reply->getResult()); top.reset(); // Delete every 5th document to have delete entries in file too if (i % 5 == 0) { - std::shared_ptr<api::RemoveCommand> rcmd( - new api::RemoveCommand( - makeDocumentBucket(bucket), documents[i]->getId(), 1000000 + 100 + i)); + auto rcmd = std::make_shared<api::RemoveCommand>(makeDocumentBucket(bucket), + documents[i]->getId(), 1000000 + 100 + i); rcmd->setAddress(*address); filestorHandler.schedule(rcmd, 0); filestorHandler.flush(true); CPPUNIT_ASSERT_EQUAL((size_t) 1, top.getNumReplies()); - std::shared_ptr<api::RemoveReply> rreply( - std::dynamic_pointer_cast<api::RemoveReply>( - top.getReply(0))); + auto rreply = std::dynamic_pointer_cast<api::RemoveReply>(top.getReply(0)); CPPUNIT_ASSERT_MSG(top.getReply(0)->getType().toString(), rreply.get()); CPPUNIT_ASSERT_EQUAL(ReturnCode(ReturnCode::OK), diff --git a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp index 95642ac6215..64ef48b5719 100644 --- a/storage/src/tests/persistence/filestorage/operationabortingtest.cpp +++ b/storage/src/tests/persistence/filestorage/operationabortingtest.cpp @@ -43,10 +43,8 @@ public: _deleteBucketInvocations(0) {} - spi::Result put(const spi::Bucket& bucket, - spi::Timestamp timestamp, - const document::Document::SP& doc, - spi::Context& context) override + spi::Result put(const spi::Bucket& bucket, spi::Timestamp timestamp, + const document::Document::SP& doc, spi::Context& context) override { (void) bucket; (void) timestamp; @@ -90,26 +88,20 @@ public: void setupDisks(uint32_t diskCount, uint32_t queueBarrierThreads) { FileStorTestFixture::setupDisks(diskCount); - _dummyProvider.reset(new spi::dummy::DummyPersistence( - _node->getTypeRepo(), diskCount)); + _dummyProvider.reset(new spi::dummy::DummyPersistence(_node->getTypeRepo(), diskCount)); _queueBarrier.reset(new vespalib::Barrier(queueBarrierThreads)); _completionBarrier.reset(new vespalib::Barrier(2)); - _blockingProvider = new BlockingMockProvider(*_dummyProvider, - *_queueBarrier, *_completionBarrier); - - _node->setPersistenceProvider( - spi::PersistenceProvider::UP(_blockingProvider)); + _blockingProvider = new BlockingMockProvider(*_dummyProvider, *_queueBarrier, *_completionBarrier); + _node->setPersistenceProvider(spi::PersistenceProvider::UP(_blockingProvider)); } - void validateReplies(DummyStorageLink& link, - size_t repliesTotal, + void validateReplies(DummyStorageLink& link, size_t repliesTotal, const std::vector<document::BucketId>& okReplies, const std::vector<document::BucketId>& abortedGetDiffs); - void doTestSpecificOperationsNotAborted( - const char* testName, - const std::vector<api::StorageMessage::SP>& msgs, - bool shouldCreateBucketInitially); + void doTestSpecificOperationsNotAborted(const char* testName, + const std::vector<api::StorageMessage::SP>& msgs, + bool shouldCreateBucketInitially); api::BucketInfo getBucketInfoFromDB(const document::BucketId&) const; @@ -138,8 +130,7 @@ namespace { template <typename T, typename Collection> bool existsIn(const T& elem, const Collection& collection) { - return (std::find(collection.begin(), collection.end(), elem) - != collection.end()); + return (std::find(collection.begin(), collection.end(), elem) != collection.end()); } } @@ -150,18 +141,15 @@ OperationAbortingTest::setUp() } void -OperationAbortingTest::validateReplies( - DummyStorageLink& link, - size_t repliesTotal, - const std::vector<document::BucketId>& okReplies, - const std::vector<document::BucketId>& abortedGetDiffs) +OperationAbortingTest::validateReplies(DummyStorageLink& link, size_t repliesTotal, + const std::vector<document::BucketId>& okReplies, + const std::vector<document::BucketId>& abortedGetDiffs) { link.waitForMessages(repliesTotal, MSG_WAIT_TIME); CPPUNIT_ASSERT_EQUAL(repliesTotal, link.getNumReplies()); for (uint32_t i = 0; i < repliesTotal; ++i) { - api::StorageReply& reply( - dynamic_cast<api::StorageReply&>(*link.getReply(i))); + api::StorageReply& reply(dynamic_cast<api::StorageReply&>(*link.getReply(i))); LOG(info, "Checking reply %s", reply.toString(true).c_str()); switch (static_cast<uint32_t>(reply.getType().getId())) { case api::MessageType::PUT_REPLY_ID: @@ -222,11 +210,8 @@ template <typename Container> AbortBucketOperationsCommand::SP makeAbortCmd(const Container& buckets) { - std::unique_ptr<AbortBucketOperationsCommand::AbortPredicate> pred( - new ExplicitBucketSetPredicate(buckets.begin(), buckets.end())); - AbortBucketOperationsCommand::SP cmd( - new AbortBucketOperationsCommand(std::move(pred))); - return cmd; + auto pred = std::make_unique<ExplicitBucketSetPredicate>(buckets.begin(), buckets.end()); + return std::make_shared<AbortBucketOperationsCommand>(std::move(pred)); } } @@ -320,8 +305,7 @@ public: void OperationAbortingTest::testWaitForCurrentOperationCompletionForAbortedBucket() { - uint32_t queueBarrierThreads = 3; - setupDisks(1, queueBarrierThreads); + setupDisks(1, 3); TestFileStorComponents c(*this, "testWaitForCurrentOperationCompletionForAbortedBucket"); document::BucketId bucket(16, 1); @@ -350,10 +334,8 @@ OperationAbortingTest::testWaitForCurrentOperationCompletionForAbortedBucket() // reply, as it must finish processing fully before the abort returns. c.top.waitForMessages(2, MSG_WAIT_TIME); CPPUNIT_ASSERT_EQUAL(size_t(2), c.top.getNumReplies()); - CPPUNIT_ASSERT_EQUAL(api::MessageType::PUT_REPLY, - c.top.getReply(0)->getType()); - CPPUNIT_ASSERT_EQUAL(api::MessageType::INTERNAL_REPLY, - c.top.getReply(1)->getType()); + CPPUNIT_ASSERT_EQUAL(api::MessageType::PUT_REPLY, c.top.getReply(0)->getType()); + CPPUNIT_ASSERT_EQUAL(api::MessageType::INTERNAL_REPLY, c.top.getReply(1)->getType()); } void @@ -364,10 +346,7 @@ OperationAbortingTest::testDoNotAbortCreateBucketCommands() msgs.push_back(api::StorageMessage::SP(new api::CreateBucketCommand(makeDocumentBucket(bucket)))); bool shouldCreateBucketInitially(false); - doTestSpecificOperationsNotAborted( - "testDoNotAbortCreateBucketCommands", - msgs, - shouldCreateBucketInitially); + doTestSpecificOperationsNotAborted("testDoNotAbortCreateBucketCommands", msgs, shouldCreateBucketInitially); } void @@ -378,18 +357,14 @@ OperationAbortingTest::testDoNotAbortRecheckBucketCommands() msgs.push_back(api::StorageMessage::SP(new RecheckBucketInfoCommand(makeDocumentBucket(bucket)))); bool shouldCreateBucketInitially(true); - doTestSpecificOperationsNotAborted( - "testDoNotAbortRecheckBucketCommands", - msgs, - shouldCreateBucketInitially); + doTestSpecificOperationsNotAborted("testDoNotAbortRecheckBucketCommands", msgs, shouldCreateBucketInitially); } api::BucketInfo OperationAbortingTest::getBucketInfoFromDB(const document::BucketId& id) const { StorBucketDatabase::WrappedEntry entry( - _node->getStorageBucketDatabase().get(id, "foo", - StorBucketDatabase::CREATE_IF_NONEXISTING)); + _node->getStorageBucketDatabase().get(id, "foo", StorBucketDatabase::CREATE_IF_NONEXISTING)); CPPUNIT_ASSERT(entry.exist()); return entry->info; } @@ -403,20 +378,15 @@ OperationAbortingTest::testDoNotAbortDeleteBucketCommands() msgs.push_back(cmd); bool shouldCreateBucketInitially(true); - doTestSpecificOperationsNotAborted( - "testDoNotAbortRecheckBucketCommands", - msgs, - shouldCreateBucketInitially); + doTestSpecificOperationsNotAborted("testDoNotAbortRecheckBucketCommands", msgs, shouldCreateBucketInitially); } void -OperationAbortingTest::doTestSpecificOperationsNotAborted( - const char* testName, - const std::vector<api::StorageMessage::SP>& msgs, - bool shouldCreateBucketInitially) +OperationAbortingTest::doTestSpecificOperationsNotAborted(const char* testName, + const std::vector<api::StorageMessage::SP>& msgs, + bool shouldCreateBucketInitially) { - uint32_t queueBarrierThreads = 2; - setupDisks(1, queueBarrierThreads); + setupDisks(1, 2); TestFileStorComponents c(*this, testName); document::BucketId bucket(16, 1); document::BucketId blockerBucket(16, 2); @@ -443,8 +413,7 @@ OperationAbortingTest::doTestSpecificOperationsNotAborted( break; case api::MessageType::DELETEBUCKET_ID: { - api::DeleteBucketCommand& delCmd( - dynamic_cast<api::DeleteBucketCommand&>(*msgs[i])); + api::DeleteBucketCommand& delCmd(dynamic_cast<api::DeleteBucketCommand&>(*msgs[i])); delCmd.setBucketInfo(getBucketInfoFromDB(delCmd.getBucketId())); } ++expectedDeleteBuckets; @@ -473,8 +442,7 @@ OperationAbortingTest::doTestSpecificOperationsNotAborted( c.sendDummyGet(blockerBucket); // put+abort+get + any other creates/deletes/rechecks - size_t expectedMsgs(3 + expectedCreateBuckets + expectedDeleteBuckets - + expectedRecheckReplies); + size_t expectedMsgs(3 + expectedCreateBuckets + expectedDeleteBuckets + expectedRecheckReplies); LOG(info, "barrier passed, waiting for %zu replies", expectedMsgs); std::vector<document::BucketId> okReplies; @@ -483,13 +451,10 @@ OperationAbortingTest::doTestSpecificOperationsNotAborted( std::vector<document::BucketId> abortedGetDiffs; validateReplies(c.top, expectedMsgs, okReplies, abortedGetDiffs); - CPPUNIT_ASSERT_EQUAL(expectedBucketInfoInvocations, - _blockingProvider->_bucketInfoInvocations); + CPPUNIT_ASSERT_EQUAL(expectedBucketInfoInvocations, _blockingProvider->_bucketInfoInvocations); CPPUNIT_ASSERT_EQUAL(expectedCreateBuckets + (shouldCreateBucketInitially ? 2 : 1), _blockingProvider->_createBucketInvocations); - CPPUNIT_ASSERT_EQUAL(expectedDeleteBuckets, - _blockingProvider->_deleteBucketInvocations); + CPPUNIT_ASSERT_EQUAL(expectedDeleteBuckets, _blockingProvider->_deleteBucketInvocations); } - - + } // storage diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp index 77e0ed6b71c..f5f190ad718 100644 --- a/storage/src/tests/persistence/persistencequeuetest.cpp +++ b/storage/src/tests/persistence/persistencequeuetest.cpp @@ -77,9 +77,7 @@ PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked() metrics.initDiskMetrics(_node->getPartitions().size(), loadTypes.getMetricLoadTypes(), 1); - FileStorHandler filestorHandler(messageSender, metrics, - _node->getPartitions(), - _node->getComponentRegister(), 255, 0); + FileStorHandler filestorHandler(messageSender, metrics, _node->getPartitions(), _node->getComponentRegister()); // Send 2 puts, 2 to the first bucket, 1 to the second. Calling // getNextMessage 2 times should then return a lock on the first bucket, @@ -89,17 +87,15 @@ PersistenceQueueTest::testFetchNextUnlockedMessageIfBucketLocked() filestorHandler.schedule(createPut(1234, 1), 0); filestorHandler.schedule(createPut(5432, 0), 0); - auto lock0 = filestorHandler.getNextMessage(0, 255); + auto lock0 = filestorHandler.getNextMessage(0); CPPUNIT_ASSERT(lock0.first.get()); - CPPUNIT_ASSERT_EQUAL( - document::BucketId(16, 1234), - dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId()); + CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 1234), + dynamic_cast<api::PutCommand&>(*lock0.second).getBucketId()); - auto lock1 = filestorHandler.getNextMessage(0, 255); + auto lock1 = filestorHandler.getNextMessage(0); CPPUNIT_ASSERT(lock1.first.get()); - CPPUNIT_ASSERT_EQUAL( - document::BucketId(16, 5432), - dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId()); + CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 5432), + dynamic_cast<api::PutCommand&>(*lock1.second).getBucketId()); } } // namespace storage diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp index 9d3a3d5f008..d46708b3aaa 100644 --- a/storage/src/tests/persistence/persistencetestutils.cpp +++ b/storage/src/tests/persistence/persistencetestutils.cpp @@ -52,21 +52,13 @@ PersistenceTestEnvironment::PersistenceTestEnvironment(DiskCount numDisks, const _node.setupDummyPersistence(); _metrics.initDiskMetrics( numDisks, _node.getLoadTypes()->getMetricLoadTypes(), 1); - _handler.reset(new FileStorHandler( - _messageKeeper, _metrics, - _node.getPersistenceProvider().getPartitionStates().getList(), - _node.getComponentRegister(), 255, 0)); + _handler.reset(new FileStorHandler(_messageKeeper, _metrics, + _node.getPersistenceProvider().getPartitionStates().getList(), + _node.getComponentRegister())); for (uint32_t i = 0; i < numDisks; i++) { _diskEnvs.push_back( - std::unique_ptr<PersistenceUtil>( - new PersistenceUtil( - _config.getConfigId(), - _node.getComponentRegister(), - *_handler, - *_metrics.disks[i]->threads[0], - i, - 255, - _node.getPersistenceProvider()))); + std::make_unique<PersistenceUtil>(_config.getConfigId(), _node.getComponentRegister(), *_handler, + *_metrics.disks[i]->threads[0], i, _node.getPersistenceProvider())); } } @@ -79,8 +71,7 @@ PersistenceTestUtils::~PersistenceTestUtils() } std::string -PersistenceTestUtils::dumpBucket(const document::BucketId& bid, - uint16_t disk) { +PersistenceTestUtils::dumpBucket(const document::BucketId& bid, uint16_t disk) { return dynamic_cast<spi::dummy::DummyPersistence&>(_env->_node.getPersistenceProvider()).dumpBucket(makeSpiBucket(bid, spi::PartitionId(disk))); } @@ -92,14 +83,9 @@ PersistenceTestUtils::setupDisks(uint32_t numDisks) { std::unique_ptr<PersistenceThread> PersistenceTestUtils::createPersistenceThread(uint32_t disk) { - return std::unique_ptr<PersistenceThread>( - new PersistenceThread(_env->_node.getComponentRegister(), - _env->_config.getConfigId(), - getPersistenceProvider(), - getEnv()._fileStorHandler, - getEnv()._metrics, - disk, - 255)); + return std::make_unique<PersistenceThread>(_env->_node.getComponentRegister(), _env->_config.getConfigId(), + getPersistenceProvider(), getEnv()._fileStorHandler, + getEnv()._metrics, disk); } document::Document::SP diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp index 8636628c7cc..b53c8c270f2 100644 --- a/storage/src/vespa/storage/common/storagelink.cpp +++ b/storage/src/vespa/storage/common/storagelink.cpp @@ -14,21 +14,16 @@ using namespace storage::api; namespace storage { -StorageLink::~StorageLink() -{ -} +StorageLink::~StorageLink() = default; void StorageLink::push_back(StorageLink::UP link) { if (_state != CREATED) { - LOG(error, "Attempted to alter chain by adding link %s after link %s " - "while state is %s", - link->toString().c_str(), - toString().c_str(), - stateToString(_state)); + LOG(error, "Attempted to alter chain by adding link %s after link %s while state is %s", + link->toString().c_str(), toString().c_str(), stateToString(_state)); assert(false); } - assert(link.get()); + assert(link); if (isBottom()) { link->_up = this; _down = std::move(link); @@ -39,26 +34,24 @@ void StorageLink::push_back(StorageLink::UP link) void StorageLink::open() { - // First tag all states as opened, as components are allowed to send - // messages both ways in onOpen call, in case any component send message - // up, the link receiving them should have their state as opened. + // First tag all states as opened, as components are allowed to send + // messages both ways in onOpen call, in case any component send message + // up, the link receiving them should have their state as opened. StorageLink* link = this; while (true) { if (link->_state != CREATED) { - LOG(error, "During open(), link %s should be in CREATED state, " - "not in state %s.", - toString().c_str(), - stateToString(link->_state)); + LOG(error, "During open(), link %s should be in CREATED state, not in state %s.", + toString().c_str(), stateToString(link->_state)); assert(false); } link->_state = OPENED; if (link->_down.get() == 0) break; link = link->_down.get(); } - // When give all links an onOpen call, bottoms up. Do it bottoms up, as - // links are more likely to send messages down in their onOpen() call - // than up. Thus, chances are best that the component is ready to - // receive messages sent during onOpen(). + // When give all links an onOpen call, bottoms up. Do it bottoms up, as + // links are more likely to send messages down in their onOpen() call + // than up. Thus, chances are best that the component is ready to + // receive messages sent during onOpen(). while (link != 0) { link->onOpen(); link = link->_up; @@ -91,34 +84,31 @@ void StorageLink::closeNextLink() { void StorageLink::flush() { if (_state != CLOSING) { - LOG(error, "During flush(), link %s should be in CLOSING state, " - "not in state %s.", - toString().c_str(), - stateToString(_state)); + LOG(error, "During flush(), link %s should be in CLOSING state, not in state %s.", + toString().c_str(), stateToString(_state)); assert(false); } - // First flush down to get all requests out of the system. + // First flush down to get all requests out of the system. _state = FLUSHINGDOWN; LOG(debug, "Flushing link %s on the way down.", toString().c_str()); onFlush(true); LOG(debug, "Flushed link %s on the way down.", toString().c_str()); if (!isBottom()) { _down->flush(); - // Then flush up to get replies out of the system + // Then flush up to get replies out of the system LOG(debug, "Flushing link %s on the way back up.", toString().c_str()); _state = FLUSHINGUP; onFlush(false); LOG(debug, "Flushed link %s on the way back up.", toString().c_str()); } else { - // Then flush up to get replies out of the system + // Then flush up to get replies out of the system LOG(debug, "Flushing link %s on the way back up.", toString().c_str()); _state = FLUSHINGUP; onFlush(false); LOG(debug, "Flushed link %s on the way back up.", toString().c_str()); } _state = CLOSED; - LOG(debug, "Link %s is now closed and should do nothing more.", - toString().c_str()); + LOG(debug, "Link %s is now closed and should do nothing more.", toString().c_str()); } void StorageLink::sendDown(const StorageMessage::SP& msg) @@ -130,58 +120,31 @@ void StorageLink::sendDown(const StorageMessage::SP& msg) case FLUSHINGDOWN: break; default: - LOG(error, - "Link %s trying to send %s down while in state %s", - toString().c_str(), - msg->toString().c_str(), - stateToString(_state)); + LOG(error, "Link %s trying to send %s down while in state %s", + toString().c_str(), msg->toString().c_str(), stateToString(_state)); assert(false); } assert(msg.get()); - LOG(spam, "Storage Link %s to handle %s", - toString().c_str(), msg->toString().c_str()); + LOG(spam, "Storage Link %s to handle %s", toString().c_str(), msg->toString().c_str()); if (isBottom()) { - LOG(spam, "Storage link %s at bottom of chain got message %s.", - toString().c_str(), msg->toString().c_str()); - /* - if (isFlush(msg)) { + LOG(spam, "Storage link %s at bottom of chain got message %s.", toString().c_str(), msg->toString().c_str()); + ostringstream ost; + ost << "Unhandled message at bottom of chain " << *msg << " (message type " + << msg->getType().getName() << "). " << vespalib::getStackTrace(0); + if (!msg->getType().isReply()) { + LOGBP(warning, "%s", ost.str().c_str()); StorageCommand& cmd = static_cast<StorageCommand&>(*msg); shared_ptr<StorageReply> reply(cmd.makeReply().release()); if (reply.get()) { + reply->setResult(ReturnCode(ReturnCode::NOT_IMPLEMENTED, msg->getType().getName())); sendUp(reply); } } else { - */ - ostringstream ost; - ost << "Unhandled message at bottom of chain " - << *msg << " (message type " - << msg->getType().getName() - << "). " - << vespalib::getStackTrace(0); - if (!msg->getType().isReply()) { - //if (!_closed) { - LOGBP(warning, "%s", ost.str().c_str()); - //} - StorageCommand& cmd = static_cast<StorageCommand&>(*msg); - shared_ptr<StorageReply> reply(cmd.makeReply().release()); - - if (reply.get()) { - reply->setResult(ReturnCode(ReturnCode::NOT_IMPLEMENTED, - msg->getType().getName())); - sendUp(reply); - } - } else { - ost << " Return code: " - << static_cast<StorageReply&>(*msg).getResult(); - //if (!_closed) { - LOGBP(warning, "%s", ost.str().c_str()); - //} - } - //} + ost << " Return code: " << static_cast<StorageReply&>(*msg).getResult(); + LOGBP(warning, "%s", ost.str().c_str()); + } } else if (!_down->onDown(msg)) { - //LOG(spam, "Storage link %s forwarding message %s.", - // toString().c_str(), msg->toString().c_str()); _down->sendDown(msg); } else { LOG(spam, "Storage link %s handled message %s.", @@ -191,7 +154,7 @@ void StorageLink::sendDown(const StorageMessage::SP& msg) void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg) { - // Verify acceptable state to send messages up + // Verify acceptable state to send messages up switch(_state) { case OPENED: case CLOSING: @@ -199,48 +162,28 @@ void StorageLink::sendUp(const shared_ptr<StorageMessage> & msg) case FLUSHINGUP: break; default: - LOG(error, - "Link %s trying to send %s up while in state %s", - toString().c_str(), - msg->toString(true).c_str(), - stateToString(_state)); + LOG(error, "Link %s trying to send %s up while in state %s", + toString().c_str(), msg->toString(true).c_str(), stateToString(_state)); assert(false); } assert(msg.get()); if (isTop()) { - /* - if (isFlush(msg)) { + ostringstream ost; + ost << "Unhandled message at top of chain " << *msg << "."; + ost << vespalib::getStackTrace(0); + if (!msg->getType().isReply()) { + LOGBP(warning, "%s", ost.str().c_str()); StorageCommand& cmd = static_cast<StorageCommand&>(*msg); shared_ptr<StorageReply> reply(cmd.makeReply().release()); if (reply.get()) { + reply->setResult(ReturnCode(ReturnCode::NOT_IMPLEMENTED, msg->getType().getName())); sendDown(reply); } } else { - */ - ostringstream ost; - ost << "Unhandled message at top of chain " << *msg << "."; - ost << vespalib::getStackTrace(0); - if (!msg->getType().isReply()) { - //if (!_closed) { - LOGBP(warning, "%s", ost.str().c_str()); - //} - StorageCommand& cmd = static_cast<StorageCommand&>(*msg); - shared_ptr<StorageReply> reply(cmd.makeReply().release()); - - if (reply.get()) { - reply->setResult(ReturnCode(ReturnCode::NOT_IMPLEMENTED, - msg->getType().getName())); - sendDown(reply); - } - } else { - ost << " Return code: " - << static_cast<StorageReply&>(*msg).getResult(); - //if (!_closed) { - LOGBP(warning, "%s", ost.str().c_str()); - //} - } - //} + ost << " Return code: " << static_cast<StorageReply&>(*msg).getResult(); + LOGBP(warning, "%s", ost.str().c_str()); + } } else if (!_up->onUp(msg)) { _up->sendUp(msg); } @@ -261,19 +204,7 @@ void StorageLink::printChain(std::ostream& out, std::string indent) const { bool StorageLink::onDown(const shared_ptr<StorageMessage> & msg) { - //LOG(spam, "Checking if storage link %s handles %s.", - // toString().c_str(), msg->toString().c_str()); - bool result = msg->callHandler(*this, msg); - /* - if (result) { - LOG(spam, "Storage link %s handled message %s.", - toString().c_str(), msg->toString().c_str()); - } else { - LOG(spam, "Storage link %s did not handle message %s.", - toString().c_str(), msg->toString().c_str()); - } - */ - return result; + return msg->callHandler(*this, msg); } bool StorageLink::onUp(const shared_ptr<StorageMessage> & msg) diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp index e23bfda192c..949ceb901e1 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp @@ -7,12 +7,8 @@ namespace storage { FileStorHandler::FileStorHandler(MessageSender& sender, FileStorMetrics& metrics, const spi::PartitionStateList& partitions, - ServiceLayerComponentRegister& compReg, - uint8_t maxPriorityToBlock, - uint8_t minPriorityToBeBlocking) - : _impl(new FileStorHandlerImpl( - sender, metrics, partitions, compReg, - maxPriorityToBlock, minPriorityToBeBlocking)) + ServiceLayerComponentRegister& compReg) + : _impl(new FileStorHandlerImpl(sender, metrics, partitions, compReg)) { } @@ -57,23 +53,16 @@ FileStorHandler::schedule(const api::StorageMessage::SP& msg, uint16_t thread) return _impl->schedule(msg, thread); } -void -FileStorHandler::pause(uint16_t disk, uint8_t priority) const { - return _impl->pause(disk, priority); -} - FileStorHandler::LockedMessage -FileStorHandler::getNextMessage(uint16_t thread, uint8_t lowestPriority) +FileStorHandler::getNextMessage(uint16_t thread) { - return _impl->getNextMessage(thread, lowestPriority); + return _impl->getNextMessage(thread); } FileStorHandler::LockedMessage & -FileStorHandler::getNextMessage(uint16_t thread, - LockedMessage& lck, - uint8_t lowestPriority) +FileStorHandler::getNextMessage(uint16_t thread, LockedMessage& lck) { - return _impl->getNextMessage(thread, lck, lowestPriority); + return _impl->getNextMessage(thread, lck); } FileStorHandler::BucketLockInterface::SP @@ -89,30 +78,23 @@ FileStorHandler::remapQueueAfterDiskMove( { RemapInfo target(bucket, targetDisk); - _impl->remapQueue(RemapInfo(bucket, sourceDisk), target, - FileStorHandlerImpl::MOVE); + _impl->remapQueue(RemapInfo(bucket, sourceDisk), target, FileStorHandlerImpl::MOVE); } void -FileStorHandler::remapQueueAfterJoin( - const RemapInfo& source, - RemapInfo& target) +FileStorHandler::remapQueueAfterJoin(const RemapInfo& source,RemapInfo& target) { _impl->remapQueue(source, target, FileStorHandlerImpl::JOIN); } void -FileStorHandler::remapQueueAfterSplit( - const RemapInfo& source, - RemapInfo& target1, - RemapInfo& target2) +FileStorHandler::remapQueueAfterSplit(const RemapInfo& source,RemapInfo& target1, RemapInfo& target2) { _impl->remapQueue(source, target1, target2, FileStorHandlerImpl::SPLIT); } void -FileStorHandler::failOperations(const document::Bucket &bucket, - uint16_t fromDisk, const api::ReturnCode& err) +FileStorHandler::failOperations(const document::Bucket &bucket, uint16_t fromDisk, const api::ReturnCode& err) { _impl->failOperations(bucket, fromDisk, err); } @@ -130,8 +112,7 @@ FileStorHandler::sendReply(const api::StorageReply::SP& msg) } void -FileStorHandler::getStatus(std::ostream& out, - const framework::HttpUrlPath& path) const +FileStorHandler::getStatus(std::ostream& out, const framework::HttpUrlPath& path) const { _impl->getStatus(out, path); } @@ -149,8 +130,7 @@ FileStorHandler::getQueueSize(uint16_t disk) const } void -FileStorHandler::addMergeStatus(const document::Bucket& bucket, - MergeStatus::SP ms) +FileStorHandler::addMergeStatus(const document::Bucket& bucket, MergeStatus::SP ms) { return _impl->addMergeStatus(bucket, ms); } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index 6e2d6a0fc07..2cfc97ca71b 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -15,7 +15,6 @@ #include "mergestatus.h" #include <vespa/document/bucket/bucket.h> -#include <ostream> #include <vespa/storage/storageutil/resumeguard.h> #include <vespa/storage/common/messagesender.h> @@ -72,12 +71,7 @@ public: CLOSED }; - FileStorHandler(MessageSender&, - FileStorMetrics&, - const spi::PartitionStateList&, - ServiceLayerComponentRegister&, - uint8_t maxPriorityToBlock, - uint8_t minPriorityToBeBlocking); + FileStorHandler(MessageSender&, FileStorMetrics&, const spi::PartitionStateList&, ServiceLayerComponentRegister&); ~FileStorHandler(); // Commands used by file stor manager @@ -115,32 +109,19 @@ public: * Schedule a storage message to be processed by the given disk * @return True if we maanged to schedule operation. False if not */ - bool schedule(const std::shared_ptr<api::StorageMessage>&, - uint16_t disk); - - // Commands used by file stor threads - - /** - * When called, checks if any running operations have "preempting" - * priority. If so, and the given priority is less than that, this call - * will hang until the other operation is done. - */ - void pause(uint16_t disk, uint8_t priority) const; + bool schedule(const std::shared_ptr<api::StorageMessage>&, uint16_t disk); /** * Used by file stor threads to get their next message to process. * * @param disk The disk to get messages for - * @param lowestPriority The lowest priority of operation we should return */ - LockedMessage getNextMessage(uint16_t disk, uint8_t lowestPriority); + LockedMessage getNextMessage(uint16_t disk); /** * Returns the next message for the same bucket. */ - LockedMessage & getNextMessage(uint16_t disk, - LockedMessage& lock, - uint8_t lowestPriority); + LockedMessage & getNextMessage(uint16_t disk, LockedMessage& lock); /** * Lock a bucket. By default, each file stor thread has the locks of all @@ -219,8 +200,7 @@ public: * Fail all operations towards a single bucket currently queued to the * given thread with the given error code. */ - void failOperations(const document::Bucket&, uint16_t fromDisk, - const api::ReturnCode&); + void failOperations(const document::Bucket&, uint16_t fromDisk, const api::ReturnCode&); /** * Add a new merge state to the registry. diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index c8b5c71ee2e..5eb168a9a42 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -23,20 +23,14 @@ using document::BucketSpace; namespace storage { -FileStorHandlerImpl::FileStorHandlerImpl( - MessageSender& sender, - FileStorMetrics& metrics, - const spi::PartitionStateList& partitions, - ServiceLayerComponentRegister& compReg, - uint8_t maxPriorityToBlock, - uint8_t minPriorityToBeBlocking) +FileStorHandlerImpl::FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics, + const spi::PartitionStateList& partitions, + ServiceLayerComponentRegister& compReg) : _partitions(partitions), _component(compReg, "filestorhandlerimpl"), _diskInfo(_component.getDiskCount()), _messageSender(sender), _bucketIdFactory(_component.getBucketIdFactory()), - _maxPriorityToBlock(maxPriorityToBlock), - _minPriorityToBeBlocking(minPriorityToBeBlocking), _getNextMessageTimeout(100), _paused(false) { @@ -46,23 +40,21 @@ FileStorHandlerImpl::FileStorHandlerImpl( } if (_diskInfo.size() == 0) { - throw vespalib::IllegalArgumentException( - "No disks configured", VESPA_STRLOC); + throw vespalib::IllegalArgumentException("No disks configured", VESPA_STRLOC); } // Add update hook, so we will get callbacks each 5 seconds to update // metrics. _component.registerMetricUpdateHook(*this, framework::SecondTime(5)); } -FileStorHandlerImpl::~FileStorHandlerImpl() { } +FileStorHandlerImpl::~FileStorHandlerImpl() = default; void FileStorHandlerImpl::addMergeStatus(const document::Bucket& bucket, MergeStatus::SP status) { vespalib::LockGuard mlock(_mergeStatesLock); if (_mergeStates.find(bucket) != _mergeStates.end()) {; - LOG(warning, "A merge status already existed for %s. Overwriting it.", - bucket.toString().c_str()); + LOG(warning, "A merge status already existed for %s. Overwriting it.", bucket.toString().c_str()); } _mergeStates[bucket] = status; } @@ -73,8 +65,7 @@ FileStorHandlerImpl::editMergeStatus(const document::Bucket& bucket) vespalib::LockGuard mlock(_mergeStatesLock); MergeStatus::SP status = _mergeStates[bucket]; if (status.get() == 0) { - throw vespalib::IllegalStateException( - "No merge state exist for " + bucket.toString(), VESPA_STRLOC); + throw vespalib::IllegalStateException("No merge state exist for " + bucket.toString(), VESPA_STRLOC); } return *status; } @@ -94,8 +85,7 @@ FileStorHandlerImpl::getNumActiveMerges() const } void -FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket, - const api::ReturnCode* code) +FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket, const api::ReturnCode* code) { vespalib::LockGuard mlock(_mergeStatesLock); auto it = _mergeStates.find(bucket); @@ -153,10 +143,9 @@ FileStorHandlerImpl::flush(bool killPendingMerges) if (killPendingMerges) { api::ReturnCode code(api::ReturnCode::ABORTED, "Storage node is shutting down"); - for (std::map<document::Bucket, MergeStatus::SP>::iterator it - = _mergeStates.begin(); it != _mergeStates.end(); ++it) + for (auto & entry : _mergeStates) { - MergeStatus& s(*it->second); + MergeStatus& s(*entry.second); if (s.pendingGetDiff.get() != 0) { s.pendingGetDiff->setResult(code); _messageSender.sendReply(s.pendingGetDiff); @@ -182,11 +171,9 @@ FileStorHandlerImpl::reply(api::StorageMessage& msg, std::shared_ptr<api::StorageReply> rep( static_cast<api::StorageCommand&>(msg).makeReply().release()); if (state == FileStorHandler::DISABLED) { - rep->setResult(api::ReturnCode( - api::ReturnCode::DISK_FAILURE, "Disk disabled")); + rep->setResult(api::ReturnCode(api::ReturnCode::DISK_FAILURE, "Disk disabled")); } else { - rep->setResult(api::ReturnCode( - api::ReturnCode::ABORTED, "Shutting down storage node.")); + rep->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Shutting down storage node.")); } _messageSender.sendReply(rep); } @@ -270,29 +257,6 @@ FileStorHandlerImpl::schedule(const std::shared_ptr<api::StorageMessage>& msg, return true; } -void -FileStorHandlerImpl::pause(uint16_t disk, uint8_t priority) const { - if (priority < _maxPriorityToBlock) { - return; - } - - assert(disk < _diskInfo.size()); - const Disk& t(_diskInfo[disk]); - vespalib::MonitorGuard lockGuard(t.lock); - - bool paused = true; - while (paused) { - paused = false; - for (auto& lockedBucket : t.lockedBuckets) { - if (lockedBucket.second.priority <= _minPriorityToBeBlocking) { - paused = true; - lockGuard.wait(); - break; - } - } - } -} - bool FileStorHandlerImpl::messageMayBeAborted(const api::StorageMessage& msg) const { @@ -395,18 +359,6 @@ FileStorHandlerImpl::abortQueuedOperations( } } -bool -FileStorHandlerImpl::hasBlockingOperations(const Disk& t) const -{ - for (auto& lockedBucket : t.lockedBuckets) { - if (lockedBucket.second.priority <= _minPriorityToBeBlocking) { - return true; - } - } - - return false; -} - void FileStorHandlerImpl::updateMetrics(const MetricLockGuard &) { @@ -419,16 +371,11 @@ FileStorHandlerImpl::updateMetrics(const MetricLockGuard &) } FileStorHandler::LockedMessage & -FileStorHandlerImpl::getNextMessage(uint16_t disk, - FileStorHandler::LockedMessage& lck, - uint8_t maxPriority) +FileStorHandlerImpl::getNextMessage(uint16_t disk, FileStorHandler::LockedMessage& lck) { document::Bucket bucket(lck.first->getBucket()); - LOG(spam, - "Disk %d retrieving message for buffered bucket %s", - disk, - bucket.getBucketId().toString().c_str()); + LOG(spam, "Disk %d retrieving message for buffered bucket %s", disk, bucket.getBucketId().toString().c_str()); assert(disk < _diskInfo.size()); Disk& t(_diskInfo[disk]); @@ -451,22 +398,12 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, api::StorageMessage & m(*range.first->_command); mbus::Trace& trace = m.getTrace(); - // Priority is too low, not buffering any more. - if (m.getPriority() > maxPriority || m.getPriority() >= _maxPriorityToBlock) { - lck.second.reset(); - return lck; - } + MBUS_TRACE(trace, 9, "FileStorHandler: Message identified by disk thread looking for more requests to active bucket."); - MBUS_TRACE(trace, 9, - "FileStorHandler: Message identified by disk thread looking for " - "more requests to active bucket."); + uint64_t waitTime(const_cast<metrics::MetricTimer&>(range.first->_timer).stop( + t.metrics->averageQueueWaitingTime[m.getLoadType()])); - uint64_t waitTime( - const_cast<metrics::MetricTimer&>(range.first->_timer).stop( - t.metrics->averageQueueWaitingTime[m.getLoadType()])); - - LOG(debug, "Message %s waited %" PRIu64 " ms in storage queue (bucket %s), " - "timeout %d", + LOG(debug, "Message %s waited %" PRIu64 " ms in storage queue (bucket %s), timeout %d", m.toString().c_str(), waitTime, bucket.getBucketId().toString().c_str(), static_cast<api::StorageCommand&>(m).getTimeout()); @@ -480,15 +417,11 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, lockGuard.unlock(); return lck; } else { - std::shared_ptr<api::StorageReply> msgReply( - static_cast<api::StorageCommand&>(m) - .makeReply().release()); + std::shared_ptr<api::StorageReply> msgReply(static_cast<api::StorageCommand&>(m).makeReply().release()); idx.erase(range.first); lockGuard.broadcast(); lockGuard.unlock(); - msgReply->setResult(api::ReturnCode( - api::ReturnCode::TIMEOUT, - "Message waited too long in storage queue")); + msgReply->setResult(api::ReturnCode(api::ReturnCode::TIMEOUT, "Message waited too long in storage queue")); _messageSender.sendReply(msgReply); lck.second.reset(); @@ -517,23 +450,13 @@ FileStorHandlerImpl::diskIsClosed(uint16_t disk) const } bool -FileStorHandlerImpl::operationBlockedByHigherPriorityThread( - const api::StorageMessage& msg, - const Disk& disk) const -{ - return ((msg.getPriority() >= _maxPriorityToBlock) - && hasBlockingOperations(disk)); -} - -bool FileStorHandlerImpl::messageTimedOutInQueue(const api::StorageMessage& msg, uint64_t waitTime) const { if (msg.getType().isReply()) { return false; // Replies must always be processed and cannot time out. } - return (waitTime >= static_cast<const api::StorageCommand&>( - msg).getTimeout()); + return (waitTime >= static_cast<const api::StorageCommand&>(msg).getTimeout()); } std::unique_ptr<FileStorHandler::BucketLockInterface> @@ -543,8 +466,7 @@ FileStorHandlerImpl::takeDiskBucketLockOwnership( const document::Bucket &bucket, const api::StorageMessage& msg) { - return std::unique_ptr<FileStorHandler::BucketLockInterface>( - new BucketLock(guard, disk, bucket, msg.getPriority(), msg.getSummary())); + return std::make_unique<BucketLock>(guard, disk, bucket, msg.getPriority(), msg.getSummary()); } std::unique_ptr<api::StorageReply> @@ -564,23 +486,10 @@ namespace { bucketIsLockedOnDisk(const document::Bucket &id, const FileStorHandlerImpl::Disk &t) { return (id.getBucketId().getRawId() != 0 && t.isLocked(id)); } - - /** - * Return whether msg has sufficiently high priority that a thread with - * a configured priority threshold of maxPriority can even run in. - * Often, operations such as streaming searches will have dedicated threads - * that refuse lower priority operations such as Puts etc. - */ - bool - operationHasHighEnoughPriorityToBeRun(const api::StorageMessage& msg, uint8_t maxPriority) - { - // NOTE: priority integral value 0 is considered highest pri. - return (msg.getPriority() <= maxPriority); - } } FileStorHandler::LockedMessage -FileStorHandlerImpl::getNextMessage(uint16_t disk, uint8_t maxPriority) +FileStorHandlerImpl::getNextMessage(uint16_t disk) { assert(disk < _diskInfo.size()); if (!tryHandlePause(disk)) { @@ -602,12 +511,7 @@ FileStorHandlerImpl::getNextMessage(uint16_t disk, uint8_t maxPriority) iter++; } if (iter != end) { - api::StorageMessage &m(*iter->_command); - - if (operationHasHighEnoughPriorityToBeRun(m, maxPriority) - && ! operationBlockedByHigherPriorityThread(m, t) - && ! isPaused()) - { + if (! isPaused()) { return getMessage(lockGuard, t, idx, iter); } } @@ -655,22 +559,16 @@ FileStorHandlerImpl::lock(const document::Bucket &bucket, uint16_t disk) assert(disk < _diskInfo.size()); Disk& t(_diskInfo[disk]); - LOG(spam, - "Acquiring filestor lock for %s on disk %d", - bucket.getBucketId().toString().c_str(), - disk); + LOG(spam, "Acquiring filestor lock for %s on disk %d", bucket.getBucketId().toString().c_str(), disk); vespalib::MonitorGuard lockGuard(t.lock); while (bucket.getBucketId().getRawId() != 0 && t.isLocked(bucket)) { - LOG(spam, - "Contending for filestor lock for %s", - bucket.getBucketId().toString().c_str()); + LOG(spam, "Contending for filestor lock for %s", bucket.getBucketId().toString().c_str()); lockGuard.wait(100); } - std::shared_ptr<FileStorHandler::BucketLockInterface> locker( - new BucketLock(lockGuard, t, bucket, 255, "External lock")); + auto locker = std::make_shared<BucketLock>(lockGuard, t, bucket, 255, "External lock"); lockGuard.broadcast(); return locker; diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index 56ac9ea0577..6b6d154e149 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -142,12 +142,8 @@ public: document::Bucket _bucket; }; - FileStorHandlerImpl(MessageSender&, - FileStorMetrics&, - const spi::PartitionStateList&, - ServiceLayerComponentRegister&, - uint8_t maxPriorityToBlock, - uint8_t minPriorityToBeBlocking); + FileStorHandlerImpl(MessageSender&, FileStorMetrics&, + const spi::PartitionStateList&, ServiceLayerComponentRegister&); ~FileStorHandlerImpl(); void setGetNextMessageTimeout(uint32_t timeout) { _getNextMessageTimeout = timeout; } @@ -158,12 +154,10 @@ public: void close(); bool schedule(const std::shared_ptr<api::StorageMessage>&, uint16_t disk); - void pause(uint16_t disk, uint8_t priority) const; - FileStorHandler::LockedMessage getNextMessage(uint16_t disk, uint8_t lowestPriority); + FileStorHandler::LockedMessage getNextMessage(uint16_t disk); FileStorHandler::LockedMessage getMessage(vespalib::MonitorGuard & guard, Disk & t, PriorityIdx & idx, PriorityIdx::iterator iter); - FileStorHandler::LockedMessage & getNextMessage(uint16_t disk, FileStorHandler::LockedMessage& lock, - uint8_t lowestPriority); + FileStorHandler::LockedMessage & getNextMessage(uint16_t disk, FileStorHandler::LockedMessage& lock); enum Operation { MOVE, SPLIT, JOIN }; void remapQueue(const RemapInfo& source, RemapInfo& target, Operation op); @@ -204,8 +198,6 @@ private: std::map<document::Bucket, MergeStatus::SP> _mergeStates; - uint8_t _maxPriorityToBlock; - uint8_t _minPriorityToBeBlocking; uint32_t _getNextMessageTimeout; vespalib::Monitor _pauseMonitor; @@ -237,12 +229,6 @@ private: bool diskIsClosed(uint16_t disk) const; /** - * Return whether an already running high priority operation pre-empts - * (blocks) the operation in msg from even starting in the current thread. - */ - bool operationBlockedByHigherPriorityThread(const api::StorageMessage& msg, const Disk& disk) const; - - /** * Return whether msg has timed out based on waitTime and the message's * specified timeout. */ @@ -262,7 +248,6 @@ private: */ std::unique_ptr<api::StorageReply> makeQueueTimeoutReply(api::StorageMessage& msg) const; bool messageMayBeAborted(const api::StorageMessage& msg) const; - bool hasBlockingOperations(const Disk& t) const; void abortQueuedCommandsForBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd); bool diskHasActiveOperationForAbortedBucket(const Disk& disk, const AbortBucketOperationsCommand& cmd) const; void waitUntilNoActiveOperationsForAbortedBuckets(Disk& disk, const AbortBucketOperationsCommand& cmd); @@ -287,4 +272,3 @@ private: }; } // storage - diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 6644d09da7f..b4735c2961a 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -29,10 +29,8 @@ using document::BucketSpace; namespace storage { FileStorManager:: -FileStorManager(const config::ConfigUri & configUri, - const spi::PartitionStateList& partitions, - spi::PersistenceProvider& provider, - ServiceLayerComponentRegister& compReg) +FileStorManager(const config::ConfigUri & configUri, const spi::PartitionStateList& partitions, + spi::PersistenceProvider& provider, ServiceLayerComponentRegister& compReg) : StorageLinkQueued("File store manager", compReg), framework::HtmlStatusReporter("filestorman", "File store manager"), _compReg(compReg), @@ -83,8 +81,7 @@ FileStorManager::~FileStorManager() } } } - LOG(debug, "Closing all filestor queues, answering queued messages. " - "New messages will be refused."); + LOG(debug, "Closing all filestor queues, answering queued messages. New messages will be refused."); _filestorHandler->close(); LOG(debug, "Deleting filestor threads. Waiting for their current operation " "to finish. Stop their threads and delete objects."); @@ -116,38 +113,16 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC _disks.resize(_component.getDiskCount()); - _metrics->initDiskMetrics( - _disks.size(), - _component.getLoadTypes()->getMetricLoadTypes(), - (_config->threads.size() > 0) ? (_config->threads.size()) : 6); + size_t numThreads = _config->numThreads; + _metrics->initDiskMetrics(_disks.size(), _component.getLoadTypes()->getMetricLoadTypes(), numThreads); - _filestorHandler.reset(new FileStorHandler( - *this, *_metrics, _partitions, _compReg, - _config->maxPriorityToBlock, _config->minPriorityToBeBlocking)); + _filestorHandler.reset(new FileStorHandler(*this, *_metrics, _partitions, _compReg)); for (uint32_t i=0; i<_component.getDiskCount(); ++i) { if (_partitions[i].isUp()) { - if (_config->threads.size() == 0) { - LOG(spam, "Setting up disk %u", i); - for (uint32_t j = 0; j < 4; j++) { - _disks[i].push_back(DiskThread::SP( - new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler, - *_metrics->disks[i]->threads[j], i, 255))); - - } - for (uint32_t j = 4; j < 6; j++) { - _disks[i].push_back(DiskThread::SP( - new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler, - *_metrics->disks[i]->threads[j], i, 100))); - } - } - - for (uint16_t j = 0; j < _config->threads.size(); j++) { - LOG(spam, "Setting up disk %u, thread %u with priority %d", - i, j, _config->threads[j].lowestpri); - _disks[i].push_back(DiskThread::SP( - new PersistenceThread(_compReg, _configUri, *_provider, *_filestorHandler, - *_metrics->disks[i]->threads[j], i, _config->threads[j].lowestpri))); - + LOG(spam, "Setting up disk %u", i); + for (uint32_t j = 0; j < numThreads; j++) { + _disks[i].push_back(std::make_shared<PersistenceThread>(_compReg, _configUri, *_provider, *_filestorHandler, + *_metrics->disks[i]->threads[j], i)); } } else { _filestorHandler->disable(i); @@ -157,36 +132,28 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC } void -FileStorManager::replyDroppedOperation(api::StorageMessage& msg, - const document::Bucket& bucket, - api::ReturnCode::Result returnCode, - vespalib::stringref reason) +FileStorManager::replyDroppedOperation(api::StorageMessage& msg, const document::Bucket& bucket, + api::ReturnCode::Result returnCode, vespalib::stringref reason) { std::ostringstream error; error << "Dropping " << msg.getType() << " to bucket " << bucket.toString() << ". Reason: " << reason; LOGBT(debug, bucket.toString(), "%s", error.str().c_str()); if (!msg.getType().isReply()) { - std::shared_ptr<api::StorageReply> reply( - static_cast<api::StorageCommand&>(msg).makeReply().release()); + std::shared_ptr<api::StorageReply> reply = static_cast<api::StorageCommand&>(msg).makeReply(); reply->setResult(api::ReturnCode(returnCode, error.str())); sendUp(reply); } } void -FileStorManager::replyWithBucketNotFound(api::StorageMessage& msg, - const document::Bucket& bucket) +FileStorManager::replyWithBucketNotFound(api::StorageMessage& msg, const document::Bucket& bucket) { - replyDroppedOperation(msg, - bucket, - api::ReturnCode::BUCKET_NOT_FOUND, - "bucket does not exist"); + replyDroppedOperation(msg, bucket, api::ReturnCode::BUCKET_NOT_FOUND, "bucket does not exist"); } StorBucketDatabase::WrappedEntry -FileStorManager::mapOperationToDisk(api::StorageMessage& msg, - const document::Bucket& bucket) +FileStorManager::mapOperationToDisk(api::StorageMessage& msg, const document::Bucket& bucket) { StorBucketDatabase::WrappedEntry entry(_component.getBucketDatabase(bucket.getBucketSpace()).get( bucket.getBucketId(), "FileStorManager::mapOperationToDisk")); @@ -197,8 +164,7 @@ FileStorManager::mapOperationToDisk(api::StorageMessage& msg, } StorBucketDatabase::WrappedEntry -FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd, - const document::DocumentId* docId) +FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd, const document::DocumentId* docId) { StorBucketDatabase &database = _component.getBucketDatabase(cmd.getBucket().getBucketSpace()); StorBucketDatabase::WrappedEntry entry(database.get( @@ -208,17 +174,12 @@ FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd, if (docId) { specific = _bucketIdFactory.getBucketId(*docId); } - typedef std::map<document::BucketId, - StorBucketDatabase::WrappedEntry> BucketMap; + typedef std::map<document::BucketId, StorBucketDatabase::WrappedEntry> BucketMap; std::shared_ptr<api::StorageReply> reply; { - BucketMap results( - database.getContained( - specific, "FileStorManager::mapOperationToBucketAndDisk-2")); + BucketMap results( database.getContained( specific, "FileStorManager::mapOperationToBucketAndDisk-2")); if (results.size() == 1) { - LOG(debug, - "Remapping %s operation to specific %s versus " - "non-existing %s to %s.", + LOG(debug, "Remapping %s operation to specific %s versus non-existing %s to %s.", cmd.toString().c_str(), specific.toString().c_str(), cmd.getBucketId().toString().c_str(), results.begin()->first.toString().c_str()); @@ -243,10 +204,8 @@ FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd, } LOGBT(debug, cmd.getBucketId().toString(), "%s", error.str().c_str()); - reply.reset(static_cast<api::StorageCommand&>(cmd).makeReply().release()); - reply->setResult( - api::ReturnCode( - api::ReturnCode::BUCKET_NOT_FOUND, error.str())); + reply = static_cast<api::StorageCommand&>(cmd).makeReply(); + reply->setResult( api::ReturnCode( api::ReturnCode::BUCKET_NOT_FOUND, error.str())); } sendUp(reply); } @@ -254,18 +213,15 @@ FileStorManager::mapOperationToBucketAndDisk(api::BucketCommand& cmd, } bool -FileStorManager::handlePersistenceMessage( - const shared_ptr<api::StorageMessage>& msg, uint16_t disk) +FileStorManager::handlePersistenceMessage( const shared_ptr<api::StorageMessage>& msg, uint16_t disk) { api::ReturnCode errorCode(api::ReturnCode::OK); do { - LOG(spam, "Received %s. Attempting to queue it to disk %u.", - msg->getType().getName().c_str(), disk); + LOG(spam, "Received %s. Attempting to queue it to disk %u.", msg->getType().getName().c_str(), disk); LOG_BUCKET_OPERATION_NO_LOCK( getStorageMessageBucket(*msg).getBucketId(), - vespalib::make_string("Attempting to queue %s to disk %u", - msg->toString().c_str(), disk)); + vespalib::make_string("Attempting to queue %s to disk %u", msg->toString().c_str(), disk)); if (_filestorHandler->schedule(msg, disk)) { @@ -275,12 +231,10 @@ FileStorManager::handlePersistenceMessage( } switch (_filestorHandler->getDiskState(disk)) { case FileStorHandler::DISABLED: - errorCode = api::ReturnCode(api::ReturnCode::DISK_FAILURE, - "Disk disabled"); + errorCode = api::ReturnCode(api::ReturnCode::DISK_FAILURE, "Disk disabled"); break; case FileStorHandler::CLOSED: - errorCode = api::ReturnCode(api::ReturnCode::ABORTED, - "Shutting down storage node."); + errorCode = api::ReturnCode(api::ReturnCode::ABORTED, "Shutting down storage node."); break; case FileStorHandler::AVAILABLE: assert(false); @@ -289,8 +243,7 @@ FileStorManager::handlePersistenceMessage( // If we get here, we failed to schedule message. errorCode says why // We need to reply to message (while not having bucket lock) if (!msg->getType().isReply()) { - std::shared_ptr<api::StorageReply> reply( - static_cast<api::StorageCommand&>(*msg).makeReply().release()); + std::shared_ptr<api::StorageReply> reply = static_cast<api::StorageCommand&>(*msg).makeReply(); reply->setResult(errorCode); LOG(spam, "Received persistence message %s. Returning reply: %s", msg->getType().getName().c_str(), errorCode.toString().c_str()); @@ -303,7 +256,7 @@ bool FileStorManager::onPut(const shared_ptr<api::PutCommand>& cmd) { if (cmd->getTimestamp() == 0) { - shared_ptr<api::StorageReply> reply(cmd->makeReply().release()); + shared_ptr<api::StorageReply> reply = cmd->makeReply(); std::string msg("Put command received without timestamp set. " "Distributor need to set timestamp to ensure equal " "timestamps between storage nodes. Rejecting."); @@ -311,8 +264,7 @@ FileStorManager::onPut(const shared_ptr<api::PutCommand>& cmd) sendUp(reply); return true; } - StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk( - *cmd, &cmd->getDocumentId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -323,7 +275,7 @@ bool FileStorManager::onUpdate(const shared_ptr<api::UpdateCommand>& cmd) { if (cmd->getTimestamp() == 0) { - shared_ptr<api::StorageReply> reply(cmd->makeReply().release()); + shared_ptr<api::StorageReply> reply = cmd->makeReply(); std::string msg("Update command received without timestamp set. " "Distributor need to set timestamp to ensure equal " "timestamps between storage nodes. Rejecting."); @@ -331,8 +283,7 @@ FileStorManager::onUpdate(const shared_ptr<api::UpdateCommand>& cmd) sendUp(reply); return true; } - StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk( - *cmd, &cmd->getDocumentId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -342,8 +293,7 @@ FileStorManager::onUpdate(const shared_ptr<api::UpdateCommand>& cmd) bool FileStorManager::onGet(const shared_ptr<api::GetCommand>& cmd) { - StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk( - *cmd, &cmd->getDocumentId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -354,7 +304,7 @@ bool FileStorManager::onRemove(const shared_ptr<api::RemoveCommand>& cmd) { if (cmd->getTimestamp() == 0) { - shared_ptr<api::StorageReply> reply(cmd->makeReply().release()); + shared_ptr<api::StorageReply> reply = cmd->makeReply(); std::string msg("Remove command received without timestamp set. " "Distributor need to set timestamp to ensure equal " "timestamps between storage nodes. Rejecting."); @@ -362,8 +312,7 @@ FileStorManager::onRemove(const shared_ptr<api::RemoveCommand>& cmd) sendUp(reply); return true; } - StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk( - *cmd, &cmd->getDocumentId())); + StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, &cmd->getDocumentId())); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -373,8 +322,7 @@ FileStorManager::onRemove(const shared_ptr<api::RemoveCommand>& cmd) bool FileStorManager::onRevert(const shared_ptr<api::RevertCommand>& cmd) { - StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk( - *cmd, 0)); + StorBucketDatabase::WrappedEntry entry(mapOperationToBucketAndDisk(*cmd, 0)); if (entry.exist()) { handlePersistenceMessage(cmd, entry->disk); } @@ -423,12 +371,10 @@ FileStorManager::onCreateBucket( bucket.getBucketId(), "FileStorManager::onCreateBucket", StorBucketDatabase::CREATE_IF_NONEXISTING)); if (entry.preExisted()) { - LOG(debug, - "Got create bucket request for %s which already exists: %s", + LOG(debug, "Got create bucket request for %s which already exists: %s", cmd->getBucketId().toString().c_str(), entry->getBucketInfo().toString().c_str()); - code = api::ReturnCode(api::ReturnCode::EXISTS, - "Bucket already exist"); + code = api::ReturnCode(api::ReturnCode::EXISTS, "Bucket already exist"); } else { entry->disk = _component.getIdealPartition(cmd->getBucket()); if (_partitions[entry->disk].isUp()) { @@ -445,12 +391,9 @@ FileStorManager::onCreateBucket( return true; } else { entry.remove(); - code = api::ReturnCode( - api::ReturnCode::IO_FAILURE, - vespalib::make_string( - "Trying to create bucket %s on disabled disk %d", - cmd->getBucketId().toString().c_str(), - entry->disk)); + code = api::ReturnCode(api::ReturnCode::IO_FAILURE, + vespalib::make_string("Trying to create bucket %s on disabled disk %d", + cmd->getBucketId().toString().c_str(), entry->disk)); } } } @@ -471,7 +414,7 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd) "FileStorManager::onDeleteBucket")); if (!entry.exist()) { LOG(debug, "%s was already deleted", cmd->getBucketId().toString().c_str()); - std::shared_ptr<api::StorageReply> reply(cmd->makeReply().release()); + std::shared_ptr<api::StorageReply> reply = cmd->makeReply(); sendUp(reply); return true; } @@ -493,7 +436,7 @@ FileStorManager::onDeleteBucket(const shared_ptr<api::DeleteBucketCommand>& cmd) << entry->getBucketInfo().toString(); LOG(debug, "Rejecting bucket delete: %s", ost.str().c_str()); - std::shared_ptr<api::StorageReply> reply(cmd->makeReply().release()); + std::shared_ptr<api::StorageReply> reply = cmd->makeReply(); static_cast<api::DeleteBucketReply&>(*reply).setBucketInfo(entry->getBucketInfo()); reply->setResult(api::ReturnCode(api::ReturnCode::REJECTED, ost.str())); entry.unlock(); @@ -543,8 +486,7 @@ FileStorManager::ensureConsistentBucket( bool FileStorManager::onMergeBucket(const shared_ptr<api::MergeBucketCommand>& cmd) { - StorBucketDatabase::WrappedEntry entry(ensureConsistentBucket(cmd->getBucket(), *cmd, - "FileStorManager::onMergeBucket")); + StorBucketDatabase::WrappedEntry entry(ensureConsistentBucket(cmd->getBucket(), *cmd, "FileStorManager::onMergeBucket")); if (!entry.exist()) { return true; } @@ -567,7 +509,7 @@ FileStorManager::onMergeBucket(const shared_ptr<api::MergeBucketCommand>& cmd) cmd->toString().c_str(), entry->disk, cmd->getClusterStateVersion(), _component.getStateUpdater().getClusterStateBundle()->getVersion())); LOGBT(debug, cmd->getBucketId().toString(), "%s", code.getMessage().c_str()); - api::MergeBucketReply::SP reply(new api::MergeBucketReply(*cmd)); + auto reply = std::make_shared<api::MergeBucketReply>(*cmd); reply->setResult(code); sendUp(reply); return true; @@ -582,39 +524,30 @@ bool FileStorManager::onGetBucketDiff( const shared_ptr<api::GetBucketDiffCommand>& cmd) { - StorBucketDatabase::WrappedEntry entry( - ensureConsistentBucket(cmd->getBucket(), - *cmd, - "FileStorManager::onGetBucketDiff")); + StorBucketDatabase::WrappedEntry entry(ensureConsistentBucket(cmd->getBucket(), *cmd, "FileStorManager::onGetBucketDiff")); if (!entry.exist()) { return true; } if (!entry.preExisted()) { entry->disk = _component.getIdealPartition(cmd->getBucket()); if (_partitions[entry->disk].isUp()) { - LOG(debug, "Created bucket %s on disk %d (node index is %d) due " - "to get bucket diff being received.", - cmd->getBucketId().toString().c_str(), - entry->disk, _component.getIndex()); + LOG(debug, "Created bucket %s on disk %d (node index is %d) due to get bucket diff being received.", + cmd->getBucketId().toString().c_str(), entry->disk, _component.getIndex()); entry->info.setTotalDocumentSize(0); entry->info.setUsedFileSize(0); entry->info.setReady(true); - // Call before writing bucket entry as we need to have bucket - // lock while calling + // Call before writing bucket entry as we need to have bucket + // lock while calling handlePersistenceMessage(cmd, entry->disk); entry.write(); } else { entry.remove(); api::ReturnCode code(api::ReturnCode::IO_FAILURE, vespalib::make_string( - "Trying to merge non-existing bucket %s, which " - "can't be created because target disk %d is down", - cmd->getBucketId().toString().c_str(), - entry->disk)); - LOGBT(warning, cmd->getBucketId().toString(), - "%s", code.getMessage().c_str()); - api::GetBucketDiffReply::SP reply( - new api::GetBucketDiffReply(*cmd)); + "Trying to merge non-existing bucket %s, which can't be created because target disk %d is down", + cmd->getBucketId().toString().c_str(), entry->disk)); + LOGBT(warning, cmd->getBucketId().toString(), "%s", code.getMessage().c_str()); + auto reply = std::make_shared<api::GetBucketDiffReply>(*cmd); reply->setResult(code); sendUp(reply); return true; @@ -634,8 +567,7 @@ FileStorManager::validateApplyDiffCommandBucket(api::StorageMessage& msg, const BucketSpace bucketSpace(msg.getBucket().getBucketSpace()); if (!_component.getBucketDatabase(bucketSpace).isConsistent(entry)) { document::Bucket bucket(bucketSpace, entry.getBucketId()); - replyDroppedOperation(msg, bucket, api::ReturnCode::ABORTED, - "bucket became inconsistent during merging"); + replyDroppedOperation(msg, bucket, api::ReturnCode::ABORTED, "bucket became inconsistent during merging"); return false; } return true; @@ -715,8 +647,7 @@ FileStorManager::onSplitBucket(const std::shared_ptr<api::SplitBucketCommand>& c } bool -FileStorManager::onSetBucketState( - const std::shared_ptr<api::SetBucketStateCommand>& cmd) +FileStorManager::onSetBucketState(const std::shared_ptr<api::SetBucketStateCommand>& cmd) { StorBucketDatabase::WrappedEntry entry(mapOperationToDisk(*cmd, cmd->getBucket())); if (entry.exist()) { @@ -821,7 +752,7 @@ void FileStorManager::handleAbortBucketOperations(const shared_ptr<AbortBucketOperationsCommand>& cmd) { _filestorHandler->abortQueuedOperations(*cmd); - sendReply(api::StorageReply::SP(cmd->makeReply().release())); + sendReply(api::StorageReply::SP(cmd->makeReply())); } bool @@ -913,8 +844,7 @@ void FileStorManager::onFlush(bool downwards) } void -FileStorManager::reportHtmlStatus(std::ostream& out, - const framework::HttpUrlPath& path) const +FileStorManager::reportHtmlStatus(std::ostream& out, const framework::HttpUrlPath& path) const { bool showStatus = !path.hasAttribute("thread"); bool verbose = path.hasAttribute("verbose"); @@ -962,7 +892,7 @@ FileStorManager::updateState() bool nodeUp = state->getNodeState(node).getState().oneOf("uir"); LOG(debug, "FileStorManager received cluster state '%s'", state->toString().c_str()); - // If edge where we go down + // If edge where we go down if (_nodeUpInLastNodeStateSeenByProvider && !nodeUp) { LOG(debug, "Received cluster state where this node is down; de-activating all buckets in database"); Deactivator deactivator; diff --git a/storage/src/vespa/storage/persistence/filestorage/pausehandler.h b/storage/src/vespa/storage/persistence/filestorage/pausehandler.h deleted file mode 100644 index 59c543b0067..00000000000 --- a/storage/src/vespa/storage/persistence/filestorage/pausehandler.h +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * @class PauseHandler - * @ingroup persistence - * - * @brief Object that can be used to possibly pause running operation - */ -#pragma once - -#include <vespa/storage/persistence/filestorage/filestorhandler.h> - -namespace storage { - -class PauseHandler { - FileStorHandler* _handler; - uint16_t _disk; - uint8_t _priority; - -public: - PauseHandler() : _handler(0), _disk(0), _priority(0) {} - PauseHandler(FileStorHandler& handler, uint16_t disk) - : _handler(&handler), - _disk(disk), - _priority(0) - { - } - - void setPriority(uint8_t priority) { _priority = priority; } - - void pause() const { if (_handler != 0) _handler->pause(_disk, _priority); } -}; - -} // storage - diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index 6c4cd7b64d2..be6f9577642 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -9,7 +9,6 @@ #include <vespa/document/fieldset/fieldsetrepo.h> #include <vespa/vespalib/stllike/hash_map.hpp> #include <vespa/vespalib/util/exceptions.h> -#include <algorithm> #include <vespa/log/bufferedlogger.h> LOG_SETUP(".persistence.thread"); @@ -21,9 +20,8 @@ PersistenceThread::PersistenceThread(ServiceLayerComponentRegister& compReg, spi::PersistenceProvider& provider, FileStorHandler& filestorHandler, FileStorThreadMetrics& metrics, - uint16_t deviceIndex, - uint8_t lowestPriority) - : _env(configUri, compReg, filestorHandler, metrics, deviceIndex, lowestPriority, provider), + uint16_t deviceIndex) + : _env(configUri, compReg, filestorHandler, metrics, deviceIndex, provider), _warnOnSlowOperations(5000), _spi(provider), _processAllHandler(_env, provider), @@ -106,19 +104,14 @@ bool PersistenceThread::tasConditionMatches(const api::TestAndSetCommand & cmd, MessageTracker::UP PersistenceThread::handlePut(api::PutCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.put[cmd.getLoadType()], - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.put[cmd.getLoadType()],_env._component.getClock()); if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker)) { return tracker; } - spi::Result response = - _spi.put(getBucket(cmd.getDocumentId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), - cmd.getDocument(), - _context); + spi::Result response = _spi.put(getBucket(cmd.getDocumentId(), cmd.getBucket()), + spi::Timestamp(cmd.getTimestamp()), cmd.getDocument(), _context); checkForError(response, *tracker); return tracker; } @@ -126,22 +119,16 @@ PersistenceThread::handlePut(api::PutCommand& cmd) MessageTracker::UP PersistenceThread::handleRemove(api::RemoveCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.remove[cmd.getLoadType()], - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.remove[cmd.getLoadType()],_env._component.getClock()); if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker)) { return tracker; } - spi::RemoveResult response = - _spi.removeIfFound(getBucket(cmd.getDocumentId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), - cmd.getDocumentId(), _context); + spi::RemoveResult response = _spi.removeIfFound(getBucket(cmd.getDocumentId(), cmd.getBucket()), + spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), _context); if (checkForError(response, *tracker)) { - api::RemoveReply* reply(new api::RemoveReply( - cmd, response.wasFound() ? cmd.getTimestamp() : 0)); - tracker->setReply(api::StorageReply::SP(reply)); + tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0)); } if (!response.wasFound()) { ++_env._metrics.remove[cmd.getLoadType()].notFound; @@ -152,22 +139,18 @@ PersistenceThread::handleRemove(api::RemoveCommand& cmd) MessageTracker::UP PersistenceThread::handleUpdate(api::UpdateCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.update[cmd.getLoadType()], - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.update[cmd.getLoadType()],_env._component.getClock()); if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker)) { return tracker; } - spi::UpdateResult response = - _spi.update(getBucket(cmd.getUpdate()->getId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), - cmd.getUpdate(), _context); + spi::UpdateResult response = _spi.update(getBucket(cmd.getUpdate()->getId(), cmd.getBucket()), + spi::Timestamp(cmd.getTimestamp()), cmd.getUpdate(), _context); if (checkForError(response, *tracker)) { - api::UpdateReply* reply = new api::UpdateReply(cmd); + auto reply = std::make_shared<api::UpdateReply>(cmd); reply->setOldTimestamp(response.getExistingTimestamp()); - tracker->setReply(api::StorageReply::SP(reply)); + tracker->setReply(std::move(reply)); } return tracker; } @@ -175,30 +158,18 @@ PersistenceThread::handleUpdate(api::UpdateCommand& cmd) MessageTracker::UP PersistenceThread::handleGet(api::GetCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.get[cmd.getLoadType()], - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.get[cmd.getLoadType()],_env._component.getClock()); document::FieldSetRepo repo; - document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), - cmd.getFieldSet()); + document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFieldSet()); spi::GetResult result = - _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()), - *fieldSet, - cmd.getDocumentId(), - _context); + _spi.get(getBucket(cmd.getDocumentId(), cmd.getBucket()), *fieldSet, cmd.getDocumentId(), _context); if (checkForError(result, *tracker)) { if (!result.hasDocument()) { ++_env._metrics.get[cmd.getLoadType()].notFound; } - - api::GetReply::UP reply( - new api::GetReply(cmd, - Document::SP(result.getDocumentPtr()), - result.getTimestamp())); - - tracker->setReply(api::StorageReply::SP(reply.release())); + tracker->setReply(std::make_shared<api::GetReply>(cmd, result.getDocumentPtr(), result.getTimestamp())); } return tracker; @@ -207,19 +178,14 @@ PersistenceThread::handleGet(api::GetCommand& cmd) MessageTracker::UP PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.repairs, - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.repairs,_env._component.getClock()); NotificationGuard notifyGuard(*_bucketOwnershipNotifier); - LOG(debug, "Repair(%s): %s", - cmd.getBucketId().toString().c_str(), + LOG(debug, "Repair(%s): %s", cmd.getBucketId().toString().c_str(), (cmd.verifyBody() ? "Verifying body" : "Not verifying body")); api::BucketInfo before = _env.getBucketInfo(cmd.getBucket()); spi::Result result = - _spi.maintain(spi::Bucket(cmd.getBucket(), - spi::PartitionId(_env._partition)), - cmd.verifyBody() ? - spi::HIGH : spi::LOW); + _spi.maintain(spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)), + cmd.verifyBody() ? spi::HIGH : spi::LOW); if (checkForError(result, *tracker)) { api::BucketInfo after = _env.getBucketInfo(cmd.getBucket()); @@ -239,15 +205,11 @@ PersistenceThread::handleRepairBucket(RepairBucketCommand& cmd) MessageTracker::UP PersistenceThread::handleRevert(api::RevertCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.revert[cmd.getLoadType()], - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.revert[cmd.getLoadType()],_env._component.getClock()); spi::Bucket b = spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); - const std::vector<api::Timestamp> tokens = cmd.getRevertTokens(); - for (uint32_t i = 0; i < tokens.size(); ++i) { - spi::Result result = _spi.removeEntry(b, - spi::Timestamp(tokens[i]), - _context); + const std::vector<api::Timestamp> & tokens = cmd.getRevertTokens(); + for (const api::Timestamp & token : tokens) { + spi::Result result = _spi.removeEntry(b, spi::Timestamp(token), _context); } return tracker; } @@ -255,9 +217,7 @@ PersistenceThread::handleRevert(api::RevertCommand& cmd) MessageTracker::UP PersistenceThread::handleCreateBucket(api::CreateBucketCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.createBuckets, - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.createBuckets,_env._component.getClock()); LOG(debug, "CreateBucket(%s)", cmd.getBucketId().toString().c_str()); if (_env._fileStorHandler.isMerging(cmd.getBucket())) { LOG(warning, "Bucket %s was merging at create time. Unexpected.", @@ -299,8 +259,7 @@ PersistenceThread::checkProviderBucketInfoMatches(const spi::Bucket& bucket, result.getErrorMessage().c_str()); return false; } - api::BucketInfo providerInfo( - _env.convertBucketInfo(result.getBucketInfo())); + api::BucketInfo providerInfo(_env.convertBucketInfo(result.getBucketInfo())); // Don't check meta fields or active/ready fields since these are not // that important and ready may change under the hood in a race with // getModifiedBuckets(). If bucket is empty it means it has already @@ -323,15 +282,12 @@ PersistenceThread::checkProviderBucketInfoMatches(const spi::Bucket& bucket, MessageTracker::UP PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.deleteBuckets, - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.deleteBuckets,_env._component.getClock()); LOG(debug, "DeletingBucket(%s)", cmd.getBucketId().toString().c_str()); LOG_BUCKET_OPERATION(cmd.getBucketId(), "deleteBucket()"); if (_env._fileStorHandler.isMerging(cmd.getBucket())) { _env._fileStorHandler.clearMergeStatus(cmd.getBucket(), - api::ReturnCode(api::ReturnCode::ABORTED, - "Bucket was deleted during the merge")); + api::ReturnCode(api::ReturnCode::ABORTED, "Bucket was deleted during the merge")); } spi::Bucket bucket(cmd.getBucket(), spi::PartitionId(_env._partition)); if (!checkProviderBucketInfoMatches(bucket, cmd.getBucketInfo())) { @@ -340,8 +296,7 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd) _spi.deleteBucket(bucket, _context); StorBucketDatabase& db(_env.getBucketDatabase(cmd.getBucket().getBucketSpace())); { - StorBucketDatabase::WrappedEntry entry(db.get( - cmd.getBucketId(), "FileStorThread::onDeleteBucket")); + StorBucketDatabase::WrappedEntry entry(db.get(cmd.getBucketId(), "FileStorThread::onDeleteBucket")); if (entry.exist() && entry->getMetaCount() > 0) { LOG(debug, "onDeleteBucket(%s): Bucket DB entry existed. Likely " "active operation when delete bucket was queued. " @@ -365,11 +320,8 @@ PersistenceThread::handleDeleteBucket(api::DeleteBucketCommand& cmd) MessageTracker::UP PersistenceThread::handleGetIter(GetIterCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.visit[cmd.getLoadType()], - _env._component.getClock())); - spi::IterateResult result(_spi.iterate(cmd.getIteratorId(), - cmd.getMaxByteSize(), _context)); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.visit[cmd.getLoadType()],_env._component.getClock()); + spi::IterateResult result(_spi.iterate(cmd.getIteratorId(), cmd.getMaxByteSize(), _context)); if (checkForError(result, *tracker)) { GetIterReply::SP reply(new GetIterReply(cmd)); reply->getEntries() = result.steal_entries(); @@ -386,13 +338,11 @@ PersistenceThread::handleGetIter(GetIterCommand& cmd) MessageTracker::UP PersistenceThread::handleReadBucketList(ReadBucketList& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.readBucketList, - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.readBucketList,_env._component.getClock()); spi::BucketIdListResult result(_spi.listBuckets(cmd.getBucketSpace(), cmd.getPartition())); if (checkForError(result, *tracker)) { - ReadBucketListReply::SP reply(new ReadBucketListReply(cmd)); + auto reply = std::make_shared<ReadBucketListReply>(cmd); result.getList().swap(reply->getBuckets()); tracker->setReply(reply); } @@ -403,36 +353,24 @@ PersistenceThread::handleReadBucketList(ReadBucketList& cmd) MessageTracker::UP PersistenceThread::handleReadBucketInfo(ReadBucketInfo& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.readBucketInfo, - _env._component.getClock())); - - _env.updateBucketDatabase(cmd.getBucket(), - _env.getBucketInfo(cmd.getBucket())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.readBucketInfo,_env._component.getClock()); + _env.updateBucketDatabase(cmd.getBucket(), _env.getBucketInfo(cmd.getBucket())); return tracker; } MessageTracker::UP PersistenceThread::handleCreateIterator(CreateIteratorCommand& cmd) { - MessageTracker::UP tracker(new MessageTracker( - _env._metrics.createIterator, - _env._component.getClock())); + auto tracker = std::make_unique<MessageTracker>(_env._metrics.createIterator,_env._component.getClock()); document::FieldSetRepo repo; - document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), - cmd.getFields()); + document::FieldSet::UP fieldSet = repo.parse(*_env._component.getTypeRepo(), cmd.getFields()); // _context is reset per command, so it's safe to modify it like this. _context.setReadConsistency(cmd.getReadConsistency()); spi::CreateIteratorResult result(_spi.createIterator( spi::Bucket(cmd.getBucket(), spi::PartitionId(_env._partition)), - *fieldSet, - cmd.getSelection(), - cmd.getIncludedVersions(), - _context)); + *fieldSet, cmd.getSelection(), cmd.getIncludedVersions(), _context)); if (checkForError(result, *tracker)) { - tracker->setReply(CreateIteratorReply::SP( - new CreateIteratorReply( - cmd, spi::IteratorId(result.getIteratorId())))); + tracker->setReply(std::make_shared<CreateIteratorReply>(cmd, spi::IteratorId(result.getIteratorId()))); } return tracker; } @@ -447,10 +385,8 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) // Calculate the various bucket ids involved. if (cmd.getBucketId().getUsedBits() >= 58) { - tracker->fail( - api::ReturnCode::ILLEGAL_PARAMETERS, - "Can't split anymore since maximum split bits " - "is already reached"); + tracker->fail(api::ReturnCode::ILLEGAL_PARAMETERS, + "Can't split anymore since maximum split bits is already reached"); return tracker; } if (cmd.getMaxSplitBits() <= cmd.getBucketId().getUsedBits()) { @@ -470,13 +406,11 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) if (targetInfo.empty() || !_env._config.enableMultibitSplitOptimalization) { document::BucketId src(cmd.getBucketId()); document::BucketId target1(src.getUsedBits() + 1, src.getId()); - document::BucketId target2(src.getUsedBits() + 1, src.getId() - | (uint64_t(1) << src.getUsedBits())); + document::BucketId target2(src.getUsedBits() + 1, src.getId() | (uint64_t(1) << src.getUsedBits())); targetInfo = SplitBitDetector::Result(target1, target2, false); } if (targetInfo.failed()) { - tracker->fail(api::ReturnCode::INTERNAL_FAILURE, - targetInfo.getReason()); + tracker->fail(api::ReturnCode::INTERNAL_FAILURE, targetInfo.getReason()); return tracker; } // If we get here, we're splitting data in two. @@ -519,8 +453,9 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) // Ensure to take them in rising order. StorBucketDatabase::WrappedEntry sourceEntry(_env.getBucketDatabase(spiBucket.getBucket().getBucketSpace()).get( cmd.getBucketId(), "PersistenceThread::handleSplitBucket-source")); - api::SplitBucketReply* splitReply(new api::SplitBucketReply(cmd)); - tracker->setReply(api::StorageReply::SP(splitReply)); + auto reply = std::make_shared<api::SplitBucketReply>(cmd); + api::SplitBucketReply & splitReply = *reply; + tracker->setReply(std::move(reply)); typedef std::pair<StorBucketDatabase::WrappedEntry, FileStorHandler::RemapInfo> TargetInfo; @@ -581,7 +516,7 @@ PersistenceThread::handleSplitBucket(api::SplitBucketCommand& cmd) createTarget.toString().c_str()); _spi.createBucket(createTarget, _context); } - splitReply->getSplitInfo().push_back( + splitReply.getSplitInfo().push_back( api::SplitBucketReply::Entry( targets[i].second.bucket.getBucketId(), targets[i].first->getBucketInfo())); @@ -953,28 +888,23 @@ PersistenceThread::processMessage(api::StorageMessage& msg) ++_env._metrics.operations; if (msg.getType().isReply()) { try{ - _env._pauseHandler.setPriority(msg.getPriority()); LOG(debug, "Handling reply: %s", msg.toString().c_str()); LOG(spam, "Message content: %s", msg.toString(true).c_str()); handleReply(static_cast<api::StorageReply&>(msg)); } catch (std::exception& e) { // It's a reply, so nothing we can do. - LOG(debug, "Caught exception for %s: %s", - msg.toString().c_str(), - e.what()); + LOG(debug, "Caught exception for %s: %s", msg.toString().c_str(), e.what()); } } else { api::StorageCommand& initiatingCommand = static_cast<api::StorageCommand&>(msg); try { - int64_t startTime( - _component->getClock().getTimeInMillis().getTime()); + int64_t startTime(_component->getClock().getTimeInMillis().getTime()); LOG(debug, "Handling command: %s", msg.toString().c_str()); LOG(spam, "Message content: %s", msg.toString(true).c_str()); - std::unique_ptr<MessageTracker> tracker( - handleCommand(initiatingCommand)); + auto tracker(handleCommand(initiatingCommand)); if (!tracker.get()) { LOG(debug, "Received unsupported command %s", msg.getType().getName().c_str()); @@ -1080,23 +1010,18 @@ PersistenceThread::flushAllReplies( if (errorCode != 0) { for (uint32_t i = 0; i < replies.size(); ++i) { replies[i]->getReply()->setResult( - api::ReturnCode( - (api::ReturnCode::Result)errorCode, - result.getErrorMessage())); + api::ReturnCode((api::ReturnCode::Result)errorCode, result.getErrorMessage())); } } } catch (std::exception& e) { for (uint32_t i = 0; i < replies.size(); ++i) { - replies[i]->getReply()->setResult(api::ReturnCode( - api::ReturnCode::INTERNAL_FAILURE, e.what())); + replies[i]->getReply()->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, e.what())); } } for (uint32_t i = 0; i < replies.size(); ++i) { - LOG(spam, - "Sending reply up (batched): %s %zu", - replies[i]->getReply()->toString().c_str(), - replies[i]->getReply()->getMsgId()); + LOG(spam, "Sending reply up (batched): %s %zu", + replies[i]->getReply()->toString().c_str(), replies[i]->getReply()->getMsgId()); _env._fileStorHandler.sendReply(replies[i]->getReply()); } @@ -1108,7 +1033,7 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock) std::vector<MessageTracker::UP> trackers; document::Bucket bucket = lock.first->getBucket(); - while (lock.second.get() != 0) { + while (lock.second) { LOG(debug, "Inside while loop %d, nodeIndex %d, ptr=%p", _env._partition, _env._nodeIndex, lock.second.get()); std::shared_ptr<api::StorageMessage> msg(lock.second); @@ -1121,7 +1046,7 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock) } std::unique_ptr<MessageTracker> tracker = processMessage(*msg); - if (!tracker.get() || !tracker->getReply().get()) { + if (!tracker || !tracker->getReply()) { // Was a reply break; } @@ -1133,24 +1058,18 @@ void PersistenceThread::processMessages(FileStorHandler::LockedMessage & lock) } if (batchable) { LOG(spam, "Adding reply %s to batch for bucket %s", - tracker->getReply()->toString().c_str(), - bucket.getBucketId().toString().c_str()); + tracker->getReply()->toString().c_str(), bucket.getBucketId().toString().c_str()); trackers.push_back(std::move(tracker)); if (trackers.back()->getReply()->getResult().success()) { - _env._fileStorHandler.getNextMessage( - _env._partition, - lock, - _env._lowestPriority); + _env._fileStorHandler.getNextMessage(_env._partition, lock); } else { break; } } else { - LOG(spam, - "Sending reply up: %s %zu", - tracker->getReply()->toString().c_str(), - tracker->getReply()->getMsgId()); + LOG(spam, "Sending reply up: %s %zu", + tracker->getReply()->toString().c_str(), tracker->getReply()->getMsgId()); _env._fileStorHandler.sendReply(tracker->getReply()); break; @@ -1165,16 +1084,12 @@ PersistenceThread::run(framework::ThreadHandle& thread) { LOG(debug, "Started persistence thread with pid %d", getpid()); - while (!thread.interrupted() - && !_env._fileStorHandler.closed(_env._partition)) - { + while (!thread.interrupted() && !_env._fileStorHandler.closed(_env._partition)) { thread.registerTick(); - FileStorHandler::LockedMessage lock( - _env._fileStorHandler.getNextMessage( - _env._partition, _env._lowestPriority)); + FileStorHandler::LockedMessage lock(_env._fileStorHandler.getNextMessage(_env._partition)); - if (lock.first.get()) { + if (lock.first) { processMessages(lock); } diff --git a/storage/src/vespa/storage/persistence/persistencethread.h b/storage/src/vespa/storage/persistence/persistencethread.h index 07fadb875cc..640614f7edf 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.h +++ b/storage/src/vespa/storage/persistence/persistencethread.h @@ -21,7 +21,7 @@ class PersistenceThread final : public DiskThread, public Types public: PersistenceThread(ServiceLayerComponentRegister&, const config::ConfigUri & configUri, spi::PersistenceProvider& provider, FileStorHandler& filestorHandler, - FileStorThreadMetrics& metrics, uint16_t deviceIndex, uint8_t lowestPriority); + FileStorThreadMetrics& metrics, uint16_t deviceIndex); ~PersistenceThread(); /** Waits for current operation to be finished. */ diff --git a/storage/src/vespa/storage/persistence/persistenceutil.cpp b/storage/src/vespa/storage/persistence/persistenceutil.cpp index 8e96c05a66a..c2dcb8e2a29 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.cpp +++ b/storage/src/vespa/storage/persistence/persistenceutil.cpp @@ -66,7 +66,6 @@ PersistenceUtil::PersistenceUtil( FileStorHandler& fileStorHandler, FileStorThreadMetrics& metrics, uint16_t partition, - uint8_t lowestPriority, spi::PersistenceProvider& provider) : _config(*config::ConfigGetter<vespa::config::content::StorFilestorConfig>::getConfig(configUri.getConfigId(), configUri.getContext())), _compReg(compReg), @@ -77,24 +76,18 @@ PersistenceUtil::PersistenceUtil( _metrics(metrics), _bucketFactory(_component.getBucketIdFactory()), _repo(_component.getTypeRepo()), - _lowestPriority(lowestPriority), - _pauseHandler(), _spi(provider) { } -PersistenceUtil::~PersistenceUtil() -{ -} +PersistenceUtil::~PersistenceUtil() { } void -PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket, - const api::BucketInfo& i) +PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket, const api::BucketInfo& i) { // Update bucket database - StorBucketDatabase::WrappedEntry entry(getBucketDatabase(bucket.getBucketSpace()).get( - bucket.getBucketId(), - "env::updatebucketdb")); + StorBucketDatabase::WrappedEntry entry(getBucketDatabase(bucket.getBucketSpace()).get(bucket.getBucketId(), + "env::updatebucketdb")); if (entry.exist()) { api::BucketInfo info = i; @@ -106,9 +99,7 @@ PersistenceUtil::updateBucketDatabase(const document::Bucket &bucket, entry->setBucketInfo(info); entry.write(); } else { - LOG(debug, - "Bucket(%s).getBucketInfo: Bucket does not exist.", - bucket.getBucketId().toString().c_str()); + LOG(debug, "Bucket(%s).getBucketInfo: Bucket does not exist.", bucket.getBucketId().toString().c_str()); } } diff --git a/storage/src/vespa/storage/persistence/persistenceutil.h b/storage/src/vespa/storage/persistence/persistenceutil.h index 5126931bab6..bf347ccb10a 100644 --- a/storage/src/vespa/storage/persistence/persistenceutil.h +++ b/storage/src/vespa/storage/persistence/persistenceutil.h @@ -1,12 +1,11 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include "types.h" #include <vespa/persistence/spi/persistenceprovider.h> #include <vespa/storage/common/servicelayercomponent.h> #include <vespa/storage/persistence/filestorage/filestorhandler.h> #include <vespa/storage/persistence/filestorage/filestormetrics.h> -#include <vespa/storage/persistence/filestorage/pausehandler.h> -#include <vespa/storage/persistence/types.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/storage/storageutil/utils.h> #include <vespa/config-stor-filestor.h> @@ -70,8 +69,6 @@ struct PersistenceUtil { FileStorThreadMetrics& _metrics; const document::BucketIdFactory& _bucketFactory; const std::shared_ptr<document::DocumentTypeRepo> _repo; - uint8_t _lowestPriority; - PauseHandler _pauseHandler; spi::PersistenceProvider& _spi; PersistenceUtil( @@ -80,7 +77,6 @@ struct PersistenceUtil { FileStorHandler& fileStorHandler, FileStorThreadMetrics& metrics, uint16_t partition, - uint8_t lowestPriority, spi::PersistenceProvider& provider); ~PersistenceUtil(); diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h index 38a48ac2eed..c8739442c13 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.h +++ b/storage/src/vespa/storage/storageserver/storagenode.h @@ -115,20 +115,20 @@ private: bool _attemptedStopped; vespalib::string _pidFile; - // First components that doesn't depend on others + // First components that doesn't depend on others std::unique_ptr<StatusWebServer> _statusWebServer; std::shared_ptr<StorageMetricSet> _metrics; std::unique_ptr<metrics::MetricManager> _metricManager; - // Depends on bucket databases and stop() functionality + // Depends on bucket databases and stop() functionality std::unique_ptr<DeadLockDetector> _deadLockDetector; - // Depends on metric manager + // Depends on metric manager std::unique_ptr<StatusMetricConsumer> _statusMetrics; - // Depends on metric manager + // Depends on metric manager std::unique_ptr<StateReporter> _stateReporter; std::unique_ptr<StateManager> _stateManager; - // The storage chain can depend on anything. + // The storage chain can depend on anything. std::unique_ptr<StorageLink> _chain; /** Implementation of config callbacks. */ diff --git a/storageserver/src/tests/storageservertest.cpp b/storageserver/src/tests/storageservertest.cpp index 36d72bdc2b4..03b4dfd80da 100644 --- a/storageserver/src/tests/storageservertest.cpp +++ b/storageserver/src/tests/storageservertest.cpp @@ -222,8 +222,7 @@ StorageServerTest::setUp() storConfig.reset(new vdstestlib::DirConfig(getStandardConfig(true))); addSlobrokConfig(*distConfig, *slobrok); addSlobrokConfig(*storConfig, *slobrok); - storConfig->getConfig("stor-filestor") - .set("fail_disk_after_error_count", "1"); + storConfig->getConfig("stor-filestor").set("fail_disk_after_error_count", "1"); system("mkdir -p vdsroot/disks/d0"); system("mkdir -p vdsroot.distributor"); slobrokMirror.reset(new SlobrokMirror(slobrok->config())); diff --git a/storageserver/src/tests/testhelper.cpp b/storageserver/src/tests/testhelper.cpp index bcc5967215b..b245a6500bd 100644 --- a/storageserver/src/tests/testhelper.cpp +++ b/storageserver/src/tests/testhelper.cpp @@ -7,23 +7,6 @@ LOG_SETUP(".testhelper"); namespace storage { -namespace { - bool useNewStorageCore() { - if ( // Unit test directory - vespalib::fileExists("use_new_storage_core") || - // src/cpp directory - vespalib::fileExists("../use_new_storage_core") || - // Top build directory where storage-HEAD remains - vespalib::fileExists("../../../../../use_new_storage_core")) - { - std::cerr << "Using new storage core for unit tests\n"; - return true; - } - return false; - } - bool newStorageCore(useNewStorageCore()); -} - void addStorageDistributionConfig(vdstestlib::DirConfig& dc) { vdstestlib::DirConfig::Config* config; @@ -72,18 +55,13 @@ vdstestlib::DirConfig getStandardConfig(bool storagenode) { config->set("threads[0].lowestpri 255"); config->set("dir_spread", "4"); config->set("dir_levels", "0"); - config->set("use_new_core", newStorageCore ? "true" : "false"); config->set("maximum_versions_of_single_document_stored", "0"); //config->set("enable_slotfile_cache", "false"); // Unit tests typically use fake low time values, so don't complain // about them or compact/delete them by default. Override in tests testing that // behavior - config->set("time_future_limit", "5"); - config->set("time_past_limit", "2000000000"); config->set("keep_remove_time_period", "2000000000"); config->set("revert_time_period", "2000000000"); - // Don't want test to call exit() - config->set("fail_disk_after_error_count", "0"); config = &dc.addConfig("stor-memfilepersistence"); // Easier to see what goes wrong with only 1 thread per disk. config->set("minimum_file_meta_slots", "2"); @@ -94,6 +72,7 @@ vdstestlib::DirConfig getStandardConfig(bool storagenode) { config = &dc.addConfig("persistence"); config->set("keep_remove_time_period", "2000000000"); config->set("revert_time_period", "2000000000"); + config->set("fail_disk_after_error_count", "0"); config = &dc.addConfig("stor-bouncer"); config = &dc.addConfig("stor-integritychecker"); config = &dc.addConfig("stor-bucketmover"); |