summaryrefslogtreecommitdiffstats
path: root/documentapi/src
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahoo-inc.com>2017-06-26 14:58:46 +0200
committerTor Brede Vekterli <vekterli@yahoo-inc.com>2017-06-26 15:01:13 +0200
commit134e23ec9b83084f1ab3ecc4bc6eeb42b2d0d028 (patch)
treec2b1dc685108dadb460b03575ee780b7dd079e2c /documentapi/src
parentc54e745f8866462f602eb35fde2046631b369ecc (diff)
Add a lockfree Slobrok generation cache to StoragePolicy
Avoids O(n) pattern matching against entire Slobrok service list for each message sent towards distributors. Cache is automatically reset when a new generation is detected.
Diffstat (limited to 'documentapi/src')
-rw-r--r--documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java65
-rw-r--r--documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java130
2 files changed, 194 insertions, 1 deletions
diff --git a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java
index 65912cdd0bd..6a8c00cc1b1 100644
--- a/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java
+++ b/documentapi/src/main/java/com/yahoo/documentapi/messagebus/protocol/StoragePolicy.java
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.documentapi.messagebus.protocol;
+import com.yahoo.concurrent.CopyOnWriteHashMap;
import com.yahoo.config.subscription.ConfigSourceSet;
import com.yahoo.document.BucketId;
import com.yahoo.document.BucketIdFactory;
@@ -124,6 +125,68 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
}
}
+ static class TargetCachingSlobrokHostFetcher extends SlobrokHostFetcher {
+
+ /**
+ * Distributor index to resolved RPC spec cache for a single given Slobrok
+ * update generation. Uses a thread safe COW map which will grow until stable.
+ */
+ private static class GenerationCache {
+ private final int generation;
+ private final CopyOnWriteHashMap<Integer, String> targets = new CopyOnWriteHashMap<>();
+
+ GenerationCache(int generation) {
+ this.generation = generation;
+ }
+
+ public int generation() { return this.generation; }
+
+ public String get(Integer index) {
+ return targets.get(index);
+ }
+ public void put(Integer index, String target) {
+ targets.put(index, target);
+ }
+ }
+
+ private volatile GenerationCache generationCache = null;
+
+ TargetCachingSlobrokHostFetcher(SlobrokHostPatternGenerator patternGenerator, ExternalSlobrokPolicy policy) {
+ super(patternGenerator, policy);
+ }
+
+ @Override
+ public String getTargetSpec(Integer distributor, RoutingContext context) {
+ GenerationCache cache = generationCache;
+ int currentGeneration = getMirror(context).updates();
+ // The below code might race with other threads during a generation change. That is OK, as the cache
+ // is thread safe and will quickly converge to a stable state for the new generation.
+ if (cache == null || currentGeneration != cache.generation()) {
+ cache = new GenerationCache(currentGeneration);
+ generationCache = cache;
+ }
+ if (distributor != null) {
+ return cachingGetTargetSpec(distributor, context, cache);
+ }
+ // Wildcard lookup case. Must not be cached.
+ return super.getTargetSpec(null, context);
+ }
+
+ private String cachingGetTargetSpec(Integer distributor, RoutingContext context, GenerationCache cache) {
+ String cachedTarget = cache.get(distributor);
+ if (cachedTarget != null) {
+ return cachedTarget;
+ }
+ // Mirror _may_ be at a higher version if we race with generation read, but that is OK since
+ // we'll either way get the most up-to-date mapping and the cache will be invalidated on the
+ // next invocation.
+ String resolvedTarget = super.getTargetSpec(distributor, context);
+ cache.put(distributor, resolvedTarget);
+ return resolvedTarget;
+ }
+
+ }
+
/** Class parsing the semicolon separated parameter string and exposes the appropriate value to the policy. */
public static class Parameters {
protected String clusterName = null;
@@ -147,7 +210,7 @@ public class StoragePolicy extends ExternalSlobrokPolicy {
return new SlobrokHostPatternGenerator(getClusterName());
}
public HostFetcher createHostFetcher(ExternalSlobrokPolicy policy) {
- return new SlobrokHostFetcher(slobrokHostPatternGenerator, policy);
+ return new TargetCachingSlobrokHostFetcher(slobrokHostPatternGenerator, policy);
}
public Distribution createDistribution(ExternalSlobrokPolicy policy) {
return (policy.configSources != null ?
diff --git a/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java
new file mode 100644
index 00000000000..7e6c7bc468a
--- /dev/null
+++ b/documentapi/src/test/java/com/yahoo/documentapi/messagebus/protocol/TargetCachingSlobrokHostFetcherTest.java
@@ -0,0 +1,130 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.documentapi.messagebus.protocol;
+
+import com.yahoo.jrt.slobrok.api.IMirror;
+import com.yahoo.jrt.slobrok.api.Mirror;
+import com.yahoo.messagebus.routing.RoutingContext;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * @author vekterli
+ */
+public class TargetCachingSlobrokHostFetcherTest {
+
+ static String idOfIndex(int index) {
+ return String.format("storage/cluster.foo/distributor/%d/default", index);
+ }
+
+ static String idOfWildcardLookup() {
+ return "storage/cluster.foo/distributor/*/default";
+ }
+
+ static String lookupSpecOfIndex(int index) {
+ return String.format("tcp/localhost:%d", index);
+ }
+
+ static String resolvedSpecOfIndex(int index) {
+ return String.format("tcp/localhost:%d/default", index);
+ }
+
+ static Mirror.Entry[] dummyEntries(int... indices) {
+ return Arrays.stream(indices)
+ .mapToObj(index -> new Mirror.Entry(idOfIndex(index), lookupSpecOfIndex(index)))
+ .toArray(Mirror.Entry[]::new);
+ }
+
+ static class Fixture {
+ ExternalSlobrokPolicy mockSlobrokPolicy = mock(ExternalSlobrokPolicy.class);
+ IMirror mockMirror = mock(IMirror.class);
+ StoragePolicy.SlobrokHostPatternGenerator patternGenerator = new StoragePolicy.SlobrokHostPatternGenerator("foo");
+ StoragePolicy.TargetCachingSlobrokHostFetcher hostFetcher = new StoragePolicy.TargetCachingSlobrokHostFetcher(patternGenerator, mockSlobrokPolicy);
+ RoutingContext routingContext = mock(RoutingContext.class);
+
+ Fixture() {
+ when(mockMirror.updates()).thenReturn(1);
+ when(routingContext.getMirror()).thenReturn(mockMirror);
+ when(mockSlobrokPolicy.lookup(anyObject(), eq(idOfIndex(1)))).thenReturn(dummyEntries(1));
+ when(mockSlobrokPolicy.lookup(anyObject(), eq(idOfIndex(2)))).thenReturn(dummyEntries(2));
+ when(mockSlobrokPolicy.lookup(anyObject(), eq(idOfWildcardLookup()))).thenReturn(dummyEntries(1, 2, 3, 4));
+ }
+ }
+
+ @Test
+ public void lookup_passed_through_on_first_fetch() {
+ Fixture fixture = new Fixture();
+
+ String spec = fixture.hostFetcher.getTargetSpec(1, fixture.routingContext);
+ assertEquals(resolvedSpecOfIndex(1), spec);
+ verify(fixture.mockSlobrokPolicy, times(1)).lookup(anyObject(), eq(idOfIndex(1)));
+ }
+
+ @Test
+ public void cached_index_does_not_do_slobrok_lookup() {
+ Fixture fixture = new Fixture();
+
+ String spec1 = fixture.hostFetcher.getTargetSpec(1, fixture.routingContext);
+ String spec2 = fixture.hostFetcher.getTargetSpec(1, fixture.routingContext);
+ assertEquals(spec1, spec2);
+ // Only invoked once
+ verify(fixture.mockSlobrokPolicy, times(1)).lookup(anyObject(), anyString());
+ }
+
+ @Test
+ public void multiple_indexes_are_cached() {
+ Fixture fixture = new Fixture();
+
+ String spec1_1 = fixture.hostFetcher.getTargetSpec(1, fixture.routingContext);
+ String spec2_1 = fixture.hostFetcher.getTargetSpec(2, fixture.routingContext);
+
+ assertEquals(resolvedSpecOfIndex(1), spec1_1);
+ assertEquals(resolvedSpecOfIndex(2), spec2_1);
+
+ String spec1_2 = fixture.hostFetcher.getTargetSpec(1, fixture.routingContext);
+ String spec2_2 = fixture.hostFetcher.getTargetSpec(2, fixture.routingContext);
+ assertEquals(spec1_1, spec1_2);
+ assertEquals(spec2_1, spec2_2);
+
+ verify(fixture.mockSlobrokPolicy, times(1)).lookup(anyObject(), eq(idOfIndex(1)));
+ verify(fixture.mockSlobrokPolicy, times(1)).lookup(anyObject(), eq(idOfIndex(2)));
+ }
+
+ @Test
+ public void generation_change_evicts_cache() {
+ Fixture fixture = new Fixture();
+
+ when(fixture.mockMirror.updates()).thenReturn(1).thenReturn(2);
+ when(fixture.mockSlobrokPolicy.lookup(anyObject(), eq(idOfIndex(1))))
+ .thenReturn(dummyEntries(1)).thenReturn(dummyEntries(2));
+
+ String spec1 = fixture.hostFetcher.getTargetSpec(1, fixture.routingContext);
+ String spec2 = fixture.hostFetcher.getTargetSpec(1, fixture.routingContext);
+
+ assertEquals(resolvedSpecOfIndex(1), spec1);
+ assertEquals(resolvedSpecOfIndex(2), spec2);
+ }
+
+ @Test
+ public void wildcard_null_distributor_index_is_not_cached() {
+ Fixture fixture = new Fixture();
+
+ String spec = fixture.hostFetcher.getTargetSpec(null, fixture.routingContext);
+ assertNotNull(spec);
+ spec = fixture.hostFetcher.getTargetSpec(null, fixture.routingContext);
+ assertNotNull(spec);
+
+ verify(fixture.mockSlobrokPolicy, times(2)).lookup(anyObject(), eq(idOfWildcardLookup()));
+ }
+
+}