diff options
author | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2017-06-26 14:58:46 +0200 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahoo-inc.com> | 2017-06-26 15:01:13 +0200 |
commit | 134e23ec9b83084f1ab3ecc4bc6eeb42b2d0d028 (patch) | |
tree | c2b1dc685108dadb460b03575ee780b7dd079e2c /documentapi | |
parent | c54e745f8866462f602eb35fde2046631b369ecc (diff) |
Add a lockfree Slobrok generation cache to StoragePolicy
Avoids O(n) pattern matching against entire Slobrok service list for
each message sent towards distributors. Cache is automatically reset
when a new generation is detected.
Diffstat (limited to 'documentapi')
2 files changed, 194 insertions, 1 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java index 65912cdd0bd..6a8c00cc1b1 100644 --- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java +++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.documentapi.messagebus.protocol; +import com.yahoo.concurrent.CopyOnWriteHashMap; import com.yahoo.config.subscription.ConfigSourceSet; import com.yahoo.document.BucketId; import com.yahoo.document.BucketIdFactory; @@ -124,6 +125,68 @@ public class StoragePolicy extends ExternalSlobrokPolicy { } } + static class TargetCachingSlobrokHostFetcher extends SlobrokHostFetcher { + + /** + * Distributor index to resolved RPC spec cache for a single given Slobrok + * update generation. Uses a thread safe COW map which will grow until stable. + */ + private static class GenerationCache { + private final int generation; + private final CopyOnWriteHashMap<Integer, String> targets = new CopyOnWriteHashMap<>(); + + GenerationCache(int generation) { + this.generation = generation; + } + + public int generation() { return this.generation; } + + public String get(Integer index) { + return targets.get(index); + } + public void put(Integer index, String target) { + targets.put(index, target); + } + } + + private volatile GenerationCache generationCache = null; + + TargetCachingSlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, ExternalSlobrokPolicy policy) { + super(patternGenerator, policy); + } + + @Override + public String getTargetSpec(Integer distributor, RoutingContext context) { + GenerationCache cache = generationCache; + int currentGeneration = getMirror(context).updates(); + // The below code might race with other threads during a generation change. That is OK, as the cache + // is thread safe and will quickly converge to a stable state for the new generation. + if (cache == null || currentGeneration != cache.generation()) { + cache = new GenerationCache(currentGeneration); + generationCache = cache; + } + if (distributor != null) { + return cachingGetTargetSpec(distributor, context, cache); + } + // Wildcard lookup case. Must not be cached. + return super.getTargetSpec(null, context); + } + + private String cachingGetTargetSpec(Integer distributor, RoutingContext context, GenerationCache cache) { + String cachedTarget = cache.get(distributor); + if (cachedTarget != null) { + return cachedTarget; + } + // Mirror _may_ be at a higher version if we race with generation read, but that is OK since + // we'll either way get the most up-to-date mapping and the cache will be invalidated on the + // next invocation. + String resolvedTarget = super.getTargetSpec(distributor, context); + cache.put(distributor, resolvedTarget); + return resolvedTarget; + } + + } + /** Class parsing the semicolon separated parameter string and exposes the appropriate value to the policy. */ public static class Parameters { protected String clusterName = null; @@ -147,7 +210,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy { return new SlobrokHostPatternGenerator(getClusterName()); } public HostFetcher createHostFetcher(ExternalSlobrokPolicy policy) { - return new SlobrokHostFetcher(slobrokHostPatternGenerator, policy); + return new TargetCachingSlobrokHostFetcher(slobrokHostPatternGenerator, policy); } public Distribution createDistribution(ExternalSlobrokPolicy policy) { return (policy.configSources != null ? diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java new file mode 100644 index 00000000000..7e6c7bc468a --- /dev/null +++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java @@ -0,0 +1,130 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.documentapi.messagebus.protocol; + +import com.yahoo.jrt.slobrok.api.IMirror; +import com.yahoo.jrt.slobrok.api.Mirror; +import com.yahoo.messagebus.routing.RoutingContext; +import org.junit.Test; + +import java.util.Arrays; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * @author vekterli + */ +public class TargetCachingSlobrokHostFetcherTest { + + static String idOfIndex(int index) { + return String.format("storage/cluster.foo/distributor/%d/default", index); + } + + static String idOfWildcardLookup() { + return "storage/cluster.foo/distributor/*/default"; + } + + static String lookupSpecOfIndex(int index) { + return String.format("tcp/localhost:%d", index); + } + + static String resolvedSpecOfIndex(int index) { + return String.format("tcp/localhost:%d/default", index); + } + + static Mirror.Entry[] dummyEntries(int... indices) { + return Arrays.stream(indices) + .mapToObj(index -> new Mirror.Entry(idOfIndex(index), lookupSpecOfIndex(index))) + .toArray(Mirror.Entry[]::new); + } + + static class Fixture { + ExternalSlobrokPolicy mockSlobrokPolicy = mock(ExternalSlobrokPolicy.class); + IMirror mockMirror = mock(IMirror.class); + StoragePolicy.SlobrokHostPatternGenerator patternGenerator = new StoragePolicy.SlobrokHostPatternGenerator("foo"); + StoragePolicy.TargetCachingSlobrokHostFetcher hostFetcher = new StoragePolicy.TargetCachingSlobrokHostFetcher(patternGenerator, mockSlobrokPolicy); + RoutingContext routingContext = mock(RoutingContext.class); + + Fixture() { + when(mockMirror.updates()).thenReturn(1); + when(routingContext.getMirror()).thenReturn(mockMirror); + when(mockSlobrokPolicy.lookup(anyObject(), eq(idOfIndex(1)))).thenReturn(dummyEntries(1)); + when(mockSlobrokPolicy.lookup(anyObject(), eq(idOfIndex(2)))).thenReturn(dummyEntries(2)); + when(mockSlobrokPolicy.lookup(anyObject(), eq(idOfWildcardLookup()))).thenReturn(dummyEntries(1, 2, 3, 4)); + } + } + + @Test + public void lookup_passed_through_on_first_fetch() { + Fixture fixture = new Fixture(); + + String spec = fixture.hostFetcher.getTargetSpec(1, fixture.routingContext); + assertEquals(resolvedSpecOfIndex(1), spec); + verify(fixture.mockSlobrokPolicy, times(1)).lookup(anyObject(), eq(idOfIndex(1))); + } + + @Test + public void cached_index_does_not_do_slobrok_lookup() { + Fixture fixture = new Fixture(); + + String spec1 = fixture.hostFetcher.getTargetSpec(1, fixture.routingContext); + String spec2 = fixture.hostFetcher.getTargetSpec(1, fixture.routingContext); + assertEquals(spec1, spec2); + // Only invoked once + verify(fixture.mockSlobrokPolicy, times(1)).lookup(anyObject(), anyString()); + } + + @Test + public void multiple_indexes_are_cached() { + Fixture fixture = new Fixture(); + + String spec1_1 = fixture.hostFetcher.getTargetSpec(1, fixture.routingContext); + String spec2_1 = fixture.hostFetcher.getTargetSpec(2, fixture.routingContext); + + assertEquals(resolvedSpecOfIndex(1), spec1_1); + assertEquals(resolvedSpecOfIndex(2), spec2_1); + + String spec1_2 = fixture.hostFetcher.getTargetSpec(1, fixture.routingContext); + String spec2_2 = fixture.hostFetcher.getTargetSpec(2, fixture.routingContext); + assertEquals(spec1_1, spec1_2); + assertEquals(spec2_1, spec2_2); + + verify(fixture.mockSlobrokPolicy, times(1)).lookup(anyObject(), eq(idOfIndex(1))); + verify(fixture.mockSlobrokPolicy, times(1)).lookup(anyObject(), eq(idOfIndex(2))); + } + + @Test + public void generation_change_evicts_cache() { + Fixture fixture = new Fixture(); + + when(fixture.mockMirror.updates()).thenReturn(1).thenReturn(2); + when(fixture.mockSlobrokPolicy.lookup(anyObject(), eq(idOfIndex(1)))) + .thenReturn(dummyEntries(1)).thenReturn(dummyEntries(2)); + + String spec1 = fixture.hostFetcher.getTargetSpec(1, fixture.routingContext); + String spec2 = fixture.hostFetcher.getTargetSpec(1, fixture.routingContext); + + assertEquals(resolvedSpecOfIndex(1), spec1); + assertEquals(resolvedSpecOfIndex(2), spec2); + } + + @Test + public void wildcard_null_distributor_index_is_not_cached() { + Fixture fixture = new Fixture(); + + String spec = fixture.hostFetcher.getTargetSpec(null, fixture.routingContext); + assertNotNull(spec); + spec = fixture.hostFetcher.getTargetSpec(null, fixture.routingContext); + assertNotNull(spec); + + verify(fixture.mockSlobrokPolicy, times(2)).lookup(anyObject(), eq(idOfWildcardLookup())); + } + +} |