summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/federation
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
commit72231250ed81e10d66bfe70701e64fa5fe50f712 (patch)
tree2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /container-search/src/main/java/com/yahoo/search/federation
Publish
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/federation')
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/CommonFields.java22
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/FederationSearcher.java948
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/ForwardingSearcher.java106
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/FutureWaiter.java58
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/TimeoutException.java20
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/http/ConfiguredHTTPClientSearcher.java36
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/http/ConfiguredHTTPProviderSearcher.java68
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/http/ConfiguredSearcherHelper.java27
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/http/Connection.java30
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/http/GzipDecompressingEntity.java125
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/http/HTTPClientSearcher.java276
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/http/HTTPParameters.java315
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/http/HTTPProviderSearcher.java260
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/http/HTTPSearcher.java958
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/http/TimedHttpEntity.java88
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/http/TimedStream.java111
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/http/TimeoutException.java20
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/http/package-info.java7
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/package-info.java17
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/selection/FederationTarget.java68
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/selection/TargetSelector.java35
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/selection/package-info.java7
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/sourceref/SearchChainInvocationSpec.java37
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/sourceref/SearchChainResolver.java160
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/sourceref/SingleTarget.java36
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/sourceref/SourceRefResolver.java71
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/sourceref/SourcesTarget.java112
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/sourceref/Target.java31
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/sourceref/UnresolvedProviderException.java22
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/sourceref/UnresolvedSearchChainException.java13
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/sourceref/UnresolvedSourceRefException.java21
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/vespa/QueryMarshaller.java170
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/vespa/ResultBuilder.java642
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/vespa/VespaSearcher.java270
-rw-r--r--container-search/src/main/java/com/yahoo/search/federation/vespa/package-info.java7
35 files changed, 5194 insertions, 0 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/federation/CommonFields.java b/container-search/src/main/java/com/yahoo/search/federation/CommonFields.java
new file mode 100644
index 00000000000..912a1db6202
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/CommonFields.java
@@ -0,0 +1,22 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation;
+/**
+ * A set of string constants for common hit field names.
+ * @author laboisse
+ *
+ */
+public class CommonFields {
+
+ public static final String TITLE = "title";
+ public static final String URL = "url";
+ public static final String DESCRIPTION = "description";
+ public static final String DATE = "date";
+ public static final String SIZE = "size";
+ public static final String DISP_URL = "dispurl";
+ public static final String BASE_URL = "baseurl";
+ public static final String MIME_TYPE = "mimetype";
+ public static final String RELEVANCY = "relevancy";
+ public static final String THUMBNAIL_URL = "thumbnailUrl";
+ public static final String THUMBNAIL_WIDTH = "thumbnailWidth";
+ public static final String THUMBNAIL_HEIGHT = "thumbnailHeight";
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/FederationSearcher.java b/container-search/src/main/java/com/yahoo/search/federation/FederationSearcher.java
new file mode 100644
index 00000000000..4ec04d0d577
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/FederationSearcher.java
@@ -0,0 +1,948 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation;
+
+import com.google.inject.Inject;
+import com.yahoo.component.ComponentId;
+import com.yahoo.component.ComponentSpecification;
+import com.yahoo.component.chain.Chain;
+import com.yahoo.component.chain.dependencies.After;
+import com.yahoo.component.chain.dependencies.Provides;
+import com.yahoo.component.provider.ComponentRegistry;
+import com.yahoo.concurrent.CopyOnWriteHashMap;
+import com.yahoo.errorhandling.Results.Builder;
+import com.yahoo.prelude.IndexFacts;
+import com.yahoo.processing.request.CompoundName;
+import com.yahoo.search.Query;
+import com.yahoo.search.Result;
+import com.yahoo.search.Searcher;
+import com.yahoo.search.federation.selection.FederationTarget;
+import com.yahoo.search.federation.selection.TargetSelector;
+import com.yahoo.search.federation.sourceref.SearchChainInvocationSpec;
+import com.yahoo.search.federation.sourceref.SearchChainResolver;
+import com.yahoo.search.federation.sourceref.SingleTarget;
+import com.yahoo.search.federation.sourceref.SourceRefResolver;
+import com.yahoo.search.federation.sourceref.SourcesTarget;
+import com.yahoo.search.federation.sourceref.Target;
+import com.yahoo.search.federation.sourceref.UnresolvedSearchChainException;
+import com.yahoo.search.query.Properties;
+import com.yahoo.search.query.properties.QueryProperties;
+import com.yahoo.search.query.properties.SubProperties;
+import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.search.result.Hit;
+import com.yahoo.search.result.HitGroup;
+import com.yahoo.search.result.HitOrderer;
+import com.yahoo.search.searchchain.AsyncExecution;
+import com.yahoo.search.searchchain.Execution;
+import com.yahoo.search.searchchain.ForkingSearcher;
+import com.yahoo.search.searchchain.FutureResult;
+import com.yahoo.search.searchchain.SearchChainRegistry;
+import com.yahoo.search.searchchain.model.federation.FederationOptions;
+import com.yahoo.errorhandling.Results;
+
+import org.apache.commons.lang.StringUtils;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.yahoo.collections.CollectionUtil.first;
+import static com.yahoo.container.util.Util.quote;
+import static com.yahoo.search.federation.StrictContractsConfig.PropagateSourceProperties;
+
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+
+/**
+ * This searcher takes a set of sources, looks them up in config and fire off the correct searchchains.
+ *
+ * @author <a href="mailto:arnebef@yahoo-inc.com">Arne Bergene Fossaa</a>
+ * @author tonytv
+ */
+@Provides(FederationSearcher.FEDERATION)
+@After("*")
+public class FederationSearcher extends ForkingSearcher {
+ public static final String FEDERATION = "Federation";
+
+ private static abstract class TargetHandler {
+ abstract Chain<Searcher> getChain();
+ abstract void modifyTargetQuery(Query query);
+ abstract void modifyTargetResult(Result result);
+
+ ComponentId getId() {
+ return getChain().getId();
+ }
+
+ public abstract FederationOptions federationOptions();
+
+ @Override
+ public String toString() {
+ return getChain().getId().stringValue();
+ }
+
+ }
+
+ private static class StandardTargetHandler extends TargetHandler {
+ private final SearchChainInvocationSpec target;
+ private final Chain<Searcher> chain;
+
+ public StandardTargetHandler(SearchChainInvocationSpec target, Chain<Searcher> chain) {
+ this.target = target;
+ this.chain = chain;
+ }
+
+ @Override
+ Chain<Searcher> getChain() {
+ return chain;
+ }
+
+ @Override
+ void modifyTargetQuery(Query query) {}
+ @Override
+ void modifyTargetResult(Result result) {}
+
+ @Override
+ public FederationOptions federationOptions() {
+ return target.federationOptions;
+ }
+ }
+
+
+ private static class CustomTargetHandler<T> extends TargetHandler {
+ private final TargetSelector<T> selector;
+ private final FederationTarget<T> target;
+
+ CustomTargetHandler(TargetSelector<T> selector, FederationTarget<T> target) {
+ this.selector = selector;
+ this.target = target;
+ }
+
+ @Override
+ Chain<Searcher> getChain() {
+ return target.getChain();
+ }
+
+ @Override
+ public void modifyTargetQuery(Query query) {
+ selector.modifyTargetQuery(target, query);
+ }
+
+ @Override
+ public void modifyTargetResult(Result result) {
+ selector.modifyTargetResult(target, result);
+ }
+
+ @Override
+ public FederationOptions federationOptions() {
+ return target.getFederationOptions();
+ }
+ }
+
+
+
+ private static class ExecutionInfo {
+ final TargetHandler targetHandler;
+ final FederationOptions federationOptions;
+ final FutureResult futureResult;
+
+ public ExecutionInfo(TargetHandler targetHandler, FederationOptions federationOptions, FutureResult futureResult) {
+ this.targetHandler = targetHandler;
+ this.federationOptions = federationOptions;
+ this.futureResult = futureResult;
+ }
+ }
+
+ private static class CompoundKey {
+ private final String sourceName;
+ private final String propertyName;
+ CompoundKey(String sourceName, String propertyName) {
+ this.sourceName = sourceName;
+ this.propertyName = propertyName;
+ }
+
+ @Override
+ public int hashCode() {
+ return sourceName.hashCode() ^ propertyName.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ CompoundKey rhs = (CompoundKey) o;
+ return sourceName.equals(rhs.sourceName) && propertyName.equals(rhs.propertyName);
+ }
+
+ @Override
+ public String toString() {
+ return sourceName + '.' + propertyName;
+ }
+ }
+
+ private static class SourceKey extends CompoundKey {
+ public static final String SOURCE = "source.";
+ SourceKey(String sourceName, String propertyName) {
+ super(sourceName, propertyName);
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode() ^ 7;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return (o instanceof SourceKey) && super.equals(o);
+ }
+
+ @Override
+ public String toString() {
+ return SOURCE + super.toString();
+ }
+ }
+ private static class ProviderKey extends CompoundKey {
+ public static final String PROVIDER = "provider.";
+ ProviderKey(String sourceName, String propertyName) {
+ super(sourceName, propertyName);
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode() ^ 17;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ return (o instanceof ProviderKey) && super.equals(o);
+ }
+
+ @Override
+ public String toString() {
+ return PROVIDER + super.toString();
+ }
+ }
+
+ private static final Logger log = Logger.getLogger(FederationSearcher.class.getName());
+
+ /** The name of the query property containing the source name added to the query to each source by this */
+ public final static CompoundName SOURCENAME = new CompoundName("sourceName");
+ public final static CompoundName PROVIDERNAME = new CompoundName("providerName");
+
+
+ /** Logging field name constants */
+ public static final String LOG_COUNT_PREFIX = "count_";
+
+ private final SearchChainResolver searchChainResolver;
+ private final PropagateSourceProperties.Enum propagateSourceProperties;
+ private final SourceRefResolver sourceRefResolver;
+ private final CopyOnWriteHashMap<CompoundKey, CompoundName> map = new CopyOnWriteHashMap<>();
+
+ private final boolean strictSearchchain;
+ private final TargetSelector<?> targetSelector;
+
+
+ @Inject
+ public FederationSearcher(FederationConfig config, StrictContractsConfig strict,
+ ComponentRegistry<TargetSelector> targetSelectors) {
+ this(createResolver(config), strict.searchchains(), strict.propagateSourceProperties(),
+ resolveSelector(config.targetSelector(), targetSelectors));
+ }
+
+ private static TargetSelector resolveSelector(String selectorId, ComponentRegistry<TargetSelector> targetSelectors) {
+ if (selectorId.isEmpty())
+ return null;
+
+ return checkNotNull(
+ targetSelectors.getComponent(selectorId),
+ "Missing target selector with id" + quote(selectorId));
+ }
+
+ //for testing
+ public FederationSearcher(ComponentId id, SearchChainResolver searchChainResolver) {
+ this(searchChainResolver, false, PropagateSourceProperties.ALL, null);
+ }
+
+ private FederationSearcher(SearchChainResolver searchChainResolver, boolean strictSearchchain,
+ PropagateSourceProperties.Enum propagateSourceProperties,
+ TargetSelector targetSelector) {
+ this.searchChainResolver = searchChainResolver;
+ sourceRefResolver = new SourceRefResolver(searchChainResolver);
+ this.strictSearchchain = strictSearchchain;
+ this.propagateSourceProperties = propagateSourceProperties;
+ this.targetSelector = targetSelector;
+ }
+
+
+ private static SearchChainResolver createResolver(FederationConfig config) {
+ SearchChainResolver.Builder builder = new SearchChainResolver.Builder();
+
+ for (FederationConfig.Target target : config.target()) {
+ boolean isDefaultProviderForSource = true;
+
+ for (FederationConfig.Target.SearchChain searchChain : target.searchChain()) {
+ if (searchChain.providerId() == null || searchChain.providerId().isEmpty()) {
+ addSearchChain(builder, target, searchChain);
+ } else {
+ addSourceForProvider(builder, target, searchChain, isDefaultProviderForSource);
+ isDefaultProviderForSource = false;
+ }
+ }
+
+ //Allow source groups to use by default.
+ if (target.useByDefault())
+ builder.useTargetByDefault(target.id());
+ }
+
+ return builder.build();
+ }
+
+ private static void addSearchChain(SearchChainResolver.Builder builder,
+ FederationConfig.Target target, FederationConfig.Target.SearchChain searchChain) {
+ if (!target.id().equals(searchChain.searchChainId()))
+ throw new RuntimeException("Invalid federation config, " + target.id() + " != " + searchChain.searchChainId());
+
+ builder.addSearchChain(ComponentId.fromString(searchChain.searchChainId()),
+ federationOptions(searchChain), searchChain.documentTypes());
+ }
+
+ private static void addSourceForProvider(SearchChainResolver.Builder builder, FederationConfig.Target target,
+ FederationConfig.Target.SearchChain searchChain, boolean isDefaultProvider) {
+ builder.addSourceForProvider(
+ ComponentId.fromString(target.id()),
+ ComponentId.fromString(searchChain.providerId()),
+ ComponentId.fromString(searchChain.searchChainId()),
+ isDefaultProvider, federationOptions(searchChain),
+ searchChain.documentTypes());
+ }
+
+ private static FederationOptions federationOptions(FederationConfig.Target.SearchChain searchChain) {
+ return new FederationOptions().
+ setOptional(searchChain.optional()).
+ setUseByDefault(searchChain.useByDefault()).
+ setTimeoutInMilliseconds(searchChain.timeoutMillis()).
+ setRequestTimeoutInMilliseconds(searchChain.requestTimeoutMillis());
+ }
+
+ private static long calculateTimeout(Query query, List<TargetHandler> targets) {
+
+ class PartitionByOptional {
+ final List<TargetHandler> mandatoryTargets;
+ final List<TargetHandler> optionalTargets;
+
+ PartitionByOptional(List<TargetHandler> targets) {
+ List<TargetHandler> mandatoryTargets = new ArrayList<>();
+ List<TargetHandler> optionalTargets = new ArrayList<>();
+
+ for (TargetHandler target : targets) {
+ if (target.federationOptions().getOptional()) {
+ optionalTargets.add(target);
+ } else {
+ mandatoryTargets.add(target);
+ }
+ }
+
+ this.mandatoryTargets = Collections.unmodifiableList(mandatoryTargets);
+ this.optionalTargets = Collections.unmodifiableList(optionalTargets);
+ }
+ }
+
+ if (query.requestHasProperty("timeout") || targets.isEmpty()) {
+ return query.getTimeLeft();
+ } else {
+ PartitionByOptional partition = new PartitionByOptional(targets);
+ long queryTimeout = query.getTimeout();
+
+ return partition.mandatoryTargets.isEmpty() ?
+ maximumTimeout(partition.optionalTargets, queryTimeout) :
+ maximumTimeout(partition.mandatoryTargets, queryTimeout);
+ }
+ }
+
+ private static long maximumTimeout(List<TargetHandler> invocationSpecs, long queryTimeout) {
+ long timeout = 0;
+ for (TargetHandler target : invocationSpecs) {
+ timeout = Math.max(timeout,
+ target.federationOptions().getSearchChainExecutionTimeoutInMilliseconds(queryTimeout));
+ }
+ return timeout;
+ }
+
+ private void addSearchChainTimedOutError(Query query,
+ ComponentId searchChainId) {
+ ErrorMessage timeoutMessage=
+ ErrorMessage.createTimeout("The search chain '" + searchChainId + "' timed out.");
+ timeoutMessage.setSource(searchChainId.stringValue());
+ query.errors().add(timeoutMessage);
+ }
+
+ private void mergeResult(Query query, TargetHandler targetHandler,
+ Result mergedResults, Result result) {
+
+
+ targetHandler.modifyTargetResult(result);
+ final ComponentId searchChainId = targetHandler.getId();
+ Chain<Searcher> searchChain = targetHandler.getChain();
+
+ mergedResults.mergeWith(result);
+ HitGroup group = result.hits();
+ group.setId("source:" + searchChainId.getName());
+
+ group.setSearcherSpecificMetaData(this, searchChain);
+ group.setMeta(false); // Set hit groups as non-meta as a default
+ group.setAuxiliary(true); // Set hit group as auxiliary so that it doesn't contribute to count
+ group.setSource(searchChainId.getName());
+ group.setQuery(result.getQuery());
+
+ for (Iterator<Hit> it = group.unorderedDeepIterator(); it.hasNext();) {
+ Hit hit = it.next();
+ hit.setSearcherSpecificMetaData(this, searchChain);
+ hit.setSource(searchChainId.stringValue());
+
+ // This is the backend request meta hit, that is holding logging information
+ // See HTTPBackendSearcher, where this hit is created
+ if (hit.isMeta() && hit.types().contains("logging")) {
+ // Augment this hit with count fields
+ hit.setField(LOG_COUNT_PREFIX + "deep", result.getDeepHitCount());
+ hit.setField(LOG_COUNT_PREFIX + "total", result.getTotalHitCount());
+ int offset = result.getQuery().getOffset();
+ hit.setField(LOG_COUNT_PREFIX + "first", offset + 1);
+ hit.setField(LOG_COUNT_PREFIX + "last", result.getConcreteHitCount() + offset);
+ }
+
+ }
+ if (query.getTraceLevel()>=4)
+ query.trace("Got " + group.getConcreteSize() + " hits from " + group.getId(),false, 4);
+ mergedResults.hits().add(group);
+ }
+
+ private boolean successfullyCompleted(FutureResult result) {
+ return result.isDone() && !result.isCancelled();
+ }
+
+ private Query setupSingleQuery(Query query, long timeout, TargetHandler targetHandler) {
+ if (strictSearchchain) {
+ query.resetTimeout();
+ return setupFederationQuery(query, query,
+ windowParameters(query.getHits(), query.getOffset()), timeout, targetHandler);
+ } else {
+ return cloneFederationQuery(query,
+ windowParameters(query.getHits(), query.getOffset()), timeout, targetHandler);
+ }
+ }
+
+ private Result startExecuteSingleQuery(Query query, TargetHandler chain, long timeout, Execution execution) {
+ Query outgoing = setupSingleQuery(query, timeout, chain);
+ Execution exec = new Execution(chain.getChain(), execution.context());
+ return exec.search(outgoing);
+ }
+
+ private List<ExecutionInfo> startExecuteQueryForEachTarget(
+ Query query, Collection<TargetHandler> targets, long timeout, Execution execution) {
+
+ List<ExecutionInfo> results = new ArrayList<>();
+
+ Map<String, Object> windowParameters;
+ if (targets.size()==1) // preserve requested top-level offset by default as an optimization
+ windowParameters = Collections.unmodifiableMap(windowParameters(query.getHits(), query.getOffset()));
+ else // request from offset 0 to enable correct upstream blending into a single top-level hit list
+ windowParameters = Collections.unmodifiableMap(windowParameters(query.getHits() + query.getOffset(), 0));
+
+ for (TargetHandler targetHandler : targets) {
+ long executeTimeout = timeout;
+ if (targetHandler.federationOptions().getRequestTimeoutInMilliseconds() != -1)
+ executeTimeout = targetHandler.federationOptions().getRequestTimeoutInMilliseconds();
+ results.add(new ExecutionInfo(targetHandler, targetHandler.federationOptions(),
+ createFutureSearch(query, windowParameters, targetHandler, executeTimeout, execution)));
+ }
+
+ return results;
+ }
+
+ private Map<String, Object> windowParameters(int hits, int offset) {
+ Map<String, Object> params = new HashMap<>();
+ params.put(Query.HITS.toString(), hits);
+ params.put(Query.OFFSET.toString(), offset);
+ return params;
+ }
+
+ private FutureResult createFutureSearch(Query query, Map<String, Object> windowParameters, TargetHandler targetHandler,
+ long timeout, Execution execution) {
+ Query clonedQuery = cloneFederationQuery(query, windowParameters, timeout, targetHandler);
+ return new AsyncExecution(targetHandler.getChain(), execution).search(clonedQuery);
+ }
+
+
+ private Query cloneFederationQuery(Query query,
+ Map<String, Object> windowParameters, long timeout, TargetHandler targetHandler) {
+ Query clonedQuery = Query.createNewQuery(query);
+ return setupFederationQuery(query, clonedQuery, windowParameters, timeout, targetHandler);
+ }
+
+ private Query setupFederationQuery(Query query, Query outgoing,
+ Map<String, Object> windowParameters, long timeout, TargetHandler targetHandler) {
+
+ ComponentId chainId = targetHandler.getChain().getId();
+
+ String sourceName = chainId.getName();
+ outgoing.properties().set(SOURCENAME, sourceName);
+ String providerName = chainId.getName();
+ if (chainId.getNamespace() != null)
+ providerName = chainId.getNamespace().getName();
+ outgoing.properties().set(PROVIDERNAME, providerName);
+
+ outgoing.setTimeout(timeout);
+
+ switch (propagateSourceProperties) {
+ case ALL:
+ propagatePerSourceQueryProperties(query, outgoing, windowParameters, sourceName, providerName,
+ QueryProperties.PER_SOURCE_QUERY_PROPERTIES);
+ break;
+ case OFFSET_HITS:
+ propagatePerSourceQueryProperties(query, outgoing, windowParameters, sourceName, providerName,
+ new CompoundName[]{Query.OFFSET, Query.HITS});
+ break;
+ }
+
+ //TODO: FederationTarget
+ //TODO: only for target produced by this, not others
+ targetHandler.modifyTargetQuery(outgoing);
+ return outgoing;
+ }
+
+ private void propagatePerSourceQueryProperties(Query original, Query outgoing,
+ Map<String, Object> windowParameters,
+ String sourceName, String providerName,
+ CompoundName[] queryProperties) {
+
+ for (CompoundName key : queryProperties) {
+ Object value = getSourceOrProviderProperty(original, key, sourceName, providerName, windowParameters.get(key.toString()));
+ if (value != null) {
+ outgoing.properties().set(key, value);
+ }
+ }
+ }
+
+ private Object getSourceOrProviderProperty(Query query, CompoundName propertyName,
+ String sourceName, String providerName,
+ Object defaultValue) {
+ Object result = getProperty(query, new SourceKey(sourceName, propertyName.toString()));
+ if (result == null)
+ result = getProperty(query, new ProviderKey(providerName, propertyName.toString()));
+ if (result == null)
+ result = defaultValue;
+
+ return result;
+ }
+
+ private Object getProperty(Query query, CompoundKey key) {
+
+ CompoundName name = map.get(key);
+ if (name == null) {
+ name = new CompoundName(key.toString());
+ map.put(key, name);
+ }
+ return query.properties().get(name);
+ }
+
+ private ErrorMessage missingSearchChainsErrorMessage(List<UnresolvedSearchChainException> unresolvedSearchChainExceptions) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(StringUtils.join(getMessagesSet(unresolvedSearchChainExceptions), ' '));
+
+
+ sb.append(" Valid source refs are ");
+ sb.append(
+ StringUtils.join(allSourceRefDescriptions().iterator(),
+ ", ")).append('.');
+
+ return ErrorMessage.createInvalidQueryParameter(sb.toString());
+ }
+
+ private List<String> allSourceRefDescriptions() {
+ List<String> descriptions = new ArrayList<>();
+
+ for (Target target : searchChainResolver.allTopLevelTargets()) {
+ descriptions.add(target.searchRefDescription());
+ }
+ return descriptions;
+ }
+
+ private Set<String> getMessagesSet(List<UnresolvedSearchChainException> unresolvedSearchChainExceptions) {
+ Set<String> messages = new LinkedHashSet<>();
+ for (UnresolvedSearchChainException exception : unresolvedSearchChainExceptions) {
+ messages.add(exception.getMessage());
+ }
+ return messages;
+ }
+
+ private void warnIfUnresolvedSearchChains(List<UnresolvedSearchChainException> missingTargets,
+ HitGroup errorHitGroup) {
+
+ if (!missingTargets.isEmpty()) {
+ errorHitGroup.addError(missingSearchChainsErrorMessage(missingTargets));
+ }
+ }
+
+ @Override
+ public Collection<CommentedSearchChain> getSearchChainsForwarded(SearchChainRegistry registry) {
+ List<CommentedSearchChain> searchChains = new ArrayList<>();
+
+ for (Target target : searchChainResolver.allTopLevelTargets()) {
+ if (target instanceof SourcesTarget) {
+ searchChains.addAll(commentedSourceProviderSearchChains((SourcesTarget)target, registry));
+ } else if (target instanceof SingleTarget) {
+ searchChains.add(commentedSearchChain((SingleTarget)target, registry));
+ } else {
+ log.warning("Invalid target type " + target.getClass().getName());
+ }
+ }
+
+ return searchChains;
+ }
+
+ private CommentedSearchChain commentedSearchChain(SingleTarget singleTarget, SearchChainRegistry registry) {
+ return new CommentedSearchChain("If source refs contains '" + singleTarget.getId() + "'.",
+ registry.getChain(singleTarget.getId()));
+ }
+
+ private List<CommentedSearchChain> commentedSourceProviderSearchChains(SourcesTarget sourcesTarget,
+ SearchChainRegistry registry) {
+
+ List<CommentedSearchChain> commentedSearchChains = new ArrayList<>();
+ String ifMatchingSourceRefPrefix = "If source refs contains '" + sourcesTarget.getId() + "' and provider is '";
+
+ commentedSearchChains.add(
+ new CommentedSearchChain(ifMatchingSourceRefPrefix + sourcesTarget.defaultProviderSource().provider +
+ "'(or not given).", registry.getChain(sourcesTarget.defaultProviderSource().searchChainId)));
+
+ for (SearchChainInvocationSpec providerSource : sourcesTarget.allProviderSources()) {
+ if (!providerSource.equals(sourcesTarget.defaultProviderSource())) {
+ commentedSearchChains.add(
+ new CommentedSearchChain(ifMatchingSourceRefPrefix + providerSource.provider + "'.",
+ registry.getChain(providerSource.searchChainId)));
+ }
+ }
+ return commentedSearchChains;
+ }
+
+ /** Returns the set of properties set for the source or provider given in the query (if any).
+ *
+ * If the query has not set sourceName or providerName, null will be returned */
+ public static Properties getSourceProperties(Query query) {
+ String sourceName = query.properties().getString(SOURCENAME);
+ String providerName = query.properties().getString(PROVIDERNAME);
+ if (sourceName == null || providerName == null)
+ return null;
+ Properties sourceProperties = new SubProperties("source." + sourceName, query.properties());
+ Properties providerProperties = new SubProperties("provider." + providerName, query.properties());
+ sourceProperties.chain(providerProperties);
+ return sourceProperties;
+ }
+
+ @Override
+ public void fill(final Result result, final String summaryClass, Execution execution) {
+ List<FutureResult> filledResults = new ArrayList<>();
+ UniqueExecutionsToResults uniqueExecutionsToResults = new UniqueExecutionsToResults();
+ addResultsToFill(result.hits(), result, summaryClass, uniqueExecutionsToResults);
+ final Set<Entry<Chain<Searcher>, Map<Query, Result>>> resultsForAllChains = uniqueExecutionsToResults.resultsToFill
+ .entrySet();
+ int numberOfCallsToFillNeeded = 0;
+
+ for (Entry<Chain<Searcher>, Map<Query, Result>> resultsToFillForAChain : resultsForAllChains) {
+ numberOfCallsToFillNeeded += resultsToFillForAChain.getValue().size();
+ }
+
+ for (Entry<Chain<Searcher>, Map<Query, Result>> resultsToFillForAChain : resultsForAllChains) {
+ Chain<Searcher> chain = resultsToFillForAChain.getKey();
+ Execution chainExecution = (chain == null) ? execution : new Execution(chain, execution.context());
+
+ for (Entry<Query, Result> resultsToFillForAChainAndQuery : resultsToFillForAChain.getValue().entrySet()) {
+ Result resultToFill = resultsToFillForAChainAndQuery.getValue();
+ if (numberOfCallsToFillNeeded == 1) {
+ chainExecution.fill(resultToFill, summaryClass);
+ propagateErrors(resultToFill, result);
+ } else {
+ AsyncExecution asyncFill = new AsyncExecution(chainExecution);
+ filledResults.add(asyncFill.fill(resultToFill, summaryClass));
+ }
+ }
+ }
+ for (FutureResult filledResult : filledResults) {
+ propagateErrors(filledResult.get(result.getQuery().getTimeLeft(), TimeUnit.MILLISECONDS), result);
+ }
+ }
+
+ private void propagateErrors(Result source, Result destination) {
+ ErrorMessage error = source.hits().getError();
+ if (error != null)
+ destination.hits().addError(error);
+ }
+
+ /** A map from a unique search chain and query instance to a result */
+ private static class UniqueExecutionsToResults {
+
+ /** Implemented as a nested identity hashmap */
+ final Map<Chain<Searcher>,Map<Query,Result>> resultsToFill = new IdentityHashMap<>();
+
+ /** Returns a result to fill for a query and chain, by creating it if necessary */
+ public Result get(Chain<Searcher> chain, Query query) {
+ Map<Query,Result> resultsToFillForAChain = resultsToFill.get(chain);
+ if (resultsToFillForAChain == null) {
+ resultsToFillForAChain = new IdentityHashMap<>();
+ resultsToFill.put(chain,resultsToFillForAChain);
+ }
+
+ Result resultsToFillForAChainAndQuery = resultsToFillForAChain.get(query);
+ if (resultsToFillForAChainAndQuery == null) {
+ resultsToFillForAChainAndQuery = new Result(query);
+ resultsToFillForAChain.put(query,resultsToFillForAChainAndQuery);
+ }
+
+ return resultsToFillForAChainAndQuery;
+ }
+
+ }
+
+ private void addResultsToFill(HitGroup hitGroup, Result result, String summaryClass,
+ UniqueExecutionsToResults uniqueExecutionsToResults) {
+ for (Hit hit : hitGroup) {
+ if (hit instanceof HitGroup) {
+ addResultsToFill((HitGroup) hit, result, summaryClass, uniqueExecutionsToResults);
+ } else {
+ if ( ! hit.isFilled(summaryClass))
+ getSearchChainGroup(hit,result,uniqueExecutionsToResults).hits().add(hit);
+ }
+ }
+ }
+
+ private Result getSearchChainGroup(Hit hit, Result result, UniqueExecutionsToResults uniqueExecutionsToResults) {
+ @SuppressWarnings("unchecked")
+ Chain<Searcher> chain = (Chain<Searcher>) hit.getSearcherSpecificMetaData(this);
+ Query query = hit.getQuery() !=null ? hit.getQuery() : result.getQuery();
+
+ return uniqueExecutionsToResults.get(chain,query);
+ }
+
+ private void searchMultipleTargets(Query query, Result mergedResults,
+ Collection<TargetHandler> targets,
+ long timeout,
+ Execution execution) {
+
+ List<ExecutionInfo> executionInfos = startExecuteQueryForEachTarget(query, targets, timeout, execution);
+ waitForMandatoryTargets(executionInfos, query.getTimeout());
+
+ HitOrderer s=null;
+ for (ExecutionInfo executionInfo : executionInfos) {
+ if ( ! successfullyCompleted(executionInfo.futureResult)) {
+ addSearchChainTimedOutError(query, executionInfo.targetHandler.getId());
+ } else {
+ if (s == null) {
+ s = dirtyCopyIfModifiedOrderer(mergedResults.hits(), executionInfo.futureResult.get().hits().getOrderer());
+ }
+ mergeResult(query, executionInfo.targetHandler, mergedResults, executionInfo.futureResult.get());
+
+ }
+ }
+ }
+
+ /**
+ * TODO This is probably a dirty hack for bug 4711376. There are probably better ways.
+ * But I will leave that to trd-processing@
+ *
+ * @param group The merging hitgroup to be updated if necessary
+ * @param orderer The per provider hit orderer.
+ * @return The hitorderer chosen
+ */
+ private HitOrderer dirtyCopyIfModifiedOrderer(HitGroup group, HitOrderer orderer) {
+ if (orderer != null) {
+ HitOrderer old = group.getOrderer();
+ if ((old == null) || ! orderer.equals(old)) {
+ group.setOrderer(orderer);
+ }
+ }
+
+ return orderer;
+ }
+
+ private void waitForMandatoryTargets(List<ExecutionInfo> executionInfos, long queryTimeout) {
+ FutureWaiter futureWaiter = new FutureWaiter();
+
+ boolean hasMandatoryTargets = false;
+ for (ExecutionInfo executionInfo : executionInfos) {
+ if (isMandatory(executionInfo)) {
+ futureWaiter.add(executionInfo.futureResult,
+ getSearchChainExecutionTimeoutInMilliseconds(executionInfo, queryTimeout));
+ hasMandatoryTargets = true;
+ }
+ }
+
+ if (!hasMandatoryTargets) {
+ for (ExecutionInfo executionInfo : executionInfos) {
+ futureWaiter.add(executionInfo.futureResult,
+ getSearchChainExecutionTimeoutInMilliseconds(executionInfo, queryTimeout));
+ }
+ }
+
+ futureWaiter.waitForFutures();
+ }
+
+ private long getSearchChainExecutionTimeoutInMilliseconds(ExecutionInfo executionInfo, long queryTimeout) {
+ return executionInfo.federationOptions.
+ getSearchChainExecutionTimeoutInMilliseconds(queryTimeout);
+ }
+
+ private boolean isMandatory(ExecutionInfo executionInfo) {
+ return !executionInfo.federationOptions.getOptional();
+ }
+
+ private void searchSingleTarget(Query query, Result mergedResults,
+ TargetHandler targetHandler,
+ long timeout,
+ Execution execution) {
+ Result result = startExecuteSingleQuery(query, targetHandler, timeout, execution);
+ mergeResult(query, targetHandler, mergedResults, result);
+ }
+
+
+ private Results<SearchChainInvocationSpec, UnresolvedSearchChainException> getTargets(Set<String> sources, Properties properties, IndexFacts indexFacts) {
+ return sources.isEmpty() ?
+ defaultSearchChains(properties):
+ resolveSources(sources, properties, indexFacts);
+ }
+
+ private Results<SearchChainInvocationSpec, UnresolvedSearchChainException> resolveSources(Set<String> sources, Properties properties, IndexFacts indexFacts) {
+ Results.Builder<SearchChainInvocationSpec, UnresolvedSearchChainException> result = new Builder<>();
+
+ for (String source : sources) {
+ try {
+ result.addAllData(sourceRefResolver.resolve(asSourceSpec(source), properties, indexFacts));
+ } catch (UnresolvedSearchChainException e) {
+ result.addError(e);
+ }
+ }
+
+ return result.build();
+ }
+
+
+ public Results<SearchChainInvocationSpec, UnresolvedSearchChainException> defaultSearchChains(Properties sourceToProviderMap) {
+ Results.Builder<SearchChainInvocationSpec, UnresolvedSearchChainException> result = new Builder<>();
+
+ for (Target target : searchChainResolver.defaultTargets()) {
+ try {
+ result.addData(target.responsibleSearchChain(sourceToProviderMap));
+ } catch (UnresolvedSearchChainException e) {
+ result.addError(e);
+ }
+ }
+
+ return result.build();
+ }
+
+
+ private ComponentSpecification asSourceSpec(String source) {
+ try {
+ return new ComponentSpecification(source);
+ } catch(Exception e) {
+ throw new IllegalArgumentException("The source ref '" + source
+ + "' used for federation is not valid.", e);
+ }
+ }
+
+ @Override
+ public Result search(Query query, Execution execution) {
+ Result mergedResults = execution.search(query);
+
+ Results<SearchChainInvocationSpec, UnresolvedSearchChainException> targets =
+ getTargets(query.getModel().getSources(), query.properties(), execution.context().getIndexFacts());
+ warnIfUnresolvedSearchChains(targets.errors(), mergedResults.hits());
+
+ Collection<SearchChainInvocationSpec> prunedTargets =
+ pruneTargetsWithoutDocumentTypes(query.getModel().getRestrict(), targets.data());
+
+ Results<TargetHandler, ErrorMessage> regularTargetHandlers = resolveSearchChains(prunedTargets, execution.searchChainRegistry());
+ query.errors().addAll(regularTargetHandlers.errors());
+
+ List<TargetHandler> targetHandlers = new ArrayList<>(regularTargetHandlers.data());
+ targetHandlers.addAll(getAdditionalTargets(query, execution, targetSelector));
+
+ final long targetsTimeout = calculateTimeout(query, targetHandlers);
+ if (targetsTimeout < 0)
+ return new Result(query, ErrorMessage.createTimeout("Timed out when about to federate"));
+
+ traceTargets(query, targetHandlers);
+
+ if (targetHandlers.size() == 0) {
+ return mergedResults;
+ } else if (targetHandlers.size() == 1 && ! shouldExecuteTargetLongerThanThread(query, targetHandlers.get(0))) {
+ TargetHandler chain = first(targetHandlers);
+ searchSingleTarget(query, mergedResults, chain, targetsTimeout, execution);
+ } else {
+ searchMultipleTargets(query, mergedResults, targetHandlers, targetsTimeout, execution);
+ }
+
+ return mergedResults;
+ }
+
+ private void traceTargets(Query query, List<TargetHandler> targetHandlers) {
+ final int traceFederationLevel = 2;
+ if ( ! query.isTraceable(traceFederationLevel)) return;
+ query.trace("Federating to " + targetHandlers, traceFederationLevel);
+ }
+
+ /**
+ * Returns true if we are requested to keep executing a target longer than we're waiting for it.
+ * This is useful to populate caches inside targets.
+ */
+ private boolean shouldExecuteTargetLongerThanThread(Query query, TargetHandler target) {
+ return target.federationOptions().getRequestTimeoutInMilliseconds() > query.getTimeout();
+ }
+
+ private static Results<TargetHandler, ErrorMessage> resolveSearchChains(
+ Collection<SearchChainInvocationSpec> prunedTargets,
+ SearchChainRegistry registry) {
+
+ Results.Builder<TargetHandler, ErrorMessage> targetHandlers = new Results.Builder<>();
+
+ for (SearchChainInvocationSpec target: prunedTargets) {
+ Chain<Searcher> chain = registry.getChain(target.searchChainId);
+ if (chain == null) {
+ targetHandlers.addError(ErrorMessage.createIllegalQuery(
+ "Could not find search chain '" + target.searchChainId + "'"));
+ } else {
+ targetHandlers.addData(new StandardTargetHandler(target, chain));
+ }
+ }
+
+ return targetHandlers.build();
+ }
+
+ private static <T> List<TargetHandler> getAdditionalTargets(Query query, Execution execution, TargetSelector<T> targetSelector) {
+ if (targetSelector == null)
+ return Collections.emptyList();
+
+ ArrayList<TargetHandler> result = new ArrayList<>();
+ for (FederationTarget<T> target: targetSelector.getTargets(query, execution.searchChainRegistry()))
+ result.add(new CustomTargetHandler<>(targetSelector, target));
+
+ return result;
+ }
+
+ private Collection<SearchChainInvocationSpec> pruneTargetsWithoutDocumentTypes(Set<String> restrict, List<SearchChainInvocationSpec> targets) {
+ if (restrict.isEmpty())
+ return targets;
+
+ Collection<SearchChainInvocationSpec> prunedTargets = new ArrayList<>();
+
+ for (SearchChainInvocationSpec target : targets) {
+ if (target.documentTypes.isEmpty() || documentTypeIntersectionIsNonEmpty(restrict, target))
+ prunedTargets.add(target);
+ }
+
+ return prunedTargets;
+ }
+
+ private boolean documentTypeIntersectionIsNonEmpty(Set<String> restrict, SearchChainInvocationSpec target) {
+ for (String documentType : target.documentTypes) {
+ if (restrict.contains(documentType))
+ return true;
+ }
+
+ return false;
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/ForwardingSearcher.java b/container-search/src/main/java/com/yahoo/search/federation/ForwardingSearcher.java
new file mode 100644
index 00000000000..b43798113de
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/ForwardingSearcher.java
@@ -0,0 +1,106 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation;
+
+import com.yahoo.component.ComponentSpecification;
+import com.yahoo.component.chain.Chain;
+import com.yahoo.component.chain.dependencies.After;
+import com.yahoo.prelude.Ping;
+import com.yahoo.prelude.Pong;
+import com.yahoo.search.Query;
+import com.yahoo.search.Result;
+import com.yahoo.search.Searcher;
+import com.yahoo.search.cluster.PingableSearcher;
+import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.search.result.HitGroup;
+import com.yahoo.search.searchchain.Execution;
+
+/**
+ * A lightweight searcher to forward all incoming requests to a single search
+ * chain defined in config. An alternative to federation searcher when standard
+ * semantics are not necessary for the application.
+ *
+ * @see FederationSearcher
+ * @since 5.0.13
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ */
+@After("*")
+public class ForwardingSearcher extends PingableSearcher {
+ private final ComponentSpecification target;
+
+ public ForwardingSearcher(final SearchchainForwardConfig config) {
+ if (config.target() == null) {
+ throw new RuntimeException(
+ "Configuration value searchchain-forward.target was null.");
+ }
+ try {
+ target = new ComponentSpecification(config.target());
+ } catch (RuntimeException e) {
+ throw new RuntimeException(
+ "Failed constructing the component specification from searchchain-forward.target: "
+ + config.target(), e);
+ }
+ }
+
+ @Override
+ public Result search(final Query query, final Execution execution) {
+ Execution next = createForward(execution);
+
+ if (next == null) {
+ return badResult(query);
+ } else {
+ return next.search(query);
+ }
+ }
+
+ private Result badResult(final Query query) {
+ final ErrorMessage error = noSearchchain();
+ return new Result(query, error);
+ }
+
+ @Override
+ public Pong ping(final Ping ping, final Execution execution) {
+ Execution next = createForward(execution);
+
+ if (next == null) {
+ return badPong();
+ } else {
+ return next.ping(ping);
+ }
+ }
+
+ private Pong badPong() {
+ final Pong pong = new Pong();
+ pong.addError(noSearchchain());
+ return pong;
+ }
+
+ @Override
+ public void fill(final Result result, final String summaryClass,
+ final Execution execution) {
+ Execution next = createForward(execution);
+ if (next == null) {
+ badFill(result.hits());
+ return;
+ } else {
+ next.fill(result, summaryClass);
+ }
+ }
+
+ private void badFill(HitGroup hits) {
+ hits.addError(noSearchchain());
+ }
+
+ private Execution createForward(Execution execution) {
+ Chain<Searcher> targetChain = execution.context().searchChainRegistry()
+ .getComponent(target);
+ if (targetChain == null) {
+ return null;
+ }
+ return new Execution(targetChain, execution.context());
+ }
+
+ private ErrorMessage noSearchchain() {
+ return ErrorMessage
+ .createServerIsMisconfigured("Could not get search chain matching component specification: " + target);
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/FutureWaiter.java b/container-search/src/main/java/com/yahoo/search/federation/FutureWaiter.java
new file mode 100644
index 00000000000..52cd5397489
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/FutureWaiter.java
@@ -0,0 +1,58 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation;
+
+import com.yahoo.search.searchchain.FutureResult;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author tonytv
+ */
+class FutureWaiter {
+ private class Future {
+ final FutureResult result;
+ final long timeoutInMilliseconds;
+
+ public Future(FutureResult result, long timeoutInMilliseconds) {
+ this.result = result;
+ this.timeoutInMilliseconds = timeoutInMilliseconds;
+ }
+ }
+
+ private List<Future> futures = new ArrayList<>();
+
+ public void add(FutureResult futureResult, long timeoutInMilliseconds) {
+ futures.add(new Future(futureResult, timeoutInMilliseconds));
+ }
+
+ public void waitForFutures() {
+ sortFuturesByTimeoutDescending();
+
+ final long startTime = System.currentTimeMillis();
+
+ for (Future future : futures) {
+ long timeToWait = startTime + future.timeoutInMilliseconds - System.currentTimeMillis();
+ if (timeToWait <= 0)
+ break;
+
+ future.result.get(timeToWait, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ private void sortFuturesByTimeoutDescending() {
+ Collections.sort(futures, new Comparator<Future>() {
+ @Override
+ public int compare(Future lhs, Future rhs) {
+ return -compareLongs(lhs.timeoutInMilliseconds, rhs.timeoutInMilliseconds);
+ }
+
+ private int compareLongs(long lhs, long rhs) {
+ return new Long(lhs).compareTo(rhs);
+ }
+ });
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/TimeoutException.java b/container-search/src/main/java/com/yahoo/search/federation/TimeoutException.java
new file mode 100644
index 00000000000..8b7e8a1d9d5
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/TimeoutException.java
@@ -0,0 +1,20 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation;
+
+/**
+ * Thrown on timeouts
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+@SuppressWarnings("serial")
+public class TimeoutException extends RuntimeException {
+
+ public TimeoutException(String message) {
+ super(message);
+ }
+
+ public TimeoutException(String message,Throwable cause) {
+ super(message,cause);
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/http/ConfiguredHTTPClientSearcher.java b/container-search/src/main/java/com/yahoo/search/federation/http/ConfiguredHTTPClientSearcher.java
new file mode 100644
index 00000000000..576c16f68db
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/http/ConfiguredHTTPClientSearcher.java
@@ -0,0 +1,36 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation.http;
+
+import java.util.Collections;
+
+import com.yahoo.component.ComponentId;
+import com.yahoo.search.federation.ProviderConfig;
+import com.yahoo.search.Result;
+import com.yahoo.search.searchchain.Execution;
+import com.yahoo.statistics.Statistics;
+
+
+/**
+ * Superclass for http client searchers which depends on config. All this is doing is translating
+ * the provider and cache configurations to parameters which are passed upwards.
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+public abstract class ConfiguredHTTPClientSearcher extends HTTPClientSearcher {
+
+ /** Create this from a configuraton */
+ public ConfiguredHTTPClientSearcher(final ComponentId id, final ProviderConfig providerConfig, Statistics manager) {
+ super(id, ConfiguredSearcherHelper.toConnectionList(providerConfig), new HTTPParameters(providerConfig), manager);
+ }
+
+ /** Create an instance from direct parameters having a single connection. Useful for testing */
+ public ConfiguredHTTPClientSearcher(String idString,String host,int port,String path, Statistics manager) {
+ super(new ComponentId(idString), Collections.singletonList(new Connection(host,port)),path, manager);
+ }
+
+ /** Forwards to the next in chain fill(result,summaryName) */
+ public @Override void fill(Result result,String summaryName, Execution execution,Connection connection) {
+ execution.fill(result,summaryName);
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/http/ConfiguredHTTPProviderSearcher.java b/container-search/src/main/java/com/yahoo/search/federation/http/ConfiguredHTTPProviderSearcher.java
new file mode 100644
index 00000000000..25253f768bd
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/http/ConfiguredHTTPProviderSearcher.java
@@ -0,0 +1,68 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation.http;
+
+import com.yahoo.component.ComponentId;
+import com.yahoo.search.federation.ProviderConfig;
+import com.yahoo.search.cache.QrBinaryCacheConfig;
+import com.yahoo.search.cache.QrBinaryCacheRegionConfig;
+import com.yahoo.search.Result;
+import com.yahoo.search.searchchain.Execution;
+import com.yahoo.statistics.Statistics;
+
+import java.util.Collections;
+
+
+/**
+ * Superclass for http provider searchers which depends on config. All this is doing is translating
+ * the provider and cache configurations to parameters which are passed upwards.
+ *
+ * @author <a href="mailto:arnebef@yahoo-inc.com">Arne Bergene Fossaa</a>
+ * @author bratseth
+ */
+public abstract class ConfiguredHTTPProviderSearcher extends HTTPProviderSearcher {
+
+ /** Create this from a configuraton */
+ public ConfiguredHTTPProviderSearcher(final ComponentId id, final ProviderConfig providerConfig, Statistics manager) {
+ super(id,ConfiguredSearcherHelper.toConnectionList(providerConfig),new HTTPParameters(providerConfig), manager);
+ }
+
+ /** Create this from a configuraton */
+ public ConfiguredHTTPProviderSearcher(final ComponentId id, final ProviderConfig providerConfig,
+ HTTPParameters parameters, Statistics manager) {
+ super(id,ConfiguredSearcherHelper.toConnectionList(providerConfig),parameters, manager);
+ }
+
+ /** Create this from a configuraton with a configured cache */
+ public ConfiguredHTTPProviderSearcher(final ComponentId id, final ProviderConfig providerConfig,
+ final QrBinaryCacheConfig cacheConfig,
+ final QrBinaryCacheRegionConfig regionConfig, Statistics manager) {
+ super(id,ConfiguredSearcherHelper.toConnectionList(providerConfig),new HTTPParameters(providerConfig), manager);
+ configureCache(cacheConfig,regionConfig);
+ }
+
+ /** Create this from a configuraton with a configured cache */
+ public ConfiguredHTTPProviderSearcher(final ComponentId id, final ProviderConfig providerConfig,
+ final QrBinaryCacheConfig cacheConfig,
+ final QrBinaryCacheRegionConfig regionConfig, HTTPParameters parameters, Statistics manager) {
+ super(id,ConfiguredSearcherHelper.toConnectionList(providerConfig),parameters, manager);
+ configureCache(cacheConfig,regionConfig);
+ }
+
+ /** Create an instance from direct parameters having a single connection. Useful for testing */
+ public ConfiguredHTTPProviderSearcher(String idString,String host,int port,String path, Statistics manager) {
+ super(new ComponentId(idString), Collections.singletonList(new Connection(host,port)),path, manager);
+ }
+
+ /** Create an instance from direct parameters having a single connection. Useful for testing */
+ public ConfiguredHTTPProviderSearcher(String idString,String host,int port,HTTPParameters parameters, Statistics manager) {
+ super(new ComponentId(idString), Collections.singletonList(new Connection(host,port)),parameters, manager);
+ }
+
+ /**
+ * Override this to provider multi-phase result filling towards a backend.
+ * This default implementation does nothing.
+ */
+ public @Override void fill(Result result,String summaryName, Execution execution,Connection connection) {
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/http/ConfiguredSearcherHelper.java b/container-search/src/main/java/com/yahoo/search/federation/http/ConfiguredSearcherHelper.java
new file mode 100644
index 00000000000..8d3ee016b4f
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/http/ConfiguredSearcherHelper.java
@@ -0,0 +1,27 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation.http;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.yahoo.search.federation.ProviderConfig;
+
+/**
+ * Some static helper classes for configured*Searcher classes
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+class ConfiguredSearcherHelper {
+
+ /** No instantiation */
+ private ConfiguredSearcherHelper() { }
+
+ public static List<Connection> toConnectionList(ProviderConfig providerConfig) {
+ List<Connection> connections=new ArrayList<>();
+ for(ProviderConfig.Node node : providerConfig.node()) {
+ connections.add(new Connection(node.host(), node.port()));
+ }
+ return connections;
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/http/Connection.java b/container-search/src/main/java/com/yahoo/search/federation/http/Connection.java
new file mode 100644
index 00000000000..88e2c6ad0a0
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/http/Connection.java
@@ -0,0 +1,30 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation.http;
+
+/**
+ * Represents a connection to a particular node (host/port).
+ * Right now this is just a container of connection parameters, but might be extended to
+ * contain an open connection later.
+ * The host and port state is immutable.
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+public class Connection {
+
+ private String host;
+ private int port;
+
+ public Connection(String host,int port) {
+ this.host=host;
+ this.port=port;
+ }
+
+ public String getHost() { return host; }
+
+ public int getPort() { return port; }
+
+ public String toString() {
+ return "http connection '" + host + ":" + port + "'";
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/http/GzipDecompressingEntity.java b/container-search/src/main/java/com/yahoo/search/federation/http/GzipDecompressingEntity.java
new file mode 100644
index 00000000000..1dc58ecd65e
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/http/GzipDecompressingEntity.java
@@ -0,0 +1,125 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation.http;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.HttpEntityWrapper;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * Used by HTTPSearcher when talking to services returning compressed content.
+ *
+ * @author <a href="mailto:mainak@yahoo-inc.com">Mainak Mandal</a>
+ */
+public class GzipDecompressingEntity extends HttpEntityWrapper {
+
+ private static class Resources {
+
+ byte [] buffer;
+ int total;
+
+ Resources() {
+ total = 0;
+ buffer = new byte[65536];
+ }
+ void drain(InputStream zipStream) throws IOException {
+ int numRead = zipStream.read(buffer, total, buffer.length);
+ while (numRead != -1) {
+ total += numRead;
+ if ((total + 65536) > buffer.length) {
+ buffer = Arrays.copyOf(buffer, buffer.length + numRead);
+ }
+ numRead = zipStream.read(buffer, total, buffer.length - total);
+ }
+ }
+
+ }
+
+ private final Resources resources = new Resources();
+
+ public GzipDecompressingEntity(final HttpEntity entity) throws IllegalStateException, IOException {
+ super(entity);
+ GZIPInputStream gz = new GZIPInputStream(entity.getContent());
+ InputStream zipStream = new BufferedInputStream(gz);
+ try {
+ resources.drain(zipStream);
+ } catch (IOException e) {
+ throw e;
+ } finally {
+ zipStream.close();
+ }
+ }
+
+ @Override
+ public InputStream getContent() throws IOException, IllegalStateException {
+
+ final ByteBuffer buff = ByteBuffer.wrap(resources.buffer, 0, resources.total);
+ return new InputStream() {
+
+ @Override
+ public int available() throws IOException {
+ return buff.remaining();
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (buff.hasRemaining())
+ return buff.get() & 0xFF;
+
+ return -1;
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ if (!buff.hasRemaining())
+ return -1;
+
+ int len = b.length;
+ if (len > buff.remaining())
+ len = buff.remaining();
+ buff.get(b, 0, len);
+ return len;
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (!buff.hasRemaining())
+ return -1;
+
+ if (len > buff.remaining())
+ len = buff.remaining();
+ buff.get(b, off, len);
+ return len;
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ if (!buff.hasRemaining())
+ return -1;
+
+ if (n > buff.remaining())
+ n = buff.remaining();
+
+ buff.position(buff.position() + (int) n);
+ return n;
+ }
+ };
+ }
+
+ @Override
+ public long getContentLength() {
+ return resources.total;
+ }
+
+ @Override
+ public void writeTo(OutputStream outstream) throws IOException {
+ outstream.write(resources.buffer, 0, resources.total);
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/http/HTTPClientSearcher.java b/container-search/src/main/java/com/yahoo/search/federation/http/HTTPClientSearcher.java
new file mode 100644
index 00000000000..1459fb6f226
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/http/HTTPClientSearcher.java
@@ -0,0 +1,276 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation.http;
+
+import com.yahoo.component.ComponentId;
+import com.yahoo.jdisc.http.CertificateStore;
+import com.yahoo.yolean.Exceptions;
+import com.yahoo.search.Query;
+import com.yahoo.search.Result;
+import com.yahoo.processing.request.CompoundName;
+import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.search.result.Hit;
+import com.yahoo.search.searchchain.Execution;
+import com.yahoo.statistics.Statistics;
+
+import org.apache.http.HttpEntity;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.logging.Logger;
+
+/**
+ * A utility parent for searchers which gets data from web services which is incorporated into the query.
+ * This searcher will take care of implementing the search method while the extending class implements
+ * {@link #getQueryMap} and {@link #handleResponse} to create the http request and handle the response, respectively.
+ *
+ * <p>This class automatically adds a meta hit containing latency and other
+ * meta information about the obtained HTTP data using createRequestMeta().
+ * The fields available in the hit are:</p>
+ *
+ * <dl><dt>
+ * HTTPSearcher.LOG_LATENCY_START
+ * <dd>
+ * The latency of the external provider answering a request.
+ * <dt>
+ * HTTPSearcher.LOG_LATENCY_FINISH
+ * <dd>
+ * Total time of the HTTP traffic, but also decoding of the data, is this
+ * happens at the same time.
+ * <dt>
+ * HTTPSearcher.LOG_URI
+ * <dd>
+ * The complete URI used for external service.
+ * <dt>
+ * HTTPSearcher.LOG_SCHEME
+ * <dd>
+ * The scheme of the request URI sent.
+ * <dt>
+ * HTTPSearcher.LOG_HOST
+ * <dd>
+ * The host used for the request URI sent.
+ * <dt>
+ * HTTPSearcher.LOG_PORT
+ * <dd>
+ * The port used for the request URI sent.
+ * <dt>
+ * HTTPSearcher.LOG_PATH
+ * <dd>
+ * Path element of the request URI sent.
+ * <dt>
+ * HTTPSearcher.LOG_STATUS
+ * <dd>
+ * Status code of the HTTP response.
+ * <dt>
+ * HTTPSearcher.LOG_PROXY_TYPE
+ * <dd>
+ * The proxy type used, if any. Default is "http".
+ * <dt>
+ * HTTPSearcher.LOG_PROXY_HOST
+ * <dd>
+ * The proxy host, if any.
+ * <dt>
+ * HTTPSearcher.LOG_PROXY_PORT
+ * <dd>
+ * The proxy port, if any.
+ * <dt>
+ * HTTPSearcher.LOG_HEADER_PREFIX prepended to request header field name
+ * <dd>
+ * The content of any additional request header fields.
+ * <dt>
+ * HTTPSearcher.LOG_RESPONSE_HEADER_PREFIX prepended to response header field name
+ * <dd>
+ * The content of any additional response header fields.
+ * </dl>
+
+ * @author <a href="mailto:arnebef@yahoo-inc.com">Arne Bergene Fossaa</a>
+ * @author bratseth
+ */
+public abstract class HTTPClientSearcher extends HTTPSearcher {
+
+ static final CompoundName REQUEST_META_CARRIER = new CompoundName("com.yahoo.search.federation.http.HTTPClientSearcher_requestMeta");
+
+ protected final static Logger log = Logger.getLogger(HTTPClientSearcher.class.getName());
+
+ /**
+ * Creates a client searcher
+ *
+ * @param id the id of this instance
+ * @param connections the connections this will load balance and fail over between
+ * @param path the path portion of the url to be used
+ */
+ public HTTPClientSearcher(ComponentId id, List<Connection> connections,String path,Statistics statistics) {
+ super(id, connections, path, statistics);
+ }
+
+ public HTTPClientSearcher(ComponentId id, List<Connection> connections,String path,Statistics statistics,
+ CertificateStore certificateStore) {
+ super(id, connections, path, statistics, certificateStore);
+ }
+
+ public HTTPClientSearcher(ComponentId id, List<Connection> connections, HTTPParameters parameters, Statistics statistics) {
+ super(id, connections, parameters, statistics);
+ }
+ /**
+ * Creates a client searcher
+ *
+ * @param id the id of this instance
+ * @param connections the connections this will load balance and fail over between
+ * @param parameters the parameters to use when making http calls
+ * @param certificateStore the certificate store to use to pass certificates in requests
+ */
+ public HTTPClientSearcher(ComponentId id, List<Connection> connections, HTTPParameters parameters,
+ Statistics statistics, CertificateStore certificateStore) {
+ super(id, connections, parameters, statistics, certificateStore);
+ }
+
+ /** Overridden to avoid interfering with errors from nested searchers, which is inappropriate for a <i>client</i> */
+ @Override
+ public Result robustSearch(Query query, Execution execution, Connection connection) {
+ return search(query,execution,connection);
+ }
+
+ /** Implements a search towards the connection chosen by the cluster searcher for this query */
+ @Override
+ public Result search(Query query, Execution execution, Connection connection) {
+ Hit requestMeta = doHttpRequest(query, connection);
+ Result result = execution.search(query);
+ result.hits().add(requestMeta);
+ return result;
+ }
+
+ private Hit doHttpRequest(Query query, Connection connection) {
+ URI uri;
+ // Create default meta hit for holding logging information
+ Hit requestMeta = createRequestMeta();
+ query.properties().set(REQUEST_META_CARRIER, requestMeta);
+
+ query.trace("Created request information hit",false,9);
+ try {
+ uri = getURI(query, connection);
+ } catch (MalformedURLException e) {
+ query.errors().add(createMalformedUrlError(query,e));
+ return requestMeta;
+ } catch (URISyntaxException e) {
+ query.errors().add(createMalformedUrlError(query,e));
+ return requestMeta;
+ }
+
+ HttpEntity entity;
+ try {
+ if (query.getTraceLevel()>=1)
+ query.trace("Fetching " + uri.toString(), false, 1);
+ entity = getEntity(uri, requestMeta, query);
+ } catch (IOException e) {
+ query.errors().add(ErrorMessage.createBackendCommunicationError(
+ "Error when trying to connect to HTTP backend in " + this + " using " + connection + " for " +
+ query + ": " + Exceptions.toMessageString(e)));
+ return requestMeta;
+ } catch (TimeoutException e) {
+ query.errors().add(ErrorMessage.createTimeout("HTTP traffic timed out in "
+ + this + " for " + query + ": " + e.getMessage()));
+ return requestMeta;
+ }
+ if (entity==null) {
+ query.errors().add(ErrorMessage.createBackendCommunicationError(
+ "No result from connecting to HTTP backend in " + this + " using " + connection + " for " + query));
+ return requestMeta;
+ }
+
+ try {
+ query = handleResponse(entity,query);
+ }
+ catch (IOException e) {
+ query.errors().add(ErrorMessage.createBackendCommunicationError(
+ "Error when trying to consume input in " + this + ": " + Exceptions.toMessageString(e)));
+ } finally {
+ cleanupHttpEntity(entity);
+ }
+ return requestMeta;
+ }
+
+ /** Overrides to pass the query on to the next searcher */
+ @Override
+ public Result search(Query query, Execution execution, ErrorMessage error) {
+ query.errors().add(error);
+ return execution.search(query);
+ }
+
+ /** Do nothing on fill in client searchers */
+ @Override
+ public void fill(Result result,String summaryClass,Execution execution,Connection connection) {
+ }
+
+ /**
+ * Convenience hook for unmarshalling the response and adding the information to the query.
+ * Implement this or <code>handleResponse(entity,query)</code> in any subclass.
+ * This default implementation throws an exception.
+ *
+ * @param inputStream the stream containing the data from the http service
+ * @param contentLength the length of the content in the stream in bytes, or a negative number if not known
+ * @param query the current query, to which information from the stream should be added
+ * @return query the query to propagate down the chain. This should almost always be the
+ * query instance given as a parameter.
+ */
+ public Query handleResponse(InputStream inputStream, long contentLength, Query query) throws IOException {
+ throw new UnsupportedOperationException("handleResponse must be implemented by " + this);
+ }
+
+ /**
+ * Unmarshals the response and adds the resulting data to the given query.
+ * This default implementation calls
+ * <code>return handleResponse(entity.getContent(), entity.getContentLength(), query);</code>
+ * (and does some detailed query tracing).
+ *
+ * @param query the current query, to which information from the stream should be added
+ * @return query the query to propagate down the chain. This should almost always be the
+ * query instance given as a parameter.
+ */
+ public Query handleResponse(HttpEntity entity, Query query) throws IOException {
+ long len = entity.getContentLength();
+ if (query.getTraceLevel()>=4)
+ query.trace("Received " + len + " bytes response in " + this, false, 4);
+ query = handleResponse(entity.getContent(), len, query);
+ if (query.getTraceLevel()>=2)
+ query.trace("Handled " + len + " bytes response in " + this, false, 2);
+ return query;
+ }
+
+ /** Never retry individual queries to clients for now */
+ @Override
+ protected boolean shouldRetry(Query query,Result result) { return false; }
+
+ /**
+ * numHits and offset should not be part of the cache key as cache supports
+ * partial read/write that is only one cache entry is maintained per query
+ * irrespective of the offset and numhits.
+ */
+ public abstract Map<String, String> getCacheKey(Query q);
+
+ /**
+ * Adds all key-values starting by "service." + getClientName() in query.properties().
+ * Returns the empty map if {@link #getServiceName} is not overridden.
+ */
+ @Override
+ public Map<String,String> getQueryMap(Query query) {
+ LinkedHashMap<String, String> queryMap=new LinkedHashMap<>();
+ if (getServiceName().isEmpty()) return queryMap;
+
+ for (Map.Entry<String,Object> objectProperty : query.properties().listProperties("service." + getServiceName()).entrySet()) // TODO: Make more efficient using CompoundName
+ queryMap.put(objectProperty.getKey(),objectProperty.getValue().toString());
+ return queryMap;
+ }
+
+ /**
+ * Override this to return the name of the service this is a client of.
+ * This is used to look up service specific properties as service.getServiceName.serviceSpecificProperty.
+ * This default implementation returns "", which means service specific parameters will not be used.
+ */
+ protected String getServiceName() { return ""; }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/http/HTTPParameters.java b/container-search/src/main/java/com/yahoo/search/federation/http/HTTPParameters.java
new file mode 100644
index 00000000000..19fe1df3e2e
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/http/HTTPParameters.java
@@ -0,0 +1,315 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation.http;
+
+import com.google.common.base.Preconditions;
+import com.yahoo.search.federation.ProviderConfig.PingOption;
+import org.apache.http.conn.params.ConnManagerParams;
+import org.apache.http.conn.params.ConnPerRouteBean;
+import org.apache.http.params.BasicHttpParams;
+import org.apache.http.params.HttpConnectionParams;
+import org.apache.http.params.HttpParams;
+
+import com.yahoo.search.federation.ProviderConfig;
+
+/**
+ * A set of parameters for talking to an http backend
+ *
+ * @author bratseth
+ */
+public final class HTTPParameters {
+
+ public static final String RETRIES = "com.yahoo.search.federation.http.retries";
+
+ private boolean frozen=false;
+
+ // All timing parameters below are in milliseconds
+ /** The url request path portion */
+ private String path="/";
+ private int connectionTimeout=2000;
+ private int readTimeout=5000;
+ private boolean persistentConnections=true;
+ private boolean enableProxy = false;
+ private String proxyHost = "localhost";
+ private int proxyPort = 1080;
+ private String method = "GET";
+ private String schema = "http";
+ private String inputEncoding = "utf-8";
+ private String outputEncoding = "utf-8";
+ private int maxTotalConnections=10000;
+ private int maxConnectionsPerRoute=10000;
+ private int socketBufferSizeBytes=-1;
+ private int retries = 1;
+ private int configuredReadTimeout = -1;
+ private int configuredConnectionTimeout = -1;
+ private int connectionPoolTimeout = -1;
+ private String ycaProxy = null;
+ private int ycaPort = 0;
+ private String ycaApplicationId = null;
+ private boolean ycaUseProxy = false;
+ private long ycaTtl = 0L;
+ private long ycaRetry = 0L;
+
+ private PingOption.Enum pingOption = PingOption.NORMAL;
+
+
+ private boolean followRedirects = true;
+
+ public HTTPParameters() {}
+
+ public HTTPParameters(String path) {
+ setPath(path);
+ }
+
+ public HTTPParameters(ProviderConfig providerConfig) {
+ configuredReadTimeout = (int) (providerConfig.readTimeout() * 1000.0d);
+ configuredConnectionTimeout = (int) (providerConfig.connectionTimeout() * 1000.0d);
+ connectionPoolTimeout = (int) (providerConfig.connectionPoolTimeout() * 1000.0d);
+ retries = providerConfig.retries();
+ setPath(providerConfig.path());
+ ycaUseProxy = providerConfig.yca().useProxy();
+ if (ycaUseProxy) {
+ ycaProxy = providerConfig.yca().host();
+ ycaPort = providerConfig.yca().port();
+ }
+ ycaApplicationId = providerConfig.yca().applicationId();
+ ycaTtl = providerConfig.yca().ttl() * 1000L;
+ ycaRetry = providerConfig.yca().retry() * 1000L;
+ followRedirects = providerConfig.followRedirects();
+ pingOption = providerConfig.pingOption();
+ }
+
+ /**
+ * Set the url path to use in queries to this. If the argument is null or empty the path is set to "/".
+ * If a leading "/" is missing, it is added automatically.
+ */
+ public final void setPath(String path) {
+ if (path==null || path.isEmpty()) path="/";
+
+ if (! path.startsWith("/"))
+ path="/" + path;
+ this.path = path;
+ }
+
+ public PingOption.Enum getPingOption() {
+ return pingOption;
+ }
+
+ public void setPingOption(PingOption.Enum pingOption) {
+ Preconditions.checkNotNull(pingOption);
+ ensureNotFrozen();
+ this.pingOption = pingOption;
+ }
+
+ /** Returns the url path. Default is "/". */
+ public String getPath() { return path; }
+
+ public boolean getFollowRedirects() {
+ return followRedirects;
+ }
+
+ public void setFollowRedirects(boolean followRedirects) {
+ ensureNotFrozen();
+ this.followRedirects = followRedirects;
+ }
+
+
+ public void setConnectionTimeout(int connectionTimeout) {
+ ensureNotFrozen();
+ this.connectionTimeout=connectionTimeout;
+ }
+
+ /** Returns the connection timeout in milliseconds. Default is 2000. */
+ public int getConnectionTimeout() { return connectionTimeout; }
+
+ public void setReadTimeout(int readTimeout) {
+ ensureNotFrozen();
+ this.readTimeout=readTimeout;
+ }
+
+ /** Returns the read timeout in milliseconds. Default is 5000. */
+ public int getReadTimeout() { return readTimeout; }
+
+ /**
+ * <b>Note: This is currently largely a noop: Connections are reused even when this is set to true.
+ * The setting will change from sharing connections between threads to only reusing it within a thread
+ * but it is still reused.</b>
+ */
+ public void setPersistentConnections(boolean persistentConnections) {
+ ensureNotFrozen();
+ this.persistentConnections=persistentConnections;
+ }
+
+ /** Returns whether this should use persistent connections. Default is true. */
+ public boolean getPersistentConnections() { return persistentConnections; }
+
+ /** Returns whether proxying should be enabled. Default is false. */
+ public boolean getEnableProxy() { return enableProxy; }
+
+ public void setEnableProxy(boolean enableProxy ) {
+ ensureNotFrozen();
+ this.enableProxy=enableProxy;
+ }
+
+ /** Returns the proxy type to use (if enabled). Default is "http". */
+ public String getProxyType() {
+ return "http";
+ }
+
+ public void setProxyHost(String proxyHost) {
+ ensureNotFrozen();
+ this.proxyHost=proxyHost;
+ }
+
+ /** Returns the proxy host to use (if enabled). Default is "localhost". */
+ public String getProxyHost() { return proxyHost; }
+
+ public void setProxyPort(int proxyPort) {
+ ensureNotFrozen();
+ this.proxyPort=proxyPort;
+ }
+
+ /** Returns the proxy port to use (if enabled). Default is 1080. */
+ public int getProxyPort() { return proxyPort; }
+
+ public void setMethod(String method) {
+ ensureNotFrozen();
+ this.method=method;
+ }
+
+ /** Returns the http method to use. Default is "GET". */
+ public String getMethod() { return method; }
+
+ public void setSchema(String schema) {
+ ensureNotFrozen();
+ this.schema=schema;
+ }
+
+ /** Returns the schema to use. Default is "http". */
+ public String getSchema() { return schema; }
+
+ public void setInputEncoding(String inputEncoding) {
+ ensureNotFrozen();
+ this.inputEncoding=inputEncoding;
+ }
+
+ /** Returns the input encoding. Default is "utf-8". */
+ public String getInputEncoding() { return inputEncoding; }
+
+ public void setOutputEncoding(String outputEncoding) {
+ ensureNotFrozen();
+ this.outputEncoding=outputEncoding;
+ }
+
+ /** Returns the output encoding. Default is "utf-8". */
+ public String getOutputEncoding() { return outputEncoding; }
+
+ /** Make this unmodifiable. Note that any thread synchronization must be done outside this object. */
+ public void freeze() {
+ frozen=true;
+ }
+
+ private void ensureNotFrozen() {
+ if (frozen) throw new IllegalStateException("Cannot modify frozen " + this);
+ }
+
+ /**
+ * Returns the eligible subset of this as a HttpParams snapshot
+ * AND configures the Apache HTTP library with the parameters of this
+ */
+ public HttpParams toHttpParams() {
+ return toHttpParams(connectionTimeout, readTimeout);
+ }
+
+ /**
+ * Returns the eligible subset of this as a HttpParams snapshot
+ * AND configures the Apache HTTP library with the parameters of this
+ */
+ public HttpParams toHttpParams(int connectionTimeout, int readTimeout) {
+ HttpParams params = new BasicHttpParams();
+ // force use of configured value if available
+ if (configuredConnectionTimeout > 0) {
+ HttpConnectionParams.setConnectionTimeout(params, configuredConnectionTimeout);
+ } else {
+ HttpConnectionParams.setConnectionTimeout(params, connectionTimeout);
+ }
+ if (configuredReadTimeout > 0) {
+ HttpConnectionParams.setSoTimeout(params, configuredReadTimeout);
+ } else {
+ HttpConnectionParams.setSoTimeout(params, readTimeout);
+ }
+ if (socketBufferSizeBytes > 0) {
+ HttpConnectionParams.setSocketBufferSize(params, socketBufferSizeBytes);
+ }
+ if (connectionPoolTimeout > 0) {
+ ConnManagerParams.setTimeout(params, connectionPoolTimeout);
+ }
+ ConnManagerParams.setMaxTotalConnections(params, maxTotalConnections);
+ ConnManagerParams.setMaxConnectionsPerRoute(params, new ConnPerRouteBean(maxConnectionsPerRoute));
+ if (retries >= 0) {
+ params.setIntParameter(RETRIES, retries);
+ }
+ params.setParameter("http.protocol.handle-redirects", followRedirects);
+ return params;
+ }
+
+ public int getMaxTotalConnections() {
+ return maxTotalConnections;
+ }
+
+ public void setMaxTotalConnections(int maxTotalConnections) {
+ ensureNotFrozen();
+ this.maxTotalConnections = maxTotalConnections;
+ }
+
+ public int getMaxConnectionsPerRoute() {
+ return maxConnectionsPerRoute;
+ }
+
+ public void setMaxConnectionsPerRoute(int maxConnectionsPerRoute) {
+ ensureNotFrozen();
+ this.maxConnectionsPerRoute = maxConnectionsPerRoute;
+ }
+
+ public int getSocketBufferSizeBytes() {
+ return socketBufferSizeBytes;
+ }
+
+ public void setSocketBufferSizeBytes(int socketBufferSizeBytes) {
+ ensureNotFrozen();
+ this.socketBufferSizeBytes = socketBufferSizeBytes;
+ }
+
+ public int getRetries() {
+ return retries;
+ }
+
+ public void setRetries(int retries) {
+ ensureNotFrozen();
+ this.retries = retries;
+ }
+
+ public String getYcaProxy() {
+ return ycaProxy;
+ }
+
+ public int getYcaPort() {
+ return ycaPort;
+ }
+
+ public String getYcaApplicationId() {
+ return ycaApplicationId;
+ }
+
+ public boolean getYcaUseProxy() {
+ return ycaUseProxy;
+ }
+
+ public long getYcaTtl() {
+ return ycaTtl;
+ }
+
+ public long getYcaRetry() {
+ return ycaRetry;
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/http/HTTPProviderSearcher.java b/container-search/src/main/java/com/yahoo/search/federation/http/HTTPProviderSearcher.java
new file mode 100644
index 00000000000..c2bc6b2196b
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/http/HTTPProviderSearcher.java
@@ -0,0 +1,260 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation.http;
+
+import com.google.common.collect.ImmutableList;
+import com.yahoo.component.ComponentId;
+import com.yahoo.jdisc.http.CertificateStore;
+import com.yahoo.search.cache.QrBinaryCacheConfig;
+import com.yahoo.search.cache.QrBinaryCacheRegionConfig;
+import com.yahoo.yolean.Exceptions;
+import com.yahoo.search.Query;
+import com.yahoo.search.Result;
+import com.yahoo.search.federation.FederationSearcher;
+import com.yahoo.search.query.Properties;
+import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.search.result.Hit;
+import com.yahoo.search.searchchain.Execution;
+import com.yahoo.statistics.Counter;
+import com.yahoo.statistics.Statistics;
+import com.yahoo.statistics.Value;
+
+import org.apache.http.HttpEntity;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.*;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Superclass of searchers which talks to HTTP backends. Implement a subclass to talk to a backend
+ * over HTTP which is not supported by the platform out of the box.
+ * <p>
+ * Implementations must override one of the <code>unmarshal</code> methods to unmarshal the response.
+ * </p>
+ *
+ * @author <a href="mailto:arnebef@yahoo-inc.com">Arne Bergene Fossaa</a>
+ * @author bratseth
+ */
+public abstract class HTTPProviderSearcher extends HTTPSearcher {
+
+ private final Counter emptyResults;
+ private final Value hitsPerQuery;
+ private final Value responseLatency;
+ private final Counter readTimeouts;
+
+ private final static List<String> excludedSourceProperties = ImmutableList.of("offset", "hits", "provider");
+
+ protected final static Logger log = Logger.getLogger(HTTPProviderSearcher.class.getName());
+
+ /** The name of the cache used (which is just getid().stringValue(), or null if no cache is used */
+ protected String cacheName=null;
+
+ public HTTPProviderSearcher(ComponentId id, List<Connection> connections,String path, Statistics statistics) {
+ this(id,connections,new HTTPParameters(path), statistics);
+ }
+
+ /** Creates a http provider searcher using id.getName as provider name */
+ public HTTPProviderSearcher(ComponentId id, List<Connection> connections, String path,
+ Statistics statistics, CertificateStore certificateStore) {
+ this(id, connections, new HTTPParameters(path), statistics, certificateStore);
+ }
+
+ public HTTPProviderSearcher(ComponentId id, List<Connection> connections, HTTPParameters parameters,
+ Statistics statistics) {
+ this(id, connections, parameters, statistics, new ThrowingCertificateStore());
+ }
+
+ /**
+ * Creates a provider searcher
+ *
+ * @param id the id of this instance
+ * @param connections the connections this will load balance and fail over between
+ * @param parameters the parameters to use when making http calls
+ */
+ public HTTPProviderSearcher(ComponentId id, List<Connection> connections, HTTPParameters parameters,
+ Statistics statistics, CertificateStore certificateStore) {
+ super(id, connections, parameters, statistics, certificateStore);
+ String suffix = "_" + getId().getName().replace('.', '_');
+ hitsPerQuery = new Value("hits_per_query" + suffix, statistics,
+ new Value.Parameters().setLogRaw(false).setNameExtension(false).setLogMean(true));
+ responseLatency = new Value(LOG_LATENCY_START + suffix, statistics,
+ new Value.Parameters().setLogRaw(false).setLogMean(true).setNameExtension(false));
+ emptyResults = new Counter("empty_results" + suffix, statistics, false);
+ readTimeouts = new Counter(LOG_READ_TIMEOUT_PREFIX + suffix, statistics, false);
+ }
+
+ /** @deprecated this method does nothing */
+ @Deprecated
+ protected void configureCache(final QrBinaryCacheConfig cacheConfig,final QrBinaryCacheRegionConfig regionConfig) {
+ }
+
+ /**
+ * Unmarshal the stream by converting it to hits and adding the hits to the given result.
+ * A convenience hook called by the default <code>unmarshal(entity,result).</code>
+ * Override this in subclasses which does not override <code>unmarshal(entity,result).</code>
+ * <p>
+ * This default implementation throws an exception.
+ *
+ * @param stream the stream of data returned
+ * @param contentLength the length of the content in bytes if known, or a negative number if unknown
+ * @param result the result to which unmarshalled data should be added
+ */
+ public void unmarshal(final InputStream stream, long contentLength, final Result result) throws IOException {
+ throw new UnsupportedOperationException("Unmarshal must be implemented by " + this);
+ }
+
+ /**
+ * Unmarshal the result from an http entity. This default implementation calls
+ * <code>unmarshal(entity.getContent(), entity.getContentLength(), result)</code>
+ * (and does some detailed query tracing).
+ *
+ * @param entity the entity containing the data to unmarshal
+ * @param result the result to which unmarshalled data should be added
+ */
+ public void unmarshal(HttpEntity entity,Result result) throws IOException {
+ Query query=result.getQuery();
+ long len = entity.getContentLength();
+ if (query.getTraceLevel()>=4)
+ query.trace("Received " + len + " bytes response in " + this, false, 4);
+ query.trace("Unmarshaling result.", false, 6);
+ unmarshal(entity.getContent(), len, result);
+
+ if (query.getTraceLevel()>=2)
+ query.trace("Handled " + len + " bytes response in " + this, false, 2);
+
+ }
+
+ protected void addNonExcludedSourceProperties(Query query, Map<String, String> queryMap) {
+ Properties sourceProperties = FederationSearcher.getSourceProperties(query);
+ if (sourceProperties != null) {
+ for(Map.Entry<String, Object> entry : sourceProperties.listProperties("").entrySet()) {
+ if (!excludedSourceProperties.contains(entry.getKey())) {
+ queryMap.put(entry.getKey(), entry.getValue().toString());
+ }
+ }
+ }
+ }
+
+ /**
+ * Hook called at the moment the result is returned from this searcher. This default implementation
+ * does <code>return result</code>.
+ *
+ * @param result the result which is to be returned
+ * @param requestMeta the request information hit, or null if none was created (e.g if this was a cache lookup)
+ * @param e the exception caused during execution of this query, or null if none
+ * @return the result which is returned upwards
+ */
+ protected Result inspectAndReturnFinalResult(Result result, Hit requestMeta, Exception e) {
+ return result;
+ }
+
+ private Result statisticsBeforeInspection(Result result,
+ Hit requestMeta, Exception e) {
+ int hitCount = result.getConcreteHitCount();
+ if (hitCount == 0) {
+ emptyResults.increment();
+ }
+ hitsPerQuery.put((double) hitCount);
+
+ if (requestMeta != null) {
+ requestMeta.setField(LOG_HITCOUNT, Integer.valueOf(hitCount));
+ }
+
+ return inspectAndReturnFinalResult(result,
+ requestMeta, e);
+ }
+
+
+ @Override
+ protected void logResponseLatency(long latency) {
+ responseLatency.put((double) latency);
+ }
+
+ @Override
+ public Result search(Query query, Execution execution,Connection connection) {
+ // Create default meta hit for holding logging information
+ Hit requestMeta = createRequestMeta();
+ Result result = new Result(query);
+ result.hits().add(requestMeta);
+ query.trace("Created request information hit", false, 9);
+
+ try {
+ URI uri = getURI(query, requestMeta, connection);
+ if (query.getTraceLevel()>=1)
+ query.trace("Fetching " + uri.toString(), false, 1);
+ long requestStartTime = System.currentTimeMillis();
+
+ HttpEntity entity = getEntity(uri, requestMeta, query);
+
+ // Why should consumeEntity call inspectAndReturnFinalResult itself?
+ // Seems confusing to me.
+ return entity == null
+ ? statisticsBeforeInspection(result, requestMeta, null)
+ : consumeEntity(entity, query, result, requestMeta, requestStartTime);
+
+ } catch (MalformedURLException|URISyntaxException e) {
+ result.hits().addError(createMalformedUrlError(query,e));
+ return statisticsBeforeInspection(result, requestMeta, e);
+ } catch (TimeoutException e) {
+ result.hits().addError(ErrorMessage.createTimeout("No time left for HTTP traffic in "
+ + this
+ + " for " + query + ": " + e.getMessage()));
+ return statisticsBeforeInspection(result, requestMeta, e);
+ } catch (IOException e) {
+ result.hits().addError(ErrorMessage.createBackendCommunicationError(
+ "Error when trying to connect to HTTP backend in " + this
+ + " for " + query + ": " + Exceptions.toMessageString(e)));
+ return statisticsBeforeInspection(result, requestMeta, e);
+ }
+ }
+
+ private Result consumeEntity(HttpEntity entity, Query query, Result result, Hit logHit, long requestStartTime) {
+
+ try {
+ // remove some time from timeout to allow for close calls with return result
+ unmarshal(new TimedHttpEntity(entity, query.getStartTime(), Math.max(1, query.getTimeout() - 10)), result);
+ logHit.setField(LOG_LATENCY_FINISH, System.currentTimeMillis() - requestStartTime);
+ return statisticsBeforeInspection(result, logHit, null);
+ } catch (IOException e) {
+ result.hits().addError(ErrorMessage.createBackendCommunicationError(
+ "Error when trying to consume input in " + this + ": " + Exceptions.toMessageString(e)));
+ return statisticsBeforeInspection(result, logHit, e);
+ } catch (TimeoutException e) {
+ readTimeouts.increment();
+ result.hits().addError(ErrorMessage
+ .createTimeout("Timed out while reading/unmarshaling from backend in "
+ + this + " for " + query
+ + ": " + e.getMessage()));
+ return statisticsBeforeInspection(result, logHit, e);
+ } finally { // TODO: The scope of this finally must be enlarged to release the connection also on errors
+ cleanupHttpEntity(entity);
+ }
+ }
+
+ /**
+ * Returns the key-value pairs that should be added as properties to the request url sent to the service.
+ * Must be overridden in subclasses to add the key-values expected by the service in question, unless
+ * {@link #getURI} (from which this is called) is overridden.
+ * <p>
+ * This default implementation returns the query.properties() prefixed by
+ * "source.[sourceName]" or "property.[propertyName]"
+ * (by calling {@link #addNonExcludedSourceProperties}).
+ */
+ @Override
+ public Map<String,String> getQueryMap(Query query) {
+ Map<String,String> queryMap = super.getQueryMap(query);
+ addNonExcludedSourceProperties(query, queryMap);
+ return queryMap;
+ }
+
+ /**
+ * @deprecated the cache key is ignored as there is no built-in caching support
+ */
+ @Deprecated
+ public abstract Map<String, String> getCacheKey(Query q);
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/http/HTTPSearcher.java b/container-search/src/main/java/com/yahoo/search/federation/http/HTTPSearcher.java
new file mode 100644
index 00000000000..65ce7b3647c
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/http/HTTPSearcher.java
@@ -0,0 +1,958 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation.http;
+
+import com.google.inject.Inject;
+import com.yahoo.component.ComponentId;
+import com.yahoo.jdisc.http.CertificateStore;
+import com.yahoo.log.LogLevel;
+import com.yahoo.prelude.Ping;
+import com.yahoo.prelude.Pong;
+import com.yahoo.yolean.Exceptions;
+import com.yahoo.search.Query;
+import com.yahoo.search.cluster.ClusterSearcher;
+import com.yahoo.search.federation.ProviderConfig.PingOption;
+import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.search.result.Hit;
+import com.yahoo.statistics.Counter;
+import com.yahoo.statistics.Statistics;
+import com.yahoo.text.Utf8;
+
+import org.apache.http.*;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.HttpRequestRetryHandler;
+import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.conn.ClientConnectionManager;
+import org.apache.http.conn.ConnectTimeoutException;
+import org.apache.http.conn.params.ConnManagerParams;
+import org.apache.http.conn.params.ConnRoutePNames;
+import org.apache.http.conn.routing.HttpRoutePlanner;
+import org.apache.http.conn.scheme.PlainSocketFactory;
+import org.apache.http.conn.scheme.Scheme;
+import org.apache.http.conn.scheme.SchemeRegistry;
+import org.apache.http.conn.ssl.SSLSocketFactory;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.impl.conn.DefaultHttpRoutePlanner;
+import org.apache.http.impl.conn.SingleClientConnManager;
+import org.apache.http.impl.conn.tsccm.ThreadSafeClientConnManager;
+import org.apache.http.params.HttpParams;
+import org.apache.http.params.HttpProtocolParams;
+import org.apache.http.protocol.BasicHttpContext;
+import org.apache.http.protocol.ExecutionContext;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.protocol.HttpRequestExecutor;
+import org.apache.http.util.EntityUtils;
+
+import javax.net.ssl.SSLHandshakeException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.UnsupportedEncodingException;
+import java.net.*;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Generic superclass of searchers making connections to some HTTP service. This
+ * supports clustered connections - a list of alternative servers may be given,
+ * requests will be hashed across these and failed over in case some are down.
+ * <p>
+ * This simply provides some utility methods for working with http connections
+ * and implements ping against the service.
+ *
+ * <p>This searcher contains code from the Apache httpcomponents client library,
+ * licensed to the Apache Software Foundation under the Apache License, Version
+ * 2.0. Please refer to http://www.apache.org/licenses/LICENSE-2.0 for details.
+ *
+ * <p>This class automatically adds a meta hit containing latency and other
+ * meta information about the obtained HTTP data using createRequestMeta().
+ * The fields available in the hit are:</p>
+ *
+ * <dl><dt>
+ * HTTPSearcher.LOG_LATENCY_START
+ * <dd>
+ * The latency of the external provider answering a request.
+ * <dt>
+ * HTTPSearcher.LOG_LATENCY_FINISH
+ * <dd>
+ * Total time of the HTTP traffic, but also decoding of the data, as this
+ * happens at the same time.
+ * <dt>
+ * HTTPSearcher.LOG_HITCOUNT
+ * <dd>
+ * Number of concrete hits in the result returned by this provider.
+ * <dt>
+ * HTTPSearcher.LOG_URI
+ * <dd>
+ * The complete URI used for external service.
+ * <dt>
+ * HTTPSearcher.LOG_SCHEME
+ * <dd>
+ * The scheme of the request URI sent.
+ * <dt>
+ * HTTPSearcher.LOG_HOST
+ * <dd>
+ * The host used for the request URI sent.
+ * <dt>
+ * HTTPSearcher.LOG_PORT
+ * <dd>
+ * The port used for the request URI sent.
+ * <dt>
+ * HTTPSearcher.LOG_PATH
+ * <dd>
+ * Path element of the request URI sent.
+ * <dt>
+ * HTTPSearcher.LOG_STATUS
+ * <dd>
+ * Status code of the HTTP response.
+ * <dt>
+ * HTTPSearcher.LOG_PROXY_TYPE
+ * <dd>
+ * The proxy type used, if any. Default is "http".
+ * <dt>
+ * HTTPSearcher.LOG_PROXY_HOST
+ * <dd>
+ * The proxy host, if any.
+ * <dt>
+ * HTTPSearcher.LOG_PROXY_PORT
+ * <dd>
+ * The proxy port, if any.
+ * <dt>
+ * HTTPSearcher.LOG_HEADER_PREFIX prepended to request header field name
+ * <dd>
+ * The content of any additional request header fields.
+ * <dt>
+ * HTTPSearcher.LOG_RESPONSE_HEADER_PREFIX prepended to response header field name
+ * <dd>
+ * The content of any additional response header fields.
+ * </dl>
+ *
+ * @author <a href="mailto:arnebef@yahoo-inc.com">Arne Bergene Fossaa</a>
+ */
+public abstract class HTTPSearcher extends ClusterSearcher<Connection> {
+
+ protected static final String YCA_HTTP_HEADER = "Yahoo-App-Auth";
+
+ private static final Charset iso8859Charset = Charset.forName("ISO-8859-1");
+
+ // Logging field name constants
+ public static final String LOG_PATH = "path";
+ public static final String LOG_PORT = "port";
+ public static final String LOG_HOST = "host";
+ public static final String LOG_IP_ADDRESS = "ip_address";
+ public static final String IP_ADDRESS_UNKNOWN = "unknown";
+
+ public static final String LOG_SCHEME = "scheme";
+ public static final String LOG_URI = "uri";
+ public static final String LOG_PROXY_PORT = "proxy_port";
+ public static final String LOG_PROXY_HOST = "proxy_host";
+ public static final String LOG_PROXY_TYPE = "proxy_type";
+ public static final String LOG_STATUS = "status";
+ public static final String LOG_LATENCY_FINISH = "latency_finish";
+ public static final String LOG_LATENCY_START = "latency_start";
+ public static final String LOG_LATENCY_CONNECT = "latency_connect";
+ public static final String LOG_QUERY_PARAM_PREFIX = "query_param_";
+ public static final String LOG_HEADER_PREFIX = "header_";
+ public static final String LOG_RESPONSE_HEADER_PREFIX = "response_header_";
+ public static final String LOG_HITCOUNT = "hit_count";
+ public static final String LOG_CONNECT_TIMEOUT_PREFIX = "connect_timeout_";
+ public static final String LOG_READ_TIMEOUT_PREFIX = "read_timeout_";
+
+ protected final Logger log = Logger.getLogger(HTTPSearcher.class.getName());
+
+ /** The HTTP parameters to use. Assigned in the constructor */
+ private HTTPParameters httpParameters;
+
+ private final Counter connectTimeouts;
+
+ /** Whether to use certificates */
+ protected boolean useCertificate = false;
+
+ private final CertificateStore certificateStore;
+
+ /** The (optional) YCA application ID. */
+ private String ycaApplicationId = null;
+
+ /** The (optional) YCA proxy */
+ protected HttpHost ycaProxy = null;
+
+ /** YCA cache TTL in ms */
+ private long ycaTtl = 0L;
+
+ /** YCA retry rate in the cache if no cert is found, in ms */
+ private long ycaRetry = 0L;
+
+ /** Set at construction if this is using persistent connections */
+ private ClientConnectionManager sharedConnectionManager = null;
+
+ /** Set at construction if using non-persistent connections */
+ private ThreadLocal<SingleClientConnManager> singleClientConnManagerThreadLocal = null;
+
+ private static final SchemeRegistry schemeRegistry = new SchemeRegistry();
+
+ static {
+ schemeRegistry.register(new Scheme("http", PlainSocketFactory
+ .getSocketFactory(), 80));
+ schemeRegistry.register(new Scheme("https", SSLSocketFactory
+ .getSocketFactory(), 443));
+ }
+
+ public HTTPSearcher(ComponentId componentId, List<Connection> connections,String path, Statistics statistics) {
+ this(componentId, connections, new HTTPParameters(path), statistics, new ThrowingCertificateStore());
+ }
+
+ /** Creates a http searcher with default connection and read timeouts (currently 2 and 5s respectively) */
+ public HTTPSearcher(ComponentId componentId, List<Connection> connections,String path, Statistics statistics,
+ CertificateStore certificateStore) {
+ this(componentId, connections, new HTTPParameters(path), statistics, certificateStore);
+ }
+
+ public HTTPSearcher(ComponentId componentId, List<Connection> connections, HTTPParameters parameters,
+ Statistics statistics) {
+ this(componentId, connections, parameters, statistics, new ThrowingCertificateStore());
+ }
+ /**
+ * Creates a http searcher
+ *
+ * @param componentId the id of this instance
+ * @param connections the connections to establish to the backend nodes
+ * @param parameters the http parameters to use. This object will be frozen if it isn't already
+ */
+ @Inject
+ public HTTPSearcher(ComponentId componentId, List<Connection> connections, HTTPParameters parameters,
+ Statistics statistics, CertificateStore certificateStore) {
+ super(componentId,connections,false);
+ String suffix = "_" + getId().getName().replace('.', '_');
+
+ connectTimeouts = new Counter(LOG_CONNECT_TIMEOUT_PREFIX + suffix, statistics, false);
+
+ parameters.freeze();
+ this.httpParameters = parameters;
+ this.certificateStore = certificateStore;
+
+ if (parameters.getPersistentConnections()) {
+ HttpParams params=parameters.toHttpParams();
+ HttpProtocolParams.setVersion(params, HttpVersion.HTTP_1_1);
+ ConnManagerParams.setTimeout(params, 10);
+ sharedConnectionManager = new ThreadSafeClientConnManager(params, schemeRegistry);
+ Thread connectionPurgerThread = new Thread(() -> {
+ //this is the default value in yahoo jvm installations
+ long DNSTTLSec = 120;
+ while (true) {
+ try {
+ Thread.sleep(DNSTTLSec * 1000);
+ if (sharedConnectionManager == null)
+ continue;
+
+ sharedConnectionManager.closeExpiredConnections();
+ DNSTTLSec = Long.valueOf(java.security.Security
+ .getProperty("networkaddress.cache.ttl"));
+ //No DNS TTL, no need to close idle connections
+ if (DNSTTLSec <= 0) {
+ DNSTTLSec = 120;
+ continue;
+ }
+ sharedConnectionManager.closeIdleConnections(2 * DNSTTLSec, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ return;
+ } catch (NumberFormatException e) {
+ continue;
+ }
+ }
+ });
+ connectionPurgerThread.setDaemon(true);
+ connectionPurgerThread.start();
+
+ }
+ else {
+ singleClientConnManagerThreadLocal =new ThreadLocal<>();
+ }
+
+ initializeYCA(httpParameters, certificateStore);
+ }
+
+ /**
+ * Initialize YCA certificate and proxy if they have been set to non-null,
+ * non-empty values. It will wrap thrown exceptions from the YCA layer into
+ * RuntimeException and propagate them.
+ */
+ private void initializeYCA(HTTPParameters parameters, CertificateStore certificateStore) {
+ String applicationId = parameters.getYcaApplicationId();
+ String proxy = parameters.getYcaProxy();
+ int port = parameters.getYcaPort();
+ long ttl = parameters.getYcaTtl();
+ long retry = parameters.getYcaRetry();
+
+ if (applicationId != null && !applicationId.trim().isEmpty()) {
+ initializeCertificate(applicationId, ttl, retry, certificateStore);
+ }
+
+ if (parameters.getYcaUseProxy()) {
+ initializeProxy(proxy, port);
+ }
+ }
+
+ /** Returns the HTTP parameters used in this. This is always frozen */
+ public HTTPParameters getParameters() { return httpParameters; }
+
+ /**
+ * Returns the key-value pairs that should be added as properties to the request url sent to the service.
+ * Must be overridden in subclasses to add the key-values expected by the service in question, unless
+ * {@link #getURI} (from which this is called) is overridden.
+ * <p>
+ * This default implementation returns an empty LinkedHashMap.
+ */
+ public Map<String,String> getQueryMap(Query query) {
+ return new LinkedHashMap<>();
+ }
+
+ /**
+ * Initialize the YCA certificate.
+ * This will warn but not throw if certificates could not be loaded, as the certificates
+ * are external state which can fail independently.
+ */
+ private void initializeCertificate(String applicationId, long ttl, long retry, CertificateStore certificateStore) {
+ try {
+ // get the certificate, i.e. init the cache and check integrity
+ String certificate = certificateStore.getCertificate(applicationId, ttl, retry);
+ if (certificate == null) {
+ getLogger().log(LogLevel.WARNING, "No certificate found for application '" + applicationId + "'");
+ return;
+ }
+
+ this.useCertificate = true;
+ this.ycaApplicationId = applicationId;
+ this.ycaTtl = ttl;
+ this.ycaRetry = retry;
+ getLogger().log(LogLevel.CONFIG, "Got certificate: " + certificate);
+ }
+ catch (Exception e) {
+ getLogger().log(LogLevel.WARNING,"Exception while initializing certificate for application '" +
+ applicationId + "' in " + this, e);
+ }
+ }
+
+ /**
+ * Initialize the YCA proxy setting.
+ */
+ private void initializeProxy(String host, int port) {
+ ycaProxy = new HttpHost(host, port);
+ getLogger().log(LogLevel.CONFIG,"Proxy is configured; will use proxy: " + ycaProxy);
+ }
+
+ /**
+ * Same a {@code getURI(query, offset, hits, null)}.
+ * @see #getURI(Query, Hit, Connection)
+ */
+ protected URI getURI(Query query,Connection connection) throws MalformedURLException, URISyntaxException {
+ Hit requestMeta;
+ try {
+ requestMeta = (Hit) query.properties().get(HTTPClientSearcher.REQUEST_META_CARRIER);
+ } catch (ClassCastException e) {
+ requestMeta = null;
+ }
+ return getURI(query, requestMeta, connection);
+ }
+
+ /**
+ * Creates the URI for a query.
+ * Populates the {@code requestMeta} meta hit with the created URI HTTP properties.
+ *
+ * @param requestMeta a meta hit that holds logging information about this request (may be {@code null}).
+ */
+ protected URI getURI(Query query, Hit requestMeta, Connection connection)
+ throws MalformedURLException, URISyntaxException {
+ StringBuilder parameters = new StringBuilder();
+
+ Map<String, String> queries = getQueryMap(query);
+ if (queries.size() > 0) {
+ Iterator<Map.Entry<String, String>> mapIterator = queries.entrySet().iterator();
+ parameters.append("?");
+ try {
+ Map.Entry<String, String> entry;
+ while (mapIterator.hasNext()) {
+ entry = mapIterator.next();
+
+ if (requestMeta != null)
+ requestMeta.setField(LOG_QUERY_PARAM_PREFIX
+ + entry.getKey(), entry.getValue());
+
+ parameters.append(entry.getKey() + "=" + URLEncoder.encode(entry.getValue(),
+ httpParameters.getInputEncoding()));
+ if (mapIterator.hasNext()) {
+ parameters.append("&");
+ }
+ }
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException("Unknown input encoding set in " + this, e);
+ }
+ }
+
+ URI uri = new URL(httpParameters.getSchema(), connection.getHost(),
+ connection.getPort(), getPath() + parameters.toString()).toURI();
+ if (requestMeta != null) {
+ requestMeta.setField(LOG_URI, uri.toString());
+ requestMeta.setField(LOG_SCHEME, uri.getScheme());
+ requestMeta.setField(LOG_HOST, uri.getHost());
+ requestMeta.setField(LOG_PORT, uri.getPort());
+ requestMeta.setField(LOG_PATH, uri.getPath());
+ }
+ return uri;
+ }
+
+ /**
+ * Called by getURI() to get the path of the URI for the external service.
+ * The default implementation returns httpParameters.getPath(); subclasses
+ * which only wants to override the path from httpParameters may use this
+ * method instead of overriding all of getURI().
+ *
+ * @return the path to use for getURI
+ */
+ protected String getPath() {
+ return httpParameters.getPath();
+ }
+
+ /**
+ * The URI that is used to check if the provider is up or down. This will again be used in the
+ * checkPing method by checking that we get a response that has a good status code (below 300). If better
+ * validation than just status code checking is needed, override the checkPing method.
+ */
+ protected URI getPingURI(Connection connection) throws MalformedURLException, URISyntaxException {
+ return new URL(httpParameters.getSchema(),connection.getHost(),connection.getPort(),getPingPath()).toURI();
+ }
+
+ /**
+ * Called by getPingURI() to get the path of the URI for pinging the
+ * external service. The default implementation returns
+ * httpParameters.getPath(); subclasses which only wants to override the
+ * path from httpParameters may use this method instead of overriding all of
+ * getPingURI().
+ *
+ * @return the path to use for getPingURI
+ */
+ protected String getPingPath() {
+ return httpParameters.getPath();
+ }
+
+ /**
+ * Checks if the response is valid.
+ * @param response The response from the ping request
+ * @param pong The pong result to return back to the calling method. This method
+ * will add an error to the pong result (using addError) if the status of the HTTP response is 300 or above.
+ */
+ protected void checkPing(HttpResponse response, Pong pong) {
+ if (response.getStatusLine().getStatusCode() >= 300) {
+ pong.addError(com.yahoo.search.result.ErrorMessage.createBackendCommunicationError(
+ "Got error " + response.getStatusLine().getStatusCode()
+ + " when contacting backend")
+ );
+ }
+ }
+
+ /**
+ * Pinging in HTTPBackend is done by creating a PING uri from http://host:port/path.
+ * If this returns a status that is below 300, the ping is considered good.
+ *
+ * If another uri is needed for pinging, reimplement getPingURI.
+ *
+ * Override either this method to change how ping
+ */
+ @Override
+ public Pong ping(Ping ping, Connection connection) {
+ URI uri = null;
+ Pong pong = new Pong();
+ HttpResponse response = null;
+
+ if (httpParameters.getPingOption() == PingOption.DISABLE)
+ return pong;
+
+ try {
+ uri = getPingURI(connection);
+ if (uri == null)
+ pong.addError(ErrorMessage.createIllegalQuery("Ping uri is null"));
+ if (uri.getHost()==null) {
+ pong.addError(ErrorMessage.createIllegalQuery("Ping uri has no host"));
+ uri=null;
+ }
+ } catch (MalformedURLException | URISyntaxException e) {
+ pong.addError(ErrorMessage.createIllegalQuery("Malformed ping uri '" + uri + "': " +
+ Exceptions.toMessageString(e)));
+ } catch (RuntimeException e) {
+ log.log(Level.WARNING,"Unexpected exception while attempting to ping " + connection + " using uri '" + uri + "'",e);
+ pong.addError(ErrorMessage.createIllegalQuery("Unexpected problem with ping uri '" + uri + "': " +
+ Exceptions.toMessageString(e)));
+ }
+
+ if (uri == null) return pong;
+ pong.setPingInfo("using uri '" + uri + "'");
+
+ try {
+ response = getPingResponse(uri, ping);
+ checkPing(response, pong);
+ } catch (IOException e) {
+ //We do not have a valid ping
+ pong.addError(ErrorMessage.createBackendCommunicationError(
+ "Exception thrown when pinging with url '" + uri + "': " + Exceptions.toMessageString(e)));
+ } catch (TimeoutException e) {
+ pong.addError(ErrorMessage.createTimeout("Timeout for ping "
+ + uri + " in " + this + ": " + e.getMessage()));
+ } catch (RuntimeException e) {
+ log.log(Level.WARNING,"Unexpected exception while attempting to ping " + connection + " using uri '" + uri + "'",e);
+ pong.addError(ErrorMessage.createIllegalQuery("Unexpected problem with ping uri '" + uri + "': " +
+ Exceptions.toMessageString(e)));
+ } finally {
+ if (response != null) {
+ cleanupHttpEntity(response.getEntity());
+ }
+ }
+
+ return pong;
+ }
+
+ private HttpResponse getPingResponse(URI uri, Ping ping) throws IOException {
+ long timeLeft = ping.getTimeout();
+ int connectionTimeout = (int) (timeLeft / 4L);
+ int readTimeout = (int) (timeLeft * 3L / 4L);
+
+ Map<String, String> requestHeaders = null;
+ if (httpParameters.getPingOption() == PingOption.YCA)
+ requestHeaders = generateYCAHeaders();
+
+ return getResponse(uri, null, requestHeaders, null, connectionTimeout, readTimeout);
+ }
+
+ /**
+ * Same a {@code getEntity(uri, null)}.
+ * @param uri resource to fetch
+ * @param query the originating query
+ * @throws TimeoutException If query.timeLeft() equal to or lower than 0
+ */
+ protected HttpEntity getEntity(URI uri, Query query) throws IOException{
+ return getEntity(uri, null, query);
+ }
+
+
+ /**
+ * Gets the HTTP entity that holds the response contents.
+ * @param uri the request URI.
+ * @param requestMeta a meta hit that holds logging information about this request (may be {@code null}).
+ * @param query the originating query
+ * @return the http entity, or null if none
+ * @throws java.io.IOException Whenever HTTP status code is in the 300 or higher range.
+ * @throws TimeoutException If query.timeLeft() equal to or lower than 0
+ */
+ protected HttpEntity getEntity(URI uri, Hit requestMeta, Query query) throws IOException {
+ if (query.getTimeLeft() <= 0) {
+ throw new TimeoutException("No time left for querying external backend.");
+ }
+ HttpResponse response = getResponse(uri, requestMeta, query);
+ StatusLine statusLine = response.getStatusLine();
+
+ // Logging
+ if (requestMeta != null) {
+ requestMeta.setField(LOG_STATUS, statusLine.getStatusCode());
+ for (HeaderIterator headers = response.headerIterator(); headers.hasNext(); ) {
+ Header h = headers.nextHeader();
+ requestMeta.setField(LOG_RESPONSE_HEADER_PREFIX + h.getName(), h.getValue());
+ }
+ }
+
+ if (statusLine.getStatusCode() >= 300) {
+ HttpEntity entity = response.getEntity();
+ String message = createServerReporterErrorMessage(statusLine, entity);
+ cleanupHttpEntity(response.getEntity());
+ throw new IOException(message);
+ }
+
+ return response.getEntity();
+ }
+
+ private String createServerReporterErrorMessage(StatusLine statusLine, HttpEntity entity) {
+ String message = "Error when trying to connect to HTTP backend: "
+ + statusLine.getStatusCode() + " : " + statusLine.getReasonPhrase();
+
+ try {
+ if (entity != null) {
+ message += "(Message = " + EntityUtils.toString(entity) + ")";
+ }
+ } catch (Exception e) {
+ log.log(LogLevel.WARNING, "Could not get message.", e);
+ }
+
+ return message;
+ }
+
+ /**
+ * Creates a meta hit dedicated to holding logging information. This hit has
+ * the 'logging:[searcher's ID]' type.
+ */
+ protected Hit createRequestMeta() {
+ Hit requestMeta = new Hit("logging:" + getId().toString());
+ requestMeta.setMeta(true);
+ requestMeta.types().add("logging");
+ return requestMeta;
+ }
+
+ protected void cleanupHttpEntity(HttpEntity entity) {
+ if (entity == null) return;
+
+ try {
+ entity.consumeContent();
+ } catch (IOException e) {
+ // It is ok if do not consume it, the resource will be freed after
+ // timeout.
+ // But log it just in case.
+ log.log(LogLevel.getVespaLogLevel(LogLevel.DEBUG),
+ "Not able to consume after processing: " + Exceptions.toMessageString(e));
+ }
+ }
+
+ /**
+ * Same as {@code getResponse(uri, null)}.
+ */
+ protected HttpResponse getResponse(URI uri, Query query) throws IOException{
+ return getResponse(uri, null, query);
+ }
+
+ /**
+ * Executes an HTTP request and gets the response.
+ * @param uri the request URI.
+ * @param requestMeta a meta hit that holds logging information about this request (may be {@code null}).
+ * @param query the originating query, used to calculate timeouts
+ */
+ protected HttpResponse getResponse(URI uri, Hit requestMeta, Query query) throws IOException {
+ long timeLeft = query.getTimeLeft();
+ int connectionTimeout = (int) (timeLeft / 4L);
+ int readTimeout = (int) (timeLeft * 3L / 4L);
+ connectionTimeout = connectionTimeout <= 0 ? 1 : connectionTimeout;
+ readTimeout = readTimeout <= 0 ? 1 : readTimeout;
+ HttpEntity reqEntity = getRequestEntity(query, requestMeta);
+ Map<String, String> reqHeaders = getRequestHeaders(query, requestMeta);
+ if ((reqEntity == null) && (reqHeaders == null)) {
+ return getResponse(uri, requestMeta, connectionTimeout, readTimeout);
+ } else {
+ return getResponse(uri, reqEntity, reqHeaders, requestMeta, connectionTimeout, readTimeout);
+ }
+ }
+
+ /**
+ * Returns the set of headers to be passed in the http request to provider backend. The default
+ * implementation returns null, unless YCA is in use. If YCA is used, it will return a map
+ * only containing the needed YCA headers.
+ */
+ protected Map<String, String> getRequestHeaders(Query query, Hit requestMeta) {
+ if (useCertificate) {
+ return generateYCAHeaders();
+ }
+ return null;
+ }
+
+ /**
+ * Returns the HTTP request entity to use when making the request for this query.
+ * This default implementation returns null.
+ *
+ * <p> Do return a repeatable entity if HTTP retry is active.
+ *
+ * @return the http request entity to use, or null to use the default entity
+ */
+ protected HttpEntity getRequestEntity(Query query, Hit requestMeta) {
+ return null;
+ }
+
+ /**
+ * Executes an HTTP request and gets the response.
+ * @param uri the request URI.
+ * @param requestMeta a meta hit that holds logging information about this request (may be {@code null}).
+ * @param connectionTimeout how long to wait for getting a connection
+ * @param readTimeout timeout for reading HTTP data
+ */
+ protected HttpResponse getResponse(URI uri, Hit requestMeta, int connectionTimeout, int readTimeout)
+ throws IOException {
+ return getResponse(uri, null, null, requestMeta, connectionTimeout, readTimeout);
+ }
+
+
+ /**
+ * Executes an HTTP request and gets the response.
+ * @param uri the request URI.
+ * @param requestMeta a meta hit that holds logging information about this request (may be {@code null}).
+ * @param connectionTimeout how long to wait for getting a connection
+ * @param readTimeout timeout for reading HTTP data
+ */
+ protected HttpResponse getResponse(URI uri, HttpEntity reqEntity,
+ Map<String, String> reqHeaders, Hit requestMeta,
+ int connectionTimeout, int readTimeout) throws IOException {
+
+ HttpParams httpParams = httpParameters.toHttpParams(connectionTimeout, readTimeout);
+ HttpClient httpClient = createClient(httpParams);
+ long start = 0L;
+ HttpUriRequest request;
+ if (httpParameters.getEnableProxy() && "http".equals(httpParameters.getProxyType())) {
+ HttpHost proxy = new HttpHost(httpParameters.getProxyHost(),
+ httpParameters.getProxyPort(), httpParameters.getProxyType());
+ httpClient.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY, proxy);
+ // Logging
+ if (requestMeta != null) {
+ requestMeta.setField(LOG_PROXY_TYPE, httpParameters.getProxyType());
+ requestMeta.setField(LOG_PROXY_HOST, httpParameters.getProxyHost());
+ requestMeta.setField(LOG_PROXY_PORT, httpParameters.getProxyPort());
+ }
+ }
+ if (reqEntity == null) {
+ request = createRequest(httpParameters.getMethod(), uri);
+ } else {
+ request = createRequest(httpParameters.getMethod(), uri, reqEntity);
+ }
+
+ if (reqHeaders != null) {
+ for (Entry<String, String> entry : reqHeaders.entrySet()) {
+ if (entry.getValue() == null || isAscii(entry.getValue())) {
+ request.addHeader(entry.getKey(), entry.getValue());
+ } else {
+ byte[] asBytes = Utf8.toBytes(entry.getValue());
+ String asLyingString = new String(asBytes, 0, asBytes.length, iso8859Charset);
+ request.addHeader(entry.getKey(), asLyingString);
+ }
+ }
+ }
+
+ // Logging
+ if (requestMeta != null) {
+ for (HeaderIterator headers = request.headerIterator(); headers.hasNext();) {
+ Header h = headers.nextHeader();
+ requestMeta.setField(LOG_HEADER_PREFIX + h.getName(), h.getValue());
+ }
+ start = System.currentTimeMillis();
+ }
+
+ HttpResponse response;
+
+ try {
+ HttpContext context = new BasicHttpContext();
+ response = httpClient.execute(request, context);
+
+ if (requestMeta != null) {
+ requestMeta.setField(LOG_IP_ADDRESS, getIpAddress(context));
+ }
+ } catch (ConnectTimeoutException e) {
+ connectTimeouts.increment();
+ throw e;
+ }
+
+ // Logging
+ long latencyStart = System.currentTimeMillis() - start;
+ if (requestMeta != null) {
+ requestMeta.setField(LOG_LATENCY_START, latencyStart);
+ }
+ logResponseLatency(latencyStart);
+ return response;
+ }
+
+ private String getIpAddress(HttpContext context) {
+ HttpConnection connection = (HttpConnection) context.getAttribute(ExecutionContext.HTTP_CONNECTION);
+ if (connection instanceof HttpInetConnection) {
+ InetAddress address = ((HttpInetConnection) connection).getRemoteAddress();
+ String hostAddress = address.getHostAddress();
+ return hostAddress == null ?
+ IP_ADDRESS_UNKNOWN:
+ hostAddress;
+ } else {
+ getLogger().log(LogLevel.DEBUG, "Unexpected connection type: " + connection.getClass().getName());
+ return IP_ADDRESS_UNKNOWN;
+ }
+ }
+
+ private boolean isAscii(String value) {
+ char[] scanBuffer = new char[value.length()];
+ value.getChars(0, value.length(), scanBuffer, 0);
+ for (char c: scanBuffer)
+ if (c > 127) return false;
+ return true;
+ }
+
+ protected void logResponseLatency(long latency) { }
+
+ /**
+ * Creates a http client for one request. Override to customize the client
+ * to use, e.g for testing. This default implementation will add the YCA
+ * proxy to params if is necessary, and then do
+ * <code>return new SearcherHttpClient(getConnectionManager(params), params);</code>
+ */
+ protected HttpClient createClient(HttpParams params) {
+ if (ycaProxy != null) {
+ params.setParameter(ConnRoutePNames.DEFAULT_PROXY, ycaProxy);
+ }
+ return new SearcherHttpClient(getConnectionManager(params), params);
+ }
+
+ /**
+ * Creates a HttpRequest. Override to customize the request.
+ * This default implementation does <code>return new HttpRequest(method,uri);</code>
+ */
+ protected HttpUriRequest createRequest(String method,URI uri) {
+ return createRequest(method, uri, null);
+ }
+
+ /**
+ * Creates a HttpRequest. Override to customize the request.
+ * This default implementation does <code>return new HttpRequest(method,uri);</code>
+ */
+ protected HttpUriRequest createRequest(String method,URI uri, HttpEntity entity) {
+ return new SearcherHttpRequest(method,uri);
+ }
+
+ /** Get a connection manager which may be used safely from this thread */
+ protected ClientConnectionManager getConnectionManager(HttpParams params) {
+ if (sharedConnectionManager != null) {// We are using shared connections
+ return sharedConnectionManager;
+ } else {
+ SingleClientConnManager singleClientConnManager = singleClientConnManagerThreadLocal.get();
+ if (singleClientConnManager == null) {
+ singleClientConnManager = new SingleClientConnManager(params, schemeRegistry);
+ singleClientConnManagerThreadLocal.set(singleClientConnManager);
+ }
+ return singleClientConnManager;
+ }
+ }
+
+ /** Utility method for creating error messages when a url is incorrect */
+ protected ErrorMessage createMalformedUrlError(Query query,Exception e) {
+ return ErrorMessage.createErrorInPluginSearcher("Malformed url in " + this + " for " + query +
+ ": " + Exceptions.toMessageString(e));
+ }
+
+ private Map<String, String> generateYCAHeaders() {
+ Map<String, String> headers = new HashMap<>();
+ String certificate = certificateStore.getCertificate(ycaApplicationId, ycaTtl, ycaRetry);
+ headers.put(YCA_HTTP_HEADER, certificate);
+ return headers;
+ }
+
+ protected static class SearcherHttpClient extends DefaultHttpClient {
+
+ private final int retries;
+
+ public SearcherHttpClient(final ClientConnectionManager conman, final HttpParams params) {
+ super(conman, params);
+ retries = params.getIntParameter(HTTPParameters.RETRIES, 1);
+ addRequestInterceptor((request, context) -> {
+ if (!request.containsHeader("Accept-Encoding")) {
+ request.addHeader("Accept-Encoding", "gzip");
+ }
+ });
+ addResponseInterceptor((response, context) -> {
+ HttpEntity entity = response.getEntity();
+ if (entity == null) return;
+ Header ceheader = entity.getContentEncoding();
+ if (ceheader == null) return;
+ for (HeaderElement codec : ceheader.getElements()) {
+ if (codec.getName().equalsIgnoreCase("gzip")) {
+ response.setEntity(new GzipDecompressingEntity(response.getEntity()));
+ return;
+ }
+ }
+ });
+ }
+
+ @Override
+ protected HttpRequestExecutor createRequestExecutor() {
+ return new HttpRequestExecutor();
+ }
+
+ @Override
+ protected HttpRoutePlanner createHttpRoutePlanner() {
+ return new DefaultHttpRoutePlanner(getConnectionManager().getSchemeRegistry());
+ }
+
+ @Override
+ protected HttpRequestRetryHandler createHttpRequestRetryHandler() {
+ return new SearcherHttpRequestRetryHandler(retries);
+ }
+ }
+
+ /** A retry handler which avoids retrying forever on errors misclassified as transient */
+ private static class SearcherHttpRequestRetryHandler implements HttpRequestRetryHandler {
+ private final int retries;
+
+ public SearcherHttpRequestRetryHandler(int retries) {
+ this.retries = retries;
+ }
+
+ @Override
+ public boolean retryRequest(IOException e, int executionCount, HttpContext httpContext) {
+ if (e == null) {
+ throw new IllegalArgumentException("Exception parameter may not be null");
+ }
+ if (executionCount > retries) {
+ return false;
+ }
+ if (e instanceof NoHttpResponseException) {
+ // Retry if the server dropped connection on us
+ return true;
+ }
+ if (e instanceof InterruptedIOException) {
+ // Timeout from federation layer
+ return false;
+ }
+ if (e instanceof UnknownHostException) {
+ // Unknown host
+ return false;
+ }
+ if (e instanceof SSLHandshakeException) {
+ // SSL handshake exception
+ return false;
+ }
+ return true;
+ }
+
+
+ }
+
+ private static class SearcherHttpRequest extends HttpRequestBase {
+ String method;
+
+ public SearcherHttpRequest(String method, final URI uri) {
+ super();
+ this.method = method;
+ setURI(uri);
+ }
+
+ @Override
+ public String getMethod() {
+ return method;
+ }
+ }
+
+ /**
+ * Only for testing.
+ */
+ public void shutdownConnectionManagers() {
+ ClientConnectionManager manager;
+ if (sharedConnectionManager != null) {
+ manager = sharedConnectionManager;
+ } else {
+ manager = singleClientConnManagerThreadLocal.get();
+ }
+ if (manager != null) {
+ manager.shutdown();
+ }
+ }
+
+ protected static final class ThrowingCertificateStore implements CertificateStore {
+
+ @Override
+ public String getCertificate(String key, long ttl, long retry) {
+ throw new UnsupportedOperationException("A certificate store is not available");
+ }
+
+ }
+
+}
+
diff --git a/container-search/src/main/java/com/yahoo/search/federation/http/TimedHttpEntity.java b/container-search/src/main/java/com/yahoo/search/federation/http/TimedHttpEntity.java
new file mode 100644
index 00000000000..9d89a318c32
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/http/TimedHttpEntity.java
@@ -0,0 +1,88 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation.http;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.http.Header;
+import org.apache.http.HttpEntity;
+
+/**
+ * Wrapper for adding timeout to an HttpEntity instance.
+ *
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ */
+public class TimedHttpEntity implements HttpEntity {
+ /**
+ * The wrapped entity. Never null.
+ */
+ private final HttpEntity entity;
+ private final long startTime;
+ private final long timeout;
+
+ public TimedHttpEntity(HttpEntity entity, long startTime, long timeout) {
+ if (entity == null) {
+ throw new IllegalArgumentException("TimedHttpEntity cannot be instantiated with null HttpEntity.");
+ }
+ this.entity = entity;
+ this.startTime = startTime;
+ this.timeout = timeout;
+ }
+
+
+ @Override
+ public InputStream getContent() throws IOException, IllegalStateException {
+ InputStream content = entity.getContent();
+ if (content == null) {
+ return null;
+ } else {
+ return new TimedStream(content, startTime, timeout);
+ }
+ }
+
+
+ // START OF PURE FORWARDING METHODS
+ @Override
+ public void consumeContent() throws IOException {
+ entity.consumeContent();
+ }
+
+
+ @Override
+ public Header getContentEncoding() {
+ return entity.getContentEncoding();
+ }
+
+ @Override
+ public long getContentLength() {
+ return entity.getContentLength();
+ }
+
+ @Override
+ public Header getContentType() {
+ return entity.getContentType();
+ }
+
+ @Override
+ public boolean isChunked() {
+ return entity.isChunked();
+ }
+
+ @Override
+ public boolean isRepeatable() {
+ return entity.isRepeatable();
+ }
+
+ @Override
+ public boolean isStreaming() {
+ return entity.isStreaming();
+ }
+
+ @Override
+ public void writeTo(OutputStream outstream) throws IOException {
+ entity.writeTo(outstream);
+ }
+ // END OF PURE FORWARDING METHODS
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/http/TimedStream.java b/container-search/src/main/java/com/yahoo/search/federation/http/TimedStream.java
new file mode 100644
index 00000000000..02777afb43c
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/http/TimedStream.java
@@ -0,0 +1,111 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation.http;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A stream which throws a TimeoutException if query timeout has been reached.
+ *
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ */
+public class TimedStream extends InputStream {
+
+ /**
+ * A time barrier value, the point in time from which on read operations will cause an exception.
+ */
+ private final long limit;
+
+ /**
+ * A wrapped InputStream instance.
+ */
+ private final InputStream content;
+
+ /**
+ * Wrap an InputStream to make read operations potentially fire off
+ * TimeoutException.
+ *
+ * <p>Typical use would be<br>
+ * <code>new TimedStream(httpEntity.getContent(), query.getStartTime(), query.getTimeout())</code>
+ *
+ * @param content
+ * the InputStream to wrap
+ * @param startTime
+ * start time of query
+ * @param timeout
+ * how long the query is allowed to run
+ */
+ public TimedStream(InputStream content, long startTime, long timeout) {
+ if (content == null) {
+ throw new IllegalArgumentException("Cannot instantiate TimedStream with null InputStream");
+ }
+ this.content = content;
+ // The reasion for doing it in here instead of outside the constructor
+ // is this makes the usage of the class more intuitive IMHO
+ this.limit = startTime + timeout;
+ }
+
+ private void checkTime(String message) {
+ if (System.currentTimeMillis() >= limit) {
+ throw new TimeoutException(message);
+ }
+ }
+
+ // START FORWARDING METHODS:
+ // All methods below are forwarding methods to the contained stream, where
+ // some do a timeout check.
+ @Override
+ public int read() throws IOException {
+ int data = content.read();
+ checkTime("Timed out during read().");
+ return data;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return content.available();
+ }
+
+ @Override
+ public void close() throws IOException {
+ content.close();
+ }
+
+ @Override
+ public synchronized void mark(int readlimit) {
+ content.mark(readlimit);
+ }
+
+ @Override
+ public boolean markSupported() {
+ return content.markSupported();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ int length = content.read(b, off, len);
+ checkTime("Timed out during read(byte[], int, int)");
+ return length;
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ int length = content.read(b);
+ checkTime("Timed out during read(byte[])");
+ return length;
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ content.reset();
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ long skipped = content.skip(n);
+ checkTime("Timed out during skip(long)");
+ return skipped;
+ }
+ // END FORWARDING METHODS
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/http/TimeoutException.java b/container-search/src/main/java/com/yahoo/search/federation/http/TimeoutException.java
new file mode 100644
index 00000000000..9e0536ea053
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/http/TimeoutException.java
@@ -0,0 +1,20 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation.http;
+
+/**
+ * Timeout marker for slow HTTP connections.
+ *
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ */
+public class TimeoutException extends RuntimeException {
+
+ /**
+ * Auto-generated version ID.
+ */
+ private static final long serialVersionUID = 7084147598258586559L;
+
+ public TimeoutException(String message) {
+ super(message);
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/http/package-info.java b/container-search/src/main/java/com/yahoo/search/federation/http/package-info.java
new file mode 100644
index 00000000000..aa3d249ab66
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/http/package-info.java
@@ -0,0 +1,7 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+@PublicApi
+package com.yahoo.search.federation.http;
+
+import com.yahoo.api.annotations.PublicApi;
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/container-search/src/main/java/com/yahoo/search/federation/package-info.java b/container-search/src/main/java/com/yahoo/search/federation/package-info.java
new file mode 100644
index 00000000000..008e339db4b
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/package-info.java
@@ -0,0 +1,17 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/**
+ * The federation layer on top of the search container. This contains
+ *
+ * <ul>
+ * <li>A model of Sources which can be selected in and for a Query and which are implemented
+ * by a Search Chain, and Providers which represents the connection to specific backends (these
+ * two are often 1-1 but not always)
+ * <li>The federation searcher responsible for forking a query to multiple sources in parallel
+ * <li>A simple searcher which can talk to other vespa services
+ * </ul>
+ */
+@ExportPackage
+package com.yahoo.search.federation;
+
+import com.yahoo.api.annotations.PublicApi;
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/container-search/src/main/java/com/yahoo/search/federation/selection/FederationTarget.java b/container-search/src/main/java/com/yahoo/search/federation/selection/FederationTarget.java
new file mode 100644
index 00000000000..676292d6a3a
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/selection/FederationTarget.java
@@ -0,0 +1,68 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation.selection;
+
+import java.util.Optional;
+import com.yahoo.component.chain.Chain;
+import com.yahoo.search.Searcher;
+import com.yahoo.search.searchchain.model.federation.FederationOptions;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Represents a search chain that the federation searcher should send a query to,
+ * along with a timeout and
+ * custom data reserved for use by the TargetSelector.
+ *
+ * @author tonytv
+ */
+public final class FederationTarget<T> {
+ private final Chain<Searcher> chain;
+ private final FederationOptions federationOptions;
+ private final T customData;
+
+ public FederationTarget(Chain<Searcher> chain, FederationOptions federationOptions, T customData) {
+ checkNotNull(chain);
+ checkNotNull(federationOptions);
+
+ this.chain = chain;
+ this.federationOptions = federationOptions;
+ this.customData = customData;
+ }
+
+ public Chain<Searcher> getChain() {
+ return chain;
+ }
+
+ public FederationOptions getFederationOptions() {
+ return federationOptions;
+ }
+
+ /**
+ * Any data that the TargetSelector wants to associate with this target.
+ * Owned exclusively by the TargetSelector that created this instance.
+ */
+ public T getCustomData() {
+ return customData;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ FederationTarget that = (FederationTarget) o;
+
+ if (!chain.equals(that.chain)) return false;
+ if (customData != null ? !customData.equals(that.customData) : that.customData != null) return false;
+ if (!federationOptions.equals(that.federationOptions)) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = chain.hashCode();
+ result = 31 * result + federationOptions.hashCode();
+ return result;
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/selection/TargetSelector.java b/container-search/src/main/java/com/yahoo/search/federation/selection/TargetSelector.java
new file mode 100644
index 00000000000..0f6bf2d5b71
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/selection/TargetSelector.java
@@ -0,0 +1,35 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation.selection;
+
+import com.yahoo.processing.execution.chain.ChainRegistry;
+import com.yahoo.search.Query;
+import com.yahoo.search.Result;
+import com.yahoo.search.Searcher;
+import com.yahoo.search.federation.selection.FederationTarget;
+
+import java.util.Collection;
+
+/**
+ * Allows adding extra targets that the federation searcher should federate to.
+ *
+ * For each federation search call, the federation searcher will call targetSelector.getTargets.
+ *
+ * Then, for each target, it will:
+ * 1) call modifyTargetQuery(target, query)
+ * 2) call modifyTargetResult(target, result)
+ *
+ * @author tonytv
+ */
+public interface TargetSelector<T> {
+ Collection<FederationTarget<T>> getTargets(Query query, ChainRegistry<Searcher> searcherChainRegistry);
+
+ /**
+ * For modifying the query before sending it to a the target
+ */
+ void modifyTargetQuery(FederationTarget<T> target, Query query);
+
+ /**
+ * For modifying the result produced by the target.
+ */
+ void modifyTargetResult(FederationTarget<T> target, Result result);
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/selection/package-info.java b/container-search/src/main/java/com/yahoo/search/federation/selection/package-info.java
new file mode 100644
index 00000000000..f3c289f6b43
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/selection/package-info.java
@@ -0,0 +1,7 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+@PublicApi
+package com.yahoo.search.federation.selection;
+
+import com.yahoo.api.annotations.PublicApi;
+import com.yahoo.osgi.annotation.ExportPackage;
diff --git a/container-search/src/main/java/com/yahoo/search/federation/sourceref/SearchChainInvocationSpec.java b/container-search/src/main/java/com/yahoo/search/federation/sourceref/SearchChainInvocationSpec.java
new file mode 100644
index 00000000000..7e82801d85f
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/sourceref/SearchChainInvocationSpec.java
@@ -0,0 +1,37 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation.sourceref;
+
+import com.yahoo.component.ComponentId;
+import com.yahoo.search.searchchain.model.federation.FederationOptions;
+
+import java.util.List;
+
+/**
+ * Specifices which search chain should be run and how it should be run.
+ *
+ * @author tonytv
+ */
+public class SearchChainInvocationSpec implements Cloneable {
+ public final ComponentId searchChainId;
+
+ public final ComponentId source;
+ public final ComponentId provider;
+
+ public final FederationOptions federationOptions;
+ public final List<String> documentTypes;
+
+ SearchChainInvocationSpec(ComponentId searchChainId,
+ ComponentId source, ComponentId provider, FederationOptions federationOptions,
+ List<String> documentTypes) {
+ this.searchChainId = searchChainId;
+ this.source = source;
+ this.provider = provider;
+ this.federationOptions = federationOptions;
+ this.documentTypes = documentTypes;
+ }
+
+ @Override
+ public SearchChainInvocationSpec clone() throws CloneNotSupportedException {
+ return (SearchChainInvocationSpec)super.clone();
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/sourceref/SearchChainResolver.java b/container-search/src/main/java/com/yahoo/search/federation/sourceref/SearchChainResolver.java
new file mode 100644
index 00000000000..fc70fb5e5e7
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/sourceref/SearchChainResolver.java
@@ -0,0 +1,160 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation.sourceref;
+
+import com.yahoo.component.ComponentId;
+import com.yahoo.component.ComponentSpecification;
+import com.yahoo.component.provider.ComponentRegistry;
+import com.yahoo.processing.request.Properties;
+import com.yahoo.search.searchchain.model.federation.FederationOptions;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Resolves (source, provider) component specifications to a search chain invocation spec.
+ * The provider component specification is given by the entry in the queryMap with key
+ * 'source.&lt;source-name&gt;.provider'.
+ *
+ * <p>
+ * The diagram shows the relationship between source, provider and the result:
+ * (source is used to select row, provider is used to select column.)
+ * Provider id = null is used for regular search chains.
+ * </p>
+ *
+ * <pre>
+ * Provider id
+ * null
+ * |----+---+---+---|
+ * | o | | | |
+ * |----+---+---+---|
+ * Source id | | o | o | |
+ * |----+---+---+---|
+ * | | | | o |
+ * |----+---+---+---|
+ *
+ * o: SearchChainInvocationSpec
+ * </pre>
+ *
+ * @author tonytv
+ */
+public class SearchChainResolver {
+ private final ComponentRegistry<Target> targets;
+ private final SortedSet<Target> defaultTargets;
+
+ public static class Builder {
+
+ private SortedSet<Target> defaultTargets = new TreeSet<>();
+
+ private final ComponentRegistry<Target> targets = new ComponentRegistry<Target>() {
+ @Override
+ public void freeze() {
+ for (Target target : allComponents()) {
+ target.freeze();
+ }
+ super.freeze();
+ }
+ };
+
+ public Builder addSearchChain(ComponentId searchChainId) {
+ return addSearchChain(searchChainId, Collections.<String>emptyList());
+ }
+
+ public Builder addSearchChain(ComponentId searchChainId, FederationOptions federationOptions) {
+ return addSearchChain(searchChainId, federationOptions, Collections.<String>emptyList());
+ }
+
+ public Builder addSearchChain(ComponentId searchChainId, List<String> documentTypes) {
+ return addSearchChain(searchChainId, new FederationOptions(), documentTypes);
+ }
+
+ public Builder addSearchChain(ComponentId searchChainId, FederationOptions federationOptions,
+ List<String> documentTypes) {
+ registerTarget(new SingleTarget(searchChainId,
+ new SearchChainInvocationSpec(searchChainId, null, null, federationOptions, documentTypes), false));
+ return this;
+ }
+
+ private Builder registerTarget(SingleTarget singleTarget) {
+ targets.register(singleTarget.getId(), singleTarget);
+ if (singleTarget.useByDefault()) {
+ defaultTargets.add(singleTarget);
+ }
+ return this;
+ }
+
+ public Builder addSourceForProvider(ComponentId sourceId, ComponentId providerId, ComponentId searchChainId,
+ boolean isDefaultProviderForSource, FederationOptions federationOptions,
+ List<String> documentTypes) {
+
+ SearchChainInvocationSpec searchChainInvocationSpec =
+ new SearchChainInvocationSpec(searchChainId, sourceId, providerId, federationOptions, documentTypes);
+
+ SourcesTarget sourcesTarget = getOrRegisterSourceTarget(sourceId);
+ sourcesTarget.addSource(providerId, searchChainInvocationSpec, isDefaultProviderForSource);
+
+ registerTarget(new SingleTarget(searchChainId, searchChainInvocationSpec, true));
+ return this;
+ }
+
+ private SourcesTarget getOrRegisterSourceTarget(ComponentId sourceId) {
+ Target sourcesTarget = targets.getComponent(sourceId);
+ if (sourcesTarget == null) {
+ targets.register(sourceId, new SourcesTarget(sourceId));
+ return getOrRegisterSourceTarget(sourceId);
+ } else if (sourcesTarget instanceof SourcesTarget) {
+ return (SourcesTarget) sourcesTarget;
+ } else {
+ throw new IllegalStateException("Expected " + sourceId + " to be a source.");
+ }
+ }
+
+ public void useTargetByDefault(String targetId) {
+ Target target = targets.getComponent(targetId);
+ assert target != null : "Target not added yet.";
+
+ defaultTargets.add(target);
+ }
+
+ public SearchChainResolver build() {
+ targets.freeze();
+ return new SearchChainResolver(targets, defaultTargets);
+ }
+ }
+
+ private SearchChainResolver(ComponentRegistry<Target> targets, SortedSet<Target> defaultTargets) {
+ this.targets = targets;
+ this.defaultTargets = Collections.unmodifiableSortedSet(defaultTargets);
+ }
+
+
+ public SearchChainInvocationSpec resolve(ComponentSpecification sourceRef, Properties sourceToProviderMap)
+ throws UnresolvedSearchChainException {
+
+ Target target = resolveTarget(sourceRef);
+ return target.responsibleSearchChain(sourceToProviderMap);
+ }
+
+ private Target resolveTarget(ComponentSpecification sourceRef) throws UnresolvedSearchChainException {
+ Target target = targets.getComponent(sourceRef);
+ if (target == null) {
+ throw UnresolvedSourceRefException.createForMissingSourceRef(sourceRef);
+ }
+ return target;
+ }
+
+ public SortedSet<Target> allTopLevelTargets() {
+ SortedSet<Target> topLevelTargets = new TreeSet<>();
+ for (Target target : targets.allComponents()) {
+ if (!target.isDerived) {
+ topLevelTargets.add(target);
+ }
+ }
+ return topLevelTargets;
+ }
+
+ public SortedSet<Target> defaultTargets() {
+ return defaultTargets;
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/sourceref/SingleTarget.java b/container-search/src/main/java/com/yahoo/search/federation/sourceref/SingleTarget.java
new file mode 100644
index 00000000000..4210b56a501
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/sourceref/SingleTarget.java
@@ -0,0 +1,36 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation.sourceref;
+
+import com.yahoo.component.ComponentId;
+import com.yahoo.processing.request.Properties;
+
+/**
+ * TODO: What is this?
+ *
+* @author tonytv
+*/
+public class SingleTarget extends Target {
+ private final SearchChainInvocationSpec searchChainInvocationSpec;
+
+ public SingleTarget(ComponentId id, SearchChainInvocationSpec searchChainInvocationSpec, boolean isDerived) {
+ super(id, isDerived);
+ this.searchChainInvocationSpec = searchChainInvocationSpec;
+ }
+
+ @Override
+ public SearchChainInvocationSpec responsibleSearchChain(Properties queryProperties) {
+ return searchChainInvocationSpec;
+ }
+
+ @Override
+ public String searchRefDescription() {
+ return localId.toString();
+ }
+
+ @Override
+ void freeze() {}
+
+ public final boolean useByDefault() {
+ return searchChainInvocationSpec.federationOptions.getUseByDefault();
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/sourceref/SourceRefResolver.java b/container-search/src/main/java/com/yahoo/search/federation/sourceref/SourceRefResolver.java
new file mode 100644
index 00000000000..8de6635e517
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/sourceref/SourceRefResolver.java
@@ -0,0 +1,71 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation.sourceref;
+
+import static com.yahoo.container.util.Util.quote;
+
+import java.util.Arrays;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+import com.yahoo.component.ComponentSpecification;
+import com.yahoo.prelude.IndexFacts;
+import com.yahoo.processing.request.Properties;
+
+/**
+ * Maps a source reference to search chain invocation specs.
+ *
+ * @author tonytv
+ */
+public class SourceRefResolver {
+ private final SearchChainResolver searchChainResolver;
+
+ public SourceRefResolver(SearchChainResolver searchChainResolver) {
+ this.searchChainResolver = searchChainResolver;
+ }
+ public Set<SearchChainInvocationSpec> resolve(ComponentSpecification sourceRef, Properties sourceToProviderMap,
+ IndexFacts indexFacts)
+ throws UnresolvedSearchChainException {
+
+ try {
+ return new LinkedHashSet<>(Arrays.asList(searchChainResolver.resolve(sourceRef, sourceToProviderMap)));
+ } catch (UnresolvedSourceRefException e) {
+ return resolveClustersWithDocument(sourceRef, sourceToProviderMap, indexFacts);
+ }
+ }
+
+ private Set<SearchChainInvocationSpec> resolveClustersWithDocument(ComponentSpecification sourceRef,
+ Properties sourceToProviderMap,
+ IndexFacts indexFacts)
+ throws UnresolvedSearchChainException {
+
+ if (hasOnlyName(sourceRef)) {
+ Set<SearchChainInvocationSpec> clusterSearchChains = new LinkedHashSet<>();
+
+ List<String> clusters = indexFacts.clustersHavingSearchDefinition(sourceRef.getName());
+ for (String cluster : clusters) {
+ clusterSearchChains.add(resolveClusterSearchChain(cluster, sourceRef, sourceToProviderMap));
+ }
+
+ if (!clusterSearchChains.isEmpty())
+ return clusterSearchChains;
+ }
+
+ throw UnresolvedSourceRefException.createForMissingSourceRef(sourceRef);
+
+ }
+
+ private SearchChainInvocationSpec resolveClusterSearchChain(String cluster, ComponentSpecification sourceRef,
+ Properties sourceToProviderMap) throws UnresolvedSearchChainException {
+ try {
+ return searchChainResolver.resolve(new ComponentSpecification(cluster), sourceToProviderMap);
+ } catch (UnresolvedSearchChainException e) {
+ throw new UnresolvedSearchChainException("Failed to resolve cluster search chain " + quote(cluster) +
+ " when using source ref " + quote(sourceRef) + " as a document name.");
+ }
+ }
+
+ private boolean hasOnlyName(ComponentSpecification sourceSpec) {
+ return new ComponentSpecification(sourceSpec.getName()).equals(sourceSpec);
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/sourceref/SourcesTarget.java b/container-search/src/main/java/com/yahoo/search/federation/sourceref/SourcesTarget.java
new file mode 100644
index 00000000000..bb1de051ed0
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/sourceref/SourcesTarget.java
@@ -0,0 +1,112 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation.sourceref;
+
+
+import com.google.common.base.Joiner;
+import com.yahoo.component.ComponentId;
+import com.yahoo.component.ComponentSpecification;
+import com.yahoo.component.chain.model.ComponentAdaptor;
+import com.yahoo.component.provider.ComponentRegistry;
+import com.yahoo.processing.request.Properties;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+
+public class SourcesTarget extends Target {
+ private ComponentRegistry<ComponentAdaptor<SearchChainInvocationSpec>> providerSources =
+ new ComponentRegistry<ComponentAdaptor<SearchChainInvocationSpec>>() {};
+ private SearchChainInvocationSpec defaultProviderSource;
+
+ public SourcesTarget(ComponentId sourceId) {
+ super(sourceId);
+ }
+
+ @Override
+ public SearchChainInvocationSpec responsibleSearchChain(Properties queryProperties) throws UnresolvedSearchChainException {
+ ComponentSpecification providerSpecification = providerSpecificationForSource(queryProperties);
+ if (providerSpecification == null) {
+ return defaultProviderSource;
+ } else {
+ return lookupProviderSource(providerSpecification);
+ }
+ }
+
+ @Override
+ public String searchRefDescription() {
+ StringBuilder builder = new StringBuilder(sourceId().stringValue());
+ builder.append("[provider = ").
+ append(Joiner.on(", ").join(allProviderIdsStringValue())).
+ append("]");
+ return builder.toString();
+ }
+
+ private SortedSet<String> allProviderIdsStringValue() {
+ SortedSet<String> result = new TreeSet<>();
+ for (ComponentAdaptor<SearchChainInvocationSpec> providerSource : providerSources.allComponents()) {
+ result.add(providerSource.getId().stringValue());
+ }
+ return result;
+ }
+
+ private SearchChainInvocationSpec lookupProviderSource(ComponentSpecification providerSpecification)
+ throws UnresolvedSearchChainException {
+ ComponentAdaptor<SearchChainInvocationSpec> providerSource = providerSources.getComponent(providerSpecification);
+
+ if (providerSource == null)
+ throw UnresolvedProviderException.createForMissingProvider(sourceId(), providerSpecification);
+
+ return providerSource.model;
+ }
+
+ public void freeze() {
+ if (defaultProviderSource == null)
+ throw new RuntimeException("Null default provider source for source " + sourceId() + ".");
+
+ providerSources.freeze();
+ }
+
+ public void addSource(ComponentId providerId, SearchChainInvocationSpec searchChainInvocationSpec,
+ boolean isDefaultProviderForSource) {
+ providerSources.register(providerId, new ComponentAdaptor<>(providerId, searchChainInvocationSpec));
+
+ if (isDefaultProviderForSource) {
+ setDefaultProviderSource(searchChainInvocationSpec);
+ }
+ }
+
+ private void setDefaultProviderSource(SearchChainInvocationSpec searchChainInvocationSpec) {
+ if (defaultProviderSource != null)
+ throw new RuntimeException("Tried to set two default providers for source " + sourceId() + ".");
+
+ defaultProviderSource = searchChainInvocationSpec;
+ }
+
+ ComponentId sourceId() {
+ return localId;
+ }
+
+
+ /**
+ * Looks up source.(sourceId).provider in the query properties.
+ * @return null if the default provider should be used
+ */
+ private ComponentSpecification providerSpecificationForSource(Properties queryProperties) {
+ String spec = queryProperties.getString("source." + sourceId().stringValue() + ".provider");
+ return ComponentSpecification.fromString(spec);
+ }
+
+ public SearchChainInvocationSpec defaultProviderSource() {
+ return defaultProviderSource;
+ }
+
+ public List<SearchChainInvocationSpec> allProviderSources() {
+ List<SearchChainInvocationSpec> allProviderSources = new ArrayList<>();
+ for (ComponentAdaptor<SearchChainInvocationSpec> component : providerSources.allComponents()) {
+ allProviderSources.add(component.model);
+ }
+ return allProviderSources;
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/sourceref/Target.java b/container-search/src/main/java/com/yahoo/search/federation/sourceref/Target.java
new file mode 100644
index 00000000000..4cf5d406959
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/sourceref/Target.java
@@ -0,0 +1,31 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation.sourceref;
+
+import com.yahoo.component.AbstractComponent;
+import com.yahoo.component.ComponentId;
+import com.yahoo.processing.request.Properties;
+
+/**
+ * TODO: What's this?
+ *
+* @author tonytv
+*/
+public abstract class Target extends AbstractComponent {
+ final ComponentId localId;
+ final boolean isDerived;
+
+ Target(ComponentId localId, boolean derived) {
+ super(localId);
+ this.localId = localId;
+ isDerived = derived;
+ }
+
+ Target(ComponentId localId) {
+ this(localId, false);
+ }
+
+ public abstract SearchChainInvocationSpec responsibleSearchChain(Properties queryProperties) throws UnresolvedSearchChainException;
+ public abstract String searchRefDescription();
+
+ abstract void freeze();
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/sourceref/UnresolvedProviderException.java b/container-search/src/main/java/com/yahoo/search/federation/sourceref/UnresolvedProviderException.java
new file mode 100644
index 00000000000..50b2dc95660
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/sourceref/UnresolvedProviderException.java
@@ -0,0 +1,22 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation.sourceref;
+
+import com.yahoo.component.ComponentId;
+import com.yahoo.component.ComponentSpecification;
+
+import static com.yahoo.container.util.Util.quote;
+
+/**
+ * @author tonytv
+ */
+@SuppressWarnings("serial")
+class UnresolvedProviderException extends UnresolvedSearchChainException {
+ UnresolvedProviderException(String msg) {
+ super(msg);
+ }
+
+ static UnresolvedSearchChainException createForMissingProvider(ComponentId source,
+ ComponentSpecification provider) {
+ return new UnresolvedProviderException("No provider " + quote(provider) + " for source " + quote(source) + ".");
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/sourceref/UnresolvedSearchChainException.java b/container-search/src/main/java/com/yahoo/search/federation/sourceref/UnresolvedSearchChainException.java
new file mode 100644
index 00000000000..b8417a3d05a
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/sourceref/UnresolvedSearchChainException.java
@@ -0,0 +1,13 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation.sourceref;
+
+/**
+ * Thrown if a search chain can not be resolved from one or more ids.
+ * @author tonytv
+ */
+@SuppressWarnings("serial")
+public class UnresolvedSearchChainException extends Exception {
+ public UnresolvedSearchChainException(String msg) {
+ super(msg);
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/sourceref/UnresolvedSourceRefException.java b/container-search/src/main/java/com/yahoo/search/federation/sourceref/UnresolvedSourceRefException.java
new file mode 100644
index 00000000000..4c15366914b
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/sourceref/UnresolvedSourceRefException.java
@@ -0,0 +1,21 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation.sourceref;
+
+import com.yahoo.component.ComponentSpecification;
+
+import static com.yahoo.container.util.Util.quote;
+
+/**
+ * @author tonytv
+ */
+@SuppressWarnings("serial")
+class UnresolvedSourceRefException extends UnresolvedSearchChainException {
+ UnresolvedSourceRefException(String msg) {
+ super(msg);
+ }
+
+
+ static UnresolvedSearchChainException createForMissingSourceRef(ComponentSpecification source) {
+ return new UnresolvedSourceRefException("Could not resolve source ref " + quote(source) + ".");
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/vespa/QueryMarshaller.java b/container-search/src/main/java/com/yahoo/search/federation/vespa/QueryMarshaller.java
new file mode 100644
index 00000000000..554424c267f
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/vespa/QueryMarshaller.java
@@ -0,0 +1,170 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation.vespa;
+
+import java.util.Iterator;
+
+import com.yahoo.prelude.query.*;
+
+/**
+ * Marshal a query stack into an advanced query string suitable for
+ * passing to another QRS.
+ *
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ * @author <a href="mailto:rafan@yahoo-inc.com">Rong-En Fan</a>
+ */
+public class QueryMarshaller {
+ private boolean atRoot = true;
+
+ public String marshal(Item root) {
+ if (root == null || root instanceof NullItem) {
+ return null;
+ }
+ StringBuilder s = new StringBuilder();
+ marshal(root, s);
+ atRoot = true;
+ return s.toString();
+ }
+
+ /**
+ * We do not yet care about exact match indices
+ */
+ private void marshal(Item root, StringBuilder s) {
+ switch (root.getItemType()) {
+ case OR:
+ marshalOr((OrItem) root, s);
+ break;
+ case AND:
+ marshalAnd((CompositeItem) root, s);
+ break;
+ case NOT:
+ marshalNot((NotItem) root, s);
+ break;
+ case RANK:
+ marshalRank((RankItem) root, s);
+ break;
+ case WORD:
+ case INT:
+ case PREFIX:
+ case SUBSTRING:
+ case SUFFIX:
+ marshalWord((TermItem) root, s);
+ break;
+ case PHRASE:
+ // PhraseItem and PhraseSegmentItem don't add quotes for segmented
+ // termse
+ if (root instanceof PhraseSegmentItem) {
+ marshalPhrase((PhraseSegmentItem) root, s);
+ } else {
+ marshalPhrase((PhraseItem) root, s);
+ }
+ break;
+ case NEAR:
+ marshalNear((NearItem) root, s);
+ break;
+ case ONEAR:
+ marshalNear((ONearItem) root, s);
+ break;
+ case WEAK_AND:
+ marshalWeakAnd((WeakAndItem)root, s);
+ default:
+ break;
+ }
+ }
+
+
+ private void marshalWord(TermItem item, StringBuilder s) {
+ String index = item.getIndexName();
+ if (index.length() != 0) {
+ s.append(item.getIndexName()).append(':');
+ }
+ s.append(item.stringValue());
+ if (item.getWeight() != Item.DEFAULT_WEIGHT)
+ s.append("!").append(item.getWeight());
+ }
+
+ private void marshalRank(RankItem root, StringBuilder s) {
+ marshalComposite("RANK", root, s);
+ }
+
+ private void marshalNot(NotItem root, StringBuilder s) {
+ marshalComposite("ANDNOT", root, s);
+ }
+
+ private void marshalOr(OrItem root, StringBuilder s) {
+ marshalComposite("OR", root, s);
+ }
+
+ /**
+ * Dump WORD items, and add space between each of them unless those
+ * words came from segmentation.
+ *
+ * @param root CompositeItem
+ * @param s current marshaled query
+ */
+ private void dumpWords(CompositeItem root, StringBuilder s) {
+ for (Iterator<Item> i = root.getItemIterator(); i.hasNext();) {
+ Item word = i.next();
+ boolean useSeparator = true;
+ if (word instanceof TermItem) {
+ s.append(((TermItem) word).stringValue());
+ if (word instanceof WordItem) {
+ useSeparator = !((WordItem) word).isFromSegmented();
+ }
+ } else {
+ dumpWords((CompositeItem) word, s);
+ }
+ if (useSeparator && i.hasNext()) {
+ s.append(' ');
+ }
+ }
+ }
+
+ private void marshalPhrase(PhraseItem root, StringBuilder s) {
+ marshalPhrase(root, s, root.isExplicit(), false);
+ }
+
+ private void marshalPhrase(PhraseSegmentItem root, StringBuilder s) {
+ marshalPhrase(root, s, root.isExplicit(), true);
+ }
+
+ private void marshalPhrase(IndexedItem root, StringBuilder s, boolean isExplicit, boolean isSegmented) {
+ String index = root.getIndexName();
+ if (index.length() != 0) {
+ s.append(root.getIndexName()).append(':');
+ }
+ if (isExplicit || !isSegmented) s.append('"');
+ dumpWords((CompositeItem) root, s);
+ if (isExplicit || !isSegmented) s.append('"');
+ }
+
+ private void marshalNear(NearItem root, StringBuilder s) {
+ marshalComposite(root.getName() + "(" + root.getDistance() + ")", root, s);
+ }
+
+ // Not only AndItem returns ItemType.AND
+ private void marshalAnd(CompositeItem root, StringBuilder s) {
+ marshalComposite("AND", root, s);
+ }
+
+ private void marshalWeakAnd(WeakAndItem root, StringBuilder s) {
+ marshalComposite("WAND(" + root.getN() + ")", root, s);
+ }
+
+ private void marshalComposite(String operator, CompositeItem root, StringBuilder s) {
+ boolean useParen = !atRoot;
+ if (useParen) {
+ s.append("( ");
+ } else {
+ atRoot = false;
+ }
+ for (Iterator<Item> i = root.getItemIterator(); i.hasNext();) {
+ Item item = i.next();
+ marshal(item, s);
+ if (i.hasNext())
+ s.append(' ').append(operator).append(' ');
+ }
+ if (useParen) {
+ s.append(" )");
+ }
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/vespa/ResultBuilder.java b/container-search/src/main/java/com/yahoo/search/federation/vespa/ResultBuilder.java
new file mode 100644
index 00000000000..1361c7c14db
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/vespa/ResultBuilder.java
@@ -0,0 +1,642 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation.vespa;
+
+import com.yahoo.log.LogLevel;
+import com.yahoo.prelude.hitfield.XMLString;
+import com.yahoo.search.Query;
+import com.yahoo.search.Result;
+import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.search.result.Hit;
+import com.yahoo.search.result.HitGroup;
+import com.yahoo.search.result.Relevance;
+import com.yahoo.text.XML;
+import com.yahoo.text.DoubleParser;
+import org.xml.sax.*;
+import org.xml.sax.helpers.DefaultHandler;
+import org.xml.sax.helpers.XMLReaderFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Logger;
+
+import static com.yahoo.text.Lowercase.toLowerCase;
+
+/**
+ * Parse Vespa XML results and create Result instances.
+ *
+ * <p> TODO: Ripe for a rewrite or major refactoring.
+ *
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ */
+@SuppressWarnings("deprecation")
+public class ResultBuilder extends DefaultHandler {
+ private static final String ERROR = "error";
+
+ private static final String FIELD = "field";
+
+ private static Logger log = Logger.getLogger(ResultBuilder.class.getName());
+
+ /** Namespaces feature id (http://xml.org/sax/features/namespaces). */
+ protected static final String NAMESPACES_FEATURE_ID = "http://xml.org/sax/features/namespaces";
+
+ /**
+ * Namespace prefixes feature id
+ * (http://xml.org/sax/features/namespace-prefixes).
+ */
+ protected static final String NAMESPACE_PREFIXES_FEATURE_ID = "http://xml.org/sax/features/namespace-prefixes";
+
+ /** Validation feature id (http://xml.org/sax/features/validation). */
+ protected static final String VALIDATION_FEATURE_ID = "http://xml.org/sax/features/validation";
+
+ /**
+ * Schema validation feature id
+ * (http://apache.org/xml/features/validation/schema).
+ */
+ protected static final String SCHEMA_VALIDATION_FEATURE_ID = "http://apache.org/xml/features/validation/schema";
+
+ /**
+ * Dynamic validation feature id
+ * (http://apache.org/xml/features/validation/dynamic).
+ */
+ protected static final String DYNAMIC_VALIDATION_FEATURE_ID = "http://apache.org/xml/features/validation/dynamic";
+
+ // default settings
+
+ /** Default parser name. */
+ protected static final String DEFAULT_PARSER_NAME = "org.apache.xerces.parsers.SAXParser";
+
+ /** Default namespaces support (false). */
+ protected static final boolean DEFAULT_NAMESPACES = false;
+
+ /** Default namespace prefixes (false). */
+ protected static final boolean DEFAULT_NAMESPACE_PREFIXES = false;
+
+ /** Default validation support (false). */
+ protected static final boolean DEFAULT_VALIDATION = false;
+
+ /** Default Schema validation support (false). */
+ protected static final boolean DEFAULT_SCHEMA_VALIDATION = false;
+
+ /** Default dynamic validation support (false). */
+ protected static final boolean DEFAULT_DYNAMIC_VALIDATION = false;
+
+ private StringBuilder fieldContent;
+
+ private String fieldName;
+
+ private int fieldLevel = 0;
+
+ private boolean hasLiteralTags = false;
+
+ private Map<String, Object> hitFields = new HashMap<>();
+ private String hitType;
+ private String hitRelevance;
+ private String hitSource;
+
+ private int offset = 0;
+
+ private List<Tag> tagStack = new ArrayList<>();
+
+ private final XMLReader parser;
+
+ private Query query;
+
+ private Result result;
+
+ private static enum ResultPart {
+ ROOT, ERRORDETAILS, HIT, HITGROUP;
+ }
+
+ Deque<ResultPart> location = new ArrayDeque<>(10);
+
+ private String currentErrorCode;
+
+ private String currentError;
+
+ private Deque<HitGroup> hitGroups = new ArrayDeque<>(5);
+
+ private static class Tag {
+ public final String name;
+
+ /**
+ * Offset is a number which is generated for all data and tags inside
+ * fields, used to determine whether a tag was closed without enclosing
+ * any characters or other tags.
+ */
+ public final int offset;
+
+ public Tag(final String name, final int offset) {
+ this.name = name;
+ this.offset = offset;
+ }
+
+ @Override
+ public String toString() {
+ return name + '(' + Integer.valueOf(offset) + ')';
+ }
+ }
+
+ /** Default constructor. */
+ public ResultBuilder() throws RuntimeException {
+ this(createParser());
+ }
+
+ public ResultBuilder(XMLReader parser) {
+ this.parser = parser;
+ this.parser.setContentHandler(this);
+ this.parser.setErrorHandler(this);
+ }
+
+ public static XMLReader createParser() {
+ ClassLoader savedContextClassLoader = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(org.apache.xerces.parsers.SAXParser.class.getClassLoader());
+
+ try {
+ XMLReader reader = XMLReaderFactory.createXMLReader(DEFAULT_PARSER_NAME);
+ setParserFeatures(reader);
+ return reader;
+ } catch (Exception e) {
+ throw new RuntimeException("error: Unable to instantiate parser ("
+ + DEFAULT_PARSER_NAME + ")", e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(savedContextClassLoader);
+ }
+ }
+
+ private static void setParserFeatures(XMLReader reader) {
+ try {
+ reader.setFeature(NAMESPACES_FEATURE_ID, DEFAULT_NAMESPACES);
+ } catch (SAXException e) {
+ log.log(LogLevel.WARNING, "warning: Parser does not support feature ("
+ + NAMESPACES_FEATURE_ID + ")");
+ }
+ try {
+ reader.setFeature(NAMESPACE_PREFIXES_FEATURE_ID,
+ DEFAULT_NAMESPACE_PREFIXES);
+ } catch (SAXException e) {
+ log.log(LogLevel.WARNING, "warning: Parser does not support feature ("
+ + NAMESPACE_PREFIXES_FEATURE_ID + ")");
+ }
+ try {
+ reader.setFeature(VALIDATION_FEATURE_ID, DEFAULT_VALIDATION);
+ } catch (SAXException e) {
+ log.log(LogLevel.WARNING, "warning: Parser does not support feature ("
+ + VALIDATION_FEATURE_ID + ")");
+ }
+ try {
+ reader.setFeature(SCHEMA_VALIDATION_FEATURE_ID,
+ DEFAULT_SCHEMA_VALIDATION);
+ } catch (SAXNotRecognizedException e) {
+ log.log(LogLevel.WARNING, "warning: Parser does not recognize feature ("
+ + SCHEMA_VALIDATION_FEATURE_ID + ")");
+
+ } catch (SAXNotSupportedException e) {
+ log.log(LogLevel.WARNING, "warning: Parser does not support feature ("
+ + SCHEMA_VALIDATION_FEATURE_ID + ")");
+ }
+
+ try {
+ reader.setFeature(DYNAMIC_VALIDATION_FEATURE_ID,
+ DEFAULT_DYNAMIC_VALIDATION);
+ } catch (SAXNotRecognizedException e) {
+ log.log(LogLevel.WARNING, "warning: Parser does not recognize feature ("
+ + DYNAMIC_VALIDATION_FEATURE_ID + ")");
+
+ } catch (SAXNotSupportedException e) {
+ log.log(LogLevel.WARNING, "warning: Parser does not support feature ("
+ + DYNAMIC_VALIDATION_FEATURE_ID + ")");
+ }
+ }
+
+ @Override
+ public void startDocument() throws SAXException {
+ reset();
+ result = new Result(query);
+ hitGroups.addFirst(result.hits());
+ location.addFirst(ResultPart.ROOT);
+ return;
+ }
+
+ private void reset() {
+ result = null;
+ fieldLevel = 0;
+ hasLiteralTags = false;
+ tagStack = null;
+ fieldContent = null;
+ offset = 0;
+ currentError = null;
+ currentErrorCode = null;
+ hitGroups.clear();
+ location.clear();
+ }
+
+ @Override
+ public void startElement(String uri, String local, String raw,
+ Attributes attrs) throws SAXException {
+ // "Everybody" wants this switch to be moved into the
+ // enum class instead, but in this case, I find the classic
+ // approach more readable.
+ switch (location.peekFirst()) {
+ case HIT:
+ if (fieldLevel > 0) {
+ tagInField(raw, attrs, FIELD);
+ ++offset;
+ return;
+ }
+ if (FIELD.equals(raw)) {
+ ++fieldLevel;
+ fieldName = attrs.getValue("name");
+ fieldContent = new StringBuilder();
+ hasLiteralTags = false;
+ }
+ break;
+ case ERRORDETAILS:
+ if (fieldLevel > 0) {
+ tagInField(raw, attrs, ERROR);
+ ++offset;
+ return;
+ }
+ if (ERROR.equals(raw)) {
+ if (attrs != null) {
+ currentErrorCode = attrs.getValue("code");
+ currentError = attrs.getValue("error");
+ }
+ ++fieldLevel;
+ fieldContent = new StringBuilder();
+ hasLiteralTags = false;
+ }
+ break;
+ case HITGROUP:
+ if ("hit".equals(raw)) {
+ startHit(attrs);
+ } else if ("group".equals(raw)) {
+ startHitGroup(attrs);
+ }
+ break;
+ case ROOT:
+ if ("hit".equals(raw)) {
+ startHit(attrs);
+ } else if ("errordetails".equals(raw)) {
+ location.addFirst(ResultPart.ERRORDETAILS);
+ } else if ("result".equals(raw)) {
+ if (attrs != null) {
+ String total = attrs.getValue("total-hit-count");
+ if (total != null) {
+ result.setTotalHitCount(Long.valueOf(total));
+ }
+ }
+ } else if ("group".equals(raw)) {
+ startHitGroup(attrs);
+ } else if (ERROR.equals(raw)) {
+ if (attrs != null) {
+ currentErrorCode = attrs.getValue("code");
+ fieldContent = new StringBuilder();
+ }
+ }
+ break;
+ }
+ ++offset;
+ }
+
+ private void startHitGroup(Attributes attrs) {
+ HitGroup g = new HitGroup();
+ Set<String> types = g.types();
+
+ final String source;
+ if (attrs != null) {
+ String groupType = attrs.getValue("type");
+ if (groupType != null) {
+ for (String s : groupType.split(" ")) {
+ if (s.length() > 0) {
+ types.add(s);
+ }
+ }
+ }
+
+ source = attrs.getValue("source");
+ } else {
+ source = null;
+ }
+
+ g.setId((source != null) ? source : "dummy");
+
+ hitGroups.peekFirst().add(g);
+ hitGroups.addFirst(g);
+ location.addFirst(ResultPart.HITGROUP);
+ }
+
+ private void startHit(Attributes attrs) {
+ hitFields.clear();
+ location.addFirst(ResultPart.HIT);
+ if (attrs != null) {
+ hitRelevance = attrs.getValue("relevancy");
+ hitSource = attrs.getValue("source");
+ hitType = attrs.getValue("type");
+ } else {
+ hitRelevance = null;
+ hitSource = null;
+ hitType = null;
+ }
+ }
+
+ private void tagInField(String tag, Attributes attrs, String enclosingTag) {
+ if (!hasLiteralTags) {
+ hasLiteralTags = true;
+ String fieldTillNow = XML.xmlEscape(fieldContent.toString(), false);
+ fieldContent = new StringBuilder(fieldTillNow);
+ tagStack = new ArrayList<>();
+ }
+ if (enclosingTag.equals(tag)) {
+ ++fieldLevel;
+ }
+ if (tagStack.size() > 0) {
+ Tag prevTag = tagStack.get(tagStack.size() - 1);
+ if (prevTag != null && (prevTag.offset + 1) == offset) {
+ fieldContent.append(">");
+ }
+ }
+ fieldContent.append("<").append(tag);
+ if (attrs != null) {
+ int attrCount = attrs.getLength();
+ for (int i = 0; i < attrCount; i++) {
+ fieldContent.append(" ").append(attrs.getQName(i))
+ .append("=\"").append(
+ XML.xmlEscape(attrs.getValue(i), true)).append(
+ "\"");
+ }
+ }
+ tagStack.add(new Tag(tag, offset));
+ }
+
+ private void endElementInField(String qName, String enclosingTag) {
+ Tag prevTag = tagStack.get(tagStack.size() - 1);
+ if (qName.equals(prevTag.name) && offset == (prevTag.offset + 1)) {
+ fieldContent.append(" />");
+ } else {
+ fieldContent.append("</").append(qName).append('>');
+ }
+ if (prevTag.name.equals(qName)) {
+ tagStack.remove(tagStack.size() - 1);
+ }
+ }
+
+ private void endElementInHitField(String qName) {
+ if (FIELD.equals(qName) && --fieldLevel == 0) {
+ Object content;
+ if (hasLiteralTags) {
+ content = new XMLString(fieldContent.toString());
+ } else {
+ content = fieldContent.toString();
+ }
+ hitFields.put(fieldName, content);
+ if ("collapseId".equals(fieldName)) {
+ hitFields.put(fieldName, Integer.valueOf(content.toString()));
+ }
+ fieldName = null;
+ fieldContent = null;
+ tagStack = null;
+ } else {
+ Tag prevTag = tagStack.get(tagStack.size() - 1);
+ if (qName.equals(prevTag.name) && offset == (prevTag.offset + 1)) {
+ fieldContent.append(" />");
+ } else {
+ fieldContent.append("</").append(qName).append('>');
+ }
+ if (prevTag.name.equals(qName)) {
+ tagStack.remove(tagStack.size() - 1);
+ }
+ }
+ }
+ @Override
+ public void characters(char ch[], int start, int length)
+ throws SAXException {
+
+ switch (location.peekFirst()) {
+ case ERRORDETAILS:
+ case HIT:
+ if (fieldLevel > 0) {
+ if (hasLiteralTags) {
+ if (tagStack.size() > 0) {
+ Tag tag = tagStack.get(tagStack.size() - 1);
+ if (tag != null && (tag.offset + 1) == offset) {
+ fieldContent.append(">");
+ }
+ }
+ fieldContent.append(
+ XML.xmlEscape(new String(ch, start, length), false));
+ } else {
+ fieldContent.append(ch, start, length);
+ }
+ }
+ break;
+ default:
+ if (fieldContent != null) {
+ fieldContent.append(ch, start, length);
+ }
+ break;
+ }
+ ++offset;
+ }
+
+ @Override
+ public void ignorableWhitespace(char ch[], int start, int length)
+ throws SAXException {
+ return;
+ }
+
+ @Override
+ public void processingInstruction(String target, String data)
+ throws SAXException {
+ return;
+ }
+
+ @Override
+ public void endElement(String namespaceURI, String localName, String qName)
+ throws SAXException {
+ switch (location.peekFirst()) {
+ case HITGROUP:
+ if ("group".equals(qName)) {
+ hitGroups.removeFirst();
+ location.removeFirst();
+ }
+ break;
+ case HIT:
+ if (fieldLevel > 0) {
+ endElementInHitField(qName);
+ } else if ("hit".equals(qName)) {
+ //assert(hitKeys.size() == hitValues.size());
+ //We try to get either uri or documentID and use that as id
+ Object docId = extractDocumentID();
+ Hit newHit = new Hit(docId.toString());
+ if (hitRelevance != null) newHit.setRelevance(new Relevance(DoubleParser.parse(hitRelevance)));
+ if(hitSource != null) newHit.setSource(hitSource);
+ if(hitType != null) {
+ for(String type: hitType.split(" ")) {
+ newHit.types().add(type);
+ }
+ }
+ for(Map.Entry<String, Object> field : hitFields.entrySet()) {
+ newHit.setField(field.getKey(), field.getValue());
+ }
+
+ hitGroups.peekFirst().add(newHit);
+ location.removeFirst();
+ }
+ break;
+ case ERRORDETAILS:
+ if (fieldLevel == 1 && ERROR.equals(qName)) {
+ ErrorMessage error = new ErrorMessage(Integer.valueOf(currentErrorCode),
+ currentError,
+ fieldContent.toString());
+ hitGroups.peekFirst().addError(error);
+ currentError = null;
+ currentErrorCode = null;
+ fieldContent = null;
+ tagStack = null;
+ fieldLevel = 0;
+ } else if (fieldLevel > 0) {
+ endElementInField(qName, ERROR);
+ } else if ("errordetails".equals(qName)) {
+ location.removeFirst();
+ }
+ break;
+ case ROOT:
+ if (ERROR.equals(qName)) {
+ ErrorMessage error = new ErrorMessage(Integer.valueOf(currentErrorCode),
+ fieldContent.toString());
+ hitGroups.peekFirst().setError(error);
+ currentErrorCode = null;
+ fieldContent = null;
+ }
+ break;
+ default:
+ break;
+ }
+ ++offset;
+ }
+
+ private Object extractDocumentID() {
+ Object docId = null;
+ if (hitFields.containsKey("uri")) {
+ docId = hitFields.get("uri");
+ } else {
+ final String documentId = "documentId";
+ if (hitFields.containsKey(documentId)) {
+ docId = hitFields.get(documentId);
+ } else {
+ final String lcDocumentId = toLowerCase(documentId);
+ for (Map.Entry<String, Object> e : hitFields.entrySet()) {
+ String key = e.getKey();
+ // case insensitive matching, checking length first hoping to avoid some lowercasing
+ if (documentId.length() == key.length() && lcDocumentId.equals(toLowerCase(key))) {
+ docId = e.getValue();
+ break;
+ }
+ }
+ }
+ }
+ if (docId == null) {
+ docId = "dummy";
+ log.info("Results from vespa backend did not contain either uri or documentId");
+ }
+ return docId;
+ }
+
+ @Override
+ public void warning(SAXParseException ex) throws SAXException {
+ printError("Warning", ex);
+ }
+
+ @Override
+ public void error(SAXParseException ex) throws SAXException {
+ printError("Error", ex);
+ }
+
+ @Override
+ public void fatalError(SAXParseException ex) throws SAXException {
+ printError("Fatal Error", ex);
+ // throw ex;
+ }
+
+ /** Prints the error message. */
+ protected void printError(String type, SAXParseException ex) {
+ StringBuilder errorMessage = new StringBuilder();
+
+ errorMessage.append(type);
+ if (ex != null) {
+ String systemId = ex.getSystemId();
+ if (systemId != null) {
+ int index = systemId.lastIndexOf('/');
+ if (index != -1)
+ systemId = systemId.substring(index + 1);
+ errorMessage.append(' ').append(systemId);
+ }
+ }
+ errorMessage.append(':')
+ .append(ex.getLineNumber())
+ .append(':')
+ .append(ex.getColumnNumber())
+ .append(": ")
+ .append(ex.getMessage());
+ log.log(LogLevel.WARNING, errorMessage.toString());
+
+ }
+
+ public Result parse(String identifier, Query query) {
+ Result toReturn;
+
+ setQuery(query);
+ try {
+ parser.parse(identifier);
+ } catch (SAXParseException e) {
+ // ignore
+ } catch (Exception e) {
+ log.log(LogLevel.WARNING, "Error parsing result from Vespa",e);
+ Exception se = e;
+ if (e instanceof SAXException) {
+ se = ((SAXException) e).getException();
+ }
+ if (se != null)
+ se.printStackTrace(System.err);
+ else
+ e.printStackTrace(System.err);
+ }
+ toReturn = result;
+ reset();
+ return toReturn;
+ }
+
+ public Result parse(InputSource input, Query query) {
+ Result toReturn;
+
+ setQuery(query);
+ try {
+ parser.parse(input);
+ } catch (SAXParseException e) {
+ // ignore
+ } catch (Exception e) {
+ log.log(LogLevel.WARNING, "Error parsing result from Vespa",e);
+ Exception se = e;
+ if (e instanceof SAXException) {
+ se = ((SAXException) e).getException();
+ }
+ if (se != null)
+ se.printStackTrace(System.err);
+ else
+ e.printStackTrace(System.err);
+ }
+ toReturn = result;
+ reset();
+ return toReturn;
+ }
+
+
+ private void setQuery(Query query) {
+ this.query = query;
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/vespa/VespaSearcher.java b/container-search/src/main/java/com/yahoo/search/federation/vespa/VespaSearcher.java
new file mode 100644
index 00000000000..26c9b8ad2cd
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/vespa/VespaSearcher.java
@@ -0,0 +1,270 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.federation.vespa;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.Map;
+import java.util.Set;
+
+import org.xml.sax.InputSource;
+import org.xml.sax.XMLReader;
+
+import com.google.inject.Inject;
+import com.yahoo.collections.Tuple2;
+import com.yahoo.component.ComponentId;
+import com.yahoo.component.Version;
+import com.yahoo.component.chain.dependencies.After;
+import com.yahoo.component.chain.dependencies.Provides;
+import com.yahoo.language.Linguistics;
+import com.yahoo.log.LogLevel;
+import com.yahoo.prelude.query.Item;
+import com.yahoo.prelude.query.QueryCanonicalizer;
+import com.yahoo.processing.request.CompoundName;
+import com.yahoo.search.Query;
+import com.yahoo.search.Result;
+import com.yahoo.search.cache.QrBinaryCacheConfig;
+import com.yahoo.search.cache.QrBinaryCacheRegionConfig;
+import com.yahoo.search.federation.FederationSearcher;
+import com.yahoo.search.federation.ProviderConfig;
+import com.yahoo.search.federation.http.ConfiguredHTTPProviderSearcher;
+import com.yahoo.search.federation.http.Connection;
+import com.yahoo.search.intent.model.IntentModel;
+import com.yahoo.search.query.QueryTree;
+import com.yahoo.search.query.textserialize.TextSerialize;
+import com.yahoo.search.yql.MinimalQueryInserter;
+import com.yahoo.statistics.Statistics;
+
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Backend searcher for external Vespa clusters (queried over http).
+ *
+ * <p>If the "sources" argument should be honored on an external cluster
+ * when using YQL+, override {@link #chooseYqlSources(Set)}.</p>
+ *
+ * @author <a href="mailto:arnebef@yahoo-inc.com">Arne Bergene Fossaa</a>
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ */
+@Provides("Vespa")
+@After("*")
+public class VespaSearcher extends ConfiguredHTTPProviderSearcher {
+ private final ThreadLocal<XMLReader> readerHolder = new ThreadLocal<>();
+ private final Query.Type queryType;
+ private final Tuple2<String, Version> segmenterVersion;
+
+ private static final CompoundName select = new CompoundName("select");
+ private static final CompoundName streamingUserid = new CompoundName(
+ "streaming.userid");
+ private static final CompoundName streamingGroupname = new CompoundName(
+ "streaming.groupname");
+ private static final CompoundName streamingSelection = new CompoundName(
+ "streaming.selection");
+
+ /** Create an instance from configuration */
+ public VespaSearcher(ComponentId id, ProviderConfig config,
+ QrBinaryCacheConfig c, QrBinaryCacheRegionConfig r,
+ Statistics statistics) {
+ this(id, config, c, r, statistics, null);
+ }
+
+ /**
+ * Create an instance from configuration
+ *
+ * @param linguistics used for generating meta info for YQL+
+ */
+ @Inject
+ public VespaSearcher(ComponentId id, ProviderConfig config,
+ QrBinaryCacheConfig c, QrBinaryCacheRegionConfig r,
+ Statistics statistics, @Nullable Linguistics linguistics) {
+ super(id, config, c, r, statistics);
+ queryType = toQueryType(config.queryType());
+ if (linguistics == null) {
+ segmenterVersion = null;
+ } else {
+ segmenterVersion = linguistics.getVersion(Linguistics.Component.SEGMENTER);
+ }
+ }
+
+ /**
+ * Create an instance from direct parameters having a single connection.
+ * Useful for testing
+ */
+ public VespaSearcher(String idString, String host, int port, String path) {
+ super(idString, host, port, path, Statistics.nullImplementation);
+ queryType = toQueryType(ProviderConfig.QueryType.LEGACY);
+ segmenterVersion = null;
+ }
+
+ void addProperty(Map<String, String> queryMap, Query query,
+ CompoundName property) {
+ Object o = query.properties().get(property);
+ if (o != null) {
+ queryMap.put(property.toString(), o.toString());
+ }
+ }
+
+ @Override
+ public Map<String, String> getQueryMap(Query query) {
+ Map<String, String> queryMap = getQueryMapWithoutHitsOffset(query);
+ queryMap.put("offset", Integer.toString(query.getOffset()));
+ queryMap.put("hits", Integer.toString(query.getHits()));
+ queryMap.put("presentation.format", "xml");
+
+ addProperty(queryMap, query, select);
+ addProperty(queryMap, query, streamingUserid);
+ addProperty(queryMap, query, streamingGroupname);
+ addProperty(queryMap, query, streamingSelection);
+ return queryMap;
+ }
+
+ @Override
+ public Map<String, String> getCacheKey(Query q) {
+ return getQueryMapWithoutHitsOffset(q);
+ }
+
+ private Map<String, String> getQueryMapWithoutHitsOffset(Query query) {
+ Map<String, String> queryMap = super.getQueryMap(query);
+ if (queryType == Query.Type.YQL) {
+ queryMap.put(MinimalQueryInserter.YQL.toString(), marshalQuery(query));
+ } else {
+ queryMap.put("query", marshalQuery(query.getModel().getQueryTree()));
+ queryMap.put("type", queryType.toString());
+ }
+
+ addNonExcludedSourceProperties(query, queryMap);
+ return queryMap;
+ }
+
+ Query.Type toQueryType(ProviderConfig.QueryType.Enum providerQueryType) {
+ if (providerQueryType == ProviderConfig.QueryType.LEGACY) {
+ return Query.Type.ADVANCED;
+ } else if (providerQueryType == ProviderConfig.QueryType.PROGRAMMATIC) {
+ return Query.Type.PROGRAMMATIC;
+ } else if (providerQueryType == ProviderConfig.QueryType.YQL) {
+ return Query.Type.YQL;
+ } else {
+ throw new RuntimeException("Query type " + providerQueryType
+ + " unsupported.");
+ }
+ }
+
+ /**
+ * Serialize the query parameter for outgoing queries. For YQL+ queries,
+ * sources and fields will be set to all sources and all fields, to follow
+ * the behavior of other query types.
+ *
+ * @param query
+ * the current, outgoing query
+ * @return a string to include in an HTTP request
+ */
+ public String marshalQuery(Query query) {
+ if (queryType != Query.Type.YQL) {
+ return marshalQuery(query.getModel().getQueryTree());
+ }
+
+ Query workQuery = query.clone();
+ String error = QueryCanonicalizer.canonicalize(workQuery);
+ if (error != null) {
+ getLogger().log(LogLevel.WARNING,
+ "Could not normalize [" + query.toString() + "]: " + error);
+ // Just returning null here is the pattern from existing code...
+ return null;
+ }
+ chooseYqlSources(workQuery.getModel().getSources());
+ chooseYqlSummaryFields(workQuery.getPresentation().getSummaryFields());
+ return workQuery.yqlRepresentation(getSegmenterVersion(), false);
+ }
+
+ public String marshalQuery(QueryTree root) {
+ QueryCanonicalizer.QueryWrapper qw = new QueryCanonicalizer.QueryWrapper();
+ root = root.clone();
+ qw.setRoot(root.getRoot());
+ boolean could = QueryCanonicalizer.treeCanonicalize(qw, root.getRoot(),
+ null);
+ if (!could) {
+ return null;
+ }
+ return marshalRoot(qw.getRoot());
+ }
+
+ private String marshalRoot(Item root) {
+ switch (queryType) {
+ case ADVANCED:
+ QueryMarshaller marshaller = new QueryMarshaller();
+ return marshaller.marshal(root);
+ case PROGRAMMATIC:
+ return TextSerialize.serialize(root);
+ default:
+ throw new RuntimeException("Unsupported query type.");
+ }
+ }
+
+ private XMLReader getReader() {
+ XMLReader reader = readerHolder.get();
+ if (reader == null) {
+ reader = ResultBuilder.createParser();
+ readerHolder.set(reader);
+ }
+ return reader;
+ }
+
+ @Override
+ public void unmarshal(InputStream stream, long contentLength, Result result) {
+ ResultBuilder parser = new ResultBuilder(getReader());
+ Result mResult = parser.parse(new InputSource(stream),
+ result.getQuery());
+ result.mergeWith(mResult);
+ result.hits().addAll(mResult.hits().asUnorderedHits());
+ }
+
+ /** Returns the canonical Vespa ping URI, http://host:port/status.html */
+ @Override
+ public URI getPingURI(Connection connection) throws MalformedURLException,
+ URISyntaxException {
+ return new URL(getParameters().getSchema(), connection.getHost(),
+ connection.getPort(), "/status.html").toURI();
+ }
+
+ /**
+ * Get the segmenter version data used when creating YQL queries. Useful if
+ * overriding {@link #marshalQuery(Query)}.
+ *
+ * @return a tuple with the name of the segmenting engine in use, and its
+ * version
+ */
+ protected Tuple2<String, Version> getSegmenterVersion() {
+ return segmenterVersion;
+ }
+
+ /**
+ * Choose which source arguments to use for the external cluster when
+ * generating a YQL+ query string. This is called from
+ * {@link #marshalQuery(Query)}. The default implementation clears the set,
+ * i.e. requests all sources. Other implementations may modify the source
+ * set as they see fit, or simply do nothing.
+ *
+ * @param sources
+ * the set of source names to use for the outgoing query
+ */
+ protected void chooseYqlSources(Set<String> sources) {
+ sources.clear();
+ }
+
+ /**
+ * Choose which summary fields to request from the external cluster when
+ * generating a YQL+ query string. This is called from
+ * {@link #marshalQuery(Query)}. The default implementation clears the set,
+ * i.e. requests all fields. Other implementations may modify the summary
+ * field set as they see fit, or simply do nothing.
+ *
+ * @param summaryFields
+ * the set of source names to use for the outgoing query
+ */
+ protected void chooseYqlSummaryFields(Set<String> summaryFields) {
+ summaryFields.clear();
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/federation/vespa/package-info.java b/container-search/src/main/java/com/yahoo/search/federation/vespa/package-info.java
new file mode 100644
index 00000000000..6a9f1decb21
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/federation/vespa/package-info.java
@@ -0,0 +1,7 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+@ExportPackage
+@PublicApi
+package com.yahoo.search.federation.vespa;
+
+import com.yahoo.api.annotations.PublicApi;
+import com.yahoo.osgi.annotation.ExportPackage;